diff --git a/SAAS_DEPLOYMENT.md b/SAAS_DEPLOYMENT.md new file mode 100644 index 00000000..5e4f11de --- /dev/null +++ b/SAAS_DEPLOYMENT.md @@ -0,0 +1,741 @@ +# OpenViking SaaS 部署指南 + +> **架构升级说明** +> 本次改造将 OpenViking 的存储层从全本地化升级为 SaaS 数据库模式: +> - 向量索引 & 上下文元数据:**本地 C++ VectorDB** → **PostgreSQL + pgvector** +> - 文件内容存储:**本地文件系统** → **MinIO(S3 兼容对象存储)** + +--- + +## 目录 + +1. [环境要求](#1-环境要求) +2. [架构说明](#2-架构说明) +3. [快速启动(Docker Compose)](#3-快速启动docker-compose) +4. [配置文件说明](#4-配置文件说明) +5. [验证部署](#5-验证部署) +6. [可视化面板使用指南](#6-可视化面板使用指南) +7. [REST API 参考](#7-rest-api-参考) +8. [本地开发模式(不使用 Docker)](#8-本地开发模式不使用-docker) +9. [生产环境注意事项](#9-生产环境注意事项) +10. [故障排查](#10-故障排查) + +--- + +## 1. 环境要求 + +| 依赖 | 最低版本 | 说明 | +|------|----------|------| +| Docker | 20.10+ | 容器运行时 | +| Docker Compose | 2.0+ (`docker compose`) | 编排工具 | +| 可用内存 | 2 GB | PostgreSQL + MinIO + OpenViking | +| 可用磁盘 | 5 GB | 数据卷存储 | +| Embedding API | — | OpenAI 或兼容接口(必须有效的 API Key) | + +--- + +## 2. 架构说明 + +``` +┌─────────────────────────────────────────────────────┐ +│ OpenViking Server │ +│ http://localhost:1933 │ +│ │ +│ ┌──────────────┐ ┌─────────────────────────────┐ │ +│ │ AGFS (Go) │ │ VikingFS / Service │ │ +│ │ 文件系统抽象 │ │ 上下文管理 / 搜索 │ │ +│ └──────┬───────┘ └──────────┬──────────────────┘ │ +│ │ │ │ +└─────────┼───────────────────────┼────────────────────┘ + │ S3 API │ asyncpg + ▼ ▼ +┌─────────────────┐ ┌───────────────────────────────┐ +│ MinIO │ │ PostgreSQL 16 + pgvector │ +│ (对象存储) │ │ (向量索引 + 上下文元数据) │ +│ :9000 (API) │ │ :5432 │ +│ :9001 (控制台) │ │ 表: ov_context │ +└─────────────────┘ └───────────────────────────────┘ +``` + +**数据流向:** +- 上传文件/资源 → AGFS → 写入 MinIO 对象存储 +- 生成向量嵌入 → 写入 PostgreSQL `ov_context` 表(带 pgvector) +- 语义搜索 → PostgreSQL 余弦相似度查询(`<=>` 算子) +- 元数据过滤 → PostgreSQL SQL WHERE 条件 + +--- + +## 3. 
快速启动 + +> **注意**:`ghcr.io/volcengine/openviking:main` 为私有镜像,无法直接 `docker pull`。 +> 请根据你的场景选择以下两种方式之一。 + +--- + +### 方式 A:全 Docker 部署(推荐,首次构建较慢) + +**适用场景**:生产/测试环境,一键启动全部服务。 + +**前置条件**:Docker Desktop(Windows/macOS)或 Docker Engine + Docker Compose(Linux) + +#### 第一步:克隆代码 + +```bash +git clone +cd OpenViking +``` + +#### 第二步:创建并编辑配置文件 + +```bash +# Windows PowerShell +copy ov.conf.saas.example ov.conf.saas +notepad ov.conf.saas + +# macOS / Linux +cp ov.conf.saas.example ov.conf.saas +nano ov.conf.saas +``` + +**必须修改的字段**(填入你的 API Key): + +```json +{ + "embedding": { + "dense": { + "api_key": "sk-xxxxxxxxxxxxxxxx" + } + }, + "vlm": { + "api_key": "sk-xxxxxxxxxxxxxxxx" + } +} +``` + +> **使用国内代理?** 一并修改 `api_base`: +> ```json +> "api_base": "https://api.your-proxy.com/v1" +> ``` + +#### 第三步:构建并启动(`--build` 触发本地构建) + +```bash +docker compose -f docker-compose.saas.yml up -d --build +``` + +**首次构建说明:** +- 需要下载 Go 工具链 + Python 环境 + 编译 C++ 扩展 +- 耗时约 **10~30 分钟**(取决于网络和机器性能) +- 后续启动无需重新构建,直接 `docker compose -f docker-compose.saas.yml up -d` + +查看构建/启动日志: + +```bash +docker compose -f docker-compose.saas.yml logs -f +``` + +#### 第四步:验证服务状态 + +```bash +docker compose -f docker-compose.saas.yml ps +``` + +期望输出(所有服务 `healthy` 或 `running`): + +``` +NAME STATUS PORTS +openviking-postgres healthy 0.0.0.0:5432->5432/tcp +openviking-minio healthy 0.0.0.0:9000-9001->9000-9001/tcp +openviking-minio-init exited (0) — +openviking-server healthy 0.0.0.0:1933->1933/tcp +``` + +--- + +### 方式 B:仅 Docker 启动基础设施 + 宿主机运行服务器(推荐开发调试) + +**适用场景**:本地开发、快速调试,跳过镜像构建等待。 + +**需要**:Python 3.10+、pip + +#### 第一步:启动基础设施(PostgreSQL + MinIO) + +```bash +docker compose -f docker-compose.infra.yml up -d +``` + +约 20 秒即可就绪。 + +#### 第二步:安装 OpenViking + +> **Windows 用户注意**:直接运行 `pip install -e .` 会触发 C++ 扩展编译, +> 需要 MinGW,而 SaaS 模式根本不需要这个扩展。 +> 使用下方环境变量跳过编译即可。 + +**Windows PowerShell(推荐):** +```powershell +$env:OPENVIKING_NO_CPP_EXT = "1" +pip install -e . 
+``` + +**macOS / Linux:** +```bash +OPENVIKING_NO_CPP_EXT=1 pip install -e . +``` + +`OPENVIKING_NO_CPP_EXT=1` 的作用:跳过本地 C++ VectorDB 扩展编译。 +SaaS 模式使用 PostgreSQL 作为向量存储,该扩展完全不参与运行,跳过编译不影响任何功能。 + +#### 第三步:创建本地配置文件 + +```bash +# Windows PowerShell +copy ov.conf.local.example ov.conf.local +notepad ov.conf.local + +# macOS / Linux +cp ov.conf.local.example ov.conf.local +nano ov.conf.local +``` + +同样需要填入 `api_key`(配置文件中 host 已设为 `localhost`,无需其他修改)。 + +#### 第四步:启动 OpenViking 服务器 + +```bash +openviking-server --config ov.conf.local +``` + +服务器启动后访问:`http://localhost:1933/dashboard` + +--- + +## 4. 配置文件说明 + +完整配置结构 `ov.conf.saas`: + +```jsonc +{ + // ── 存储配置 ────────────────────────────────────── + "storage": { + "workspace": "/app/data", // 容器内工作目录(勿修改) + + // AGFS 文件系统 → MinIO S3 后端 + "agfs": { + "backend": "s3", + "port": 1833, + "s3": { + "bucket": "openviking-storage", // MinIO bucket 名称 + "region": "us-east-1", // 固定值,MinIO 不校验 + "access_key": "openviking", // MinIO 用户名 + "secret_key": "openviking_secret", // MinIO 密码 + "endpoint": "http://minio:9000", // 容器内地址(勿修改) + "use_ssl": false, + "use_path_style": true + } + }, + + // 向量数据库 → PostgreSQL + pgvector + "vectordb": { + "backend": "postgresql", // ← SaaS 模式的关键配置 + "name": "context", // 集合名称(表名 ov_context) + "dimension": 1024, // 必须与 embedding.dimension 一致 + "postgresql": { + "host": "postgres", // 容器名(Docker 内部 DNS) + "port": 5432, + "database": "openviking", + "user": "openviking", + "password": "openviking_secret" + // 也可用 DSN: "dsn": "postgresql://openviking:secret@postgres:5432/openviking" + } + } + }, + + // ── Embedding 模型配置 ──────────────────────────── + "embedding": { + "dense": { + "provider": "openai", // 支持 openai / custom + "api_base": "https://api.openai.com/v1", + "api_key": "YOUR_OPENAI_API_KEY", // ← 必须修改 + "model": "text-embedding-3-small", + "dimension": 1024 // 必须与 vectordb.dimension 一致 + } + }, + + // ── VLM(视觉语言模型,用于图片/PDF 理解)────────── + "vlm": { + "provider": "openai", + "api_base": 
"https://api.openai.com/v1", + "api_key": "YOUR_OPENAI_API_KEY", // ← 必须修改 + "model": "gpt-4o" + }, + + // ── HTTP 服务配置 ───────────────────────────────── + "server": { + "host": "0.0.0.0", + "port": 1933, + "cors_origins": ["*"] + }, + + // ── 多租户标识 ──────────────────────────────────── + "default_account": "default", + "default_user": "default", + "default_agent": "default", + + // ── 日志级别 ────────────────────────────────────── + "log": { "level": "INFO" } +} +``` + +--- + +## 5. 验证部署 + +### 方式一:健康检查接口 + +```bash +curl http://localhost:1933/health +# 期望返回:{"status":"ok"} +``` + +### 方式二:系统状态接口 + +```bash +curl http://localhost:1933/api/v1/system/status +# 期望返回:{"status":"ok","result":{"initialized":true,...}} +``` + +### 方式三:存储后端统计 + +```bash +curl http://localhost:1933/api/v1/debug/storage/stats +# 期望返回包含 "backend": "postgresql" +``` + +示例返回: +```json +{ + "status": "ok", + "result": { + "backend": "postgresql", + "collections": 1, + "total_records": 0, + "storage_size": 8388608 + } +} +``` + +### 方式四:访问 MinIO 控制台 + +浏览器打开:`http://localhost:9001` + +- 用户名:`openviking` +- 密码:`openviking_secret` + +确认 `openviking-storage` bucket 已创建。 + +--- + +## 6. 可视化面板使用指南 + +### 打开面板 + +浏览器访问:**`http://localhost:1933/dashboard`** + +### 面板功能说明 + +#### Overview 标签(总览) + +![overview](docs/images/dashboard-overview.png) + +- **后端类型徽章**:页面右上角显示 `POSTGRESQL`(SaaS 模式) +- **统计卡片**:后端类型 / 集合数 / 总上下文数 / 存储大小 +- **最近上下文**:列出最新写入的上下文记录(URI、类型、摘要) +- **快速操作**:刷新统计 / 跳转添加资源 / 跳转搜索 + +#### Context Browser 标签(上下文浏览器) + +1. 在输入框中输入 URI,例如:`viking://resources/` +2. 点击 **List** 按钮,列出该目录下的文件和子目录 +3. 点击目录的 **Open** 按钮可以进入子目录 +4. 点击文件的 **View** 按钮查看文件内容和摘要 + +常用 URI 路径: +| URI | 说明 | +|-----|------| +| `viking://` | 根目录 | +| `viking://resources/` | 资源根目录 | +| `viking://agent/memories/` | 记忆存储 | +| `viking://agent/skills/` | 技能存储 | + +#### Search 标签(语义搜索) + +1. 在查询框中输入自然语言查询 +2. (可选)在 Target URI 框中限定搜索范围,例如 `viking://resources/` +3. 调整返回数量(默认 10) +4. 点击 **Search**,结果按相似度评分排序显示 +5. 
点击结果的 **View** 按钮跳转到浏览器查看详情 + +#### Add Resource 标签(添加资源) + +**添加外部资源/URL:** +1. 在 Resource Path / URL 框中输入路径 +2. (可选)设置目标 URI(默认 `viking://resources/`) +3. 点击 **Add Resource**,系统自动解析并向量化 + +**添加文本内容:** +1. 在 URI 框中输入写入位置,例如 `viking://resources/my-notes.md` +2. 在 Content 框中输入文本内容 +3. 点击 **Write to Storage** + +#### Sessions 标签(会话管理) + +- 列出所有会话,显示消息数和创建时间 +- 点击 **View** 查看会话的完整消息历史 + +#### Debug 标签(API 控制台) + +可以直接调用任意 REST API 端点: + +1. 选择 HTTP 方法(GET / POST) +2. 输入端点路径,例如 `/api/v1/system/status` +3. POST 请求可以在 Body 框输入 JSON 请求体 +4. 点击 **Send Request** 查看完整响应 + +--- + +## 7. REST API 参考 + +服务器地址:`http://localhost:1933` + +### 系统 + +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/health` | 健康检查 | +| GET | `/api/v1/system/status` | 系统状态 | +| GET | `/api/v1/debug/storage/stats` | 存储后端统计 | +| GET | `/api/v1/debug/storage/list` | 列出上下文记录 | + +### 资源管理 + +| 方法 | 路径 | 说明 | +|------|------|------| +| POST | `/api/v1/resources/add` | 添加资源(URL/文件) | +| DELETE | `/api/v1/resources/remove` | 删除资源 | +| GET | `/api/v1/resources/list` | 列出资源 | + +**添加资源示例:** +```bash +curl -X POST http://localhost:1933/api/v1/resources/add \ + -H "Content-Type: application/json" \ + -d '{ + "path": "https://en.wikipedia.org/wiki/Artificial_intelligence", + "target_uri": "viking://resources/" + }' +``` + +### 文件系统 + +| 方法 | 路径 | 说明 | +|------|------|------| +| POST | `/api/v1/fs/ls` | 列出目录内容 | +| POST | `/api/v1/fs/read` | 读取文件 | +| POST | `/api/v1/fs/write` | 写入文件 | +| POST | `/api/v1/fs/mkdir` | 创建目录 | +| DELETE | `/api/v1/fs/rm` | 删除文件/目录 | + +**列出目录示例:** +```bash +curl -X POST http://localhost:1933/api/v1/fs/ls \ + -H "Content-Type: application/json" \ + -d '{"uri": "viking://resources/", "recursive": false}' +``` + +### 语义搜索 + +| 方法 | 路径 | 说明 | +|------|------|------| +| POST | `/api/v1/search/find` | 语义搜索(无 session) | +| POST | `/api/v1/search/search` | 语义搜索(带 session 上下文) | + +**搜索示例:** +```bash +curl -X POST http://localhost:1933/api/v1/search/find \ + -H "Content-Type: 
application/json" \ + -d '{ + "query": "机器学习算法", + "target_uri": "viking://resources/", + "limit": 5 + }' +``` + +### 内容读取 + +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/v1/content/abstract?uri=...` | 获取摘要(L0) | +| GET | `/api/v1/content/overview?uri=...` | 获取概览(L1) | +| GET | `/api/v1/content/read?uri=...` | 读取原始内容 | + +### 会话管理 + +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/v1/sessions/list` | 列出会话 | +| POST | `/api/v1/sessions/create` | 创建会话 | +| GET | `/api/v1/sessions/messages?session_id=...` | 获取会话消息 | +| POST | `/api/v1/sessions/add_message` | 添加消息 | + +**完整 API 文档(Swagger UI):** `http://localhost:1933/docs` + +--- + +## 8. 本地开发模式(不使用 Docker) + +如果你希望在本地直接运行(不使用 Docker),需要手动准备依赖服务。 + +### 前置条件 + +**安装 PostgreSQL 16 + pgvector:** +```bash +# macOS +brew install postgresql@16 +brew install pgvector # 或从源码编译 + +# Ubuntu/Debian +sudo apt install postgresql-16 postgresql-16-pgvector +``` + +**创建数据库:** +```bash +psql -U postgres < +MINIO_ROOT_PASSWORD: <强密码> + +# ov.conf.saas 中同步修改 +"password": "<强密码>" +"secret_key": "<强密码>" +``` + +**开启 API Key 认证:** +在 `ov.conf.saas` 中添加: +```json +{ + "server": { + "root_api_key": "your-secret-api-key" + } +} +``` + +之后所有 API 请求需要携带 Header: +``` +Authorization: Bearer your-secret-api-key +``` + +**限制 CORS:** +```json +{ + "server": { + "cors_origins": ["https://your-domain.com"] + } +} +``` + +### 数据持久化 + +Docker Compose 默认使用命名 volume,数据不随容器删除而丢失: +- `postgres_data` → PostgreSQL 数据 +- `minio_data` → MinIO 对象存储数据 + +**备份 PostgreSQL:** +```bash +docker exec openviking-postgres pg_dump -U openviking openviking > backup.sql +``` + +**恢复 PostgreSQL:** +```bash +cat backup.sql | docker exec -i openviking-postgres psql -U openviking openviking +``` + +### 替换为真实 S3 + +生产环境建议使用 AWS S3 或阿里云 OSS 替代 MinIO: + +```json +{ + "storage": { + "agfs": { + "backend": "s3", + "s3": { + "bucket": "your-production-bucket", + "region": "cn-hangzhou", + "access_key": "AKIA...", + "secret_key": "...", + "endpoint": 
"https://oss-cn-hangzhou.aliyuncs.com", + "use_ssl": true, + "use_path_style": false + } + } + } +} +``` + +--- + +## 10. 故障排查 + +### 问题:openviking-server 启动失败 + +**查看日志:** +```bash +docker compose -f docker-compose.saas.yml logs openviking +``` + +**常见原因:** + +| 错误信息 | 原因 | 解决方法 | +|----------|------|----------| +| `connection refused` to postgres | PostgreSQL 未就绪 | 等待 10s 后重试,或检查 postgres 容器 | +| `FileNotFoundError: ov.conf` | 配置文件未挂载 | 确认 `ov.conf.saas` 文件存在于当前目录 | +| `invalid api key` | Embedding API Key 错误 | 检查 `ov.conf.saas` 中的 `api_key` | +| `asyncpg not found` | 依赖未安装 | `pip install asyncpg` | + +### 问题:向量搜索无结果 + +1. 确认 Embedding API 可访问: + ```bash + curl -X POST http://localhost:1933/api/v1/search/find \ + -H "Content-Type: application/json" \ + -d '{"query": "test"}' + ``` + +2. 查看嵌入队列是否有积压: + ```bash + curl http://localhost:1933/api/v1/debug/health + ``` + +3. 检查 `ov_context` 表中是否有带向量的记录: + ```bash + docker exec openviking-postgres psql -U openviking openviking \ + -c "SELECT id, uri, (vector IS NOT NULL) as has_vector FROM ov_context LIMIT 10;" + ``` + +### 问题:MinIO 连接失败 + +```bash +# 检查 MinIO 是否健康 +docker compose -f docker-compose.saas.yml ps minio + +# 手动测试 bucket 连接 +docker exec openviking-minio mc ls local/openviking-storage +``` + +### 停止/重启服务 + +```bash +# 停止服务(保留数据) +docker compose -f docker-compose.saas.yml stop + +# 重启服务 +docker compose -f docker-compose.saas.yml restart + +# 彻底清除(删除所有数据!) 
+docker compose -f docker-compose.saas.yml down -v +``` + +### 查看 PostgreSQL 中的数据 + +```bash +# 进入 PostgreSQL 命令行 +docker exec -it openviking-postgres psql -U openviking openviking + +# 常用查询 +\dt -- 列出所有表 +SELECT COUNT(*) FROM ov_context; -- 总记录数 +SELECT id, uri, context_type, level, abstract FROM ov_context LIMIT 5; +SELECT uri, 1-(vector<=>vector) AS self_sim FROM ov_context WHERE vector IS NOT NULL LIMIT 3; +\q -- 退出 +``` + +--- + +## 相关链接 + +- **可视化面板**:`http://localhost:1933/dashboard` +- **Swagger API 文档**:`http://localhost:1933/docs` +- **MinIO 控制台**:`http://localhost:9001`(用户名/密码:`openviking` / `openviking_secret`) +- **项目主页**:[GitHub](https://github.com/volcengine/OpenViking) diff --git a/agfs.config.local.yaml b/agfs.config.local.yaml new file mode 100644 index 00000000..568f1737 --- /dev/null +++ b/agfs.config.local.yaml @@ -0,0 +1,28 @@ +server: + address: ":1833" + log_level: info + +plugins: + serverinfofs: + enabled: true + path: /serverinfo + config: + version: "1.0.0" + + queuefs: + enabled: true + path: /queue + config: {} + + s3fs: + - name: local + enabled: true + path: /local + config: + region: us-east-1 + bucket: openviking-storage + access_key_id: openviking + secret_access_key: openviking_secret + endpoint: http://minio:9000 + use_path_style: true + disable_ssl: true diff --git a/docker-compose.infra.yml b/docker-compose.infra.yml new file mode 100644 index 00000000..74abb246 --- /dev/null +++ b/docker-compose.infra.yml @@ -0,0 +1,95 @@ +version: "3.8" + +# OpenViking 基础设施(PostgreSQL + MinIO + AGFS) +# +# 适用场景:本地开发、快速调试 +# OpenViking 服务器在宿主机直接运行(pip install),无需 Docker 构建 +# +# 启动命令: +# docker compose -f docker-compose.infra.yml up -d --build +# +# 启动后在宿主机运行 OpenViking: +# openviking-server --config ov.conf.local + +services: + # ─── PostgreSQL + pgvector ──────────────────────────────────────────────── + postgres: + image: pgvector/pgvector:pg16 + container_name: openviking-postgres + environment: + POSTGRES_USER: openviking + 
POSTGRES_PASSWORD: openviking_secret + POSTGRES_DB: openviking + volumes: + - postgres_data:/var/lib/postgresql/data + ports: + - "5432:5432" # 宿主机可直连 localhost:5432 + healthcheck: + test: ["CMD-SHELL", "pg_isready -U openviking -d openviking"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + restart: unless-stopped + + # ─── MinIO (S3-compatible object storage) ───────────────────────────────── + minio: + image: minio/minio:latest + container_name: openviking-minio + command: server /data --console-address ":9001" + environment: + MINIO_ROOT_USER: openviking + MINIO_ROOT_PASSWORD: openviking_secret + volumes: + - minio_data:/data + ports: + - "9000:9000" # S3 API(宿主机 localhost:9000) + - "9001:9001" # MinIO 控制台 + healthcheck: + test: ["CMD", "mc", "ready", "local"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 15s + restart: unless-stopped + + # ─── MinIO bucket 初始化 ────────────────────────────────────────────────── + minio-init: + image: minio/mc:latest + container_name: openviking-minio-init + depends_on: + minio: + condition: service_healthy + entrypoint: > + /bin/sh -c " + mc alias set local http://minio:9000 openviking openviking_secret && + mc mb --ignore-existing local/openviking-storage && + echo 'MinIO bucket openviking-storage ready' + " + restart: "no" + + # ─── AGFS (Agent Global File System) ───────────────────────────────────── + agfs: + build: + context: ./third_party/agfs + dockerfile: agfs-server/Dockerfile + image: openviking-agfs:local + container_name: openviking-agfs + volumes: + - ./agfs.config.local.yaml:/config.yaml:ro + ports: + - "1833:1833" # AGFS HTTP API(宿主机 localhost:1833) + depends_on: + minio: + condition: service_healthy + restart: unless-stopped + +volumes: + postgres_data: + driver: local + minio_data: + driver: local + +networks: + default: + name: openviking-infra diff --git a/docker-compose.saas.yml b/docker-compose.saas.yml new file mode 100644 index 00000000..82b118fb --- /dev/null +++ 
b/docker-compose.saas.yml @@ -0,0 +1,120 @@ +version: "3.8" + +# OpenViking SaaS Stack +# +# 存储层升级: +# - PostgreSQL + pgvector → 向量索引 & 上下文元数据 +# - MinIO (S3-compatible) → 文件内容存储 (AGFS S3 后端) +# +# 【重要】OpenViking 服务器需要本地构建镜像(私有镜像不可直接拉取) +# +# 启动步骤: +# 1. cp ov.conf.saas.example ov.conf.saas +# 2. 编辑 ov.conf.saas,填入 Embedding API Key +# 3. docker compose -f docker-compose.saas.yml up -d --build +# (首次构建需要 10~30 分钟,依赖网络和机器性能) +# 4. 访问 http://localhost:1933/dashboard +# +# 如果只想快速启动基础设施(PostgreSQL + MinIO),本地运行 OpenViking: +# docker compose -f docker-compose.infra.yml up -d + +services: + # ─── PostgreSQL + pgvector ──────────────────────────────────────────────── + postgres: + image: pgvector/pgvector:pg16 + container_name: openviking-postgres + environment: + POSTGRES_USER: openviking + POSTGRES_PASSWORD: openviking_secret + POSTGRES_DB: openviking + volumes: + - postgres_data:/var/lib/postgresql/data + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U openviking -d openviking"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + restart: unless-stopped + + # ─── MinIO (S3-compatible object storage for AGFS) ──────────────────────── + minio: + image: minio/minio:latest + container_name: openviking-minio + command: server /data --console-address ":9001" + environment: + MINIO_ROOT_USER: openviking + MINIO_ROOT_PASSWORD: openviking_secret + volumes: + - minio_data:/data + ports: + - "9000:9000" # S3 API + - "9001:9001" # MinIO Console (web UI) + healthcheck: + test: ["CMD", "mc", "ready", "local"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 15s + restart: unless-stopped + + # ─── MinIO bucket initialization ────────────────────────────────────────── + minio-init: + image: minio/mc:latest + container_name: openviking-minio-init + depends_on: + minio: + condition: service_healthy + entrypoint: > + /bin/sh -c " + mc alias set local http://minio:9000 openviking openviking_secret && + mc mb --ignore-existing 
local/openviking-storage && + echo 'MinIO bucket created successfully' + " + restart: "no" + + # ─── OpenViking Server(本地源码构建)──────────────────────────────────── + # + # 说明:因 ghcr.io/volcengine/openviking:main 为私有镜像, + # 改为 build: . 从本地 Dockerfile 构建。 + # 首次构建需下载 Go + Python 工具链并编译 C++ 扩展,耗时较长(10~30 分钟)。 + # 构建完成后再次启动无需重新构建。 + openviking: + build: + context: . + dockerfile: Dockerfile + image: openviking:saas-local # 构建后本地缓存的镜像名 + container_name: openviking-server + depends_on: + postgres: + condition: service_healthy + minio-init: + condition: service_completed_successfully + ports: + - "1933:1933" + volumes: + - ./ov.conf.saas:/app/ov.conf:ro + - openviking_temp:/app/data/temp + environment: + OPENVIKING_CONFIG_FILE: /app/ov.conf + healthcheck: + test: ["CMD-SHELL", "curl -fsS http://127.0.0.1:1933/health || exit 1"] + interval: 30s + timeout: 5s + retries: 3 + start_period: 60s + restart: unless-stopped + +volumes: + postgres_data: + driver: local + minio_data: + driver: local + openviking_temp: + driver: local + +networks: + default: + name: openviking-saas diff --git a/openviking/server/app.py b/openviking/server/app.py index 99ee15f0..54d2a685 100644 --- a/openviking/server/app.py +++ b/openviking/server/app.py @@ -17,6 +17,7 @@ from openviking.server.routers import ( admin_router, content_router, + dashboard_router, debug_router, filesystem_router, observer_router, @@ -151,5 +152,6 @@ async def general_error_handler(request: Request, exc: Exception): app.include_router(pack_router) app.include_router(debug_router) app.include_router(observer_router) + app.include_router(dashboard_router) return app diff --git a/openviking/server/routers/__init__.py b/openviking/server/routers/__init__.py index d90e6687..9e03d4db 100644 --- a/openviking/server/routers/__init__.py +++ b/openviking/server/routers/__init__.py @@ -4,6 +4,7 @@ from openviking.server.routers.admin import router as admin_router from openviking.server.routers.content import router as content_router 
+from openviking.server.routers.dashboard import router as dashboard_router from openviking.server.routers.debug import router as debug_router from openviking.server.routers.filesystem import router as filesystem_router from openviking.server.routers.observer import router as observer_router @@ -26,4 +27,5 @@ "pack_router", "debug_router", "observer_router", + "dashboard_router", ] diff --git a/openviking/server/routers/dashboard.py b/openviking/server/routers/dashboard.py new file mode 100644 index 00000000..972ccba8 --- /dev/null +++ b/openviking/server/routers/dashboard.py @@ -0,0 +1,772 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +""" +OpenViking SaaS Dashboard. + +Provides a single-page HTML visualization tool for testing the SaaS database +migration. The page communicates with existing OpenViking REST API endpoints. + +Access at: GET /dashboard +""" + +from fastapi import APIRouter +from fastapi.responses import HTMLResponse + +router = APIRouter(tags=["dashboard"]) + +_DASHBOARD_HTML = """ + + + + + OpenViking SaaS Dashboard + + + + +
+ + SaaS Mode +
+ + Checking... + +
+
+ + +
+ +
+
Overview
+
Context Browser
+
Search
+
Add Resource
+
Sessions
+
Debug
+
+ + +
+
+
+
+
Storage Backend
+
+
+
+
Quick Actions
+
+ + + +
+
+
+
+
Recent Contexts (latest 10)
+
+
+
+
+
+ + +
+
+
Browse by URI
+
+ + +
+
+
Enter a URI above and click List to browse.
+
+
+ +
+ + + + + +
+
+
Add Resource / URL
+
+
+ + +
+
+ + +
+
+ +
+
+
+
+
+
Add Text Content
+
+
+ + +
+
+ + +
+ +
+
+
+
+ + +
+
+
Active Sessions
+ +
+
+
+
+ +
+ + +
+
+
API Console
+
+
+ + +
+
+ + +
+ +
+
+
+
+ +
+ + + +""" + + +@router.get("/dashboard", response_class=HTMLResponse, include_in_schema=False) +async def dashboard(): + """OpenViking SaaS visualization dashboard.""" + return HTMLResponse(content=_DASHBOARD_HTML, status_code=200) diff --git a/openviking/server/routers/debug.py b/openviking/server/routers/debug.py index f2c04756..bcbce688 100644 --- a/openviking/server/routers/debug.py +++ b/openviking/server/routers/debug.py @@ -24,3 +24,44 @@ async def debug_health( service = get_service() is_healthy = service.debug.is_healthy() return Response(status="ok", result={"healthy": is_healthy}) + + +@router.get("/storage/stats") +async def storage_stats( + _ctx: RequestContext = Depends(get_request_context), +): + """Get storage backend statistics (backend type, record counts, etc.).""" + service = get_service() + vikingdb = service.vikingdb_manager + if vikingdb is None: + return Response(status="ok", result={"error": "storage not initialized"}) + stats = await vikingdb.get_stats() + return Response(status="ok", result=stats) + + +@router.get("/storage/list") +async def storage_list( + collection: str = "context", + limit: int = 10, + offset: int = 0, + _ctx: RequestContext = Depends(get_request_context), +): + """List records from a collection (for dashboard overview).""" + service = get_service() + vikingdb = service.vikingdb_manager + if vikingdb is None: + return Response(status="ok", result={"items": []}) + try: + from openviking_cli.utils.config import get_openviking_config + coll_name = get_openviking_config().storage.vectordb.name or collection + items = await vikingdb.filter( + coll_name, + filter={}, + limit=limit, + offset=offset, + order_by="created_at", + order_desc=True, + ) + return Response(status="ok", result={"items": items, "total": len(items)}) + except Exception as e: + return Response(status="ok", result={"items": [], "error": str(e)}) diff --git a/openviking/service/core.py b/openviking/service/core.py index 59d81163..cee8156b 100644 --- 
a/openviking/service/core.py +++ b/openviking/service/core.py @@ -118,9 +118,18 @@ def _init_storage(self, config: StorageConfig, max_concurrent_embedding: int = 1 logger.warning("AGFS URL not configured, skipping queue manager initialization") # Initialize VikingDBManager with QueueManager - self._vikingdb_manager = VikingDBManager( - vectordb_config=config.vectordb, queue_manager=self._queue_manager - ) + # Choose backend based on configuration + if config.vectordb.backend == "postgresql": + from openviking.storage.postgresql_manager import PostgreSQLManager + + self._vikingdb_manager = PostgreSQLManager( + vectordb_config=config.vectordb, queue_manager=self._queue_manager + ) + logger.info("Using PostgreSQL backend for vector storage (SaaS mode)") + else: + self._vikingdb_manager = VikingDBManager( + vectordb_config=config.vectordb, queue_manager=self._queue_manager + ) # Configure queues if QueueManager is available if self._queue_manager: diff --git a/openviking/storage/__init__.py b/openviking/storage/__init__.py index 63620722..1de0f68d 100644 --- a/openviking/storage/__init__.py +++ b/openviking/storage/__init__.py @@ -3,6 +3,8 @@ """Storage layer interfaces and implementations.""" from openviking.storage.observers import BaseObserver, QueueObserver +from openviking.storage.postgresql_backend import PostgreSQLBackend +from openviking.storage.postgresql_manager import PostgreSQLManager from openviking.storage.queuefs import QueueManager, get_queue_manager, init_queue_manager from openviking.storage.viking_fs import VikingFS, get_viking_fs, init_viking_fs from openviking.storage.viking_vector_index_backend import VikingVectorIndexBackend @@ -27,9 +29,11 @@ "DuplicateKeyError", "ConnectionError", "SchemaError", - # Backend + # Backends "VikingVectorIndexBackend", "VikingDBManager", + "PostgreSQLBackend", + "PostgreSQLManager", # QueueFS "QueueManager", "init_queue_manager", diff --git a/openviking/storage/postgresql_backend.py 
b/openviking/storage/postgresql_backend.py new file mode 100644 index 00000000..63dcdc61 --- /dev/null +++ b/openviking/storage/postgresql_backend.py @@ -0,0 +1,805 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +""" +PostgreSQL + pgvector storage backend for OpenViking SaaS mode. + +Implements VikingDBInterface using PostgreSQL as both metadata store +and vector similarity search engine (via pgvector extension). + +Each collection maps to a table named ov_{collection_name}. +Vectors are stored using the pgvector 'vector' column type. +""" + +import hashlib +import json +import uuid +from datetime import datetime +from typing import Any, Dict, List, Optional, Tuple + +try: + import asyncpg +except ImportError: + asyncpg = None # type: ignore + +from openviking.storage.vikingdb_interface import ( + CollectionNotFoundError, + VikingDBInterface, +) +from openviking_cli.utils import get_logger + +logger = get_logger(__name__) + +# Known scalar columns - used for safe query building +_SCALAR_COLUMNS = frozenset( + { + "id", + "uri", + "parent_uri", + "type", + "context_type", + "level", + "name", + "description", + "tags", + "abstract", + "active_count", + "created_at", + "updated_at", + } +) + +# Columns returned by default (no vector, no extra_data unpacked) +_DEFAULT_SELECT = ( + "id, uri, parent_uri, type, context_type, level, name, " + "description, tags, abstract, active_count, created_at, updated_at, " + "sparse_vector, extra_data, vector::text AS vector" +) + + +class PostgreSQLBackend(VikingDBInterface): + """ + PostgreSQL + pgvector backend implementing VikingDBInterface. 
+ + Features: + - Stores all context metadata in PostgreSQL tables + - Uses pgvector for dense vector similarity search (cosine) + - Translates OpenViking filter DSL to SQL WHERE clauses + - Supports scroll/pagination, batch operations, and URI-tree deletion + """ + + def __init__(self, dsn: str, vector_dim: int = 1024): + """ + Initialize PostgreSQL backend. + + Args: + dsn: PostgreSQL DSN e.g. 'postgresql://user:pass@host:5432/dbname' + vector_dim: Dense vector dimension (must match embedding model) + """ + if asyncpg is None: + raise ImportError( + "asyncpg is required for PostgreSQL backend. " + "Install it with: pip install asyncpg" + ) + self._dsn = dsn + self._vector_dim = vector_dim + self._pool: Optional[Any] = None # asyncpg.Pool + + # ========================================================================= + # Internal helpers + # ========================================================================= + + async def _ensure_pool(self): + """Lazily create the connection pool and ensure pgvector extension.""" + if self._pool is None: + self._pool = await asyncpg.create_pool( + self._dsn, + min_size=2, + max_size=20, + command_timeout=60, + statement_cache_size=0, # Avoid prepared-statement issues + ) + async with self._pool.acquire() as conn: + await conn.execute("CREATE EXTENSION IF NOT EXISTS vector") + logger.info(f"PostgreSQL pool created (dim={self._vector_dim})") + return self._pool + + def _tbl(self, collection: str) -> str: + """Return sanitized table name for a collection.""" + safe = "".join(c for c in collection if c.isalnum() or c == "_") + return f"ov_{safe}" + + def _record_to_dict(self, record) -> Dict[str, Any]: + """Convert asyncpg Record to plain dict, parsing stored JSON/vector.""" + d = dict(record) + + # Parse vector from text representation '[0.1,0.2,...]' + raw_vec = d.pop("vector", None) + if raw_vec is not None and isinstance(raw_vec, str) and raw_vec.strip(): + try: + d["vector"] = [ + float(x) for x in 
def _filter_to_sql(self, filt: Optional[Dict[str, Any]], params: list) -> str:
    """
    Translate OpenViking filter DSL to a SQL WHERE clause (without 'WHERE').

    Bind values are appended to ``params``; the returned fragment references
    them positionally ($1, $2, ... — asyncpg placeholder style).

    Filter DSL operators:
        and / or : logical combination of sub-conditions
        must     : field IN [values]
        range    : field comparison (gte, gt, lte, lt)
        prefix   : field LIKE 'prefix%'
        contains : field LIKE '%substring%'
        not      : NOT (inner)
    """
    if not filt:
        return ""
    return self._translate_node(filt, params)


def _translate_node(self, node: Dict[str, Any], params: list) -> str:
    """Recursively translate one DSL node; appends bind values to ``params``."""
    op = node.get("op", "")

    if op in ("and", "or"):
        parts = []
        for cond in node.get("conds", []):
            part = self._translate_node(cond, params)
            if part:
                parts.append(f"({part})")
        if not parts:
            return ""
        sql_op = "AND" if op == "and" else "OR"
        return f" {sql_op} ".join(parts)

    elif op == "must":
        field = node.get("field", "")
        values = node.get("conds", [])
        if not values:
            return ""
        col = self._col(field)
        if len(values) == 1:
            params.append(values[0])
            return f"{col} = ${len(params)}"
        else:
            # Placeholder numbers are computed BEFORE extending params so the
            # numbering stays aligned with the appended values.
            placeholders = ", ".join(
                f"${len(params) + i + 1}" for i in range(len(values))
            )
            params.extend(values)
            return f"{col} IN ({placeholders})"

    elif op == "range":
        field = node.get("field", "")
        col = self._col(field)
        parts = []
        for operator, sql_op in [("gte", ">="), ("gt", ">"), ("lte", "<="), ("lt", "<")]:
            if operator in node:
                params.append(node[operator])
                parts.append(f"{col} {sql_op} ${len(params)}")
        return " AND ".join(parts)

    elif op == "prefix":
        field = node.get("field", "")
        prefix = node.get("prefix", "")
        col = self._col(field)
        params.append(f"{prefix}%")
        return f"{col} LIKE ${len(params)}"

    elif op == "contains":
        field = node.get("field", "")
        substring = node.get("substring", "")
        col = self._col(field)
        params.append(f"%{substring}%")
        return f"{col} LIKE ${len(params)}"

    elif op == "not":
        inner = self._translate_node(node.get("cond", {}), params)
        return f"NOT ({inner})" if inner else ""

    # Unknown operators are ignored rather than raised, so a partially
    # unsupported filter still narrows results by its known conditions.
    logger.debug(f"Unknown filter op: '{op}', skipping")
    return ""


def _col(self, field: str) -> str:
    """
    Return a safe SQL column reference for a field name.

    Known scalar columns map to themselves; any other field is read from the
    extra_data JSONB column, with the key stripped to [A-Za-z0-9_] so it can
    never break out of the quoted literal.
    """
    if field in _SCALAR_COLUMNS:
        return field
    # Unknown field → look up in extra_data JSONB
    safe = "".join(c for c in field if c.isalnum() or c == "_")
    return f"extra_data->>'{safe}'"


def _make_where(self, sql_filter: str, extra_parts: Optional[List[str]] = None) -> str:
    """
    Build a full WHERE clause from a filter fragment plus extra conditions.

    Returns "" when there is nothing to constrain on.
    (Fix: ``extra_parts`` default was typed ``List[str] = None`` — an
    implicit Optional, disallowed by PEP 484.)
    """
    parts = []
    if sql_filter:
        parts.append(f"({sql_filter})")
    if extra_parts:
        parts.extend(extra_parts)
    return f"WHERE {' AND '.join(parts)}" if parts else ""
async def create_collection(self, name: str, schema: Dict[str, Any]) -> bool:
    """
    Create the backing table for a collection; returns False if it exists.

    The vector dimension is taken from the first vector field in ``schema``
    (VikingDB-style {"Fields": [{"FieldType": "vector", "Dim": N}, ...]}),
    falling back to the configured default dimension.
    """
    pool = await self._ensure_pool()
    tbl = self._tbl(name)

    async with pool.acquire() as conn:
        exists = await conn.fetchval(
            "SELECT EXISTS (SELECT 1 FROM information_schema.tables "
            "WHERE table_schema='public' AND table_name=$1)",
            tbl,
        )
        if exists:
            logger.debug(f"Collection '{name}' already exists")
            return False

        # Derive vector dimension from schema if available
        dim = self._vector_dim
        for field in schema.get("Fields", []):
            if field.get("FieldType") == "vector":
                dim = field.get("Dim", dim)
                break

        await conn.execute(
            f"""
            CREATE TABLE {tbl} (
                id TEXT PRIMARY KEY,
                uri TEXT,
                parent_uri TEXT,
                type TEXT DEFAULT 'resource',
                context_type TEXT DEFAULT 'resource',
                level INTEGER DEFAULT 2,
                name TEXT,
                description TEXT,
                tags TEXT,
                abstract TEXT,
                active_count BIGINT DEFAULT 0,
                created_at TIMESTAMPTZ DEFAULT NOW(),
                updated_at TIMESTAMPTZ DEFAULT NOW(),
                vector vector({dim}),
                sparse_vector JSONB,
                extra_data JSONB DEFAULT '{{}}'::jsonb
            )
            """
        )

        # Scalar indexes for the common lookup/filter columns.
        # NOTE(review): no ANN index (ivfflat/hnsw) is created here, so vector
        # search sequential-scans the table — confirm whether that is intended.
        for col in ("uri", "parent_uri", "context_type", "level", "active_count"):
            await conn.execute(
                f"CREATE INDEX idx_{tbl}_{col} ON {tbl} ({col})"
            )

        logger.info(f"Created collection '{name}' (table={tbl}, dim={dim})")
        return True


async def drop_collection(self, name: str) -> bool:
    """Drop a collection's backing table (idempotent)."""
    pool = await self._ensure_pool()
    tbl = self._tbl(name)
    async with pool.acquire() as conn:
        await conn.execute(f"DROP TABLE IF EXISTS {tbl}")
        logger.info(f"Dropped collection '{name}'")
        return True


async def collection_exists(self, name: str) -> bool:
    """True if the collection's backing table exists in the public schema."""
    pool = await self._ensure_pool()
    tbl = self._tbl(name)
    async with pool.acquire() as conn:
        return bool(
            await conn.fetchval(
                "SELECT EXISTS (SELECT 1 FROM information_schema.tables "
                "WHERE table_schema='public' AND table_name=$1)",
                tbl,
            )
        )


async def list_collections(self) -> List[str]:
    """List collection names (tables prefixed 'ov_', prefix stripped)."""
    pool = await self._ensure_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT table_name FROM information_schema.tables "
            "WHERE table_schema='public' AND table_name LIKE 'ov_%'"
        )
        return [r["table_name"][3:] for r in rows]  # strip 'ov_'


async def get_collection_info(self, name: str) -> Optional[Dict[str, Any]]:
    """Return basic stats for a collection, or None if it does not exist."""
    if not await self.collection_exists(name):
        return None
    pool = await self._ensure_pool()
    tbl = self._tbl(name)
    async with pool.acquire() as conn:
        cnt = await conn.fetchval(f"SELECT COUNT(*) FROM {tbl}")
        return {
            "name": name,
            "vector_dim": self._vector_dim,
            "count": int(cnt),
            "status": "ready",
            "backend": "postgresql",
        }


async def insert(self, collection: str, data: Dict[str, Any]) -> str:
    """
    Insert one record, upserting on id conflict; returns the record id.

    If ``data`` has no 'id', one is derived from the md5 of 'uri' (stable
    per uri) or a random uuid4.  Keys outside the known scalar columns are
    preserved in the extra_data JSONB column.
    """
    if not await self.collection_exists(collection):
        raise CollectionNotFoundError(f"Collection '{collection}' not found")

    pool = await self._ensure_pool()
    tbl = self._tbl(collection)

    record_id = data.get("id")
    if not record_id:
        uri = data.get("uri", "")
        record_id = hashlib.md5(uri.encode()).hexdigest() if uri else str(uuid.uuid4())

    vector = data.get("vector")
    vector_str = f"[{','.join(str(x) for x in vector)}]" if vector else None

    sparse = data.get("sparse_vector")
    sparse_json = json.dumps(sparse) if sparse is not None else None

    # Anything outside the known columns rides along in extra_data.
    known = _SCALAR_COLUMNS | {"id", "vector", "sparse_vector"}
    extra = {k: v for k, v in data.items() if k not in known}

    created_at = self._parse_dt(data.get("created_at"))
    updated_at = self._parse_dt(data.get("updated_at"))

    # Robustness fix: an explicit None for level/active_count previously
    # crashed int(None); treat it the same as an absent key.
    raw_level = data.get("level", 2)
    level = 2 if raw_level is None else int(raw_level)
    raw_count = data.get("active_count", 0)
    active_count = 0 if raw_count is None else int(raw_count)

    async with pool.acquire() as conn:
        await conn.execute(
            f"""
            INSERT INTO {tbl} (
                id, uri, parent_uri, type, context_type, level,
                name, description, tags, abstract, active_count,
                created_at, updated_at, vector, sparse_vector, extra_data
            ) VALUES (
                $1, $2, $3, $4, $5, $6,
                $7, $8, $9, $10, $11,
                $12, $13, $14::vector, $15::jsonb, $16::jsonb
            )
            ON CONFLICT (id) DO UPDATE SET
                uri = EXCLUDED.uri,
                parent_uri = EXCLUDED.parent_uri,
                type = EXCLUDED.type,
                context_type = EXCLUDED.context_type,
                level = EXCLUDED.level,
                name = EXCLUDED.name,
                description = EXCLUDED.description,
                tags = EXCLUDED.tags,
                abstract = EXCLUDED.abstract,
                active_count = EXCLUDED.active_count,
                updated_at = NOW(),
                vector = EXCLUDED.vector,
                sparse_vector = EXCLUDED.sparse_vector,
                extra_data = EXCLUDED.extra_data
            """,
            record_id,
            data.get("uri"),
            data.get("parent_uri"),
            data.get("type", "resource"),
            data.get("context_type", "resource"),
            level,
            data.get("name"),
            data.get("description"),
            data.get("tags"),
            data.get("abstract"),
            active_count,
            created_at,
            updated_at,
            vector_str,
            sparse_json,
            json.dumps(extra),
        )

    return record_id


async def update(self, collection: str, id: str, data: Dict[str, Any]) -> bool:
    """
    Partially update scalar columns (and optionally the vector) of a record.

    Returns True if a row was changed, False for a no-op or missing id.
    NOTE(review): keys outside the updatable scalar set are ignored here,
    while insert() folds them into extra_data — confirm the asymmetry.
    """
    if not await self.collection_exists(collection):
        raise CollectionNotFoundError(f"Collection '{collection}' not found")

    pool = await self._ensure_pool()
    tbl = self._tbl(collection)

    set_parts = []
    params: List[Any] = [id]

    updatable = [
        "uri", "parent_uri", "type", "context_type", "level",
        "name", "description", "tags", "abstract", "active_count",
    ]
    for field in updatable:
        if field in data:
            params.append(data[field])
            set_parts.append(f"{field} = ${len(params)}")

    if "vector" in data and data["vector"]:
        vec_str = f"[{','.join(str(x) for x in data['vector'])}]"
        params.append(vec_str)
        set_parts.append(f"vector = ${len(params)}::vector")

    if not set_parts:
        return False

    set_parts.append("updated_at = NOW()")
    async with pool.acquire() as conn:
        result = await conn.execute(
            f"UPDATE {tbl} SET {', '.join(set_parts)} WHERE id = $1",
            *params,
        )
        # asyncpg status tag, e.g. "UPDATE 1" — non-zero means a row matched.
        return result.split()[-1] != "0"


async def upsert(self, collection: str, data: Dict[str, Any]) -> str:
    """Alias for insert(), which already upserts via ON CONFLICT (id)."""
    return await self.insert(collection, data)
async def delete(self, collection: str, ids: List[str]) -> int:
    """Delete records by id; returns the number of rows removed."""
    if not await self.collection_exists(collection):
        raise CollectionNotFoundError(f"Collection '{collection}' not found")

    # Fix: an empty id list previously produced "id IN ()", which is a
    # PostgreSQL syntax error — deleting nothing is the correct result.
    if not ids:
        return 0

    pool = await self._ensure_pool()
    tbl = self._tbl(collection)
    async with pool.acquire() as conn:
        placeholders = ", ".join(f"${i+1}" for i in range(len(ids)))
        result = await conn.execute(
            f"DELETE FROM {tbl} WHERE id IN ({placeholders})", *ids
        )
        # asyncpg status tag, e.g. "DELETE 3"
        return int(result.split()[-1])


async def get(self, collection: str, ids: List[str]) -> List[Dict[str, Any]]:
    """Fetch records by id (missing ids are silently skipped)."""
    if not await self.collection_exists(collection):
        raise CollectionNotFoundError(f"Collection '{collection}' not found")

    # Fix: "id IN ()" is invalid SQL — an empty id list matches nothing.
    if not ids:
        return []

    pool = await self._ensure_pool()
    tbl = self._tbl(collection)
    async with pool.acquire() as conn:
        placeholders = ", ".join(f"${i+1}" for i in range(len(ids)))
        rows = await conn.fetch(
            f"SELECT {_DEFAULT_SELECT} FROM {tbl} WHERE id IN ({placeholders})",
            *ids,
        )
        return [self._record_to_dict(r) for r in rows]


async def exists(self, collection: str, id: str) -> bool:
    """True if a record with this id exists (False for missing collection)."""
    if not await self.collection_exists(collection):
        return False
    pool = await self._ensure_pool()
    tbl = self._tbl(collection)
    async with pool.acquire() as conn:
        return bool(
            await conn.fetchval(
                f"SELECT EXISTS (SELECT 1 FROM {tbl} WHERE id = $1)", id
            )
        )

# =========================================================================
# CRUD — Batch
# =========================================================================

async def batch_insert(
    self, collection: str, data: List[Dict[str, Any]]
) -> List[str]:
    """Insert records one by one; returns their ids.

    Sequential on purpose: insert() carries the ON CONFLICT upsert logic.
    """
    return [await self.insert(collection, item) for item in data]


async def batch_upsert(
    self, collection: str, data: List[Dict[str, Any]]
) -> List[str]:
    """Alias for batch_insert(), whose inserts already upsert."""
    return await self.batch_insert(collection, data)


async def batch_delete(self, collection: str, filter: Dict[str, Any]) -> int:
    """Delete all records matching the filter DSL; returns rows removed.

    NOTE(review): an empty/None filter yields an unqualified DELETE that
    removes every row in the collection — confirm callers intend this.
    """
    if not await self.collection_exists(collection):
        raise CollectionNotFoundError(f"Collection '{collection}' not found")

    pool = await self._ensure_pool()
    tbl = self._tbl(collection)
    params: List[Any] = []
    sql_filter = self._filter_to_sql(filter, params)
    where = self._make_where(sql_filter)

    async with pool.acquire() as conn:
        result = await conn.execute(f"DELETE FROM {tbl} {where}", *params)
        return int(result.split()[-1])


async def remove_by_uri(self, collection: str, uri: str) -> int:
    """Delete the record with this exact uri plus everything under 'uri/'."""
    if not await self.collection_exists(collection):
        raise CollectionNotFoundError(f"Collection '{collection}' not found")

    pool = await self._ensure_pool()
    tbl = self._tbl(collection)
    # Escape LIKE wildcards so a literal %, _ or \ in the uri cannot widen
    # the child-prefix match.
    escaped = uri.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    async with pool.acquire() as conn:
        result = await conn.execute(
            f"DELETE FROM {tbl} WHERE uri = $1 OR uri LIKE $2 ESCAPE '\\'",
            uri,
            f"{escaped}/%",
        )
        return int(result.split()[-1])

# =========================================================================
# Search Operations
# =========================================================================

async def search(
    self,
    collection: str,
    query_vector: Optional[List[float]] = None,
    sparse_query_vector: Optional[Dict[str, float]] = None,
    filter: Optional[Dict[str, Any]] = None,
    limit: int = 10,
    offset: int = 0,
    output_fields: Optional[List[str]] = None,
    with_vector: bool = False,
) -> List[Dict[str, Any]]:
    """
    Search a collection — by cosine similarity when ``query_vector`` is
    given, otherwise by activity/recency with only the metadata filter.

    _score = 1 - cosine_distance (pgvector ``<=>``), rounded to 4 decimals.
    NOTE(review): sparse_query_vector, output_fields and with_vector are
    accepted for interface parity but not yet honored — confirm callers.
    """
    if not await self.collection_exists(collection):
        raise CollectionNotFoundError(f"Collection '{collection}' not found")

    pool = await self._ensure_pool()
    tbl = self._tbl(collection)

    params: List[Any] = []
    sql_filter = self._filter_to_sql(filter, params)

    select_cols = _DEFAULT_SELECT

    if query_vector:
        vec_str = f"[{','.join(str(x) for x in query_vector)}]"
        params.append(vec_str)
        vp = f"${len(params)}::vector"

        # Rows without an embedding cannot be ranked — exclude them.
        where = self._make_where(sql_filter, ["vector IS NOT NULL"])

        params.extend([limit, offset])
        lp, op = f"${len(params)-1}", f"${len(params)}"

        query = (
            f"SELECT {select_cols}, "
            f"ROUND((1 - (vector <=> {vp}))::numeric, 4) AS _score "
            f"FROM {tbl} {where} "
            f"ORDER BY vector <=> {vp} "
            f"LIMIT {lp} OFFSET {op}"
        )
    else:
        where = self._make_where(sql_filter)
        params.extend([limit, offset])
        lp, op = f"${len(params)-1}", f"${len(params)}"
        query = (
            f"SELECT {select_cols} "
            f"FROM {tbl} {where} "
            f"ORDER BY active_count DESC, created_at DESC "
            f"LIMIT {lp} OFFSET {op}"
        )

    async with pool.acquire() as conn:
        rows = await conn.fetch(query, *params)
        return [self._record_to_dict(r) for r in rows]
async def filter(
    self,
    collection: str,
    filter: Dict[str, Any],
    limit: int = 10,
    offset: int = 0,
    output_fields: Optional[List[str]] = None,
    order_by: Optional[str] = None,
    order_desc: bool = False,
) -> List[Dict[str, Any]]:
    """
    Metadata-only listing: apply the filter DSL and return matching rows.

    Sorting falls back to (active_count DESC, created_at DESC) unless
    ``order_by`` names a known scalar column.  ``output_fields`` is accepted
    for interface compatibility; the default column set is always returned.
    """
    if not await self.collection_exists(collection):
        raise CollectionNotFoundError(f"Collection '{collection}' not found")

    pool = await self._ensure_pool()
    table = self._tbl(collection)

    binds: List[Any] = []
    where = self._make_where(self._filter_to_sql(filter, binds))

    # Only whitelisted scalar columns may be sorted on (keeps SQL safe).
    if order_by and order_by in _SCALAR_COLUMNS:
        order_clause = f"ORDER BY {order_by} {'DESC' if order_desc else 'ASC'}"
    else:
        order_clause = "ORDER BY active_count DESC, created_at DESC"

    binds.append(limit)
    limit_ph = f"${len(binds)}"
    binds.append(offset)
    offset_ph = f"${len(binds)}"

    sql = (
        f"SELECT {_DEFAULT_SELECT} "
        f"FROM {table} {where} {order_clause} "
        f"LIMIT {limit_ph} OFFSET {offset_ph}"
    )

    async with pool.acquire() as conn:
        fetched = await conn.fetch(sql, *binds)
    return [self._record_to_dict(row) for row in fetched]


async def scroll(
    self,
    collection: str,
    filter: Optional[Dict[str, Any]] = None,
    limit: int = 100,
    cursor: Optional[str] = None,
    output_fields: Optional[List[str]] = None,
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """
    Cursor-paginated scan ordered by id.

    Returns (records, next_cursor); next_cursor is None on the last page.
    The cursor is simply a stringified row offset.
    """
    if not await self.collection_exists(collection):
        raise CollectionNotFoundError(f"Collection '{collection}' not found")

    pool = await self._ensure_pool()
    table = self._tbl(collection)

    binds: List[Any] = []
    where = self._make_where(self._filter_to_sql(filter, binds))

    # A non-numeric or missing cursor restarts the scan from the beginning.
    start = int(cursor) if cursor and cursor.isdigit() else 0

    # Fetch one extra row to learn whether another page exists.
    binds.append(limit + 1)
    limit_ph = f"${len(binds)}"
    binds.append(start)
    offset_ph = f"${len(binds)}"

    sql = (
        f"SELECT {_DEFAULT_SELECT} "
        f"FROM {table} {where} ORDER BY id "
        f"LIMIT {limit_ph} OFFSET {offset_ph}"
    )

    async with pool.acquire() as conn:
        fetched = await conn.fetch(sql, *binds)

    page = [self._record_to_dict(row) for row in fetched[:limit]]
    has_more = len(fetched) > limit
    return page, (str(start + limit) if has_more else None)

# =========================================================================
# Aggregation
# =========================================================================

async def count(self, collection: str, filter: Optional[Dict[str, Any]] = None) -> int:
    """Return the number of rows matching ``filter`` (all rows when empty)."""
    if not await self.collection_exists(collection):
        raise CollectionNotFoundError(f"Collection '{collection}' not found")

    pool = await self._ensure_pool()
    table = self._tbl(collection)

    binds: List[Any] = []
    where = self._make_where(self._filter_to_sql(filter, binds))

    async with pool.acquire() as conn:
        total = await conn.fetchval(f"SELECT COUNT(*) FROM {table} {where}", *binds)
    return int(total)
async def create_index(
    self, collection: str, field: str, index_type: str, **kwargs
) -> bool:
    """
    Create a scalar index on ``field`` (best-effort; False on failure).

    Known scalar columns are indexed directly.  Any other field is indexed
    as a JSONB expression over extra_data — PostgreSQL requires expression
    index targets to be wrapped in an extra pair of parentheses, so the
    previous ``ON t (extra_data->>'x')`` form always failed with a syntax
    error.  ``index_type`` is accepted for interface compatibility but not
    yet mapped to a PostgreSQL index method.
    """
    pool = await self._ensure_pool()
    tbl = self._tbl(collection)
    col = self._col(field)
    safe_field = "".join(c for c in field if c.isalnum() or c == "_")
    # Fix: expression targets need double parens: ON t ((extra_data->>'x')).
    target = col if col == field else f"({col})"
    async with pool.acquire() as conn:
        try:
            await conn.execute(
                f"CREATE INDEX IF NOT EXISTS idx_{tbl}_{safe_field} "
                f"ON {tbl} ({target})"
            )
            return True
        except Exception as e:
            logger.warning(f"Failed to create index on {field}: {e}")
            return False


async def drop_index(self, collection: str, field: str) -> bool:
    """Drop the named field index if present (idempotent)."""
    pool = await self._ensure_pool()
    tbl = self._tbl(collection)
    safe_field = "".join(c for c in field if c.isalnum() or c == "_")
    async with pool.acquire() as conn:
        await conn.execute(f"DROP INDEX IF EXISTS idx_{tbl}_{safe_field}")
    return True

# =========================================================================
# Lifecycle
# =========================================================================

async def clear(self, collection: str) -> bool:
    """Remove every row from the collection (TRUNCATE)."""
    if not await self.collection_exists(collection):
        raise CollectionNotFoundError(f"Collection '{collection}' not found")
    pool = await self._ensure_pool()
    tbl = self._tbl(collection)
    async with pool.acquire() as conn:
        await conn.execute(f"TRUNCATE TABLE {tbl}")
    return True


async def optimize(self, collection: str) -> bool:
    """VACUUM ANALYZE the collection's table.

    asyncpg executes this outside an explicit transaction block, which
    VACUUM requires.
    """
    pool = await self._ensure_pool()
    tbl = self._tbl(collection)
    async with pool.acquire() as conn:
        await conn.execute(f"VACUUM ANALYZE {tbl}")
    return True


async def close(self) -> None:
    """Close the shared connection pool (safe to call more than once)."""
    if self._pool is not None:
        await self._pool.close()
        self._pool = None
        logger.info("PostgreSQL connection pool closed")

# =========================================================================
# Health & Status
# =========================================================================

async def health_check(self) -> bool:
    """True when a trivial round-trip query succeeds."""
    try:
        pool = await self._ensure_pool()
        async with pool.acquire() as conn:
            await conn.fetchval("SELECT 1")
        return True
    except Exception as e:
        logger.error(f"PostgreSQL health check failed: {e}")
        return False


async def get_stats(self) -> Dict[str, Any]:
    """Aggregate record counts across all collections plus database size."""
    try:
        pool = await self._ensure_pool()
        collections = await self.list_collections()
        total_records = 0
        async with pool.acquire() as conn:
            for coll in collections:
                tbl = self._tbl(coll)
                cnt = await conn.fetchval(f"SELECT COUNT(*) FROM {tbl}")
                total_records += int(cnt)
            db_size = await conn.fetchval(
                "SELECT pg_database_size(current_database())"
            )
        return {
            "backend": "postgresql",
            "collections": len(collections),
            "total_records": total_records,
            "storage_size": int(db_size),
        }
    except Exception as e:
        logger.error(f"Failed to get PostgreSQL stats: {e}")
        return {"backend": "postgresql", "error": str(e)}

# =========================================================================
# Private helpers
# =========================================================================

@staticmethod
def _parse_dt(value: Any) -> Optional[datetime]:
    """
    Best-effort conversion of a value to ``datetime``.

    Accepts None (returned as-is), datetime (pass-through), or an ISO-8601
    string (a trailing 'Z' is normalized to '+00:00' for fromisoformat).
    Any other type, or an unparsable string, yields None.
    """
    if value is None:
        return None
    if isinstance(value, datetime):
        return value
    if isinstance(value, str):
        try:
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        except Exception:
            return None
    return None
+""" + +from typing import TYPE_CHECKING, Optional + +from openviking.storage.postgresql_backend import PostgreSQLBackend +from openviking_cli.utils import get_logger +from openviking_cli.utils.config.vectordb_config import VectorDBBackendConfig + +if TYPE_CHECKING: + from openviking.storage.queuefs.embedding_msg import EmbeddingMsg + from openviking.storage.queuefs.embedding_queue import EmbeddingQueue + from openviking.storage.queuefs.queue_manager import QueueManager + +logger = get_logger(__name__) + + +class PostgreSQLManager(PostgreSQLBackend): + """ + PostgreSQL-backed VikingDB manager. + + Wraps PostgreSQLBackend with the same queue-management interface + as VikingDBManager so OpenVikingService can swap them transparently. + """ + + def __init__( + self, + vectordb_config: VectorDBBackendConfig, + queue_manager: Optional["QueueManager"] = None, + ): + """ + Initialize PostgreSQL Manager. + + Args: + vectordb_config: VectorDB configuration (must have backend='postgresql'). + queue_manager: Shared QueueManager instance for embedding tasks. 
+ """ + pg_cfg = vectordb_config.postgresql + if pg_cfg is None: + raise ValueError("PostgreSQL backend requires 'vectordb.postgresql' config section") + + # Build DSN from config fields or use explicit dsn + if pg_cfg.dsn: + dsn = pg_cfg.dsn + else: + dsn = ( + f"postgresql://{pg_cfg.user}:{pg_cfg.password}" + f"@{pg_cfg.host}:{pg_cfg.port}/{pg_cfg.database}" + ) + + dim = vectordb_config.dimension or 1024 + super().__init__(dsn=dsn, vector_dim=dim) + + self._queue_manager = queue_manager + self._closing = False + logger.info( + f"PostgreSQLManager initialized (host={pg_cfg.host}, db={pg_cfg.database}, dim={dim})" + ) + + # ========================================================================= + # Lifecycle + # ========================================================================= + + async def close(self) -> None: + self._closing = True + await super().close() + + # ========================================================================= + # Queue management (mirrors VikingDBManager) + # ========================================================================= + + @property + def is_closing(self) -> bool: + return self._closing + + @property + def queue_manager(self) -> Optional["QueueManager"]: + return self._queue_manager + + @property + def embedding_queue(self) -> Optional["EmbeddingQueue"]: + if not self._queue_manager: + return None + queue = self._queue_manager.get_queue(self._queue_manager.EMBEDDING) + from openviking.storage.queuefs.embedding_queue import EmbeddingQueue + + return queue if isinstance(queue, EmbeddingQueue) else None + + @property + def has_queue_manager(self) -> bool: + return self._queue_manager is not None + + async def enqueue_embedding_msg(self, embedding_msg: "EmbeddingMsg") -> bool: + if not embedding_msg: + logger.warning("Embedding message is None, skipping") + return False + if not self._queue_manager: + raise RuntimeError("Queue manager not initialized") + try: + eq = self.embedding_queue + if not eq: + raise 
RuntimeError("Embedding queue not initialized") + await eq.enqueue(embedding_msg) + logger.debug(f"Enqueued embedding message: {embedding_msg.id}") + return True + except Exception as e: + logger.error(f"Error enqueuing embedding message: {e}") + return False + + async def get_embedding_queue_size(self) -> int: + if not self._queue_manager: + return 0 + try: + eq = self._queue_manager.get_queue("embedding") + return await eq.size() + except Exception as e: + logger.error(f"Error getting embedding queue size: {e}") + return 0 + + def get_embedder(self): + """Get configured embedder (matches VikingDBManager interface).""" + try: + from openviking_cli.utils.config import get_openviking_config + + return get_openviking_config().embedding.get_embedder() + except Exception as e: + logger.warning(f"Failed to get embedder: {e}") + return None diff --git a/openviking/storage/vectordb/index/local_index.py b/openviking/storage/vectordb/index/local_index.py index eadd17c5..4148f758 100644 --- a/openviking/storage/vectordb/index/local_index.py +++ b/openviking/storage/vectordb/index/local_index.py @@ -8,7 +8,13 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union -import openviking.storage.vectordb.engine as engine +try: + import openviking.storage.vectordb.engine as engine + _ENGINE_AVAILABLE = True +except ImportError: + engine = None # type: ignore[assignment] + _ENGINE_AVAILABLE = False + from openviking.storage.vectordb.index.index import IIndex from openviking.storage.vectordb.store.data import CandidateData, DeltaRecord from openviking.storage.vectordb.utils.constants import IndexFileMarkers @@ -64,6 +70,12 @@ def __init__(self, index_path_or_json: str, normalize_vector_flag: bool = False) normalize_vector_flag (bool): If True, all vectors will be L2-normalized before being added to the index or used for search. Defaults to False. 
""" + if not _ENGINE_AVAILABLE: + raise RuntimeError( + "C++ vectordb engine is not available (module 'openviking.storage.vectordb.engine' " + "was not built). On Windows, set OPENVIKING_NO_CPP_EXT=1 and use the PostgreSQL " + "backend instead of the local backend." + ) self.index_engine: Optional[engine.IndexEngine] = engine.IndexEngine(index_path_or_json) self.normalize_vector_flag = normalize_vector_flag diff --git a/openviking/storage/vectordb/store/local_store.py b/openviking/storage/vectordb/store/local_store.py index b1ab54cb..fb13bfe9 100644 --- a/openviking/storage/vectordb/store/local_store.py +++ b/openviking/storage/vectordb/store/local_store.py @@ -1,8 +1,16 @@ # Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. # SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + from typing import List, Tuple, Union -import openviking.storage.vectordb.engine as engine +try: + import openviking.storage.vectordb.engine as engine + _ENGINE_AVAILABLE = True +except ImportError: + engine = None # type: ignore[assignment] + _ENGINE_AVAILABLE = False + from openviking.storage.vectordb.store.store import BatchOp, IMutiTableStore, Op, OpType # Constant for the maximum Unicode character, used for range queries to cover all possible keys @@ -19,6 +27,11 @@ def create_store_engine_proxy(path: str = "") -> "StoreEngineProxy": Returns: StoreEngineProxy: Proxy instance wrapping the underlying storage engine. """ + if not _ENGINE_AVAILABLE: + raise RuntimeError( + "C++ vectordb engine is not available. " + "Use the PostgreSQL backend instead (set vectordb.backend='postgresql' in config)." 
+ ) date_engine = engine.PersistStore(path) if path else engine.VolatileStore() return StoreEngineProxy(date_engine) diff --git a/openviking_cli/utils/config/vectordb_config.py b/openviking_cli/utils/config/vectordb_config.py index 4052e0c4..2bc97ada 100644 --- a/openviking_cli/utils/config/vectordb_config.py +++ b/openviking_cli/utils/config/vectordb_config.py @@ -8,6 +8,25 @@ DEFAULT_PROJECT_NAME = "default" +class PostgreSQLConfig(BaseModel): + """Configuration for PostgreSQL + pgvector backend (SaaS mode).""" + + host: str = Field(default="localhost", description="PostgreSQL host") + port: int = Field(default=5432, description="PostgreSQL port") + database: str = Field(default="openviking", description="Database name") + user: str = Field(default="postgres", description="Database user") + password: str = Field(default="postgres", description="Database password") + dsn: Optional[str] = Field( + default=None, + description=( + "Full PostgreSQL DSN (overrides individual host/port/db/user/password). 
" + "Example: postgresql://user:pass@host:5432/dbname" + ), + ) + + model_config = {"extra": "forbid"} + + class VolcengineConfig(BaseModel): """Configuration for Volcengine VikingDB.""" @@ -42,7 +61,10 @@ class VectorDBBackendConfig(BaseModel): backend: str = Field( default="local", - description="VectorDB backend type: 'local' (file-based), 'http' (remote service), or 'volcengine' (VikingDB)", + description=( + "VectorDB backend type: 'local' (file-based), 'http' (remote service), " + "'volcengine' (VikingDB), 'vikingdb' (private), or 'postgresql' (SaaS)" + ), ) name: Optional[str] = Field(default=COLLECTION_NAME, description="Collection name for VectorDB") @@ -90,14 +112,21 @@ class VectorDBBackendConfig(BaseModel): description="VikingDB private deployment configuration for 'vikingdb' type", ) + # PostgreSQL SaaS mode + postgresql: Optional[PostgreSQLConfig] = Field( + default=None, + description="PostgreSQL + pgvector configuration for 'postgresql' backend (SaaS mode)", + ) + model_config = {"extra": "forbid"} @model_validator(mode="after") def validate_config(self): """Validate configuration completeness and consistency""" - if self.backend not in ["local", "http", "volcengine", "vikingdb"]: + valid_backends = ["local", "http", "volcengine", "vikingdb", "postgresql"] + if self.backend not in valid_backends: raise ValueError( - f"Invalid VectorDB backend: '{self.backend}'. Must be one of: 'local', 'http', 'volcengine', 'vikingdb'" + f"Invalid VectorDB backend: '{self.backend}'. 
Must be one of: {valid_backends}" ) if self.backend == "local": @@ -117,4 +146,9 @@ def validate_config(self): if not self.vikingdb or not self.vikingdb.host: raise ValueError("VectorDB vikingdb backend requires 'host' to be set") + elif self.backend == "postgresql": + if self.postgresql is None: + # Auto-create with defaults if not specified + self.postgresql = PostgreSQLConfig() + return self diff --git a/ov.conf.local.example b/ov.conf.local.example new file mode 100644 index 00000000..162d4ce2 --- /dev/null +++ b/ov.conf.local.example @@ -0,0 +1,64 @@ +{ + "storage": { + "workspace": "./data", + + "agfs": { + "backend": "s3", + "port": 1833, + "s3": { + "bucket": "openviking-storage", + "region": "us-east-1", + "access_key": "openviking", + "secret_key": "openviking_secret", + "endpoint": "http://localhost:9000", + "prefix": "", + "use_ssl": false, + "use_path_style": true + } + }, + + "vectordb": { + "backend": "postgresql", + "name": "context", + "dimension": 1024, + "postgresql": { + "host": "localhost", + "port": 5432, + "database": "openviking", + "user": "openviking", + "password": "openviking_secret" + } + } + }, + + "embedding": { + "dense": { + "provider": "openai", + "api_base": "https://api.openai.com/v1", + "api_key": "YOUR_OPENAI_API_KEY", + "model": "text-embedding-3-small", + "dimension": 1024 + } + }, + + "vlm": { + "provider": "openai", + "api_base": "https://api.openai.com/v1", + "api_key": "YOUR_OPENAI_API_KEY", + "model": "gpt-4o" + }, + + "server": { + "host": "0.0.0.0", + "port": 1933, + "cors_origins": ["*"] + }, + + "default_account": "default", + "default_user": "default", + "default_agent": "default", + + "log": { + "level": "INFO" + } +} diff --git a/ov.conf.saas.example b/ov.conf.saas.example new file mode 100644 index 00000000..e6e0e2ec --- /dev/null +++ b/ov.conf.saas.example @@ -0,0 +1,64 @@ +{ + "storage": { + "workspace": "/app/data", + + "agfs": { + "backend": "s3", + "port": 1833, + "s3": { + "bucket": "openviking-storage", + 
"region": "us-east-1", + "access_key": "openviking", + "secret_key": "openviking_secret", + "endpoint": "http://minio:9000", + "prefix": "", + "use_ssl": false, + "use_path_style": true + } + }, + + "vectordb": { + "backend": "postgresql", + "name": "context", + "dimension": 1024, + "postgresql": { + "host": "postgres", + "port": 5432, + "database": "openviking", + "user": "openviking", + "password": "openviking_secret" + } + } + }, + + "embedding": { + "dense": { + "provider": "openai", + "api_base": "https://api.openai.com/v1", + "api_key": "YOUR_OPENAI_API_KEY", + "model": "text-embedding-3-small", + "dimension": 1024 + } + }, + + "vlm": { + "provider": "openai", + "api_base": "https://api.openai.com/v1", + "api_key": "YOUR_OPENAI_API_KEY", + "model": "gpt-4o" + }, + + "server": { + "host": "0.0.0.0", + "port": 1933, + "cors_origins": ["*"] + }, + + "default_account": "default", + "default_user": "default", + "default_agent": "default", + + "log": { + "level": "INFO" + } +} diff --git a/pyproject.toml b/pyproject.toml index ac30a11f..cb334792 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,6 +58,7 @@ dependencies = [ "typer>=0.12.0", "litellm>=1.0.0", "python-multipart>=0.0.22", + "asyncpg>=0.29.0", ] [project.optional-dependencies] diff --git a/setup.py b/setup.py index 92e5b45c..05f37413 100644 --- a/setup.py +++ b/setup.py @@ -96,7 +96,22 @@ def build_agfs(self): self._copy_binary(agfs_target_binary, dst) def build_extension(self, ext): - """Build a single C++ extension module using CMake.""" + """Build a single C++ extension module using CMake. + + This extension powers the *local* VectorDB backend. + In SaaS mode (PostgreSQL backend) the extension is never imported, + so a build failure is treated as a non-fatal warning. + + Set OPENVIKING_NO_CPP_EXT=1 to skip compilation explicitly. 
+ """ + # Allow explicit opt-out (useful for SaaS / PostgreSQL mode on Windows) + if os.environ.get("OPENVIKING_NO_CPP_EXT", "").strip() == "1": + print( + "[WARNING] OPENVIKING_NO_CPP_EXT=1: skipping C++ vectordb extension. " + "Local VectorDB backend will be unavailable; PostgreSQL/SaaS mode unaffected." + ) + return + ext_fullpath = Path(self.get_ext_fullpath(ext.name)) ext_dir = ext_fullpath.parent.resolve() build_dir = Path(self.build_temp) / "cmake_build" @@ -122,10 +137,18 @@ def build_extension(self, ext): elif sys.platform == "win32": cmake_args.extend(["-G", "MinGW Makefiles"]) - self.spawn([self.cmake_executable] + cmake_args) - - build_args = ["--build", str(build_dir), "--config", "Release", f"-j{os.cpu_count() or 4}"] - self.spawn([self.cmake_executable] + build_args) + try: + self.spawn([self.cmake_executable] + cmake_args) + build_args = ["--build", str(build_dir), "--config", "Release", f"-j{os.cpu_count() or 4}"] + self.spawn([self.cmake_executable] + build_args) + except Exception as e: + print( + f"\n[WARNING] C++ vectordb extension build failed: {e}\n" + " The local VectorDB backend will be unavailable.\n" + " This is fine for SaaS/PostgreSQL mode.\n" + " To suppress this warning, set OPENVIKING_NO_CPP_EXT=1 before installing.\n" + " To enable local mode on Windows, install MinGW: https://www.mingw-w64.org/\n" + ) setup(