Skip to main content

LangGraph Payment Retry Example

This example demonstrates how to implement robust payment retry logic in LangGraph workflows using PayStabl. It shows how to handle payment failures gracefully and implement different retry strategies.

Overview

This workflow implements:

  • Payment retry mechanisms for failed transactions
  • Exponential backoff for network-related failures
  • Alternative payment paths when primary methods fail
  • Fallback services when expensive options are unavailable
  • Cost optimization through smart retry logic

Basic Retry Workflow

from langgraph import StateGraph
from paystabl import PayStablAgent
import asyncio
from typing import TypedDict, List
from enum import Enum

class RetryStrategy(Enum):
EXPONENTIAL_BACKOFF = "exponential"
LINEAR_BACKOFF = "linear"
IMMEDIATE = "immediate"
ALTERNATIVE_SERVICE = "alternative"

class WorkflowState(TypedDict):
task: str
payment_attempts: List[dict]
current_service: str
retry_count: int
max_retries: int
fallback_services: List[str]
result: dict
error: str
total_cost: float

class PaymentRetryWorkflow:
def __init__(self, agent_id: str):
self.agent = PayStablAgent(
agent_id=agent_id,
network="base-sepolia"
)
self.workflow = self.build_workflow()

def build_workflow(self) -> StateGraph:
workflow = StateGraph(WorkflowState)

# Core workflow nodes
workflow.add_node("validate_request", self.validate_request)
workflow.add_node("attempt_payment", self.attempt_payment)
workflow.add_node("handle_failure", self.handle_payment_failure)
workflow.add_node("retry_with_backoff", self.retry_with_backoff)
workflow.add_node("try_alternative", self.try_alternative_service)
workflow.add_node("use_fallback", self.use_fallback_service)
workflow.add_node("complete_task", self.complete_task)
workflow.add_node("final_failure", self.handle_final_failure)

# Define workflow edges
workflow.add_edge("validate_request", "attempt_payment")
workflow.add_conditional_edges(
"attempt_payment",
self.route_payment_result,
{
"success": "complete_task",
"retry": "handle_failure",
"failed": "final_failure"
}
)
workflow.add_conditional_edges(
"handle_failure",
self.route_failure_handling,
{
"backoff": "retry_with_backoff",
"alternative": "try_alternative",
"fallback": "use_fallback",
"give_up": "final_failure"
}
)
workflow.add_edge("retry_with_backoff", "attempt_payment")
workflow.add_edge("try_alternative", "attempt_payment")
workflow.add_edge("use_fallback", "attempt_payment")

return workflow.compile()

async def validate_request(self, state: WorkflowState) -> WorkflowState:
"""Validate the payment request and set up retry parameters"""

# Initialize retry parameters
state["payment_attempts"] = []
state["retry_count"] = 0
state["max_retries"] = 5
state["total_cost"] = 0.0

# Define service hierarchy (expensive to cheap)
state["fallback_services"] = [
"premium_ai_service", # $10.00 - Best quality
"standard_ai_service", # $5.00 - Good quality
"basic_ai_service", # $2.00 - Adequate quality
"free_service" # $0.00 - Basic quality
]

# Start with the best service
state["current_service"] = state["fallback_services"][0]

print(f"🔍 Validating request for task: {state['task']}")
print(f"🎯 Starting with service: {state['current_service']}")

return state

async def attempt_payment(self, state: WorkflowState) -> WorkflowState:
"""Attempt payment for the current service"""

service_config = self.get_service_config(state["current_service"])
attempt_info = {
"attempt": state["retry_count"] + 1,
"service": state["current_service"],
"amount": service_config["cost"],
"timestamp": datetime.now().isoformat()
}

print(f"💳 Payment attempt #{attempt_info['attempt']} for {service_config['name']}")
print(f"💰 Cost: ${service_config['cost']}")

try:
# Check if this is a free service
if service_config["cost"] == "0.00":
state["result"] = await self.call_free_service(service_config, state["task"])
attempt_info["status"] = "success"
attempt_info["method"] = "free_service"
else:
# Attempt paid service
payment_result = await self.agent.pay_api_endpoint(
url=service_config["url"],
method="POST",
data={"task": state["task"]},
timeout=30
)

state["result"] = payment_result["data"]
state["total_cost"] += float(service_config["cost"])

attempt_info["status"] = "success"
attempt_info["tx_hash"] = payment_result["receipt"]["txHash"]
attempt_info["method"] = "paystabl_payment"

print(f"✅ Payment successful!")

except Exception as error:
print(f"❌ Payment failed: {error}")

attempt_info["status"] = "failed"
attempt_info["error"] = str(error)
attempt_info["error_code"] = getattr(error, 'code', 'UNKNOWN')

state["error"] = str(error)

state["payment_attempts"].append(attempt_info)
return state

def route_payment_result(self, state: WorkflowState) -> str:
"""Route based on payment result"""

last_attempt = state["payment_attempts"][-1]

if last_attempt["status"] == "success":
return "success"
elif state["retry_count"] < state["max_retries"]:
return "retry"
else:
return "failed"

async def handle_payment_failure(self, state: WorkflowState) -> WorkflowState:
"""Analyze failure and determine retry strategy"""

last_attempt = state["payment_attempts"][-1]
error_code = last_attempt.get("error_code", "UNKNOWN")

print(f"🔍 Analyzing failure: {error_code}")

# Determine retry strategy based on error type
if error_code == "INSUFFICIENT_FUNDS":
# Try cheaper alternative
state["retry_strategy"] = RetryStrategy.ALTERNATIVE_SERVICE
print("💡 Strategy: Try cheaper service")

elif error_code == "NETWORK_ERROR":
# Network issues - use exponential backoff
state["retry_strategy"] = RetryStrategy.EXPONENTIAL_BACKOFF
print("💡 Strategy: Exponential backoff retry")

elif error_code == "API_UNAVAILABLE":
# Service down - try alternative
state["retry_strategy"] = RetryStrategy.ALTERNATIVE_SERVICE
print("💡 Strategy: Switch to alternative service")

elif error_code == "RATE_LIMITED":
# Rate limited - wait and retry
state["retry_strategy"] = RetryStrategy.LINEAR_BACKOFF
print("💡 Strategy: Linear backoff retry")

else:
# Unknown error - try fallback
state["retry_strategy"] = RetryStrategy.ALTERNATIVE_SERVICE
print("💡 Strategy: Use fallback service")

return state

def route_failure_handling(self, state: WorkflowState) -> str:
"""Route failure handling based on strategy"""

strategy = state["retry_strategy"]

if strategy == RetryStrategy.EXPONENTIAL_BACKOFF:
return "backoff"
elif strategy == RetryStrategy.LINEAR_BACKOFF:
return "backoff"
elif strategy == RetryStrategy.ALTERNATIVE_SERVICE:
# Check if we have alternatives
current_index = state["fallback_services"].index(state["current_service"])
if current_index < len(state["fallback_services"]) - 1:
return "alternative"
else:
return "fallback"
else:
return "give_up"

async def retry_with_backoff(self, state: WorkflowState) -> WorkflowState:
"""Implement backoff retry logic"""

strategy = state["retry_strategy"]
state["retry_count"] += 1

if strategy == RetryStrategy.EXPONENTIAL_BACKOFF:
# Exponential backoff: 2^attempt seconds
delay = 2 ** state["retry_count"]
print(f"⏳ Exponential backoff: waiting {delay} seconds...")

elif strategy == RetryStrategy.LINEAR_BACKOFF:
# Linear backoff: attempt * 5 seconds
delay = state["retry_count"] * 5
print(f"⏳ Linear backoff: waiting {delay} seconds...")

else:
delay = 1

# Wait for the calculated delay
await asyncio.sleep(min(delay, 60)) # Cap at 60 seconds

print(f"🔄 Retry attempt #{state['retry_count']} starting...")
return state

async def try_alternative_service(self, state: WorkflowState) -> WorkflowState:
"""Switch to the next service in the fallback chain"""

current_index = state["fallback_services"].index(state["current_service"])
next_index = current_index + 1

if next_index < len(state["fallback_services"]):
old_service = state["current_service"]
state["current_service"] = state["fallback_services"][next_index]
state["retry_count"] = 0 # Reset retry count for new service

print(f"🔄 Switching from {old_service} to {state['current_service']}")

# Log the service switch
switch_info = {
"type": "service_switch",
"from": old_service,
"to": state["current_service"],
"reason": "payment_failure",
"timestamp": datetime.now().isoformat()
}
state["payment_attempts"].append(switch_info)
else:
print("❌ No more alternative services available")
state["error"] = "All services exhausted"

return state

async def use_fallback_service(self, state: WorkflowState) -> WorkflowState:
"""Use the final fallback service (usually free)"""

state["current_service"] = state["fallback_services"][-1] # Last service (free)
state["retry_count"] = 0

print(f"🆘 Using final fallback: {state['current_service']}")

return state

async def complete_task(self, state: WorkflowState) -> WorkflowState:
"""Task completed successfully"""

service_config = self.get_service_config(state["current_service"])

print(f"✅ Task completed successfully!")
print(f"📊 Service used: {service_config['name']}")
print(f"💰 Total cost: ${state['total_cost']}")
print(f"🔄 Total attempts: {len(state['payment_attempts'])}")

# Generate completion summary
state["completion_summary"] = {
"success": True,
"service_used": state["current_service"],
"total_cost": state["total_cost"],
"attempts": len(state["payment_attempts"]),
"retry_count": state["retry_count"]
}

return state

async def handle_final_failure(self, state: WorkflowState) -> WorkflowState:
"""Handle final failure when all retries exhausted"""

print(f"❌ Task failed after all retry attempts")
print(f"🔄 Total attempts: {len(state['payment_attempts'])}")
print(f"💰 Total cost (partial): ${state['total_cost']}")

# Generate failure summary
state["completion_summary"] = {
"success": False,
"final_error": state.get("error", "Unknown error"),
"total_cost": state["total_cost"],
"attempts": len(state["payment_attempts"]),
"services_tried": list(set([
attempt.get("service") for attempt in state["payment_attempts"]
if attempt.get("service")
]))
}

return state

def get_service_config(self, service_name: str) -> dict:
"""Get configuration for a service"""

configs = {
"premium_ai_service": {
"name": "Premium AI Service",
"url": "https://premium-ai.com/api/analyze",
"cost": "10.00",
"quality": "excellent",
"features": ["advanced_reasoning", "multimodal", "real_time"]
},
"standard_ai_service": {
"name": "Standard AI Service",
"url": "https://standard-ai.com/api/process",
"cost": "5.00",
"quality": "good",
"features": ["standard_reasoning", "text_only"]
},
"basic_ai_service": {
"name": "Basic AI Service",
"url": "https://basic-ai.com/api/simple",
"cost": "2.00",
"quality": "adequate",
"features": ["basic_processing"]
},
"free_service": {
"name": "Free Community Service",
"url": "https://free-ai.com/api/community",
"cost": "0.00",
"quality": "basic",
"features": ["limited_processing"]
}
}

return configs.get(service_name, configs["free_service"])

async def call_free_service(self, service_config: dict, task: str) -> dict:
"""Simulate calling a free service"""

print(f"🆓 Using free service: {service_config['name']}")

# Simulate processing delay
await asyncio.sleep(2)

return {
"result": f"Basic analysis of: {task}",
"quality": "basic",
"service": service_config["name"],
"cost": 0.00
}

# Usage example
async def run_retry_example():
"""Run the payment retry workflow example"""

workflow = PaymentRetryWorkflow("retry_demo_agent")

# Example task that might require multiple retry attempts
initial_state = {
"task": "Analyze customer sentiment from 10,000 reviews and generate insights"
}

print("🚀 Starting Payment Retry Workflow Demo")
print("=" * 50)

try:
final_state = await workflow.workflow.ainvoke(initial_state)

print("\n📋 Final Results:")
print("=" * 20)

if final_state["completion_summary"]["success"]:
print("✅ Workflow completed successfully")
print(f"📊 Result: {final_state['result']['result']}")
else:
print("❌ Workflow failed")
print(f"🔍 Final error: {final_state['completion_summary']['final_error']}")

# Display payment attempt log
print(f"\n💳 Payment Attempts Log:")
for i, attempt in enumerate(final_state["payment_attempts"], 1):
if attempt.get("type") == "service_switch":
print(f" {i}. 🔄 Service switch: {attempt['from']}{attempt['to']}")
else:
status_icon = "✅" if attempt["status"] == "success" else "❌"
print(f" {i}. {status_icon} {attempt['service']}: ${attempt['amount']} ({attempt['status']})")

print(f"\n💰 Total Cost: ${final_state['total_cost']}")

except Exception as error:
print(f"❌ Workflow execution failed: {error}")

# Advanced retry strategies
class AdvancedRetryStrategies:
"""Advanced retry strategies for different scenarios"""

@staticmethod
async def adaptive_retry(workflow_state: WorkflowState, failure_history: List[dict]) -> RetryStrategy:
"""Choose retry strategy based on historical failure patterns"""

# Analyze recent failures
recent_failures = failure_history[-5:] # Last 5 failures
error_counts = {}

for failure in recent_failures:
error_code = failure.get("error_code", "UNKNOWN")
error_counts[error_code] = error_counts.get(error_code, 0) + 1

# Choose strategy based on predominant error type
if error_counts.get("NETWORK_ERROR", 0) > 2:
return RetryStrategy.EXPONENTIAL_BACKOFF
elif error_counts.get("RATE_LIMITED", 0) > 1:
return RetryStrategy.LINEAR_BACKOFF
elif error_counts.get("INSUFFICIENT_FUNDS", 0) > 0:
return RetryStrategy.ALTERNATIVE_SERVICE
else:
return RetryStrategy.EXPONENTIAL_BACKOFF

@staticmethod
async def cost_aware_retry(
workflow_state: WorkflowState,
budget_limit: float
) -> bool:
"""Determine if retry should continue based on cost constraints"""

remaining_budget = budget_limit - workflow_state["total_cost"]
next_service_cost = 0.0

# Calculate cost of next retry attempt
current_service_index = workflow_state["fallback_services"].index(
workflow_state["current_service"]
)

if current_service_index < len(workflow_state["fallback_services"]) - 1:
next_service = workflow_state["fallback_services"][current_service_index + 1]
next_service_cost = float(
PaymentRetryWorkflow(None).get_service_config(next_service)["cost"]
)

return remaining_budget >= next_service_cost

@staticmethod
async def quality_threshold_retry(
workflow_state: WorkflowState,
minimum_quality: str
) -> bool:
"""Determine if retry should continue based on quality requirements"""

quality_hierarchy = ["basic", "adequate", "good", "excellent"]
min_quality_index = quality_hierarchy.index(minimum_quality)

current_service_config = PaymentRetryWorkflow(None).get_service_config(
workflow_state["current_service"]
)
current_quality_index = quality_hierarchy.index(
current_service_config["quality"]
)

# Continue retrying if current service quality is below threshold
return current_quality_index < min_quality_index

# Monitoring and analytics for retry workflows
class RetryAnalytics:
"""Analytics for payment retry workflows"""

def __init__(self):
self.retry_stats = {}

def analyze_retry_patterns(self, workflow_states: List[WorkflowState]) -> dict:
"""Analyze retry patterns across multiple workflow executions"""

analysis = {
"total_workflows": len(workflow_states),
"success_rate": 0,
"average_retries": 0,
"common_failure_points": {},
"cost_analysis": {
"average_cost": 0,
"cost_by_service": {},
"cost_efficiency": 0
},
"service_reliability": {}
}

successful_workflows = 0
total_retries = 0
total_cost = 0

for state in workflow_states:
if state.get("completion_summary", {}).get("success"):
successful_workflows += 1

total_retries += state.get("retry_count", 0)
total_cost += state.get("total_cost", 0)

# Analyze failure points
for attempt in state.get("payment_attempts", []):
if attempt.get("status") == "failed":
error_code = attempt.get("error_code", "UNKNOWN")
analysis["common_failure_points"][error_code] = \
analysis["common_failure_points"].get(error_code, 0) + 1

# Service reliability
service = attempt.get("service")
if service:
if service not in analysis["service_reliability"]:
analysis["service_reliability"][service] = {
"attempts": 0,
"successes": 0,
"reliability": 0
}

analysis["service_reliability"][service]["attempts"] += 1
if attempt.get("status") == "success":
analysis["service_reliability"][service]["successes"] += 1

# Calculate metrics
analysis["success_rate"] = successful_workflows / len(workflow_states) * 100
analysis["average_retries"] = total_retries / len(workflow_states)
analysis["cost_analysis"]["average_cost"] = total_cost / len(workflow_states)

# Calculate service reliability percentages
for service, stats in analysis["service_reliability"].items():
if stats["attempts"] > 0:
stats["reliability"] = (stats["successes"] / stats["attempts"]) * 100

return analysis

def generate_retry_recommendations(self, analysis: dict) -> List[str]:
"""Generate recommendations based on retry analysis"""

recommendations = []

# Success rate recommendations
if analysis["success_rate"] < 80:
recommendations.append(
"Consider increasing retry limits or improving service reliability"
)

# Cost optimization recommendations
if analysis["cost_analysis"]["average_cost"] > 5.0:
recommendations.append(
"Review service hierarchy to prioritize cost-effective options"
)

# Service reliability recommendations
unreliable_services = [
service for service, stats in analysis["service_reliability"].items()
if stats["reliability"] < 70
]

if unreliable_services:
recommendations.append(
f"Consider replacing unreliable services: {', '.join(unreliable_services)}"
)

# Common failure recommendations
if "INSUFFICIENT_FUNDS" in analysis["common_failure_points"]:
recommendations.append(
"Implement automatic wallet funding or better balance monitoring"
)

if "NETWORK_ERROR" in analysis["common_failure_points"]:
recommendations.append(
"Implement better network error handling and longer timeouts"
)

return recommendations

if __name__ == "__main__":
import asyncio
asyncio.run(run_retry_example())

Key Features Demonstrated

1. Intelligent Retry Logic

  • Exponential backoff for network errors
  • Linear backoff for rate limiting
  • Immediate alternative service switching for cost issues

2. Service Fallback Chain

  • Premium → Standard → Basic → Free service progression
  • Automatic cost optimization
  • Quality-aware service selection

3. Error Categorization

  • Different strategies for different error types
  • Historical pattern analysis
  • Adaptive retry behavior

4. Cost Management

  • Budget-aware retry decisions
  • Cost tracking across attempts
  • Service cost optimization

5. Comprehensive Logging

  • Detailed attempt tracking
  • Payment receipt management
  • Performance analytics

Usage Examples

Basic Retry Workflow

# Simple usage with default settings
workflow = PaymentRetryWorkflow("my_agent")

result = await workflow.workflow.ainvoke({
"task": "Analyze this dataset for trends and patterns"
})

print(f"Success: {result['completion_summary']['success']}")
print(f"Total cost: ${result['total_cost']}")

Advanced Configuration

# Workflow with custom retry parameters
workflow = PaymentRetryWorkflow("advanced_agent")

# Custom configuration
custom_state = {
"task": "Generate comprehensive market analysis",
"max_retries": 3,
"budget_limit": 15.00,
"quality_threshold": "good",
"preferred_services": [
"premium_analysis_service",
"standard_analysis_service",
"basic_analysis_service"
]
}

result = await workflow.workflow.ainvoke(custom_state)

Integration with Monitoring

# With analytics and monitoring
analytics = RetryAnalytics()
results = []

# Run multiple workflows
for task in task_list:
result = await workflow.workflow.ainvoke({"task": task})
results.append(result)

# Analyze patterns
analysis = analytics.analyze_retry_patterns(results)
recommendations = analytics.generate_retry_recommendations(analysis)

print("Retry Analysis:", analysis)
print("Recommendations:", recommendations)

Best Practices

1. Service Hierarchy Design

  • Order services by cost-effectiveness
  • Include at least one free fallback option
  • Balance cost vs. quality considerations

2. Error Handling Strategy

  • Categorize errors by retry-ability
  • Implement appropriate backoff strategies
  • Set reasonable retry limits

3. Cost Optimization

  • Monitor total costs across retries
  • Implement budget constraints
  • Track cost-effectiveness metrics

4. Quality Assurance

  • Define minimum quality thresholds
  • Validate results before completing
  • Implement quality-based routing

5. Monitoring and Analytics

  • Track retry patterns and success rates
  • Monitor service reliability
  • Generate actionable recommendations

Support