diff --git a/agent_os_kernel/__main__.py b/agent_os_kernel/__main__.py index 6b7356c..a8d4390 100644 --- a/agent_os_kernel/__main__.py +++ b/agent_os_kernel/__main__.py @@ -4,6 +4,15 @@ Usage: python -m agent_os_kernel python -m agent_os_kernel --help + +中文说明(逻辑风险提示): +1) 该文件目前更像“demo 分发器”(根据 --demo 选择 examples 下的示例运行), + 并不是完整的内核管理 CLI。 +2) 如果 pyproject.toml 的 console_scripts 指向 agent_os_kernel.__main__:main, + 那么用户安装后执行的入口会落到这里,而不是 agent_os_kernel/cli/main.py。 + 这会造成:文档/预期的 init/create/list/serve 等命令不可用或行为不一致。 +3) --version 的值若与包版本(pyproject.toml / AgentOSKernel.VERSION)不一致, + 会导致排障与发布版本对齐困难。 """ import sys @@ -17,6 +26,9 @@ def main(): parser.add_argument( "--version", action="version", + # 注意:这里的版本号是硬编码字符串。 + # 若与 pyproject.toml 的 project.version(例如 0.2.0)或 kernel.py 的 AgentOSKernel.VERSION 不一致, + # 会产生“对外显示版本”和“实际代码版本”冲突的逻辑问题。 version="1.0.0" ) parser.add_argument( @@ -34,6 +46,12 @@ def main(): from examples.multi_agent_demo import main as demo_main elif args.demo == "workflow": from examples.complete_workflow import main as demo_main + + # 注意:这里直接从 examples.* 导入 demo。 + # 这意味着: + # - 运行入口依赖源码树中存在 examples 包/模块; + # - 在某些打包/安装形态下 examples 可能不会随包发布,从而出现 ImportError。 + # 如需稳定的 CLI,需要明确将“demo runner”和“正式 CLI”分离并统一入口。 try: import asyncio diff --git a/agent_os_kernel/cli/main.py b/agent_os_kernel/cli/main.py index 0757b7e..3be84a2 100644 --- a/agent_os_kernel/cli/main.py +++ b/agent_os_kernel/cli/main.py @@ -16,6 +16,11 @@ from datetime import datetime from datetime import timezone, timezone +# 逻辑风险提示: +# - 这里出现重复导入(timezone, timezone),属于明显的代码瑕疵;通常不会影响运行,但会影响静态检查与可维护性。 +# - 本文件实现的 CLI 与 agent_os_kernel/__main__.py 的“demo runner”入口存在职责冲突: +# 如果对外 console_scripts 绑定到 __main__.py,这里这些命令实际不会被用户用到。 + class CLI: """命令行接口""" @@ -137,6 +142,9 @@ def _cmd_list(self, args): """列出 Agent""" from ..core import AgentOSKernel kernel = AgentOSKernel() + # 逻辑风险提示:这里调用了 kernel.list_agents()。 + # 但在当前主内核实现(agent_os_kernel/kernel.py)中未看到该方法定义, + # 如果两者未对齐,运行 list 命令会直接抛 AttributeError。 agents = kernel.list_agents() print(f"{'ID':<8} {'Name':<20} {'Status':<12}") for a in agents: @@ -145,6 +153,9 @@ def _cmd_list(self, args): def _cmd_delete(self, args): """删除 Agent""" + # 逻辑风险提示:当前 delete/demo/serve/status 多数为占位实现(仅 print), + # 并未真正调用内核/存储层做状态变更。 + # 如果文档对外宣称这些命令可用,用户会遇到“看似成功但什么也没发生”的逻辑落差。 print(f"删除 Agent: {args.agent_id}") return 0 @@ -155,6 +166,8 @@ def _cmd_demo(self, args): def _cmd_serve(self, args): """启动服务器""" + # 逻辑风险提示:这里未真正启动 FastAPI/uvicorn(当前仅输出), + # 与 requirements.txt 中包含 fastapi/uvicorn 的预期不一致。 print(f"启动服务器: {args.host}:{args.port}") return 0 diff --git a/agent_os_kernel/core/__init__.py b/agent_os_kernel/core/__init__.py index c63f7f4..e624f94 100644 --- a/agent_os_kernel/core/__init__.py +++ b/agent_os_kernel/core/__init__.py @@ -3,6 +3,15 @@ Agent-OS-Kernel Core Module """ +# 逻辑风险提示: +# 本文件聚合导出非常多的类型/类,且存在大量“同名符号”从不同模块重复导入的情况。 +# 在 Python 中,后导入会覆盖先导入的同名变量(例如 AgentState、AgentPool、EventBus 等)。 +# 这会导致: +# - from agent_os_kernel.core import AgentState 得到的到底是哪一个版本不稳定; +# - 状态机/枚举比较可能失效(同名不同类); +# - 对外公共 API 语义模糊,使用者难以排错。 +# 如需对外提供稳定 API,建议对外导出时进行命名去重(或显式别名),并减少“星爆式聚合导出”。 + # === agent_definition === from .agent_definition import ( AgentStatus, diff --git a/agent_os_kernel/core/config_hot_reload.py b/agent_os_kernel/core/config_hot_reload.py index 2a42265..66c97f6 100644 --- a/agent_os_kernel/core/config_hot_reload.py +++ b/agent_os_kernel/core/config_hot_reload.py @@ -28,6 +28,13 @@ from watchdog.events import FileSystemEventHandler, FileSystemEvent import jsonschema +# 逻辑风险提示: +# - 本模块依赖 watchdog/jsonschema(以及后续的 PyYAML),但它们未必在项目的基础依赖中声明。 +# 如果用户仅安装最小依赖,导入该模块就可能触发 ImportError。 +# - 建议: +# 1) 将其加入 requirements/pyproject optional-dependencies;或 +# 2) 将 import 放入函数内部并给出明确提示,使其成为可选能力。 + # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -162,6 +169,7 @@ def validate_file(self, file_path: str) -> tuple[bool, Optional[str], Optional[A import yaml config = yaml.safe_load(content) except ImportError: + # 逻辑风险提示:yaml 依赖缺失时返回错误字符串,但上层可能不一定处理该错误。 return False, "Neither JSON nor YAML parsing available", None except yaml.YAMLError as e: return False, f"YAML parsing error: {str(e)}", None @@ -685,6 +693,8 @@ def _notify_callbacks(self, file_path: str, change: ConfigChange) -> None: try: callback(change) except Exception as e: + # 逻辑风险提示:回调异常会被吞掉(仅日志), + # 可能导致外部系统以为已收到变更通知,但实际上 callback 内部失败。 logger.error(f"[ConfigHotReload] Callback error: {e}") def _load_all_configs(self) -> None: diff --git a/agent_os_kernel/core/storage.py b/agent_os_kernel/core/storage.py index 326cc8f..818b76f 100644 --- a/agent_os_kernel/core/storage.py +++ b/agent_os_kernel/core/storage.py @@ -94,9 +94,14 @@ def save(self, key: str, value: Any) -> bool: } self._stats.last_modify = datetime.now() self._stats.total_keys = len(self._data) + # 逻辑风险提示:total_size_bytes 这里是累加。 + # - 若同一个 key 被反复覆盖写入,旧值占用的 size 并未扣除,会导致统计不断膨胀。 + # - 如果该统计用于“资源配额/监控告警”,会产生误报。 self._stats.total_size_bytes += size return True except Exception: + # 逻辑风险提示:这里吞掉所有异常并返回 False,会让上层只能看到“失败”但不知道原因。 + # 对关键路径(持久化/审计/检查点)建议至少记录日志或抛出更明确异常。 return False def retrieve(self, key: str) -> Optional[Any]: @@ -179,6 +184,8 @@ def save(self, key: str, value: Any) -> bool: with open(path, 'w', encoding='utf-8') as f: json.dump(value, f, ensure_ascii=False, indent=2) self._stats.last_modify = datetime.now() + # 逻辑风险提示:total_keys 这里简单 +1。 + # - 若 key 对应文件已存在(覆盖写),该统计会偏大。 self._stats.total_keys += 1 return True except Exception: @@ -196,6 +203,8 @@ def retrieve(self, key: str) -> Optional[Any]: self._stats.miss_count += 1 return None except Exception: + # 逻辑风险提示:这里返回 None 既可能表示“没找到 key”,也可能表示“数据库异常”。 + # 会让上层逻辑无法区分,导致错误处理分支混乱。 return None def delete(self, key: str) -> bool: @@ -227,6 +236,9 @@ def list_keys(self, prefix: str = "") -> List[str]: full_hash = os.path.basename(root) + key_hash try: # 这里简化处理,实际应该维护映射 + # 逻辑风险提示:由于文件名使用 hash,且没有维护 hash<->原始 key 的映射, + # 这里无法恢复真实 key,只能返回“拼出来的 hash”。 + # 这会导致 list_keys() 的接口语义基本不成立(调用方拿到 key 也无法 retrieve)。 keys.append(full_hash) except Exception: pass @@ -280,6 +292,8 @@ def _connect(self): ) self._init_schema() except ImportError: + # 逻辑风险提示:psycopg2 缺失时直接将 _pool 置空并静默降级。 + # 上层若以为已经启用 PostgreSQL 后端,会出现“看似正常但实际上没持久化”的风险。 self._pool = None def _init_schema(self): @@ -358,6 +372,9 @@ def save(self, key: str, value: Any) -> bool: self._pool.putconn(conn) return True except Exception: + # 逻辑风险提示:这里吞掉所有异常并返回 False。 + # - 上层无法区分“写入失败原因”(连接断开/权限/序列化失败/SQL 错误)。 + # - 如果该写入承担检查点/审计等关键职责,静默失败会造成“看似运行但无法恢复/无法追溯”。 return False def retrieve(self, key: str) -> Optional[Any]: @@ -377,6 +394,8 @@ def retrieve(self, key: str) -> Optional[Any]: return json.loads(row[0]) return None except Exception: + # 逻辑风险提示:返回 None 既可能表示“key 不存在”,也可能是“数据库异常”。 + # 如果上层用 exists/retrieve 组合实现逻辑判断,可能会误判并进入错误分支。 return None def delete(self, key: str) -> bool: @@ -468,6 +487,8 @@ def save_checkpoint(self, checkpoint_data: dict) -> bool: self._pool.putconn(conn) return True except Exception: + # 逻辑风险提示:检查点保存失败会直接返回 False; + # 若上层未显式处理该 False,可能导致“已创建检查点”的假象,最终无法恢复。 return False def save_audit_log(self, log_data: dict) -> bool: @@ -493,6 +514,7 @@ def save_audit_log(self, log_data: dict) -> bool: self._pool.putconn(conn) return True except Exception: + # 逻辑风险提示:审计日志写入失败被静默吞掉,会让“可观测性/审计”形同虚设。 return False def save_vector(self, key: str, content: str, embedding: bytes, metadata: dict = None) -> bool: @@ -510,6 +532,8 @@ def save_vector(self, key: str, content: str, embedding: bytes, metadata: dict = self._pool.putconn(conn) return True except Exception: + # 逻辑风险提示:向量写入失败同样被吞掉。 + # 如果上层依赖语义检索作为关键能力,需确保失败可被观测并触发降级策略。 return False def search_vectors(self, query_embedding: bytes, limit: int = 10) -> List[dict]: diff --git a/agent_os_kernel/kernel.py b/agent_os_kernel/kernel.py index f742b4d..681ee2a 100644 --- a/agent_os_kernel/kernel.py +++ b/agent_os_kernel/kernel.py @@ -118,12 +118,20 @@ def __init__(self, logger.info("Initializing five subsystems...") # 1. 存储层(必须先初始化,供其他子系统使用) + # 逻辑风险提示: + # - storage_backend 允许为 Optional[StorageBackend],但 StorageManager.__init__ 更像期待一个枚举值。 + # 这里传 None 时虽然可能会走到 MemoryStorage 兜底,但 self.storage._backend 会变成 None, + # 进而影响后续诸如 “if self._backend == StorageBackend.POSTGRESQL” 的逻辑判断语义。 self.storage = StorageManager(storage_backend) logger.info("[1/5] Storage Layer ready (PostgreSQL Five Roles)") # 2. 上下文管理器(虚拟内存) self.context_manager = ContextManager( max_context_tokens=max_context_tokens, + # 逻辑风险提示: + # - ContextManager 的 storage_backend 参数名容易让人误解: + # 它到底应该是 StorageManager 实例/接口,还是 StorageBackend 枚举? + # - 这里传入 self.storage._backend(枚举/None)可能导致后续 swap in/out 时调用存储接口失败。 storage_backend=self.storage._backend ) logger.info("[2/5] Context Manager ready (Virtual Memory)") @@ -297,6 +305,9 @@ def create_checkpoint(self, agent_pid: str, if page: context_pages.append(page.to_dict()) # 将页面写回存储 + # 逻辑风险提示:这里调用 self.storage.save_context_page(page)。 + # 但在 core/storage.py 的 StorageManager 中未必实现了该方法(可能存在接口不一致/多版本代码混用)。 + # 若缺失会在 checkpoint 过程中直接抛 AttributeError,导致“优雅退出/崩溃恢复”承诺落空。 self.storage.save_context_page(page) logger.info("✓ Created checkpoint %s... for agent %s... (%d pages)", @@ -317,6 +328,8 @@ def restore_checkpoint(self, checkpoint_id: str) -> Optional[str]: 新的 Agent PID """ # 1. 加载检查点 + # 逻辑风险提示:这里依赖 self.storage.load_checkpoint(checkpoint_id)。 + # 若 StorageManager 未实现该方法或其行为与 PostgreSQLStorage.save_checkpoint 不对齐,会导致恢复失败。 checkpoint = self.storage.load_checkpoint(checkpoint_id) if not checkpoint: logger.error("Checkpoint %s... not found", checkpoint_id[:8]) @@ -368,6 +381,9 @@ def execute_agent_step(self, process: AgentProcess) -> Dict[str, Any]: ) # 3. 检查资源配额 + # 逻辑风险提示:tokens_needed 使用 split()*2 的方式粗略估计。 + # - 这会让配额管理与真实模型 token 计费差异很大(中文/代码/工具 schema 尤其明显)。 + # - 如果将来要做“可观测性/成本控制”,需要统一 token 计数策略(如 tiktoken 等)。 tokens_needed = len(context.split()) * 2 # 粗略估计 if not self.scheduler.request_resources(process.pid, tokens_needed): return {'success': False, 'error': 'Resource quota exceeded', 'done': False} @@ -380,6 +396,9 @@ def execute_agent_step(self, process: AgentProcess) -> Dict[str, Any]: reasoning = f"Processing task: {process.context.get('task', 'unknown')}" # 6. 记录审计日志(可观测性) + # 逻辑风险提示:这里调用 self.storage.log_action(...)。 + # 但 core/storage.py 中展示的接口是 log_audit / save_audit_log 等, + # log_action 是否存在取决于 StorageManager 的实际实现(同样存在接口不一致风险)。 self.storage.log_action( agent_pid=process.pid, action_type="reasoning",