Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions agent_os_kernel/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@
Usage:
python -m agent_os_kernel
python -m agent_os_kernel --help

中文说明(逻辑风险提示):
1) 该文件目前更像“demo 分发器”(根据 --demo 选择 examples 下的示例运行),
并不是完整的内核管理 CLI。
2) 如果 pyproject.toml 的 console_scripts 指向 agent_os_kernel.__main__:main,
那么用户安装后执行的入口会落到这里,而不是 agent_os_kernel/cli/main.py。
这会造成:文档/预期的 init/create/list/serve 等命令不可用或行为不一致。
3) --version 的值若与包版本(pyproject.toml / AgentOSKernel.VERSION)不一致,
会导致排障与发布版本对齐困难。
"""

import sys
Expand All @@ -17,6 +26,9 @@ def main():
parser.add_argument(
"--version",
action="version",
# 注意:这里的版本号是硬编码字符串。
# 若与 pyproject.toml 的 project.version(例如 0.2.0)或 kernel.py 的 AgentOSKernel.VERSION 不一致,
# 会产生“对外显示版本”和“实际代码版本”冲突的逻辑问题。
version="1.0.0"
)
parser.add_argument(
Expand All @@ -34,6 +46,12 @@ def main():
from examples.multi_agent_demo import main as demo_main
elif args.demo == "workflow":
from examples.complete_workflow import main as demo_main

# 注意:这里直接从 examples.* 导入 demo。
# 这意味着:
# - 运行入口依赖源码树中存在 examples 包/模块;
# - 在某些打包/安装形态下 examples 可能不会随包发布,从而出现 ImportError。
# 如需稳定的 CLI,需要明确将“demo runner”和“正式 CLI”分离并统一入口。

try:
import asyncio
Expand Down
13 changes: 13 additions & 0 deletions agent_os_kernel/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
from datetime import datetime
from datetime import timezone, timezone

# 逻辑风险提示:
# - 这里出现重复导入(timezone, timezone),属于明显的代码瑕疵;通常不会影响运行,但会影响静态检查与可维护性。
# - 本文件实现的 CLI 与 agent_os_kernel/__main__.py 的“demo runner”入口存在职责冲突:
# 如果对外 console_scripts 绑定到 __main__.py,这里这些命令实际不会被用户用到。


class CLI:
"""命令行接口"""
Expand Down Expand Up @@ -137,6 +142,9 @@ def _cmd_list(self, args):
"""列出 Agent"""
from ..core import AgentOSKernel
kernel = AgentOSKernel()
# 逻辑风险提示:这里调用了 kernel.list_agents()。
# 但在当前主内核实现(agent_os_kernel/kernel.py)中未看到该方法定义,
# 如果两者未对齐,运行 list 命令会直接抛 AttributeError。
agents = kernel.list_agents()
print(f"{'ID':<8} {'Name':<20} {'Status':<12}")
for a in agents:
Expand All @@ -145,6 +153,9 @@ def _cmd_list(self, args):

def _cmd_delete(self, args):
"""删除 Agent"""
# 逻辑风险提示:当前 delete/demo/serve/status 多数为占位实现(仅 print),
# 并未真正调用内核/存储层做状态变更。
# 如果文档对外宣称这些命令可用,用户会遇到“看似成功但什么也没发生”的逻辑落差。
print(f"删除 Agent: {args.agent_id}")
return 0

Expand All @@ -155,6 +166,8 @@ def _cmd_demo(self, args):

def _cmd_serve(self, args):
"""启动服务器"""
# 逻辑风险提示:这里未真正启动 FastAPI/uvicorn(当前仅输出),
# 与 requirements.txt 中包含 fastapi/uvicorn 的预期不一致。
print(f"启动服务器: {args.host}:{args.port}")
return 0

Expand Down
9 changes: 9 additions & 0 deletions agent_os_kernel/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@
Agent-OS-Kernel Core Module
"""

# 逻辑风险提示:
# 本文件聚合导出非常多的类型/类,且存在大量“同名符号”从不同模块重复导入的情况。
# 在 Python 中,后导入会覆盖先导入的同名变量(例如 AgentState、AgentPool、EventBus 等)。
# 这会导致:
# - from agent_os_kernel.core import AgentState 得到的到底是哪一个版本不稳定;
# - 状态机/枚举比较可能失效(同名不同类);
# - 对外公共 API 语义模糊,使用者难以排错。
# 如需对外提供稳定 API,建议对外导出时进行命名去重(或显式别名),并减少“星爆式聚合导出”。

# === agent_definition ===
from .agent_definition import (
AgentStatus,
Expand Down
10 changes: 10 additions & 0 deletions agent_os_kernel/core/config_hot_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@
from watchdog.events import FileSystemEventHandler, FileSystemEvent
import jsonschema

# 逻辑风险提示:
# - 本模块依赖 watchdog/jsonschema(以及后续的 PyYAML),但它们未必在项目的基础依赖中声明。
# 如果用户仅安装最小依赖,导入该模块就可能触发 ImportError。
# - 建议:
# 1) 将其加入 requirements/pyproject optional-dependencies;或
# 2) 将 import 放入函数内部并给出明确提示,使其成为可选能力。

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -162,6 +169,7 @@ def validate_file(self, file_path: str) -> tuple[bool, Optional[str], Optional[A
import yaml
config = yaml.safe_load(content)
except ImportError:
# 逻辑风险提示:yaml 依赖缺失时返回错误字符串,但上层可能不一定处理该错误。
return False, "Neither JSON nor YAML parsing available", None
except yaml.YAMLError as e:
return False, f"YAML parsing error: {str(e)}", None
Expand Down Expand Up @@ -685,6 +693,8 @@ def _notify_callbacks(self, file_path: str, change: ConfigChange) -> None:
try:
callback(change)
except Exception as e:
# 逻辑风险提示:回调异常会被吞掉(仅日志),
# 可能导致外部系统以为已收到变更通知,但实际上 callback 内部失败。
logger.error(f"[ConfigHotReload] Callback error: {e}")

def _load_all_configs(self) -> None:
Expand Down
24 changes: 24 additions & 0 deletions agent_os_kernel/core/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,14 @@ def save(self, key: str, value: Any) -> bool:
}
self._stats.last_modify = datetime.now()
self._stats.total_keys = len(self._data)
# 逻辑风险提示:total_size_bytes 这里是累加。
# - 若同一个 key 被反复覆盖写入,旧值占用的 size 并未扣除,会导致统计不断膨胀。
# - 如果该统计用于“资源配额/监控告警”,会产生误报。
self._stats.total_size_bytes += size
return True
except Exception:
# 逻辑风险提示:这里吞掉所有异常并返回 False,会让上层只能看到“失败”但不知道原因。
# 对关键路径(持久化/审计/检查点)建议至少记录日志或抛出更明确异常。
return False

def retrieve(self, key: str) -> Optional[Any]:
Expand Down Expand Up @@ -179,6 +184,8 @@ def save(self, key: str, value: Any) -> bool:
with open(path, 'w', encoding='utf-8') as f:
json.dump(value, f, ensure_ascii=False, indent=2)
self._stats.last_modify = datetime.now()
# 逻辑风险提示:total_keys 这里简单 +1。
# - 若 key 对应文件已存在(覆盖写),该统计会偏大。
self._stats.total_keys += 1
return True
except Exception:
Expand All @@ -196,6 +203,8 @@ def retrieve(self, key: str) -> Optional[Any]:
self._stats.miss_count += 1
return None
except Exception:
# 逻辑风险提示:这里返回 None 既可能表示“没找到 key”,也可能表示“数据库异常”。
# 会让上层逻辑无法区分,导致错误处理分支混乱。
return None

def delete(self, key: str) -> bool:
Expand Down Expand Up @@ -227,6 +236,9 @@ def list_keys(self, prefix: str = "") -> List[str]:
full_hash = os.path.basename(root) + key_hash
try:
# 这里简化处理,实际应该维护映射
# 逻辑风险提示:由于文件名使用 hash,且没有维护 hash<->原始 key 的映射,
# 这里无法恢复真实 key,只能返回“拼出来的 hash”。
# 这会导致 list_keys() 的接口语义基本不成立(调用方拿到 key 也无法 retrieve)。
keys.append(full_hash)
except Exception:
pass
Expand Down Expand Up @@ -280,6 +292,8 @@ def _connect(self):
)
self._init_schema()
except ImportError:
# 逻辑风险提示:psycopg2 缺失时直接将 _pool 置空并静默降级。
# 上层若以为已经启用 PostgreSQL 后端,会出现“看似正常但实际上没持久化”的风险。
self._pool = None

def _init_schema(self):
Expand Down Expand Up @@ -358,6 +372,9 @@ def save(self, key: str, value: Any) -> bool:
self._pool.putconn(conn)
return True
except Exception:
# 逻辑风险提示:这里吞掉所有异常并返回 False。
# - 上层无法区分“写入失败原因”(连接断开/权限/序列化失败/SQL 错误)。
# - 如果该写入承担检查点/审计等关键职责,静默失败会造成“看似运行但无法恢复/无法追溯”。
return False

def retrieve(self, key: str) -> Optional[Any]:
Expand All @@ -377,6 +394,8 @@ def retrieve(self, key: str) -> Optional[Any]:
return json.loads(row[0])
return None
except Exception:
# 逻辑风险提示:返回 None 既可能表示“key 不存在”,也可能是“数据库异常”。
# 如果上层用 exists/retrieve 组合实现逻辑判断,可能会误判并进入错误分支。
return None

def delete(self, key: str) -> bool:
Expand Down Expand Up @@ -468,6 +487,8 @@ def save_checkpoint(self, checkpoint_data: dict) -> bool:
self._pool.putconn(conn)
return True
except Exception:
# 逻辑风险提示:检查点保存失败会直接返回 False;
# 若上层未显式处理该 False,可能导致“已创建检查点”的假象,最终无法恢复。
return False

def save_audit_log(self, log_data: dict) -> bool:
Expand All @@ -493,6 +514,7 @@ def save_audit_log(self, log_data: dict) -> bool:
self._pool.putconn(conn)
return True
except Exception:
# 逻辑风险提示:审计日志写入失败被静默吞掉,会让“可观测性/审计”形同虚设。
return False

def save_vector(self, key: str, content: str, embedding: bytes, metadata: dict = None) -> bool:
Expand All @@ -510,6 +532,8 @@ def save_vector(self, key: str, content: str, embedding: bytes, metadata: dict =
self._pool.putconn(conn)
return True
except Exception:
# 逻辑风险提示:向量写入失败同样被吞掉。
# 如果上层依赖语义检索作为关键能力,需确保失败可被观测并触发降级策略。
return False

def search_vectors(self, query_embedding: bytes, limit: int = 10) -> List[dict]:
Expand Down
19 changes: 19 additions & 0 deletions agent_os_kernel/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,20 @@ def __init__(self,
logger.info("Initializing five subsystems...")

# 1. 存储层(必须先初始化,供其他子系统使用)
# 逻辑风险提示:
# - storage_backend 允许为 Optional[StorageBackend],但 StorageManager.__init__ 更像期待一个枚举值。
# 这里传 None 时虽然可能会走到 MemoryStorage 兜底,但 self.storage._backend 会变成 None,
# 进而影响后续诸如 “if self._backend == StorageBackend.POSTGRESQL” 的逻辑判断语义。
self.storage = StorageManager(storage_backend)
logger.info("[1/5] Storage Layer ready (PostgreSQL Five Roles)")

# 2. 上下文管理器(虚拟内存)
self.context_manager = ContextManager(
max_context_tokens=max_context_tokens,
# 逻辑风险提示:
# - ContextManager 的 storage_backend 参数名容易让人误解:
# 它到底应该是 StorageManager 实例/接口,还是 StorageBackend 枚举?
# - 这里传入 self.storage._backend(枚举/None)可能导致后续 swap in/out 时调用存储接口失败。
storage_backend=self.storage._backend
)
logger.info("[2/5] Context Manager ready (Virtual Memory)")
Expand Down Expand Up @@ -297,6 +305,9 @@ def create_checkpoint(self, agent_pid: str,
if page:
context_pages.append(page.to_dict())
# 将页面写回存储
# 逻辑风险提示:这里调用 self.storage.save_context_page(page)。
# 但在 core/storage.py 的 StorageManager 中未必实现了该方法(可能存在接口不一致/多版本代码混用)。
# 若缺失会在 checkpoint 过程中直接抛 AttributeError,导致“优雅退出/崩溃恢复”承诺落空。
self.storage.save_context_page(page)

logger.info("✓ Created checkpoint %s... for agent %s... (%d pages)",
Expand All @@ -317,6 +328,8 @@ def restore_checkpoint(self, checkpoint_id: str) -> Optional[str]:
新的 Agent PID
"""
# 1. 加载检查点
# 逻辑风险提示:这里依赖 self.storage.load_checkpoint(checkpoint_id)。
# 若 StorageManager 未实现该方法或其行为与 PostgreSQLStorage.save_checkpoint 不对齐,会导致恢复失败。
checkpoint = self.storage.load_checkpoint(checkpoint_id)
if not checkpoint:
logger.error("Checkpoint %s... not found", checkpoint_id[:8])
Expand Down Expand Up @@ -368,6 +381,9 @@ def execute_agent_step(self, process: AgentProcess) -> Dict[str, Any]:
)

# 3. 检查资源配额
# 逻辑风险提示:tokens_needed 使用 split()*2 的方式粗略估计。
# - 这会让配额管理与真实模型 token 计费差异很大(中文/代码/工具 schema 尤其明显)。
# - 如果将来要做“可观测性/成本控制”,需要统一 token 计数策略(如 tiktoken 等)。
tokens_needed = len(context.split()) * 2 # 粗略估计
if not self.scheduler.request_resources(process.pid, tokens_needed):
return {'success': False, 'error': 'Resource quota exceeded', 'done': False}
Expand All @@ -380,6 +396,9 @@ def execute_agent_step(self, process: AgentProcess) -> Dict[str, Any]:
reasoning = f"Processing task: {process.context.get('task', 'unknown')}"

# 6. 记录审计日志(可观测性)
# 逻辑风险提示:这里调用 self.storage.log_action(...)。
# 但 core/storage.py 中展示的接口是 log_audit / save_audit_log 等,
# log_action 是否存在取决于 StorageManager 的实际实现(同样存在接口不一致风险)。
self.storage.log_action(
agent_pid=process.pid,
action_type="reasoning",
Expand Down