diff --git a/openviking/storage/vectordb/collection/collection.py b/openviking/storage/vectordb/collection/collection.py index 45954e8c..97d259fd 100644 --- a/openviking/storage/vectordb/collection/collection.py +++ b/openviking/storage/vectordb/collection/collection.py @@ -1,12 +1,23 @@ # Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. # SPDX-License-Identifier: Apache-2.0 +import importlib from abc import ABC, abstractmethod -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Type from openviking.storage.vectordb.collection.result import AggregateResult, SearchResult from openviking.storage.vectordb.index.index import IIndex +def load_collection_class(class_path: str) -> Type["ICollection"]: + """Load collection class from string path""" + try: + module_name, class_name = class_path.rsplit(".", 1) + module = importlib.import_module(module_name) + return getattr(module, class_name) + except (ImportError, AttributeError) as e: + raise ImportError(f"Could not load collection class {class_path}: {e}") + + class ICollection(ABC): def __init__(self): pass diff --git a/openviking/storage/vectordb/project/vikingdb_project.py b/openviking/storage/vectordb/project/vikingdb_project.py index 50a67acf..bcf82ec9 100644 --- a/openviking/storage/vectordb/project/vikingdb_project.py +++ b/openviking/storage/vectordb/project/vikingdb_project.py @@ -2,12 +2,14 @@ # SPDX-License-Identifier: Apache-2.0 from typing import Any, Dict, List, Optional -from openviking.storage.vectordb.collection.collection import Collection +from openviking.storage.vectordb.collection.collection import ( + Collection, + load_collection_class, +) from openviking.storage.vectordb.collection.vikingdb_clients import ( VIKINGDB_APIS, VikingDBClient, ) -from openviking.storage.vectordb.collection.vikingdb_collection import VikingDBCollection from openviking_cli.utils.logger import default_logger as logger @@ -22,6 +24,8 @@ def get_or_create_vikingdb_project( config: Configuration dict with keys: - Host: VikingDB service host - Headers: Custom headers for authentication/context + - CollectionClass: Class path for collection implementation + - CollectionArgs: Optional dictionary of arguments to pass to collection constructor Returns: VikingDBProject instance @@ -31,11 +35,23 @@ def get_or_create_vikingdb_project( host = config.get("Host") headers = config.get("Headers") + collection_class_path = config.get( + "CollectionClass", + "openviking.storage.vectordb.collection.vikingdb_collection.VikingDBCollection", + ) + # Extract any other arguments that might be needed for collection initialization + collection_args = config.get("CollectionArgs", {}) if not host: raise ValueError("config must contain 'Host'") - return VikingDBProject(host=host, headers=headers, project_name=project_name) + return VikingDBProject( + host=host, + headers=headers, + project_name=project_name, + collection_class_path=collection_class_path, + collection_args=collection_args, + ) class VikingDBProject: @@ -45,7 +61,12 @@ class VikingDBProject: """ def __init__( - self, host: str, headers: Optional[Dict[str, str]] = None, project_name: str = "default" + self, + host: str, + headers: Optional[Dict[str, str]] = None, + project_name: str = "default", + collection_class_path: str = "openviking.storage.vectordb.collection.vikingdb_collection.VikingDBCollection", + collection_args: Optional[Dict[str, Any]] = None, ): """ Initialize VikingDB project. @@ -54,12 +75,19 @@ def __init__( host: VikingDB service host headers: Custom headers for requests project_name: Project name + collection_class_path: Python path to the collection class + collection_args: Optional dictionary of arguments to pass to collection constructor """ self.host = host self.headers = headers self.project_name = project_name + self.collection_class_path = collection_class_path + self.CollectionClass = load_collection_class(self.collection_class_path) + self.collection_args = collection_args or {} - logger.info(f"Initialized VikingDB project: {project_name} with host {host}") + logger.info( + f"Initialized VikingDB project: {project_name} with host {host} and collection class {collection_class_path}" + ) def close(self): """Close project""" @@ -87,9 +115,17 @@ def get_collection(self, collection_name: str) -> Optional[Collection]: meta_data = result.get("Result", {}) if not meta_data: return None - vikingdb_collection = VikingDBCollection( - host=self.host, headers=self.headers, meta_data=meta_data - ) + # Prepare arguments for collection constructor + # Default arguments + kwargs = { + "host": self.host, + "headers": self.headers, + "meta_data": meta_data, + } + # Update with user-provided arguments (can override defaults if needed, though usually additive) + kwargs.update(self.collection_args) + + vikingdb_collection = self.CollectionClass(**kwargs) return Collection(vikingdb_collection) except Exception: return None @@ -118,12 +154,24 @@ def list_collections(self) -> List[str]: def get_collections(self) -> Dict[str, Collection]: """Get all collections from server""" colls = self._get_collections() - return { - c["CollectionName"]: Collection( - VikingDBCollection(host=self.host, headers=self.headers, meta_data=c) - ) - for c in colls + + # Prepare base arguments + base_kwargs = { + "host": self.host, + "headers": self.headers, } + + collections = {} + for c in colls: + kwargs = base_kwargs.copy() + kwargs["meta_data"] = c + kwargs.update(self.collection_args) + + collections[c["CollectionName"]] = Collection( + self.CollectionClass(**kwargs) + ) + + return collections def create_collection(self, collection_name: str, meta_data: Dict[str, Any]) -> Collection: """collection should be pre-created""" diff --git a/openviking/storage/vectordb_adapters/README.md b/openviking/storage/vectordb_adapters/README.md index 3cb8e065..4158306c 100644 --- a/openviking/storage/vectordb_adapters/README.md +++ b/openviking/storage/vectordb_adapters/README.md @@ -118,6 +118,36 @@ - `create_collection` 时使用配置中的 name 绑定 collection。 - 后续操作默认绑定,不需要每次传 collection_name。 +--- + +## Step 6:配置 ov.conf + +对于没有提交到仓库,或者在第三方仓库的 Adapter,可以通过配置 `backend` 为完整的类路径来动态加载。 +同时,可以使用 `custom_params` 字段传递自定义参数。 + +在 `ov.conf` 中添加如下配置: + +```json +{ + "storage": { + "vectordb": { + "backend": "tests.storage.mock_backend.MockCollectionAdapter", + "name": "mock_test_collection", + "custom_params": { + "custom_param1": "val1", + "custom_param2": 123 + } + } + } +} +``` + +注意: +1. `backend`: 填写 Adapter 类的完整 Python 路径(例如 `my_project.adapters.MyAdapter`)。 +2. `custom_params`: 这是一个字典,你可以放入任何自定义参数,Adapter 的 `from_config` 方法可以通过 `config.custom_params` 获取这些值。 + + + --- ## 5. Filter 与查询兼容规则 @@ -208,4 +238,4 @@ class ThirdPartyCollectionAdapter(CollectionAdapter): - `backend=thirdparty` 可正常初始化。 - create 后可完成 upsert/get/query/delete/count 全流程。 - 不改上层业务调用方式即可参与 `find/search` 检索链路。 -- 后端差异全部封装在 adapter 层。 +- 后端差异全部封装在 adapter 层。 \ No newline at end of file diff --git a/openviking/storage/vectordb_adapters/factory.py b/openviking/storage/vectordb_adapters/factory.py index 21f15279..62039293 100644 --- a/openviking/storage/vectordb_adapters/factory.py +++ b/openviking/storage/vectordb_adapters/factory.py @@ -20,7 +20,22 @@ def create_collection_adapter(config) -> CollectionAdapter: """Unified factory entrypoint for backend-specific collection adapters.""" - adapter_cls = _ADAPTER_REGISTRY.get(config.backend) + backend = config.backend + adapter_cls = _ADAPTER_REGISTRY.get(backend) + + # If not in registry, try to load dynamically as a class path + if adapter_cls is None and "." in backend: + try: + import importlib + module_name, class_name = backend.rsplit(".", 1) + module = importlib.import_module(module_name) + potential_cls = getattr(module, class_name) + if issubclass(potential_cls, CollectionAdapter): + adapter_cls = potential_cls + except (ImportError, AttributeError, TypeError): + # Fallback to raising error if dynamic loading fails + pass + if adapter_cls is None: raise ValueError( f"Vector backend {config.backend} is not supported. " diff --git a/openviking_cli/utils/config/vectordb_config.py b/openviking_cli/utils/config/vectordb_config.py index 0a78877b..8b4a4e16 100644 --- a/openviking_cli/utils/config/vectordb_config.py +++ b/openviking_cli/utils/config/vectordb_config.py @@ -1,6 +1,6 @@ # Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. # SPDX-License-Identifier: Apache-2.0 -from typing import Dict, Optional +from typing import Any, Dict, Optional from pydantic import BaseModel, Field, model_validator @@ -99,14 +99,27 @@ class VectorDBBackendConfig(BaseModel): description="VikingDB private deployment configuration for 'vikingdb' type", ) + custom_params: Dict[str, Any] = Field( + default_factory=dict, + description="Custom parameters for custom backend adapters", + ) + model_config = {"extra": "forbid"} @model_validator(mode="after") def validate_config(self): """Validate configuration completeness and consistency""" - if self.backend not in ["local", "http", "volcengine", "vikingdb"]: + standard_backends = ["local", "http", "volcengine", "vikingdb"] + + # Allow custom backend classes (containing dot) without standard validation + if "." in self.backend: + logger.info("Using custom VectorDB backend: %s", self.backend) + return self + + if self.backend not in standard_backends: raise ValueError( - f"Invalid VectorDB backend: '{self.backend}'. Must be one of: 'local', 'http', 'volcengine', 'vikingdb'" + f"Invalid VectorDB backend: '{self.backend}'. Must be one of: {standard_backends} " + "or a valid Python class path." ) if self.backend == "local": @@ -132,4 +145,4 @@ def validate_config(self): if not self.vikingdb or not self.vikingdb.host: raise ValueError("VectorDB vikingdb backend requires 'host' to be set") - return self + return self \ No newline at end of file diff --git a/tests/storage/mock_backend.py b/tests/storage/mock_backend.py new file mode 100644 index 00000000..00d565ca --- /dev/null +++ b/tests/storage/mock_backend.py @@ -0,0 +1,182 @@ +from typing import Any, Dict, List, Optional +from openviking.storage.vectordb.collection.collection import ICollection +from openviking.storage.vectordb.collection.result import AggregateResult, SearchResult +from openviking.storage.vectordb.index.index import IIndex + +from openviking.storage.vectordb_adapters.base import CollectionAdapter +from openviking.storage.vectordb.collection.collection import Collection + +class MockCollectionAdapter(CollectionAdapter): + """ + Mock adapter for testing dynamic loading. + Inherits from CollectionAdapter and wraps MockCollection. + """ + def __init__(self, collection_name: str, custom_param1: str = "", custom_param2: int = 0): + super().__init__(collection_name=collection_name) + self.mode = "mock" + self.custom_param1 = custom_param1 + self.custom_param2 = custom_param2 + + @classmethod + def from_config(cls, config: Any) -> "MockCollectionAdapter": + custom_params = getattr(config, "custom_params", {}) + return cls( + collection_name=config.name or "mock_collection", + custom_param1=custom_params.get("custom_param1", ""), + custom_param2=custom_params.get("custom_param2", 0) + ) + + def _load_existing_collection_if_needed(self) -> None: + if self._collection is None: + # Create a dummy collection wrapping MockCollection + self._collection = MockCollection(self.custom_param1, self.custom_param2) + + def _create_backend_collection(self, meta: Dict[str, Any]) -> Collection: + return MockCollection(self.custom_param1, self.custom_param2) + +class MockCollection(ICollection): + def __init__(self, custom_param1: str, custom_param2: int, meta_data: Optional[Dict[str, Any]] = None, **kwargs): + super().__init__() + self.meta_data = meta_data if meta_data is not None else {} + + self.custom_param1 = custom_param1 + self.custom_param2 = custom_param2 + + # Store extra kwargs (including host/headers if passed but not used explicitly) + self.kwargs = kwargs + + # Verify that we can access values passed during initialization + if self.meta_data and "test_verification" in self.meta_data: + print(f"MockCollection initialized with custom_param1={self.custom_param1}, custom_param2={self.custom_param2}, kwargs={kwargs}") + + def update(self, fields: Optional[Dict[str, Any]] = None, description: Optional[str] = None): + raise NotImplementedError("MockCollection.update is not supported") + + def get_meta_data(self): + raise NotImplementedError("MockCollection.get_meta_data is not supported") + + def close(self): + # No-op for mock + pass + + def drop(self): + # No-op for mock + pass + + def create_index(self, index_name: str, meta_data: Dict[str, Any]) -> IIndex: + raise NotImplementedError("MockCollection.create_index is not supported") + + def has_index(self, index_name: str) -> bool: + raise NotImplementedError("MockCollection.has_index is not supported") + + def get_index(self, index_name: str) -> Optional[IIndex]: + raise NotImplementedError("MockCollection.get_index is not supported") + + def search_by_vector( + self, + index_name: str, + dense_vector: Optional[List[float]] = None, + limit: int = 10, + offset: int = 0, + filters: Optional[Dict[str, Any]] = None, + sparse_vector: Optional[Dict[str, float]] = None, + output_fields: Optional[List[str]] = None, + ) -> SearchResult: + raise NotImplementedError("MockCollection.search_by_vector is not supported") + + def search_by_keywords( + self, + index_name: str, + keywords: Optional[List[str]] = None, + query: Optional[str] = None, + limit: int = 10, + offset: int = 0, + filters: Optional[Dict[str, Any]] = None, + output_fields: Optional[List[str]] = None, + ) -> SearchResult: + raise NotImplementedError("MockCollection.search_by_keywords is not supported") + + def search_by_id( + self, + index_name: str, + id: Any, + limit: int = 10, + offset: int = 0, + filters: Optional[Dict[str, Any]] = None, + output_fields: Optional[List[str]] = None, + ) -> SearchResult: + raise NotImplementedError("MockCollection.search_by_id is not supported") + + def search_by_multimodal( + self, + index_name: str, + text: Optional[str], + image: Optional[Any], + video: Optional[Any], + limit: int = 10, + offset: int = 0, + filters: Optional[Dict[str, Any]] = None, + output_fields: Optional[List[str]] = None, + ) -> SearchResult: + raise NotImplementedError("MockCollection.search_by_multimodal is not supported") + + def search_by_random( + self, + index_name: str, + limit: int = 10, + offset: int = 0, + filters: Optional[Dict[str, Any]] = None, + output_fields: Optional[List[str]] = None, + ) -> SearchResult: + raise NotImplementedError("MockCollection.search_by_random is not supported") + + def search_by_scalar( + self, + index_name: str, + field: str, + order: Optional[str] = "desc", + limit: int = 10, + offset: int = 0, + filters: Optional[Dict[str, Any]] = None, + output_fields: Optional[List[str]] = None, + ) -> SearchResult: + raise NotImplementedError("MockCollection.search_by_scalar is not supported") + + def update_index( + self, + index_name: str, + scalar_index: Optional[Dict[str, Any]] = None, + description: Optional[str] = None, + ): + raise NotImplementedError("MockCollection.update_index is not supported") + + def get_index_meta_data(self, index_name: str): + raise NotImplementedError("MockCollection.get_index_meta_data is not supported") + + def list_indexes(self): + raise NotImplementedError("MockCollection.list_indexes is not supported") + + def drop_index(self, index_name: str): + raise NotImplementedError("MockCollection.drop_index is not supported") + + def upsert_data(self, data_list: List[Dict[str, Any]], ttl=0): + raise NotImplementedError("MockCollection.upsert_data is not supported") + + def fetch_data(self, primary_keys: List[Any]): + raise NotImplementedError("MockCollection.fetch_data is not supported") + + def delete_data(self, primary_keys: List[Any]): + raise NotImplementedError("MockCollection.delete_data is not supported") + + def delete_all_data(self): + raise NotImplementedError("MockCollection.delete_all_data is not supported") + + def aggregate_data( + self, + index_name: str, + op: str = "count", + field: Optional[str] = None, + filters: Optional[Dict[str, Any]] = None, + cond: Optional[Dict[str, Any]] = None, + ) -> AggregateResult: + raise NotImplementedError("MockCollection.aggregate_data is not supported") \ No newline at end of file diff --git a/tests/storage/test_vectordb_adaptor.py b/tests/storage/test_vectordb_adaptor.py new file mode 100644 index 00000000..22146b41 --- /dev/null +++ b/tests/storage/test_vectordb_adaptor.py @@ -0,0 +1,89 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +import unittest +from unittest.mock import MagicMock, patch +import sys +import os + +# Add paths to sys.path to ensure modules can be found +# sys.path.insert(0, "/cloudide/workspace/viking_python_client") +sys.path.insert(0, "/cloudide/workspace/open_test") + +from openviking.storage.vectordb_adapters.factory import create_collection_adapter +from openviking_cli.utils.config import get_openviking_config, OpenVikingConfigSingleton +import json +import shutil +import tempfile + +class TestAdapterLoading(unittest.TestCase): + def setUp(self): + self.test_dir = tempfile.mkdtemp() + self.config_path = os.path.join(self.test_dir, "ov.conf") + + # Create a valid config file + config_data = { + "storage": { + "vectordb": { + "backend": "tests.storage.mock_backend.MockCollectionAdapter", + "name": "mock_test_collection", + "custom_params": { + "custom_param1": "val1", + "custom_param2": 123 + } + } + }, + "embedding": { + "dense": { + "provider": "openai", + "model": "text-embedding-3-small", + "api_key": "mock-key", + "dimension": 1536 + } + } + } + with open(self.config_path, "w") as f: + json.dump(config_data, f) + + def tearDown(self): + shutil.rmtree(self.test_dir) + # Reset singleton to avoid side effects on other tests + OpenVikingConfigSingleton.reset_instance() + + def test_dynamic_loading_mock_adapter(self): + """ + Test that create_collection_adapter can dynamically load MockCollectionAdapter + from tests.storage.mock_backend using the full class path string, + loaded from a real configuration file. + """ + # Load config from the temporary file + OpenVikingConfigSingleton.initialize(config_path=self.config_path) + + config = get_openviking_config().storage.vectordb + + # Verify that custom params are loaded + # Since we use custom_params dict + self.assertEqual(config.custom_params.get("custom_param1"), "val1") + self.assertEqual(config.custom_params.get("custom_param2"), 123) + + try: + adapter = create_collection_adapter(config) + + self.assertEqual(adapter.__class__.__name__, "MockCollectionAdapter") + self.assertEqual(adapter.mode, "mock") + self.assertEqual(adapter.collection_name, "mock_test_collection") + self.assertEqual(adapter.custom_param1, "val1") + self.assertEqual(adapter.custom_param2, 123) + + # Verify internal behavior + exists = adapter.collection_exists() + self.assertTrue(exists) + + print("Successfully loaded MockCollectionAdapter dynamically from config file.") + + except Exception as e: + import traceback + traceback.print_exc() + self.fail(f"Failed to load adapter dynamically: {e}") + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/storage/test_vectordb_collection_loading.py b/tests/storage/test_vectordb_collection_loading.py new file mode 100644 index 00000000..3c1770e1 --- /dev/null +++ b/tests/storage/test_vectordb_collection_loading.py @@ -0,0 +1,108 @@ +import unittest +import sys +import os + +# Add open_test path to ensure modules can be found +sys.path.insert(0, "/cloudide/workspace/open_test") + +from openviking.storage.vectordb.project.vikingdb_project import get_or_create_vikingdb_project, VikingDBProject +from openviking.storage.vectordb.collection.vikingdb_collection import VikingDBCollection + +class TestDynamicLoading(unittest.TestCase): + def test_default_loading(self): + # Test with default configuration + config = {"Host": "test_host"} + project = get_or_create_vikingdb_project(config=config) + self.assertEqual(project.CollectionClass, VikingDBCollection) + print("Default loading test passed") + + def test_explicit_loading(self): + # Test with explicit configuration pointing to MockJoiner + # MockJoiner is now in tests.storage.mock_backend + + # We assume tests package structure is available from /cloudide/workspace/open_test + + config = { + "Host": "test_host", + "Headers": {"Auth": "Token"}, + "CollectionClass": "tests.storage.mock_backend.MockCollection", + "CollectionArgs": { + "custom_param1": "custom_val", + "custom_param2": 123 + } + } + project = get_or_create_vikingdb_project(config=config) + + from tests.storage.mock_backend import MockCollection + self.assertEqual(project.CollectionClass, MockCollection) + self.assertEqual(project.host, "test_host") + self.assertEqual(project.headers, {"Auth": "Token"}) + self.assertEqual(project.collection_args, {"custom_param1": "custom_val", "custom_param2": 123}) + + # Test collection creation to verify params are passed + collection_name = "test_collection" + meta_data = { + "test_verification": True, + "Host": "metadata_host", + "Headers": {"Meta": "Header"} + } + + # The project wrapper will pass host, headers, meta_data, AND collection_args + kwargs = { + "host": project.host, + "headers": project.headers, + "meta_data": meta_data + } + kwargs.update(project.collection_args) + + collection_instance = project.CollectionClass(**kwargs) + + # Verify custom params are set correctly + self.assertEqual(collection_instance.custom_param1, "custom_val") + self.assertEqual(collection_instance.custom_param2, 123) + + # Verify host/headers are in kwargs (since init doesn't take them explicitly anymore) + self.assertEqual(collection_instance.kwargs.get("host"), "test_host") + self.assertEqual(collection_instance.kwargs.get("headers"), {"Auth": "Token"}) + + print("Explicit loading test passed (MockCollection with custom params)") + + def test_kwargs_loading(self): + # Test with CollectionArgs + config = { + "Host": "test_host", + "CollectionClass": "tests.storage.mock_backend.MockCollection", + "CollectionArgs": { + "custom_param1": "extra_value", + "custom_param2": 456 + } + } + project = get_or_create_vikingdb_project(config=config) + + self.assertEqual(project.collection_args, {"custom_param1": "extra_value", "custom_param2": 456}) + + # Manually verify instantiation with kwargs + kwargs = { + "host": project.host, + "headers": project.headers, + "meta_data": {"test_verification": True} + } + kwargs.update(project.collection_args) + + collection_instance = project.CollectionClass(**kwargs) + self.assertEqual(collection_instance.custom_param1, "extra_value") + self.assertEqual(collection_instance.custom_param2, 456) + print("Kwargs loading test passed") + + def test_invalid_loading(self): + # Test with invalid class path + config = { + "Host": "test_host", + "CollectionClass": "non.existent.module.Class" + } + with self.assertRaises(ImportError): + get_or_create_vikingdb_project(config=config) + print("Invalid loading test passed") + +if __name__ == '__main__': + unittest.main()