From 22b21828ba68ba623cd2cecd13afdfe39817dff1 Mon Sep 17 00:00:00 2001 From: emptychan Date: Thu, 19 Jun 2025 23:12:32 +0800 Subject: [PATCH] support sse and streamable-http mode --- CONFIG.md | 28 +++++++++++- README.md | 15 +++++++ mcp-server-config-example.json | 15 +++++++ src/mcp_client_cli/cli.py | 80 +++++++++++++++++++++++----------- src/mcp_client_cli/config.py | 17 ++++++-- src/mcp_client_cli/const.py | 20 ++++++++- src/mcp_client_cli/storage.py | 21 ++++++--- src/mcp_client_cli/tool.py | 39 ++++++++++++----- 8 files changed, 186 insertions(+), 49 deletions(-) mode change 100755 => 100644 src/mcp_client_cli/cli.py mode change 100755 => 100644 src/mcp_client_cli/tool.py diff --git a/CONFIG.md b/CONFIG.md index e54ec2b..97b9480 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -21,7 +21,7 @@ The configuration file can be placed in either: "base_url": "string" }, "mcpServers": { - "server_name": { + "stdio_server_name": { "command": "string", "args": ["string"], "env": { @@ -30,6 +30,15 @@ The configuration file can be placed in either: "enabled": boolean, "exclude_tools": ["string"], "requires_confirmation": ["string"] + }, + "remote_server_name": { + "url": "string", + "headers": {}, + "timeout": float, + "sse_read_timeout": float, + "enabled": boolean, + "exclude_tools": ["string"], + "requires_confirmation": ["string"] } } } @@ -62,9 +71,13 @@ The configuration file can be placed in either: | Field | Type | Required | Default | Description | |-------|------|----------|---------|-------------| -| `command` | string | Yes | - | Command to run the server | +| `command` | string | Yes (stdio mode) | - | Command to run the server | | `args` | array | No | `[]` | Command-line arguments | | `env` | object | No | `{}` | Environment variables | +| `url` | string | Yes (sse or streamable-http mode) | - | Sse or Streamable-HTTP url | +| `headers` | object | No | `{}` | Http request headers | +| `timeout` | number | No | 30 | Http request timeout | +| `sse_read_timeout` | number | No | 300 | Sse data read timeout | | `enabled` | boolean | No | `true` | Whether the server is enabled | | `exclude_tools` | array | No | `[]` | Tool names to exclude | | `requires_confirmation` | array | No | `[]` | Tools requiring user confirmation | @@ -85,6 +98,17 @@ The configuration file can be placed in either: "command": "uvx", "args": ["mcp-server-fetch"] }, + "add": { + "url": "http://localhost:8000/sse", + "headers": {}, + "timeout": 50, + "requires_confirmation": ["add"] + }, + "subtract": { + "url": "http://localhost:8000/mcp", + "headers": {}, + "timeout": 50 + }, "brave-search": { "command": "npx", "args": ["-y", "@modelcontextprotocol/server-brave-search"], diff --git a/README.md b/README.md index a698fcf..ba5e5cf 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,21 @@ This act as alternative client beside Claude Desktop. Additionally you can use a "base_url": "https://api.openai.com/v1" // Optional, for OpenRouter or other providers }, "mcpServers": { + "add": { + "url": "http://localhost:8000/sse", // SSE + "headers": {}, + "timeout": 50, + "requires_confirmation": ["add"], + "enabled": true, // Optional, defaults to true + "exclude_tools": [] // Optional, list of tool names to exclude + }, + "subtract": { + "url": "http://localhost:8000/mcp", //Streamable-HTTP + "headers": {}, + "timeout": 50, + "enabled": true, // Optional, defaults to true + "exclude_tools": [] // Optional, list of tool names to exclude + }, "fetch": { "command": "uvx", "args": ["mcp-server-fetch"], diff --git a/mcp-server-config-example.json b/mcp-server-config-example.json index dba019f..a6ba04d 100644 --- a/mcp-server-config-example.json +++ b/mcp-server-config-example.json @@ -11,6 +11,21 @@ "command": "uvx", "args": ["mcp-server-fetch"] }, + "add": { + "url": "http://localhost:8000/sse", + "headers": {}, + "timeout": 50, + "requires_confirmation": ["add"], + "enabled": true, + "exclude_tools": [] + }, + "subtract": { + "url": "http://localhost:8000/mcp", + "headers": {}, + "timeout": 50, + "enabled": true, + "exclude_tools": [] + }, "brave-search": { "command": "npx", "args": ["-y", "@modelcontextprotocol/server-brave-search"], diff --git a/src/mcp_client_cli/cli.py b/src/mcp_client_cli/cli.py old mode 100755 new mode 100644 index 00ddce5..e3d5e8e --- a/src/mcp_client_cli/cli.py +++ b/src/mcp_client_cli/cli.py @@ -111,18 +111,34 @@ def setup_argument_parser() -> argparse.Namespace: async def handle_list_tools(app_config: AppConfig, args: argparse.Namespace) -> None: """Handle the --list-tools command.""" - server_configs = [ - McpServerConfig( - server_name=name, - server_param=StdioServerParameters( + server_configs = [] + for name, config in app_config.get_enabled_servers().items(): + mcp_type = McpType.STDIO + server_param = None + if config.command: + server_param = StdioServerParameters( command=config.command, args=config.args or [], env={**(config.env or {}), **os.environ} - ), + ) + elif config.url and config.url.endswith('sse'): + mcp_type = McpType.SSE + elif config.url: + mcp_type = McpType.STREAMABLE_HTTP + if server_param is None: + server_param = StramableHttpOrSseParameters( + url = config.url, + headers=config.headers, + timeout=config.timeout, + sse_read_timeout=config.sse_read_timeout, + terminate_on_close=config.terminate_on_close, + ) + server_configs.append(McpServerConfig( + mcp_type=mcp_type, + server_name=name, + server_param=server_param, exclude_tools=config.exclude_tools or [] - ) - for name, config in app_config.get_enabled_servers().items() - ] + )) toolkits, tools = await load_tools(server_configs, args.no_tools, args.force_refresh) console = Console() @@ -186,18 +202,34 @@ async def convert_toolkit(server_config: McpServerConfig): async def handle_conversation(args: argparse.Namespace, query: HumanMessage, is_conversation_continuation: bool, app_config: AppConfig) -> None: """Handle the main conversation flow.""" - server_configs = [ - McpServerConfig( - server_name=name, - server_param=StdioServerParameters( + server_configs = [] + for name, config in app_config.get_enabled_servers().items(): + mcp_type = McpType.STDIO + server_param = None + if config.command: + server_param = StdioServerParameters( command=config.command, args=config.args or [], env={**(config.env or {}), **os.environ} - ), + ) + elif config.url and config.url.endswith('sse'): + mcp_type = McpType.SSE + elif config.url: + mcp_type = McpType.STREAMABLE_HTTP + if server_param is None: + server_param = StramableHttpOrSseParameters( + url = config.url, + headers=config.headers, + timeout=config.timeout, + sse_read_timeout=config.sse_read_timeout, + terminate_on_close=config.terminate_on_close + ) + server_configs.append(McpServerConfig( + mcp_type=mcp_type, + server_name=name, + server_param=server_param, exclude_tools=config.exclude_tools or [] - ) - for name, config in app_config.get_enabled_servers().items() - ] + )) toolkits, tools = await load_tools(server_configs, args.no_tools, args.force_refresh) extra_body = {} @@ -233,19 +265,17 @@ async def handle_conversation(args: argparse.Namespace, query: HumanMessage, formatted_memories = "\n".join(f"- {memory}" for memory in memories) agent_executor = create_react_agent( model, tools, state_schema=AgentState, - state_modifier=prompt, checkpointer=checkpointer, store=store + prompt=prompt, checkpointer=checkpointer, store=store ) thread_id = (await conversation_manager.get_last_id() if is_conversation_continuation else uuid.uuid4().hex) - - input_messages = AgentState( - messages=[query], - today_datetime=datetime.now().isoformat(), - memories=formatted_memories, - remaining_steps=3 - ) - + input_messages = { + "messages": [query], + "today_datetime": datetime.now().isoformat(), + "memories": formatted_memories, + "remaining_steps": 3 + } output = OutputHandler(text_only=args.text_only, only_last_message=args.no_intermediates) output.start() try: diff --git a/src/mcp_client_cli/config.py b/src/mcp_client_cli/config.py index b3ace46..0fb7dbb 100644 --- a/src/mcp_client_cli/config.py +++ b/src/mcp_client_cli/config.py @@ -5,7 +5,6 @@ import os import commentjson from typing import Dict, List, Optional - from .const import CONFIG_FILE, CONFIG_DIR @dataclass @@ -31,7 +30,13 @@ def from_dict(cls, config: dict) -> "LLMConfig": @dataclass class ServerConfig: """Configuration for an MCP server.""" - command: str + url: str = None + headers: dict[str, str] | None = None + timeout: float = 30 + sse_read_timeout: float = 60 * 5 + terminate_on_close: bool = True + + command: str = None args: List[str] = None env: Dict[str, str] = None enabled: bool = True @@ -42,7 +47,11 @@ class ServerConfig: def from_dict(cls, config: dict) -> "ServerConfig": """Create ServerConfig from dictionary.""" return cls( - command=config["command"], + url=config.get("url", ""), + headers=config.get("headers", {}), + timeout=config.get("timeout", 30), + sse_read_timeout=config.get("sse_read_timeout", 60 * 5), + command=config.get("command", ""), args=config.get("args", []), env=config.get("env", {}), enabled=config.get("enabled", True), @@ -67,7 +76,7 @@ def load(cls) -> "AppConfig": if chosen_path is None: raise FileNotFoundError(f"Could not find config file in any of: {', '.join(map(str, config_paths))}") - with open(chosen_path, 'r') as f: + with open(chosen_path, 'r', encoding='utf-8') as f: config = commentjson.load(f) # Extract tools requiring confirmation diff --git a/src/mcp_client_cli/const.py b/src/mcp_client_cli/const.py index 92b2066..82d08e8 100644 --- a/src/mcp_client_cli/const.py +++ b/src/mcp_client_cli/const.py @@ -1,8 +1,26 @@ +import httpx +from datetime import timedelta from pathlib import Path +from enum import Enum +from pydantic import BaseModel CACHE_EXPIRY_HOURS = 24 DEFAULT_QUERY = "Summarize https://www.youtube.com/watch?v=NExtKbS1Ljc" CONFIG_FILE = 'mcp-server-config.json' CONFIG_DIR = Path.home() / ".llm" SQLITE_DB = CONFIG_DIR / "conversations.db" -CACHE_DIR = CONFIG_DIR / "mcp-tools" \ No newline at end of file +CACHE_DIR = CONFIG_DIR / "mcp-tools" + + +class McpType(Enum): + STDIO = 1 + SSE = 2 + STREAMABLE_HTTP = 3 + + +class StramableHttpOrSseParameters(BaseModel): + url: str + headers: dict[str, str] | None = None + timeout: float = 30 + sse_read_timeout: float = 60 * 5 + terminate_on_close: bool = True diff --git a/src/mcp_client_cli/storage.py b/src/mcp_client_cli/storage.py index 15f151f..a54c7e3 100644 --- a/src/mcp_client_cli/storage.py +++ b/src/mcp_client_cli/storage.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta -from typing import Optional, List +from typing import Optional, List, Union from mcp import StdioServerParameters, types import json import aiosqlite @@ -7,17 +7,20 @@ from .const import * -def get_cached_tools(server_param: StdioServerParameters) -> Optional[List[types.Tool]]: +def get_cached_tools(server_param: Union[StdioServerParameters, StramableHttpOrSseParameters]) -> Optional[List[types.Tool]]: """Retrieve cached tools if available and not expired. Args: - server_param (StdioServerParameters): The server parameters to identify the cache. + server_param (StdioServerParameters | StramableHttpOrSseParameters): The server parameters to identify the cache. Returns: Optional[List[types.Tool]]: A list of tools if cache is available and not expired, otherwise None. """ CACHE_DIR.mkdir(parents=True, exist_ok=True) - cache_key = f"{server_param.command}-{'-'.join(server_param.args)}".replace("/", "-") + if isinstance(server_param, StdioServerParameters): + cache_key = f"{server_param.command}-{'-'.join(server_param.args)}".replace("/", "-") + elif isinstance(server_param, StramableHttpOrSseParameters): + cache_key = f"{server_param.url}".replace("/", "-").replace(':', '') cache_file = CACHE_DIR / f"{cache_key}.json" if not cache_file.exists(): @@ -32,14 +35,18 @@ def get_cached_tools(server_param: StdioServerParameters) -> Optional[List[types return [types.Tool(**tool) for tool in cache_data["tools"]] -def save_tools_cache(server_param: StdioServerParameters, tools: List[types.Tool]) -> None: +def save_tools_cache(server_param: Union[StdioServerParameters, StramableHttpOrSseParameters], + tools: List[types.Tool]) -> None: """Save tools to cache. Args: - server_param (StdioServerParameters): The server parameters to identify the cache. + server_param (StdioServerParameters | StramableHttpOrSseParameters): The server parameters to identify the cache. tools (List[types.Tool]): The list of tools to be cached. """ - cache_key = f"{server_param.command}-{'-'.join(server_param.args)}".replace("/", "-") + if isinstance(server_param, StdioServerParameters): + cache_key = f"{server_param.command}-{'-'.join(server_param.args)}".replace("/", "-") + elif isinstance(server_param, StramableHttpOrSseParameters): + cache_key = f"{server_param.url}".replace("/", "-").replace(':', '') cache_file = CACHE_DIR / f"{cache_key}.json" cache_data = { diff --git a/src/mcp_client_cli/tool.py b/src/mcp_client_cli/tool.py old mode 100755 new mode 100644 index df39308..45601da --- a/src/mcp_client_cli/tool.py +++ b/src/mcp_client_cli/tool.py @@ -1,14 +1,16 @@ -from typing import List, Type, Optional, Any, override +from typing import List, Type, Optional, Union from pydantic import BaseModel from langchain_core.tools import BaseTool, BaseToolkit, ToolException from mcp import StdioServerParameters, types, ClientSession from mcp.client.stdio import stdio_client +from mcp.client.streamable_http import streamablehttp_client +from mcp.client.sse import sse_client import pydantic from pydantic_core import to_json from jsonschema_pydantic import jsonschema_to_pydantic import asyncio - from .storage import * +from .const import McpType, StramableHttpOrSseParameters class McpServerConfig(BaseModel): """Configuration for an MCP server. @@ -22,19 +24,21 @@ class McpServerConfig(BaseModel): command, arguments and environment variables exclude_tools (list[str]): List of tool names to exclude from this server """ - + mcp_type: McpType server_name: str - server_param: StdioServerParameters + server_param: Union[StdioServerParameters, StramableHttpOrSseParameters] exclude_tools: list[str] = [] + class McpToolkit(BaseToolkit): name: str - server_param: StdioServerParameters + mcp_type: McpType + server_param: Union[StdioServerParameters, StramableHttpOrSseParameters] exclude_tools: list[str] = [] _session: Optional[ClientSession] = None _tools: List[BaseTool] = [] _client = None - _init_lock: asyncio.Lock = None + _init_lock: asyncio.Lock | None = None model_config = pydantic.ConfigDict(arbitrary_types_allowed=True) @@ -46,9 +50,23 @@ async def _start_session(self): async with self._init_lock: if self._session: return self._session - - self._client = stdio_client(self.server_param) - read, write = await self._client.__aenter__() + read, write, get_session_id = None, None, None + if self.mcp_type == McpType.STDIO: + self._client = stdio_client(self.server_param) + read, write = await self._client.__aenter__() + elif self.mcp_type == McpType.STREAMABLE_HTTP: + self._client = streamablehttp_client(url=self.server_param.url, + headers=self.server_param.headers, + timeout=self.server_param.timeout, + sse_read_timeout=self.server_param.sse_read_timeout, + terminate_on_close=self.server_param.terminate_on_close) + read, write, _ = await self._client.__aenter__() + elif self.mcp_type == McpType.SSE: + self._client = sse_client(url=self.server_param.url, + headers=self.server_param.headers, + timeout=self.server_param.timeout, + sse_read_timeout=self.server_param.sse_read_timeout,) + read, write = await self._client.__aenter__() self._session = ClientSession(read, write) await self._session.__aenter__() await self._session.initialize() @@ -75,7 +93,7 @@ async def initialize(self, force_refresh: bool = False): continue self._tools.append(create_langchain_tool(tool, self._session, self)) except Exception as e: - print(f"Error gathering tools for {self.server_param.command} {' '.join(self.server_param.args)}: {e}") + print(f"Error gathering tools for {self.server_param.model_dump()}") raise e async def close(self): @@ -167,6 +185,7 @@ async def convert_mcp_to_langchain_tools(server_config: McpServerConfig, force_r McpToolkit: A toolkit containing the converted LangChain tools. """ toolkit = McpToolkit( + mcp_type=server_config.mcp_type, name=server_config.server_name, server_param=server_config.server_param, exclude_tools=server_config.exclude_tools