diff --git a/pelix/rsa/providers/distribution/py4j.py b/pelix/rsa/providers/distribution/py4j.py index 9727670d..d48d7884 100644 --- a/pelix/rsa/providers/distribution/py4j.py +++ b/pelix/rsa/providers/distribution/py4j.py @@ -83,6 +83,8 @@ # ------------------------------------------------------------------------------ # Note: These must match the Java-side constants recored in Java interface # class: org.eclipse.ecf.provider.py4j.Py4jConstants +ECF_PY4J_DEFAULT_TIMEOUT = 15 # seconds + ECF_PY4J_CONTAINER_CONFIG_TYPE = "ecf.py4j" ECF_PY4J_NAMESPACE = "ecf.namespace.py4j" @@ -147,7 +149,7 @@ def _export_service(self, svc: Any, ed: EndpointDescription) -> None: # modify svc class to have appropriate metadata for py4j timeout = ed.get_osgi_basic_timeout() if not timeout: - timeout = 30 + timeout = ECF_PY4J_DEFAULT_TIMEOUT dp = cast(Py4jDistributionProvider, self._get_distribution_provider()) @@ -186,7 +188,7 @@ def _prepare_proxy(self, endpoint_description: EndpointDescription) -> Any: proxy = bridge.get_import_endpoint(endpoint_description.get_id())[0] timeout = endpoint_description.get_osgi_basic_timeout() if not timeout: - timeout = self._container_props.get(ECF_PY4J_DEFAULT_SERVICE_TIMEOUT, 30) + timeout = 30 args = [ bridge.get_jvm(), diff --git a/samples/remotetoolsserver/descriptions.py b/samples/remotetoolsserver/descriptions.py new file mode 100644 index 00000000..590d8bcf --- /dev/null +++ b/samples/remotetoolsserver/descriptions.py @@ -0,0 +1,49 @@ +from dataclasses import dataclass +from mcp.types import ToolAnnotations +from typing import List, Any + + +# Dataclasses to represent ToolDescription, ToolParamDescription, ToolResultDescription +@dataclass +class ToolParamDescription: + name: str = '' + description: str = '' + required: bool = True + + +@dataclass +class ToolResultDescription: + description: str = '' + + +@dataclass +class ToolDescription: + name: str = '' + description: str = '' + tool_param_descriptions: List[ToolParamDescription] = None + result_description: ToolResultDescription = None + tool_annotations: List[ToolAnnotations] = None + + +# convert a single Java ToolDescription object to a puthon ToolDescription +def convert_tool_description(tool_desc: Any) -> ToolDescription: + tool_param_descs = [ToolParamDescription(d.name(), d.description(), d.required()) for d in tool_desc.toolParamDescriptions()] + tool_result_desc = ToolResultDescription(tool_desc.resultDescription().description()) + d = tool_desc.toolAnnotationsDescription() + tool_annotations = ToolAnnotations(destructiveHint=d.destructiveHint(), idempotentHint=d.idempotentHint(), openWorldHint=d.openWorldHint(), readOnlyHint=d.readOnlyHint(), title=d.title()) if d else None + return ToolDescription(tool_desc.name(), tool_desc.description(), tool_param_descs, tool_result_desc, tool_annotations) + + +# convert a list of Java ToolDescription object to a puthon ToolDescription +def convert_tool_descriptions(tool_descs: List[Any]) -> List[ToolDescription]: + return [convert_tool_description(tool_desc) for tool_desc in tool_descs] + + +# Given a Java service instance and a service interface implemented by that service +# get and then convert all tool descriptions by calling service.getToolDescriptions +# which assumes that the service implements (or extends) the org.eclipse.ecf.ai.mcp.service.ToolGroupService +# interface class +def get_tool_descriptions_from_service(service: Any, service_interface: str): + java_descs = service.getToolDescriptions(service_interface) + return convert_tool_descriptions(java_descs) + diff --git a/samples/remotetoolsserver/requirements.txt b/samples/remotetoolsserver/requirements.txt new file mode 100644 index 00000000..b90030b4 --- /dev/null +++ b/samples/remotetoolsserver/requirements.txt @@ -0,0 +1,2 @@ +iPOPO >= 3.1.0 +mcp >= 1.9.2 diff --git a/samples/remotetoolsserver/run_framework.py b/samples/remotetoolsserver/run_framework.py new file mode 100644 index 00000000..e15e7d3e --- /dev/null +++ b/samples/remotetoolsserver/run_framework.py @@ -0,0 +1,41 @@ +import pelix.framework as pelix + +# For Pydev debugging +# import sys;sys.path.append(r'C:\eclipse-2025-03a\eclipse\plugins\org.python.pydev.core_13.0.2.202503021229\pysrc') +# import pydevd;pydevd.settrace() + +# These represent all the bundles/modules +bundles = ( + # ipopo framework core impl + 'pelix.ipopo.core', + # remote service admin (RSA) impl + 'pelix.rsa.remoteserviceadmin', + # RSA distribution provider using py4j. When the framework is started, + # this will try to connect to a Java process using the ecf py4j + # distribution provider. The expected ports set to the + # ecf.py4j.javaport and ecf.py4j.pythonport properties in the + # create_framework call below + 'pelix.rsa.providers.distribution.py4j', + # the RemoteToolsFastMCP server implemented as a pelix bundle + # See the declaration of the RemoteToolsFastMCPServer in + # samples/remotetoolserver.server.py.. + 'samples.remotetoolsserver.server' + ) + +# Use the utility method to create, run and delete the framework +framework = pelix.create_framework( + # NOTE: A Java Py4j distribution provider when started by the + # pelix framework will try to connect to java/osgi process assumed to be + # running at 25333 on localhost. If the java/osgi server is not listening + # the start of the pelix.rsa.providers.distribution.p4j provider + # will raise a connect exception. + bundles, {'ecf.py4j.javaport': 25333, 'ecf.py4j.pythonport': 25334, + # FastMCP server name property set here + 'remotetoolsfastmpcserver.name': 'Sample RemoteTools FastMCP Server' + } + +) +# everything actually started here +framework.start() +# wait for framework to stop itself here +framework.wait_for_stop() diff --git a/samples/remotetoolsserver/server.py b/samples/remotetoolsserver/server.py new file mode 100644 index 00000000..62f7af76 --- /dev/null +++ b/samples/remotetoolsserver/server.py @@ -0,0 +1,262 @@ +from typing import List, Any, Annotated, Callable + +from pelix.ipopo.decorators import ComponentFactory, Instantiate, Validate, Requires, Property, \ + Bind, Unbind +from pelix.framework import BundleContext + +from pelix.internals.registry import ServiceReference +from threading import Thread + +from mcp.server.fastmcp.tools.base import Tool +from mcp.server.fastmcp.utilities.func_metadata import FuncMetadata +from mcp.server.fastmcp.utilities.func_metadata import _get_typed_annotation, ArgModelBase +from pydantic import Field, WithJsonSchema, create_model +from pydantic.fields import FieldInfo +from pydantic_core import PydanticUndefined +from mcp.server.fastmcp.tools.tool_manager import ToolManager +from mcp.server.fastmcp.server import FastMCP +from .descriptions import ToolDescription, ToolParamDescription +from mcp.server.fastmcp.exceptions import InvalidSignature +from _thread import RLock +from mcp.types import ToolAnnotations +from samples.remotetoolsserver.descriptions import get_tool_descriptions_from_service +import logging + +logger = logging.getLogger(__name__) + + +# Subclass of mcp.server.fastmcp.tools.tool_manager.ToolManager +# This is necessary so that the ToolManager used by the RemoteToolsFastMCP +# class and have new methods: add_tool_from_description +class RemoteToolManager(ToolManager): + + _lock: RLock = RLock() + + def get_tool(self, name: str) -> Tool | None: + with self._lock: + return ToolManager.get_tool(self, name) + + def _params_metadata(self, param_descs: list[ToolParamDescription]) -> FuncMetadata: + dynamic_pydantic_model_params: dict[str, Any] = {} + + for param_desc in param_descs: + if param_desc.name.startswith("_"): + raise InvalidSignature( + f"Parameter {param_desc.name} cannot start with '_'" + ) + annotation = Annotated[ + Any, + Field(description=param_desc.description), + WithJsonSchema({"title": param_desc.name, "type": "string"}), + ] + field_info = FieldInfo.from_annotated_attribute( + _get_typed_annotation(annotation, {}), + PydanticUndefined, + ) + dynamic_pydantic_model_params[param_desc.name] = (field_info.annotation, field_info) + arguments_model = create_model( + f"{param_desc.name}Arguments", + **dynamic_pydantic_model_params, + __base__=ArgModelBase, + ) + return FuncMetadata(arg_model=arguments_model) + + def _from_tool_description(self, service_proxy, tool_description: ToolDescription) -> Tool: + func_name = tool_description.name + annotations = tool_description.tool_annotations + + args_metadata = self._params_metadata(tool_description.tool_param_descriptions) + parameters = args_metadata.arg_model.model_json_schema() + + def _call_tool(**kwargs): + args = kwargs.values() + return getattr(service_proxy, func_name)(*args) + + return Tool( + fn=_call_tool, + name=func_name, + description=tool_description.description, + parameters=parameters, + fn_metadata=args_metadata, + is_async=False, + context_kwarg=None, + annotations=annotations, + ) + + def add_tool_from_description(self, service_proxy, tool_description: ToolDescription) -> Tool: + tool = self._from_tool_description(service_proxy, tool_description) + with (self._lock): + existing = self._tools.get(tool.name) + if existing: + if self.warn_on_duplicate_tools: + logger.warning(f"Tool already exists: {tool.name}") + return existing + self._tools[tool.name] = tool + return tool + + def add_tool( + self, + fn:Callable[..., Any], + name:str | None=None, + description:str | None=None, + annotations:ToolAnnotations | None=None) -> Tool: + # this method overrides superclass as guards the super class + # add_tool with the _lock + with (self._lock): + return ToolManager.add_tool(self, fn, name=name, description=description, annotations=annotations) + + def list_tools(self) -> list[Tool]: + # this method overrides superclass as guards the super class + # list_tools with the _lock + logger.debug('list_tools=') + with (self._lock): + return ToolManager.list_tools(self) + + def remove_tool(self, name: str): + with (self._lock): + return self._tools.pop(name, None) + + +from mcp.server.auth.provider import OAuthAuthorizationServerProvider +from mcp.server.streamable_http import EventStore +from mcp.server.fastmcp.server import Settings, lifespan_wrapper +from mcp.server.lowlevel.server import Server as MCPServer +from mcp.server.lowlevel.server import lifespan as default_lifespan +from mcp.server.fastmcp.utilities.logging import configure_logging +from mcp.server.fastmcp.prompts import PromptManager +from mcp.server.fastmcp.resources import ResourceManager +from mcp.server.streamable_http_manager import StreamableHTTPSessionManager +from starlette.routing import Route + + +# Subclass of FastMCP server. Subclassing allows adding of add_tools_from_service +# and remove_tools_from_service, which are methods that dynamically adds and remove +# remote tools, implemented in Java as OSGi remote services +class RemoteToolFastMCP(FastMCP): + + _tool_descriptions: dict = {} + + def __init__( + self, + name: str | None=None, + instructions: str | None=None, + auth_server_provider: OAuthAuthorizationServerProvider[Any, Any, Any] + | None=None, + event_store: EventStore | None=None, + *, + tools: list[Tool] | None=None, + **settings: Any, + ): + self.settings = Settings(**settings) + + self._mcp_server = MCPServer( + name=name or "FastMCP", + instructions=instructions, + lifespan=( + lifespan_wrapper(self, self.settings.lifespan) + if self.settings.lifespan + else default_lifespan + ), + ) + self._tool_manager = RemoteToolManager( + tools=tools, warn_on_duplicate_tools=self.settings.warn_on_duplicate_tools + ) + self._resource_manager = ResourceManager( + warn_on_duplicate_resources=self.settings.warn_on_duplicate_resources + ) + self._prompt_manager = PromptManager( + warn_on_duplicate_prompts=self.settings.warn_on_duplicate_prompts + ) + if (self.settings.auth is not None) != (auth_server_provider is not None): + raise ValueError( + "settings.auth must be specified if and only if auth_server_provider " + "is specified" + ) + self._auth_server_provider = auth_server_provider + self._event_store = event_store + self._custom_starlette_routes: list[Route] = [] + self.dependencies = self.settings.dependencies + self._session_manager: StreamableHTTPSessionManager | None = None + + # Set up MCP protocol handlers + self._setup_handlers() + + # Configure logging + configure_logging(self.settings.log_level) + + def add_tools_from_service( + self, + service_proxy: Any, + service_interface: str + ) -> None: + ptool_descriptions = get_tool_descriptions_from_service(service_proxy, service_interface) + with (self._tool_manager._lock): + for desc in ptool_descriptions: + tool = self._tool_manager.add_tool_from_description(service_proxy, desc) + if tool: + self._tool_descriptions[desc.name] = (service_proxy, service_interface, desc) + + def remove_tools_from_service(self, service_proxy: Any, service_interface: str): + with (self._tool_manager._lock): + descs = [*self._tool_descriptions.values()] + for val in descs: + if (val[0] == service_proxy and val[1] == service_interface): + self._tool_descriptions.pop(val[2].name) + self._tool_manager.remove_tool(val[2].name) + + +# Declaration of the service interface classname. When an iPopo component is annotatied +# with the @Requires annotation as the RemoteToolsFastMCPServer is below, this means that +# when a service instance implementing that interface is registered in the pelix service +# registry that it will be added to the _tools_services by calling the method annotated +# with @Bind below. When the service is unregistered the method annotated with the @Unbind +# annotation will be called. This corresponds to the fully qualified name of the java +# interface located here: +# https://github.com/ECF/Py4j-RemoteServicesProvider/blob/master/examples/org.eclipse.ecf.examples.ai.mcp.toolservice.api/src/org/eclipse/ecf/examples/ai/mcp/toolservice/api/ArithmeticTools.java +ARITMETIC_TOOLS_SERVICE_INTERFACE = 'org.eclipse.ecf.examples.ai.mcp.toolservice.api.ArithmeticTools' + + +# Component that is created and started by framework. Once validated +# see _validate method, an instance of RemoteToolFastMCP (above) +# is created and run in new thread. When instances of ARITHMETIC_TOOLS_SERVICE_INTERFACE +# are imported via RSA, the _bind_tool_service method is called and +# any tool descriptions found are added to set of tools available to the +# RemoteToolManager class declared above +@ComponentFactory("remote-tools-fastmcp-server-factory") +@Instantiate("remote-tools-fastmcp-server") +@Requires( + "_tools_services", + ARITMETIC_TOOLS_SERVICE_INTERFACE, + True, + True, + None, + False, +) +@Property('_name', "remotetoolsfastmpcserver.name", 'RemoteToolsFastMCPServer') +@Property('_instructions', 'remotetoolsfastmpcserver.instructions', "RemoteToolsFastMCPServer") +class RemoteToolsFastMCPServer(): + _tools_services: List[Any] + _name: str | None = None, + _instructions: str | None = None, + + def __init__(self): + self._mcp = None + + @Validate + def _validate(self, bcontext: BundleContext) -> None: + # create a RemoteToolFastMCP instance + self._mcp = RemoteToolFastMCP(name=self._name, instructions=self._instructions) + # Run separate thread so that the mcp server anyio doesn't use same thread + # calling this method (a pelix thread) + t = Thread(target=self._mcp.run, name="RemoteTools Thread") + t.daemon = True + t.start() + + @Bind + def _bind_tool_service(self, service_proxy: Any, service_reference: ServiceReference): + self._mcp.add_tools_from_service(service_proxy, ARITMETIC_TOOLS_SERVICE_INTERFACE) + + @Unbind + def _unbind_tool_service(self, service_proxy: Any, service_reference: ServiceReference): + self._mcp.remove_tools_from_service(service_proxy, ARITMETIC_TOOLS_SERVICE_INTERFACE) +