Skip to main content

LangGraph Integration

Build LangGraph workflows with payment capabilities using the AgentPay payment kernel.

Overview

LangGraph is a library for building stateful, multi-actor applications with LLMs. This guide shows you how to create custom nodes that integrate AgentPay's payment capabilities into your workflows.

Installation

pip install langgraph agentpay-sdk

Building Payment Nodes

1. Create Payment Node Class

from langgraph.graph import StateGraph, END
from agentpay import AgentPaySDK
from typing import TypedDict, Dict, Any
import os

class PaymentNode:
    """LangGraph node callables backed by the AgentPay SDK.

    Each node method takes the current workflow state and returns a new
    state dict; the incoming dict is not mutated.
    """

    def __init__(self, api_key: str):
        """Create the SDK client shared by the node methods.

        Args:
            api_key: AgentPay API key, passed straight to ``AgentPaySDK``.
        """
        self.agentpay = AgentPaySDK(api_key=api_key)

    @staticmethod
    def _with_error(state: Dict[str, Any], label: str, error: str,
                    **extra: Any) -> Dict[str, Any]:
        """Return a copy of ``state`` carrying ``error`` and a '<label> failed' message."""
        return {
            **state,
            **extra,
            "error": error,
            "messages": state.get("messages", []) + [f"{label} failed: {error}"],
        }

    def send_payment(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Send a payment and record the outcome in the workflow state.

        Reads ``recipient``, ``payment_amount`` and ``currency`` (default
        ``"USD"``) from ``state``.

        Returns:
            A copy of ``state`` with ``payment_status`` set to
            ``"completed"`` (plus ``transaction_id``) or ``"failed"`` (plus
            ``error``), and one entry appended to ``messages``.
        """
        recipient = state.get("recipient")
        amount = state.get("payment_amount")

        # Never hand the payment API a missing recipient or amount; report
        # the problem through the normal failure path instead.
        if not recipient or amount is None:
            return self._with_error(
                state,
                "Payment",
                "recipient and payment_amount are required",
                payment_status="failed",
            )

        try:
            result = self.agentpay.payments.send(
                to=recipient,
                amount=amount,
                currency=state.get("currency", "USD"),
            )
            transaction_id = result["transaction_id"]
        except Exception as e:
            # The SDK's exception types are not visible here, so any failure
            # is converted into a "failed" state rather than aborting the graph.
            return self._with_error(state, "Payment", str(e), payment_status="failed")

        return {
            **state,
            "payment_status": "completed",
            "transaction_id": transaction_id,
            "messages": state.get("messages", []) + [
                f"Payment of {amount} sent to {recipient}"
            ],
        }

    def check_balance(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Fetch the wallet balance and record it in the workflow state.

        Returns:
            A copy of ``state`` with ``wallet_balance`` set and a balance
            message appended, or with ``error`` set and a failure message
            appended if the lookup raised.
        """
        try:
            balance = self.agentpay.wallet.get_balance()
            amount = balance["amount"]
            summary = f"Current balance: {amount} {balance['currency']}"
        except Exception as e:
            # Same policy as send_payment: surface the failure in the state.
            return self._with_error(state, "Balance check", str(e))

        return {
            **state,
            "wallet_balance": amount,
            "messages": state.get("messages", []) + [summary],
        }

2. Define Workflow State

class WorkflowState(TypedDict):
    """State dict passed between the nodes of the payment workflow."""

    # Human-readable log; each node appends one entry.
    messages: list

    # Payment request.
    payment_amount: float
    recipient: str
    currency: str

    # Written by the send_payment node.
    payment_status: str
    transaction_id: str

    # Written by the check_balance node.
    wallet_balance: float

    # Text of the most recent failure reported by a node.
    error: str

3. Build Your Workflow

def create_payment_workflow():
    """Build and compile the balance-check -> payment -> result workflow.

    The AgentPay API key is read from the ``AGENTPAY_API_KEY`` environment
    variable.

    Returns:
        The compiled graph returned by ``StateGraph.compile()``.

    Raises:
        RuntimeError: If ``AGENTPAY_API_KEY`` is unset or empty.
    """
    # Fail fast with a message that names the variable, instead of handing
    # None to the SDK.
    api_key = os.getenv("AGENTPAY_API_KEY")
    if not api_key:
        raise RuntimeError("AGENTPAY_API_KEY environment variable is not set")

    # Initialize payment node
    payment_node = PaymentNode(api_key=api_key)

    # Create workflow graph
    workflow = StateGraph(WorkflowState)

    # Add nodes
    workflow.add_node("check_balance", payment_node.check_balance)
    workflow.add_node("send_payment", payment_node.send_payment)
    workflow.add_node("handle_result", handle_payment_result)

    # Define edges: a straight line from the balance check to END
    workflow.set_entry_point("check_balance")
    workflow.add_edge("check_balance", "send_payment")
    workflow.add_edge("send_payment", "handle_result")
    workflow.add_edge("handle_result", END)

    return workflow.compile()

def handle_payment_result(state: WorkflowState) -> WorkflowState:
    """Append a closing message that reflects the payment outcome.

    Args:
        state: Workflow state; ``payment_status`` is compared with
            ``"completed"``.

    Returns:
        A copy of ``state`` with one summary entry appended to ``messages``.
    """
    if state.get("payment_status") == "completed":
        note = "Payment workflow completed successfully"
    else:
        note = "Payment workflow failed, may need retry"

    # Use .get like the PaymentNode methods do, so a state without a
    # "messages" key does not raise KeyError here.
    return {
        **state,
        "messages": state.get("messages", []) + [note],
    }

4. Execute Payment Workflow

# Create and run workflow
workflow = create_payment_workflow()

# Every WorkflowState key gets a starting value; the nodes overwrite the
# status, transaction, balance and error fields as they run.
initial_state = dict(
    messages=[],
    payment_amount=10.0,
    # Sample value only. NOTE(review): this has 37 hex digits, not the 40 of
    # a full address - replace with a real recipient before use.
    recipient="0x742d35Cc6C6C27f5CC6666B6A3C9d4d5C2b3F",
    currency="USD",
    payment_status="",
    transaction_id="",
    wallet_balance=0.0,
    error="",
)

result = workflow.invoke(initial_state)
print("Workflow completed:", result)

Advanced Patterns

Conditional Payment Routing

def conditional_payment_router(state: WorkflowState) -> str:
    """Choose the next branch from the wallet balance and payment size.

    Returns:
        ``"insufficient_funds"`` when the balance is below the payment
        amount, ``"large_payment_approval"`` when the amount exceeds 100,
        otherwise ``"standard_payment"``.
    """
    balance = state["wallet_balance"]
    amount = state["payment_amount"]

    # The funds check comes first, so it wins over the size rule.
    if balance < amount:
        return "insufficient_funds"

    return "large_payment_approval" if amount > 100 else "standard_payment"

# Add conditional routing to workflow
# Each key is a value returned by conditional_payment_router; each value is
# the name of the node that handles it.
# NOTE(review): "handle_insufficient_funds" and "request_approval" are not
# defined anywhere in this guide - presumably they must be added as nodes
# first; confirm.
route_targets = {
    "insufficient_funds": "handle_insufficient_funds",
    "large_payment_approval": "request_approval",
    "standard_payment": "send_payment",
}

workflow.add_conditional_edges(
    "check_balance",
    conditional_payment_router,
    route_targets,
)

Core Components

PaymentNode

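Handles x402 API payments within workflow steps. Note that this `PaymentNode` is imported from `paystabl_langgraph`; it is a different class from the custom `PaymentNode` defined under "Building Payment Nodes" above: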

from paystabl_langgraph import PaymentNode

def create_research_node():
    """Build the ``premium_research`` payment node for the research API."""
    # Request settings handed to the node as its ``config`` argument.
    request_config = {
        "api_url": "https://research-api.com/search",
        "method": "POST",
        "headers": {"Content-Type": "application/json"},
        "max_retries": 3,
        "timeout": 30,
    }

    return PaymentNode(
        agent_id="research_workflow",
        name="premium_research",
        description="Access premium research databases",
        config=request_config,
    )

# Add to workflow: register the node built by create_research_node() under the name "research"
workflow.add_node("research", create_research_node())

# Define the node behavior
def research_step(state: WorkflowState):
    """Run the paid research query and return the resulting state update.

    Args:
        state: Workflow state; ``query`` is required, ``payment_receipts``
            is optional.

    Returns:
        A dict with ``results`` (the response data) and ``payment_receipts``
        (earlier receipts plus the new one).
    """
    research_node = workflow.get_node("research")

    result = research_node.execute({
        "query": state["query"],
        "depth": "comprehensive",
    })

    # This may be the first paying step, so there may be no receipts yet.
    earlier_receipts = state.get("payment_receipts", [])

    return {
        "results": result["data"],
        "payment_receipts": earlier_receipts + [result["receipt"]],
    }

AgentPaymentNode

Facilitates payments between agents in your workflow:

from paystabl_langgraph import AgentPaymentNode

def create_collaboration_node():
    """Build the orchestrator -> specialist agent payment node."""
    settings = dict(
        from_agent="orchestrator",
        to_agent="specialist_agent",
        payment_logic="dynamic",  # or "fixed"
        base_amount="3.00",
        quality_multiplier=True,
    )
    return AgentPaymentNode(**settings)

# Usage in workflow
def delegate_task(state: WorkflowState):
    """Pay the specialist agent for a delegated task.

    Args:
        state: Workflow state; ``task`` and ``task_id`` are required,
            ``payment_receipts`` is optional.

    Returns:
        A dict with ``delegation_receipt`` (this payment's result) and
        ``payment_receipts`` (earlier receipts plus this one).
    """
    payment_node = workflow.get_node("collaboration")

    # Calculate payment based on task complexity
    complexity_score = analyze_task_complexity(state["task"])
    payment_amount = payment_node.calculate_payment(complexity_score)

    # Pay the specialist
    payment_result = payment_node.execute({
        "amount": payment_amount,
        "purpose": f"Task delegation: {state['task'][:50]}...",
        "metadata": {
            "complexity": complexity_score,
            "task_id": state["task_id"],
        },
    })

    # There may be no earlier receipts when this is the first payment.
    earlier_receipts = state.get("payment_receipts", [])

    return {
        "delegation_receipt": payment_result,
        "payment_receipts": earlier_receipts + [payment_result],
    }

Advanced Workflows

Multi-Agent Research Pipeline

from typing import TypedDict

from langgraph.graph import StateGraph
from paystabl_langgraph import PaymentNode, AgentPaymentNode, ConditionalPaymentNode

class ResearchState(TypedDict):
    """State dict for the multi-agent research pipeline."""

    # Subject of the research run.
    topic: str

    # Per-stage outputs.
    research_data: dict
    analysis: dict
    summary: dict

    # Spend tracking.
    total_cost: float
    payment_log: list

def create_research_pipeline():
    """Build and compile the four-step paid research pipeline.

    The steps run in a straight line: data_collection ->
    specialist_analysis -> additional_resources -> summarization.

    Returns:
        The compiled graph returned by ``StateGraph.compile()``.
    """
    workflow = StateGraph(ResearchState)

    # Step 1: Premium data collection
    workflow.add_node("data_collection", PaymentNode(
        agent_id="research_coordinator",
        api_url="https://academic-api.com/search",
        price_calculator=lambda query: len(query.split()) * 0.10,  # Dynamic pricing
        retry_policy={
            "max_retries": 3,
            "backoff_factor": 2,
            "retry_on_payment_fail": True,
        },
    ))

    # Step 2: Pay specialist for analysis
    workflow.add_node("specialist_analysis", AgentPaymentNode(
        from_agent="research_coordinator",
        to_agent="domain_expert",
        payment_logic="quality_based",
        base_amount="8.00",
        quality_threshold=0.8,
    ))

    # Step 3: Conditional payment for additional resources
    workflow.add_node("additional_resources", ConditionalPaymentNode(
        condition=lambda state: state["analysis"]["confidence"] < 0.9,
        payment_node=PaymentNode(
            agent_id="research_coordinator",
            api_url="https://expert-insights.com/api",
            amount="12.00",
        ),
    ))

    # Step 4: Final summarization
    workflow.add_node("summarization", AgentPaymentNode(
        from_agent="research_coordinator",
        to_agent="summary_specialist",
        amount="4.00",
    ))

    # Define workflow edges. The graph needs an explicit start and finish,
    # as the basic payment workflow does with set_entry_point and an edge
    # to END; without them there is nothing to run from or stop at.
    workflow.set_entry_point("data_collection")
    workflow.add_edge("data_collection", "specialist_analysis")
    workflow.add_edge("specialist_analysis", "additional_resources")
    workflow.add_edge("additional_resources", "summarization")
    workflow.set_finish_point("summarization")

    return workflow.compile()

# Execute the workflow
research_workflow = create_research_pipeline()

# Only the topic and the spend-tracking fields are seeded here.
initial_research_state = {
    "topic": "AI agent payment systems",
    "total_cost": 0.0,
    "payment_log": [],
}
result = research_workflow.invoke(initial_research_state)

print(f"Research completed. Total cost: ${result['total_cost']}")

Dynamic Pricing Workflow

class DynamicPricingWorkflow:
    """Workflow whose payment amount is derived from task attributes."""

    # Price before any multiplier is applied.
    BASE_PRICE = 5.00

    class WorkflowState(TypedDict):
        """State dict for the dynamic pricing graph."""

        task: str
        complexity: float
        urgency: float
        quality_required: float
        budget: float
        results: dict
        # Keys written by calculate_price_step and quality_check_step; they
        # are declared here so the schema matches what the steps return.
        calculated_price: float
        quality_score: float

    def __init__(self):
        """Create the state graph and register its nodes."""
        self.workflow = StateGraph(self.WorkflowState)
        self.setup_nodes()

    def calculate_dynamic_price(self, state):
        """Return the price for ``state``, capped at ``state["budget"]``.

        Each multiplier equals 1 when its input is 0.5 and grows linearly
        with the input (slopes 2, 1.5 and 1.2 respectively).
        """
        complexity_multiplier = 1 + (state["complexity"] - 0.5) * 2
        urgency_multiplier = 1 + (state["urgency"] - 0.5) * 1.5
        quality_multiplier = 1 + (state["quality_required"] - 0.5) * 1.2

        final_price = (
            self.BASE_PRICE
            * complexity_multiplier
            * urgency_multiplier
            * quality_multiplier
        )
        return min(final_price, state["budget"])  # Respect budget constraints

    def setup_nodes(self):
        """Register the pricing, payment, verification and settlement nodes."""
        # Dynamic pricing node
        self.workflow.add_node("price_calculation", self.calculate_price_step)

        # Payment execution node
        self.workflow.add_node("execute_payment", PaymentNode(
            agent_id="dynamic_coordinator",
            payment_calculator=self.calculate_dynamic_price,
            fallback_options=[
                {"agent": "budget_specialist", "max_amount": "3.00"},
                {"agent": "standard_service", "max_amount": "1.50"},
            ],
        ))

        # Quality verification
        self.workflow.add_node("verify_quality", self.quality_check_step)

        # Conditional refund/bonus
        self.workflow.add_node("final_payment", ConditionalPaymentNode(
            conditions=[
                {
                    "condition": lambda state: state["results"]["quality_score"] > 0.95,
                    "action": "bonus_payment",
                    "amount": "2.00",
                },
                {
                    "condition": lambda state: state["results"]["quality_score"] < 0.6,
                    "action": "partial_refund",
                    "amount": "1.50",
                },
            ],
        ))

    def calculate_price_step(self, state):
        """Node: compute the price and return it as ``calculated_price``."""
        price = self.calculate_dynamic_price(state)
        return {"calculated_price": price}

    def quality_check_step(self, state):
        """Node: score the results and publish the score.

        The score is returned as ``quality_score`` and is also stored inside
        ``results``, which is where the ``final_payment`` conditions read it.
        """
        # Simulate quality assessment
        quality_score = evaluate_result_quality(state["results"])
        return {
            "quality_score": quality_score,
            "results": {**state["results"], "quality_score": quality_score},
        }

Payment Strategies

Batch Processing

Handle multiple payments efficiently:

from paystabl_langgraph import BatchPaymentNode

def create_batch_processor():
    """Build the batch payment node with its two payment templates."""
    api_payment = {
        "type": "api_payment",
        "url": "https://service1.com/api",
        "amount": "1.00",
    }
    agent_payment = {
        "type": "agent_payment",
        "to_agent": "processor_agent",
        "amount": "2.50",
    }

    return BatchPaymentNode(
        agent_id="batch_coordinator",
        batch_size=10,
        parallel_processing=True,
        error_handling="continue",  # or "abort", "retry"
        payment_configs=[api_payment, agent_payment],
    )

# Usage
def batch_processing_step(state):
    """Pay for and process every item in ``state["batch_items"]``.

    Returns:
        A dict with ``processed_items`` and ``payment_receipts`` (one entry
        per result) and ``failed_items`` (the results whose ``status`` is
        ``"failed"``).
    """
    batch_node = workflow.get_node("batch_processor")

    # One api_payment job per input item.
    jobs = [
        {"data": item, "payment_type": "api_payment"}
        for item in state["batch_items"]
    ]
    results = batch_node.execute_batch(jobs)

    processed = [entry["result"] for entry in results]
    receipts = [entry["receipt"] for entry in results]
    failures = [entry for entry in results if entry["status"] == "failed"]

    return {
        "processed_items": processed,
        "payment_receipts": receipts,
        "failed_items": failures,
    }

Subscription Management

Handle recurring payments within workflows:

from paystabl_langgraph import SubscriptionNode

class SubscriptionWorkflow:
    """Creates subscriptions and renews the ones that are due."""

    def __init__(self):
        """Create the monthly, auto-renewing subscription node."""
        self.subscription_node = SubscriptionNode(
            agent_id="subscription_manager",
            payment_schedule="monthly",
            auto_renewal=True,
            billing_preferences={
                "currency": "USD",
                "payment_method": "agent_wallet",
                "notification_threshold": "5.00",  # Notify when balance is low
            },
        )

    def setup_subscription(self, service_agent, monthly_fee):
        """Create an auto-renewing subscription to ``service_agent``.

        Args:
            service_agent: Agent that receives the payments.
            monthly_fee: Amount charged per billing period.

        Returns:
            Whatever ``SubscriptionNode.create_subscription`` returns.
        """
        return self.subscription_node.create_subscription({
            "to_agent": service_agent,
            "amount": monthly_fee,
            "service_name": f"Monthly service from {service_agent}",
            "auto_renew": True,
            "cancellation_policy": "end_of_billing_period",
        })

    def manage_subscriptions_step(self, state):
        """Renew every subscription whose ``next_payment`` is not in the future.

        Each renewal result is appended to ``state["payment_log"]`` (the
        list is created if absent). ``state`` is updated in place and
        returned.
        """
        # Imported here because the surrounding snippet never imports
        # datetime, which made the original comparison raise NameError.
        from datetime import datetime

        active_subscriptions = self.subscription_node.get_active_subscriptions()

        # One timestamp for the whole pass, so every subscription is judged
        # against the same cutoff.
        # NOTE(review): naive local time - assumes next_payment is also a
        # naive datetime; confirm against SubscriptionNode.
        now = datetime.now()
        payment_log = state.setdefault("payment_log", [])

        for subscription in active_subscriptions:
            if subscription["next_payment"] <= now:
                payment_result = self.subscription_node.process_renewal(subscription["id"])
                payment_log.append(payment_result)

        return state

Error Handling and Recovery

Robust Payment Handling

from paystabl_langgraph import RobustPaymentNode

def create_fault_tolerant_node():
    """Build a payment node with per-error retry rules and cheaper fallbacks."""
    # One rule per error type the node should recover from.
    on_insufficient_funds = {
        "error_type": "insufficient_funds",
        "action": "request_funding",
        "max_attempts": 3,
        "delay": 60,  # Wait 60 seconds between attempts
    }
    on_network_error = {
        "error_type": "network_error",
        "action": "exponential_backoff",
        "max_attempts": 5,
        "initial_delay": 1,
    }
    on_policy_violation = {
        "error_type": "policy_violation",
        "action": "escalate_approval",
        "approval_agent": "human_supervisor",
    }

    # Alternatives, listed in the same order as the original configuration.
    fallbacks = [
        {"service": "budget_alternative", "max_cost": "1.00"},
        {"service": "free_tier", "max_cost": "0.00"},
    ]

    return RobustPaymentNode(
        agent_id="resilient_agent",
        retry_strategies=[on_insufficient_funds, on_network_error, on_policy_violation],
        fallback_options=fallbacks,
    )

def resilient_payment_step(state):
    """Call the premium service with recovery and report what was used.

    Returns:
        On success, a dict with ``service_used``, ``data``, ``cost`` and
        ``fallback_used``. On ``PaymentException``, a dict with ``error``,
        ``suggested_action`` and a ``cost`` of 0.
    """
    payment_node = workflow.get_node("resilient_payment")

    # NOTE(review): PaymentException is not imported in this snippet -
    # presumably it comes from paystabl_langgraph; confirm.
    try:
        outcome = payment_node.execute_with_recovery({
            "primary_service": "premium_api",
            "backup_services": ["standard_api", "basic_api"],
            "budget_constraint": state["max_budget"],
        })

        success_update = {
            "service_used": outcome["service"],
            "data": outcome["response"],
            "cost": outcome["amount"],
            "fallback_used": outcome["fallback_used"],
        }
        return success_update

    except PaymentException as exc:
        failure_update = {
            "error": str(exc),
            "suggested_action": exc.recovery_suggestion,
            "cost": 0,
        }
        return failure_update

Integration Examples

Research Assistant Workflow

# Complete research workflow with payments
def create_research_assistant():
    """Build and compile the research assistant graph with four paid nodes.

    NOTE(review): no entry point or edges are added before compile();
    presumably they need to be wired up for the graph to run - confirm.
    """
    workflow = StateGraph(ResearchState)

    # Literature search with payment
    literature_search = PaymentNode(
        agent_id="research_assistant",
        api_url="https://academic-search.com/api",
        price_per_query="3.50",
    )
    workflow.add_node("literature_search", literature_search)

    # Expert consultation via agent payment
    expert_consultation = AgentPaymentNode(
        from_agent="research_assistant",
        to_agent="domain_expert",
        amount_calculator=lambda topic: expertise_pricing[topic],
    )
    workflow.add_node("expert_consultation", expert_consultation)

    # Data analysis service
    data_analysis = PaymentNode(
        agent_id="research_assistant",
        api_url="https://analysis-service.com/api",
        payment_per_dataset="5.00",
    )
    workflow.add_node("data_analysis", data_analysis)

    # Final report generation
    report_generation = AgentPaymentNode(
        from_agent="research_assistant",
        to_agent="technical_writer",
        quality_based_payment=True,
        base_amount="10.00",
    )
    workflow.add_node("report_generation", report_generation)

    return workflow.compile()

Customer Service Workflow

def create_customer_service_workflow():
    """Build and compile the customer service graph with three paid nodes.

    NOTE(review): no entry point or edges are added before compile(), and
    CustomerServiceState is not defined in this guide - confirm both.
    """
    workflow = StateGraph(CustomerServiceState)

    # Sentiment analysis via paid API
    sentiment_analysis = PaymentNode(
        agent_id="customer_service",
        api_url="https://sentiment-api.com/analyze",
        price_per_request="0.25",
    )
    workflow.add_node("sentiment_analysis", sentiment_analysis)

    # Escalate to human agent (paid consultation)
    specialist_payment = AgentPaymentNode(
        from_agent="customer_service",
        to_agent="human_specialist",
        amount="15.00",
        priority="urgent",
    )
    human_escalation = ConditionalPaymentNode(
        condition=lambda state: state["sentiment"]["anger"] > 0.8,
        payment_node=specialist_payment,
    )
    workflow.add_node("human_escalation", human_escalation)

    # Knowledge base lookup (paid premium search)
    knowledge_search = PaymentNode(
        agent_id="customer_service",
        api_url="https://knowledge-api.com/search",
        pricing_tiers={
            "basic": "0.50",
            "advanced": "1.50",
            "premium": "3.00",
        },
    )
    workflow.add_node("knowledge_search", knowledge_search)

    return workflow.compile()

Best Practices

Cost Optimization

# Implement cost tracking and optimization
class CostOptimizedWorkflow:
    """Tracks spend per node against a daily budget."""

    def __init__(self, daily_budget=100.00):
        """Start with nothing spent.

        Args:
            daily_budget: Spending ceiling used by ``check_budget`` and
                ``get_cost_summary``.
        """
        self.daily_budget = daily_budget
        self.current_spend = 0.0
        self.cost_tracker = {}

    def track_cost(self, node_name, amount):
        """Add ``amount`` to the total and to ``node_name``'s running total.

        Args:
            node_name: Key under which the spend is grouped.
            amount: Number or numeric string; converted with ``float``.
        """
        # Convert once so both totals receive exactly the same value.
        cost = float(amount)
        self.current_spend += cost
        self.cost_tracker[node_name] = self.cost_tracker.get(node_name, 0) + cost

    def check_budget(self, proposed_amount):
        """Return True if spending ``proposed_amount`` stays within the budget."""
        return (self.current_spend + float(proposed_amount)) <= self.daily_budget

    def get_cost_summary(self):
        """Return total spend, remaining budget, per-node spend and utilization.

        ``budget_utilization`` is a percentage of ``daily_budget``. With a
        zero budget it is ``0.0`` when nothing has been spent and ``inf``
        otherwise, instead of raising ``ZeroDivisionError``.
        """
        if self.daily_budget:
            utilization = (self.current_spend / self.daily_budget) * 100
        elif self.current_spend:
            utilization = float("inf")
        else:
            utilization = 0.0

        return {
            "total_spend": self.current_spend,
            "remaining_budget": self.daily_budget - self.current_spend,
            "spend_by_node": self.cost_tracker,
            "budget_utilization": utilization,
        }

Quality Assurance

def implement_quality_gates():
    """Build a quality gate node with two checks and payment adjustments.

    NOTE(review): QualityGateNode is not imported anywhere in this guide -
    presumably it comes from paystabl_langgraph; confirm.
    """
    completeness_check = {
        "check": "response_completeness",
        "threshold": 0.8,
        "action_if_fail": "request_revision",
    }
    accuracy_check = {
        "check": "accuracy_score",
        "threshold": 0.9,
        "action_if_fail": "escalate_to_expert",
    }

    # Percentage changes keyed by quality outcome.
    adjustments = {
        "high_quality": "+20%",
        "poor_quality": "-30%",
        "revision_required": "-10%",
    }

    return QualityGateNode(
        quality_checks=[completeness_check, accuracy_check],
        payment_adjustments=adjustments,
    )

Monitoring and Analytics

Payment Analytics

from paystabl_langgraph import PaymentAnalytics

analytics = PaymentAnalytics(workflow)

# Get workflow cost analysis
cost_dimensions = ["node", "agent", "api"]
cost_analysis = analytics.get_cost_breakdown(
    time_period="last_30_days",
    group_by=cost_dimensions,
)

# Performance metrics
metric_names = [
    "average_cost_per_execution",
    "cost_per_successful_outcome",
    "payment_failure_rate",
    "most_expensive_nodes",
]
performance = analytics.get_performance_metrics(metric_names)

# Budget tracking
budget_fields = [
    "daily_spend",
    "weekly_trend",
    "projected_monthly_cost",
    "cost_optimization_suggestions",
]
budget_status = analytics.get_budget_status(budget_fields)

Support