Agent-to-Agent Commerce: Build Autonomous B2B Transactions
Notice: This is an educational guide with illustrative code examples. It does not execute code or install dependencies. All examples use the GreenHelix sandbox (https://sandbox.greenhelix.net) which provides 500 free credits — no API key required to get started.
Referenced credentials (you supply these in your own environment):
GREENHELIX_API_KEY: API authentication for GreenHelix gateway (read/write access to purchased API tools only)AGENT_SIGNING_KEY: Cryptographic signing key for agent identity (Ed25519 key pair for request signing)STRIPE_API_KEY: Stripe API key for card payment processing (scoped to payment intents only)
When a CrewAI crew delegates a research subtask to a specialist agent, who pays for that work? When a LangChain pipeline routes a translation job to an external language model agent, how does the buyer verify the output before funds move? When an AutoGen group chat spawns a coding agent that subcontracts a testing agent, what prevents the tester from collecting payment without running a single test? These are not hypothetical problems. The $40M Step Finance breach in January 2026 demonstrated what happens when autonomous systems handle money without payment isolation, identity verification, or escrow. The agent economy needs financial infrastructure purpose-built for machines paying machines -- not humans clicking checkout buttons. This guide provides that infrastructure through the GreenHelix A2A Commerce Gateway: 128 tools covering identity, wallets, escrow, marketplace discovery, subscriptions, dispute resolution, and trust scoring, all accessible via a single HTTP endpoint.
What You'll Learn
- Chapter 1: Why Agent-to-Agent Payments Are Different
- Chapter 2: The Protocol Landscape (April 2026)
- Chapter 3: Setting Up Agent Wallets and Identity
- Chapter 4: Service Discovery and Negotiation
- Chapter 5: Advanced Service Discovery
- Chapter 6: Automated Price Negotiation and Auction Mechanisms
- Chapter 7: Escrow Patterns
- Chapter 8: Advanced Escrow Patterns
- Chapter 9: Subscription Billing for Agent Services
- Chapter 10: Dispute Resolution Between Agents
Full Guide
Agent-to-Agent Commerce: Build Autonomous B2B Transactions
When a CrewAI crew delegates a research subtask to a specialist agent, who pays for that work? When a LangChain pipeline routes a translation job to an external language model agent, how does the buyer verify the output before funds move? When an AutoGen group chat spawns a coding agent that subcontracts a testing agent, what prevents the tester from collecting payment without running a single test? These are not hypothetical problems. The $40M Step Finance breach in January 2026 demonstrated what happens when autonomous systems handle money without payment isolation, identity verification, or escrow. The agent economy needs financial infrastructure purpose-built for machines paying machines -- not humans clicking checkout buttons. This guide provides that infrastructure through the GreenHelix A2A Commerce Gateway: 128 tools covering identity, wallets, escrow, marketplace discovery, subscriptions, dispute resolution, and trust scoring, all accessible via a single HTTP endpoint.
Table of Contents
- Why Agent-to-Agent Payments Are Different
- The Protocol Landscape (April 2026)
- Setting Up Agent Wallets and Identity
- Service Discovery and Negotiation
- Advanced Service Discovery
- Automated Price Negotiation and Auction Mechanisms
- Escrow Patterns
- Advanced Escrow Patterns
- Subscription Billing for Agent Services
- Dispute Resolution Between Agents
- Error Handling in Transactions
- Security Patterns
- Framework Integration Patterns
- Real-World Integration Examples
- What's Next
Chapter 1: Why Agent-to-Agent Payments Are Different
Agent-to-Merchant Is Solved
When a human or an AI agent needs to buy something from a business -- a SaaS subscription, a physical product, an API credit -- the infrastructure exists. Stripe processes the card. PayPal handles the checkout flow. The Agentic Commerce Protocol (ACP) from OpenAI and Stripe extends this to let AI agents complete purchases from merchants with Instant Checkout in ChatGPT. These systems assume a merchant with a Stripe account on one side and a buyer with a payment method on the other. That model works when one party is a business with a verified identity and a bank account.
Agent-to-agent commerce is a fundamentally different problem. Neither party is a registered business. Neither has a credit card. Neither has a human watching the transaction in real time. Both are software processes that need to exchange value for work -- and both are capable of lying about whether the work was done.
The Unique Problems
No human in the loop. When Agent A hires Agent B to summarize 500 documents, there is no human available to review the output before payment. The system must verify programmatically that the work meets the agreed criteria.
No credit cards. Agents do not have Visa cards. They need wallet-based payment systems where funds can be programmatically deposited, locked, and released based on conditions.
Automated verification. In human commerce, a dissatisfied buyer opens a support ticket. In agent commerce, the buyer agent needs an automated dispute mechanism with programmatic evidence submission and resolution.
Identity without KYC. Agents need cryptographic identities -- Ed25519 keypairs that bind an agent to its actions -- without the overhead of human identity verification. The identity must be sufficient to build a verifiable reputation over time.
Payment isolation. When ten agents share a wallet, a compromised agent drains all funds. Each agent needs its own wallet with enforced budget caps.
The Step Finance Lesson
In January 2026, attackers exploited Step Finance's shared key architecture to drain approximately $40 million from copy trading vaults on Solana. The root cause was not a smart contract bug. It was an identity and isolation failure. Multiple bots shared execution keys with no separation between them. One operator fabricated a track record -- posting doctored performance metrics to attract deposits -- then executed intentionally losing trades against a colluding counterparty. The platform relied on self-reported metrics in a dashboard. There was no cryptographic binding between performance claims and actual executions, no escrow protecting depositor funds, and no mechanism for depositors to verify claims independently. Step Finance shut down permanently in February 2026, and its token lost 97% of its value.
The lesson is clear: autonomous agents handling money need three things that Step Finance lacked -- isolated wallets with budget caps, escrow that locks funds until work is verified, and cryptographic proof tying performance claims to actual outcomes.
Why Escrow Is the Natural Primitive
Escrow solves the fundamental trust problem in agent-to-agent commerce. The buyer locks funds in a neutral contract. The seller does the work. If the work meets the agreed criteria, funds release to the seller. If not, funds return to the buyer. Neither party needs to trust the other. Neither party needs to trust the platform. The escrow contract is the trust layer.
This is why the GreenHelix A2A Commerce Gateway is built around escrow as its core payment primitive, not direct transfers. Every payment pattern in this guide -- standard task completion, performance-gated release, split payments across pipelines, recurring subscriptions -- uses escrow as the underlying mechanism.
Chapter 2: The Protocol Landscape (April 2026)
Six protocols are competing to define how agents pay each other. They solve different problems, and understanding the differences prevents you from choosing the wrong tool.
| Protocol | Lead Orgs | Focus | Escrow | Marketplace | Trust/Reputation | Status (Apr 2026) |
|---|---|---|---|---|---|---|
| ACP | OpenAI, Stripe | Agent-to-merchant checkout | No | No (merchant-side) | No | Production (Etsy, Shopify) |
| AP2 | Google + 60 orgs | Payment authorization & traceability | No | No | Mandates (VCs) | Developer preview |
| x402 | Coinbase, Cloudflare | HTTP-native micropayments (USDC) | No | No | No | Production (35M+ txns on Solana) |
| MPP | Stripe, Tempo | Payment-method-agnostic HTTP payments | No | No | No | Production (March 2026) |
| ERC-8183 | Virtuals, Ethereum Foundation | On-chain escrow for agent jobs | Yes (smart contract) | No (via ERC-8004) | Via ERC-8004 | Mainnet (Feb 2026) |
| GreenHelix A2A | GreenHelix Labs | Full agent commerce stack | Yes (built-in) | Yes (128 tools) | Yes (claim chains + scores) | Production |
When to Use Which
ACP is the right choice when an AI agent is buying from a registered merchant -- physical goods from Shopify, digital products from Etsy, SaaS subscriptions. It assumes one party has a Stripe account. It does not handle the case where both parties are agents.
AP2 addresses authorization traceability -- proving that a user authorized an agent to make a specific purchase. It uses cryptographic mandates (verifiable credentials) as proof of consent. It is valuable for compliance-sensitive agent-to-merchant flows but does not provide escrow, marketplace discovery, or trust scoring between agents.
x402 is ideal for pay-per-request API access where a client agent pays a server agent per HTTP call in USDC. It revives the HTTP 402 status code with a facilitator that settles payments on-chain. It excels at micropayments (sub-cent per request) but has no concept of escrow, multi-step job completion, or dispute resolution.
MPP extends the HTTP 402 model to support fiat payment methods (cards, wallets, BNPL) alongside stablecoins. Built on Tempo's Layer-1 blockchain, it handles high-frequency stablecoin transactions at scale. Like x402, it is request-response payment -- no escrow, no marketplace, no trust layer.
ERC-8183 provides on-chain escrow for agent jobs with a client-provider-evaluator model. It is the closest on-chain equivalent to GreenHelix's escrow system, but it requires Ethereum transaction costs for every state transition, does not include a marketplace for service discovery, and delegates trust scoring to the separate ERC-8004 standard.
GreenHelix A2A is the right choice when agents need to hire other agents for subtasks and pay them safely. It combines escrow (standard, performance-gated, and split), a searchable marketplace with ranked results, subscription billing, messaging for negotiation, dispute resolution, and cryptographic trust scoring (Ed25519-signed metrics, Merkle claim chains) in a single API. No on-chain transaction costs -- the gateway handles settlement. This is the protocol this guide covers.
Chapter 3: Setting Up Agent Wallets and Identity
Every agent in the GreenHelix ecosystem has three things: a cryptographic identity (Ed25519 keypair), a registered profile on the gateway, and a wallet for holding and transferring funds. This chapter sets up all three for a buyer agent and a seller agent, then defines a reusable AgentCommerce class used throughout the rest of the guide.
Generate Ed25519 Keypairs
pip install cryptography requests
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives import serialization
import base64
def generate_keypair():
"""Generate an Ed25519 keypair and return base64-encoded keys."""
private_key = Ed25519PrivateKey.generate()
public_key = private_key.public_key()
private_bytes = private_key.private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption(),
)
public_bytes = public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
return (
base64.b64encode(private_bytes).decode(),
base64.b64encode(public_bytes).decode(),
)
buyer_private, buyer_public = generate_keypair()
seller_private, seller_public = generate_keypair()
print(f"Buyer public key: {buyer_public}")
print(f"Seller public key: {seller_public}")
Store private keys in environment variables or a secrets manager. Never commit them to version control.
The AgentCommerce Class
This class wraps every GreenHelix API call used in this guide. Define it once; use it in every subsequent chapter.
import requests
import base64
import json
import time
from typing import Optional
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives import serialization
class AgentCommerce:
"""Client for the GreenHelix A2A Commerce Gateway."""
def __init__(
self,
api_key: str,
agent_id: str,
base_url: str = "https://api.greenhelix.net/v1",
):
self.base_url = base_url
self.agent_id = agent_id
self.session = requests.Session()
self.session.headers.update({
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
})
def _execute(self, tool: str, input_data: dict) -> dict:
"""Execute a tool on the GreenHelix gateway."""
resp = self.session.post(
f"{self.base_url}/v1",
json={"tool": tool, "input": input_data},
)
resp.raise_for_status()
return resp.json()
# ── Identity & Wallets ──────────────────────────────────────
def register(self, public_key_b64: str, name: str) -> dict:
"""Register this agent on the gateway with an Ed25519 public key."""
return self._execute("register_agent", {
"agent_id": self.agent_id,
"public_key": public_key_b64,
"name": name,
})
def create_wallet(self) -> dict:
"""Create a wallet for this agent."""
return self._execute("create_wallet", {})
def get_balance(self) -> dict:
"""Get the current wallet balance."""
return self._execute("get_balance", {})
def deposit(self, amount: float) -> dict:
"""Deposit funds into this agent's wallet."""
return self._execute("deposit", {"amount": str(amount)})
# ── Marketplace ─────────────────────────────────────────────
def register_service(
self,
name: str,
description: str,
category: str,
price: float,
tags: list,
) -> dict:
"""List a service on the marketplace."""
return self._execute("register_service", {
"name": name,
"description": description,
"endpoint": f"agent://{self.agent_id}",
"price": price,
"tags": tags,
"category": category,
})
def discover_service(self, query: str) -> dict:
"""Search the marketplace for services matching a query."""
return self._execute("search_services", {"query": query})
def best_match(self, query: str) -> dict:
"""Get the highest-ranked service for a query."""
return self._execute("best_match", {"query": query})
def rate_service(self, service_id: str, rating: int) -> dict:
"""Rate a service (1-5 stars)."""
return self._execute("rate_service", {
"service_id": service_id,
"rating": rating,
})
# ── Escrow ──────────────────────────────────────────────────
def create_escrow(
self, payee: str, amount: float, description: str = ""
) -> dict:
"""Create a standard escrow. Funds lock until buyer releases."""
return self._execute("create_escrow", {
"payer_agent_id": self.agent_id,
"payee_agent_id": payee,
"amount": str(amount),
"description": description,
})
def release_escrow(self, escrow_id: str) -> dict:
"""Release escrowed funds to the seller."""
return self._execute("release_escrow", {"escrow_id": escrow_id})
def cancel_escrow(self, escrow_id: str) -> dict:
"""Cancel an escrow and return funds to the buyer."""
return self._execute("cancel_escrow", {"escrow_id": escrow_id})
def create_performance_escrow(
self,
payee: str,
amount: float,
metric_name: str,
threshold: float,
) -> dict:
"""Create escrow that auto-releases when a metric threshold is met."""
return self._execute("create_performance_escrow", {
"payer_agent_id": self.agent_id,
"payee_agent_id": payee,
"amount": str(amount),
"currency": "USD",
"performance_criteria": {
f"min_{metric_name}": threshold,
},
"evaluation_period_days": 30,
})
def check_performance_escrow(self, escrow_id: str) -> dict:
"""Check whether performance criteria have been met."""
return self._execute("check_performance_escrow", {
"escrow_id": escrow_id,
})
# ── Split Payments ──────────────────────────────────────────
def create_split_payment(self, amount: float, splits: list) -> dict:
"""Create a split payment across multiple agents.
splits: [{"agent_id": "...", "share_pct": 50}, ...]
"""
return self._execute("create_split_intent", {
"payer_agent_id": self.agent_id,
"amount": str(amount),
"splits": splits,
})
# ── Subscriptions ───────────────────────────────────────────
def subscribe(self, payee: str, amount: float, interval: str) -> dict:
"""Create a recurring subscription to another agent's service.
interval: "monthly", "weekly", or "daily"
"""
return self._execute("create_subscription", {
"payer_agent_id": self.agent_id,
"payee_agent_id": payee,
"amount": str(amount),
"interval": interval,
})
def cancel_subscription(self, subscription_id: str) -> dict:
"""Cancel an active subscription."""
return self._execute("cancel_subscription", {
"subscription_id": subscription_id,
})
# ── Messaging & Negotiation ─────────────────────────────────
def send_message(
self, recipient: str, message_type: str, content: dict
) -> dict:
"""Send a structured message to another agent."""
return self._execute("send_message", {
"sender_id": self.agent_id,
"recipient_id": recipient,
"message_type": message_type,
"content": content,
})
def negotiate_price(self, responder: str, initial_amount: float) -> dict:
"""Initiate a price negotiation with another agent."""
return self.send_message(responder, "price_negotiation", {
"proposed_amount": str(initial_amount),
"status": "initial_offer",
})
# ── Trust & Reputation ──────────────────────────────────────
def get_trust_score(self, target_agent_id: str) -> dict:
"""Get the trust score for any agent."""
return self._execute("get_trust_score", {
"agent_id": target_agent_id,
})
def get_reputation(self, target_agent_id: Optional[str] = None) -> dict:
"""Get reputation details for an agent."""
return self._execute("get_agent_reputation", {
"agent_id": target_agent_id or self.agent_id,
})
# ── Disputes ────────────────────────────────────────────────
def open_dispute(self, escrow_id: str, reason: str) -> dict:
"""Open a dispute on an active escrow."""
return self._execute("open_dispute", {
"escrow_id": escrow_id,
"reason": reason,
})
# ── Budget & Security ───────────────────────────────────────
def set_budget_cap(self, daily_limit: float) -> dict:
"""Set a daily spending cap for this agent's wallet."""
return self._execute("set_budget_cap", {
"agent_id": self.agent_id,
"daily_limit": str(daily_limit),
})
def get_budget_status(self) -> dict:
"""Check current spending against the budget cap."""
return self._execute("get_budget_status", {
"agent_id": self.agent_id,
})
Register Two Agents and Fund the Buyer
curl -- register the buyer agent:
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "register_agent",
"input": {
"agent_id": "buyer-research-agent",
"public_key": "'"$BUYER_PUBLIC_KEY"'",
"name": "Research Orchestrator"
}
}'
curl -- register the seller agent:
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "register_agent",
"input": {
"agent_id": "seller-summarizer-agent",
"public_key": "'"$SELLER_PUBLIC_KEY"'",
"name": "Document Summarization Service"
}
}'
Python -- complete setup for both agents:
import os
API_KEY = os.environ["GREENHELIX_API_KEY"]
# Initialize both agents
buyer = AgentCommerce(api_key=API_KEY, agent_id="buyer-research-agent")
seller = AgentCommerce(api_key=API_KEY, agent_id="seller-summarizer-agent")
# Register identities
buyer.register(buyer_public, "Research Orchestrator")
seller.register(seller_public, "Document Summarization Service")
# Create wallets
buyer.create_wallet()
seller.create_wallet()
# Fund the buyer's wallet
buyer.deposit(500.00)
print(f"Buyer balance: {buyer.get_balance()}")
curl -- create wallet and deposit:
# Create buyer wallet
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{"tool": "create_wallet", "input": {}}'
# Deposit funds
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{"tool": "deposit", "input": {"amount": "500.00"}}'
Chapter 4: Service Discovery and Negotiation
Before an agent can hire another agent, it needs to find one. The GreenHelix marketplace lets seller agents list services with structured metadata, and buyer agents search by keyword, category, or ranked relevance.
Register a Service
The seller agent lists its document summarization service on the marketplace.
curl:
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "register_service",
"input": {
"name": "Document Summarization (500-page batches)",
"description": "Summarizes up to 500 documents per batch. Returns structured JSON with key findings, sentiment, and citations. 95%+ factual accuracy verified via claim chain.",
"endpoint": "agent://seller-summarizer-agent",
"price": 25.00,
"tags": ["summarization", "nlp", "batch-processing", "verified-accuracy"],
"category": "data-processing"
}
}'
Python:
listing = seller.register_service(
name="Document Summarization (500-page batches)",
description=(
"Summarizes up to 500 documents per batch. Returns structured JSON "
"with key findings, sentiment, and citations. 95%+ factual accuracy "
"verified via claim chain."
),
category="data-processing",
price=25.00,
tags=["summarization", "nlp", "batch-processing", "verified-accuracy"],
)
service_id = listing["service_id"]
print(f"Service listed: {service_id}")
Search for Services
The buyer agent searches for summarization services.
curl:
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "search_services",
"input": {"query": "document summarization batch NLP"}
}'
Python:
results = buyer.discover_service("document summarization batch NLP")
for svc in results.get("services", []):
print(f" {svc['name']} - ${svc['price']} - {svc['tags']}")
Use best_match for Ranked Results
When you want the single best option rather than a list, use best_match. It ranks results by a composite of relevance, price, and the seller's trust score.
top_pick = buyer.best_match("document summarization verified accuracy")
print(f"Best match: {top_pick['name']} by {top_pick['agent_id']}")
print(f"Price: ${top_pick['price']}, Trust: {top_pick.get('trust_score')}")
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "best_match",
"input": {"query": "document summarization verified accuracy"}
}'
Negotiate Price via Messaging
Once the buyer identifies a service, it can negotiate price before committing to escrow. The messaging system supports structured negotiation flows.
def negotiate(buyer, seller_id, initial_offer, max_rounds=3):
"""Run a price negotiation between buyer and seller agents."""
current_offer = initial_offer
for round_num in range(1, max_rounds + 1):
# Buyer sends offer
buyer.send_message(seller_id, "price_negotiation", {
"proposed_amount": str(current_offer),
"round": round_num,
"status": "offer",
})
print(f"Round {round_num}: Buyer offers ${current_offer:.2f}")
# In production, the seller agent would respond asynchronously.
# Here we simulate a counter-offer at 90% of asking price.
if round_num < max_rounds:
counter = current_offer * 1.05 # Seller counters 5% higher
print(f"Round {round_num}: Seller counters ${counter:.2f}")
current_offer = (current_offer + counter) / 2 # Split difference
else:
# Final round: accept
buyer.send_message(seller_id, "price_negotiation", {
"proposed_amount": str(current_offer),
"round": round_num,
"status": "accepted",
})
print(f"Agreed price: ${current_offer:.2f}")
return current_offer
return current_offer
agreed_price = negotiate(buyer, "seller-summarizer-agent", 20.00)
Complete Discovery-to-Agreement Flow
# 1. Buyer searches for a service
results = buyer.discover_service("document summarization")
# 2. Check seller reputation before negotiating
seller_agent_id = results["services"][0]["agent_id"]
reputation = buyer.get_reputation(seller_agent_id)
trust = reputation.get("score", 0)
if trust < 0.5:
print(f"Low trust score ({trust}). Skipping this seller.")
else:
# 3. Negotiate price
asking_price = results["services"][0]["price"]
agreed = negotiate(buyer, seller_agent_id, asking_price * 0.85)
# 4. Create escrow at the agreed price
escrow = buyer.create_escrow(
payee=seller_agent_id,
amount=agreed,
description="Summarize 500 legal documents, JSON output",
)
print(f"Escrow created: {escrow['escrow_id']}")
print(f"Funds locked: ${agreed:.2f}")
Chapter 5: Advanced Service Discovery
The marketplace search in Chapter 4 covers the basics: register a service, search by keyword, pick the best match. But production agent systems need more sophisticated discovery. Agents need to find each other across protocol boundaries, maintain local registries for fast lookups, and implement weighted scoring that accounts for latency, price, trust, and availability simultaneously.
MCP Registry Integration
The Model Context Protocol (MCP) defines a standard way for AI agents to discover tools and resources. If your agents operate in an MCP environment, you can register GreenHelix marketplace services as MCP resources so that any MCP-compatible agent can discover them without knowing the GreenHelix API directly.
class MCPRegistryBridge:
"""Bridge between GreenHelix marketplace and an MCP resource registry."""
def __init__(self, commerce_client: AgentCommerce, registry_url: str):
self.client = commerce_client
self.registry_url = registry_url
self._cache = {}
self._cache_ttl = 300 # 5 minutes
def sync_services_to_mcp(self, categories: list) -> list:
"""Pull services from GreenHelix marketplace and register them
as MCP resources in the local registry.
Returns a list of registered resource URIs.
"""
registered = []
for category in categories:
results = self.client.discover_service(category)
for svc in results.get("services", []):
resource = self._to_mcp_resource(svc)
self._register_resource(resource)
registered.append(resource["uri"])
return registered
def _to_mcp_resource(self, service: dict) -> dict:
"""Convert a GreenHelix service listing to an MCP resource."""
return {
"uri": f"greenhelix://services/{service['service_id']}",
"name": service["name"],
"description": service.get("description", ""),
"mimeType": "application/json",
"metadata": {
"price": service["price"],
"agent_id": service["agent_id"],
"tags": service.get("tags", []),
"trust_score": service.get("trust_score"),
"protocol": "greenhelix-a2a",
},
}
def _register_resource(self, resource: dict):
"""Register a resource with the MCP registry."""
import requests
requests.post(
f"{self.registry_url}/resources",
json=resource,
timeout=10,
).raise_for_status()
def discover_from_mcp(self, query: str) -> list:
"""Query the MCP registry and filter for GreenHelix-backed services."""
import requests
resp = requests.get(
f"{self.registry_url}/resources",
params={"q": query},
timeout=10,
)
resp.raise_for_status()
resources = resp.json().get("resources", [])
return [
r for r in resources
if r.get("uri", "").startswith("greenhelix://")
]
# Usage
bridge = MCPRegistryBridge(buyer, "http://localhost:8080/mcp")
bridge.sync_services_to_mcp(["data-processing", "nlp", "translation"])
results = bridge.discover_from_mcp("document summarization")
A2A Protocol Discovery with Google's Agent-to-Agent Protocol
Google's A2A protocol defines an /.well-known/agent.json endpoint that agents can query to discover each other's capabilities. You can expose your GreenHelix-registered services through this endpoint so that A2A-compatible agents find your services alongside any other A2A services they discover.
import json
from http.server import HTTPServer, BaseHTTPRequestHandler
class A2ADiscoveryHandler(BaseHTTPRequestHandler):
"""Serve an A2A agent card that advertises GreenHelix marketplace services."""
def __init__(self, agent_id, services, *args, **kwargs):
self.agent_id = agent_id
self.services = services
super().__init__(*args, **kwargs)
def do_GET(self):
if self.path == "/.well-known/agent.json":
card = {
"name": self.agent_id,
"description": "Agent commerce services via GreenHelix A2A Gateway",
"url": f"https://sandbox.greenhelix.net/v1",
"version": "1.0",
"capabilities": {
"streaming": False,
"pushNotifications": False,
},
"skills": [
{
"id": svc["service_id"],
"name": svc["name"],
"description": svc.get("description", ""),
"tags": svc.get("tags", []),
"price": svc["price"],
"inputModes": ["application/json"],
"outputModes": ["application/json"],
}
for svc in self.services
],
}
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(card).encode())
else:
self.send_response(404)
self.end_headers()
def serve_agent_card(agent_id, services, port=8091):
"""Start a lightweight HTTP server that serves the A2A agent card."""
def handler(*args, **kwargs):
A2ADiscoveryHandler(agent_id, services, *args, **kwargs)
server = HTTPServer(("0.0.0.0", port), handler)
print(f"A2A discovery endpoint at http://0.0.0.0:{port}/.well-known/agent.json")
server.serve_forever()
Weighted Multi-Factor Service Ranking
The best_match tool returns a single result ranked by the gateway's default algorithm. In production you often need custom ranking that weights multiple factors differently depending on your use case. A latency-sensitive agent might weight response time above all else; a cost-sensitive agent might prioritize price.
import math
class ServiceRanker:
"""Rank marketplace services using a weighted multi-factor score."""
def __init__(
self,
commerce_client: AgentCommerce,
weights: dict = None,
):
self.client = commerce_client
self.weights = weights or {
"trust": 0.35,
"price": 0.25,
"relevance": 0.20,
"completion_rate": 0.20,
}
def rank(self, query: str, budget: float = None) -> list:
"""Search marketplace and return services ranked by weighted score."""
results = self.client.discover_service(query)
services = results.get("services", [])
scored = []
for svc in services:
if budget and svc.get("price", 0) > budget:
continue
# Fetch trust data for each candidate
trust_data = self.client.get_trust_score(svc["agent_id"])
trust_score = trust_data.get("score", 0)
rep = self.client.get_reputation(svc["agent_id"])
total_jobs = rep.get("total_transactions", 0)
completed = rep.get("completed_transactions", 0)
completion_rate = completed / max(total_jobs, 1)
# Normalize price (lower is better, invert to 0-1 scale)
max_price = max(s.get("price", 1) for s in services)
price_score = 1.0 - (svc.get("price", 0) / max(max_price, 0.01))
# Relevance from the search ranking position
position = services.index(svc)
relevance_score = 1.0 / (1.0 + math.log1p(position))
# Weighted composite
composite = (
self.weights["trust"] * trust_score
+ self.weights["price"] * price_score
+ self.weights["relevance"] * relevance_score
+ self.weights["completion_rate"] * completion_rate
)
scored.append({**svc, "_score": round(composite, 4)})
scored.sort(key=lambda s: s["_score"], reverse=True)
return scored
# Usage: cost-sensitive ranking
ranker = ServiceRanker(buyer, weights={
"trust": 0.20,
"price": 0.45,
"relevance": 0.15,
"completion_rate": 0.20,
})
ranked = ranker.rank("document summarization batch", budget=50.00)
for svc in ranked[:3]:
print(f" {svc['name']} | ${svc['price']} | score={svc['_score']}")
Service Health Monitoring and Failover
In production, a discovered service might become unavailable. Implement a health-aware discovery layer that tracks service availability and automatically fails over to alternatives.
import time
class HealthAwareDiscovery:
"""Discovery layer that tracks service health and provides failover."""
def __init__(self, commerce_client: AgentCommerce):
self.client = commerce_client
self._health = {} # service_id -> {healthy: bool, last_check: float, failures: int}
def discover_healthy(self, query: str, max_results: int = 5) -> list:
"""Return only services that are currently healthy."""
results = self.client.discover_service(query)
services = results.get("services", [])
healthy = []
for svc in services:
sid = svc["service_id"]
health = self._health.get(sid, {"healthy": True, "failures": 0})
# Re-check unhealthy services after a cooldown period
if not health["healthy"]:
elapsed = time.time() - health.get("last_check", 0)
cooldown = min(300 * (2 ** health["failures"]), 3600) # Exp backoff, max 1hr
if elapsed < cooldown:
continue # Still in cooldown
# Reset and give it another chance
health["healthy"] = True
healthy.append(svc)
if len(healthy) >= max_results:
break
return healthy
def report_failure(self, service_id: str):
"""Mark a service as unhealthy after a failed interaction."""
health = self._health.get(service_id, {"healthy": True, "failures": 0})
health["healthy"] = False
health["failures"] = health.get("failures", 0) + 1
health["last_check"] = time.time()
self._health[service_id] = health
def report_success(self, service_id: str):
"""Mark a service as healthy after a successful interaction."""
self._health[service_id] = {
"healthy": True,
"failures": 0,
"last_check": time.time(),
}
def discover_with_failover(self, query: str, action, budget: float = None):
"""Discover services and attempt the action, failing over on errors."""
services = self.discover_healthy(query)
for svc in services:
if budget and svc.get("price", 0) > budget:
continue
try:
result = action(svc)
self.report_success(svc["service_id"])
return result
except Exception as e:
self.report_failure(svc["service_id"])
print(f"Service {svc['service_id']} failed: {e}. Trying next.")
raise RuntimeError(f"All services for '{query}' exhausted or unhealthy.")
# Usage
discovery = HealthAwareDiscovery(buyer)
def hire_summarizer(svc):
escrow = buyer.create_escrow(
payee=svc["agent_id"],
amount=svc["price"],
description="Summarize 100 documents",
)
return escrow
result = discovery.discover_with_failover(
"document summarization",
hire_summarizer,
budget=30.00,
)
Chapter 6: Automated Price Negotiation and Auction Mechanisms
The basic negotiation in Chapter 4 uses a simple split-the-difference approach. Production agent systems need more sophisticated mechanisms: formal auctions when multiple sellers compete for a job, sealed-bid processes for price-sensitive procurement, and strategy-aware negotiation that adapts to counterparty behavior.
Dutch Auction: Descending Price for Urgent Procurement
In a Dutch auction, the buyer starts with a high price and lowers it incrementally until a seller accepts. This is optimal when the buyer needs a service quickly and multiple sellers are available. The first seller to accept wins the contract at the current price, which creates urgency for sellers to accept before the price drops further.
class DutchAuction:
"""Descending-price auction for agent service procurement."""
def __init__(
self,
buyer_client: AgentCommerce,
query: str,
start_price: float,
floor_price: float,
decrement: float,
round_duration_seconds: float = 5.0,
):
self.buyer = buyer_client
self.query = query
self.start_price = start_price
self.floor_price = floor_price
self.decrement = decrement
self.round_duration = round_duration_seconds
def run(self) -> dict:
"""Execute the Dutch auction. Returns the winning bid or None."""
# Discover candidate sellers
results = self.buyer.discover_service(self.query)
candidates = results.get("services", [])
if not candidates:
return {"status": "no_candidates", "winner": None}
current_price = self.start_price
round_num = 0
while current_price >= self.floor_price:
round_num += 1
print(f"Round {round_num}: offering ${current_price:.2f}")
# Broadcast the current price to all candidates
for svc in candidates:
self.buyer.send_message(
svc["agent_id"],
"dutch_auction",
{
"round": round_num,
"current_price": str(current_price),
"query": self.query,
"status": "open",
},
)
# Wait for acceptance (in production, poll for responses)
time.sleep(self.round_duration)
# Check for acceptances via messaging
# Simulated: in production, poll get_messages for responses
for svc in candidates:
# The seller agent would respond with accept/decline
# Here we simulate: sellers accept when price >= their minimum
seller_min = svc.get("price", 0) * 0.9 # 10% below listed
if current_price >= seller_min:
# Winner found
escrow = self.buyer.create_escrow(
payee=svc["agent_id"],
amount=current_price,
description=f"Dutch auction winner for: {self.query}",
)
return {
"status": "sold",
"winner": svc["agent_id"],
"price": current_price,
"round": round_num,
"escrow_id": escrow["escrow_id"],
}
current_price -= self.decrement
return {"status": "no_sale", "floor_reached": True, "winner": None}
# Run a Dutch auction for translation services
auction = DutchAuction(
buyer_client=buyer,
query="French translation certified",
start_price=50.00,
floor_price=15.00,
decrement=5.00,
round_duration_seconds=3.0,
)
result = auction.run()
print(f"Auction result: {result['status']}, winner: {result.get('winner')}")
Sealed-Bid Auction: First-Price and Vickrey
In a sealed-bid auction, sellers submit bids without seeing other bids. Two variants matter for agent commerce:
- First-price sealed-bid: the lowest bidder wins and pays their bid. Sellers shade their bids above their true minimum to maximize profit.
- Vickrey (second-price) sealed-bid: the lowest bidder wins but pays the second-lowest bid. This incentivizes truthful bidding because shading cannot help.
class SealedBidAuction:
"""Sealed-bid auction where sellers submit blind bids for a buyer's job."""
def __init__(
self,
buyer_client: AgentCommerce,
task_description: str,
max_budget: float,
auction_type: str = "vickrey", # "first_price" or "vickrey"
):
self.buyer = buyer_client
self.task = task_description
self.max_budget = max_budget
self.auction_type = auction_type
self._bids = [] # list of {"agent_id": str, "bid": float}
def invite_sellers(self, seller_ids: list):
"""Send bid invitations to a list of seller agent IDs."""
for seller_id in seller_ids:
self.buyer.send_message(
seller_id,
"sealed_bid_invitation",
{
"task": self.task,
"max_budget": str(self.max_budget),
"auction_type": self.auction_type,
"deadline": "2026-04-08T12:00:00Z",
},
)
def receive_bid(self, agent_id: str, bid_amount: float):
"""Record a bid from a seller. In production, bids arrive via messaging."""
if bid_amount > self.max_budget:
return {"status": "rejected", "reason": "exceeds_budget"}
self._bids.append({"agent_id": agent_id, "bid": bid_amount})
return {"status": "accepted"}
def resolve(self) -> dict:
"""Close the auction and determine the winner."""
if not self._bids:
return {"status": "no_bids", "winner": None}
# Sort by bid amount ascending (lowest wins)
sorted_bids = sorted(self._bids, key=lambda b: b["bid"])
winner = sorted_bids[0]
if self.auction_type == "vickrey" and len(sorted_bids) > 1:
# Winner pays second-lowest bid
payment = sorted_bids[1]["bid"]
else:
# First-price: winner pays their own bid
payment = winner["bid"]
# Create escrow at the determined price
escrow = self.buyer.create_escrow(
payee=winner["agent_id"],
amount=payment,
description=f"Sealed-bid {self.auction_type} auction: {self.task}",
)
return {
"status": "resolved",
"winner": winner["agent_id"],
"winning_bid": winner["bid"],
"payment": payment,
"escrow_id": escrow["escrow_id"],
"num_bids": len(self._bids),
}
# Usage: Vickrey auction for code review
auction = SealedBidAuction(
buyer_client=buyer,
task_description="Security audit of Python web application (5000 LOC)",
max_budget=200.00,
auction_type="vickrey",
)
auction.invite_sellers([
"security-auditor-alpha",
"security-auditor-beta",
"security-auditor-gamma",
])
# Bids arrive asynchronously; simulated here
auction.receive_bid("security-auditor-alpha", 150.00)
auction.receive_bid("security-auditor-beta", 120.00)
auction.receive_bid("security-auditor-gamma", 135.00)
result = auction.resolve()
print(f"Winner: {result['winner']}")
print(f"Winning bid: ${result['winning_bid']:.2f}")
print(f"Actual payment (Vickrey): ${result['payment']:.2f}")
# Output: Winner: security-auditor-beta, bid $120, pays $135 (second-lowest)
Adaptive Negotiation with Concession Strategy
For bilateral negotiation where auctions are not appropriate (single seller, ongoing relationship), implement a concession strategy that adapts based on deadline pressure and counterparty behavior.
class AdaptiveNegotiator:
"""Negotiation agent with time-dependent concession strategy."""
def __init__(
self,
commerce_client: AgentCommerce,
initial_price: float,
reservation_price: float,
max_rounds: int = 10,
concession_rate: float = 0.7, # <1 = concede slowly (hardliner), >1 = concede fast
):
self.client = commerce_client
self.initial = initial_price
self.reservation = reservation_price
self.max_rounds = max_rounds
self.beta = concession_rate
self._history = []
def compute_offer(self, round_num: int) -> float:
"""Compute the offer for a given round using time-dependent tactics.
Uses the Boulware/Conceder model:
- beta < 1: Boulware (hardliner) -- concedes slowly, then fast near deadline
- beta > 1: Conceder -- concedes quickly at first, then slows
- beta = 1: Linear concession
"""
t = round_num / self.max_rounds # Normalized time [0, 1]
alpha = t ** (1.0 / self.beta)
# For a buyer, move up from initial toward reservation (max)
offer = self.initial + alpha * (self.reservation - self.initial)
return round(offer, 2)
def negotiate(self, counterparty_id: str) -> dict:
"""Run multi-round negotiation with a counterparty."""
for round_num in range(1, self.max_rounds + 1):
my_offer = self.compute_offer(round_num)
self._history.append({"round": round_num, "my_offer": my_offer})
# Send offer
self.client.send_message(
counterparty_id,
"price_negotiation",
{
"proposed_amount": str(my_offer),
"round": round_num,
"max_rounds": self.max_rounds,
"status": "offer",
},
)
print(f" Round {round_num}/{self.max_rounds}: offered ${my_offer:.2f}")
# In production, wait for counterparty response via polling.
# Simulated: counterparty concedes from their side symmetrically.
counter_offer = self.reservation - (self.reservation - my_offer) * 0.4
if abs(my_offer - counter_offer) < 1.00:
# Close enough -- accept
agreed = (my_offer + counter_offer) / 2
self.client.send_message(
counterparty_id,
"price_negotiation",
{"proposed_amount": str(agreed), "status": "accepted"},
)
return {"status": "agreed", "price": round(agreed, 2), "rounds": round_num}
# Deadline reached without agreement
return {"status": "no_agreement", "rounds": self.max_rounds}
# Buyer negotiates: starts at $15, willing to go up to $28
negotiator = AdaptiveNegotiator(
commerce_client=buyer,
initial_price=15.00,
reservation_price=28.00,
max_rounds=8,
concession_rate=0.5, # Boulware: hold firm, concede late
)
result = negotiator.negotiate("seller-summarizer-agent")
print(f"Negotiation: {result['status']} at ${result.get('price', 'N/A')} in {result['rounds']} rounds")
Chapter 7: Escrow Patterns
5a: Standard Escrow (Task Completion)
Standard escrow is the simplest pattern: the buyer locks funds, the seller does work, the buyer inspects the result and releases payment. If the work is unsatisfactory, the buyer can dispute or cancel.
curl -- create escrow:
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "create_escrow",
"input": {
"payer_agent_id": "buyer-research-agent",
"payee_agent_id": "seller-summarizer-agent",
"amount": "25.00",
"description": "Summarize 500 legal documents into structured JSON"
}
}'
Python -- full lifecycle with error handling:
def standard_escrow_flow(buyer, seller_id, amount, task_description):
"""Complete standard escrow lifecycle: create, work, verify, release."""
# Step 1: Create escrow
escrow = buyer.create_escrow(
payee=seller_id,
amount=amount,
description=task_description,
)
escrow_id = escrow["escrow_id"]
print(f"Escrow {escrow_id} created. ${amount} locked.")
# Step 2: Seller does work (simulated -- in production this is async)
# The seller agent receives notification of the escrow and begins work.
# Step 3: Buyer verifies the deliverable
# In production: fetch result from seller's endpoint, run validation
deliverable_acceptable = True # Replace with actual verification
# Step 4: Release or dispute
if deliverable_acceptable:
result = buyer.release_escrow(escrow_id)
print(f"Escrow {escrow_id} released. Seller paid ${amount}.")
return result
else:
dispute = buyer.open_dispute(
escrow_id,
reason="Output failed validation: accuracy below 90%",
)
print(f"Dispute opened: {dispute['dispute_id']}")
return dispute
standard_escrow_flow(
buyer,
"seller-summarizer-agent",
25.00,
"Summarize 500 legal documents into structured JSON",
)
curl -- release escrow:
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "release_escrow",
"input": {"escrow_id": "escrow-abc123"}
}'
5b: Performance Escrow (Metric-Gated)
Performance escrow auto-releases when a measurable criterion is met. The seller submits metrics via submit_metrics, and the system evaluates them against the threshold defined at escrow creation. No manual release needed.
Example: image classification agent pays only if accuracy exceeds 95%.
curl:
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "create_performance_escrow",
"input": {
"payer_agent_id": "buyer-research-agent",
"payee_agent_id": "seller-classifier-agent",
"amount": "75.00",
"currency": "USD",
"performance_criteria": {
"min_accuracy_pct": 95.0
},
"evaluation_period_days": 7
}
}'
Python:
# Buyer creates performance escrow
perf_escrow = buyer.create_performance_escrow(
payee="seller-classifier-agent",
amount=75.00,
metric_name="accuracy_pct",
threshold=95.0,
)
perf_escrow_id = perf_escrow["escrow_id"]
print(f"Performance escrow created: {perf_escrow_id}")
# Seller does the classification work, then submits metrics
classifier = AgentCommerce(api_key=API_KEY, agent_id="seller-classifier-agent")
classifier._execute("submit_metrics", {
"agent_id": "seller-classifier-agent",
"metrics": {
"accuracy_pct": 97.3,
"images_classified": 10000,
"false_positive_rate": 0.012,
},
})
print("Metrics submitted.")
# Check if the escrow auto-released
status = buyer.check_performance_escrow(perf_escrow_id)
print(f"Escrow status: {status.get('status')}")
# If accuracy_pct >= 95.0, status will be "released"
curl -- check performance escrow:
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "check_performance_escrow",
"input": {"escrow_id": "perf-escrow-xyz789"}
}'
5c: Split Payments (Multi-Agent Pipelines)
When a task flows through a pipeline of agents -- data ingestion, processing, and analysis -- the buyer should not need to create separate escrows for each stage. Split payments let the buyer fund one transaction that distributes to all agents in the pipeline based on predefined shares.
Example: data pipeline where Agent A ingests (20%), Agent B cleans (30%), Agent C analyzes (50%).
curl:
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "create_split_intent",
"input": {
"payer_agent_id": "buyer-research-agent",
"amount": "100.00",
"splits": [
{"agent_id": "agent-ingestor", "share_pct": 20},
{"agent_id": "agent-cleaner", "share_pct": 30},
{"agent_id": "agent-analyst", "share_pct": 50}
]
}
}'
Python:
split = buyer.create_split_payment(
amount=100.00,
splits=[
{"agent_id": "agent-ingestor", "share_pct": 20},
{"agent_id": "agent-cleaner", "share_pct": 30},
{"agent_id": "agent-analyst", "share_pct": 50},
],
)
print(f"Split payment created: {split['intent_id']}")
print("Distribution:")
print(" agent-ingestor: $20.00 (20%)")
print(" agent-cleaner: $30.00 (30%)")
print(" agent-analyst: $50.00 (50%)")
Each agent in the pipeline receives their share only when the overall job completes. If any stage fails, the entire split can be disputed.
Chapter 8: Advanced Escrow Patterns
The standard, performance, and split escrows in Chapter 7 cover the most common cases. Production systems also need time-locked escrows for deadline enforcement, milestone-based escrows for multi-phase projects, and multi-party escrows with designated arbiters.
Time-Locked Escrow
A time-locked escrow automatically refunds the buyer if the seller does not deliver before a deadline. This prevents funds from being locked indefinitely when a seller agent goes offline or abandons a task.
from datetime import datetime, timedelta, timezone
class TimeLockEscrow:
"""Escrow with automatic refund on deadline expiry."""
def __init__(self, commerce_client: AgentCommerce):
self.client = commerce_client
def create(
self,
payee: str,
amount: float,
description: str,
deadline_hours: int = 24,
) -> dict:
"""Create an escrow that expires after deadline_hours."""
deadline = datetime.now(timezone.utc) + timedelta(hours=deadline_hours)
deadline_str = deadline.isoformat()
escrow = self.client.create_escrow(
payee=payee,
amount=amount,
description=(
f"{description}\n"
f"DEADLINE: {deadline_str}\n"
f"Auto-refund if not released by deadline."
),
)
escrow_id = escrow["escrow_id"]
# Store deadline metadata for the monitoring loop
self._schedule_expiry_check(escrow_id, deadline)
return {**escrow, "deadline": deadline_str}
def _schedule_expiry_check(self, escrow_id: str, deadline: datetime):
"""In production, register this with a scheduler (cron, Celery, etc).
Here we demonstrate the check logic."""
pass
def check_and_refund_expired(self, escrow_id: str, deadline_str: str) -> dict:
"""Check if an escrow has passed its deadline and refund if so."""
deadline = datetime.fromisoformat(deadline_str)
now = datetime.now(timezone.utc)
if now > deadline:
result = self.client.cancel_escrow(escrow_id)
return {"action": "refunded", "escrow_id": escrow_id, **result}
else:
remaining = (deadline - now).total_seconds() / 3600
return {
"action": "still_active",
"escrow_id": escrow_id,
"hours_remaining": round(remaining, 1),
}
# Usage
timelock = TimeLockEscrow(buyer)
escrow = timelock.create(
payee="seller-summarizer-agent",
amount=25.00,
description="Summarize 200 legal briefs",
deadline_hours=12,
)
print(f"Escrow {escrow['escrow_id']} expires at {escrow['deadline']}")
# Later, in a monitoring loop:
status = timelock.check_and_refund_expired(
escrow["escrow_id"],
escrow["deadline"],
)
print(f"Status: {status['action']}")
Milestone-Based Escrow
For large projects, a single escrow for the entire amount is risky for both parties. Milestone-based escrow splits the total into phases, releasing funds incrementally as each milestone is verified. The buyer is never exposed to more than one milestone's worth of risk.
class MilestoneEscrow:
"""Multi-phase escrow that releases funds per milestone."""
def __init__(self, commerce_client: AgentCommerce):
self.client = commerce_client
def create(
self,
payee: str,
milestones: list,
) -> dict:
"""Create milestone escrows for a multi-phase project.
milestones: [
{"name": "Phase 1: Data ingestion", "amount": 30.00, "criteria": "..."},
{"name": "Phase 2: Analysis", "amount": 50.00, "criteria": "..."},
{"name": "Phase 3: Report", "amount": 20.00, "criteria": "..."},
]
"""
total = sum(m["amount"] for m in milestones)
escrow_ids = []
for i, milestone in enumerate(milestones):
escrow = self.client.create_escrow(
payee=payee,
amount=milestone["amount"],
description=(
f"Milestone {i + 1}/{len(milestones)}: {milestone['name']}\n"
f"Criteria: {milestone['criteria']}\n"
f"Total project value: ${total:.2f}"
),
)
escrow_ids.append({
"milestone": milestone["name"],
"escrow_id": escrow["escrow_id"],
"amount": milestone["amount"],
"status": "locked",
})
return {"milestones": escrow_ids, "total": total}
def release_milestone(self, milestone_escrow_id: str) -> dict:
"""Release a single milestone after verification."""
return self.client.release_escrow(milestone_escrow_id)
def abort_remaining(self, escrow_ids: list) -> list:
"""Cancel all unreleased milestones (e.g., project abandoned)."""
results = []
for eid in escrow_ids:
try:
result = self.client.cancel_escrow(eid)
results.append({"escrow_id": eid, "status": "refunded"})
except Exception as e:
results.append({"escrow_id": eid, "status": "error", "error": str(e)})
return results
# Usage: 3-phase data analysis project
milestone = MilestoneEscrow(buyer)
project = milestone.create(
payee="data-analysis-agent",
milestones=[
{
"name": "Data collection and cleaning",
"amount": 30.00,
"criteria": "10,000 records cleaned, schema validated",
},
{
"name": "Statistical analysis",
"amount": 50.00,
"criteria": "Regression model with R-squared > 0.85",
},
{
"name": "Final report generation",
"amount": 20.00,
"criteria": "PDF report with executive summary and methodology",
},
],
)
print(f"Project created: {len(project['milestones'])} milestones, ${project['total']}")
# After Phase 1 verification passes:
milestone.release_milestone(project["milestones"][0]["escrow_id"])
print(f"Milestone 1 released: ${project['milestones'][0]['amount']}")
Multi-Party Escrow with Arbitration
When buyer and seller cannot agree on whether deliverables meet criteria, a designated third-party arbiter can cast the deciding vote. Multi-party escrow registers an arbiter at creation time. If a dispute arises, the arbiter -- typically a specialized quality-evaluation agent -- inspects the deliverable and triggers release or refund.
class ArbitratedEscrow:
"""Escrow with a designated third-party arbiter for dispute resolution."""
def __init__(
self,
buyer_client: AgentCommerce,
arbiter_client: AgentCommerce,
):
self.buyer = buyer_client
self.arbiter = arbiter_client
def create(
self,
payee: str,
amount: float,
description: str,
arbiter_fee_pct: float = 5.0,
) -> dict:
"""Create escrow with a registered arbiter.
The arbiter fee is deducted from the total only if arbitration occurs.
"""
escrow = self.buyer.create_escrow(
payee=payee,
amount=amount,
description=(
f"{description}\n"
f"Arbiter: {self.arbiter.agent_id}\n"
f"Arbiter fee: {arbiter_fee_pct}% (charged only on dispute)"
),
)
return {
**escrow,
"arbiter_id": self.arbiter.agent_id,
"arbiter_fee_pct": arbiter_fee_pct,
}
def request_arbitration(
self,
escrow_id: str,
buyer_evidence: str,
seller_evidence: str,
) -> dict:
"""Arbiter evaluates evidence from both parties and decides."""
# Arbiter receives both sides
self.arbiter.send_message(
self.arbiter.agent_id,
"arbitration_request",
{
"escrow_id": escrow_id,
"buyer_evidence": buyer_evidence,
"seller_evidence": seller_evidence,
},
)
# In production the arbiter agent evaluates asynchronously.
# Here we show the decision flow:
decision = self._arbiter_evaluate(buyer_evidence, seller_evidence)
if decision["ruling"] == "release":
result = self.buyer.release_escrow(escrow_id)
return {"ruling": "release_to_seller", **result}
elif decision["ruling"] == "refund":
result = self.buyer.cancel_escrow(escrow_id)
return {"ruling": "refund_to_buyer", **result}
else:
# Partial -- open dispute for platform-level split
dispute = self.buyer.open_dispute(
escrow_id,
reason=f"Arbiter recommends partial: {decision.get('reason', '')}",
)
return {"ruling": "partial_split", **dispute}
def _arbiter_evaluate(self, buyer_evidence: str, seller_evidence: str) -> dict:
"""Placeholder for arbiter's evaluation logic.
In production, this is the arbiter agent's own decision process."""
# Example: simple keyword-based heuristic
if "failed" in buyer_evidence.lower() and "passed" not in seller_evidence.lower():
return {"ruling": "refund", "reason": "Buyer evidence indicates failure"}
elif "passed" in seller_evidence.lower():
return {"ruling": "release", "reason": "Seller evidence supports completion"}
else:
return {"ruling": "partial", "reason": "Conflicting evidence"}
# Usage
arbiter = AgentCommerce(api_key=API_KEY, agent_id="quality-arbiter-agent")
arbitrated = ArbitratedEscrow(buyer, arbiter)
escrow = arbitrated.create(
payee="seller-summarizer-agent",
amount=100.00,
description="Full website content audit with SEO recommendations",
arbiter_fee_pct=5.0,
)
print(f"Arbitrated escrow: {escrow['escrow_id']}, arbiter: {escrow['arbiter_id']}")
# If dispute arises:
ruling = arbitrated.request_arbitration(
escrow["escrow_id"],
buyer_evidence="Output missing 3 of 10 required sections. SEO score only 42/100.",
seller_evidence="All sections delivered. SEO score measured at 78/100 using Ahrefs.",
)
print(f"Arbitration ruling: {ruling['ruling']}")
Chapter 9: Subscription Billing for Agent Services
Some agent services are not one-off tasks. A monitoring agent might pay a data provider for continuous feeds. A compliance agent might subscribe to a regulatory update service. Subscriptions handle recurring payments without creating a new escrow each cycle.
Create a Subscription
curl:
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "create_subscription",
"input": {
"payer_agent_id": "monitoring-agent",
"payee_agent_id": "data-feed-provider",
"amount": "15.00",
"interval": "monthly"
}
}'
Python:
monitor = AgentCommerce(api_key=API_KEY, agent_id="monitoring-agent")
monitor.create_wallet()
monitor.deposit(200.00)
# Subscribe to a data feed at $15/month
sub = monitor.subscribe(
payee="data-feed-provider",
amount=15.00,
interval="monthly",
)
sub_id = sub["subscription_id"]
print(f"Subscription active: {sub_id}")
print(f"Next payment: {sub.get('next_payment_date')}")
Monitor Subscription Status
# Check subscription health
status = monitor._execute("get_subscription", {
"subscription_id": sub_id,
})
print(f"Status: {status['status']}")
print(f"Payments made: {status.get('payments_completed', 0)}")
print(f"Next due: {status.get('next_payment_date')}")
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "get_subscription",
"input": {"subscription_id": "sub-abc123"}
}'
Cancel and Reactivate
# Cancel when the data feed is no longer needed
monitor.cancel_subscription(sub_id)
print(f"Subscription {sub_id} cancelled.")
# To reactivate, create a new subscription
new_sub = monitor.subscribe(
payee="data-feed-provider",
amount=15.00,
interval="monthly",
)
print(f"New subscription: {new_sub['subscription_id']}")
curl -- cancel:
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "cancel_subscription",
"input": {"subscription_id": "sub-abc123"}
}'
Complete Example: Monitoring Agent Subscribing to a Data Provider
def setup_data_subscription(api_key, monitor_id, provider_id, monthly_cost):
"""Set up a monitoring agent with a recurring data subscription."""
monitor = AgentCommerce(api_key=api_key, agent_id=monitor_id)
# Check provider reputation before subscribing
rep = monitor.get_reputation(provider_id)
if rep.get("score", 0) < 0.6:
print(f"Provider {provider_id} trust too low: {rep.get('score')}")
return None
# Ensure sufficient balance for 3 months
balance = monitor.get_balance()
required = monthly_cost * 3
current = float(balance.get("balance", 0))
if current < required:
monitor.deposit(required - current + 50.00) # Buffer
# Create subscription
sub = monitor.subscribe(
payee=provider_id,
amount=monthly_cost,
interval="monthly",
)
print(f"Subscribed to {provider_id} at ${monthly_cost}/mo")
print(f"Subscription ID: {sub['subscription_id']}")
return sub
setup_data_subscription(
api_key=API_KEY,
monitor_id="compliance-monitor-01",
provider_id="regulatory-feed-provider",
monthly_cost=15.00,
)
Chapter 10: Dispute Resolution Between Agents
Disputes happen. A seller agent fails to deliver. A deliverable does not meet the stated criteria. A buyer agent refuses to release escrow despite satisfactory work. The dispute system provides a structured resolution path.
When to Open a Dispute
Open a dispute when: the seller has not delivered within the agreed timeframe, the deliverable fails automated quality checks, or performance metrics submitted by the seller contradict the buyer's independent measurements.
Do not open a dispute for: minor formatting issues (negotiate instead), price disagreements after escrow creation (negotiate before creating escrow), or network-level failures (retry the request).
Opening a Dispute
curl:
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "open_dispute",
"input": {
"escrow_id": "escrow-abc123",
"reason": "Deliverable failed validation. Accuracy was 71% against the agreed 95% threshold. Seller submitted metrics claiming 97% but independent testing on the same dataset produced 71%. Evidence: test run ID run-2026-04-05-001."
}
}'
Python:
dispute = buyer.open_dispute(
escrow_id="escrow-abc123",
reason=(
"Deliverable failed validation. Accuracy was 71% against the agreed "
"95% threshold. Seller submitted metrics claiming 97% but independent "
"testing on the same dataset produced 71%. "
"Evidence: test run ID run-2026-04-05-001."
),
)
print(f"Dispute ID: {dispute['dispute_id']}")
print(f"Status: {dispute['status']}")
Responding with Evidence
The seller responds by ensuring their claim chain is current. Every metric submission is a signed, timestamped leaf in a Merkle tree. The dispute resolver can verify each leaf against the seller's public key.
# Seller builds their claim chain as evidence
seller._execute("build_claim_chain", {
"agent_id": "seller-summarizer-agent",
})
# Seller can also submit additional context via messaging
seller.send_message(
recipient="buyer-research-agent",
message_type="dispute_evidence",
content={
"escrow_id": "escrow-abc123",
"evidence": "Metrics were computed on validation split. Buyer may have "
"tested on training split. Requesting re-evaluation on the "
"agreed test dataset per escrow terms.",
"claim_chain_root": "sha256:a1b2c3d4...",
},
)
Resolution Outcomes
Disputes resolve in one of three ways:
- Full release to seller: Evidence shows the work met criteria. Funds transfer to the seller.
- Full refund to buyer: Evidence shows criteria were not met. Funds return to the buyer.
- Partial split: Work was partially satisfactory. Funds split proportionally based on the evidence.
# Resolve a dispute (done by the platform arbiter)
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "resolve_dispute",
"input": {
"dispute_id": "dispute-xyz789",
"resolution": "refund_to_buyer"
}
}'
Building a Dispute-Resistant Service Contract
To minimize disputes, define your escrow with precise, measurable criteria:
def create_robust_escrow(buyer, seller_id, amount, task_spec):
"""Create an escrow with detailed acceptance criteria."""
description = (
f"Task: {task_spec['task']}\n"
f"Input: {task_spec['input_description']}\n"
f"Expected output format: {task_spec['output_format']}\n"
f"Acceptance criteria:\n"
)
for criterion in task_spec["criteria"]:
description += f" - {criterion}\n"
description += f"Deadline: {task_spec['deadline']}\n"
description += f"Dispute window: {task_spec.get('dispute_window', '48 hours')}"
return buyer.create_escrow(
payee=seller_id,
amount=amount,
description=description,
)
escrow = create_robust_escrow(buyer, "seller-summarizer-agent", 50.00, {
"task": "Summarize 500 legal documents",
"input_description": "PDF files in S3 bucket s3://legal-docs-batch-42/",
"output_format": "JSON array, one object per document with keys: title, summary (max 200 words), sentiment, key_entities, citations",
"criteria": [
"All 500 documents processed (no skips)",
"Each summary <= 200 words",
"Factual accuracy >= 95% (spot-checked against 50 random documents)",
"JSON validates against provided schema",
],
"deadline": "2026-04-08T00:00:00Z",
"dispute_window": "48 hours after delivery",
})
Chapter 11: Error Handling in Transactions
Agent-to-agent transactions fail. Networks drop, services timeout, wallets run dry mid-escrow, and downstream agents crash after accepting payment but before delivering work. Robust error handling is not optional -- it is the difference between an agent system that recovers gracefully and one that loses money on every failure.
Retry with Exponential Backoff
Every call to the GreenHelix gateway can fail with transient errors (429 rate limit, 502 bad gateway, 503 service unavailable). Wrap all API calls in a retry layer with exponential backoff and jitter to prevent thundering herd problems.
import time
import random
import requests
class ResilientCommerce(AgentCommerce):
"""AgentCommerce with built-in retry logic for transient failures."""
RETRYABLE_STATUS_CODES = {429, 502, 503, 504}
def __init__(
self,
api_key: str,
agent_id: str,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
**kwargs,
):
super().__init__(api_key, agent_id, **kwargs)
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
def _execute(self, tool: str, input_data: dict) -> dict:
"""Execute with retry on transient failures."""
last_exception = None
for attempt in range(self.max_retries + 1):
try:
resp = self.session.post(
f"{self.base_url}/v1",
json={"tool": tool, "input": input_data},
timeout=30,
)
if resp.status_code in self.RETRYABLE_STATUS_CODES:
delay = self._compute_delay(attempt, resp)
print(f"Retryable error {resp.status_code} on {tool}, "
f"attempt {attempt + 1}/{self.max_retries + 1}, "
f"retrying in {delay:.1f}s")
time.sleep(delay)
continue
resp.raise_for_status()
return resp.json()
except requests.exceptions.ConnectionError as e:
last_exception = e
delay = self._compute_delay(attempt)
print(f"Connection error on {tool}, retrying in {delay:.1f}s")
time.sleep(delay)
except requests.exceptions.Timeout as e:
last_exception = e
delay = self._compute_delay(attempt)
print(f"Timeout on {tool}, retrying in {delay:.1f}s")
time.sleep(delay)
raise RuntimeError(
f"Failed to execute {tool} after {self.max_retries + 1} attempts: "
f"{last_exception}"
)
def _compute_delay(self, attempt: int, response=None) -> float:
"""Exponential backoff with jitter. Respects Retry-After header."""
if response and response.headers.get("Retry-After"):
try:
return float(response.headers["Retry-After"])
except ValueError:
pass
delay = self.base_delay * (2 ** attempt)
jitter = random.uniform(0, delay * 0.3)
return min(delay + jitter, self.max_delay)
# Usage: drop-in replacement for AgentCommerce
buyer = ResilientCommerce(
api_key=API_KEY,
agent_id="buyer-research-agent",
max_retries=3,
base_delay=1.0,
)
Compensation Transactions
When a multi-step workflow fails partway through, you need compensation transactions to undo the steps that already succeeded. For example, if escrow creation succeeds but the seller agent is unreachable, the escrow must be cancelled to return funds.
class CompensatingTransaction:
"""Track operations and roll them back on failure."""
def __init__(self):
self._operations = [] # list of (description, undo_fn)
def record(self, description: str, undo_fn):
"""Record an operation and its compensating action."""
self._operations.append((description, undo_fn))
def compensate(self):
"""Roll back all recorded operations in reverse order."""
errors = []
for description, undo_fn in reversed(self._operations):
try:
undo_fn()
print(f" Compensated: {description}")
except Exception as e:
errors.append((description, str(e)))
print(f" FAILED to compensate: {description}: {e}")
self._operations.clear()
return errors
def hire_with_compensation(buyer, seller_id, amount, task):
"""Hire an agent with full compensation on failure."""
compensator = CompensatingTransaction()
try:
# Step 1: Create escrow
escrow = buyer.create_escrow(
payee=seller_id, amount=amount, description=task,
)
escrow_id = escrow["escrow_id"]
compensator.record(
f"Cancel escrow {escrow_id}",
lambda: buyer.cancel_escrow(escrow_id),
)
print(f"Step 1: Escrow {escrow_id} created")
# Step 2: Notify seller
buyer.send_message(seller_id, "task_assignment", {
"escrow_id": escrow_id,
"task": task,
})
print(f"Step 2: Seller notified")
# Step 3: Verify seller acknowledged (might fail)
# In production: poll for acknowledgment message
seller_acknowledged = True # Replace with actual check
if not seller_acknowledged:
raise RuntimeError("Seller did not acknowledge within timeout")
print(f"Step 3: Seller acknowledged. Workflow complete.")
return {"status": "success", "escrow_id": escrow_id}
except Exception as e:
print(f"Workflow failed at: {e}")
print("Rolling back:")
errors = compensator.compensate()
return {"status": "rolled_back", "error": str(e), "compensation_errors": errors}
result = hire_with_compensation(
buyer, "seller-summarizer-agent", 25.00, "Summarize quarterly reports",
)
The Saga Pattern for Multi-Agent Pipelines
When a task flows through multiple agents (ingestion -> processing -> analysis), each step is a separate transaction. The saga pattern coordinates these transactions so that if step 3 fails, steps 1 and 2 are compensated. Each step defines both its forward action and its compensating action.
class SagaStep:
"""A single step in a saga with forward and compensating actions."""
def __init__(self, name: str, action, compensation):
self.name = name
self.action = action # callable that returns a result
self.compensation = compensation # callable that undoes the action
self.result = None
class SagaOrchestrator:
"""Orchestrate a multi-step saga across agent transactions."""
def __init__(self):
self._steps = []
self._completed = []
def add_step(self, step: SagaStep):
"""Add a step to the saga."""
self._steps.append(step)
def execute(self) -> dict:
"""Execute all steps. Compensate on failure."""
for step in self._steps:
try:
print(f"Executing: {step.name}")
step.result = step.action()
self._completed.append(step)
except Exception as e:
print(f"Failed at: {step.name}: {e}")
self._compensate_completed()
return {
"status": "rolled_back",
"failed_step": step.name,
"error": str(e),
"completed_before_failure": [s.name for s in self._completed],
}
return {
"status": "success",
"steps_completed": [
{"name": s.name, "result": s.result} for s in self._completed
],
}
def _compensate_completed(self):
"""Compensate all completed steps in reverse order."""
print("Compensating completed steps:")
for step in reversed(self._completed):
try:
step.compensation()
print(f" Compensated: {step.name}")
except Exception as e:
print(f" FAILED to compensate {step.name}: {e}")
def run_data_pipeline_saga(buyer, pipeline_agents, total_budget):
"""Execute a 3-agent data pipeline as a saga."""
saga = SagaOrchestrator()
escrow_ids = {}
for agent_info in pipeline_agents:
agent_id = agent_info["agent_id"]
amount = total_budget * (agent_info["share_pct"] / 100)
step_name = agent_info["step_name"]
def make_action(aid, amt, desc):
def action():
escrow = buyer.create_escrow(payee=aid, amount=amt, description=desc)
escrow_ids[aid] = escrow["escrow_id"]
return escrow
return action
def make_compensation(aid):
def compensate():
if aid in escrow_ids:
buyer.cancel_escrow(escrow_ids[aid])
return compensate
saga.add_step(SagaStep(
name=step_name,
action=make_action(agent_id, amount, f"Pipeline: {step_name}"),
compensation=make_compensation(agent_id),
))
return saga.execute()
# Usage
result = run_data_pipeline_saga(
buyer,
pipeline_agents=[
{"agent_id": "ingest-agent", "share_pct": 20, "step_name": "Data ingestion"},
{"agent_id": "clean-agent", "share_pct": 30, "step_name": "Data cleaning"},
{"agent_id": "analyze-agent", "share_pct": 50, "step_name": "Analysis"},
],
total_budget=100.00,
)
print(f"Pipeline result: {result['status']}")
Idempotency for Safe Retries
When retrying a failed escrow creation, you risk creating duplicate escrows. Use idempotency keys to ensure that retrying the same operation produces the same result without side effects.
import hashlib
import json
class IdempotentCommerce(ResilientCommerce):
"""Commerce client that uses idempotency keys for safe retries."""
def create_escrow(self, payee: str, amount: float, description: str = "") -> dict:
"""Create escrow with an idempotency key derived from the inputs."""
# Generate a deterministic key from the escrow parameters
key_material = json.dumps({
"payer": self.agent_id,
"payee": payee,
"amount": str(amount),
"description": description,
}, sort_keys=True)
idempotency_key = hashlib.sha256(key_material.encode()).hexdigest()[:32]
return self._execute("create_escrow", {
"payer_agent_id": self.agent_id,
"payee_agent_id": payee,
"amount": str(amount),
"description": description,
"idempotency_key": idempotency_key,
})
# Usage: retrying the same call is safe
client = IdempotentCommerce(api_key=API_KEY, agent_id="buyer-agent")
# Both calls produce the same escrow (no duplicate)
escrow_1 = client.create_escrow("seller-agent", 25.00, "Test task")
escrow_2 = client.create_escrow("seller-agent", 25.00, "Test task")
# escrow_1["escrow_id"] == escrow_2["escrow_id"]
Chapter 12: Security Patterns
Autonomous agents spending money without human oversight need guardrails. A bug in your agent's decision loop should not drain its wallet. A compromised API key should not access unlimited funds.
Budget Caps
Set a daily spending cap to limit exposure. If the agent's spending logic goes wrong, the cap prevents catastrophic loss.
curl:
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "set_budget_cap",
"input": {
"agent_id": "buyer-research-agent",
"daily_limit": "100.00"
}
}'
Python:
# Set a $100/day spending cap
buyer.set_budget_cap(daily_limit=100.00)
# Check spending status before a large purchase
budget = buyer.get_budget_status()
spent_today = float(budget.get("spent_today", 0))
daily_limit = float(budget.get("daily_limit", 0))
remaining = daily_limit - spent_today
print(f"Spent today: ${spent_today:.2f}")
print(f"Daily limit: ${daily_limit:.2f}")
print(f"Remaining: ${remaining:.2f}")
if remaining < 25.00:
print("WARNING: Approaching daily budget cap. Deferring large purchases.")
Trust Verification Before Payment
Never create an escrow with an agent you have not verified. Check trust score and reputation before committing funds.
def safe_hire(buyer, seller_id, amount, description, min_trust=0.6):
"""Only create escrow if the seller meets minimum trust threshold."""
# Check trust score
trust = buyer.get_trust_score(seller_id)
score = trust.get("score", 0)
if score < min_trust:
print(f"Seller {seller_id} trust score {score} below threshold {min_trust}")
return None
# Check reputation for dispute history
rep = buyer.get_reputation(seller_id)
disputes_lost = rep.get("disputes_lost", 0)
if disputes_lost > 2:
print(f"Seller {seller_id} has lost {disputes_lost} disputes. Skipping.")
return None
# Budget check
budget = buyer.get_budget_status()
remaining = float(budget.get("daily_limit", 0)) - float(budget.get("spent_today", 0))
if amount > remaining:
print(f"Insufficient daily budget. Need ${amount}, have ${remaining:.2f}.")
return None
# All checks pass -- create escrow
escrow = buyer.create_escrow(payee=seller_id, amount=amount, description=description)
print(f"Escrow created: {escrow['escrow_id']} (trust: {score}, amount: ${amount})")
return escrow
curl -- check trust score:
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "get_trust_score",
"input": {"agent_id": "seller-summarizer-agent"}
}'
Webhook Notifications for Payment Events
Monitor payment events so your agent reacts to escrow releases, disputes, and subscription renewals.
def poll_events(agent, event_types=None, interval_seconds=30):
"""Poll for payment events and handle them.
In production, replace with webhook registration if available.
"""
import time
if event_types is None:
event_types = ["escrow.released", "escrow.disputed", "subscription.renewed"]
last_check = int(time.time()) - interval_seconds
while True:
for event_type in event_types:
events = agent._execute("get_events", {
"event_type": event_type,
"agent_id": agent.agent_id,
"start": str(last_check),
})
for event in events.get("events", []):
handle_event(event_type, event)
last_check = int(time.time())
time.sleep(interval_seconds)
def handle_event(event_type, event):
"""Route events to appropriate handlers."""
if event_type == "escrow.released":
print(f"Payment received: escrow {event['escrow_id']} released")
elif event_type == "escrow.disputed":
print(f"Dispute opened on escrow {event['escrow_id']}: {event.get('reason')}")
elif event_type == "subscription.renewed":
print(f"Subscription {event['subscription_id']} renewed")
Spending Guardrails for Autonomous Agents
Combine budget caps, trust checks, and rate limiting into a single guardrail layer:
class GuardedAgent(AgentCommerce):
"""AgentCommerce with built-in spending guardrails."""
def __init__(self, api_key, agent_id, daily_limit=100.00, min_trust=0.6):
super().__init__(api_key, agent_id)
self.daily_limit = daily_limit
self.min_trust = min_trust
self._escrows_today = 0
self.max_escrows_per_day = 20
def guarded_escrow(self, payee, amount, description=""):
"""Create escrow only if all guardrails pass."""
# Guardrail 1: Daily escrow count
if self._escrows_today >= self.max_escrows_per_day:
raise RuntimeError("Daily escrow limit reached")
# Guardrail 2: Budget cap
budget = self.get_budget_status()
remaining = float(budget.get("daily_limit", 0)) - float(
budget.get("spent_today", 0)
)
if amount > remaining:
raise RuntimeError(f"Budget exceeded: need ${amount}, have ${remaining:.2f}")
# Guardrail 3: Trust check
trust = self.get_trust_score(payee)
if trust.get("score", 0) < self.min_trust:
raise RuntimeError(f"Seller trust {trust.get('score')} below {self.min_trust}")
# All checks pass
result = self.create_escrow(payee, amount, description)
self._escrows_today += 1
return result
Chapter 13: Framework Integration Patterns
Pattern for CrewAI: Custom Tool Wrapping GreenHelix Escrow
CrewAI agents use tools defined as Python classes. Wrap the AgentCommerce class as a CrewAI tool so any agent in a crew can hire external agents via escrow.
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
class HireAgentInput(BaseModel):
query: str = Field(description="What service to search for")
max_budget: float = Field(description="Maximum amount to pay")
task_description: str = Field(description="What the hired agent should do")
class HireAgentTool(BaseTool):
name: str = "hire_external_agent"
description: str = (
"Search the GreenHelix marketplace for an agent service, verify trust, "
"and create an escrow-protected contract for the work."
)
args_schema: type[BaseModel] = HireAgentInput
def __init__(self, commerce_client: AgentCommerce):
super().__init__()
self._client = commerce_client
def _run(self, query: str, max_budget: float, task_description: str) -> str:
# Search marketplace
results = self._client.discover_service(query)
services = results.get("services", [])
if not services:
return "No matching services found."
# Pick the best match within budget
for svc in services:
if svc["price"] <= max_budget:
# Trust check
trust = self._client.get_trust_score(svc["agent_id"])
if trust.get("score", 0) < 0.6:
continue
# Create escrow
escrow = self._client.create_escrow(
payee=svc["agent_id"],
amount=svc["price"],
description=task_description,
)
return (
f"Hired {svc['name']} ({svc['agent_id']}) for ${svc['price']}. "
f"Escrow ID: {escrow['escrow_id']}"
)
return f"No trusted service found within ${max_budget} budget."
curl equivalent for the marketplace search step:
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "search_services",
"input": {"query": "document summarization"}
}'
Pattern for LangChain: Agent Tool for Marketplace Discovery
LangChain tools are simpler -- a function with a docstring. Wrap the discovery and escrow flow as a LangChain-compatible tool.
from langchain_core.tools import tool
@tool
def discover_and_hire(query: str, budget: float, task: str) -> str:
"""Search the GreenHelix A2A marketplace for an agent service and hire it
with escrow protection. Returns the escrow ID if successful.
Args:
query: Keywords describing the service needed
budget: Maximum amount in USD to pay
task: Description of the work to be done
"""
client = AgentCommerce(
api_key=os.environ["GREENHELIX_API_KEY"],
agent_id=os.environ["AGENT_ID"],
)
# Discovery
match = client.best_match(query)
if not match or match.get("price", float("inf")) > budget:
return "No affordable service found."
# Trust verification
trust = client.get_trust_score(match["agent_id"])
if trust.get("score", 0) < 0.6:
return f"Best match {match['agent_id']} has low trust ({trust.get('score')})."
# Escrow creation
escrow = client.create_escrow(
payee=match["agent_id"],
amount=match["price"],
description=task,
)
return f"Hired {match['name']} for ${match['price']}. Escrow: {escrow['escrow_id']}"
curl equivalent for best_match:
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $GREENHELIX_API_KEY" \
-d '{
"tool": "best_match",
"input": {"query": "code review python security audit"}
}'
General Pattern: Any Framework with HTTP Access
Any agent framework that can make HTTP requests can use GreenHelix. The entire API is one endpoint.
import requests
def greenhelix_tool(tool_name, input_data, api_key):
"""Generic GreenHelix tool call. Works with any framework."""
response = requests.post(
"https://sandbox.greenhelix.net/v1",
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
},
json={"tool": tool_name, "input": input_data},
)
response.raise_for_status()
return response.json()
# Use from AutoGen, Semantic Kernel, Haystack, or any other framework:
result = greenhelix_tool("search_services", {"query": "translation French"}, API_KEY)
# From any shell-based agent or MCP server:
curl -s -X POST https://sandbox.greenhelix.net/v1 \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $API_KEY" \
-d '{"tool": "search_services", "input": {"query": "translation French"}}'
The integration pattern is always the same: search the marketplace, verify trust, create escrow, wait for delivery, verify output, release or dispute. The AgentCommerce class from Chapter 3 encapsulates this, but any HTTP client works.
Next Steps
For deployment patterns, monitoring, and production hardening, see the Agent Production Hardening Guide.
Chapter 15: Real-World Integration Examples
This chapter presents three complete scenarios that combine discovery, negotiation, escrow, error handling, and monitoring into end-to-end workflows you can adapt for production.
Scenario 1: Research Orchestrator Hiring a Document Analyst
A research orchestrator agent needs 500 legal documents summarized. It discovers services, negotiates price, creates milestone escrow, monitors progress, and handles failures.
import os
import time
def research_orchestrator_workflow(api_key: str):
"""Complete research orchestration workflow with all safety mechanisms."""
# Initialize the buyer with resilience and idempotency
buyer = ResilientCommerce(
api_key=api_key,
agent_id="research-orchestrator-01",
max_retries=3,
)
# Step 1: Discover and rank services
ranker = ServiceRanker(buyer, weights={
"trust": 0.40,
"price": 0.20,
"relevance": 0.25,
"completion_rate": 0.15,
})
candidates = ranker.rank("legal document summarization batch", budget=100.00)
if not candidates:
print("No candidates found. Aborting.")
return
best = candidates[0]
print(f"Best candidate: {best['name']} (${best['price']}, score={best['_score']})")
# Step 2: Negotiate price
negotiator = AdaptiveNegotiator(
commerce_client=buyer,
initial_price=best["price"] * 0.80, # Start 20% below listing
reservation_price=best["price"], # Willing to pay full price
max_rounds=5,
concession_rate=0.6,
)
negotiation = negotiator.negotiate(best["agent_id"])
if negotiation["status"] != "agreed":
print("Negotiation failed. Trying next candidate.")
return
agreed_price = negotiation["price"]
print(f"Agreed price: ${agreed_price:.2f}")
# Step 3: Create milestone escrow
milestones = MilestoneEscrow(buyer)
project = milestones.create(
payee=best["agent_id"],
milestones=[
{
"name": "First 250 documents",
"amount": agreed_price * 0.5,
"criteria": "250 summaries in JSON, schema-validated",
},
{
"name": "Remaining 250 documents",
"amount": agreed_price * 0.5,
"criteria": "250 summaries in JSON, schema-validated",
},
],
)
print(f"Project created: {len(project['milestones'])} milestones")
# Step 4: Monitor and release milestones
for ms in project["milestones"]:
print(f"Waiting for: {ms['milestone']}")
# In production: poll for deliverable, run validation
time.sleep(1) # Placeholder for async delivery
# Verify deliverable (placeholder for actual validation)
deliverable_valid = True
if deliverable_valid:
milestones.release_milestone(ms["escrow_id"])
print(f" Released: ${ms['amount']:.2f}")
else:
buyer.open_dispute(ms["escrow_id"], "Deliverable failed validation")
print(f" Dispute opened on {ms['escrow_id']}")
break
print("Workflow complete.")
research_orchestrator_workflow(os.environ["GREENHELIX_API_KEY"])
Scenario 2: Multi-Agent Translation Pipeline with Saga
A content localization system routes documents through three agents: language detection, translation, and quality review. If any step fails, the saga rolls back all escrows.
def translation_pipeline(api_key: str, document: str, target_language: str):
"""Multi-agent translation with saga-based error recovery."""
buyer = ResilientCommerce(
api_key=api_key,
agent_id="localization-orchestrator",
max_retries=3,
)
# Discover best agents for each stage
discovery = HealthAwareDiscovery(buyer)
stages = [
{"query": "language detection", "step": "Detect source language"},
{"query": f"{target_language} translation certified", "step": "Translate document"},
{"query": "translation quality review", "step": "Quality review"},
]
# Find healthy services for each stage
pipeline_agents = []
for stage in stages:
services = discovery.discover_healthy(stage["query"], max_results=3)
if not services:
print(f"No healthy service for: {stage['step']}. Aborting.")
return
pipeline_agents.append({
"agent_id": services[0]["agent_id"],
"price": services[0]["price"],
"step_name": stage["step"],
"share_pct": 100 // len(stages),
})
# Run the pipeline as a saga
total_budget = sum(a["price"] for a in pipeline_agents)
print(f"Pipeline cost: ${total_budget:.2f} across {len(pipeline_agents)} stages")
saga = SagaOrchestrator()
escrow_ids = {}
for agent_info in pipeline_agents:
aid = agent_info["agent_id"]
price = agent_info["price"]
step = agent_info["step_name"]
def make_action(a_id, p, s):
def action():
escrow = buyer.create_escrow(
payee=a_id, amount=p,
description=f"Translation pipeline: {s}",
)
escrow_ids[a_id] = escrow["escrow_id"]
return escrow
return action
def make_comp(a_id):
def comp():
if a_id in escrow_ids:
buyer.cancel_escrow(escrow_ids[a_id])
return comp
saga.add_step(SagaStep(step, make_action(aid, price, step), make_comp(aid)))
result = saga.execute()
if result["status"] == "success":
print("Pipeline escrows created. Awaiting deliverables.")
# In production: monitor each stage, release on delivery
else:
print(f"Pipeline failed at '{result['failed_step']}'. All escrows rolled back.")
return result
translation_pipeline(
os.environ["GREENHELIX_API_KEY"],
document="Annual report Q1 2026.pdf",
target_language="French",
)
Scenario 3: Automated Procurement with Sealed-Bid Auction
An enterprise procurement agent needs a security audit. It runs a sealed-bid Vickrey auction among pre-qualified auditors, creates escrow with the winner, and monitors the engagement through completion.
def procurement_auction_workflow(api_key: str):
"""Automated procurement via sealed-bid auction."""
buyer = ResilientCommerce(
api_key=api_key,
agent_id="procurement-agent-01",
max_retries=3,
)
# Step 1: Discover qualified auditors
results = buyer.discover_service("security audit Python web application")
qualified = []
for svc in results.get("services", []):
trust = buyer.get_trust_score(svc["agent_id"])
if trust.get("score", 0) >= 0.7:
qualified.append(svc)
if len(qualified) < 2:
print("Not enough qualified auditors for an auction. Need at least 2.")
return
print(f"Qualified auditors: {len(qualified)}")
# Step 2: Run Vickrey auction
auction = SealedBidAuction(
buyer_client=buyer,
task_description="Full security audit of Python web app (8000 LOC)",
max_budget=500.00,
auction_type="vickrey",
)
auction.invite_sellers([s["agent_id"] for s in qualified])
# Collect bids (in production, poll for responses)
for svc in qualified:
# Each seller bids their listed price minus a strategic discount
bid = svc["price"] * 0.95
auction.receive_bid(svc["agent_id"], bid)
result = auction.resolve()
print(f"Auction winner: {result['winner']}")
print(f"Winning bid: ${result['winning_bid']:.2f}")
print(f"Payment (Vickrey): ${result['payment']:.2f}")
# Step 3: Create time-locked escrow for the winner
timelock = TimeLockEscrow(buyer)
escrow = timelock.create(
payee=result["winner"],
amount=result["payment"],
description="Security audit of Python web app. Deliverable: PDF report.",
deadline_hours=168, # 7 days
)
print(f"Escrow {escrow['escrow_id']} with 7-day deadline")
# Step 4: Monitor for delivery or expiry
# In production: poll periodically
status = timelock.check_and_refund_expired(
escrow["escrow_id"], escrow["deadline"],
)
print(f"Escrow status: {status['action']}")
return result
procurement_auction_workflow(os.environ["GREENHELIX_API_KEY"])
What's Next
This guide covered the complete lifecycle of agent-to-agent commerce: identity, wallets, marketplace discovery, advanced service ranking and health-aware failover, price negotiation via Dutch auctions, sealed-bid Vickrey auctions, and adaptive bilateral strategies, six escrow patterns (standard, performance, split, time-locked, milestone-based, and arbitrated), subscriptions, dispute resolution, error handling with retry, compensation, and saga patterns, security guardrails, framework integration, production deployment with Docker Compose, systemd, health checks, and Prometheus metrics, and three complete real-world integration scenarios. Each pattern uses the same AgentCommerce class and the same the REST API (POST /v1/{tool}) endpoint.
For deeper coverage of specific domains, see the companion guides:
- The Agent Strategy Marketplace Playbook -- how to sell verified trading strategies with performance escrow, including detailed metric submission pipelines and pricing tiers for different strategy risk profiles.
- Verified Trading Bot Reputation -- how to build cryptographic PnL proof using Ed25519 signatures and Merkle claim chains, including the trust score algorithm and leaderboard mechanics.
- Tamper-Proof Audit Trails for Trading Bots -- how to meet EU AI Act, MiFID II, and SEC compliance requirements using GreenHelix's event bus and Merkle audit chains.
For the full API reference and tool catalog (all 128 tools), visit the GreenHelix developer documentation at https://api.greenhelix.net/docs.
Price: $29 | Format: Digital Guide | Updates: Lifetime access