Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion openviking/storage/vectordb/collection/collection.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0
import importlib
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Type

from openviking.storage.vectordb.collection.result import AggregateResult, SearchResult
from openviking.storage.vectordb.index.index import IIndex


def load_collection_class(class_path: str) -> Type["ICollection"]:
"""Load collection class from string path"""
try:
module_name, class_name = class_path.rsplit(".", 1)
module = importlib.import_module(module_name)
return getattr(module, class_name)
except (ImportError, AttributeError) as e:
raise ImportError(f"Could not load collection class {class_path}: {e}")


class ICollection(ABC):
def __init__(self):
pass
Expand Down
74 changes: 61 additions & 13 deletions openviking/storage/vectordb/project/vikingdb_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
# SPDX-License-Identifier: Apache-2.0
from typing import Any, Dict, List, Optional

from openviking.storage.vectordb.collection.collection import Collection
from openviking.storage.vectordb.collection.collection import (
Collection,
load_collection_class,
)
from openviking.storage.vectordb.collection.vikingdb_clients import (
VIKINGDB_APIS,
VikingDBClient,
)
from openviking.storage.vectordb.collection.vikingdb_collection import VikingDBCollection
from openviking_cli.utils.logger import default_logger as logger


Expand All @@ -22,6 +24,8 @@ def get_or_create_vikingdb_project(
config: Configuration dict with keys:
- Host: VikingDB service host
- Headers: Custom headers for authentication/context
- CollectionClass: Class path for collection implementation
- CollectionArgs: Optional dictionary of arguments to pass to collection constructor

Returns:
VikingDBProject instance
Expand All @@ -31,11 +35,23 @@ def get_or_create_vikingdb_project(

host = config.get("Host")
headers = config.get("Headers")
collection_class_path = config.get(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个 config 有更新 config 下的代码吗,好像没看到

"CollectionClass",
"openviking.storage.vectordb.collection.vikingdb_collection.VikingDBCollection",
)
# Extract any other arguments that might be needed for collection initialization
collection_args = config.get("CollectionArgs", {})

if not host:
raise ValueError("config must contain 'Host'")

return VikingDBProject(host=host, headers=headers, project_name=project_name)
return VikingDBProject(
host=host,
headers=headers,
project_name=project_name,
collection_class_path=collection_class_path,
collection_args=collection_args,
)


class VikingDBProject:
Expand All @@ -45,7 +61,12 @@ class VikingDBProject:
"""

def __init__(
self, host: str, headers: Optional[Dict[str, str]] = None, project_name: str = "default"
self,
host: str,
headers: Optional[Dict[str, str]] = None,
project_name: str = "default",
collection_class_path: str = "openviking.storage.vectordb.collection.vikingdb_collection.VikingDBCollection",
collection_args: Optional[Dict[str, Any]] = None,
):
"""
Initialize VikingDB project.
Expand All @@ -54,12 +75,19 @@ def __init__(
host: VikingDB service host
headers: Custom headers for requests
project_name: Project name
collection_class_path: Python path to the collection class
collection_args: Optional dictionary of arguments to pass to collection constructor
"""
self.host = host
self.headers = headers
self.project_name = project_name
self.collection_class_path = collection_class_path
self.CollectionClass = load_collection_class(self.collection_class_path)
self.collection_args = collection_args or {}

logger.info(f"Initialized VikingDB project: {project_name} with host {host}")
logger.info(
f"Initialized VikingDB project: {project_name} with host {host} and collection class {collection_class_path}"
)

def close(self):
"""Close project"""
Expand Down Expand Up @@ -87,9 +115,17 @@ def get_collection(self, collection_name: str) -> Optional[Collection]:
meta_data = result.get("Result", {})
if not meta_data:
return None
vikingdb_collection = VikingDBCollection(
host=self.host, headers=self.headers, meta_data=meta_data
)
# Prepare arguments for collection constructor
# Default arguments
kwargs = {
"host": self.host,
"headers": self.headers,
"meta_data": meta_data,
}
# Update with user-provided arguments (can override defaults if needed, though usually additive)
kwargs.update(self.collection_args)

vikingdb_collection = self.CollectionClass(**kwargs)
return Collection(vikingdb_collection)
except Exception:
return None
Expand Down Expand Up @@ -118,12 +154,24 @@ def list_collections(self) -> List[str]:
def get_collections(self) -> Dict[str, Collection]:
"""Get all collections from server"""
colls = self._get_collections()
return {
c["CollectionName"]: Collection(
VikingDBCollection(host=self.host, headers=self.headers, meta_data=c)
)
for c in colls

# Prepare base arguments
base_kwargs = {
"host": self.host,
"headers": self.headers,
}

collections = {}
for c in colls:
kwargs = base_kwargs.copy()
kwargs["meta_data"] = c
kwargs.update(self.collection_args)

collections[c["CollectionName"]] = Collection(
self.CollectionClass(**kwargs)
)

return collections

def create_collection(self, collection_name: str, meta_data: Dict[str, Any]) -> Collection:
"""collection should be pre-created"""
Expand Down
17 changes: 17 additions & 0 deletions openviking/storage/vectordb_adapters/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,23 @@

---

## Step 6:反射加载模型

对于没有提交到仓库,或者在第三方仓库的adapter,可以在`from_config`中动态加载。

例如:想要加载 tests.storage.mock_backend.MockCollectionAdapter
则需要在配置中加入
```
class MockConfig:
def __init__(self):
self.backend = "tests.storage.mock_backend.MockCollectionAdapter"
self.name = "mock_test_collection"
self.custom_param1 = "val1"
self.custom_param2 = 123
```

---

## 5. Filter 与查询兼容规则

- Adapter 需要兼容统一过滤表达。
Expand Down
17 changes: 16 additions & 1 deletion openviking/storage/vectordb_adapters/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,22 @@

def create_collection_adapter(config) -> CollectionAdapter:
"""Unified factory entrypoint for backend-specific collection adapters."""
adapter_cls = _ADAPTER_REGISTRY.get(config.backend)
backend = config.backend
adapter_cls = _ADAPTER_REGISTRY.get(backend)

# If not in registry, try to load dynamically as a class path
if adapter_cls is None and "." in backend:
try:
import importlib
module_name, class_name = backend.rsplit(".", 1)
module = importlib.import_module(module_name)
potential_cls = getattr(module, class_name)
if issubclass(potential_cls, CollectionAdapter):
adapter_cls = potential_cls
except (ImportError, AttributeError, TypeError):
# Fallback to raising error if dynamic loading fails
pass

if adapter_cls is None:
raise ValueError(
f"Vector backend {config.backend} is not supported. "
Expand Down
181 changes: 181 additions & 0 deletions tests/storage/mock_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
from typing import Any, Dict, List, Optional
from openviking.storage.vectordb.collection.collection import ICollection
from openviking.storage.vectordb.collection.result import AggregateResult, SearchResult
from openviking.storage.vectordb.index.index import IIndex

from openviking.storage.vectordb_adapters.base import CollectionAdapter
from openviking.storage.vectordb.collection.collection import Collection

class MockCollectionAdapter(CollectionAdapter):
"""
Mock adapter for testing dynamic loading.
Inherits from CollectionAdapter and wraps MockCollection.
"""
def __init__(self, collection_name: str, custom_param1: str = "", custom_param2: int = 0):
super().__init__(collection_name=collection_name)
self.mode = "mock"
self.custom_param1 = custom_param1
self.custom_param2 = custom_param2

@classmethod
def from_config(cls, config: Any) -> "MockCollectionAdapter":
return cls(
collection_name=config.name or "mock_collection",
custom_param1=getattr(config, "custom_param1", ""),
custom_param2=getattr(config, "custom_param2", 0)
)

def _load_existing_collection_if_needed(self) -> None:
if self._collection is None:
# Create a dummy collection wrapping MockCollection
self._collection = MockCollection(self.custom_param1, self.custom_param2)

def _create_backend_collection(self, meta: Dict[str, Any]) -> Collection:
return MockCollection(self.custom_param1, self.custom_param2)

class MockCollection(ICollection):
def __init__(self, custom_param1: str, custom_param2: int, meta_data: Optional[Dict[str, Any]] = None, **kwargs):
super().__init__()
self.meta_data = meta_data if meta_data is not None else {}

self.custom_param1 = custom_param1
self.custom_param2 = custom_param2

# Store extra kwargs (including host/headers if passed but not used explicitly)
self.kwargs = kwargs

# Verify that we can access values passed during initialization
if self.meta_data and "test_verification" in self.meta_data:
print(f"MockCollection initialized with custom_param1={self.custom_param1}, custom_param2={self.custom_param2}, kwargs={kwargs}")

def update(self, fields: Optional[Dict[str, Any]] = None, description: Optional[str] = None):
raise NotImplementedError("MockCollection.update is not supported")

def get_meta_data(self):
raise NotImplementedError("MockCollection.get_meta_data is not supported")

def close(self):
# No-op for mock
pass

def drop(self):
# No-op for mock
pass

def create_index(self, index_name: str, meta_data: Dict[str, Any]) -> IIndex:
raise NotImplementedError("MockCollection.create_index is not supported")

def has_index(self, index_name: str) -> bool:
raise NotImplementedError("MockCollection.has_index is not supported")

def get_index(self, index_name: str) -> Optional[IIndex]:
raise NotImplementedError("MockCollection.get_index is not supported")

def search_by_vector(
self,
index_name: str,
dense_vector: Optional[List[float]] = None,
limit: int = 10,
offset: int = 0,
filters: Optional[Dict[str, Any]] = None,
sparse_vector: Optional[Dict[str, float]] = None,
output_fields: Optional[List[str]] = None,
) -> SearchResult:
raise NotImplementedError("MockCollection.search_by_vector is not supported")

def search_by_keywords(
self,
index_name: str,
keywords: Optional[List[str]] = None,
query: Optional[str] = None,
limit: int = 10,
offset: int = 0,
filters: Optional[Dict[str, Any]] = None,
output_fields: Optional[List[str]] = None,
) -> SearchResult:
raise NotImplementedError("MockCollection.search_by_keywords is not supported")

def search_by_id(
self,
index_name: str,
id: Any,
limit: int = 10,
offset: int = 0,
filters: Optional[Dict[str, Any]] = None,
output_fields: Optional[List[str]] = None,
) -> SearchResult:
raise NotImplementedError("MockCollection.search_by_id is not supported")

def search_by_multimodal(
self,
index_name: str,
text: Optional[str],
image: Optional[Any],
video: Optional[Any],
limit: int = 10,
offset: int = 0,
filters: Optional[Dict[str, Any]] = None,
output_fields: Optional[List[str]] = None,
) -> SearchResult:
raise NotImplementedError("MockCollection.search_by_multimodal is not supported")

def search_by_random(
self,
index_name: str,
limit: int = 10,
offset: int = 0,
filters: Optional[Dict[str, Any]] = None,
output_fields: Optional[List[str]] = None,
) -> SearchResult:
raise NotImplementedError("MockCollection.search_by_random is not supported")

def search_by_scalar(
self,
index_name: str,
field: str,
order: Optional[str] = "desc",
limit: int = 10,
offset: int = 0,
filters: Optional[Dict[str, Any]] = None,
output_fields: Optional[List[str]] = None,
) -> SearchResult:
raise NotImplementedError("MockCollection.search_by_scalar is not supported")

def update_index(
self,
index_name: str,
scalar_index: Optional[Dict[str, Any]] = None,
description: Optional[str] = None,
):
raise NotImplementedError("MockCollection.update_index is not supported")

def get_index_meta_data(self, index_name: str):
raise NotImplementedError("MockCollection.get_index_meta_data is not supported")

def list_indexes(self):
raise NotImplementedError("MockCollection.list_indexes is not supported")

def drop_index(self, index_name: str):
raise NotImplementedError("MockCollection.drop_index is not supported")

def upsert_data(self, data_list: List[Dict[str, Any]], ttl=0):
raise NotImplementedError("MockCollection.upsert_data is not supported")

def fetch_data(self, primary_keys: List[Any]):
raise NotImplementedError("MockCollection.fetch_data is not supported")

def delete_data(self, primary_keys: List[Any]):
raise NotImplementedError("MockCollection.delete_data is not supported")

def delete_all_data(self):
raise NotImplementedError("MockCollection.delete_all_data is not supported")

def aggregate_data(
self,
index_name: str,
op: str = "count",
field: Optional[str] = None,
filters: Optional[Dict[str, Any]] = None,
cond: Optional[Dict[str, Any]] = None,
) -> AggregateResult:
raise NotImplementedError("MockCollection.aggregate_data is not supported")
Loading