diff --git a/.gitignore b/.gitignore index 6fb16575..456e8100 100644 --- a/.gitignore +++ b/.gitignore @@ -220,3 +220,6 @@ studio/.nvmrc **/logs/** *.lock + +docs/marketing +docs/projects diff --git a/src/openagents/agentid/__init__.py b/src/openagents/agentid/__init__.py new file mode 100644 index 00000000..4812ae90 --- /dev/null +++ b/src/openagents/agentid/__init__.py @@ -0,0 +1,105 @@ +""" +OpenAgents AgentID - Cryptographic Identity for AI Agents. + +This module provides tools for verifying and authenticating AI agents +using the OpenAgents AgentID system. + +Two identifier formats are supported: +- Level 2: openagents:agent-name[@org] - For JWT-based authentication +- Level 3: did:openagents:agent-name[@org] - For W3C DID verification + +Quick Start: + + # Validate (check if an agent exists) + from openagents.agentid import AgentIDVerifier + + client = AgentIDVerifier() + result = client.validate("openagents:my-agent") + print(f"Valid: {result.verified}") + + # Authentication (get JWT for your agent) + from openagents.agentid import AgentIDAuth + + auth = AgentIDAuth( + agent_name="my-agent", + org="my-org", + private_key_path="agent_private.pem" + ) + token = auth.get_token() + print(f"Token: {token.access_token}") +""" + +from openagents.agentid.client import AgentIDVerifier, AgentIDAuth +from openagents.agentid.models import ( + AgentIDLevel, + AgentIDFormat, + ParsedAgentID, + AgentInfo, + VerificationResult, + ChallengeResponse, + TokenResponse, + TokenValidationResult, + ClaimResponse, + DIDDocument, + DIDVerificationMethod, + DIDServiceEndpoint, +) +from openagents.agentid.parser import ( + parse_agent_id, + normalize_to_level2, + normalize_to_level3, + normalize_to_simple, + extract_components, + is_valid_agent_id, + get_format, + validate_agent_name, + validate_org_name, +) +from openagents.agentid.exceptions import ( + AgentIDError, + AgentIDNotFoundError, + AgentIDFormatError, + AgentIDConnectionError, + AgentIDAuthenticationError, + AgentIDTokenExpiredError, + AgentIDChallengeExpiredError, + AgentIDSignatureError, +) + +__all__ = [ + # Client classes + "AgentIDVerifier", + "AgentIDAuth", + # Models + "AgentIDLevel", + "AgentIDFormat", + "ParsedAgentID", + "AgentInfo", + "VerificationResult", + "ChallengeResponse", + "TokenResponse", + "TokenValidationResult", + "ClaimResponse", + "DIDDocument", + "DIDVerificationMethod", + "DIDServiceEndpoint", + # Parser functions + "parse_agent_id", + "normalize_to_level2", + "normalize_to_level3", + "normalize_to_simple", + "extract_components", + "is_valid_agent_id", + "get_format", + "validate_agent_name", + "validate_org_name", + # Exceptions + "AgentIDError", + "AgentIDNotFoundError", + "AgentIDFormatError", + "AgentIDConnectionError", + "AgentIDAuthenticationError", + "AgentIDTokenExpiredError", + "AgentIDChallengeExpiredError", + "AgentIDSignatureError", +] diff --git a/src/openagents/agentid/client.py b/src/openagents/agentid/client.py new file mode 100644 index 00000000..e073cef4 --- /dev/null +++ b/src/openagents/agentid/client.py @@ -0,0 +1,747 @@ +""" +AgentID client for verification and authentication. + +This module provides: +- AgentIDVerifier: For verifying agent IDs, validating tokens, and resolving DIDs +- AgentIDAuth: For authenticating as an agent (challenge-response + JWT) +""" + +import asyncio +import base64 +import logging +from pathlib import Path +from typing import Optional, Union + +try: + import aiohttp +except ImportError: + aiohttp = None # type: ignore + +from openagents.agentid.models import ( + AgentInfo, + VerificationResult, + ChallengeResponse, + TokenResponse, + TokenValidationResult, + ClaimResponse, + DIDDocument, + AgentIDLevel, +) +from openagents.agentid.parser import parse_agent_id, extract_components +from openagents.agentid.exceptions import ( + AgentIDError, + AgentIDNotFoundError, + AgentIDConnectionError, + AgentIDAuthenticationError, + AgentIDSignatureError, + AgentIDChallengeExpiredError, + AgentIDTokenExpiredError, +) + +logger = logging.getLogger(__name__) + +# Default API endpoint +DEFAULT_ENDPOINT = "https://endpoint.openagents.org" + + +class AgentIDVerifier: + """Client for AgentID verification and lookup operations. + + This client provides methods to: + - Validate agent IDs exist in the registry + - Get agent information + - Verify JWT tokens + - Resolve DID documents + - Request authentication challenges + - Exchange signatures for tokens + + Usage: + client = AgentIDVerifier() + + # Validate an agent exists + result = client.validate("openagents:my-agent") + + # Get agent info + info = client.get_agent_info("my-agent", org="my-org") + + # Resolve DID + did_doc = client.resolve_did("did:openagents:my-agent") + + # Verify a token + validation = client.verify_token("eyJ...") + """ + + def __init__( + self, + endpoint: str = DEFAULT_ENDPOINT, + timeout: int = 30, + ): + """Initialize the AgentID client. + + Args: + endpoint: Base URL for the AgentID API + timeout: Request timeout in seconds + """ + self.endpoint = endpoint.rstrip("/") + self.timeout = timeout + self._session = None + self._aiohttp = None + + async def _get_session(self): + """Get or create an aiohttp session.""" + if self._aiohttp is None: + import aiohttp + + self._aiohttp = aiohttp + + if self._session is None or self._session.closed: + connector = self._aiohttp.TCPConnector( + limit=100, + limit_per_host=30, + ttl_dns_cache=300, + ) + timeout = self._aiohttp.ClientTimeout(total=self.timeout) + self._session = self._aiohttp.ClientSession( + connector=connector, + timeout=timeout, + headers={"Content-Type": "application/json"}, + ) + return self._session + + async def close(self): + """Close the HTTP session.""" + if self._session and not self._session.closed: + await self._session.close() + self._session = None + + async def __aenter__(self): + """Async context manager entry.""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit.""" + await self.close() + + # ========================================================================= + # Async API Methods + # ========================================================================= + + async def validate_async(self, agent_id: str) -> VerificationResult: + """Verify that an agent ID exists in the registry. + + Args: + agent_id: Agent ID in any format (simple, Level 2, or Level 3) + + Returns: + VerificationResult with verification status + + Raises: + AgentIDConnectionError: On network errors + """ + agent_name, org = extract_components(agent_id) + + try: + info = await self.get_agent_info_async(agent_name, org=org) + return VerificationResult( + verified=True, + level=AgentIDLevel.LEVEL_2, + agent_name=agent_name, + org=org, + status=info.status, + message="Agent verified successfully", + ) + except AgentIDNotFoundError: + return VerificationResult( + verified=False, + level=AgentIDLevel.LEVEL_2, + agent_name=agent_name, + org=org, + status=None, + message="Agent not found in registry", + ) + + async def get_agent_info_async( + self, agent_name: str, org: str = None + ) -> AgentInfo: + """Get agent information from the registry. + + Args: + agent_name: Agent name + org: Optional organization scope + + Returns: + AgentInfo with agent details + + Raises: + AgentIDNotFoundError: If agent doesn't exist + AgentIDConnectionError: On network errors + """ + session = await self._get_session() + url = f"{self.endpoint}/v1/agent-ids/{agent_name}" + params = {} + if org: + params["org"] = org + + try: + async with session.get(url, params=params) as resp: + data = await resp.json() + + if resp.status == 404 or data.get("code") == 404: + raise AgentIDNotFoundError(agent_name, org) + + if resp.status != 200: + raise AgentIDConnectionError( + f"API error: {data.get('message', 'Unknown error')}", + status_code=resp.status, + response=str(data), + ) + + agent_data = data.get("data", data) + return AgentInfo( + agent_name=agent_data.get("agentName", agent_name), + org=agent_data.get("org", org), + status=agent_data.get("status", "active"), + public_key_pem=agent_data.get("publicKeyPem"), + cert_serial=agent_data.get("serial"), + algorithm=agent_data.get("algorithm"), + ) + + except Exception as e: + if aiohttp and isinstance(e, aiohttp.ClientError): + raise AgentIDConnectionError(f"Connection error: {e}") + raise + + async def resolve_did_async(self, did: str) -> DIDDocument: + """Resolve a DID to get the DID document. + + Args: + did: DID in format did:openagents:xxx + + Returns: + DIDDocument with verification methods and services + + Raises: + AgentIDNotFoundError: If DID doesn't exist + AgentIDConnectionError: On network errors + """ + # Ensure it's a valid DID format + parsed = parse_agent_id(did) + did_str = parsed.level_3_id + + session = await self._get_session() + url = f"{self.endpoint}/v1/agentid/did/{did_str}" + + try: + async with session.get(url) as resp: + data = await resp.json() + + if resp.status == 404 or data.get("code") == 404: + raise AgentIDNotFoundError( + parsed.agent_name, parsed.org + ) + + if resp.status != 200: + raise AgentIDConnectionError( + f"API error: {data.get('message', 'Unknown error')}", + status_code=resp.status, + response=str(data), + ) + + doc_data = data.get("data", data) + return DIDDocument(**doc_data) + + except Exception as e: + if aiohttp and isinstance(e, aiohttp.ClientError): + raise AgentIDConnectionError(f"Connection error: {e}") + raise + + async def verify_token_async(self, token: str) -> TokenValidationResult: + """Validate a JWT token. + + Args: + token: JWT token to validate + + Returns: + TokenValidationResult with validation status + + Raises: + AgentIDConnectionError: On network errors + """ + session = await self._get_session() + url = f"{self.endpoint}/v1/agentid/verify-token" + + try: + async with session.post(url, json={"token": token}) as resp: + data = await resp.json() + + if resp.status != 200: + return TokenValidationResult( + valid=False, + reason=data.get("message", "Validation failed"), + ) + + result_data = data.get("data", data) + return TokenValidationResult( + valid=result_data.get("valid", False), + agent_name=result_data.get("agentName"), + org=result_data.get("org"), + expires_at=result_data.get("expiresAt"), + verification_level=result_data.get("verificationLevel"), + reason=result_data.get("reason"), + ) + + except Exception as e: + if aiohttp and isinstance(e, aiohttp.ClientError): + raise AgentIDConnectionError(f"Connection error: {e}") + raise + + async def request_challenge_async( + self, + agent_name: str, + org: str = None, + algorithm: str = "RS256", + ) -> ChallengeResponse: + """Request an authentication challenge. + + Args: + agent_name: Agent name + org: Optional organization scope + algorithm: Signing algorithm (RS256 or Ed25519) + + Returns: + ChallengeResponse with challenge and nonce + + Raises: + AgentIDNotFoundError: If agent doesn't exist + AgentIDConnectionError: On network errors + """ + session = await self._get_session() + url = f"{self.endpoint}/v1/agentid/challenge" + + payload = { + "agentName": agent_name, + "algorithm": algorithm, + } + if org: + payload["org"] = org + + try: + async with session.post(url, json=payload) as resp: + data = await resp.json() + + if resp.status == 404 or data.get("code") == 404: + raise AgentIDNotFoundError(agent_name, org) + + if resp.status != 200: + raise AgentIDConnectionError( + f"API error: {data.get('message', 'Unknown error')}", + status_code=resp.status, + response=str(data), + ) + + challenge_data = data.get("data", data) + return ChallengeResponse( + challenge=challenge_data["challenge"], + nonce=challenge_data["nonce"], + algorithm=challenge_data.get("algorithm", algorithm), + expires_in=challenge_data.get("expiresIn", 300), + ) + + except Exception as e: + if aiohttp and isinstance(e, aiohttp.ClientError): + raise AgentIDConnectionError(f"Connection error: {e}") + raise + + async def get_token_async( + self, + agent_name: str, + nonce: str, + signature: str, + org: str = None, + ) -> TokenResponse: + """Exchange a signature for a JWT token. + + Args: + agent_name: Agent name + nonce: Challenge nonce + signature: Base64-encoded signature of the challenge + org: Optional organization scope + + Returns: + TokenResponse with JWT token + + Raises: + AgentIDAuthenticationError: If signature verification fails + AgentIDConnectionError: On network errors + """ + session = await self._get_session() + url = f"{self.endpoint}/v1/agentid/token" + + payload = { + "agentName": agent_name, + "nonce": nonce, + "signature": signature, + } + if org: + payload["org"] = org + + try: + async with session.post(url, json=payload) as resp: + data = await resp.json() + + if resp.status == 401: + message = data.get("message", "Authentication failed") + if "expired" in message.lower(): + raise AgentIDChallengeExpiredError() + raise AgentIDSignatureError(message) + + if resp.status != 200: + raise AgentIDConnectionError( + f"API error: {data.get('message', 'Unknown error')}", + status_code=resp.status, + response=str(data), + ) + + token_data = data.get("data", data) + return TokenResponse( + access_token=token_data["accessToken"], + token_type=token_data.get("tokenType", "bearer"), + expires_in=token_data["expiresIn"], + verification_level=token_data.get("verificationLevel", 2), + ) + + except Exception as e: + if aiohttp and isinstance(e, aiohttp.ClientError): + raise AgentIDConnectionError(f"Connection error: {e}") + raise + + async def claim_agent_id_async( + self, + agent_name: str, + public_key_pem: str, + org: str = None, + api_key: str = None, + namespace_type: str = "org", + ) -> ClaimResponse: + """Claim/register a new agent ID. + + Args: + agent_name: Desired agent name + public_key_pem: Public key in PEM format + org: Organization scope (required if using org namespace) + api_key: API key for authentication + namespace_type: Namespace type ("org" or "global") + + Returns: + ClaimResponse with the claimed agent details and certificate + + Raises: + AgentIDConnectionError: On network errors or API failures + AgentIDAuthenticationError: If authentication fails + """ + session = await self._get_session() + url = f"{self.endpoint}/v1/agent-ids/create" + + payload = { + "agentName": agent_name, + "publicKeyPem": public_key_pem, + "namespaceType": namespace_type, + } + if org: + payload["org"] = org + + headers = {} + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + + try: + async with session.post(url, json=payload, headers=headers) as resp: + data = await resp.json() + + if resp.status == 401: + raise AgentIDAuthenticationError( + data.get("message", "Authentication failed") + ) + + if resp.status == 409: + raise AgentIDConnectionError( + f"Agent ID already exists: {agent_name}", + status_code=resp.status, + response=str(data), + ) + + if resp.status not in (200, 201): + raise AgentIDConnectionError( + f"API error: {data.get('message', 'Unknown error')}", + status_code=resp.status, + response=str(data), + ) + + claim_data = data.get("data", data) + return ClaimResponse( + agent_name=claim_data.get("agentName", agent_name), + org=claim_data.get("org", org), + cert_pem=claim_data.get("certPem"), + serial=claim_data.get("serial"), + status=claim_data.get("status", "active"), + ) + + except Exception as e: + if aiohttp and isinstance(e, aiohttp.ClientError): + raise AgentIDConnectionError(f"Connection error: {e}") + raise + + # ========================================================================= + # Sync API Methods (wrappers around async methods) + # ========================================================================= + + def _run_async(self, coro): + """Run an async coroutine synchronously with proper cleanup.""" + + async def run_with_cleanup(): + try: + return await coro + finally: + await self.close() + + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None + + if loop and loop.is_running(): + # We're in an async context, can't use run_until_complete + import concurrent.futures + + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(asyncio.run, run_with_cleanup()) + return future.result() + else: + return asyncio.run(run_with_cleanup()) + + def validate(self, agent_id: str) -> VerificationResult: + """Validate that an agent ID exists (sync version).""" + return self._run_async(self.validate_async(agent_id)) + + def get_agent_info(self, agent_name: str, org: str = None) -> AgentInfo: + """Get agent information (sync version).""" + return self._run_async(self.get_agent_info_async(agent_name, org=org)) + + def resolve_did(self, did: str) -> DIDDocument: + """Resolve a DID document (sync version).""" + return self._run_async(self.resolve_did_async(did)) + + def verify_token(self, token: str) -> TokenValidationResult: + """Verify a JWT token (sync version).""" + return self._run_async(self.verify_token_async(token)) + + def request_challenge( + self, + agent_name: str, + org: str = None, + algorithm: str = "RS256", + ) -> ChallengeResponse: + """Request an authentication challenge (sync version).""" + return self._run_async( + self.request_challenge_async(agent_name, org=org, algorithm=algorithm) + ) + + def get_token( + self, + agent_name: str, + nonce: str, + signature: str, + org: str = None, + ) -> TokenResponse: + """Exchange a signature for a JWT token (sync version).""" + return self._run_async( + self.get_token_async(agent_name, nonce, signature, org=org) + ) + + def claim_agent_id( + self, + agent_name: str, + public_key_pem: str, + org: str = None, + api_key: str = None, + namespace_type: str = "org", + ) -> ClaimResponse: + """Claim/register a new agent ID (sync version).""" + return self._run_async( + self.claim_agent_id_async( + agent_name, + public_key_pem, + org=org, + api_key=api_key, + namespace_type=namespace_type, + ) + ) + + +class AgentIDAuth: + """Authentication helper for getting JWT tokens. + + This class handles the full challenge-response flow to obtain + JWT tokens for an agent you own. + + Usage: + auth = AgentIDAuth( + agent_name="my-agent", + org="my-org", + private_key_path="agent_private.pem" + ) + + # Get a JWT token (handles challenge-response automatically) + token = auth.get_token() + + # Use the token for API calls + headers = {"Authorization": f"Bearer {token.access_token}"} + """ + + def __init__( + self, + agent_name: str, + org: str = None, + private_key_path: Union[str, Path] = None, + private_key_pem: str = None, + algorithm: str = "RS256", + endpoint: str = DEFAULT_ENDPOINT, + ): + """Initialize the authentication helper. + + Args: + agent_name: Agent name + org: Optional organization scope + private_key_path: Path to private key PEM file + private_key_pem: Private key in PEM format (alternative to path) + algorithm: Signing algorithm (RS256 or Ed25519) + endpoint: API endpoint URL + + Raises: + ValueError: If neither private_key_path nor private_key_pem is provided + """ + if not private_key_path and not private_key_pem: + raise ValueError( + "Either private_key_path or private_key_pem must be provided" + ) + + self.agent_name = agent_name + self.org = org + self.algorithm = algorithm + self.client = AgentIDVerifier(endpoint=endpoint) + + # Load private key + if private_key_path: + with open(private_key_path, "rb") as f: + private_key_pem = f.read() + elif isinstance(private_key_pem, str): + private_key_pem = private_key_pem.encode() + + self._private_key_pem = private_key_pem + self._private_key = None + + def _load_private_key(self): + """Load and cache the private key.""" + if self._private_key is None: + from cryptography.hazmat.primitives.serialization import ( + load_pem_private_key, + ) + + self._private_key = load_pem_private_key( + self._private_key_pem, password=None + ) + return self._private_key + + def sign_challenge(self, challenge_b64: str) -> str: + """Sign a challenge with the private key. + + Args: + challenge_b64: Base64-encoded challenge bytes + + Returns: + Base64-encoded signature + """ + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.asymmetric import padding, ec + + private_key = self._load_private_key() + challenge_bytes = base64.b64decode(challenge_b64) + + # Determine signing method based on key type + if hasattr(private_key, "sign"): + if self.algorithm == "RS256": + # RSA signature + signature = private_key.sign( + challenge_bytes, + padding.PKCS1v15(), + hashes.SHA256(), + ) + elif self.algorithm in ("Ed25519", "EdDSA"): + # Ed25519 signature (no extra params needed) + signature = private_key.sign(challenge_bytes) + elif self.algorithm == "ES256": + # ECDSA signature + signature = private_key.sign( + challenge_bytes, + ec.ECDSA(hashes.SHA256()), + ) + else: + raise ValueError(f"Unsupported algorithm: {self.algorithm}") + else: + raise ValueError("Private key does not support signing") + + return base64.b64encode(signature).decode() + + async def get_token_async(self) -> TokenResponse: + """Get a JWT token using challenge-response (async). + + Returns: + TokenResponse with JWT token + + Raises: + AgentIDAuthenticationError: If authentication fails + AgentIDConnectionError: On network errors + """ + # Step 1: Request challenge + challenge = await self.client.request_challenge_async( + self.agent_name, + org=self.org, + algorithm=self.algorithm, + ) + + # Step 2: Sign challenge + signature = self.sign_challenge(challenge.challenge) + + # Step 3: Get token + token = await self.client.get_token_async( + self.agent_name, + nonce=challenge.nonce, + signature=signature, + org=self.org, + ) + + return token + + def get_token(self) -> TokenResponse: + """Get a JWT token using challenge-response (sync). + + Returns: + TokenResponse with JWT token + + Raises: + AgentIDAuthenticationError: If authentication fails + AgentIDConnectionError: On network errors + """ + return self.client._run_async(self.get_token_async()) + + async def close(self): + """Close the underlying client.""" + await self.client.close() + + async def __aenter__(self): + """Async context manager entry.""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit.""" + await self.close() diff --git a/src/openagents/agentid/exceptions.py b/src/openagents/agentid/exceptions.py new file mode 100644 index 00000000..c5e556b6 --- /dev/null +++ b/src/openagents/agentid/exceptions.py @@ -0,0 +1,122 @@ +""" +Custom exceptions for the AgentID module. + +This module defines exception classes for handling errors related to +AgentID verification, authentication, and API communication. +""" + + +class AgentIDError(Exception): + """Base exception for AgentID-related errors.""" + + def __init__(self, message: str, details: dict = None): + """Initialize the exception. + + Args: + message: Human-readable error message + details: Optional dictionary with additional error details + """ + super().__init__(message) + self.message = message + self.details = details or {} + + +class AgentIDNotFoundError(AgentIDError): + """Raised when an agent ID is not found in the registry.""" + + def __init__(self, agent_id: str, org: str = None): + """Initialize the exception. + + Args: + agent_id: The agent name that was not found + org: Optional organization scope + """ + full_id = f"{agent_id}@{org}" if org else agent_id + message = f"Agent ID not found: {full_id}" + super().__init__(message, {"agent_id": agent_id, "org": org}) + self.agent_id = agent_id + self.org = org + + +class AgentIDFormatError(AgentIDError): + """Raised when an agent ID format is invalid.""" + + def __init__(self, agent_id: str, reason: str = None): + """Initialize the exception. + + Args: + agent_id: The invalid agent ID string + reason: Optional reason for the format error + """ + message = f"Invalid agent ID format: {agent_id}" + if reason: + message += f" ({reason})" + super().__init__(message, {"agent_id": agent_id, "reason": reason}) + self.agent_id = agent_id + self.reason = reason + + +class AgentIDConnectionError(AgentIDError): + """Raised when there's a network/connection error with the AgentID API.""" + + def __init__(self, message: str, status_code: int = None, response: str = None): + """Initialize the exception. + + Args: + message: Error message + status_code: HTTP status code if applicable + response: Raw response body if available + """ + super().__init__( + message, {"status_code": status_code, "response": response} + ) + self.status_code = status_code + self.response = response + + +class AgentIDAuthenticationError(AgentIDError): + """Raised when authentication fails (challenge-response or token).""" + + def __init__(self, message: str, reason: str = None): + """Initialize the exception. + + Args: + message: Error message + reason: Specific reason for auth failure + """ + super().__init__(message, {"reason": reason}) + self.reason = reason + + +class AgentIDTokenExpiredError(AgentIDAuthenticationError): + """Raised when a JWT token has expired.""" + + def __init__(self): + """Initialize the exception.""" + super().__init__("JWT token has expired", reason="token_expired") + + +class AgentIDChallengeExpiredError(AgentIDAuthenticationError): + """Raised when an authentication challenge has expired.""" + + def __init__(self): + """Initialize the exception.""" + super().__init__( + "Challenge has expired (challenges expire after 5 minutes)", + reason="challenge_expired", + ) + + +class AgentIDSignatureError(AgentIDAuthenticationError): + """Raised when signature verification fails.""" + + def __init__(self, reason: str = None): + """Initialize the exception. + + Args: + reason: Specific reason for signature failure + """ + message = "Signature verification failed" + if reason: + message += f": {reason}" + super().__init__(message, reason=reason or "invalid_signature") diff --git a/src/openagents/agentid/models.py b/src/openagents/agentid/models.py new file mode 100644 index 00000000..12d0c2d7 --- /dev/null +++ b/src/openagents/agentid/models.py @@ -0,0 +1,229 @@ +""" +Pydantic models for the AgentID module. + +This module defines data models for Agent ID parsing, verification, +authentication, and DID document handling. +""" + +from typing import Optional, List, Dict, Any, Literal +from datetime import datetime +from enum import Enum +from pydantic import BaseModel, Field, ConfigDict + + +class AgentIDLevel(str, Enum): + """Verification level for an Agent ID.""" + + LEVEL_1 = "level_1" # Key-proof verified (challenge-response) + LEVEL_2 = "level_2" # JWT token (openagents:xxx format) + LEVEL_3 = "level_3" # DID document (did:openagents:xxx format) + + +class AgentIDFormat(str, Enum): + """Format type of an Agent ID string.""" + + SIMPLE = "simple" # Just the name: my-agent or my-agent@org + LEVEL_2 = "level_2" # openagents:my-agent or openagents:my-agent@org + LEVEL_3 = "level_3" # did:openagents:my-agent or did:openagents:my-agent@org + + +class ParsedAgentID(BaseModel): + """Parsed components of an Agent ID.""" + + agent_name: str = Field(..., description="The agent's unique name") + org: Optional[str] = Field(None, description="Organization scope (optional)") + format: AgentIDFormat = Field(..., description="The format of the original ID") + + @property + def full_name(self) -> str: + """Return the full name with optional org suffix.""" + if self.org: + return f"{self.agent_name}@{self.org}" + return self.agent_name + + @property + def level_2_id(self) -> str: + """Return the Level 2 format (openagents:xxx).""" + return f"openagents:{self.full_name}" + + @property + def level_3_id(self) -> str: + """Return the Level 3 format (did:openagents:xxx).""" + return f"did:openagents:{self.full_name}" + + +class AgentInfo(BaseModel): + """Agent information from the registry.""" + + agent_name: str = Field(..., description="The agent's name") + org: Optional[str] = Field(None, description="Organization scope") + status: str = Field(..., description="Agent status (active, inactive, etc.)") + created_at: Optional[datetime] = Field(None, description="When the agent was created") + public_key_pem: Optional[str] = Field( + None, alias="publicKeyPem", description="Public key in PEM format" + ) + cert_serial: Optional[str] = Field( + None, alias="serial", description="X.509 certificate serial number" + ) + algorithm: Optional[str] = Field( + None, description="Key algorithm (RS256, Ed25519, etc.)" + ) + + model_config = ConfigDict(populate_by_name=True) + + +class VerificationResult(BaseModel): + """Result of verifying an Agent ID.""" + + verified: bool = Field(..., description="Whether the agent was verified") + level: AgentIDLevel = Field(..., description="Verification level achieved") + agent_name: str = Field(..., description="The agent's name") + org: Optional[str] = Field(None, description="Organization scope") + status: Optional[str] = Field(None, description="Agent status") + message: Optional[str] = Field(None, description="Additional verification message") + + @property + def level_2_id(self) -> str: + """Return the Level 2 format.""" + full_name = f"{self.agent_name}@{self.org}" if self.org else self.agent_name + return f"openagents:{full_name}" + + @property + def level_3_id(self) -> str: + """Return the Level 3 format.""" + full_name = f"{self.agent_name}@{self.org}" if self.org else self.agent_name + return f"did:openagents:{full_name}" + + +class ChallengeResponse(BaseModel): + """Response from requesting an authentication challenge.""" + + challenge: str = Field(..., description="Base64-encoded challenge bytes") + nonce: str = Field(..., description="Unique nonce for this challenge") + algorithm: str = Field(..., description="Expected signing algorithm") + expires_in: int = Field( + default=300, alias="expiresIn", description="Seconds until challenge expires" + ) + + model_config = ConfigDict(populate_by_name=True) + + +class TokenResponse(BaseModel): + """Response containing a JWT token.""" + + access_token: str = Field(..., alias="accessToken", description="JWT access token") + token_type: str = Field( + default="bearer", alias="tokenType", description="Token type" + ) + expires_in: int = Field(..., alias="expiresIn", description="Seconds until expiry") + verification_level: int = Field( + default=2, alias="verificationLevel", description="Verification level" + ) + + model_config = ConfigDict(populate_by_name=True) + + +class TokenValidationResult(BaseModel): + """Result of validating a JWT token.""" + + valid: bool = Field(..., description="Whether the token is valid") + agent_name: Optional[str] = Field( + None, alias="agentName", description="Agent name from token" + ) + org: Optional[str] = Field(None, description="Organization from token") + expires_at: Optional[datetime] = Field( + None, alias="expiresAt", description="Token expiration time" + ) + verification_level: Optional[int] = Field( + None, alias="verificationLevel", description="Verification level" + ) + reason: Optional[str] = Field(None, description="Reason if validation failed") + + model_config = ConfigDict(populate_by_name=True) + + +class DIDVerificationMethod(BaseModel): + """A verification method in a DID document.""" + + id: str = Field(..., description="Verification method ID") + type: str = Field(..., description="Verification method type") + controller: str = Field(..., description="Controller DID") + public_key_pem: Optional[str] = Field( + None, alias="publicKeyPem", description="Public key in PEM format" + ) + public_key_jwk: Optional[Dict[str, Any]] = Field( + None, alias="publicKeyJwk", description="Public key in JWK format" + ) + + model_config = ConfigDict(populate_by_name=True) + + +class DIDServiceEndpoint(BaseModel): + """A service endpoint in a DID document.""" + + id: str = Field(..., description="Service ID") + type: str = Field(..., description="Service type") + service_endpoint: str = Field( + ..., alias="serviceEndpoint", description="Service endpoint URL" + ) + + model_config = ConfigDict(populate_by_name=True) + + +class ClaimResponse(BaseModel): + """Response from claiming/registering an agent ID.""" + + agent_name: str = Field(..., alias="agentName", description="The claimed agent name") + org: Optional[str] = Field(None, description="Organization scope") + cert_pem: Optional[str] = Field( + None, alias="certPem", description="Issued certificate in PEM format" + ) + serial: Optional[str] = Field( + None, description="Certificate serial number" + ) + status: str = Field(default="active", description="Agent status") + + model_config = ConfigDict(populate_by_name=True) + + +class DIDDocument(BaseModel): + """W3C DID Document for an agent.""" + + context: List[str] = Field( + default=["https://www.w3.org/ns/did/v1"], + alias="@context", + description="JSON-LD context", + ) + id: str = Field(..., description="DID identifier") + verification_method: List[DIDVerificationMethod] = Field( + default_factory=list, + alias="verificationMethod", + description="Verification methods", + ) + authentication: List[str] = Field( + default_factory=list, description="Authentication method references" + ) + service: List[DIDServiceEndpoint] = Field( + default_factory=list, description="Service endpoints" + ) + + model_config = ConfigDict(populate_by_name=True) + + @property + def agent_name(self) -> Optional[str]: + """Extract agent name from DID.""" + if self.id.startswith("did:openagents:"): + full_name = self.id[len("did:openagents:") :] + if "@" in full_name: + return full_name.split("@")[0] + return full_name + return None + + @property + def org(self) -> Optional[str]: + """Extract organization from DID.""" + if self.id.startswith("did:openagents:"): + full_name = self.id[len("did:openagents:") :] + if "@" in full_name: + return full_name.split("@")[1] + return None diff --git a/src/openagents/agentid/parser.py b/src/openagents/agentid/parser.py new file mode 100644 index 00000000..6041d6e9 --- /dev/null +++ b/src/openagents/agentid/parser.py @@ -0,0 +1,244 @@ +""" +Agent ID format parsing utilities. + +This module provides functions for parsing, validating, and normalizing +Agent ID strings in different formats (Level 2 and Level 3). + +Supported formats: +- Simple: my-agent, my-agent@org +- Level 2: openagents:my-agent, openagents:my-agent@org +- Level 3: did:openagents:my-agent, did:openagents:my-agent@org +""" + +import re +from typing import Tuple, Optional + +from openagents.agentid.models import ParsedAgentID, AgentIDFormat +from openagents.agentid.exceptions import AgentIDFormatError + + +# Agent name validation pattern +# - 3-64 characters +# - Alphanumeric, hyphens, underscores +# - Must start with alphanumeric +AGENT_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_-]{2,63}$") + +# Organization name validation pattern (same rules) +ORG_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_-]{2,63}$") + +# Format prefixes +LEVEL_2_PREFIX = "openagents:" +LEVEL_3_PREFIX = "did:openagents:" + + +def validate_agent_name(name: str) -> bool: + """Validate an agent name. + + Args: + name: Agent name to validate + + Returns: + True if valid, False otherwise + """ + if not name: + return False + return bool(AGENT_NAME_PATTERN.match(name)) + + +def validate_org_name(org: str) -> bool: + """Validate an organization name. + + Args: + org: Organization name to validate + + Returns: + True if valid, False otherwise + """ + if not org: + return False + return bool(ORG_NAME_PATTERN.match(org)) + + +def _parse_name_and_org(full_name: str) -> Tuple[str, Optional[str]]: + """Parse a full name into agent name and optional org. + + Args: + full_name: Agent name with optional @org suffix + + Returns: + Tuple of (agent_name, org or None) + + Raises: + AgentIDFormatError: If the format is invalid + """ + if "@" in full_name: + parts = full_name.split("@") + if len(parts) != 2: + raise AgentIDFormatError( + full_name, "Multiple '@' characters found" + ) + agent_name, org = parts + if not agent_name: + raise AgentIDFormatError(full_name, "Agent name cannot be empty") + if not org: + raise AgentIDFormatError(full_name, "Organization cannot be empty after '@'") + return agent_name, org + return full_name, None + + +def parse_agent_id(agent_id: str) -> ParsedAgentID: + """Parse an agent ID string into its components. + + Supports all formats: + - Simple: my-agent, my-agent@org + - Level 2: openagents:my-agent, openagents:my-agent@org + - Level 3: did:openagents:my-agent, did:openagents:my-agent@org + + Args: + agent_id: Agent ID string to parse + + Returns: + ParsedAgentID with components + + Raises: + AgentIDFormatError: If the format is invalid + """ + if not agent_id or not isinstance(agent_id, str): + raise AgentIDFormatError(str(agent_id), "Agent ID must be a non-empty string") + + agent_id = agent_id.strip() + + # Determine format and extract full name + if agent_id.startswith(LEVEL_3_PREFIX): + format_type = AgentIDFormat.LEVEL_3 + full_name = agent_id[len(LEVEL_3_PREFIX) :] + elif agent_id.startswith(LEVEL_2_PREFIX): + format_type = AgentIDFormat.LEVEL_2 + full_name = agent_id[len(LEVEL_2_PREFIX) :] + else: + format_type = AgentIDFormat.SIMPLE + full_name = agent_id + + if not full_name: + raise AgentIDFormatError(agent_id, "Agent name cannot be empty") + + # Parse name and org + agent_name, org = _parse_name_and_org(full_name) + + # Validate agent name + if not validate_agent_name(agent_name): + raise AgentIDFormatError( + agent_id, + f"Invalid agent name '{agent_name}': must be 3-64 characters, " + "alphanumeric with hyphens/underscores, starting with alphanumeric", + ) + + # Validate org if present + if org and not validate_org_name(org): + raise AgentIDFormatError( + agent_id, + f"Invalid organization '{org}': must be 3-64 characters, " + "alphanumeric with hyphens/underscores, starting with alphanumeric", + ) + + return ParsedAgentID( + agent_name=agent_name, + org=org, + format=format_type, + ) + + +def normalize_to_level2(agent_id: str) -> str: + """Normalize any agent ID format to Level 2 format. + + Args: + agent_id: Agent ID in any supported format + + Returns: + Level 2 format (openagents:xxx) + + Raises: + AgentIDFormatError: If the format is invalid + """ + parsed = parse_agent_id(agent_id) + return parsed.level_2_id + + +def normalize_to_level3(agent_id: str) -> str: + """Normalize any agent ID format to Level 3 DID format. + + Args: + agent_id: Agent ID in any supported format + + Returns: + Level 3 format (did:openagents:xxx) + + Raises: + AgentIDFormatError: If the format is invalid + """ + parsed = parse_agent_id(agent_id) + return parsed.level_3_id + + +def normalize_to_simple(agent_id: str) -> str: + """Normalize any agent ID format to simple format. + + Args: + agent_id: Agent ID in any supported format + + Returns: + Simple format (xxx or xxx@org) + + Raises: + AgentIDFormatError: If the format is invalid + """ + parsed = parse_agent_id(agent_id) + return parsed.full_name + + +def extract_components(agent_id: str) -> Tuple[str, Optional[str]]: + """Extract agent name and org from any format. + + Args: + agent_id: Agent ID in any supported format + + Returns: + Tuple of (agent_name, org or None) + + Raises: + AgentIDFormatError: If the format is invalid + """ + parsed = parse_agent_id(agent_id) + return parsed.agent_name, parsed.org + + +def is_valid_agent_id(agent_id: str) -> bool: + """Check if an agent ID string is valid. + + Args: + agent_id: Agent ID string to validate + + Returns: + True if valid, False otherwise + """ + try: + parse_agent_id(agent_id) + return True + except AgentIDFormatError: + return False + + +def get_format(agent_id: str) -> AgentIDFormat: + """Determine the format of an agent ID string. + + Args: + agent_id: Agent ID string + + Returns: + AgentIDFormat enum value + + Raises: + AgentIDFormatError: If the format is invalid + """ + parsed = parse_agent_id(agent_id) + return parsed.format diff --git a/src/openagents/cli.py b/src/openagents/cli.py index 91ccbbe1..0cbd3d4b 100644 --- a/src/openagents/cli.py +++ b/src/openagents/cli.py @@ -1607,11 +1607,19 @@ async def run_studio(): rich_markup_mode="rich" ) +# AgentID command group for agent identity management +agentid_app = typer.Typer( + name="agentid", + help="🪪 Agent Identity verification and authentication", + rich_markup_mode="rich" +) + # Add subcommands to main app app.add_typer(network_app, name="network") app.add_typer(agent_app, name="agent") app.add_typer(agents_app, name="agents") app.add_typer(certs_app, name="certs") +app.add_typer(agentid_app, name="agentid") # OpenAgents API constants @@ -3299,6 +3307,487 @@ def verbose_callback(value: bool): return value +# ============================================================================ +# AgentID Commands +# ============================================================================ + +@agentid_app.command("parse") +def agentid_parse( + agent_id: str = typer.Argument(..., help="Agent ID to parse (e.g., openagents:my-agent or did:openagents:my-agent@org)") +): + """🔍 Parse an Agent ID format (no API call)""" + from openagents.agentid import parse_agent_id, AgentIDFormatError + + try: + parsed = parse_agent_id(agent_id) + + table = Table(title="📋 Parsed Agent ID", box=box.ROUNDED) + table.add_column("Field", style="cyan") + table.add_column("Value", style="green") + + table.add_row("Input", agent_id) + table.add_row("Agent Name", parsed.agent_name) + table.add_row("Organization", parsed.org or "[dim]None[/dim]") + table.add_row("Format", parsed.format.value) + table.add_row("Level 2 ID", parsed.level_2_id) + table.add_row("Level 3 ID", parsed.level_3_id) + + console.print(table) + + except AgentIDFormatError as e: + console.print(f"[red]❌ Invalid format: {e.message}[/red]") + raise typer.Exit(1) + + +@agentid_app.command("verify") +def agentid_verify( + agent_id: str = typer.Argument(..., help="Agent ID to verify (e.g., openagents:my-agent)") +): + """✅ Verify an Agent ID exists in the registry""" + from openagents.agentid import AgentIDVerifier, AgentIDFormatError, AgentIDConnectionError + + try: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + transient=True, + ) as progress: + progress.add_task(f"🔍 Verifying {agent_id}...", total=None) + + client = AgentIDVerifier() + result = client.validate(agent_id) + + if result.verified: + panel_content = f"""[green]✅ Agent Verified[/green] + +[bold]Agent Name:[/bold] {result.agent_name} +[bold]Organization:[/bold] {result.org or '[dim]None[/dim]'} +[bold]Status:[/bold] {result.status or 'active'} + +[bold]Level 2 ID:[/bold] [cyan]{result.level_2_id}[/cyan] +[bold]Level 3 ID:[/bold] [cyan]{result.level_3_id}[/cyan]""" + console.print(Panel(panel_content, title="🪪 Agent Identity", border_style="green")) + else: + console.print(Panel( + f"[red]❌ Agent not found[/red]\n\n{result.message}", + title="🪪 Agent Identity", + border_style="red" + )) + raise typer.Exit(1) + + except AgentIDFormatError as e: + console.print(f"[red]❌ Invalid format: {e.message}[/red]") + raise typer.Exit(1) + except AgentIDConnectionError as e: + console.print(f"[red]❌ Connection error: {e.message}[/red]") + raise typer.Exit(1) + + +@agentid_app.command("info") +def agentid_info( + agent_name: str = typer.Argument(..., help="Agent name to look up"), + org: Optional[str] = typer.Option(None, "--org", "-o", help="Organization scope") +): + """📋 Get detailed agent information""" + from openagents.agentid import AgentIDVerifier, AgentIDNotFoundError, AgentIDConnectionError + + try: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + transient=True, + ) as progress: + progress.add_task(f"🔍 Looking up {agent_name}...", total=None) + + client = AgentIDVerifier() + info = client.get_agent_info(agent_name, org=org) + + table = Table(title="📋 Agent Information", box=box.ROUNDED) + table.add_column("Field", style="cyan") + table.add_column("Value", style="green") + + table.add_row("Agent Name", info.agent_name) + table.add_row("Organization", info.org or "[dim]None[/dim]") + table.add_row("Status", info.status) + table.add_row("Algorithm", info.algorithm or "[dim]Not specified[/dim]") + table.add_row("Certificate Serial", info.cert_serial or "[dim]None[/dim]") + if info.created_at: + table.add_row("Created At", str(info.created_at)) + + console.print(table) + + except AgentIDNotFoundError: + full_id = f"{agent_name}@{org}" if org else agent_name + console.print(f"[red]❌ Agent not found: {full_id}[/red]") + raise typer.Exit(1) + except AgentIDConnectionError as e: + console.print(f"[red]❌ Connection error: {e.message}[/red]") + raise typer.Exit(1) + + +@agentid_app.command("resolve") +def agentid_resolve( + did: str = typer.Argument(..., help="DID to resolve (e.g., did:openagents:my-agent)") +): + """🔗 Resolve a DID document""" + from openagents.agentid import AgentIDVerifier, AgentIDNotFoundError, AgentIDConnectionError, AgentIDFormatError + + try: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + transient=True, + ) as progress: + progress.add_task(f"🔍 Resolving {did}...", total=None) + + client = AgentIDVerifier() + doc = client.resolve_did(did) + + # Display DID Document + console.print(Panel( + f"[bold]DID:[/bold] [cyan]{doc.id}[/cyan]", + title="🔗 DID Document", + border_style="blue" + )) + + # Verification Methods + if doc.verification_method: + table = Table(title="🔑 Verification Methods", box=box.ROUNDED) + table.add_column("ID", style="cyan") + table.add_column("Type", style="green") + + for method in doc.verification_method: + method_id = method.id.split("#")[-1] if "#" in method.id else method.id + table.add_row(method_id, method.type) + + console.print(table) + + # Authentication + if doc.authentication: + console.print(f"[bold]Authentication:[/bold] {', '.join(doc.authentication)}") + + # Services + if doc.service: + table = Table(title="🌐 Services", box=box.ROUNDED) + table.add_column("Type", style="cyan") + table.add_column("Endpoint", style="green") + + for svc in doc.service: + table.add_row(svc.type, svc.service_endpoint) + + console.print(table) + + except AgentIDFormatError as e: + console.print(f"[red]❌ Invalid format: {e.message}[/red]") + raise typer.Exit(1) + except AgentIDNotFoundError: + console.print(f"[red]❌ DID not found: {did}[/red]") + raise typer.Exit(1) + except AgentIDConnectionError as e: + console.print(f"[red]❌ Connection error: {e.message}[/red]") + raise typer.Exit(1) + + +@agentid_app.command("verify-token") +def agentid_verify_token( + token: str = typer.Argument(..., help="JWT token to verify") +): + """🔐 Verify a JWT token""" + from openagents.agentid import AgentIDVerifier, AgentIDConnectionError + + try: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + transient=True, + ) as progress: + progress.add_task("🔍 Verifying token...", total=None) + + client = AgentIDVerifier() + result = client.verify_token(token) + + if result.valid: + table = Table(title="✅ Token Valid", box=box.ROUNDED) + table.add_column("Field", style="cyan") + table.add_column("Value", style="green") + + table.add_row("Agent Name", result.agent_name or "[dim]Unknown[/dim]") + table.add_row("Organization", result.org or "[dim]None[/dim]") + table.add_row("Verification Level", str(result.verification_level or 2)) + if result.expires_at: + table.add_row("Expires At", str(result.expires_at)) + + console.print(table) + else: + console.print(Panel( + f"[red]❌ Token Invalid[/red]\n\nReason: {result.reason or 'Unknown'}", + title="🔐 Token Verification", + border_style="red" + )) + raise typer.Exit(1) + + except AgentIDConnectionError as e: + console.print(f"[red]❌ Connection error: {e.message}[/red]") + raise typer.Exit(1) + + +@agentid_app.command("challenge") +def agentid_challenge( + agent_name: str = typer.Argument(..., help="Agent name to request challenge for"), + org: Optional[str] = typer.Option(None, "--org", "-o", help="Organization scope"), + algorithm: str = typer.Option("RS256", "--algorithm", "-a", help="Signing algorithm (RS256 or Ed25519)") +): + """🎯 Request an authentication challenge""" + from openagents.agentid import AgentIDVerifier, AgentIDNotFoundError, AgentIDConnectionError + + try: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + transient=True, + ) as progress: + progress.add_task(f"🔍 Requesting challenge for {agent_name}...", total=None) + + client = AgentIDVerifier() + challenge = client.request_challenge(agent_name, org=org, algorithm=algorithm) + + console.print(Panel( + f"""[bold]Challenge requested successfully![/bold] + +[bold]Nonce:[/bold] [cyan]{challenge.nonce}[/cyan] +[bold]Algorithm:[/bold] {challenge.algorithm} +[bold]Expires In:[/bold] {challenge.expires_in} seconds + +[bold]Challenge (Base64):[/bold] +[dim]{challenge.challenge}[/dim] + +[yellow]Sign this challenge with your private key and use the 'token' command to get a JWT.[/yellow]""", + title="🎯 Authentication Challenge", + border_style="blue" + )) + + except AgentIDNotFoundError: + full_id = f"{agent_name}@{org}" if org else agent_name + console.print(f"[red]❌ Agent not found: {full_id}[/red]") + raise typer.Exit(1) + except AgentIDConnectionError as e: + console.print(f"[red]❌ Connection error: {e.message}[/red]") + raise typer.Exit(1) + + +@agentid_app.command("token") +def agentid_token( + agent_name: str = typer.Argument(..., help="Agent name"), + nonce: str = typer.Option(..., "--nonce", "-n", help="Challenge nonce"), + signature: str = typer.Option(..., "--signature", "-s", help="Base64-encoded signature"), + org: Optional[str] = typer.Option(None, "--org", "-o", help="Organization scope"), + quiet: bool = typer.Option(False, "--quiet", "-q", help="Output only the token") +): + """🎫 Exchange a signature for a JWT token""" + from openagents.agentid import AgentIDVerifier, AgentIDAuthenticationError, AgentIDConnectionError + + try: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + transient=True, + ) as progress: + progress.add_task("🔍 Exchanging signature for token...", total=None) + + client = AgentIDVerifier() + token = client.get_token(agent_name, nonce, signature, org=org) + + if quiet: + console.print(token.access_token) + else: + console.print(Panel( + f"""[green]✅ Token obtained successfully![/green] + +[bold]Token Type:[/bold] {token.token_type} +[bold]Expires In:[/bold] {token.expires_in} seconds +[bold]Verification Level:[/bold] {token.verification_level} + +[bold]Access Token:[/bold] +[dim]{token.access_token[:50]}...{token.access_token[-20:]}[/dim] + +Use with: [cyan]Authorization: Bearer [/cyan]""", + title="🎫 JWT Token", + border_style="green" + )) + + except AgentIDAuthenticationError as e: + console.print(f"[red]❌ Authentication failed: {e.message}[/red]") + raise typer.Exit(1) + except AgentIDConnectionError as e: + console.print(f"[red]❌ Connection error: {e.message}[/red]") + raise typer.Exit(1) + + +@agentid_app.command("auth") +def agentid_auth( + agent_name: str = typer.Argument(..., help="Agent name"), + key: str = typer.Option(..., "--key", "-k", help="Path to private key PEM file"), + org: Optional[str] = typer.Option(None, "--org", "-o", help="Organization scope"), + algorithm: str = typer.Option("RS256", "--algorithm", "-a", help="Signing algorithm"), + quiet: bool = typer.Option(False, "--quiet", "-q", help="Output only the token") +): + """🔑 Authenticate and get a JWT token (complete flow)""" + from openagents.agentid import AgentIDAuth, AgentIDAuthenticationError, AgentIDConnectionError, AgentIDNotFoundError + from pathlib import Path + + key_path = Path(key) + if not key_path.exists(): + console.print(f"[red]❌ Private key file not found: {key}[/red]") + raise typer.Exit(1) + + try: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + transient=True, + ) as progress: + progress.add_task(f"🔐 Authenticating as {agent_name}...", total=None) + + auth = AgentIDAuth( + agent_name=agent_name, + org=org, + private_key_path=key_path, + algorithm=algorithm + ) + token = auth.get_token() + + if quiet: + console.print(token.access_token) + else: + full_id = f"{agent_name}@{org}" if org else agent_name + console.print(Panel( + f"""[green]✅ Authentication successful![/green] + +[bold]Agent:[/bold] [cyan]{full_id}[/cyan] +[bold]Token Type:[/bold] {token.token_type} +[bold]Expires In:[/bold] {token.expires_in} seconds +[bold]Verification Level:[/bold] {token.verification_level} + +[bold]Access Token:[/bold] +[dim]{token.access_token[:50]}...{token.access_token[-20:]}[/dim] + +Use with: [cyan]Authorization: Bearer [/cyan]""", + title="🔑 Authenticated", + border_style="green" + )) + + except AgentIDNotFoundError: + full_id = f"{agent_name}@{org}" if org else agent_name + console.print(f"[red]❌ Agent not found: {full_id}[/red]") + raise typer.Exit(1) + except AgentIDAuthenticationError as e: + console.print(f"[red]❌ Authentication failed: {e.message}[/red]") + raise typer.Exit(1) + except AgentIDConnectionError as e: + console.print(f"[red]❌ Connection error: {e.message}[/red]") + raise typer.Exit(1) + except Exception as e: + console.print(f"[red]❌ Error: {e}[/red]") + raise typer.Exit(1) + + +@agentid_app.command("claim") +def agentid_claim( + agent_name: str = typer.Argument(..., help="Agent name to claim"), + key: str = typer.Option(..., "--key", "-k", help="Path to public key PEM file"), + org: Optional[str] = typer.Option(None, "--org", "-o", help="Organization scope"), + api_key: Optional[str] = typer.Option(None, "--api-key", "-a", help="API key for authentication"), + save_cert: Optional[str] = typer.Option(None, "--save-cert", "-c", help="Path to save the issued certificate"), +): + """📝 Claim/register a new Agent ID""" + from openagents.agentid import AgentIDVerifier, AgentIDAuthenticationError, AgentIDConnectionError + from pathlib import Path + import os + + key_path = Path(key) + if not key_path.exists(): + console.print(f"[red]❌ Public key file not found: {key}[/red]") + raise typer.Exit(1) + + # Read public key + try: + with open(key_path, "r") as f: + public_key_pem = f.read() + except Exception as e: + console.print(f"[red]❌ Failed to read public key: {e}[/red]") + raise typer.Exit(1) + + # Try to get API key from environment if not provided + if not api_key: + api_key = os.environ.get("OPENAGENTS_API_KEY") + + if not api_key: + console.print("[yellow]⚠️ No API key provided. Use --api-key or set OPENAGENTS_API_KEY environment variable.[/yellow]") + raise typer.Exit(1) + + try: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + transient=True, + ) as progress: + progress.add_task(f"📝 Claiming {agent_name}...", total=None) + + client = AgentIDVerifier() + result = client.claim_agent_id( + agent_name=agent_name, + public_key_pem=public_key_pem, + org=org, + api_key=api_key, + ) + + # Save certificate if requested + if save_cert and result.cert_pem: + cert_path = Path(save_cert) + cert_path.write_text(result.cert_pem) + cert_saved_msg = f"\n[bold]Certificate saved to:[/bold] [cyan]{save_cert}[/cyan]" + else: + cert_saved_msg = "" + + full_id = f"{result.agent_name}@{result.org}" if result.org else result.agent_name + console.print(Panel( + f"""[green]✅ Agent ID claimed successfully![/green] + +[bold]Agent Name:[/bold] {result.agent_name} +[bold]Organization:[/bold] {result.org or '[dim]None[/dim]'} +[bold]Status:[/bold] {result.status} +[bold]Certificate Serial:[/bold] {result.serial or '[dim]None[/dim]'} + +[bold]Level 2 ID:[/bold] [cyan]openagents:{full_id}[/cyan] +[bold]Level 3 ID:[/bold] [cyan]did:openagents:{full_id}[/cyan]{cert_saved_msg}""", + title="📝 Agent ID Claimed", + border_style="green" + )) + + except AgentIDAuthenticationError as e: + console.print(f"[red]❌ Authentication failed: {e.message}[/red]") + console.print("[dim]Make sure your API key is valid and belongs to the specified organization.[/dim]") + raise typer.Exit(1) + except AgentIDConnectionError as e: + if "already exists" in str(e.message).lower(): + console.print(f"[red]❌ Agent ID already exists: {agent_name}[/red]") + console.print("[dim]Choose a different agent name or check if you already own this agent.[/dim]") + else: + console.print(f"[red]❌ Error: {e.message}[/red]") + raise typer.Exit(1) + except Exception as e: + console.print(f"[red]❌ Error: {e}[/red]") + raise typer.Exit(1) + + def show_banner(): """Show a beautiful startup banner""" banner_text = """ diff --git a/tests/agentid/__init__.py b/tests/agentid/__init__.py new file mode 100644 index 00000000..eb93a47a --- /dev/null +++ b/tests/agentid/__init__.py @@ -0,0 +1 @@ +"""Tests for the agentid module.""" diff --git a/tests/agentid/test_client.py b/tests/agentid/test_client.py new file mode 100644 index 00000000..04f4a9db --- /dev/null +++ b/tests/agentid/test_client.py @@ -0,0 +1,433 @@ +"""Tests for the agentid client module.""" + +import pytest +import base64 +from unittest.mock import AsyncMock, MagicMock, patch +from openagents.agentid.client import AgentIDVerifier, AgentIDAuth +from openagents.agentid.models import ( + VerificationResult, + AgentInfo, + ChallengeResponse, + TokenResponse, + DIDDocument, + TokenValidationResult, + AgentIDLevel, +) +from openagents.agentid.exceptions import ( + AgentIDNotFoundError, + AgentIDConnectionError, + AgentIDSignatureError, + AgentIDChallengeExpiredError, +) + + +class AsyncContextManagerMock: + """Helper to create async context manager mocks.""" + + def __init__(self, return_value): + self.return_value = return_value + + async def __aenter__(self): + return self.return_value + + async def __aexit__(self, *args): + pass + + +class TestAgentIDVerifier: + """Tests for AgentIDVerifier class.""" + + @pytest.fixture + def client(self): + """Create a test client.""" + return AgentIDVerifier(endpoint="https://test.example.com") + + @pytest.mark.asyncio + async def test_validate_async_success(self, client): + """Test successful verification.""" + mock_response = MagicMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value={ + "code": 200, + "data": { + "agentName": "my-agent", + "org": "my-org", + "status": "active", + } + }) + + mock_session = MagicMock() + mock_session.get = MagicMock(return_value=AsyncContextManagerMock(mock_response)) + + with patch.object(client, '_get_session', new=AsyncMock(return_value=mock_session)): + result = await client.validate_async("openagents:my-agent@my-org") + + assert result.verified is True + assert result.agent_name == "my-agent" + assert result.org == "my-org" + assert result.level == AgentIDLevel.LEVEL_2 + + @pytest.mark.asyncio + async def test_validate_async_not_found(self, client): + """Test verification when agent not found.""" + mock_response = MagicMock() + mock_response.status = 404 + mock_response.json = AsyncMock(return_value={ + "code": 404, + "message": "Agent not found" + }) + + mock_session = MagicMock() + mock_session.get = MagicMock(return_value=AsyncContextManagerMock(mock_response)) + + with patch.object(client, '_get_session', new=AsyncMock(return_value=mock_session)): + result = await client.validate_async("openagents:unknown-agent") + + assert result.verified is False + assert result.agent_name == "unknown-agent" + + @pytest.mark.asyncio + async def test_get_agent_info_async_success(self, client): + """Test getting agent info successfully.""" + mock_response = MagicMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value={ + "code": 200, + "data": { + "agentName": "my-agent", + "org": "my-org", + "status": "active", + "publicKeyPem": "-----BEGIN PUBLIC KEY-----\n...", + "serial": "ABC123", + "algorithm": "RS256", + } + }) + + mock_session = MagicMock() + mock_session.get = MagicMock(return_value=AsyncContextManagerMock(mock_response)) + + with patch.object(client, '_get_session', new=AsyncMock(return_value=mock_session)): + result = await client.get_agent_info_async("my-agent", org="my-org") + + assert isinstance(result, AgentInfo) + assert result.agent_name == "my-agent" + assert result.org == "my-org" + assert result.status == "active" + assert result.algorithm == "RS256" + + @pytest.mark.asyncio + async def test_get_agent_info_async_not_found(self, client): + """Test getting agent info when not found.""" + mock_response = MagicMock() + mock_response.status = 404 + mock_response.json = AsyncMock(return_value={ + "code": 404, + "message": "Agent not found" + }) + + mock_session = MagicMock() + mock_session.get = MagicMock(return_value=AsyncContextManagerMock(mock_response)) + + with patch.object(client, '_get_session', new=AsyncMock(return_value=mock_session)): + with pytest.raises(AgentIDNotFoundError): + await client.get_agent_info_async("unknown-agent") + + @pytest.mark.asyncio + async def test_request_challenge_async_success(self, client): + """Test requesting a challenge successfully.""" + mock_response = MagicMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value={ + "code": 200, + "data": { + "challenge": base64.b64encode(b"test-challenge").decode(), + "nonce": "test-nonce-123", + "algorithm": "RS256", + "expiresIn": 300, + } + }) + + mock_session = MagicMock() + mock_session.post = MagicMock(return_value=AsyncContextManagerMock(mock_response)) + + with patch.object(client, '_get_session', new=AsyncMock(return_value=mock_session)): + result = await client.request_challenge_async("my-agent", org="my-org") + + assert isinstance(result, ChallengeResponse) + assert result.nonce == "test-nonce-123" + assert result.algorithm == "RS256" + assert result.expires_in == 300 + + @pytest.mark.asyncio + async def test_get_token_async_success(self, client): + """Test getting a token successfully.""" + mock_response = MagicMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value={ + "code": 200, + "data": { + "accessToken": "eyJ...", + "tokenType": "bearer", + "expiresIn": 899, + "verificationLevel": 2, + } + }) + + mock_session = MagicMock() + mock_session.post = MagicMock(return_value=AsyncContextManagerMock(mock_response)) + + with patch.object(client, '_get_session', new=AsyncMock(return_value=mock_session)): + result = await client.get_token_async( + "my-agent", + nonce="test-nonce", + signature="test-signature", + org="my-org" + ) + + assert isinstance(result, TokenResponse) + assert result.access_token == "eyJ..." + assert result.token_type == "bearer" + assert result.expires_in == 899 + assert result.verification_level == 2 + + @pytest.mark.asyncio + async def test_get_token_async_signature_invalid(self, client): + """Test getting a token with invalid signature.""" + mock_response = MagicMock() + mock_response.status = 401 + mock_response.json = AsyncMock(return_value={ + "code": 401, + "message": "Signature verification failed" + }) + + mock_session = MagicMock() + mock_session.post = MagicMock(return_value=AsyncContextManagerMock(mock_response)) + + with patch.object(client, '_get_session', new=AsyncMock(return_value=mock_session)): + with pytest.raises(AgentIDSignatureError): + await client.get_token_async( + "my-agent", + nonce="test-nonce", + signature="invalid-signature", + ) + + @pytest.mark.asyncio + async def test_get_token_async_challenge_expired(self, client): + """Test getting a token with expired challenge.""" + mock_response = MagicMock() + mock_response.status = 401 + mock_response.json = AsyncMock(return_value={ + "code": 401, + "message": "Challenge has expired" + }) + + mock_session = MagicMock() + mock_session.post = MagicMock(return_value=AsyncContextManagerMock(mock_response)) + + with patch.object(client, '_get_session', new=AsyncMock(return_value=mock_session)): + with pytest.raises(AgentIDChallengeExpiredError): + await client.get_token_async( + "my-agent", + nonce="expired-nonce", + signature="test-signature", + ) + + @pytest.mark.asyncio + async def test_verify_token_async_valid(self, client): + """Test validating a valid token.""" + mock_response = MagicMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value={ + "code": 200, + "data": { + "valid": True, + "agentName": "my-agent", + "org": "my-org", + "verificationLevel": 2, + } + }) + + mock_session = MagicMock() + mock_session.post = MagicMock(return_value=AsyncContextManagerMock(mock_response)) + + with patch.object(client, '_get_session', new=AsyncMock(return_value=mock_session)): + result = await client.verify_token_async("eyJ...") + + assert isinstance(result, TokenValidationResult) + assert result.valid is True + assert result.agent_name == "my-agent" + + @pytest.mark.asyncio + async def test_verify_token_async_invalid(self, client): + """Test validating an invalid token.""" + mock_response = MagicMock() + mock_response.status = 401 + mock_response.json = AsyncMock(return_value={ + "code": 401, + "message": "Token expired" + }) + + mock_session = MagicMock() + mock_session.post = MagicMock(return_value=AsyncContextManagerMock(mock_response)) + + with patch.object(client, '_get_session', new=AsyncMock(return_value=mock_session)): + result = await client.verify_token_async("expired-token") + + assert result.valid is False + assert result.reason == "Token expired" + + @pytest.mark.asyncio + async def test_resolve_did_async_success(self, client): + """Test resolving a DID successfully.""" + mock_response = MagicMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value={ + "code": 200, + "data": { + "@context": ["https://www.w3.org/ns/did/v1"], + "id": "did:openagents:my-agent@my-org", + "verificationMethod": [{ + "id": "did:openagents:my-agent@my-org#key-1", + "type": "RsaVerificationKey2018", + "controller": "did:openagents:my-agent@my-org", + "publicKeyPem": "-----BEGIN PUBLIC KEY-----\n...", + }], + "authentication": ["did:openagents:my-agent@my-org#key-1"], + } + }) + + mock_session = MagicMock() + mock_session.get = MagicMock(return_value=AsyncContextManagerMock(mock_response)) + + with patch.object(client, '_get_session', new=AsyncMock(return_value=mock_session)): + result = await client.resolve_did_async("did:openagents:my-agent@my-org") + + assert isinstance(result, DIDDocument) + assert result.id == "did:openagents:my-agent@my-org" + assert len(result.verification_method) == 1 + assert result.agent_name == "my-agent" + assert result.org == "my-org" + + +class TestAgentIDAuth: + """Tests for AgentIDAuth class.""" + + def test_init_with_path(self, tmp_path): + """Test initialization with private key path.""" + key_file = tmp_path / "key.pem" + key_file.write_text("-----BEGIN PRIVATE KEY-----\ntest\n-----END PRIVATE KEY-----") + + auth = AgentIDAuth( + agent_name="my-agent", + org="my-org", + private_key_path=str(key_file), + ) + + assert auth.agent_name == "my-agent" + assert auth.org == "my-org" + assert auth.algorithm == "RS256" + + def test_init_with_pem(self): + """Test initialization with private key PEM string.""" + pem = "-----BEGIN PRIVATE KEY-----\ntest\n-----END PRIVATE KEY-----" + + auth = AgentIDAuth( + agent_name="my-agent", + private_key_pem=pem, + ) + + assert auth.agent_name == "my-agent" + + def test_init_without_key_raises(self): + """Test that initialization without key raises error.""" + with pytest.raises(ValueError): + AgentIDAuth(agent_name="my-agent") + + def test_sign_challenge_rsa(self, tmp_path): + """Test signing a challenge with RSA key.""" + # Generate a test RSA key + from cryptography.hazmat.primitives.asymmetric import rsa + from cryptography.hazmat.primitives import serialization + + private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + ) + pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + + key_file = tmp_path / "key.pem" + key_file.write_bytes(pem) + + auth = AgentIDAuth( + agent_name="my-agent", + private_key_path=str(key_file), + algorithm="RS256", + ) + + challenge = base64.b64encode(b"test-challenge").decode() + signature = auth.sign_challenge(challenge) + + # Signature should be base64 encoded + assert isinstance(signature, str) + # Should be able to decode it + sig_bytes = base64.b64decode(signature) + assert len(sig_bytes) > 0 + + @pytest.mark.asyncio + async def test_get_token_async(self, tmp_path): + """Test the complete authentication flow.""" + # Generate a test RSA key + from cryptography.hazmat.primitives.asymmetric import rsa + from cryptography.hazmat.primitives import serialization + + private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + ) + pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + + key_file = tmp_path / "key.pem" + key_file.write_bytes(pem) + + auth = AgentIDAuth( + agent_name="my-agent", + org="my-org", + private_key_path=str(key_file), + ) + + # Mock the client methods + challenge = ChallengeResponse( + challenge=base64.b64encode(b"test-challenge").decode(), + nonce="test-nonce", + algorithm="RS256", + expires_in=300, + ) + + token = TokenResponse( + access_token="eyJ...", + token_type="bearer", + expires_in=899, + verification_level=2, + ) + + auth.client.request_challenge_async = AsyncMock(return_value=challenge) + auth.client.get_token_async = AsyncMock(return_value=token) + + result = await auth.get_token_async() + + assert result.access_token == "eyJ..." + assert result.expires_in == 899 + + # Verify the client methods were called correctly + auth.client.request_challenge_async.assert_called_once_with( + "my-agent", org="my-org", algorithm="RS256" + ) + auth.client.get_token_async.assert_called_once() diff --git a/tests/agentid/test_parser.py b/tests/agentid/test_parser.py new file mode 100644 index 00000000..1e232176 --- /dev/null +++ b/tests/agentid/test_parser.py @@ -0,0 +1,270 @@ +"""Tests for the agentid parser module.""" + +import pytest +from openagents.agentid.parser import ( + parse_agent_id, + normalize_to_level2, + normalize_to_level3, + normalize_to_simple, + extract_components, + is_valid_agent_id, + get_format, + validate_agent_name, + validate_org_name, +) +from openagents.agentid.models import AgentIDFormat +from openagents.agentid.exceptions import AgentIDFormatError + + +class TestValidateAgentName: + """Tests for validate_agent_name function.""" + + def test_valid_names(self): + """Valid agent names should pass validation.""" + valid_names = [ + "abc", # minimum length + "my-agent", + "my_agent", + "myAgent123", + "agent-123-test", + "a" * 64, # maximum length + ] + for name in valid_names: + assert validate_agent_name(name), f"'{name}' should be valid" + + def test_invalid_names(self): + """Invalid agent names should fail validation.""" + invalid_names = [ + "", # empty + "ab", # too short + "a" * 65, # too long + "-agent", # starts with hyphen + "_agent", # starts with underscore + "agent!", # invalid character + "agent name", # space + "agent@org", # @ is not allowed in name + ] + for name in invalid_names: + assert not validate_agent_name(name), f"'{name}' should be invalid" + + +class TestValidateOrgName: + """Tests for validate_org_name function.""" + + def test_valid_orgs(self): + """Valid organization names should pass validation.""" + valid_orgs = [ + "abc", + "my-org", + "my_org", + "MyOrg123", + ] + for org in valid_orgs: + assert validate_org_name(org), f"'{org}' should be valid" + + def test_invalid_orgs(self): + """Invalid organization names should fail validation.""" + invalid_orgs = [ + "", + "ab", # too short + "-org", # starts with hyphen + ] + for org in invalid_orgs: + assert not validate_org_name(org), f"'{org}' should be invalid" + + +class TestParseAgentId: + """Tests for parse_agent_id function.""" + + def test_parse_simple_format(self): + """Parse simple format without prefix.""" + result = parse_agent_id("my-agent") + assert result.agent_name == "my-agent" + assert result.org is None + assert result.format == AgentIDFormat.SIMPLE + + def test_parse_simple_format_with_org(self): + """Parse simple format with organization.""" + result = parse_agent_id("my-agent@my-org") + assert result.agent_name == "my-agent" + assert result.org == "my-org" + assert result.format == AgentIDFormat.SIMPLE + + def test_parse_level2_format(self): + """Parse Level 2 format (openagents:xxx).""" + result = parse_agent_id("openagents:my-agent") + assert result.agent_name == "my-agent" + assert result.org is None + assert result.format == AgentIDFormat.LEVEL_2 + + def test_parse_level2_format_with_org(self): + """Parse Level 2 format with organization.""" + result = parse_agent_id("openagents:my-agent@my-org") + assert result.agent_name == "my-agent" + assert result.org == "my-org" + assert result.format == AgentIDFormat.LEVEL_2 + + def test_parse_level3_format(self): + """Parse Level 3 DID format.""" + result = parse_agent_id("did:openagents:my-agent") + assert result.agent_name == "my-agent" + assert result.org is None + assert result.format == AgentIDFormat.LEVEL_3 + + def test_parse_level3_format_with_org(self): + """Parse Level 3 DID format with organization.""" + result = parse_agent_id("did:openagents:my-agent@my-org") + assert result.agent_name == "my-agent" + assert result.org == "my-org" + assert result.format == AgentIDFormat.LEVEL_3 + + def test_parse_with_whitespace(self): + """Parse agent ID with leading/trailing whitespace.""" + result = parse_agent_id(" openagents:my-agent ") + assert result.agent_name == "my-agent" + + def test_parse_invalid_empty(self): + """Empty string should raise error.""" + with pytest.raises(AgentIDFormatError): + parse_agent_id("") + + def test_parse_invalid_name(self): + """Invalid agent name should raise error.""" + with pytest.raises(AgentIDFormatError): + parse_agent_id("openagents:ab") # too short + + def test_parse_invalid_multiple_at(self): + """Multiple @ symbols should raise error.""" + with pytest.raises(AgentIDFormatError): + parse_agent_id("openagents:my-agent@org@extra") + + def test_parse_empty_after_prefix(self): + """Empty name after prefix should raise error.""" + with pytest.raises(AgentIDFormatError): + parse_agent_id("openagents:") + + +class TestNormalizeFunctions: + """Tests for normalization functions.""" + + def test_normalize_to_level2_from_simple(self): + """Normalize simple format to Level 2.""" + assert normalize_to_level2("my-agent") == "openagents:my-agent" + assert normalize_to_level2("my-agent@org") == "openagents:my-agent@org" + + def test_normalize_to_level2_from_level2(self): + """Normalize Level 2 format to Level 2 (no change).""" + assert normalize_to_level2("openagents:my-agent") == "openagents:my-agent" + + def test_normalize_to_level2_from_level3(self): + """Normalize Level 3 format to Level 2.""" + assert normalize_to_level2("did:openagents:my-agent") == "openagents:my-agent" + + def test_normalize_to_level3_from_simple(self): + """Normalize simple format to Level 3.""" + assert normalize_to_level3("my-agent") == "did:openagents:my-agent" + assert normalize_to_level3("my-agent@org") == "did:openagents:my-agent@org" + + def test_normalize_to_level3_from_level2(self): + """Normalize Level 2 format to Level 3.""" + assert normalize_to_level3("openagents:my-agent") == "did:openagents:my-agent" + + def test_normalize_to_level3_from_level3(self): + """Normalize Level 3 format to Level 3 (no change).""" + assert normalize_to_level3("did:openagents:my-agent") == "did:openagents:my-agent" + + def test_normalize_to_simple(self): + """Normalize any format to simple format.""" + assert normalize_to_simple("my-agent") == "my-agent" + assert normalize_to_simple("openagents:my-agent") == "my-agent" + assert normalize_to_simple("did:openagents:my-agent") == "my-agent" + assert normalize_to_simple("openagents:my-agent@org") == "my-agent@org" + + +class TestExtractComponents: + """Tests for extract_components function.""" + + def test_extract_from_simple(self): + """Extract from simple format.""" + name, org = extract_components("my-agent") + assert name == "my-agent" + assert org is None + + def test_extract_with_org(self): + """Extract with organization.""" + name, org = extract_components("openagents:my-agent@my-org") + assert name == "my-agent" + assert org == "my-org" + + def test_extract_from_did(self): + """Extract from DID format.""" + name, org = extract_components("did:openagents:my-agent@org") + assert name == "my-agent" + assert org == "org" + + +class TestIsValidAgentId: + """Tests for is_valid_agent_id function.""" + + def test_valid_ids(self): + """Valid agent IDs should return True.""" + valid_ids = [ + "my-agent", + "openagents:my-agent", + "did:openagents:my-agent", + "openagents:my-agent@my-org", + ] + for agent_id in valid_ids: + assert is_valid_agent_id(agent_id), f"'{agent_id}' should be valid" + + def test_invalid_ids(self): + """Invalid agent IDs should return False.""" + invalid_ids = [ + "", + "ab", # too short + "openagents:", # empty name + "my-agent@", # empty org + ] + for agent_id in invalid_ids: + assert not is_valid_agent_id(agent_id), f"'{agent_id}' should be invalid" + + +class TestGetFormat: + """Tests for get_format function.""" + + def test_get_simple_format(self): + """Get format for simple IDs.""" + assert get_format("my-agent") == AgentIDFormat.SIMPLE + assert get_format("my-agent@org") == AgentIDFormat.SIMPLE + + def test_get_level2_format(self): + """Get format for Level 2 IDs.""" + assert get_format("openagents:my-agent") == AgentIDFormat.LEVEL_2 + + def test_get_level3_format(self): + """Get format for Level 3 IDs.""" + assert get_format("did:openagents:my-agent") == AgentIDFormat.LEVEL_3 + + +class TestParsedAgentIDProperties: + """Tests for ParsedAgentID model properties.""" + + def test_full_name_without_org(self): + """Full name without org should be just the agent name.""" + result = parse_agent_id("my-agent") + assert result.full_name == "my-agent" + + def test_full_name_with_org(self): + """Full name with org should include @org suffix.""" + result = parse_agent_id("openagents:my-agent@my-org") + assert result.full_name == "my-agent@my-org" + + def test_level_2_id_property(self): + """level_2_id property should return correct format.""" + result = parse_agent_id("my-agent@org") + assert result.level_2_id == "openagents:my-agent@org" + + def test_level_3_id_property(self): + """level_3_id property should return correct format.""" + result = parse_agent_id("my-agent@org") + assert result.level_3_id == "did:openagents:my-agent@org"