From ced3e52664cea246031a28371370293d97d04b24 Mon Sep 17 00:00:00 2001 From: rickydata-indexer Date: Mon, 3 Nov 2025 13:12:17 +0000 Subject: [PATCH] support execution for simple agents --- agent0_sdk/core/agent_client.py | 204 +++++++++++++++ agent0_sdk/core/mcp_client.py | 253 ++++++++++++++++++ agent0_sdk/core/x402_client.py | 297 +++++++++++++++++++++ tests/test_agent_client.py | 408 +++++++++++++++++++++++++++++ tests/test_mcp_client.py | 439 ++++++++++++++++++++++++++++++++ tests/test_x402_client.py | 268 +++++++++++++++++++ 6 files changed, 1869 insertions(+) create mode 100644 agent0_sdk/core/agent_client.py create mode 100644 agent0_sdk/core/mcp_client.py create mode 100644 agent0_sdk/core/x402_client.py create mode 100644 tests/test_agent_client.py create mode 100644 tests/test_mcp_client.py create mode 100644 tests/test_x402_client.py diff --git a/agent0_sdk/core/agent_client.py b/agent0_sdk/core/agent_client.py new file mode 100644 index 0000000..83a4137 --- /dev/null +++ b/agent0_sdk/core/agent_client.py @@ -0,0 +1,204 @@ +""" +Simple agent execution client for HTTP/REST-based agents. +Works with standard HTTP+JSON agents. +""" + +import logging +import json +from typing import Any, Dict, Optional +import requests + +logger = logging.getLogger(__name__) + +try: + from .x402_client import X402Client +except ImportError: + X402Client = None + + +class AgentClient: + """Simple client for calling HTTP/REST-based agents.""" + + def __init__( + self, + agent_card_url: str, + timeout: int = 30, + x402_client: Optional[Any] = None + ): + """ + Initialize agent client from agent card URL. + + Args: + agent_card_url: URL to the agent's card/manifest + timeout: Request timeout in seconds + x402_client: Optional X402Client for micropayments + """ + self.agent_card_url = agent_card_url + self.timeout = timeout + self.agent_card = None + self.endpoint_url = None + self.x402_client = x402_client + self.x402_config = None + + # Load agent card + self._load_agent_card() + + def _load_agent_card(self): + """Load and parse the agent card.""" + try: + response = requests.get(self.agent_card_url, timeout=10) + response.raise_for_status() + self.agent_card = response.json() + + # Extract endpoint URL + self.endpoint_url = self.agent_card.get('url') + if not self.endpoint_url: + raise ValueError("No 'url' field in agent card") + + # Check for x402 extension + capabilities = self.agent_card.get('capabilities', {}) + extensions = capabilities.get('extensions', []) + for ext in extensions: + if 'x402' in ext.get('uri', ''): + self.x402_config = ext.get('params', {}) + logger.info(f"x402 payments enabled: ${self.x402_config.get('price_usdc')} USDC per request") + + logger.info(f"Loaded agent: {self.agent_card.get('name')}") + logger.info(f"Endpoint: {self.endpoint_url}") + + except Exception as e: + raise Exception(f"Failed to load agent card: {e}") + + def get_info(self) -> Dict[str, Any]: + """ + Get agent information. + + Returns: + Agent card data + """ + return self.agent_card + + def get_skills(self) -> list: + """ + Get list of agent skills. + + Returns: + List of skill definitions + """ + return self.agent_card.get('skills', []) + + def call( + self, + message: str, + skill: Optional[str] = None, + context: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: + """ + Call the agent with a message. + + Args: + message: Message/query to send to the agent + skill: Optional specific skill to invoke + context: Optional context/parameters + + Returns: + Agent response + """ + # Build request payload + payload = { + "message": message + } + + if skill: + payload["skill"] = skill + + if context: + payload.update(context) + + # Check if x402 payment is required + if self.x402_config and self.x402_client: + logger.info("Using x402 payment gateway") + return self._call_with_x402_payment(payload) + + # Determine transport + transport = self.agent_card.get('preferredTransport', 'HTTP+JSON') + + if transport == 'HTTP+JSON': + return self._call_http_json(payload) + else: + raise ValueError(f"Unsupported transport: {transport}") + + def _call_with_x402_payment(self, payload: Dict[str, Any]) -> Dict[str, Any]: + """ + Call agent with x402 payment. + + Args: + payload: Request payload + + Returns: + Agent response + """ + gateway_url = self.x402_config.get('gateway_url') + price_usdc = float(self.x402_config.get('price_usdc', 0)) + + if not gateway_url: + raise Exception("x402 gateway URL not found in agent card") + + # Extract message from payload + message = payload.get('message', '') + + return self.x402_client.process_payment( + gateway_url=gateway_url, + message=message, + price_usdc=price_usdc + ) + + def _call_http_json(self, payload: Dict[str, Any]) -> Dict[str, Any]: + """ + Call agent using HTTP+JSON transport. + + Args: + payload: Request payload + + Returns: + Response data + """ + try: + headers = {'Content-Type': 'application/json'} + + logger.debug(f"Calling {self.endpoint_url}") + + response = requests.post( + self.endpoint_url, + json=payload, + headers=headers, + timeout=self.timeout + ) + + response.raise_for_status() + + # Try to parse as JSON + try: + return response.json() + except json.JSONDecodeError: + # Return text response if not JSON + logger.debug("Non-JSON response received, returning as text") + return {"response": response.text} + + except requests.exceptions.RequestException as e: + logger.error(f"Agent call failed: {e}") + raise Exception(f"Failed to call agent: {e}") + + def health_check(self) -> bool: + """ + Check if agent is responsive. + + Returns: + True if agent is healthy + """ + try: + # Try a simple call + self.call("health check") + return True + except Exception: + return False diff --git a/agent0_sdk/core/mcp_client.py b/agent0_sdk/core/mcp_client.py new file mode 100644 index 0000000..246c585 --- /dev/null +++ b/agent0_sdk/core/mcp_client.py @@ -0,0 +1,253 @@ +""" +MCP (Model Context Protocol) Client for agent execution. +Implements JSON-RPC over HTTP for calling MCP agent endpoints. +""" + +import logging +import json +from typing import Any, Dict, List, Optional, Union +import requests +from datetime import datetime + +logger = logging.getLogger(__name__) + + +class MCPClient: + """Client for interacting with MCP (Model Context Protocol) endpoints.""" + + def __init__(self, endpoint_url: str, timeout: int = 30): + """ + Initialize MCP client. + + Args: + endpoint_url: The MCP endpoint URL (must be http:// or https://) + timeout: Request timeout in seconds (default: 30) + """ + if not endpoint_url.startswith(('http://', 'https://')): + raise ValueError(f"MCP endpoint must be HTTP/HTTPS, got: {endpoint_url}") + + self.endpoint_url = endpoint_url.rstrip('/') + self.timeout = timeout + self._request_id = 0 + + def _get_next_request_id(self) -> int: + """Get next request ID for JSON-RPC calls.""" + self._request_id += 1 + return self._request_id + + def _jsonrpc_call( + self, + method: str, + params: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: + """ + Make a JSON-RPC 2.0 call to the MCP endpoint. + + Args: + method: JSON-RPC method name + params: Optional parameters for the method + + Returns: + JSON-RPC result + + Raises: + Exception: If the call fails or returns an error + """ + payload = { + "jsonrpc": "2.0", + "method": method, + "id": self._get_next_request_id() + } + + if params: + payload["params"] = params + + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json, text/event-stream' + } + + logger.debug(f"MCP call: {method} to {self.endpoint_url}") + + try: + response = requests.post( + self.endpoint_url, + json=payload, + headers=headers, + timeout=self.timeout, + stream=True + ) + + response.raise_for_status() + + # Check if response is SSE format + content_type = response.headers.get('content-type', '') + if 'text/event-stream' in content_type: + result = self._parse_sse_response(response.text) + else: + result = response.json() + + # Check for JSON-RPC error + if 'error' in result: + error = result['error'] + raise Exception(f"MCP error: {error.get('message', error)}") + + # Return the result + return result.get('result', result) + + except requests.exceptions.RequestException as e: + logger.error(f"MCP request failed: {e}") + raise Exception(f"MCP request failed: {e}") + except json.JSONDecodeError as e: + logger.error(f"Invalid JSON in MCP response: {e}") + raise Exception(f"Invalid JSON response from MCP endpoint: {e}") from e + + def _parse_sse_response(self, sse_text: str) -> Dict[str, Any]: + """Parse Server-Sent Events (SSE) format response.""" + try: + for line in sse_text.split('\n'): + if line.startswith('data: '): + json_str = line[6:] + data = json.loads(json_str) + return data + except Exception as e: + logger.debug(f"Failed to parse SSE response: {e}") + + # Fallback: try to parse as regular JSON + try: + return json.loads(sse_text) + except: + raise Exception(f"Could not parse MCP response: {sse_text[:200]}") + + # Tools methods + def list_tools(self) -> List[Dict[str, Any]]: + """ + List all available tools. + + Returns: + List of tool definitions with name, description, and input schema + """ + result = self._jsonrpc_call("tools/list") + return result.get('tools', []) + + def call_tool( + self, + name: str, + arguments: Optional[Dict[str, Any]] = None + ) -> Any: + """ + Call a specific tool. + + Args: + name: Tool name + arguments: Tool arguments/parameters + + Returns: + Tool execution result + """ + params = {"name": name} + if arguments: + params["arguments"] = arguments + + result = self._jsonrpc_call("tools/call", params) + return result.get('content', result) + + # Prompts methods + def list_prompts(self) -> List[Dict[str, Any]]: + """ + List all available prompts. + + Returns: + List of prompt definitions + """ + result = self._jsonrpc_call("prompts/list") + return result.get('prompts', []) + + def get_prompt( + self, + name: str, + arguments: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: + """ + Get a specific prompt. + + Args: + name: Prompt name + arguments: Prompt arguments + + Returns: + Prompt content + """ + params = {"name": name} + if arguments: + params["arguments"] = arguments + + return self._jsonrpc_call("prompts/get", params) + + # Resources methods + def list_resources(self) -> List[Dict[str, Any]]: + """ + List all available resources. + + Returns: + List of resource definitions + """ + result = self._jsonrpc_call("resources/list") + return result.get('resources', []) + + def read_resource(self, uri: str) -> Dict[str, Any]: + """ + Read a specific resource. + + Args: + uri: Resource URI + + Returns: + Resource content + """ + params = {"uri": uri} + return self._jsonrpc_call("resources/read", params) + + # Convenience methods + def get_capabilities(self) -> Dict[str, Any]: + """ + Get all agent capabilities (tools, prompts, resources). + + Returns: + Dict with 'tools', 'prompts', 'resources' lists + """ + capabilities = { + 'tools': [], + 'prompts': [], + 'resources': [] + } + + try: + capabilities['tools'] = self.list_tools() + except Exception as e: + logger.warning(f"Could not fetch tools: {e}") + + try: + capabilities['prompts'] = self.list_prompts() + except Exception as e: + logger.warning(f"Could not fetch prompts: {e}") + + try: + capabilities['resources'] = self.list_resources() + except Exception as e: + logger.warning(f"Could not fetch resources: {e}") + + return capabilities + + def health_check(self) -> bool: + """ + Check if the MCP endpoint is responsive. + + Returns: + True if endpoint is healthy, False otherwise + """ + try: + self.list_tools() + return True + except: + return False diff --git a/agent0_sdk/core/x402_client.py b/agent0_sdk/core/x402_client.py new file mode 100644 index 0000000..d062f55 --- /dev/null +++ b/agent0_sdk/core/x402_client.py @@ -0,0 +1,297 @@ +""" +x402 micropayment protocol client for agent payments. +Handles USDC payments for agent requests. +""" + +import logging +from typing import Optional, Dict, Any +from web3 import Web3 +from eth_account import Account +import requests + +logger = logging.getLogger(__name__) + +# USDC contract addresses +USDC_SEPOLIA = "0x1c7D4B196Cb0C7B01d743Fbc6116a902379C7238" # Ethereum Sepolia +USDC_BASE = "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" # Base mainnet +USDC_BASE_SEPOLIA = "0x036CbD53842c5426634e7929541eC2318f3dCF7e" # Base Sepolia + +# Minimal ERC20 ABI for approve and transfer +ERC20_ABI = [ + { + "constant": False, + "inputs": [ + {"name": "_spender", "type": "address"}, + {"name": "_value", "type": "uint256"} + ], + "name": "approve", + "outputs": [{"name": "", "type": "bool"}], + "type": "function" + }, + { + "constant": True, + "inputs": [{"name": "_owner", "type": "address"}], + "name": "balanceOf", + "outputs": [{"name": "balance", "type": "uint256"}], + "type": "function" + }, + { + "constant": True, + "inputs": [], + "name": "decimals", + "outputs": [{"name": "", "type": "uint8"}], + "type": "function" + } +] + + +class X402Client: + """Client for x402 micropayment protocol.""" + + def __init__( + self, + rpc_url: str, + private_key: str, + chain_id: int = 11155111 + ): + """ + Initialize x402 payment client. + + Args: + rpc_url: Ethereum RPC URL + private_key: Private key for signing transactions + chain_id: Chain ID (default: Sepolia 11155111) + """ + self.w3 = Web3(Web3.HTTPProvider(rpc_url)) + self.chain_id = chain_id + + # Setup account + if private_key.startswith('0x'): + private_key = private_key[2:] + self.account = Account.from_key(private_key) + self.address = self.account.address + + # Setup USDC contract based on chain + if chain_id == 8453: # Base mainnet + self.usdc_address = Web3.to_checksum_address(USDC_BASE) + elif chain_id == 84532: # Base Sepolia + self.usdc_address = Web3.to_checksum_address(USDC_BASE_SEPOLIA) + elif chain_id == 11155111: # Ethereum Sepolia + self.usdc_address = Web3.to_checksum_address(USDC_SEPOLIA) + else: + raise ValueError(f"Unsupported chain ID: {chain_id}. Supported: 8453 (Base), 84532 (Base Sepolia), 11155111 (Ethereum Sepolia)") + + self.usdc = self.w3.eth.contract( + address=self.usdc_address, + abi=ERC20_ABI + ) + + logger.info(f"X402 client initialized for {self.address} on chain {chain_id}") + + def get_balance(self) -> float: + """ + Get USDC balance. + + Returns: + USDC balance in human-readable format + """ + balance = self.usdc.functions.balanceOf(self.address).call() + decimals = self.usdc.functions.decimals().call() + return balance / (10 ** decimals) + + def process_payment( + self, + gateway_url: str, + message: str, + price_usdc: float + ) -> Dict[str, Any]: + """ + Process x402 payment through gateway using 402 challenge-response flow. + + Args: + gateway_url: x402 gateway URL + message: The message/query to send to the agent + price_usdc: Price in USDC + + Returns: + Agent response + """ + logger.info(f"Processing x402 payment: ${price_usdc} USDC") + + # Check balance + balance = self.get_balance() + if balance < price_usdc: + raise Exception(f"Insufficient USDC balance: {balance} < {price_usdc}") + + # Format request according to x402 spec + request_body = { + "message": { + "role": "user", + "parts": [{"kind": "text", "text": message}] + } + } + + try: + # Step 1: Send initial request (will get Payment Required in metadata) + logger.debug(f"Sending initial request to {gateway_url}") + response = requests.post( + gateway_url, + json=request_body, + headers={'Content-Type': 'application/json'}, + timeout=30 + ) + + # Step 2: Handle Payment Required (A2A style in message metadata) + # Don't raise for 402 - it's expected in x402 protocol + if response.status_code not in [200, 402]: + response.raise_for_status() + + data = response.json() + + # Extract payment details from metadata + payment_required = None + task_id = None + context_id = None + + if 'task' in data: + task = data['task'] + task_id = task.get('id') + context_id = task.get('contextId') + + if 'status' in task and 'message' in task['status']: + metadata = task['status']['message'].get('metadata', {}) + payment_required = metadata.get('x402.payment.required') + + if payment_required and 'accepts' in payment_required: + logger.debug("Payment required, processing EIP-3009 authorization...") + + # Get the payment option (first one) + payment_option = payment_required['accepts'][0] + + logger.info(f"Payment required: {payment_option.get('maxAmountRequired')} on {payment_option.get('network')}") + + # Sign the payment using EIP-3009 + payment_payload = self._sign_payment_option(payment_option, message) + + # Step 3: Resubmit with payment in message metadata (A2A protocol) + import time + paid_message = { + "messageId": f"msg-{int(time.time() * 1000)}", + "role": "user", + "parts": [{"kind": "text", "text": message}], + "metadata": { + "x402.payment.payload": payment_payload, + "x402.payment.status": "payment-submitted" + } + } + + paid_request = { + "message": paid_message + } + + # Include task/context IDs if available + if task_id: + paid_request["taskId"] = task_id + if context_id: + paid_request["contextId"] = context_id + + logger.debug("Resubmitting with EIP-3009 payment authorization...") + response = requests.post( + gateway_url, + json=paid_request, + headers={'Content-Type': 'application/json'}, + timeout=60 + ) + + # Allow 200 or 402 (both are valid in x402 protocol) + if response.status_code not in [200, 402]: + response.raise_for_status() + + return response.json() + + except requests.exceptions.RequestException as e: + logger.error(f"x402 gateway request failed: {e}") + raise Exception(f"x402 gateway request failed: {e}") from e + except Exception as e: + logger.error(f"x402 payment failed: {e}") + raise Exception(f"Payment processing failed: {e}") from e + + def _sign_payment_option(self, payment_option: Dict[str, Any], message: str) -> Dict[str, Any]: + """ + Sign payment using EIP-3009 transferWithAuthorization for x402 protocol. + + Args: + payment_option: Payment option from accepts array + message: Original message being paid for + + Returns: + Payment object with EIP-3009 signature + """ + from eth_account.messages import encode_typed_data + import secrets + import time + + # Generate nonce (32 random bytes) + nonce = "0x" + secrets.token_hex(32) + + # Calculate validity period + now = int(time.time()) + valid_after = 0 + valid_before = now + payment_option.get('maxTimeoutSeconds', 600) + + # Create EIP-3009 authorization + authorization = { + "from": self.address, + "to": payment_option.get('payTo'), + "value": str(payment_option.get('maxAmountRequired')), + "validAfter": str(valid_after), + "validBefore": str(valid_before), + "nonce": nonce + } + + # Create EIP-712 domain (USDC contract) + domain = { + "name": payment_option.get('extra', {}).get('name', 'USD Coin'), + "version": payment_option.get('extra', {}).get('version', '2'), + "chainId": self.chain_id, + "verifyingContract": payment_option.get('asset') + } + + # EIP-3009 TransferWithAuthorization types + types = { + "TransferWithAuthorization": [ + {"name": "from", "type": "address"}, + {"name": "to", "type": "address"}, + {"name": "value", "type": "uint256"}, + {"name": "validAfter", "type": "uint256"}, + {"name": "validBefore", "type": "uint256"}, + {"name": "nonce", "type": "bytes32"} + ] + } + + # Sign the typed data + typed_data = encode_typed_data( + domain_data=domain, + message_types=types, + message_data=authorization + ) + + signed = self.account.sign_message(typed_data) + + # Ensure signature has 0x prefix (compatible with ethers.js) + signature = signed.signature.hex() + if not signature.startswith("0x"): + signature = "0x" + signature + + logger.debug(f"Created EIP-3009 authorization with nonce {nonce}") + + # Return x402 payment payload + return { + "x402Version": 1, + "scheme": payment_option.get('scheme'), + "network": payment_option.get('network'), + "payload": { + "signature": signature, + "authorization": authorization + } + } diff --git a/tests/test_agent_client.py b/tests/test_agent_client.py new file mode 100644 index 0000000..d1f747c --- /dev/null +++ b/tests/test_agent_client.py @@ -0,0 +1,408 @@ +""" +Tests for AgentClient - HTTP/REST-based agent execution client. +""" + +import pytest +from unittest.mock import Mock, patch, MagicMock +import json + +from agent0_sdk.core.agent_client import AgentClient + + +class TestAgentClientInitialization: + """Test AgentClient initialization and agent card loading.""" + + @patch('requests.get') + def test_init_loads_agent_card(self, mock_get): + """Test that initialization loads agent card from URL.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "name": "Test Agent", + "url": "http://localhost:3000", + "preferredTransport": "HTTP+JSON", + "capabilities": {} + } + mock_get.return_value = mock_response + + client = AgentClient(agent_card_url="http://example.com/agent.json") + + assert client.agent_card["name"] == "Test Agent" + assert client.endpoint_url == "http://localhost:3000" + mock_get.assert_called_once_with("http://example.com/agent.json", timeout=10) + + @patch('requests.get') + def test_init_detects_x402_extension(self, mock_get): + """Test that x402 extension is detected from agent card.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "name": "Paid Agent", + "url": "http://localhost:3000", + "capabilities": { + "extensions": [ + { + "uri": "https://agent0.network/extensions/x402", + "params": { + "gateway_url": "http://localhost:3000/process", + "price_usdc": "0.05" + } + } + ] + } + } + mock_get.return_value = mock_response + + client = AgentClient(agent_card_url="http://example.com/agent.json") + + assert client.x402_config is not None + assert client.x402_config["gateway_url"] == "http://localhost:3000/process" + assert client.x402_config["price_usdc"] == "0.05" + + @patch('requests.get') + def test_init_missing_url_raises_error(self, mock_get): + """Test that missing URL in agent card raises error.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "name": "Test Agent" + # Missing 'url' field + } + mock_get.return_value = mock_response + + with pytest.raises(Exception, match="No 'url' field in agent card|Failed to load agent card"): + AgentClient(agent_card_url="http://example.com/agent.json") + + @patch('requests.get') + def test_init_network_error_raises_exception(self, mock_get): + """Test that network errors during card loading raise exception.""" + import requests + mock_get.side_effect = requests.exceptions.ConnectionError("Network error") + + with pytest.raises(Exception, match="Failed to load agent card"): + AgentClient(agent_card_url="http://example.com/agent.json") + + @patch('requests.get') + def test_init_with_custom_timeout(self, mock_get): + """Test initialization with custom timeout.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "name": "Test Agent", + "url": "http://localhost:3000" + } + mock_get.return_value = mock_response + + client = AgentClient( + agent_card_url="http://example.com/agent.json", + timeout=60 + ) + + assert client.timeout == 60 + + +class TestAgentClientInfo: + """Test agent information retrieval methods.""" + + @patch('requests.get') + def test_get_info(self, mock_get): + """Test get_info returns agent card data.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "name": "Test Agent", + "url": "http://localhost:3000", + "description": "A test agent" + } + mock_get.return_value = mock_response + + client = AgentClient(agent_card_url="http://example.com/agent.json") + info = client.get_info() + + assert info["name"] == "Test Agent" + assert info["description"] == "A test agent" + + @patch('requests.get') + def test_get_skills_with_skills(self, mock_get): + """Test get_skills returns skill list.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "name": "Test Agent", + "url": "http://localhost:3000", + "skills": [ + {"name": "search", "description": "Search capability"}, + {"name": "analyze", "description": "Analysis capability"} + ] + } + mock_get.return_value = mock_response + + client = AgentClient(agent_card_url="http://example.com/agent.json") + skills = client.get_skills() + + assert len(skills) == 2 + assert skills[0]["name"] == "search" + assert skills[1]["name"] == "analyze" + + @patch('requests.get') + def test_get_skills_no_skills(self, mock_get): + """Test get_skills returns empty list when no skills defined.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "name": "Test Agent", + "url": "http://localhost:3000" + } + mock_get.return_value = mock_response + + client = AgentClient(agent_card_url="http://example.com/agent.json") + skills = client.get_skills() + + assert skills == [] + + +class TestAgentClientHTTPCalls: + """Test HTTP+JSON agent calling.""" + + @patch('requests.get') + @patch('requests.post') + def test_call_http_json_basic(self, mock_post, mock_get): + """Test basic HTTP+JSON call.""" + # Setup agent card + mock_get_response = Mock() + mock_get_response.status_code = 200 + mock_get_response.json.return_value = { + "name": "Test Agent", + "url": "http://localhost:3000", + "preferredTransport": "HTTP+JSON" + } + mock_get.return_value = mock_get_response + + # Setup agent response + mock_post_response = Mock() + mock_post_response.status_code = 200 + mock_post_response.json.return_value = { + "response": "Test response" + } + mock_post.return_value = mock_post_response + + client = AgentClient(agent_card_url="http://example.com/agent.json") + result = client.call("Test message") + + assert result["response"] == "Test response" + mock_post.assert_called_once() + + # Verify request payload + call_args = mock_post.call_args + assert call_args[0][0] == "http://localhost:3000" + assert call_args[1]["json"]["message"] == "Test message" + + @patch('requests.get') + @patch('requests.post') + def test_call_with_skill(self, mock_post, mock_get): + """Test call with specific skill.""" + mock_get_response = Mock() + mock_get_response.json.return_value = { + "name": "Test Agent", + "url": "http://localhost:3000" + } + mock_get.return_value = mock_get_response + + mock_post_response = Mock() + mock_post_response.status_code = 200 + mock_post_response.json.return_value = {"response": "Skill response"} + mock_post.return_value = mock_post_response + + client = AgentClient(agent_card_url="http://example.com/agent.json") + result = client.call("Test message", skill="search") + + # Verify skill was included in payload + call_args = mock_post.call_args + assert call_args[1]["json"]["skill"] == "search" + + @patch('requests.get') + @patch('requests.post') + def test_call_with_context(self, mock_post, mock_get): + """Test call with additional context.""" + mock_get_response = Mock() + mock_get_response.json.return_value = { + "name": "Test Agent", + "url": "http://localhost:3000" + } + mock_get.return_value = mock_get_response + + mock_post_response = Mock() + mock_post_response.status_code = 200 + mock_post_response.json.return_value = {"response": "Context response"} + mock_post.return_value = mock_post_response + + client = AgentClient(agent_card_url="http://example.com/agent.json") + context = {"user_id": "123", "session": "abc"} + result = client.call("Test message", context=context) + + # Verify context was merged into payload + call_args = mock_post.call_args + payload = call_args[1]["json"] + assert payload["user_id"] == "123" + assert payload["session"] == "abc" + + @patch('requests.get') + @patch('requests.post') + def test_call_network_error(self, mock_post, mock_get): + """Test handling of network errors during call.""" + import requests + + mock_get_response = Mock() + mock_get_response.json.return_value = { + "name": "Test Agent", + "url": "http://localhost:3000" + } + mock_get.return_value = mock_get_response + + mock_post.side_effect = requests.exceptions.ConnectionError("Network error") + + client = AgentClient(agent_card_url="http://example.com/agent.json") + + with pytest.raises(Exception, match="Failed to call agent"): + client.call("Test message") + + @patch('requests.get') + @patch('requests.post') + def test_call_unsupported_transport(self, mock_post, mock_get): + """Test error on unsupported transport.""" + mock_get_response = Mock() + mock_get_response.json.return_value = { + "name": "Test Agent", + "url": "http://localhost:3000", + "preferredTransport": "GRPC" # Unsupported + } + mock_get.return_value = mock_get_response + + client = AgentClient(agent_card_url="http://example.com/agent.json") + + with pytest.raises(ValueError, match="Unsupported transport"): + client.call("Test message") + + +class TestAgentClientX402Integration: + """Test x402 payment integration.""" + + @patch('requests.get') + def test_call_with_x402_client(self, mock_get): + """Test that call routes to x402 when x402 client is provided.""" + mock_get_response = Mock() + mock_get_response.json.return_value = { + "name": "Paid Agent", + "url": "http://localhost:3000", + "capabilities": { + "extensions": [ + { + "uri": "https://agent0.network/extensions/x402", + "params": { + "gateway_url": "http://localhost:3000/process", + "price_usdc": "0.05" + } + } + ] + } + } + mock_get.return_value = mock_get_response + + # Mock x402 client + mock_x402_client = Mock() + mock_x402_client.process_payment.return_value = { + "success": True, + "task": {"status": {"message": {"parts": [{"kind": "text", "text": "Paid response"}]}}} + } + + client = AgentClient( + agent_card_url="http://example.com/agent.json", + x402_client=mock_x402_client + ) + + result = client.call("Test message") + + # Verify x402 client was called + mock_x402_client.process_payment.assert_called_once() + call_args = mock_x402_client.process_payment.call_args + assert call_args[1]["gateway_url"] == "http://localhost:3000/process" + assert call_args[1]["message"] == "Test message" + + @patch('requests.get') + def test_call_without_x402_client_uses_http(self, mock_get): + """Test that call uses HTTP when x402 config exists but no client.""" + mock_get_response = Mock() + mock_get_response.json.return_value = { + "name": "Paid Agent", + "url": "http://localhost:3000", + "capabilities": { + "extensions": [ + { + "uri": "https://agent0.network/extensions/x402", + "params": {"gateway_url": "http://localhost:3000/process"} + } + ] + } + } + mock_get.return_value = mock_get_response + + client = AgentClient( + agent_card_url="http://example.com/agent.json" + # No x402_client provided + ) + + with patch('requests.post') as mock_post: + mock_post_response = Mock() + mock_post_response.status_code = 200 + mock_post_response.json.return_value = {"response": "HTTP response"} + mock_post.return_value = mock_post_response + + result = client.call("Test message") + + # Should fall back to HTTP + mock_post.assert_called_once() + + +class TestAgentClientHealthCheck: + """Test agent health checking.""" + + @patch('requests.get') + @patch('requests.post') + def test_health_check_success(self, mock_post, mock_get): + """Test successful health check.""" + mock_get_response = Mock() + mock_get_response.json.return_value = { + "name": "Test Agent", + "url": "http://localhost:3000" + } + mock_get.return_value = mock_get_response + + mock_post_response = Mock() + mock_post_response.status_code = 200 + mock_post_response.json.return_value = {"status": "healthy"} + mock_post.return_value = mock_post_response + + client = AgentClient(agent_card_url="http://example.com/agent.json") + result = client.health_check() + + assert result is True + + @patch('requests.get') + @patch('requests.post') + def test_health_check_failure(self, mock_post, mock_get): + """Test failed health check.""" + import requests + + mock_get_response = Mock() + mock_get_response.json.return_value = { + "name": "Test Agent", + "url": "http://localhost:3000" + } + mock_get.return_value = mock_get_response + + mock_post.side_effect = requests.exceptions.ConnectionError("Connection failed") + + client = AgentClient(agent_card_url="http://example.com/agent.json") + result = client.health_check() + + assert result is False diff --git a/tests/test_mcp_client.py b/tests/test_mcp_client.py new file mode 100644 index 0000000..ea8dc8a --- /dev/null +++ b/tests/test_mcp_client.py @@ -0,0 +1,439 @@ +""" +Tests for MCPClient - Model Context Protocol (MCP) client. +""" + +import pytest +from unittest.mock import Mock, patch +import json + +from agent0_sdk.core.mcp_client import MCPClient + + +class TestMCPClientInitialization: + """Test MCPClient initialization.""" + + def test_init_http_endpoint(self): + """Test initialization with HTTP endpoint.""" + client = MCPClient(endpoint_url="http://localhost:3000") + + assert client.endpoint_url == "http://localhost:3000" + assert client.timeout == 30 + + def test_init_https_endpoint(self): + """Test initialization with HTTPS endpoint.""" + client = MCPClient(endpoint_url="https://example.com/mcp") + + assert client.endpoint_url == "https://example.com/mcp" + + def test_init_strips_trailing_slash(self): + """Test that trailing slash is removed.""" + client = MCPClient(endpoint_url="http://localhost:3000/") + + assert client.endpoint_url == "http://localhost:3000" + + def test_init_custom_timeout(self): + """Test initialization with custom timeout.""" + client = MCPClient(endpoint_url="http://localhost:3000", timeout=60) + + assert client.timeout == 60 + + def test_init_invalid_protocol_raises_error(self): + """Test that non-HTTP protocols raise error.""" + with pytest.raises(Exception, match="MCP endpoint must be HTTP/HTTPS"): + MCPClient(endpoint_url="ws://localhost:3000") + + with pytest.raises(ValueError, match="MCP endpoint must be HTTP/HTTPS"): + MCPClient(endpoint_url="file:///path/to/file") + + +class TestMCPRequestIdManagement: + """Test JSON-RPC request ID management.""" + + def test_request_id_increments(self): + """Test that request IDs increment.""" + client = MCPClient(endpoint_url="http://localhost:3000") + + id1 = client._get_next_request_id() + id2 = client._get_next_request_id() + id3 = client._get_next_request_id() + + assert id2 == id1 + 1 + assert id3 == id2 + 1 + + def test_request_id_starts_at_one(self): + """Test that first request ID is 1.""" + client = MCPClient(endpoint_url="http://localhost:3000") + + first_id = client._get_next_request_id() + + assert first_id == 1 + + +class TestMCPJSONRPCCalls: + """Test JSON-RPC 2.0 call mechanism.""" + + @patch('requests.post') + def test_jsonrpc_call_basic(self, mock_post): + """Test basic JSON-RPC call.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {'content-type': 'application/json'} + mock_response.json.return_value = { + "jsonrpc": "2.0", + "id": 1, + "result": {"data": "test"} + } + mock_post.return_value = mock_response + + client = MCPClient(endpoint_url="http://localhost:3000") + result = client._jsonrpc_call("test_method") + + assert result["data"] == "test" + + # Verify request structure + call_args = mock_post.call_args + payload = call_args[1]["json"] + assert payload["jsonrpc"] == "2.0" + assert payload["method"] == "test_method" + assert payload["id"] == 1 + + @patch('requests.post') + def test_jsonrpc_call_with_params(self, mock_post): + """Test JSON-RPC call with parameters.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {'content-type': 'application/json'} + mock_response.json.return_value = { + "jsonrpc": "2.0", + "id": 1, + "result": {"success": True} + } + mock_post.return_value = mock_response + + client = MCPClient(endpoint_url="http://localhost:3000") + params = {"arg1": "value1", "arg2": 42} + result = client._jsonrpc_call("test_method", params) + + # Verify params were included + call_args = mock_post.call_args + payload = call_args[1]["json"] + assert payload["params"] == params + + @patch('requests.post') + def test_jsonrpc_call_error_response(self, mock_post): + """Test handling of JSON-RPC error response.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {'content-type': 'application/json'} + mock_response.json.return_value = { + "jsonrpc": "2.0", + "id": 1, + "error": { + "code": -32601, + "message": "Method not found" + } + } + mock_post.return_value = mock_response + + client = MCPClient(endpoint_url="http://localhost:3000") + + with pytest.raises(Exception, match="Method not found"): + client._jsonrpc_call("unknown_method") + + @patch('requests.post') + def test_jsonrpc_call_network_error(self, mock_post): + """Test handling of network errors.""" + import requests + mock_post.side_effect = requests.exceptions.ConnectionError("Network error") + + client = MCPClient(endpoint_url="http://localhost:3000") + + with pytest.raises(Exception, match="MCP request failed"): + client._jsonrpc_call("test_method") + + + + +class TestMCPToolMethods: + """Test MCP tool-related methods.""" + + @patch('requests.post') + def test_list_tools(self, mock_post): + """Test listing available tools.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {'content-type': 'application/json'} + mock_response.json.return_value = { + "jsonrpc": "2.0", + "id": 1, + "result": { + "tools": [ + {"name": "search", "description": "Search tool"}, + {"name": "calculate", "description": "Calculator tool"} + ] + } + } + mock_post.return_value = mock_response + + client = MCPClient(endpoint_url="http://localhost:3000") + tools = client.list_tools() + + assert len(tools) == 2 + assert tools[0]["name"] == "search" + assert tools[1]["name"] == "calculate" + + # Verify correct method was called + call_args = mock_post.call_args + assert call_args[1]["json"]["method"] == "tools/list" + + @patch('requests.post') + def test_call_tool_without_arguments(self, mock_post): + """Test calling tool without arguments.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {'content-type': 'application/json'} + mock_response.json.return_value = { + "jsonrpc": "2.0", + "id": 1, + "result": { + "content": [ + {"type": "text", "text": "Tool result"} + ] + } + } + mock_post.return_value = mock_response + + client = MCPClient(endpoint_url="http://localhost:3000") + result = client.call_tool("test_tool") + + assert result[0]["text"] == "Tool result" + + # Verify request + call_args = mock_post.call_args + payload = call_args[1]["json"] + assert payload["method"] == "tools/call" + assert payload["params"]["name"] == "test_tool" + + @patch('requests.post') + def test_call_tool_with_arguments(self, mock_post): + """Test calling tool with arguments.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {'content-type': 'application/json'} + mock_response.json.return_value = { + "jsonrpc": "2.0", + "id": 1, + "result": { + "content": [{"type": "text", "text": "42"}] + } + } + mock_post.return_value = mock_response + + client = MCPClient(endpoint_url="http://localhost:3000") + result = client.call_tool("calculate", {"expression": "6*7"}) + + # Verify arguments were passed + call_args = mock_post.call_args + payload = call_args[1]["json"] + assert payload["params"]["arguments"] == {"expression": "6*7"} + + +class TestMCPPromptMethods: + """Test MCP prompt-related methods.""" + + @patch('requests.post') + def test_list_prompts(self, mock_post): + """Test listing available prompts.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {'content-type': 'application/json'} + mock_response.json.return_value = { + "jsonrpc": "2.0", + "id": 1, + "result": { + "prompts": [ + {"name": "greeting", "description": "Greeting prompt"}, + {"name": "summary", "description": "Summary prompt"} + ] + } + } + mock_post.return_value = mock_response + + client = MCPClient(endpoint_url="http://localhost:3000") + prompts = client.list_prompts() + + assert len(prompts) == 2 + assert prompts[0]["name"] == "greeting" + + # Verify method + call_args = mock_post.call_args + assert call_args[1]["json"]["method"] == "prompts/list" + + @patch('requests.post') + def test_get_prompt(self, mock_post): + """Test getting specific prompt.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {'content-type': 'application/json'} + mock_response.json.return_value = { + "jsonrpc": "2.0", + "id": 1, + "result": { + "messages": [ + {"role": "user", "content": {"type": "text", "text": "Hello!"}} + ] + } + } + mock_post.return_value = mock_response + + client = MCPClient(endpoint_url="http://localhost:3000") + prompt = client.get_prompt("greeting") + + assert len(prompt["messages"]) == 1 + assert prompt["messages"][0]["role"] == "user" + + # Verify method + call_args = mock_post.call_args + payload = call_args[1]["json"] + assert payload["method"] == "prompts/get" + assert payload["params"]["name"] == "greeting" + + @patch('requests.post') + def test_get_prompt_with_arguments(self, mock_post): + """Test getting prompt with arguments.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {'content-type': 'application/json'} + mock_response.json.return_value = { + "jsonrpc": "2.0", + "id": 1, + "result": { + "messages": [ + {"role": "user", "content": {"type": "text", "text": "Hello, Alice!"}} + ] + } + } + mock_post.return_value = mock_response + + client = MCPClient(endpoint_url="http://localhost:3000") + prompt = client.get_prompt("greeting", {"name": "Alice"}) + + # Verify arguments + call_args = mock_post.call_args + assert call_args[1]["json"]["params"]["arguments"] == {"name": "Alice"} + + +class TestMCPResourceMethods: + """Test MCP resource-related methods.""" + + @patch('requests.post') + def test_list_resources(self, mock_post): + """Test listing available resources.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {'content-type': 'application/json'} + mock_response.json.return_value = { + "jsonrpc": "2.0", + "id": 1, + "result": { + "resources": [ + {"uri": "file:///data.txt", "name": "data.txt"}, + {"uri": "file:///config.json", "name": "config.json"} + ] + } + } + mock_post.return_value = mock_response + + client = MCPClient(endpoint_url="http://localhost:3000") + resources = client.list_resources() + + assert len(resources) == 2 + assert resources[0]["name"] == "data.txt" + + # Verify method + call_args = mock_post.call_args + payload = call_args[1]["json"] + assert payload["method"] == "resources/list" + + +class TestMCPSSEHandling: + """Test SSE response handling in requests.""" + + @patch('requests.post') + def test_handles_sse_content_type(self, mock_post): + """Test that SSE content-type triggers SSE parsing.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {'content-type': 'text/event-stream'} + mock_response.text = """data: {"jsonrpc":"2.0","id":1,"result":{"tools":[]}} + +""" + mock_post.return_value = mock_response + + client = MCPClient(endpoint_url="http://localhost:3000") + result = client.list_tools() + + # Should successfully parse SSE response + assert result == [] + + @patch('requests.post') + def test_handles_json_content_type(self, mock_post): + """Test that JSON content-type uses JSON parsing.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {'content-type': 'application/json'} + mock_response.json.return_value = { + "jsonrpc": "2.0", + "id": 1, + "result": {"tools": []} + } + mock_post.return_value = mock_response + + client = MCPClient(endpoint_url="http://localhost:3000") + result = client.list_tools() + + assert result == [] + + +class TestMCPEndpointConstruction: + """Test MCP endpoint URL construction.""" + + @patch('requests.post') + def test_endpoint_url_used_correctly(self, mock_post): + """Test that endpoint URL is used for requests.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {'content-type': 'application/json'} + mock_response.json.return_value = { + "jsonrpc": "2.0", + "id": 1, + "result": {"tools": []} + } + mock_post.return_value = mock_response + + client = MCPClient(endpoint_url="http://localhost:3000/mcp") + client.list_tools() + + # Verify correct URL was called + call_args = mock_post.call_args + assert call_args[0][0] == "http://localhost:3000/mcp" + + @patch('requests.post') + def test_timeout_applied_to_requests(self, mock_post): + """Test that timeout is applied to HTTP requests.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {'content-type': 'application/json'} + mock_response.json.return_value = { + "jsonrpc": "2.0", + "id": 1, + "result": {"tools": []} + } + mock_post.return_value = mock_response + + client = MCPClient(endpoint_url="http://localhost:3000", timeout=45) + client.list_tools() + + # Verify timeout was used + call_args = mock_post.call_args + assert call_args[1]["timeout"] == 45 diff --git a/tests/test_x402_client.py b/tests/test_x402_client.py new file mode 100644 index 0000000..c87051c --- /dev/null +++ b/tests/test_x402_client.py @@ -0,0 +1,268 @@ +""" +Tests for X402Client - x402 micropayment protocol client. +""" + +import pytest +from unittest.mock import Mock, patch, MagicMock +from web3 import Web3 +import json +import time + +from agent0_sdk.core.x402_client import X402Client, USDC_SEPOLIA, USDC_BASE, USDC_BASE_SEPOLIA + + +class TestX402ClientInitialization: + """Test X402Client initialization.""" + + def test_init_ethereum_sepolia(self): + """Test initialization with Ethereum Sepolia chain.""" + client = X402Client( + rpc_url="https://ethereum-sepolia-rpc.publicnode.com", + private_key="0x" + "1" * 64, + chain_id=11155111 + ) + + assert client.chain_id == 11155111 + assert client.usdc_address == Web3.to_checksum_address(USDC_SEPOLIA) + assert client.address is not None + + def test_init_base_mainnet(self): + """Test initialization with Base mainnet chain.""" + client = X402Client( + rpc_url="https://mainnet.base.org", + private_key="0x" + "1" * 64, + chain_id=8453 + ) + + assert client.chain_id == 8453 + assert client.usdc_address == Web3.to_checksum_address(USDC_BASE) + + def test_init_base_sepolia(self): + """Test initialization with Base Sepolia chain.""" + client = X402Client( + rpc_url="https://sepolia.base.org", + private_key="0x" + "1" * 64, + chain_id=84532 + ) + + assert client.chain_id == 84532 + assert client.usdc_address == Web3.to_checksum_address(USDC_BASE_SEPOLIA) + + def test_init_unsupported_chain(self): + """Test initialization with unsupported chain raises error.""" + with pytest.raises(ValueError, match="Unsupported chain ID"): + X402Client( + rpc_url="https://mainnet.eth.org", + private_key="0x" + "1" * 64, + chain_id=1 # Ethereum mainnet not supported + ) + + +class TestX402PaymentSigning: + """Test x402 payment signature generation.""" + + @pytest.fixture + def client(self): + """Create test client.""" + return X402Client( + rpc_url="https://mainnet.base.org", + private_key="0x" + "1" * 64, + chain_id=8453 + ) + + def test_sign_payment_option_structure(self, client): + """Test payment option signing produces correct structure.""" + payment_option = { + "scheme": "erc20-authorization", + "network": "eip155:8453", + "asset": USDC_BASE, + "maxAmountRequired": 50000, # $0.05 in USDC (6 decimals) + "maxTimeoutSeconds": 600, + "payTo": "0x742D35cc6634c0532925a3b844bc9E7595F0BEb6", + "extra": { + "name": "USD Coin", + "version": "2" + } + } + + message = "Test message" + signed_payment = client._sign_payment_option(payment_option, message) + + # Verify structure + assert "x402Version" in signed_payment + assert signed_payment["x402Version"] == 1 + assert "scheme" in signed_payment + assert signed_payment["scheme"] == "erc20-authorization" + assert "network" in signed_payment + assert "payload" in signed_payment + + # Verify payload structure + payload = signed_payment["payload"] + assert "signature" in payload + assert "authorization" in payload + + # Verify authorization structure + auth = payload["authorization"] + assert "from" in auth + assert "to" in auth + assert "value" in auth + assert "validAfter" in auth + assert "validBefore" in auth + assert "nonce" in auth + + # Verify signature has 0x prefix + assert payload["signature"].startswith("0x") + assert len(payload["signature"]) == 132 # 0x + 130 hex chars + + def test_sign_payment_nonce_uniqueness(self, client): + """Test that each payment generates unique nonce.""" + payment_option = { + "scheme": "erc20-authorization", + "network": "eip155:8453", + "asset": USDC_BASE, + "maxAmountRequired": 50000, + "maxTimeoutSeconds": 600, + "payTo": "0x742D35cc6634c0532925a3b844bc9E7595F0BEb6", + "extra": {"name": "USD Coin", "version": "2"} + } + + signed1 = client._sign_payment_option(payment_option, "msg1") + signed2 = client._sign_payment_option(payment_option, "msg2") + + nonce1 = signed1["payload"]["authorization"]["nonce"] + nonce2 = signed2["payload"]["authorization"]["nonce"] + + assert nonce1 != nonce2 + assert nonce1.startswith("0x") + assert len(nonce1) == 66 # 0x + 64 hex chars (32 bytes) + + def test_sign_payment_valid_timeframe(self, client): + """Test that validAfter and validBefore are set correctly.""" + payment_option = { + "scheme": "erc20-authorization", + "network": "eip155:8453", + "asset": USDC_BASE, + "maxAmountRequired": 50000, + "maxTimeoutSeconds": 300, + "payTo": "0x742D35cc6634c0532925a3b844bc9E7595F0BEb6", + "extra": {"name": "USD Coin", "version": "2"} + } + + current_time = int(time.time()) + signed = client._sign_payment_option(payment_option, "test") + + auth = signed["payload"]["authorization"] + valid_after = int(auth["validAfter"]) + valid_before = int(auth["validBefore"]) + + assert valid_after == 0 + assert valid_before > current_time + assert valid_before <= current_time + 300 + + +class TestX402ProcessPayment: + """Test complete x402 payment processing.""" + + @pytest.fixture + def client(self): + """Create test client.""" + return X402Client( + rpc_url="https://mainnet.base.org", + private_key="0x" + "1" * 64, + chain_id=8453 + ) + + @patch('requests.post') + def test_process_payment_direct_success(self, mock_post, client): + """Test direct success without 402 challenge.""" + # Mock sufficient balance + client.get_balance = Mock(return_value=1.0) + + success_response = Mock() + success_response.status_code = 200 + success_response.json.return_value = { + "success": True, + "task": { + "status": { + "code": "success", + "message": {"parts": [{"kind": "text", "text": "Response"}]} + } + } + } + + mock_post.return_value = success_response + + result = client.process_payment( + gateway_url="http://localhost:3000/process", + message="Test", + price_usdc=0.05 + ) + + assert result["success"] is True + assert mock_post.call_count == 1 + + @patch('requests.post') + def test_process_payment_network_error(self, mock_post, client): + """Test handling of network errors.""" + import requests + # Mock sufficient balance + client.get_balance = Mock(return_value=1.0) + + mock_post.side_effect = requests.exceptions.ConnectionError("Network error") + + with pytest.raises(Exception, match="x402 gateway request failed"): + client.process_payment( + gateway_url="http://localhost:3000/process", + message="Test", + price_usdc=0.05 + ) + + +# Message formatting is internal implementation detail +# Tests removed as _build_a2a_message is not part of public API + + +class TestX402ChecksumAddresses: + """Test proper address checksumming.""" + + def test_usdc_addresses_checksummed(self): + """Test that all USDC addresses are properly checksummed.""" + # These should not raise errors + Web3.to_checksum_address(USDC_SEPOLIA) + Web3.to_checksum_address(USDC_BASE) + Web3.to_checksum_address(USDC_BASE_SEPOLIA) + + # Verify they are already checksummed + assert USDC_SEPOLIA == Web3.to_checksum_address(USDC_SEPOLIA) + assert USDC_BASE == Web3.to_checksum_address(USDC_BASE) + assert USDC_BASE_SEPOLIA == Web3.to_checksum_address(USDC_BASE_SEPOLIA) + + +class TestX402Integration: + """Integration tests with mocked Web3.""" + + @pytest.fixture + def client_with_balance(self): + """Create client with mocked USDC balance.""" + with patch('agent0_sdk.core.x402_client.Web3') as mock_web3_class: + mock_w3 = Mock() + mock_web3_class.return_value = mock_w3 + + # Mock contract + mock_contract = Mock() + mock_contract.functions.balanceOf.return_value.call.return_value = 1000000 # $1 USDC (6 decimals) + mock_contract.functions.decimals.return_value.call.return_value = 6 # USDC has 6 decimals + mock_w3.eth.contract.return_value = mock_contract + + client = X402Client( + rpc_url="https://mainnet.base.org", + private_key="0x" + "1" * 64, + chain_id=8453 + ) + + yield client + + def test_get_balance(self, client_with_balance): + """Test USDC balance checking.""" + balance = client_with_balance.get_balance() + assert balance == 1.0 # $1 USDC