From ac6653970d2686b5985a7435f62d8553ad1c711b Mon Sep 17 00:00:00 2001 From: Pablo Valverde <92272697+pavalso@users.noreply.github.com> Date: Sun, 21 Dec 2025 14:01:38 +0100 Subject: [PATCH 01/24] Add get_plugin function and update type hints for plugin management --- src/awioc/__init__.py | 2 ++ src/awioc/api.py | 2 ++ src/awioc/container.py | 17 ++++++++++++++++- src/awioc/di/__init__.py | 2 ++ src/awioc/di/providers.py | 23 ++++++++++++++++------- 5 files changed, 38 insertions(+), 8 deletions(-) diff --git a/src/awioc/__init__.py b/src/awioc/__init__.py index 1495958..e03ec02 100644 --- a/src/awioc/__init__.py +++ b/src/awioc/__init__.py @@ -25,6 +25,7 @@ get_raw_container, get_app, get_logger, + get_plugin, wire, inject, # Config @@ -75,6 +76,7 @@ "get_raw_container", "get_app", "get_logger", + "get_plugin", "wire", "inject", # Config diff --git a/src/awioc/api.py b/src/awioc/api.py index 3c52345..10f3d85 100644 --- a/src/awioc/api.py +++ b/src/awioc/api.py @@ -51,6 +51,7 @@ get_raw_container, get_app, get_logger, + get_plugin ) from .di.wiring import wire from .loader.module_loader import compile_component @@ -84,6 +85,7 @@ "get_raw_container", "get_app", "get_logger", + "get_plugin", "wire", "inject", # Config diff --git a/src/awioc/container.py b/src/awioc/container.py index 23c812a..0d15413 100644 --- a/src/awioc/container.py +++ b/src/awioc/container.py @@ -1,6 +1,6 @@ import logging from logging import Logger -from typing import TypeVar, Optional, overload +from typing import TypeVar, Optional, overload, Union import pydantic from dependency_injector import containers, providers @@ -17,6 +17,7 @@ from .config.models import IOCBaseConfig _Lib_type = TypeVar("_Lib_type") +_Plugin_type = TypeVar("_Plugin_type") _Model_type = TypeVar("_Model_type", bound=pydantic.BaseModel) logger = logging.getLogger(__name__) @@ -80,7 +81,17 @@ def raw_container(self) -> AppContainer: def provided_libs(self) -> set[LibraryComponent]: return set(lib() for lib in self._libs_map.values()) + @overload def provided_lib(self, type_: type[_Lib_type]) -> _Lib_type: + ... + + @overload + def provided_lib(self, type_: str) -> _Lib_type: + ... + + def provided_lib(self, type_: Union[type[_Lib_type], str]) -> _Lib_type: + if isinstance(type_, str): + return self._libs_map[type_]() return self._libs_map[type_.__qualname__]() @overload @@ -105,6 +116,10 @@ def provided_app(self) -> AppComponent: def provided_plugins(self) -> set[PluginComponent]: return set(plugin() for plugin in self._plugins_map.values()) + def provided_plugin(self, type_: str) -> Optional[_Plugin_type]: + provider = self._plugins_map.get(type_) + return provider() if provider is not None else None + def provided_logger(self) -> Logger: return self._container.logger() diff --git a/src/awioc/di/__init__.py b/src/awioc/di/__init__.py index 36f8ab1..415b956 100644 --- a/src/awioc/di/__init__.py +++ b/src/awioc/di/__init__.py @@ -5,6 +5,7 @@ get_raw_container, get_app, get_logger, + get_plugin ) from .wiring import wire, inject_dependencies @@ -15,6 +16,7 @@ "get_raw_container", "get_app", "get_logger", + "get_plugin", "wire", "inject_dependencies", ] diff --git a/src/awioc/di/providers.py b/src/awioc/di/providers.py index ab92bba..9f4dcea 100644 --- a/src/awioc/di/providers.py +++ b/src/awioc/di/providers.py @@ -1,24 +1,29 @@ import inspect from logging import Logger -from typing import TypeVar, Optional, Union, overload +from types import ModuleType +from typing import TypeVar, Optional, Union, overload, Any import pydantic from dependency_injector.wiring import Provide, provided -from ..components.protocols import AppComponent +from ..components.protocols import AppComponent, Component from ..container import AppContainer, ContainerInterface -_Lib_type = TypeVar("_Lib_type") -_Model_type = TypeVar("_Model_type", bound=pydantic.BaseModel) +_Component = Union[Component, ModuleType] +_AppComponent = Union[AppComponent, ModuleType] +_Component_type = Union[Component, Any] +_Lib_type = TypeVar("_Lib_type", bound=_Component) +_Plugin_type = TypeVar("_Plugin_type", bound=_Component) +_Model_type = TypeVar("_Model_type", bound=pydantic.BaseModel) @overload -def get_library(type_: type[_Lib_type]) -> _Lib_type: # pragma: no cover +def get_library(type_: str) -> _Component_type: # pragma: no cover ... @overload -def get_library(type_: str) -> _Lib_type: # pragma: no cover +def get_library(type_: type[_Lib_type]) -> _Lib_type: # pragma: no cover ... @@ -26,6 +31,10 @@ def get_library(type_: Union[type[_Lib_type], str]) -> _Lib_type: return Provide["api", provided().provided_lib.call(type_)] +def get_plugin(type_: str) -> Optional[_Component_type]: + return Provide["api", provided().provided_plugin.call(type_)] + + @overload def get_config(model: type[_Model_type]) -> _Model_type: # pragma: no cover ... @@ -50,7 +59,7 @@ def get_raw_container() -> AppContainer: return Provide["__self__", provided()] -def get_app() -> AppComponent: +def get_app() -> _AppComponent: return Provide["app", provided()] From 51d800e2fd05eefbcc31f013abcad7cf7a3f777a Mon Sep 17 00:00:00 2001 From: Pablo Valverde <92272697+pavalso@users.noreply.github.com> Date: Sun, 21 Dec 2025 14:19:12 +0100 Subject: [PATCH 02/24] Add management dashboard with HTML interface and API endpoints for component and plugin management --- samples/management_dashboard/ioc.yaml | 15 + .../management_dashboard.py | 335 ++++++++++++++++++ .../plugins/sample_plugin_a.py | 43 +++ .../plugins/sample_plugin_b.py | 57 +++ .../plugins/sample_plugin_c.py | 69 ++++ samples/management_dashboard/web/index.html | 153 ++++++++ 6 files changed, 672 insertions(+) create mode 100644 samples/management_dashboard/ioc.yaml create mode 100644 samples/management_dashboard/management_dashboard.py create mode 100644 samples/management_dashboard/plugins/sample_plugin_a.py create mode 100644 samples/management_dashboard/plugins/sample_plugin_b.py create mode 100644 samples/management_dashboard/plugins/sample_plugin_c.py create mode 100644 samples/management_dashboard/web/index.html diff --git a/samples/management_dashboard/ioc.yaml b/samples/management_dashboard/ioc.yaml new file mode 100644 index 0000000..b197bb4 --- /dev/null +++ b/samples/management_dashboard/ioc.yaml @@ -0,0 +1,15 @@ +# IOC Framework Configuration +# Management Dashboard Sample + +app: management_dashboard + +# Plugins to demonstrate enable/disable functionality +plugins: + - plugins/sample_plugin_a.py + - plugins/sample_plugin_b.py + - plugins/sample_plugin_c.py + +# Dashboard configuration +dashboard: + host: "127.0.0.1" + port: 8090 diff --git a/samples/management_dashboard/management_dashboard.py b/samples/management_dashboard/management_dashboard.py new file mode 100644 index 0000000..992c4b1 --- /dev/null +++ b/samples/management_dashboard/management_dashboard.py @@ -0,0 +1,335 @@ +""" +Management Dashboard App Component + +A web server that exposes endpoints for: +- Listing all activated components +- Enabling/disabling plugins +- Showing overall application state +""" + +import asyncio +import json +from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler +from pathlib import Path +from threading import Thread +from typing import Optional +from urllib.parse import urlparse + +# Path to the web assets directory +WEB_DIR = Path(__file__).parent / "web" + +import pydantic + +from awioc import ( + get_config, + get_logger, + get_container_api, + inject, + ContainerInterface, + component_internals, + initialize_components, + shutdown_components, +) + + +class DashboardConfig(pydantic.BaseModel): + """Dashboard Server configuration.""" + __prefix__ = "dashboard" + + host: str = "127.0.0.1" + port: int = 8090 + + +__metadata__ = { + "name": "management_dashboard_app", + "version": "1.0.0", + "description": "Management Dashboard for IOC Components", + "wire": True, + "config": DashboardConfig +} + + +class DashboardRequestHandler(BaseHTTPRequestHandler): + """HTTP request handler for the management dashboard.""" + + container: Optional[ContainerInterface] = None + + @inject + def _get_dependencies( + self, + logger=get_logger(), + container=get_container_api() + ): + return logger, container + + def _send_json_response(self, data: dict, status: int = 200): + """Send a JSON response.""" + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + self.wfile.write(json.dumps(data, indent=2).encode()) + + def _send_html_response(self, html: str, status: int = 200): + """Send an HTML response.""" + self.send_response(status) + self.send_header("Content-Type", "text/html") + self.end_headers() + self.wfile.write(html.encode()) + + def _get_component_info(self, component) -> dict: + """Get information about a component.""" + internals = component_internals(component) + metadata = component.__metadata__ + return { + "name": metadata.get("name", "unknown"), + "version": metadata.get("version", "unknown"), + "description": metadata.get("description", ""), + "type": internals.type.value, + "state": { + "is_initialized": internals.is_initialized, + "is_initializing": internals.is_initializing, + "is_shutting_down": internals.is_shutting_down, + }, + "required_by": [ + req.__metadata__.get("name", "unknown") + for req in internals.required_by + ] + } + + def do_GET(self): + """Handle GET requests.""" + logger, container = self._get_dependencies() + parsed_path = urlparse(self.path) + path = parsed_path.path + + logger.info(f"GET {self.path} FROM {self.client_address[0]}:{self.client_address[1]}") + + if path == "/": + self._serve_dashboard_html() + elif path == "/api/components": + self._handle_list_components(container) + elif path == "/api/state": + self._handle_app_state(container) + elif path == "/api/plugins": + self._handle_list_plugins(container) + else: + self._send_json_response({"error": "Not Found"}, 404) + + def do_POST(self): + """Handle POST requests.""" + logger, container = self._get_dependencies() + parsed_path = urlparse(self.path) + path = parsed_path.path + + logger.info(f"POST {self.path} FROM {self.client_address[0]}:{self.client_address[1]}") + + if path == "/api/plugins/enable": + self._handle_enable_plugin(container) + elif path == "/api/plugins/disable": + self._handle_disable_plugin(container) + else: + self._send_json_response({"error": "Not Found"}, 404) + + def do_OPTIONS(self): + """Handle CORS preflight requests.""" + self.send_response(200) + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") + self.send_header("Access-Control-Allow-Headers", "Content-Type") + self.end_headers() + + def _serve_dashboard_html(self): + """Serve the dashboard HTML page from web/index.html.""" + index_path = WEB_DIR / "index.html" + try: + html = index_path.read_text(encoding="utf-8") + self._send_html_response(html) + except FileNotFoundError: + self._send_json_response({"error": "Dashboard not found"}, 404) + + def _handle_list_components(self, container: ContainerInterface): + """List all registered components.""" + components = container.components + components_info = [self._get_component_info(c) for c in components] + self._send_json_response({"components": components_info}) + + def _handle_app_state(self, container: ContainerInterface): + """Get overall application state.""" + components = container.components + app = container.provided_app() + plugins = container.provided_plugins() + libs = container.provided_libs() + + initialized_count = sum( + 1 for c in components + if component_internals(c).is_initialized + ) + + state = { + "app_name": app.__metadata__.get("name", "unknown"), + "app_version": app.__metadata__.get("version", "unknown"), + "total_components": len(components), + "initialized_components": initialized_count, + "plugins_count": len(plugins), + "libraries_count": len(libs), + "plugins": [p.__metadata__.get("name") for p in plugins], + "libraries": [lib.__metadata__.get("name") for lib in libs], + } + self._send_json_response(state) + + def _handle_list_plugins(self, container: ContainerInterface): + """List all registered plugins.""" + plugins = container.provided_plugins() + plugins_info = [self._get_component_info(p) for p in plugins] + self._send_json_response({"plugins": plugins_info}) + + def _handle_enable_plugin(self, container: ContainerInterface): + """Enable (initialize) a plugin.""" + content_length = int(self.headers.get('Content-Length', 0)) + body = self.rfile.read(content_length).decode() + + try: + data = json.loads(body) + plugin_name = data.get("name") + except json.JSONDecodeError: + self._send_json_response({"error": "Invalid JSON"}, 400) + return + + if not plugin_name: + self._send_json_response({"error": "Plugin name required"}, 400) + return + + plugin = container.provided_plugin(plugin_name) + if plugin is None: + self._send_json_response({"error": f"Plugin '{plugin_name}' not found"}, 404) + return + + internals = component_internals(plugin) + if internals.is_initialized: + self._send_json_response({"message": f"Plugin '{plugin_name}' is already enabled"}) + return + + # Run initialization in event loop + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(initialize_components(plugin)) + self._send_json_response({"message": f"Plugin '{plugin_name}' enabled successfully"}) + except Exception as e: + self._send_json_response({"error": str(e)}, 500) + finally: + loop.close() + + def _handle_disable_plugin(self, container: ContainerInterface): + """Disable (shutdown) a plugin.""" + content_length = int(self.headers.get('Content-Length', 0)) + body = self.rfile.read(content_length).decode() + + try: + data = json.loads(body) + plugin_name = data.get("name") + except json.JSONDecodeError: + self._send_json_response({"error": "Invalid JSON"}, 400) + return + + if not plugin_name: + self._send_json_response({"error": "Plugin name required"}, 400) + return + + plugin = container.provided_plugin(plugin_name) + if plugin is None: + self._send_json_response({"error": f"Plugin '{plugin_name}' not found"}, 404) + return + + internals = component_internals(plugin) + if not internals.is_initialized: + self._send_json_response({"message": f"Plugin '{plugin_name}' is already disabled"}) + return + + if internals.required_by: + required_names = [r.__metadata__.get("name") for r in internals.required_by] + self._send_json_response({ + "error": f"Cannot disable plugin '{plugin_name}': required by {required_names}" + }, 400) + return + + # Run shutdown in event loop + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(shutdown_components(plugin)) + self._send_json_response({"message": f"Plugin '{plugin_name}' disabled successfully"}) + except Exception as e: + self._send_json_response({"error": str(e)}, 500) + finally: + loop.close() + + def log_message(self, format, *args): + """Suppress default logging.""" + pass + + +class ManagementDashboardApp: + """ + Management Dashboard App Component. + + Provides a web interface for monitoring and managing IOC components. + """ + + def __init__(self): + self._server: Optional[ThreadingHTTPServer] = None + self._thread: Optional[Thread] = None + self._running = False + self._shutdown_event: Optional[asyncio.Event] = None + + @inject + async def initialize( + self, + logger=get_logger(), + config=get_config(DashboardConfig), + container=get_container_api() + ) -> None: + """Start the management dashboard server.""" + self._shutdown_event = asyncio.Event() + + # Store container reference in handler class + DashboardRequestHandler.container = container + + logger.info(f"Starting Management Dashboard on {config.host}:{config.port}") + + self._server = ThreadingHTTPServer( + (config.host, config.port), + DashboardRequestHandler + ) + self._running = True + + self._thread = Thread(target=self._server.serve_forever, daemon=True) + self._thread.start() + + logger.info(f"Management Dashboard running at http://{config.host}:{config.port}") + + async def wait(self) -> None: + """Wait until shutdown is requested.""" + if self._shutdown_event: + await self._shutdown_event.wait() + + async def shutdown(self) -> None: + """Stop the management dashboard server.""" + self._running = False + + if self._shutdown_event: + self._shutdown_event.set() + + if self._server: + self._server.shutdown() + self._server.server_close() + self._server = None + + if self._thread: + self._thread.join(timeout=2) + self._thread = None + + +management_dashboard_app = ManagementDashboardApp() +initialize = management_dashboard_app.initialize +shutdown = management_dashboard_app.shutdown +wait = management_dashboard_app.wait diff --git a/samples/management_dashboard/plugins/sample_plugin_a.py b/samples/management_dashboard/plugins/sample_plugin_a.py new file mode 100644 index 0000000..3fc3887 --- /dev/null +++ b/samples/management_dashboard/plugins/sample_plugin_a.py @@ -0,0 +1,43 @@ +""" +Sample Plugin A - Database Connection Simulator + +A sample plugin that simulates a database connection for testing +the management dashboard's enable/disable functionality. +""" + +from awioc import get_logger, inject + +__metadata__ = { + "name": "database_plugin", + "version": "1.0.0", + "description": "Simulates a database connection service", + "wire": True, +} + +_connected = False + + +@inject +async def initialize(logger=get_logger()): + global _connected + logger.info("Database plugin: Establishing connection...") + _connected = True + logger.info("Database plugin: Connection established") + + +@inject +async def shutdown(logger=get_logger()): + global _connected + logger.info("Database plugin: Closing connection...") + _connected = False + logger.info("Database plugin: Connection closed") + + +def is_connected() -> bool: + return _connected + + +async def query(sql: str) -> dict: + if not _connected: + raise RuntimeError("Database not connected") + return {"result": f"Executed: {sql}"} diff --git a/samples/management_dashboard/plugins/sample_plugin_b.py b/samples/management_dashboard/plugins/sample_plugin_b.py new file mode 100644 index 0000000..3534d13 --- /dev/null +++ b/samples/management_dashboard/plugins/sample_plugin_b.py @@ -0,0 +1,57 @@ +""" +Sample Plugin B - Cache Service Simulator + +A sample plugin that simulates a cache service for testing +the management dashboard's enable/disable functionality. +""" + +from awioc import get_logger, inject + +__metadata__ = { + "name": "cache_plugin", + "version": "1.2.0", + "description": "Simulates an in-memory cache service", + "wire": True, +} + +_cache: dict = {} +_enabled = False + + +@inject +async def initialize(logger=get_logger()): + global _enabled, _cache + logger.info("Cache plugin: Initializing cache...") + _cache = {} + _enabled = True + logger.info("Cache plugin: Cache ready") + + +@inject +async def shutdown(logger=get_logger()): + global _enabled, _cache + logger.info("Cache plugin: Clearing cache...") + _cache = {} + _enabled = False + logger.info("Cache plugin: Cache cleared") + + +def is_enabled() -> bool: + return _enabled + + +def get(key: str): + if not _enabled: + raise RuntimeError("Cache not enabled") + return _cache.get(key) + + +def set(key: str, value): + if not _enabled: + raise RuntimeError("Cache not enabled") + _cache[key] = value + + +def clear(): + global _cache + _cache = {} diff --git a/samples/management_dashboard/plugins/sample_plugin_c.py b/samples/management_dashboard/plugins/sample_plugin_c.py new file mode 100644 index 0000000..c9f9969 --- /dev/null +++ b/samples/management_dashboard/plugins/sample_plugin_c.py @@ -0,0 +1,69 @@ +""" +Sample Plugin C - Metrics Collector Simulator + +A sample plugin that simulates a metrics collection service for testing +the management dashboard's enable/disable functionality. +""" + +import time + +from awioc import get_logger, inject + +__metadata__ = { + "name": "metrics_plugin", + "version": "0.5.0", + "description": "Simulates a metrics collection service", + "wire": True, +} + +_metrics: dict = {} +_collecting = False +_start_time = None + + +@inject +async def initialize(logger=get_logger()): + global _collecting, _start_time, _metrics + logger.info("Metrics plugin: Starting collector...") + _metrics = { + "requests_total": 0, + "errors_total": 0, + "latency_sum": 0.0, + } + _start_time = time.time() + _collecting = True + logger.info("Metrics plugin: Collector started") + + +@inject +async def shutdown(logger=get_logger()): + global _collecting, _start_time + logger.info("Metrics plugin: Stopping collector...") + _collecting = False + _start_time = None + logger.info("Metrics plugin: Collector stopped") + + +def is_collecting() -> bool: + return _collecting + + +def record_request(latency: float = 0.0, error: bool = False): + if not _collecting: + return + _metrics["requests_total"] += 1 + _metrics["latency_sum"] += latency + if error: + _metrics["errors_total"] += 1 + + +def get_metrics() -> dict: + if not _collecting: + return {} + uptime = time.time() - _start_time if _start_time else 0 + return { + **_metrics, + "uptime_seconds": uptime, + "avg_latency": (_metrics["latency_sum"] / _metrics["requests_total"]) + if _metrics["requests_total"] > 0 else 0, + } diff --git a/samples/management_dashboard/web/index.html b/samples/management_dashboard/web/index.html new file mode 100644 index 0000000..144d846 --- /dev/null +++ b/samples/management_dashboard/web/index.html @@ -0,0 +1,153 @@ + + + + IOC Management Dashboard + + + +
+

IOC Management Dashboard

+ + +

Application State

+
+ +

Components

+
+ +

Plugins

+
+
+ + + + From 43ce2d75274abdf69e83b6324644ebc6aef6aedb Mon Sep 17 00:00:00 2001 From: Pablo Valverde <92272697+pavalso@users.noreply.github.com> Date: Sun, 21 Dec 2025 21:03:47 +0100 Subject: [PATCH 03/24] Enhance component management by adding component_initialized function and improving plugin registration logic --- src/awioc/components/lifecycle.py | 23 +++++++++---------- src/awioc/components/registry.py | 15 ++++++++---- src/awioc/config/registry.py | 22 ++++++++++++++---- src/awioc/container.py | 38 +++++++++++++++++++++---------- src/awioc/di/providers.py | 4 ++-- src/awioc/di/wiring.py | 11 +++++++-- tests/awioc/test_integration.py | 33 ++++----------------------- 7 files changed, 81 insertions(+), 65 deletions(-) diff --git a/src/awioc/components/lifecycle.py b/src/awioc/components/lifecycle.py index 602f220..52004f5 100644 --- a/src/awioc/components/lifecycle.py +++ b/src/awioc/components/lifecycle.py @@ -1,11 +1,10 @@ import asyncio import inspect import logging - from typing import TYPE_CHECKING from .protocols import Component, PluginComponent -from .registry import component_requires, component_internals, component_str +from .registry import component_requires, component_internals, component_str, component_initialized if TYPE_CHECKING: from ..container import ContainerInterface @@ -138,14 +137,15 @@ async def __shutdown(comp: Component): _exceptions = [_exc for _exc in _ret if isinstance(_exc, Exception)] - if _exceptions: - if not return_exceptions: # pragma: no cover - raise ExceptionGroup( - "One or more errors occurred during component shutdown.", - _exceptions - ) + if return_exceptions: return _exceptions + if _exceptions: + raise ExceptionGroup( + "One or more errors occurred during component shutdown.", + _exceptions + ) + return components @@ -177,9 +177,6 @@ async def register_plugin( caller_frame.filename, caller_frame.lineno) - from ..di.wiring import wire - wire(api_container, components=(plugin,)) - return plugin @@ -244,7 +241,9 @@ async def unregister_plugin( caller_frame.lineno) return - if component_internals(plugin).required_by: + if any(component_initialized(requirer) + for requirer + in component_internals(plugin).required_by): raise RuntimeError( f"Cannot unregister plugin {component_str(plugin)}; " "it is still required by other components" diff --git a/src/awioc/components/registry.py b/src/awioc/components/registry.py index 4722487..23be948 100644 --- a/src/awioc/components/registry.py +++ b/src/awioc/components/registry.py @@ -68,12 +68,19 @@ def component_internals(component: Component) -> Internals: return component.__metadata__["_internals"] -def component_str(comp: Component) -> str: +def component_str(component: Component) -> str: """ Get a string representation of a component. - :param comp: The component. - :return: String in format "name vversion". + :param component: The component. + :return: String in format "name version". """ - meta = comp.__metadata__ + meta = component.__metadata__ return f"{meta['name']} v{meta['version']}" + + +def component_initialized(component: Component) -> bool: + assert hasattr(component, "__metadata__") + if "_internals" not in component.__metadata__ or component.__metadata__["_internals"] is None: + return False + return component.__metadata__["_internals"].is_initialized diff --git a/src/awioc/config/registry.py b/src/awioc/config/registry.py index b1466d7..643db6b 100644 --- a/src/awioc/config/registry.py +++ b/src/awioc/config/registry.py @@ -1,6 +1,6 @@ import inspect import logging -from typing import TypeVar, Optional +from typing import TypeVar, Optional, Iterable import pydantic @@ -46,7 +46,19 @@ def __wrapper__(model: _M_type): return __wrapper__ if _ is None else __wrapper__(_) -def clear_configurations(): - """Clear all registered configurations.""" - logger.debug("Clearing all registered configurations") - _CONFIGURATIONS.clear() +def clear_configurations( + prefixes: Optional[Iterable[str]] = None +): + """ + Clear all registered configurations. + + :param prefixes: Specific prefixes to clear. If None, clears all configurations. + """ + if prefixes is None: + _CONFIGURATIONS.clear() + logger.debug("Cleared all registered configurations") + else: + for prefix in prefixes: + if prefix in _CONFIGURATIONS: + del _CONFIGURATIONS[prefix] + logger.debug("Cleared configuration with prefix '%s'", prefix) diff --git a/src/awioc/container.py b/src/awioc/container.py index 0d15413..48617d2 100644 --- a/src/awioc/container.py +++ b/src/awioc/container.py @@ -12,7 +12,11 @@ PluginComponent, LibraryComponent, ) -from .components.registry import component_requires, component_internals +from .components.registry import ( + component_requires, + component_internals, + component_initialized +) from .config.base import Settings from .config.models import IOCBaseConfig @@ -116,8 +120,19 @@ def provided_app(self) -> AppComponent: def provided_plugins(self) -> set[PluginComponent]: return set(plugin() for plugin in self._plugins_map.values()) + @overload + def provided_plugin(self, type_: type[_Plugin_type]) -> Optional[_Plugin_type]: + ... + + @overload def provided_plugin(self, type_: str) -> Optional[_Plugin_type]: - provider = self._plugins_map.get(type_) + ... + + def provided_plugin(self, type_: Union[_Plugin_type, str]) -> Optional[_Plugin_type]: + if isinstance(type_, str): + provider = self._plugins_map.get(type_) + else: + provider = self._plugins_map.get(type_.__metadata__["name"]) return provider() if provider is not None else None def provided_logger(self) -> Logger: @@ -126,33 +141,32 @@ def provided_logger(self) -> Logger: @classmethod def __init_component(cls, component: Component) -> Internals: assert hasattr(component, "__metadata__") - assert "_internals" not in component.__metadata__ + assert not component_initialized(component) _internals = Internals() component.__metadata__["_internals"] = _internals for req in component_requires(component): - if not cls.__component_initialized(req): + if not component_initialized(req): cls.__init_component(req) req.__metadata__["_internals"].required_by.add(component) return _internals - @staticmethod - def __deinit_component(component: Component): + @classmethod + def __deinit_component(cls, component: Component): assert hasattr(component, "__metadata__") - assert "_internals" in component.__metadata__ + + if "_internals" not in component.__metadata__ or component.__metadata__["_internals"] is None: + return for req in component_requires(component): + if not component_initialized(req): + continue req.__metadata__["_internals"].required_by.discard(component) component.__metadata__["_internals"] = None - @staticmethod - def __component_initialized(component: Component) -> bool: - assert hasattr(component, "__metadata__") - return "_internals" in component.__metadata__ - def register_libraries( self, *libs: tuple[str | type, LibraryComponent] diff --git a/src/awioc/di/providers.py b/src/awioc/di/providers.py index 9f4dcea..af9477e 100644 --- a/src/awioc/di/providers.py +++ b/src/awioc/di/providers.py @@ -9,8 +9,8 @@ from ..components.protocols import AppComponent, Component from ..container import AppContainer, ContainerInterface -_Component = Union[Component, ModuleType] -_AppComponent = Union[AppComponent, ModuleType] +_Component = Union[Component, ModuleType, Any] +_AppComponent = Union[AppComponent, ModuleType, Any] _Component_type = Union[Component, Any] _Lib_type = TypeVar("_Lib_type", bound=_Component) diff --git a/src/awioc/di/wiring.py b/src/awioc/di/wiring.py index cca15ea..928ec76 100644 --- a/src/awioc/di/wiring.py +++ b/src/awioc/di/wiring.py @@ -3,7 +3,7 @@ from typing import Optional, Iterable from ..components.protocols import Component -from ..config.registry import register_configuration +from ..config.registry import register_configuration, clear_configurations from ..container import ContainerInterface logger = logging.getLogger(__name__) @@ -24,6 +24,8 @@ def inject_dependencies( components = container.components def __register_components(iterable: Iterable[Component]) -> None: + new_configs = {} + for item in iterable: configs = item.__metadata__.get("config", set()) @@ -37,7 +39,12 @@ def __register_components(iterable: Iterable[Component]) -> None: prefix = item.__metadata__['name'] logger.debug("Registering configuration for component '%s' with prefix '%s'", item.__metadata__.get('name', 'unknown'), prefix) - register_configuration(config, prefix=prefix) + new_configs[prefix] = config + + clear_configurations(prefixes=new_configs.keys()) + + for prefix, config in new_configs.items(): + register_configuration(config, prefix=prefix) __register_components(components) logger.debug("Dependency injection complete") diff --git a/tests/awioc/test_integration.py b/tests/awioc/test_integration.py index a6b4d41..d011b58 100644 --- a/tests/awioc/test_integration.py +++ b/tests/awioc/test_integration.py @@ -13,6 +13,7 @@ import pydantic import pytest + from src.awioc.bootstrap import create_container, reconfigure_ioc_app from src.awioc.components.lifecycle import ( initialize_components, @@ -402,38 +403,14 @@ async def test_unregister_plugin_when_required_raises(self, container_with_app): base_internals = component_internals(base_plugin) base_internals.required_by.add(dependent_plugin) + # Mock internals set initialized to True + component_internals(base_plugin).is_initialized = True + component_internals(dependent_plugin).is_initialized = True + # Try to unregister base_plugin while it's still required with pytest.raises(RuntimeError, match="still required"): await unregister_plugin(interface, base_plugin) - async def test_register_plugin_with_wiring(self, container_with_app): - """Test register_plugin function wires the plugin.""" - interface = container_with_app - - plugin = type("WiredPlugin", (), { - "__name__": "wired_plugin", - "__module__": "test", - "__package__": None, - "__metadata__": { - "name": "wired_plugin", - "version": "1.0.0", - "requires": set(), - "wire": True, - "wirings": set(), - }, - "initialize": AsyncMock(return_value=True), - "shutdown": AsyncMock() - })() - - # Register plugin using register_plugin function - result = await register_plugin(interface, plugin) - - assert result is plugin - assert plugin in interface.provided_plugins() - assert "_internals" in plugin.__metadata__ - # wire should have been called - interface.raw_container().wire.assert_called() - async def test_register_plugin_already_registered_returns_existing(self, container_with_app): """Test registering an already registered plugin returns it without re-registering.""" interface = container_with_app From 4fae3b7a45d4f20fbf9acd4cf2867b80c9a013d7 Mon Sep 17 00:00:00 2001 From: Pablo Valverde <92272697+pavalso@users.noreply.github.com> Date: Mon, 22 Dec 2025 02:02:15 +0100 Subject: [PATCH 04/24] Add WebSocket support for real-time updates and log streaming in management dashboard --- samples/management_dashboard/ioc.yaml | 12 +- .../management_dashboard.py | 1002 +++++++- samples/management_dashboard/web/index.html | 2229 ++++++++++++++++- 3 files changed, 3127 insertions(+), 116 deletions(-) diff --git a/samples/management_dashboard/ioc.yaml b/samples/management_dashboard/ioc.yaml index b197bb4..61d3164 100644 --- a/samples/management_dashboard/ioc.yaml +++ b/samples/management_dashboard/ioc.yaml @@ -1,15 +1,11 @@ -# IOC Framework Configuration -# Management Dashboard Sample - app: management_dashboard - -# Plugins to demonstrate enable/disable functionality plugins: - plugins/sample_plugin_a.py - plugins/sample_plugin_b.py - plugins/sample_plugin_c.py - -# Dashboard configuration dashboard: - host: "127.0.0.1" + host: 127.0.0.1 port: 8090 + ws_port: 8091 + monitor_interval: 0.25 + log_buffer_size: 500 diff --git a/samples/management_dashboard/management_dashboard.py b/samples/management_dashboard/management_dashboard.py index 992c4b1..b05c687 100644 --- a/samples/management_dashboard/management_dashboard.py +++ b/samples/management_dashboard/management_dashboard.py @@ -5,20 +5,28 @@ - Listing all activated components - Enabling/disabling plugins - Showing overall application state +- Real-time updates via WebSocket with component state monitoring +- Real-time log streaming with filtering """ import asyncio import json +import logging +import tempfile +import time +from collections import deque +from dataclasses import dataclass from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler from pathlib import Path -from threading import Thread -from typing import Optional +from threading import Thread, Lock +from typing import Any, Optional, Set, Dict, List, Deque from urllib.parse import urlparse -# Path to the web assets directory -WEB_DIR = Path(__file__).parent / "web" - import pydantic +import websockets +import yaml +from pydantic_core import PydanticUndefined +from websockets.server import WebSocketServerProtocol from awioc import ( get_config, @@ -29,7 +37,12 @@ component_internals, initialize_components, shutdown_components, + register_plugin, reconfigure_ioc_app, ) +from awioc.loader.module_loader import compile_component + +# Path to the web assets directory +WEB_DIR = Path(__file__).parent / "web" class DashboardConfig(pydantic.BaseModel): @@ -38,17 +51,895 @@ class DashboardConfig(pydantic.BaseModel): host: str = "127.0.0.1" port: int = 8090 + ws_port: int = 8091 + monitor_interval: float = 0.25 # State check interval in seconds + log_buffer_size: int = 500 # Maximum number of log entries to keep __metadata__ = { - "name": "management_dashboard_app", - "version": "1.0.0", - "description": "Management Dashboard for IOC Components", + "name": "Management Dashboard", + "version": "1.3.0", + "description": "Management Dashboard for IOC Components with real-time state monitoring and log streaming", "wire": True, "config": DashboardConfig } +@dataclass +class ComponentState: + """Snapshot of a component's state.""" + is_initialized: bool + is_initializing: bool + is_shutting_down: bool + + def __eq__(self, other): + if not isinstance(other, ComponentState): + return False + return ( + self.is_initialized == other.is_initialized and + self.is_initializing == other.is_initializing and + self.is_shutting_down == other.is_shutting_down + ) + + def get_status_label(self) -> str: + """Get a human-readable status label.""" + if self.is_shutting_down: + return "shutting_down" + elif self.is_initializing: + return "initializing" + elif self.is_initialized: + return "active" + else: + return "inactive" + + +@dataclass +class LogEntry: + """A single log entry.""" + id: int + timestamp: float + level: str + logger_name: str + message: str + source: str = "unknown" # app, plugin, library, or framework + component: str = "unknown" # component name + + def to_dict(self) -> dict: + return { + "id": self.id, + "timestamp": self.timestamp, + "level": self.level, + "logger_name": self.logger_name, + "message": self.message, + "source": self.source, + "component": self.component, + } + + +class LogBuffer: + """Thread-safe circular buffer for log entries.""" + + def __init__(self, max_size: int = 500): + self._buffer: Deque[LogEntry] = deque(maxlen=max_size) + self._lock = Lock() + self._id_counter = 0 + # module_name -> (display_name, type) + self._component_info: Dict[str, tuple] = {} + + def set_component_info(self, component_info: Dict[str, tuple]): + """Set the mapping of module names to (display_name, type).""" + with self._lock: + self._component_info = component_info.copy() + + def add(self, level: str, logger_name: str, message: str) -> LogEntry: + """Add a log entry and return it.""" + with self._lock: + self._id_counter += 1 + + # Determine source and component from logger name + source, component = self._parse_logger_name(logger_name) + + entry = LogEntry( + id=self._id_counter, + timestamp=time.time(), + level=level, + logger_name=logger_name, + message=message, + source=source, + component=component, + ) + self._buffer.append(entry) + return entry + + def _parse_logger_name(self, logger_name: str) -> tuple: + """Parse logger name to determine source and component.""" + logger_lower = logger_name.lower() + + # Check for framework logs first + if "awioc" in logger_lower: + return "framework", "awioc" + + # Try to match logger name against registered module names + # Logger names are like "awioc.samples.management_dashboard.management_dashboard" + for module_name, (display_name, comp_type) in self._component_info.items(): + module_lower = module_name.lower() + # Check if module name is contained in logger name + if module_lower in logger_lower or logger_lower.endswith(module_lower): + return comp_type, display_name + # Also check last part of module name (e.g., "management_dashboard") + module_last = module_lower.rsplit('.', 1)[-1] + if module_last in logger_lower: + return comp_type, display_name + + # Default - extract component name from logger path + parts = logger_name.split(".") + return "unknown", parts[-1] if parts else logger_name + + def get_all(self) -> List[dict]: + """Get all log entries as dicts.""" + with self._lock: + return [entry.to_dict() for entry in self._buffer] + + def get_since(self, last_id: int) -> List[dict]: + """Get log entries since a given ID.""" + with self._lock: + return [entry.to_dict() for entry in self._buffer if entry.id > last_id] + + def clear(self): + """Clear all log entries.""" + with self._lock: + self._buffer.clear() + + +class DashboardLogHandler(logging.Handler): + """Custom logging handler that captures logs for the dashboard.""" + + def __init__(self, log_buffer: LogBuffer, broadcast_callback=None): + super().__init__() + self._log_buffer = log_buffer + self._broadcast_callback = broadcast_callback + self.setFormatter(logging.Formatter("%(message)s")) + + def emit(self, record: logging.LogRecord): + try: + message = self.format(record) + entry = self._log_buffer.add( + level=record.levelname, + logger_name=record.name, + message=message, + ) + + # Trigger broadcast if callback is set + if self._broadcast_callback: + self._broadcast_callback(entry) + except Exception: + self.handleError(record) + + +# Global log buffer instance +log_buffer = LogBuffer() + + +class WebSocketManager: + """Manages WebSocket connections and broadcasts.""" + + def __init__(self): + self._clients: Set[WebSocketServerProtocol] = set() + self._container: Optional[ContainerInterface] = None + self._lock = asyncio.Lock() + self._previous_states: Dict[str, ComponentState] = {} + self._monitoring = False + self._main_loop: Optional[asyncio.AbstractEventLoop] = None + self._ws_loop: Optional[asyncio.AbstractEventLoop] = None + self._log_buffer: Optional[LogBuffer] = None + + def set_container(self, container: ContainerInterface): + self._container = container + + def set_main_loop(self, loop: asyncio.AbstractEventLoop): + """Set the main event loop for scheduling lifecycle operations.""" + self._main_loop = loop + + def set_ws_loop(self, loop: asyncio.AbstractEventLoop): + """Set the WebSocket event loop for log broadcasting.""" + self._ws_loop = loop + + def set_log_buffer(self, buffer: LogBuffer): + """Set the log buffer reference.""" + self._log_buffer = buffer + + def on_new_log(self, entry: LogEntry): + """Callback when a new log entry is added. Schedules broadcast in WS loop.""" + if self._ws_loop and self._clients: + asyncio.run_coroutine_threadsafe( + self.broadcast_log(entry), + self._ws_loop + ) + + async def broadcast_log(self, entry: LogEntry): + """Broadcast a single log entry to all connected clients.""" + message = { + "type": "log", + "entry": entry.to_dict(), + } + await self.broadcast(message) + + async def register(self, websocket: WebSocketServerProtocol): + async with self._lock: + self._clients.add(websocket) + + async def unregister(self, websocket: WebSocketServerProtocol): + async with self._lock: + self._clients.discard(websocket) + + @property + def has_clients(self) -> bool: + return len(self._clients) > 0 + + def _get_component_state(self, component) -> ComponentState: + """Get the current state of a component.""" + internals = component_internals(component) + return ComponentState( + is_initialized=internals.is_initialized, + is_initializing=internals.is_initializing, + is_shutting_down=internals.is_shutting_down, + ) + + def _get_component_info(self, component) -> dict: + """Get information about a component.""" + internals = component_internals(component) + metadata = component.__metadata__ + state = self._get_component_state(component) + + # Get configuration info + config_info = self._get_component_config_info(component) + + return { + "name": metadata.get("name", "unknown"), + "version": metadata.get("version", "unknown"), + "description": metadata.get("description", ""), + "type": internals.type.value, + "state": { + "is_initialized": internals.is_initialized, + "is_initializing": internals.is_initializing, + "is_shutting_down": internals.is_shutting_down, + }, + "status": state.get_status_label(), + "required_by": [ + req.__metadata__.get("name", "unknown") + for req in internals.required_by + ], + "config": config_info + } + + def _normalize_pydantic_schema(self, model: type[pydantic.BaseModel]) -> dict: + # 1. Generate schema with a VALID ref_template + raw_schema = model.model_json_schema(ref_template="#/$defs/{model}") + + # 2. Get $defs for resolving references + defs = raw_schema.get("$defs", {}) + + # 3. Resolve top-level $ref if present + if "$ref" in raw_schema: + ref_name = raw_schema["$ref"].split("/")[-1] + schema = defs.get(ref_name, {}).copy() + else: + schema = {k: v for k, v in raw_schema.items() if k != "$defs"} + + # 4. Recursively resolve all $ref in the schema + schema = self._resolve_refs(schema, defs) + + # 5. Ensure required keys + schema.setdefault("type", "object") + schema.setdefault("properties", {}) + schema.setdefault("required", []) + + # 6. Inject defaults in a JSON-safe way (for UI) + for field_name, field in model.model_fields.items(): + prop = schema["properties"].get(field_name, {}) + + default = field.default + if default is not PydanticUndefined: + try: + # For nested BaseModel defaults, convert to dict + if isinstance(default, pydantic.BaseModel): + default = default.model_dump() + # Only inject if JSON-serializable + json.dumps(default) + prop.setdefault("default", default) + except TypeError: + # Fallback: stringify non-serializable defaults + prop.setdefault("default", str(default)) + + schema["properties"][field_name] = prop + + return schema + + def _resolve_refs(self, obj: Any, defs: dict) -> Any: + """Recursively resolve all $ref references in a schema.""" + if isinstance(obj, dict): + # Handle allOf with single $ref (Pydantic pattern for nested models with defaults) + if "allOf" in obj and isinstance(obj["allOf"], list): + # Merge all schemas in allOf + merged = {} + for item in obj["allOf"]: + resolved_item = self._resolve_refs(item, defs) + if isinstance(resolved_item, dict): + # Deep merge properties + for k, v in resolved_item.items(): + if k == "properties" and "properties" in merged: + merged["properties"].update(v) + elif k == "required" and "required" in merged: + merged["required"] = list(set(merged["required"]) | set(v)) + else: + merged[k] = v + # Also include any other keys from original obj (like default) + for k, v in obj.items(): + if k != "allOf" and k not in merged: + merged[k] = self._resolve_refs(v, defs) + return merged + + # If this dict has a $ref, resolve it + if "$ref" in obj: + ref_path = obj["$ref"] + if ref_path.startswith("#/$defs/"): + ref_name = ref_path.split("/")[-1] + resolved = defs.get(ref_name, {}).copy() + # Recursively resolve any refs in the resolved schema + resolved = self._resolve_refs(resolved, defs) + # Merge with any other keys in the original object + for k, v in obj.items(): + if k != "$ref" and k not in resolved: + resolved[k] = self._resolve_refs(v, defs) + return resolved + return obj + + # Otherwise, recursively process all values + return {k: self._resolve_refs(v, defs) for k, v in obj.items() if k != "$defs"} + + elif isinstance(obj, list): + return [self._resolve_refs(item, defs) for item in obj] + + return obj + + def _get_component_config_info(self, component) -> Optional[dict]: + metadata = component.__metadata__ + config_model = metadata.get("config") + + if not config_model: + return None + + prefix = getattr(config_model, "__prefix__", None) + if not prefix: + return None + + try: + current_config = self._container.provided_config(config_model) + values = current_config.model_dump() if current_config else {} + values = self._make_json_serializable(values) + except Exception: + values = {} + + schema = self._normalize_pydantic_schema(config_model) + + return { + "prefix": prefix, + "values": values, + "schema": schema, + } + + def _make_json_serializable(self, obj): + """Recursively convert non-JSON-serializable objects to strings.""" + if isinstance(obj, dict): + return {k: self._make_json_serializable(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [self._make_json_serializable(v) for v in obj] + elif isinstance(obj, Path): + return str(obj) + elif hasattr(obj, '__str__') and not isinstance(obj, (str, int, float, bool, type(None))): + # For other non-serializable objects, convert to string + try: + json.dumps(obj) + return obj + except (TypeError, ValueError): + return str(obj) + return obj + + def _get_full_state(self) -> dict: + """Get the full application state.""" + if not self._container: + return {} + + components = self._container.components + app = self._container.provided_app() + plugins = self._container.provided_plugins() + libs = self._container.provided_libs() + + initialized_count = sum( + 1 for c in components + if component_internals(c).is_initialized + ) + + # Get config file path from IOCBaseConfig + config_file_path = None + try: + ioc_config = self._container.ioc_config_model + config_file_path = str(ioc_config.config_path) + except Exception: + pass + + return { + "type": "full_state", + "state": { + "app_name": app.__metadata__.get("name", "unknown"), + "app_version": app.__metadata__.get("version", "unknown"), + "total_components": len(components), + "initialized_components": initialized_count, + "plugins_count": len(plugins), + "libraries_count": len(libs), + "config_file": config_file_path, + }, + "components": [self._get_component_info(c) for c in components], + "plugins": [self._get_component_info(p) for p in plugins], + } + + async def broadcast(self, message: dict): + """Broadcast a message to all connected clients.""" + if not self._clients: + return + + data = json.dumps(message) + + async with self._lock: + dead_clients = set() + for client in self._clients: + try: + await client.send(data) + except websockets.exceptions.ConnectionClosed: + dead_clients.add(client) + + self._clients -= dead_clients + + async def broadcast_state(self): + """Broadcast the current full state to all connected clients.""" + await self.broadcast(self._get_full_state()) + + async def broadcast_component_update(self, component_name: str, component_info: dict, old_status: str, + new_status: str): + """Broadcast a component state change.""" + message = { + "type": "component_update", + "component": component_info, + "transition": { + "from": old_status, + "to": new_status, + } + } + await self.broadcast(message) + + async def check_state_changes(self): + """Check for component state changes and broadcast updates.""" + if not self._container or not self._clients: + return + + components = self._container.components + state_changed = False + + for component in components: + name = component.__metadata__.get("name", "unknown") + current_state = self._get_component_state(component) + previous_state = self._previous_states.get(name) + + if previous_state is None: + # First time seeing this component + self._previous_states[name] = current_state + elif current_state != previous_state: + # State changed! + state_changed = True + old_status = previous_state.get_status_label() + new_status = current_state.get_status_label() + + component_info = self._get_component_info(component) + await self.broadcast_component_update(name, component_info, old_status, new_status) + + self._previous_states[name] = current_state + + # If any state changed, also broadcast updated summary stats + if state_changed: + await self.broadcast_state_summary() + + async def broadcast_state_summary(self): + """Broadcast just the summary statistics.""" + if not self._container: + return + + components = self._container.components + app = self._container.provided_app() + plugins = self._container.provided_plugins() + libs = self._container.provided_libs() + + initialized_count = sum( + 1 for c in components + if component_internals(c).is_initialized + ) + + message = { + "type": "state_summary", + "state": { + "app_name": app.__metadata__.get("name", "unknown"), + "app_version": app.__metadata__.get("version", "unknown"), + "total_components": len(components), + "initialized_components": initialized_count, + "plugins_count": len(plugins), + "libraries_count": len(libs), + } + } + await self.broadcast(message) + + async def start_monitoring(self, interval: float = 0.25): + """Start the state monitoring loop.""" + self._monitoring = True + while self._monitoring: + try: + await self.check_state_changes() + except Exception: + pass # Don't let monitoring errors crash the loop + await asyncio.sleep(interval) + + def stop_monitoring(self): + """Stop the state monitoring loop.""" + self._monitoring = False + + async def handle_client(self, websocket: WebSocketServerProtocol): + """Handle a WebSocket client connection.""" + await self.register(websocket) + try: + # Send initial full state + state = self._get_full_state() + await websocket.send(json.dumps(state)) + + # Send initial logs + if self._log_buffer: + logs_message = { + "type": "logs_history", + "logs": self._log_buffer.get_all(), + } + await websocket.send(json.dumps(logs_message)) + + # Keep connection alive and handle incoming messages + async for message in websocket: + try: + data = json.loads(message) + await self._handle_message(websocket, data) + except json.JSONDecodeError: + await websocket.send(json.dumps({"type": "error", "error": "Invalid JSON"})) + except websockets.exceptions.ConnectionClosed: + pass + finally: + await self.unregister(websocket) + + async def _handle_message(self, websocket: WebSocketServerProtocol, data: dict): + """Handle incoming WebSocket messages.""" + action = data.get("action") + + if action == "refresh": + state = self._get_full_state() + await websocket.send(json.dumps(state)) + + elif action == "get_logs": + if self._log_buffer: + last_id = data.get("since_id", 0) + logs = self._log_buffer.get_since(last_id) if last_id else self._log_buffer.get_all() + await websocket.send(json.dumps({ + "type": "logs_history", + "logs": logs, + })) + + elif action == "clear_logs": + if self._log_buffer: + self._log_buffer.clear() + await websocket.send(json.dumps({ + "type": "success", + "message": "Logs cleared", + })) + await self.broadcast({"type": "logs_cleared"}) + + elif action == "enable_plugin": + plugin_name = data.get("name") + result = await self._enable_plugin(plugin_name) + await websocket.send(json.dumps(result)) + + elif action == "disable_plugin": + plugin_name = data.get("name") + result = await self._disable_plugin(plugin_name) + await websocket.send(json.dumps(result)) + + elif action == "register_plugin": + plugin_path = data.get("path") + result = await self._register_plugin_from_path(plugin_path) + await websocket.send(json.dumps(result)) + # Refresh state for all clients after registration + if result.get("type") == "success": + await self.broadcast_state() + + elif action == "upload_plugin": + upload_type = data.get("type") + if upload_type == "file": + result = await self._upload_plugin_file(data.get("filename"), data.get("content")) + elif upload_type == "directory": + result = await self._upload_plugin_directory(data.get("dirname"), data.get("files")) + else: + result = {"type": "error", "error": "Invalid upload type"} + await websocket.send(json.dumps(result)) + # Refresh state for all clients after registration + if result.get("type") == "success": + await self.broadcast_state() + + elif action == "save_config": + component_name = data.get("name") + config_values = data.get("config") + result = await self._save_component_config(component_name, config_values) + await websocket.send(json.dumps(result)) + # Refresh state for all clients after config change + if result.get("type") == "success": + await self.broadcast_state() + + def _run_in_main_loop(self, coro) -> Any: + """Run a coroutine in the main event loop and wait for result.""" + if self._main_loop is None: + raise RuntimeError("Main event loop not set") + + future = asyncio.run_coroutine_threadsafe(coro, self._main_loop) + return future.result(timeout=30) # 30 second timeout + + @inject + async def _enable_plugin( + self, + plugin_name: str, + logger=get_logger() + ) -> dict: + """Enable a plugin.""" + if not plugin_name: + return {"type": "error", "error": "Plugin name required"} + + plugin = self._container.provided_plugin(plugin_name) + if plugin is None: + return {"type": "error", "error": f"Plugin '{plugin_name}' not found"} + + internals = component_internals(plugin) + if internals.is_initialized: + return {"type": "info", "message": f"Plugin '{plugin_name}' is already enabled"} + + try: + # Run in main event loop to avoid cross-loop issues + await asyncio.get_event_loop().run_in_executor( + None, + lambda: self._run_in_main_loop(initialize_components(plugin)) + ) + return {"type": "success", "message": f"Plugin '{plugin_name}' enabled successfully"} + except Exception as e: + logger.error(f"Error enabling plugin '{plugin_name}'", exc_info=e) + return {"type": "error", "error": str(e)} + + @inject + async def _disable_plugin( + self, + plugin_name: str, + logger=get_logger() + ) -> dict: + """Disable a plugin.""" + if not plugin_name: + return {"type": "error", "error": "Plugin name required"} + + plugin = self._container.provided_plugin(plugin_name) + if plugin is None: + return {"type": "error", "error": f"Plugin '{plugin_name}' not found"} + + internals = component_internals(plugin) + if not internals.is_initialized: + return {"type": "info", "message": f"Plugin '{plugin_name}' is already disabled"} + + if internals.required_by: + required_names = [r.__metadata__.get("name") for r in internals.required_by] + return { + "type": "error", + "error": f"Cannot disable plugin '{plugin_name}': required by {required_names}" + } + + try: + # Run in main event loop to avoid cross-loop issues + await asyncio.get_event_loop().run_in_executor( + None, + lambda: self._run_in_main_loop(shutdown_components(plugin)) + ) + return {"type": "success", "message": f"Plugin '{plugin_name}' disabled successfully"} + except Exception as e: + logger.error(f"Error disabling plugin '{plugin_name}'", exc_info=e) + return {"type": "error", "error": str(e)} + + @inject + async def _register_plugin_from_path( + self, + plugin_path: str, + logger=get_logger() + ) -> dict: + """Register a new plugin from a file path.""" + if not plugin_path: + return {"type": "error", "error": "Plugin path required"} + + try: + path = Path(plugin_path) + if not path.exists(): + return {"type": "error", "error": f"File not found: {plugin_path}"} + + # Load the component from the file + plugin = compile_component(path) + plugin_name = plugin.__metadata__.get("name", "unknown") + + # Check if already registered + existing = self._container.provided_plugin(plugin_name) + if existing is not None: + return {"type": "info", "message": f"Plugin '{plugin_name}' is already registered"} + + # Register and initialize the plugin in the main event loop + async def register_and_init(): + await register_plugin(self._container, plugin) + reconfigure_ioc_app(self._container, (plugin,)) + await initialize_components(plugin) + + await asyncio.get_event_loop().run_in_executor( + None, + lambda: self._run_in_main_loop(register_and_init()) + ) + + # Update the log buffer with the new component info + if self._log_buffer: + module_name = plugin.__name__ + display_name = plugin_name + internals = component_internals(plugin) + comp_type = internals.type.value + with self._log_buffer._lock: + self._log_buffer._component_info[module_name] = (display_name, comp_type) + + return {"type": "success", "message": f"Plugin '{plugin_name}' registered and initialized successfully"} + except Exception as e: + logger.error(f"Error registering plugin from '{plugin_path}'", exc_info=e) + return {"type": "error", "error": str(e)} + + @inject + async def _upload_plugin_file( + self, + filename: str, + content: str, + logger=get_logger() + ) -> dict: + """Handle single file plugin upload.""" + if not filename or not content: + return {"type": "error", "error": "Filename and content required"} + + try: + # Create a temporary directory for the plugin + temp_dir = Path(tempfile.mkdtemp(prefix="plugin_")) + plugin_path = temp_dir / filename + + # Write the file + plugin_path.write_text(content, encoding="utf-8") + + # Register the plugin + return await self._register_plugin_from_path(str(plugin_path)) + except Exception as e: + logger.error(f"Error uploading plugin file '{filename}'", exc_info=e) + return {"type": "error", "error": str(e)} + + @inject + async def _upload_plugin_directory( + self, + dirname: str, + files: Dict[str, str], + logger=get_logger() + ) -> dict: + """Handle directory plugin upload.""" + if not dirname or not files: + return {"type": "error", "error": "Directory name and files required"} + + try: + # Create a temporary directory for the plugin + temp_dir = Path(tempfile.mkdtemp(prefix="plugin_")) + + # Write all files preserving directory structure + for relative_path, content in files.items(): + file_path = temp_dir / relative_path + file_path.parent.mkdir(parents=True, exist_ok=True) + file_path.write_text(content, encoding="utf-8") + + # The plugin directory is the first component of the path + plugin_dir = temp_dir / dirname + + # Register the plugin + return await self._register_plugin_from_path(str(plugin_dir)) + except Exception as e: + logger.error(f"Error uploading plugin directory '{dirname}'", exc_info=e) + return {"type": "error", "error": str(e)} + + @inject + async def _save_component_config( + self, + component_name: str, + config_values: Dict[str, Any], + logger=get_logger() + ) -> dict: + """Save configuration for a component to the ioc.yaml file.""" + if not component_name: + return {"type": "error", "error": "Component name required"} + + if not config_values: + return {"type": "error", "error": "Configuration values required"} + + try: + # Find the component + component = None + for c in self._container.components: + if c.__metadata__.get("name") == component_name: + component = c + break + + if component is None: + return {"type": "error", "error": f"Component '{component_name}' not found"} + + # Get the config model and prefix + config_model = component.__metadata__.get("config") + if not config_model: + return {"type": "error", "error": f"Component '{component_name}' has no configuration"} + + prefix = getattr(config_model, "__prefix__", None) + if not prefix: + return {"type": "error", "error": f"Component '{component_name}' config has no prefix"} + + # Validate the new config values against the model + try: + validated_config = config_model(**config_values) + except pydantic.ValidationError as e: + return {"type": "error", "error": f"Invalid configuration: {e}"} + + # Get the config file path from IOCBaseConfig + try: + ioc_config = self._container.ioc_config_model + ioc_yaml_path = ioc_config.config_path + except Exception: + return {"type": "error", "error": "Could not get config file path from IOCBaseConfig"} + + if not ioc_yaml_path.exists(): + return {"type": "error", "error": f"Config file not found: {ioc_yaml_path}"} + + # Read current yaml content + with open(ioc_yaml_path, 'r', encoding='utf-8') as f: + yaml_content = yaml.safe_load(f) or {} + + # Update the config section + yaml_content[prefix] = validated_config.model_dump() + + for key, value in yaml_content[prefix].items(): + # Check each value, if it is not a basic type, convert to string + if not isinstance(value, (str, int, float, bool, type(None), list, dict)): + yaml_content[prefix][key] = str(value) + + # Write back to file + with open(ioc_yaml_path, 'w', encoding='utf-8') as f: + yaml.dump(yaml_content, f, default_flow_style=False, allow_unicode=True, sort_keys=False) + + logger.info(f"Saved configuration for '{component_name}' (prefix: {prefix})") + return {"type": "success", + "message": f"Configuration saved for '{component_name}'. Restart required for changes to take effect."} + + except Exception as e: + logger.error(f"Error saving config for '{component_name}'", exc_info=e) + return {"type": "error", "error": str(e)} + + +# Global WebSocket manager instance +ws_manager = WebSocketManager() + + class DashboardRequestHandler(BaseHTTPRequestHandler): """HTTP request handler for the management dashboard.""" @@ -210,7 +1101,6 @@ def _handle_enable_plugin(self, container: ContainerInterface): self._send_json_response({"message": f"Plugin '{plugin_name}' is already enabled"}) return - # Run initialization in event loop loop = asyncio.new_event_loop() try: loop.run_until_complete(initialize_components(plugin)) @@ -253,7 +1143,6 @@ def _handle_disable_plugin(self, container: ContainerInterface): }, 400) return - # Run shutdown in event loop loop = asyncio.new_event_loop() try: loop.run_until_complete(shutdown_components(plugin)) @@ -273,13 +1162,49 @@ class ManagementDashboardApp: Management Dashboard App Component. Provides a web interface for monitoring and managing IOC components. + Supports real-time updates via WebSocket with automatic state monitoring. + Includes real-time log streaming with filtering capabilities. """ def __init__(self): self._server: Optional[ThreadingHTTPServer] = None + self._ws_server = None self._thread: Optional[Thread] = None + self._ws_thread: Optional[Thread] = None self._running = False self._shutdown_event: Optional[asyncio.Event] = None + self._ws_loop: Optional[asyncio.AbstractEventLoop] = None + self._monitor_interval: float = 0.25 + self._log_handler: Optional[DashboardLogHandler] = None + + def _run_ws_server(self, host: str, port: int): + """Run the WebSocket server in a separate thread with state monitoring.""" + self._ws_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._ws_loop) + + # Set the WS loop on the manager so it can broadcast logs + ws_manager.set_ws_loop(self._ws_loop) + + async def serve(): + # Start the state monitoring task + monitor_task = asyncio.create_task( + ws_manager.start_monitoring(self._monitor_interval) + ) + + async with websockets.serve(ws_manager.handle_client, host, port): + while self._running: + await asyncio.sleep(0.1) + + # Stop monitoring when server stops + ws_manager.stop_monitoring() + monitor_task.cancel() + try: + await monitor_task + except asyncio.CancelledError: + pass + + self._ws_loop.run_until_complete(serve()) + self._ws_loop.close() @inject async def initialize( @@ -290,22 +1215,60 @@ async def initialize( ) -> None: """Start the management dashboard server.""" self._shutdown_event = asyncio.Event() + self._running = True + self._monitor_interval = config.monitor_interval - # Store container reference in handler class + # Store container reference and main event loop DashboardRequestHandler.container = container + ws_manager.set_container(container) + ws_manager.set_main_loop(asyncio.get_running_loop()) - logger.info(f"Starting Management Dashboard on {config.host}:{config.port}") + # Set up log buffer with component type mappings + log_buffer._buffer = deque(maxlen=config.log_buffer_size) + component_info = {} + for comp in container.components: + # Use module name (e.g., "samples.management_dashboard.management_dashboard") as key + display_name = comp.__metadata__.get("name", "unknown") + module_name = display_name + internals = component_internals(comp) + comp_type = internals.type.value + component_info[module_name] = (display_name, comp_type) + log_buffer.set_component_info(component_info) + + # Set up log handler + ws_manager.set_log_buffer(log_buffer) + self._log_handler = DashboardLogHandler( + log_buffer, + broadcast_callback=ws_manager.on_new_log + ) + self._log_handler.setLevel(logging.DEBUG) + # Attach handler to root logger to capture all logs + root_logger = logging.getLogger() + root_logger.addHandler(self._log_handler) + + # Start HTTP server + logger.info(f"Starting Management Dashboard on {config.host}:{config.port}") self._server = ThreadingHTTPServer( (config.host, config.port), DashboardRequestHandler ) - self._running = True - self._thread = Thread(target=self._server.serve_forever, daemon=True) self._thread.start() + # Start WebSocket server with state monitoring + logger.info(f"Starting WebSocket server on {config.host}:{config.ws_port}") + logger.info(f"State monitoring interval: {config.monitor_interval}s") + self._ws_thread = Thread( + target=self._run_ws_server, + args=(config.host, config.ws_port), + daemon=True + ) + self._ws_thread.start() + logger.info(f"Management Dashboard running at http://{config.host}:{config.port}") + logger.info(f"WebSocket available at ws://{config.host}:{config.ws_port}") + logger.info(f"Log buffer size: {config.log_buffer_size} entries") async def wait(self) -> None: """Wait until shutdown is requested.""" @@ -315,10 +1278,17 @@ async def wait(self) -> None: async def shutdown(self) -> None: """Stop the management dashboard server.""" self._running = False + ws_manager.stop_monitoring() if self._shutdown_event: self._shutdown_event.set() + # Remove log handler + if self._log_handler: + root_logger = logging.getLogger() + root_logger.removeHandler(self._log_handler) + self._log_handler = None + if self._server: self._server.shutdown() self._server.server_close() @@ -328,6 +1298,10 @@ async def shutdown(self) -> None: self._thread.join(timeout=2) self._thread = None + if self._ws_thread: + self._ws_thread.join(timeout=2) + self._ws_thread = None + management_dashboard_app = ManagementDashboardApp() initialize = management_dashboard_app.initialize diff --git a/samples/management_dashboard/web/index.html b/samples/management_dashboard/web/index.html index 144d846..e54ccbd 100644 --- a/samples/management_dashboard/web/index.html +++ b/samples/management_dashboard/web/index.html @@ -1,153 +1,2194 @@ - IOC Management Dashboard + + IOC Management Dashboard — Combined Items (title spacing, moved tag/status, apps no controls)
-

IOC Management Dashboard

- +
+

IOC Management Dashboard

+
+ + Connecting... +
+ +

Application State

-

Components

-
+

Components & Plugins

+ + +
+ + + + + + + +
+ + +
+ + + + + +
+ + +
-

Plugins

-
+

Logs

+
+
+ + +
+
+ 0 logs + + + +
+
+
+
No logs yet...
+
+ + + +
+ From b76c2a4447ab76ad2d98188fb834dc3d59a6858a Mon Sep 17 00:00:00 2001 From: Pablo Valverde <92272697+pavalso@users.noreply.github.com> Date: Mon, 22 Dec 2025 13:31:43 +0100 Subject: [PATCH 05/24] Implement HTTP File Server with upload, download, and delete capabilities --- samples/http_server/http_server.py | 508 ++++++++++++++++-- samples/http_server/ioc.yaml | 17 +- .../management_dashboard.py | 51 +- .../plugins/sample_plugin_a.py | 42 +- .../plugins/sample_plugin_b.py | 54 +- .../plugins/sample_plugin_c.py | 63 +-- samples/management_dashboard/web/index.html | 356 ++++++++---- 7 files changed, 807 insertions(+), 284 deletions(-) diff --git a/samples/http_server/http_server.py b/samples/http_server/http_server.py index 4a14e7c..f04adf6 100644 --- a/samples/http_server/http_server.py +++ b/samples/http_server/http_server.py @@ -1,11 +1,23 @@ """ -Simple HTTP Server App Component +HTTP File Server App Component -A minimal HTTP server demonstrating the IOC framework. +A full-featured HTTP file server similar to `python -m http.server` with: +- Directory browsing +- File upload +- Folder download as ZIP +- File/folder deletion """ import asyncio +import html +import io +import mimetypes +import os +import shutil +import urllib.parse +import zipfile from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler +from pathlib import Path from threading import Thread from typing import Optional @@ -20,58 +32,445 @@ class ServerConfig(pydantic.BaseModel): host: str = "127.0.0.1" port: int = 8080 + root_dir: Path = Path("./public") + allow_upload: bool = False + allow_delete: bool = False + allow_zip_download: bool = False + __metadata__ = { - "name": "http_server_app", - "version": "1.0.0", - "description": "Simple HTTP Server Application", + "name": "HTTP File Server", + "version": "2.0.0", + "description": "HTTP File Server with upload, download, and delete capabilities", "wire": True, "config": ServerConfig } -class RequestHandler(BaseHTTPRequestHandler): - """Simple HTTP request handler.""" +# Global config reference (set during initialization) +_server_config: Optional[ServerConfig] = None + + +class FileServerHandler(BaseHTTPRequestHandler): + """HTTP request handler for file server operations.""" + + @property + def root_dir(self) -> Path: + return _server_config.root_dir.resolve() if _server_config else Path(".").resolve() + + def _get_fs_path(self, url_path: str) -> Optional[Path]: + """Convert URL path to filesystem path, ensuring it's within root.""" + # Decode URL and normalize + decoded = urllib.parse.unquote(url_path) + # Remove leading slash and normalize + clean_path = decoded.lstrip("/") + # Resolve to absolute path + fs_path = (self.root_dir / clean_path).resolve() + # Security check: ensure path is within root + try: + fs_path.relative_to(self.root_dir) + except ValueError: + return None + return fs_path + + def _send_error_page(self, code: int, message: str): + """Send an error page.""" + self.send_response(code) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.end_headers() + self.wfile.write(f""" + +Error {code} + +

Error {code}

+

{html.escape(message)}

+Back to root + +""".encode("utf-8")) + + def _send_json(self, data: dict, code: int = 200): + """Send a JSON response.""" + import json + self.send_response(code) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps(data).encode("utf-8")) + + def _format_size(self, size: int) -> str: + """Format file size in human-readable format.""" + for unit in ["B", "KB", "MB", "GB", "TB"]: + if size < 1024: + return f"{size:.1f} {unit}" if unit != "B" else f"{size} {unit}" + size /= 1024 + return f"{size:.1f} PB" @inject - def do_GET( - self, - logger=get_logger() - ): + def do_GET(self, logger=get_logger()): """Handle GET requests.""" + parsed = urllib.parse.urlparse(self.path) + url_path = parsed.path + query = urllib.parse.parse_qs(parsed.query) + logger.info(f"GET {self.path} FROM {self.client_address[0]}:{self.client_address[1]}") - if self.path == "/": - self.send_response(200) - self.send_header("Content-Type", "text/html") + + # Handle ZIP download request + if "zip" in query and _server_config and _server_config.allow_zip_download: + self._handle_zip_download(url_path) + return + + fs_path = self._get_fs_path(url_path) + + if fs_path is None: + self._send_error_page(403, "Access denied: Path outside root directory") + return + + if not fs_path.exists(): + self._send_error_page(404, f"Path not found: {url_path}") + return + + if fs_path.is_dir(): + self._serve_directory(url_path, fs_path) + else: + self._serve_file(fs_path) + + def _serve_directory(self, url_path: str, fs_path: Path): + """Serve a directory listing.""" + # Ensure URL path ends with / + if not url_path.endswith("/"): + self.send_response(301) + self.send_header("Location", url_path + "/") self.end_headers() - self.wfile.write(b""" - + return + + entries = [] + try: + for entry in sorted(fs_path.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower())): + stat = entry.stat() + entries.append({ + "name": entry.name, + "is_dir": entry.is_dir(), + "size": stat.st_size if entry.is_file() else 0, + "mtime": stat.st_mtime, + }) + except PermissionError: + self._send_error_page(403, "Permission denied") + return + + # Generate HTML + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.end_headers() + + parent_link = "" + if url_path != "/": + parent = str(Path(url_path).parent) + if not parent.endswith("/"): + parent += "/" + parent_link = f'📁 ..' + + rows = [] + for e in entries: + name = html.escape(e["name"]) + href = html.escape(urllib.parse.quote(e["name"])) + if e["is_dir"]: + icon = "📁" + href += "/" + size_str = "-" + zip_link = f'📦' if _server_config and _server_config.allow_zip_download else "" + else: + icon = "📄" + size_str = self._format_size(e["size"]) + zip_link = "" + + delete_btn = "" + if _server_config and _server_config.allow_delete: + delete_btn = f'' + + rows.append(f""" + + {icon} {name} + {size_str} + {zip_link} + {delete_btn} + + """) + + upload_form = "" + if _server_config and _server_config.allow_upload: + upload_form = """ +
+

Upload Files

+
+ + +
+
+
+ """ + + html_content = f""" -IOC Test Server + + + Index of {html.escape(url_path)} + + -

IOC Framework Test Server

-

The server is running successfully!

- +

Index of {html.escape(url_path)}

+ + + + + + + + + + + {parent_link} + {"".join(rows)} + +
NameSizeZIPDelete
+ {upload_form} + - -""") - elif self.path == "/health": +""" + self.wfile.write(html_content.encode("utf-8")) + + def _serve_file(self, fs_path: Path): + """Serve a file.""" + try: + content_type, _ = mimetypes.guess_type(str(fs_path)) + if content_type is None: + content_type = "application/octet-stream" + + stat = fs_path.stat() self.send_response(200) - self.send_header("Content-Type", "application/json") + self.send_header("Content-Type", content_type) + self.send_header("Content-Length", str(stat.st_size)) + self.send_header("Content-Disposition", f'inline; filename="{fs_path.name}"') self.end_headers() - self.wfile.write(b'{"status": "healthy"}') - elif self.path == "/info": + + with open(fs_path, "rb") as f: + shutil.copyfileobj(f, self.wfile) + except PermissionError: + self._send_error_page(403, "Permission denied") + except Exception as e: + self._send_error_page(500, str(e)) + + def _handle_zip_download(self, url_path: str): + """Handle downloading a directory as ZIP.""" + fs_path = self._get_fs_path(url_path) + + if fs_path is None: + self._send_error_page(403, "Access denied") + return + + if not fs_path.exists() or not fs_path.is_dir(): + self._send_error_page(404, "Directory not found") + return + + try: + # Create ZIP in memory + zip_buffer = io.BytesIO() + with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf: + for root, dirs, files in os.walk(fs_path): + for file in files: + file_path = Path(root) / file + arcname = file_path.relative_to(fs_path) + try: + zf.write(file_path, arcname) + except PermissionError: + pass # Skip files we can't read + + zip_data = zip_buffer.getvalue() + zip_name = fs_path.name or "root" + self.send_response(200) - self.send_header("Content-Type", "application/json") - self.end_headers() - self.wfile.write(b'{"name": "IOC Test Server", "version": "1.0.0"}') - else: - self.send_response(404) - self.send_header("Content-Type", "text/plain") + self.send_header("Content-Type", "application/zip") + self.send_header("Content-Length", str(len(zip_data))) + self.send_header("Content-Disposition", f'attachment; filename="{zip_name}.zip"') self.end_headers() - self.wfile.write(b"Not Found") + self.wfile.write(zip_data) + except Exception as e: + self._send_error_page(500, str(e)) + + @inject + def do_POST(self, logger=get_logger()): + """Handle POST requests (file upload).""" + logger.info(f"POST {self.path} FROM {self.client_address[0]}:{self.client_address[1]}") + + if not _server_config or not _server_config.allow_upload: + self._send_json({"error": "Upload not allowed"}, 403) + return + + fs_path = self._get_fs_path(self.path) + if fs_path is None or not fs_path.is_dir(): + self._send_json({"error": "Invalid upload destination"}, 400) + return + + content_type = self.headers.get("Content-Type", "") + if "multipart/form-data" not in content_type: + self._send_json({"error": "Invalid content type"}, 400) + return + + try: + # Parse multipart form data + boundary = content_type.split("boundary=")[1].strip() + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length) + + files_saved = [] + parts = body.split(f"--{boundary}".encode()) + + for part in parts: + if b"filename=" not in part: + continue + + # Extract filename + header_end = part.find(b"\r\n\r\n") + if header_end == -1: + continue + + header = part[:header_end].decode("utf-8", errors="ignore") + file_content = part[header_end + 4:] + if file_content.endswith(b"\r\n"): + file_content = file_content[:-2] + + # Parse filename from Content-Disposition + for line in header.split("\r\n"): + if "filename=" in line: + start = line.find('filename="') + 10 + end = line.find('"', start) + if end > start: + filename = line[start:end] + # Security: only use basename + filename = Path(filename).name + if filename: + dest = fs_path / filename + with open(dest, "wb") as f: + f.write(file_content) + files_saved.append(filename) + logger.info(f"Uploaded: {dest}") + + if files_saved: + self._send_json({"message": f"Uploaded {len(files_saved)} file(s): {', '.join(files_saved)}"}) + else: + self._send_json({"error": "No files uploaded"}, 400) + + except Exception as e: + logger.error(f"Upload error: {e}") + self._send_json({"error": str(e)}, 500) + + @inject + def do_DELETE(self, logger=get_logger()): + """Handle DELETE requests.""" + logger.info(f"DELETE {self.path} FROM {self.client_address[0]}:{self.client_address[1]}") + + if not _server_config or not _server_config.allow_delete: + self._send_json({"error": "Delete not allowed"}, 403) + return + + fs_path = self._get_fs_path(self.path) + if fs_path is None: + self._send_json({"error": "Access denied"}, 403) + return + + if not fs_path.exists(): + self._send_json({"error": "Path not found"}, 404) + return + + # Prevent deleting the root directory + if fs_path == self.root_dir: + self._send_json({"error": "Cannot delete root directory"}, 403) + return + + try: + if fs_path.is_dir(): + shutil.rmtree(fs_path) + logger.info(f"Deleted directory: {fs_path}") + else: + fs_path.unlink() + logger.info(f"Deleted file: {fs_path}") + + self._send_json({"message": "Deleted successfully"}) + except PermissionError: + self._send_json({"error": "Permission denied"}, 403) + except Exception as e: + logger.error(f"Delete error: {e}") + self._send_json({"error": str(e)}, 500) def log_message(self, format, *args): """Suppress default logging.""" @@ -80,10 +479,9 @@ def log_message(self, format, *args): class HttpServerApp: """ - HTTP Server App Component. + HTTP File Server App Component. - This is an AppComponent that runs a simple HTTP server. - AppComponents require both initialize() and shutdown() methods. + Provides a web-based file browser with upload, download, and delete capabilities. """ def __init__(self): @@ -95,32 +493,45 @@ def __init__(self): @inject async def initialize( self, - logger = get_logger(), - config = get_config(ServerConfig) + logger=get_logger(), + config=get_config(ServerConfig) ) -> None: """Start the HTTP server.""" + global _server_config + _server_config = config + self._shutdown_event = asyncio.Event() - logger.info(f"Starting HTTP server on {config.host}:{config.port}") + # Resolve and validate root directory + root = config.root_dir.resolve() + if not root.exists(): + logger.warning(f"Root directory does not exist, creating: {root}") + root.mkdir(parents=True, exist_ok=True) - self._server = ThreadingHTTPServer((config.host, config.port), RequestHandler) + logger.info(f"Starting HTTP File Server on {config.host}:{config.port}") + logger.info(f"Serving files from: {root}") + logger.info(f"Upload: {'enabled' if config.allow_upload else 'disabled'}") + logger.info(f"Delete: {'enabled' if config.allow_delete else 'disabled'}") + logger.info(f"ZIP download: {'enabled' if config.allow_zip_download else 'disabled'}") + + self._server = ThreadingHTTPServer((config.host, config.port), FileServerHandler) self._running = True - # Use serve_forever in a thread - it handles shutdown properly self._thread = Thread(target=self._server.serve_forever, daemon=True) self._thread.start() - logger.info(f"HTTP server running at http://{config.host}:{config.port}") + logger.info(f"HTTP File Server running at http://{config.host}:{config.port}") async def wait(self) -> None: """Wait until shutdown is requested.""" if self._shutdown_event: await self._shutdown_event.wait() - async def shutdown( - self - ) -> None: + async def shutdown(self) -> None: """Stop the HTTP server.""" + global _server_config + _server_config = None + self._running = False if self._shutdown_event: @@ -135,6 +546,7 @@ async def shutdown( self._thread.join(timeout=2) self._thread = None + http_server_app = HttpServerApp() initialize = http_server_app.initialize shutdown = http_server_app.shutdown diff --git a/samples/http_server/ioc.yaml b/samples/http_server/ioc.yaml index 98e65f7..242a855 100644 --- a/samples/http_server/ioc.yaml +++ b/samples/http_server/ioc.yaml @@ -1,15 +1,8 @@ -# IOC Framework Configuration -# This file defines the application components - app: http_server - -# Optional: Define libraries -# libraries: - -# Optional: Define plugins -# plugins: - -# Application-specific configuration server: - host: "127.0.0.1" + host: 127.0.0.1 port: 8080 + root_dir: ./public + allow_upload: false + allow_delete: false + allow_zip_download: false diff --git a/samples/management_dashboard/management_dashboard.py b/samples/management_dashboard/management_dashboard.py index b05c687..346b9ed 100644 --- a/samples/management_dashboard/management_dashboard.py +++ b/samples/management_dashboard/management_dashboard.py @@ -14,6 +14,7 @@ import logging import tempfile import time +import warnings from collections import deque from dataclasses import dataclass from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler @@ -41,9 +42,6 @@ ) from awioc.loader.module_loader import compile_component -# Path to the web assets directory -WEB_DIR = Path(__file__).parent / "web" - class DashboardConfig(pydantic.BaseModel): """Dashboard Server configuration.""" @@ -52,10 +50,13 @@ class DashboardConfig(pydantic.BaseModel): host: str = "127.0.0.1" port: int = 8090 ws_port: int = 8091 - monitor_interval: float = 0.25 # State check interval in seconds - log_buffer_size: int = 500 # Maximum number of log entries to keep + monitor_interval: float = 0.25 + log_buffer_size: int = 500 +# Path to the web assets directory +WEB_DIR = Path(__file__).parent / "web" + __metadata__ = { "name": "Management Dashboard", "version": "1.3.0", @@ -294,6 +295,13 @@ def _get_component_info(self, component) -> dict: # Get configuration info config_info = self._get_component_config_info(component) + # Get requires (dependencies) + requires = metadata.get("requires", set()) + requires_names = [ + req.__metadata__.get("name", "unknown") + for req in requires + ] + return { "name": metadata.get("name", "unknown"), "version": metadata.get("version", "unknown"), @@ -309,12 +317,26 @@ def _get_component_info(self, component) -> dict: req.__metadata__.get("name", "unknown") for req in internals.required_by ], - "config": config_info + "config": config_info, + # Internal data + "internals": { + "module": getattr(component, "__name__", "unknown"), + "wire": metadata.get("wire", False), + "requires": requires_names, + "initialized_by": [ + req.__metadata__.get("name", "unknown") + for req in internals.initialized_by + ], + } } def _normalize_pydantic_schema(self, model: type[pydantic.BaseModel]) -> dict: # 1. Generate schema with a VALID ref_template - raw_schema = model.model_json_schema(ref_template="#/$defs/{model}") + # Suppress PydanticJsonSchemaWarning for non-serializable defaults + # (we handle default injection manually below) + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=pydantic.json_schema.PydanticJsonSchemaWarning) + raw_schema = model.model_json_schema(ref_template="#/$defs/{model}") # 2. Get $defs for resolving references defs = raw_schema.get("$defs", {}) @@ -648,11 +670,17 @@ async def _handle_message(self, websocket: WebSocketServerProtocol, data: dict): plugin_name = data.get("name") result = await self._enable_plugin(plugin_name) await websocket.send(json.dumps(result)) + # Refresh state for all clients after enable + if result.get("type") == "success": + await self.broadcast_state() elif action == "disable_plugin": plugin_name = data.get("name") result = await self._disable_plugin(plugin_name) await websocket.send(json.dumps(result)) + # Refresh state for all clients after disable + if result.get("type") == "success": + await self.broadcast_state() elif action == "register_plugin": plugin_path = data.get("path") @@ -739,8 +767,13 @@ async def _disable_plugin( if not internals.is_initialized: return {"type": "info", "message": f"Plugin '{plugin_name}' is already disabled"} - if internals.required_by: - required_names = [r.__metadata__.get("name") for r in internals.required_by] + # Only block if there are initialized components that require this plugin + active_dependents = [ + r for r in internals.required_by + if component_internals(r).is_initialized + ] + if active_dependents: + required_names = [r.__metadata__.get("name") for r in active_dependents] return { "type": "error", "error": f"Cannot disable plugin '{plugin_name}': required by {required_names}" diff --git a/samples/management_dashboard/plugins/sample_plugin_a.py b/samples/management_dashboard/plugins/sample_plugin_a.py index 3fc3887..935a898 100644 --- a/samples/management_dashboard/plugins/sample_plugin_a.py +++ b/samples/management_dashboard/plugins/sample_plugin_a.py @@ -5,39 +5,41 @@ the management dashboard's enable/disable functionality. """ -from awioc import get_logger, inject +from typing import Literal + +import pydantic + +from awioc import get_logger, inject, get_config + + +class DatabaseConfig(pydantic.BaseModel): + """Configuration for the database plugin.""" + __prefix__ = "database" + + host: str = "localhost" + port: int = 5432 + database: str = "app_db" + pool_size: int = 5 + timeout: int = 30 + ssl_mode: Literal["disable", "prefer", "require", "verify-ca", "verify-full"] = "prefer" + max_overflow: int = 5 __metadata__ = { "name": "database_plugin", "version": "1.0.0", "description": "Simulates a database connection service", "wire": True, + "config": DatabaseConfig } -_connected = False - - @inject -async def initialize(logger=get_logger()): - global _connected - logger.info("Database plugin: Establishing connection...") - _connected = True +async def initialize(logger=get_logger(), config=get_config(DatabaseConfig)): + logger.info(f"Database plugin: Connecting to {config.host}:{config.port}/{config.database}") + logger.info(f"Database plugin: Pool size={config.pool_size}, timeout={config.timeout}s") logger.info("Database plugin: Connection established") @inject async def shutdown(logger=get_logger()): - global _connected logger.info("Database plugin: Closing connection...") - _connected = False logger.info("Database plugin: Connection closed") - - -def is_connected() -> bool: - return _connected - - -async def query(sql: str) -> dict: - if not _connected: - raise RuntimeError("Database not connected") - return {"result": f"Executed: {sql}"} diff --git a/samples/management_dashboard/plugins/sample_plugin_b.py b/samples/management_dashboard/plugins/sample_plugin_b.py index 3534d13..0d7e8bd 100644 --- a/samples/management_dashboard/plugins/sample_plugin_b.py +++ b/samples/management_dashboard/plugins/sample_plugin_b.py @@ -5,53 +5,39 @@ the management dashboard's enable/disable functionality. """ -from awioc import get_logger, inject +from typing import Literal + +import pydantic + +from awioc import get_logger, inject, get_config + + +class CacheConfig(pydantic.BaseModel): + """Configuration for the cache plugin.""" + __prefix__ = "cache" + + backend: Literal["memory", "redis", "memcached"] = "memory" + max_size: int = 1000 + default_ttl: int = 3600 + eviction_policy: Literal["lru", "lfu", "fifo"] = "lru" + compression: bool = False __metadata__ = { "name": "cache_plugin", "version": "1.2.0", "description": "Simulates an in-memory cache service", "wire": True, + "config": CacheConfig } -_cache: dict = {} -_enabled = False - - @inject -async def initialize(logger=get_logger()): - global _enabled, _cache - logger.info("Cache plugin: Initializing cache...") - _cache = {} - _enabled = True +async def initialize(logger=get_logger(), config=get_config(CacheConfig)): + logger.info(f"Cache plugin: Initializing {config.backend} cache...") + logger.info(f"Cache plugin: max_size={config.max_size}, ttl={config.default_ttl}s, policy={config.eviction_policy}") logger.info("Cache plugin: Cache ready") @inject async def shutdown(logger=get_logger()): - global _enabled, _cache logger.info("Cache plugin: Clearing cache...") - _cache = {} - _enabled = False logger.info("Cache plugin: Cache cleared") - - -def is_enabled() -> bool: - return _enabled - - -def get(key: str): - if not _enabled: - raise RuntimeError("Cache not enabled") - return _cache.get(key) - - -def set(key: str, value): - if not _enabled: - raise RuntimeError("Cache not enabled") - _cache[key] = value - - -def clear(): - global _cache - _cache = {} diff --git a/samples/management_dashboard/plugins/sample_plugin_c.py b/samples/management_dashboard/plugins/sample_plugin_c.py index c9f9969..023f6c5 100644 --- a/samples/management_dashboard/plugins/sample_plugin_c.py +++ b/samples/management_dashboard/plugins/sample_plugin_c.py @@ -5,65 +5,40 @@ the management dashboard's enable/disable functionality. """ -import time +from typing import Literal -from awioc import get_logger, inject +import pydantic + +from awioc import get_logger, inject, get_config + + +class MetricsConfig(pydantic.BaseModel): + """Configuration for the metrics plugin.""" + __prefix__ = "metrics" + + collection_interval: int = 10 + export_format: Literal["prometheus", "json", "statsd"] = "prometheus" + retention_hours: int = 24 + enable_histogram: bool = True + labels: dict[str, str] = pydantic.Field(default_factory=dict) __metadata__ = { "name": "metrics_plugin", "version": "0.5.0", "description": "Simulates a metrics collection service", "wire": True, + "config": MetricsConfig } -_metrics: dict = {} -_collecting = False -_start_time = None - - @inject -async def initialize(logger=get_logger()): - global _collecting, _start_time, _metrics +async def initialize(logger=get_logger(), config=get_config(MetricsConfig)): logger.info("Metrics plugin: Starting collector...") - _metrics = { - "requests_total": 0, - "errors_total": 0, - "latency_sum": 0.0, - } - _start_time = time.time() - _collecting = True + logger.info(f"Metrics plugin: interval={config.collection_interval}s, format={config.export_format}") + logger.info(f"Metrics plugin: retention={config.retention_hours}h, histogram={config.enable_histogram}") logger.info("Metrics plugin: Collector started") @inject async def shutdown(logger=get_logger()): - global _collecting, _start_time logger.info("Metrics plugin: Stopping collector...") - _collecting = False - _start_time = None logger.info("Metrics plugin: Collector stopped") - - -def is_collecting() -> bool: - return _collecting - - -def record_request(latency: float = 0.0, error: bool = False): - if not _collecting: - return - _metrics["requests_total"] += 1 - _metrics["latency_sum"] += latency - if error: - _metrics["errors_total"] += 1 - - -def get_metrics() -> dict: - if not _collecting: - return {} - uptime = time.time() - _start_time if _start_time else 0 - return { - **_metrics, - "uptime_seconds": uptime, - "avg_latency": (_metrics["latency_sum"] / _metrics["requests_total"]) - if _metrics["requests_total"] > 0 else 0, - } diff --git a/samples/management_dashboard/web/index.html b/samples/management_dashboard/web/index.html index e54ccbd..5b13cee 100644 --- a/samples/management_dashboard/web/index.html +++ b/samples/management_dashboard/web/index.html @@ -552,20 +552,11 @@

IOC Management Dashboard

Application State

-

Components & Plugins

-
- - - + - @@ -574,7 +565,16 @@

Components & Plugins

- + +

Application

+
+ + +

Libraries

+
+ + +

Plugins

@@ -585,9 +585,7 @@

Components & Plugins

- - -
+

Logs

@@ -656,6 +654,21 @@

Component

No state
+
+
Internals
+
+
+ Module + - +
+
+ Auto-wire + - +
+
+
+
+