diff --git a/.gitignore b/.gitignore index dc94e0b..a65cabe 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ dist/ build/ .venv *.log +.DS_Store diff --git a/README.md b/README.md index c2c0274..1f0660d 100644 --- a/README.md +++ b/README.md @@ -175,6 +175,143 @@ If you need to use different databases or do not want to use the information lis >>> xe1t_coll, xe1t_db, xe1t_user, xe1t_pw, xe1t_url = [ask someone] >>> xe1t_collection = pymongo_collection(xe1t_coll, database=xe1t_coll, user=xe1t_user, password=xe1t_pw, url=xe1t_url) +## Offline SQLite Backend + +For scenarios where network connectivity to MongoDB is unavailable (e.g., working on compute nodes without internet access, traveling, or during database outages), `utilix` provides an **offline SQLite backend** that allows you to continue working with local copies of the database and GridFS files. + +### Overview + +The offline backend consists of two main components: +1. **OfflineSQLiteCollection**: A pymongo-compatible wrapper around SQLite databases containing BSON-compressed documents +2. **OfflineGridFS**: A GridFS-compatible interface for accessing cached files + +When properly configured, utilix will automatically use the SQLite backend when both database files are available, and seamlessly fall back to MongoDB when they're not. + +### Setup + +#### 1. Generate SQLite Database Files + +First, you need to create SQLite dumps of the MongoDB collections you need. The `mongo_to_sqlite.py` script (included in utilix) handles this: + +```bash +# Create a spec file listing what to dump +cat > dump_spec.txt << EOF +xenonnt:runs +files:GRIDFS +xedocs:ALL +corrections:ALL +EOF + +# Run the dump (requires MongoDB access) +python -m utilix.mongo_to_sqlite \ + --spec dump_spec.txt \ + --rundb-out /path/to/rundb.sqlite \ + --xedocs-out /path/to/xedocs.sqlite +``` + +This will create two SQLite files: +- `rundb.sqlite`: Contains runs collection, GridFS file index, and file blobs +- `xedocs.sqlite`: Contains corrections and other xedocs collections + +**Note**: The dump process can take significant time depending on data size. Plan accordingly. + +#### 2. Configure Environment Variables + +Set the following environment variables to enable offline mode: + +```bash +export RUNDB_SQLITE_PATH="/path/to/rundb.sqlite" +export XEDOCS_SQLITE_PATH="/path/to/xedocs.sqlite" + +# Optional: Set compression algorithm (default: zstd) +export OFFLINE_COMP="zstd" # or "zlib" + +# Optional: Enable debug logging +export OFFLINE_DEBUG="1" +``` + +**Important**: Both SQLite files must exist and be accessible for offline mode to activate. If either is missing, utilix will fall back to MongoDB automatically. + +#### 3. Use Normally + +Once configured, your existing code works without modification: + +```python +from utilix import xent_collection + +# Automatically uses SQLite if files are present, MongoDB otherwise +runs = xent_collection("runs") +doc = runs.find_one({"number": 12345}) + +# GridFS downloads also work offline +from utilix.mongo_storage import MongoDownloader +downloader = MongoDownloader() +path = downloader.download_single("my_config") +``` + +### Features and Limitations + +#### Supported Operations +- ✅ `find_one()` with `_id`, `number`, or no filter +- ✅ `find()` with basic filters +- ✅ `count_documents()` +- ✅ GridFS file listing and downloads +- ✅ Automatic MD5-based file caching +- ✅ BSON compression (zstd or zlib) + +#### Limitations +- ⚠️ Complex queries (aggregations, regex, etc.) 
may not work +- ⚠️ Cursor operations like `sort()` without `limit()` will raise errors to prevent loading entire collections +- ⚠️ Write operations are not supported (read-only) +- ⚠️ The offline database is a snapshot; it won't reflect new data added to MongoDB + +### Performance Considerations + +- SQLite databases are compressed with zstd (or zlib as fallback), typically achieving 5-10x compression +- First-time file access requires decompression; subsequent accesses benefit from OS caching +- For large result sets, queries may be slower than MongoDB due to BSON decompression overhead +- GridFS files are cached by MD5 hash to avoid re-downloading + +### Updating Your Offline Database + +The SQLite files are static snapshots. To refresh them with new data: + +```bash +# Re-run the dump script +python -m utilix.mongo_to_sqlite \ + --spec dump_spec.txt \ + --rundb-out /path/to/rundb.sqlite \ + --xedocs-out /path/to/xedocs.sqlite \ + --overwrite # Add this flag to replace existing files +``` + +### Troubleshooting + +**Problem**: `AttributeError: 'SQLiteConfig' object has no attribute 'sqlite_path'` +- **Solution**: Update to the latest version of utilix. This was a bug in early versions. + +**Problem**: Offline mode not activating +- **Check**: Both environment variables are set: `echo $RUNDB_SQLITE_PATH $XEDOCS_SQLITE_PATH` +- **Check**: Both files exist: `ls -lh $RUNDB_SQLITE_PATH $XEDOCS_SQLITE_PATH` +- **Check**: Set `OFFLINE_DEBUG=1` to see debug messages + +**Problem**: `KeyError: Config 'xyz' not found in offline gridfs_files index` +- **Solution**: The file wasn't included in the dump. Re-dump with the file added to your spec. + +**Problem**: Queries return different results than MongoDB +- **Note**: This shouldn't happen for supported queries. Please report as a bug with example code. + +### Environment Variable Reference + +| Variable | Required | Default | Description | +|----------|----------|---------|-------------| +| `RUNDB_SQLITE_PATH` | Yes | - | Path to RunDB SQLite file | +| `XEDOCS_SQLITE_PATH` | Yes | - | Path to xedocs SQLite file | +| `OFFLINE_COMP` | No | `zstd` | Compression algorithm (`zstd` or `zlib`) | +| `OFFLINE_DEBUG` | No | `0` | Enable debug logging (`1` or `0`) | +| `OFFLINE_HARD` | No | `0` | Raise errors instead of warnings on unsupported ops | +| `PYMONGO_SPY` | No | `0` | Log when pymongo.MongoClient is created (for debugging) | + ## Data processing requests You may find yourself missing some data which requires a large amount of resources to process. In these cases, you can submit a processing request to the computing team. 
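As a supplement to the offline-backend troubleshooting above, the configuration loader introduced by this change can be queried directly to see why offline mode did or did not activate. This is a minimal sketch: the paths are placeholders, and `_load_sqlite_config` is an internal helper of `utilix.sqlite_backend`, so prefer the documented environment-variable checks for routine use.

```python
import os

# Placeholder paths - point these at your actual SQLite dumps
os.environ["RUNDB_SQLITE_PATH"] = "/path/to/rundb.sqlite"
os.environ["XEDOCS_SQLITE_PATH"] = "/path/to/xedocs.sqlite"

from utilix.sqlite_backend import _load_sqlite_config

cfg = _load_sqlite_config()
print("RunDB file found :", cfg.rundb_active())
print("xedocs file found:", cfg.xedocs_active())
print("Offline mode     :", cfg.sqlite_active())  # True only if BOTH files exist
```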
diff --git a/tests/test_offline_sqlite.py b/tests/test_offline_sqlite.py new file mode 100644 index 0000000..f4f568b --- /dev/null +++ b/tests/test_offline_sqlite.py @@ -0,0 +1,375 @@ +"""Tests for SQLite offline backend functionality.""" + +import os +import sqlite3 +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch, MagicMock + +from bson import BSON + + +class TestSQLiteConfig(unittest.TestCase): + """Test SQLiteConfig dataclass and configuration loading.""" + + def test_load_config_from_env(self): + """Test loading SQLite config from environment variables.""" + from utilix.sqlite_backend import _load_sqlite_config + + with tempfile.TemporaryDirectory() as tmpdir: + rundb_path = Path(tmpdir) / "rundb.sqlite" + xedocs_path = Path(tmpdir) / "xedocs.sqlite" + + # Create empty files + rundb_path.touch() + xedocs_path.touch() + + with patch.dict( + os.environ, + { + "RUNDB_SQLITE_PATH": str(rundb_path), + "XEDOCS_SQLITE_PATH": str(xedocs_path), + }, + ): + cfg = _load_sqlite_config() + + self.assertIsNotNone(cfg.rundb_sqlite_path) + self.assertIsNotNone(cfg.xedocs_sqlite_path) + # Use resolve() to handle symlinks (e.g., /var -> /private/var) + self.assertEqual(cfg.rundb_sqlite_path.resolve(), rundb_path.resolve()) + self.assertEqual(cfg.xedocs_sqlite_path.resolve(), xedocs_path.resolve()) + self.assertTrue(cfg.rundb_active()) + self.assertTrue(cfg.xedocs_active()) + self.assertTrue(cfg.sqlite_active()) + + def test_sqlite_active_requires_both_files(self): + """Test that sqlite_active() requires both files to exist.""" + from utilix.sqlite_backend import _load_sqlite_config + + with tempfile.TemporaryDirectory() as tmpdir: + rundb_path = Path(tmpdir) / "rundb.sqlite" + xedocs_path = Path(tmpdir) / "xedocs.sqlite" + + # Only create rundb file + rundb_path.touch() + + with patch.dict( + os.environ, + { + "RUNDB_SQLITE_PATH": str(rundb_path), + "XEDOCS_SQLITE_PATH": str(xedocs_path), + }, + ): + cfg = _load_sqlite_config() + + self.assertTrue(cfg.rundb_active()) + self.assertFalse(cfg.xedocs_active()) + self.assertFalse(cfg.sqlite_active()) # Requires BOTH + + def test_sqlite_active_false_when_no_env_vars(self): + """Test that sqlite_active() is False without environment variables.""" + from utilix.sqlite_backend import _load_sqlite_config + + with patch.dict(os.environ, {}, clear=True): + # Remove RUNDB_SQLITE_PATH and XEDOCS_SQLITE_PATH if present + os.environ.pop("RUNDB_SQLITE_PATH", None) + os.environ.pop("XEDOCS_SQLITE_PATH", None) + + cfg = _load_sqlite_config() + + self.assertFalse(cfg.rundb_active()) + self.assertFalse(cfg.xedocs_active()) + self.assertFalse(cfg.sqlite_active()) + + +class TestOfflineGridFS(unittest.TestCase): + """Test OfflineGridFS for file operations.""" + + def setUp(self): + """Create temporary directory and mock SQLite database.""" + self.tmpdir = tempfile.TemporaryDirectory() + self.tmppath = Path(self.tmpdir.name) + + # Create mock SQLite database with gridfs_files table + self.db_path = self.tmppath / "rundb.sqlite" + self.blob_path = self.tmppath / "test_blob.txt" + + # Write test blob + self.blob_path.write_text("test content") + + # Create database with gridfs_files table + conn = sqlite3.connect(str(self.db_path)) + conn.execute( + """CREATE TABLE gridfs_files ( db_name TEXT, file_id TEXT, config_name TEXT, md5 TEXT, + length INTEGER, uploadDate INTEGER, + + blob_path TEXT ) + + """ + ) + conn.execute( + "INSERT INTO gridfs_files " + "(db_name, file_id, config_name, md5, length, uploadDate, blob_path) " + "VALUES (?, 
?, ?, ?, ?, ?, ?)", + ( + "files", + "test_id", + "test_config", + "abc123", + 12, + 1234567890, + "test_blob.txt", + ), + ) + conn.commit() + conn.close() + + def tearDown(self): + """Clean up temporary directory.""" + self.tmpdir.cleanup() + + def test_offline_gridfs_list_files(self): + """Test listing files from offline GridFS.""" + from utilix.sqlite_backend import OfflineGridFS + + gfs = OfflineGridFS( + sqlite_path=self.db_path, + offline_root=self.tmppath, + cache_dirs=(self.tmppath / "cache",), + ) + + files = gfs.list_files() + self.assertIn("test_config", files) + gfs.close() + + def test_offline_gridfs_download_single(self): + """Test downloading a single file from offline GridFS.""" + from utilix.sqlite_backend import OfflineGridFS + + cache_dir = self.tmppath / "cache" + gfs = OfflineGridFS( + sqlite_path=self.db_path, + offline_root=self.tmppath, + cache_dirs=(cache_dir,), + ) + + # Download file + result_path = gfs.download_single("test_config") + + # Should be cached by md5 + self.assertTrue(Path(result_path).exists()) + self.assertIn("abc123", result_path) # md5 in filename + + gfs.close() + + def test_offline_gridfs_missing_config_raises(self): + """Test that missing config raises KeyError.""" + from utilix.sqlite_backend import OfflineGridFS + + gfs = OfflineGridFS( + sqlite_path=self.db_path, + offline_root=self.tmppath, + cache_dirs=(self.tmppath / "cache",), + ) + + with self.assertRaises(KeyError): + gfs.download_single("nonexistent_config") + + gfs.close() + + +class TestOfflineSQLiteCollection(unittest.TestCase): + """Test OfflineSQLiteCollection for database queries.""" + + def setUp(self): + """Create temporary SQLite database with test data.""" + self.tmpdir = tempfile.TemporaryDirectory() + self.db_path = Path(self.tmpdir.name) / "rundb.sqlite" + + # Create database with kv_collections and runs_index tables + conn = sqlite3.connect(str(self.db_path)) + conn.execute( + """CREATE TABLE kv_collections ( db_name TEXT, coll_name TEXT, doc_id TEXT, + + doc_bson_z BLOB ) + + """ + ) + conn.execute( + """CREATE TABLE runs_index ( db_name TEXT, number INTEGER, + + doc_id TEXT ) + + """ + ) + + # Insert test document + import zlib + + test_doc = {"_id": "test_id_123", "number": 12345, "name": "test_run"} + bson_data = BSON.encode(test_doc) + compressed = zlib.compress(bson_data, level=6) + + conn.execute( + "INSERT INTO kv_collections " + "(db_name, coll_name, doc_id, doc_bson_z) VALUES (?, ?, ?, ?)", + ("xenonnt", "runs", "test_id_123", compressed), + ) + + conn.execute( + "INSERT INTO runs_index (db_name, number, doc_id) VALUES (?, ?, ?)", + ("xenonnt", 12345, "test_id_123"), + ) + + conn.commit() + conn.close() + + def tearDown(self): + """Clean up temporary directory.""" + self.tmpdir.cleanup() + + def test_find_one_by_id(self): + """Test find_one with _id filter.""" + from utilix.sqlite_backend import OfflineSQLiteCollection + + coll = OfflineSQLiteCollection( + sqlite_path=self.db_path, + db_name="xenonnt", + coll_name="runs", + compression="zlib", + ) + + doc = coll.find_one({"_id": "test_id_123"}) + self.assertIsNotNone(doc) + self.assertEqual(doc["_id"], "test_id_123") + self.assertEqual(doc["number"], 12345) + + coll.close() + + def test_find_one_by_number(self): + """Test find_one with number filter for runs collection.""" + from utilix.sqlite_backend import OfflineSQLiteCollection + + coll = OfflineSQLiteCollection( + sqlite_path=self.db_path, + db_name="xenonnt", + coll_name="runs", + compression="zlib", + ) + + doc = coll.find_one({"number": 12345}) + 
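+        # A {"number": ...} filter is resolved through the runs_index table to a doc_id,
+        # and the document itself is decoded from the compressed BSON blob in kv_collections.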
self.assertIsNotNone(doc) + self.assertEqual(doc["number"], 12345) + self.assertEqual(doc["_id"], "test_id_123") + + coll.close() + + def test_find_one_default_returns_first_doc(self): + """Test find_one without filter returns first document.""" + from utilix.sqlite_backend import OfflineSQLiteCollection + + coll = OfflineSQLiteCollection( + sqlite_path=self.db_path, + db_name="xenonnt", + coll_name="runs", + compression="zlib", + ) + + doc = coll.find_one() + self.assertIsNotNone(doc) + self.assertEqual(doc["_id"], "test_id_123") + + coll.close() + + def test_count_documents(self): + """Test count_documents method.""" + from utilix.sqlite_backend import OfflineSQLiteCollection + + coll = OfflineSQLiteCollection( + sqlite_path=self.db_path, + db_name="xenonnt", + coll_name="runs", + compression="zlib", + ) + + count = coll.count_documents({}) + self.assertEqual(count, 1) + + count = coll.count_documents({"number": 12345}) + self.assertEqual(count, 1) + + count = coll.count_documents({"number": 99999}) + self.assertEqual(count, 0) + + coll.close() + + def test_find_returns_cursor(self): + """Test find method returns iterable cursor.""" + from utilix.sqlite_backend import OfflineSQLiteCollection + + coll = OfflineSQLiteCollection( + sqlite_path=self.db_path, + db_name="xenonnt", + coll_name="runs", + compression="zlib", + ) + + cursor = coll.find({"number": 12345}) + docs = list(cursor) + + self.assertEqual(len(docs), 1) + self.assertEqual(docs[0]["number"], 12345) + + coll.close() + + +class TestXentCollectionOffline(unittest.TestCase): + """Test xent_collection() function with offline mode.""" + + def test_xent_collection_uses_sqlite_when_active(self): + """Test that xent_collection uses SQLite when offline is active.""" + from utilix.sqlite_backend import OfflineSQLiteCollection + from utilix.rundb import xent_collection + + with tempfile.TemporaryDirectory() as tmpdir: + rundb_path = Path(tmpdir) / "rundb.sqlite" + xedocs_path = Path(tmpdir) / "xedocs.sqlite" + rundb_path.touch() + xedocs_path.touch() + + with patch.dict( + os.environ, + { + "RUNDB_SQLITE_PATH": str(rundb_path), + "XEDOCS_SQLITE_PATH": str(xedocs_path), + }, + ): + with patch("utilix.rundb.uconfig") as mock_config: + mock_config.get.return_value = "xenonnt" + + coll = xent_collection("runs") + + # Should return OfflineSQLiteCollection when offline is active + self.assertIsInstance(coll, OfflineSQLiteCollection) + coll.close() + + def test_xent_collection_uses_mongodb_when_offline_inactive(self): + """Test that xent_collection uses MongoDB when offline is not active.""" + from utilix.rundb import xent_collection + + with patch.dict(os.environ, {}, clear=True): + os.environ.pop("RUNDB_SQLITE_PATH", None) + os.environ.pop("XEDOCS_SQLITE_PATH", None) + + with patch("utilix.rundb._collection") as mock_collection: + mock_collection.return_value = MagicMock() + + _result = xent_collection("runs") # noqa: F841 + + # Should call _collection (MongoDB) when offline is not active + mock_collection.assert_called_once() + + +if __name__ == "__main__": + unittest.main() diff --git a/utilix/__init__.py b/utilix/__init__.py index 8f4587b..ac82e81 100644 --- a/utilix/__init__.py +++ b/utilix/__init__.py @@ -15,3 +15,4 @@ from .shell import Shell from .rundb import DB, xent_collection, xe1t_collection from . import mongo_storage +from . 
import sqlite_backend diff --git a/utilix/mongo_storage.py b/utilix/mongo_storage.py index 8d0de71..f8301c0 100644 --- a/utilix/mongo_storage.py +++ b/utilix/mongo_storage.py @@ -12,6 +12,7 @@ from utilix.rundb import DB, xent_collection from utilix.utils import to_str_tuple from utilix import uconfig, logger +from utilix.sqlite_backend import OfflineGridFS, _load_sqlite_config class GridFsBase: @@ -305,22 +306,69 @@ def __init__(self, *args, **kwargs): return def initialize(self, store_files_at=None, *args, **kwargs): - super().__init__(*args, **kwargs) - - # We are going to set a place where to store the files. It's - # either specified by the user or we use these defaults: + # parse cache dirs (same as you already do) if store_files_at is None: - store_files_at = ( - "./resource_cache", - "/tmp/straxen_resource_cache", - ) - elif not isinstance(store_files_at, (tuple, str, list)): - raise ValueError(f"{store_files_at} should be tuple of paths!") + store_files_at = ("./resource_cache", "/tmp/straxen_resource_cache") elif isinstance(store_files_at, str): store_files_at = to_str_tuple(store_files_at) + elif isinstance(store_files_at, list): + store_files_at = tuple(store_files_at) + elif not isinstance(store_files_at, (tuple, list)): + raise ValueError(f"{store_files_at} should be tuple/list/str of paths!") self.storage_options = store_files_at + # offline? + try: + sqlite_cfg = _load_sqlite_config() + sqlite_active = sqlite_cfg.sqlite_active() + except Exception: + sqlite_cfg = None + sqlite_active = False + + if sqlite_active: + assert sqlite_cfg is not None # for mypy + assert sqlite_cfg.rundb_sqlite_path is not None # for mypy + assert sqlite_cfg.offline_root is not None # for mypy + self._offline = OfflineGridFS( + sqlite_path=sqlite_cfg.rundb_sqlite_path, + offline_root=sqlite_cfg.offline_root, + cache_dirs=tuple(self.storage_options), + gridfs_db_name="files", + ) + # IMPORTANT: do NOT call super().__init__() + return + + # online fallback + super().__init__(*args, **kwargs) + + # ------------------------- + # OFFLINE-safe overrides + # ------------------------- + + def list_files(self) -> List[str]: + if hasattr(self, "_offline"): + return self._offline.list_files() + return super().list_files() + + def config_exists(self, config: str) -> bool: + if hasattr(self, "_offline"): + return self._offline.latest_by_config_name(config) is not None + return super().config_exists(config) + + def md5_stored(self, abs_path: str) -> bool: + # offline mode doesn't have a DB md5 index for arbitrary paths; just behave conservatively + if hasattr(self, "_offline"): + return False + return super().md5_stored(abs_path) + + def test_find(self) -> None: + if hasattr(self, "_offline"): + # simple sanity: must be able to list at least 1 file + _ = self._offline.list_files() + return + return super().test_find() + def download_single(self, config_name: str, human_readable_file_name=False): """Download the config_name if it exists. @@ -331,6 +379,13 @@ def download_single(self, config_name: str, human_readable_file_name=False): :return: str, the absolute path of the file requested """ + # Offline path (sqlite-backed GridFS index) + if hasattr(self, "_offline"): + return self._offline.download_single( + config_name, + human_readable_file_name=human_readable_file_name, + ) + if self.config_exists(config_name): # Query by name query = self.get_query_config(config_name) @@ -513,6 +568,25 @@ def initialize( self.storage_options: Tuple[str, ...] 
= store_files_at + # Offline sqlite backend support (reuse utilix.sqlite_backend.OfflineGridFS) + try: + sqlite_cfg = _load_sqlite_config() + sqlite_active = sqlite_cfg.sqlite_active() + except Exception: + sqlite_cfg = None + sqlite_active = False + + if sqlite_active: + assert sqlite_cfg is not None # for mypy + assert sqlite_cfg.rundb_sqlite_path is not None # for mypy + assert sqlite_cfg.offline_root is not None # for mypy + self._offline = OfflineGridFS( + sqlite_path=sqlite_cfg.rundb_sqlite_path, + offline_root=sqlite_cfg.offline_root, + cache_dirs=tuple(self.storage_options), + gridfs_db_name="files", + ) + def download_single( self, config_name: str, @@ -520,6 +594,14 @@ def download_single( human_readable_file_name: bool = False, ) -> str: """Download the config_name if it exists.""" + # Offline path (sqlite-backed GridFS index) + if hasattr(self, "_offline"): + return self._offline.download_single( + config_name, + human_readable_file_name=human_readable_file_name, + write_to=write_to, + ) + target_file_name = ( config_name if human_readable_file_name else self.db.get_file_md5(config_name) ) diff --git a/utilix/mongo_to_sqlite.py b/utilix/mongo_to_sqlite.py new file mode 100644 index 0000000..6aa8b1e --- /dev/null +++ b/utilix/mongo_to_sqlite.py @@ -0,0 +1,1045 @@ +#!/usr/bin/env python3 +"""Dump selected MongoDB collections + GridFS into local SQLite(s). + +NEW: +- xedocs:* is dumped into a separate SQLite file (xedocs.sqlite) with + one table per xedocs collection and useful indexes. +- everything else stays as before (rundb.sqlite with kv_collections + runs_index + gridfs_files). + +Spec file examples: + xenonnt:runs + files:GRIDFS + xedocs:ALL + corrections:ALL + +""" + +import argparse +import json +import logging +import sqlite3 +import sys +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, Iterable, List, Optional, Tuple, Any + +import pymongo +from bson import BSON +from bson.objectid import ObjectId + +# ------------------------- +# Compression helpers +# ------------------------- + + +def _compressor(): + try: + import zstandard as zstd # type: ignore + + cctx = zstd.ZstdCompressor(level=10) + dctx = zstd.ZstdDecompressor() + + def compress(b: bytes) -> bytes: + return cctx.compress(b) + + def decompress(b: bytes) -> bytes: + return dctx.decompress(b) + + return "zstd", compress, decompress + except Exception: + import zlib + + def compress(b: bytes) -> bytes: + return zlib.compress(b, level=6) + + def decompress(b: bytes) -> bytes: + return zlib.decompress(b) + + return "zlib", compress, decompress + + +COMP_ALGO, compress_bytes, _ = _compressor() + + +# ------------------------- +# Spec parsing +# ------------------------- + + +@dataclass(frozen=True) +class SpecItem: + db: str + what: str # collection name, "ALL", or "GRIDFS" + + +def parse_spec_lines(lines: Iterable[str]) -> List[SpecItem]: + out: List[SpecItem] = [] + for raw in lines: + s = raw.strip() + if not s or s.startswith("#"): + continue + if ":" not in s: + raise ValueError(f"Bad spec line (expected db:thing): {s}") + db, what = s.split(":", 1) + db, what = db.strip(), what.strip() + if not db or not what: + raise ValueError(f"Bad spec line (empty db/thing): {s}") + out.append(SpecItem(db=db, what=what)) + return out + + +# ------------------------- +# Mongo connection (utilix-friendly) +# ------------------------- + + +def get_utilix_mongo_uri(experiment: str) -> str: + """Mirrors utilix._collection style: + + mongodb://{user}:{password}@{url} + + """ + 
from utilix import uconfig # type: ignore + + if experiment not in ("xent", "xe1t"): + raise ValueError("experiment must be 'xent' or 'xe1t'") + + url = uconfig.get("RunDB", f"{experiment}_url") + user = uconfig.get("RunDB", f"{experiment}_user") + password = uconfig.get("RunDB", f"{experiment}_password") + + force_single_server = uconfig.get("RunDB", "force_single_server", fallback=True) + if force_single_server: + url = url.split(",")[-1] + + return f"mongodb://{user}:{password}@{url}" + + +def get_mongo_client(experiment: str, uri_override: Optional[str] = None) -> pymongo.MongoClient: + uri = uri_override or get_utilix_mongo_uri(experiment) + + kwargs: Dict[str, object] = { + "serverSelectionTimeoutMS": 30_000, + "connectTimeoutMS": 30_000, + "socketTimeoutMS": 60_000, + "retryWrites": False, + "readPreference": "secondaryPreferred", + } + if int(pymongo.__version__.split(".")[0]) >= 4: + kwargs["directConnection"] = True + + return pymongo.MongoClient(uri, **kwargs) + + +# ------------------------- +# SQLite schema (rundb.sqlite) +# ------------------------- + +SCHEMA_SQL_RUNDB = """ +PRAGMA journal_mode = WAL; +PRAGMA synchronous = NORMAL; +PRAGMA temp_store = MEMORY; + +CREATE TABLE IF NOT EXISTS kv_collections ( + db_name TEXT NOT NULL, + coll_name TEXT NOT NULL, + doc_id TEXT NOT NULL, + doc_bson_z BLOB NOT NULL, + PRIMARY KEY (db_name, coll_name, doc_id) +); + +CREATE TABLE IF NOT EXISTS runs_index ( + db_name TEXT NOT NULL, + doc_id TEXT NOT NULL, + number INTEGER, + name TEXT, + start INTEGER, + end INTEGER, + tags_json TEXT, + PRIMARY KEY (db_name, doc_id) +); + +CREATE INDEX IF NOT EXISTS idx_runs_number ON runs_index(db_name, number); +CREATE INDEX IF NOT EXISTS idx_runs_name ON runs_index(db_name, name); +CREATE INDEX IF NOT EXISTS idx_runs_start ON runs_index(db_name, start); + +CREATE TABLE IF NOT EXISTS gridfs_files ( + db_name TEXT NOT NULL, + file_id TEXT NOT NULL, + filename TEXT, + config_name TEXT, + length INTEGER, + chunkSize INTEGER, + uploadDate INTEGER, + md5 TEXT, + metadata_json TEXT, + logical_name TEXT, + blob_path TEXT NOT NULL, + PRIMARY KEY (db_name, file_id) +); + +CREATE INDEX IF NOT EXISTS idx_gridfs_filename ON gridfs_files(db_name, filename); +CREATE INDEX IF NOT EXISTS idx_gridfs_configname ON gridfs_files(db_name, config_name); +""" + + +# ------------------------- +# SQLite schema (xedocs.sqlite) +# ------------------------- + + +def _schema_sql_xedocs_table(table: str, extra_label_cols: List[str]) -> str: + """Create one table per xedocs collection. + + We keep a stable set of "core" columns (id/version/time/value/full doc), and + *also* create additional TEXT columns for any label fields we discover from + sampling documents in that collection. + + Note: extra label columns are quoted to tolerate odd names. 
+ + """ + + def q(name: str) -> str: + return '"' + name.replace('"', '""') + '"' + + # Core columns + cols = [ + f"{q('_id')} TEXT PRIMARY KEY", + f"{q('version')} TEXT", + f"{q('time_ns')} INTEGER", + f"{q('time_left_ns')} INTEGER", + f"{q('time_right_ns')} INTEGER", + f"{q('created_date_ns')} INTEGER", + f"{q('value_num')} REAL", + f"{q('value_json')} TEXT", + ] + + # Discovered label columns (TEXT) + for c in extra_label_cols: + if c in { + "_id", + "version", + "time_ns", + "time_left_ns", + "time_right_ns", + "created_date_ns", + "value_num", + "value_json", + "doc_bson_z", + }: + continue + cols.append(f"{q(c)} TEXT") + + # Full original BSON (compressed) + cols.append(f"{q('doc_bson_z')} BLOB NOT NULL") + + # Always-create indexes: + # - time sampled lookup: version + time + # - time interval lookup: version + interval + # - common labels (if present) + index_sql = [ + f"CREATE INDEX IF NOT EXISTS " + f"{q('idx_' + table + '_version_time')} " + f"ON {q(table)}({q('version')}, {q('time_ns')});", + f"CREATE INDEX IF NOT EXISTS " + f"{q('idx_' + table + '_version_interval')} " + f"ON {q(table)}({q('version')}, {q('time_left_ns')}, {q('time_right_ns')});", + ] + + # Optional label indexes (keep this small to avoid DB bloat) + preferred = [ + "algorithm", + "config_name", + "detector", + "source", + "pmt", + "gain_model", + ] + + present = set(extra_label_cols) + n_extra = 0 + for lab in preferred: + if lab in present: + index_sql.append( + f"CREATE INDEX IF NOT EXISTS \ + {q('idx_' + table + '_version_' + lab)} \ + ON {q(table)}({q('version')}, {q(lab)});" + ) + n_extra += 1 + if n_extra >= 6: + break + + cols_sql = ",\n ".join(cols) + idx_sql = "\n\n".join(index_sql) + + return f""" +CREATE TABLE IF NOT EXISTS {q(table)} ( + {cols_sql} +); + +{idx_sql} +""" + + +# ------------------------- +# Utilities +# ------------------------- + + +def ensure_dir(p: Path) -> None: + p.mkdir(parents=True, exist_ok=True) + + +def now_s() -> float: + return time.time() + + +def oid_to_str(x) -> str: + if isinstance(x, ObjectId): + return str(x) + return str(x) + + +def to_unix_seconds(dtobj) -> Optional[int]: + try: + if dtobj is None: + return None + return int(dtobj.timestamp()) + except Exception: + return None + + +def to_utc_ns(dtobj) -> Optional[int]: + try: + if dtobj is None: + return None + # bson datetime is usually naive but UTC + # treat naive as UTC + if getattr(dtobj, "tzinfo", None) is None: + import datetime as dt + + dtobj = dtobj.replace(tzinfo=dt.timezone.utc) + return int(dtobj.timestamp() * 1_000_000_000) + except Exception: + return None + + +def bson_pack(doc: dict) -> bytes: + return BSON.encode(doc) + + +def pack_and_compress(doc: dict) -> bytes: + return compress_bytes(bson_pack(doc)) + + +def list_collection_names_safe(db: pymongo.database.Database) -> List[str]: + try: + return db.list_collection_names() + except pymongo.errors.OperationFailure as e: + raise RuntimeError( + f"Not authorized to list collections in DB '{db.name}'. " + f"Use explicit spec lines (db:collection) instead of db:ALL. 
" + f"Mongo error: {e}" + ) from e + + +# ------------------------- +# Dump logic (generic -> rundb.sqlite kv_collections) +# ------------------------- + + +def dump_generic_collection( + mongo_db: pymongo.database.Database, + coll_name: str, + sql: sqlite3.Connection, + out_db_name: str, + batch_size: int, + logger: logging.Logger, + query: Optional[dict] = None, + projection: Optional[dict] = None, +) -> int: + query = query or {} + coll = mongo_db[coll_name] + + logger.info(f"[mongo] dumping {mongo_db.name}.{coll_name} -> rundb.sqlite kv_collections") + t0 = now_s() + + cur = coll.find(query, projection=projection, no_cursor_timeout=True, batch_size=batch_size) + n = 0 + buf: List[Tuple[str, str, str, bytes]] = [] + + insert_sql = "INSERT OR REPLACE INTO kv_collections(db_name, coll_name, doc_id, doc_bson_z)" + insert_sql += " VALUES (?,?,?,?)" + + for doc in cur: + _id = doc.get("_id") + doc_id = oid_to_str(_id) if _id is not None else f"noid:{n}" + blob = pack_and_compress(doc) + buf.append((out_db_name, coll_name, doc_id, blob)) + n += 1 + + if len(buf) >= batch_size: + sql.executemany(insert_sql, buf) + sql.commit() + buf.clear() + + if buf: + sql.executemany(insert_sql, buf) + sql.commit() + + dt = now_s() - t0 + logger.info(f"[mongo] done {mongo_db.name}.{coll_name}: {n} docs in {dt:.1f}s") + return n + + +def dump_xenonnt_runs_index( + mongo_db: pymongo.database.Database, + runs_coll_name: str, + sql: sqlite3.Connection, + out_db_name: str, + batch_size: int, + logger: logging.Logger, + drop_fields: Optional[List[str]] = None, +) -> int: + drop_fields = drop_fields or [] + coll = mongo_db[runs_coll_name] + + logger.info(f"[mongo] dumping runs {mongo_db.name}.{runs_coll_name} with index + compression") + t0 = now_s() + + cur = coll.find({}, no_cursor_timeout=True, batch_size=batch_size) + n = 0 + buf_kv: List[Tuple[str, str, str, bytes]] = [] + buf_idx: List[ + Tuple[str, str, Optional[int], Optional[str], Optional[int], Optional[int], Optional[str]] + ] = [] + + ins_kv = "INSERT OR REPLACE INTO kv_collections(db_name, coll_name, doc_id, doc_bson_z)" + ins_kv += " VALUES (?,?,?,?)" + + ins_idx = """ + INSERT OR REPLACE INTO runs_index(db_name, doc_id, number, name, start, end, tags_json) + VALUES (?,?,?,?,?,?,?) 
+ """ + + for doc in cur: + _id = doc.get("_id") + doc_id = oid_to_str(_id) if _id is not None else f"noid:{n}" + + number = doc.get("number") or doc.get("run_number") or doc.get("runNumber") + try: + number_i = int(number) if number is not None else None + except Exception: + number_i = None + + name = doc.get("name") or doc.get("run_name") or doc.get("runName") + + start = ( + doc.get("start") + or doc.get("start_time") + or doc.get("startTime") + or doc.get("starttime") + ) + end = doc.get("end") or doc.get("end_time") or doc.get("endTime") or doc.get("endtime") + + start_u = to_unix_seconds(start) + end_u = to_unix_seconds(end) + + tags = doc.get("tags") + tags_json = None + try: + if tags is not None: + tags_json = json.dumps(tags, default=str) + except Exception: + tags_json = None + + if drop_fields: + doc = dict(doc) + for k in drop_fields: + doc.pop(k, None) + + blob = pack_and_compress(doc) + + buf_kv.append((out_db_name, runs_coll_name, doc_id, blob)) + buf_idx.append( + ( + out_db_name, + doc_id, + number_i, + str(name) if name is not None else None, + start_u, + end_u, + tags_json, + ) + ) + n += 1 + + if len(buf_kv) >= batch_size: + sql.executemany(ins_kv, buf_kv) + sql.executemany(ins_idx, buf_idx) + sql.commit() + buf_kv.clear() + buf_idx.clear() + + if buf_kv: + sql.executemany(ins_kv, buf_kv) + sql.executemany(ins_idx, buf_idx) + sql.commit() + + dt = now_s() - t0 + logger.info(f"[mongo] done runs {mongo_db.name}.{runs_coll_name}: {n} docs in {dt:.1f}s") + return n + + +def dump_gridfs_db( + mongo_db: pymongo.database.Database, + sql: sqlite3.Connection, + out_root: Path, + logger: logging.Logger, + batch_size: int, + only_configs: Optional[List[str]] = None, +) -> int: + import json as _json + + files_coll = mongo_db["fs.files"] + chunks_coll = mongo_db["fs.chunks"] + + out_dir = out_root / "gridfs" / mongo_db.name / "blobs" + ensure_dir(out_dir) + + query = {} + if only_configs: + query = {"config_name": {"$in": only_configs}} + + logger.info(f"[gridfs] dumping GridFS from DB '{mongo_db.name}' to {out_dir}") + t0 = now_s() + + cursor = files_coll.find(query, no_cursor_timeout=True).sort("uploadDate", 1) + + n = 0 + buf: List[Tuple] = [] + + ins = """ + INSERT OR REPLACE INTO gridfs_files( + db_name, file_id, filename, config_name, length, chunkSize, uploadDate, md5, + metadata_json, logical_name, blob_path + ) + VALUES (?,?,?,?,?,?,?,?,?,?,?) 
+ """ + + for fdoc in cursor: + file_id = fdoc["_id"] + file_id_s = oid_to_str(file_id) + + filename = fdoc.get("filename") + config_name = fdoc.get("config_name") or fdoc.get("name") or fdoc.get("config") + + length = int(fdoc.get("length", 0)) + chunk_size = int(fdoc.get("chunkSize", 255 * 1024)) + upload_u = to_unix_seconds(fdoc.get("uploadDate")) + md5 = fdoc.get("md5") + + meta = fdoc.get("metadata") + metadata_json = _json.dumps(meta, default=str) if meta is not None else None + + logical_name = ( + config_name + or filename + or (meta.get("filename") if isinstance(meta, dict) else None) + or (meta.get("name") if isinstance(meta, dict) else None) + or None + ) + + display = (logical_name or "NO_NAME").replace("/", "_") + blob_name = f"{file_id_s}__{display}" + blob_rel = str(Path("gridfs") / mongo_db.name / "blobs" / blob_name) + blob_abs = out_root / blob_rel + + if not blob_abs.exists() or blob_abs.stat().st_size != length: + tmp_path = blob_abs.with_suffix(blob_abs.suffix + ".tmp") + ensure_dir(tmp_path.parent) + + with tmp_path.open("wb") as out_f: + expected_n = 0 + ch_cur = chunks_coll.find({"files_id": file_id}, no_cursor_timeout=True).sort( + "n", 1 + ) + wrote = 0 + for ch in ch_cur: + n_chunk = int(ch["n"]) + if n_chunk != expected_n: + raise RuntimeError( + f"[gridfs] Missing chunk for file_id={file_id_s}: " + f"expected n={expected_n}, got n={n_chunk}" + ) + out_f.write(bytes(ch["data"])) + wrote += len(ch["data"]) + expected_n += 1 + + if wrote > length: + out_f.flush() + out_f.seek(length) + out_f.truncate() + + tmp_path.replace(blob_abs) + + buf.append( + ( + mongo_db.name, + file_id_s, + filename, + config_name, + length, + chunk_size, + upload_u, + md5, + metadata_json, + logical_name, + blob_rel, + ) + ) + n += 1 + + if len(buf) >= batch_size: + sql.executemany(ins, buf) + sql.commit() + buf.clear() + + if buf: + sql.executemany(ins, buf) + sql.commit() + + dt = now_s() - t0 + logger.info(f"[gridfs] done '{mongo_db.name}': {n} files in {dt:.1f}s") + return n + + +# ------------------------- +# Dump logic (xedocs -> xedocs.sqlite tables) +# ------------------------- + + +def _xedocs_extract(doc: dict, label_cols: List[str]) -> Dict[str, Any]: + """Extract core xedocs fields + discovered label columns.""" + out: Dict[str, Any] = {} + + out["_id"] = oid_to_str(doc.get("_id")) + out["version"] = doc.get("version") + + created_date = doc.get("created_date") or doc.get("createdDate") + out["created_date_ns"] = to_utc_ns(created_date) + + # time handling + out["time_ns"] = None + out["time_left_ns"] = None + out["time_right_ns"] = None + + t = doc.get("time") + if t is not None: + if isinstance(t, dict) and ("left" in t or "right" in t): + out["time_left_ns"] = to_utc_ns(t.get("left")) + out["time_right_ns"] = to_utc_ns(t.get("right")) + else: + out["time_ns"] = to_utc_ns(t) + + # value columns + v = doc.get("value", None) + out["value_num"] = None + try: + if isinstance(v, (int, float)) and not isinstance(v, bool): + out["value_num"] = float(v) + except Exception: + pass + + try: + out["value_json"] = json.dumps(v, default=str) + except Exception: + out["value_json"] = None + + # discovered labels (TEXT) + for k in label_cols: + if k in ( + "_id", + "version", + "time", + "created_date", + "createdDate", + "value", + "comments", + "reviews", + ): + continue + val = doc.get(k, None) + if val is None: + out[k] = None + continue + # Keep labels reasonably queryable: store simple types as strings, + # otherwise JSON-encode. 
+ if isinstance(val, (str, int, float, bool)): + out[k] = str(val) if not isinstance(val, str) else val + else: + try: + out[k] = json.dumps(val, default=str) + except Exception: + out[k] = str(val) + + out["doc_bson_z"] = pack_and_compress(doc) + return out + + +def dump_xedocs_collection_to_tables( + mongo_db: pymongo.database.Database, + coll_name: str, + sql_x: sqlite3.Connection, + batch_size: int, + logger: logging.Logger, + sample_n: int = 1000, +) -> int: + """Dump xedocs. into xedocs.sqlite table with auto-discovered label columns.""" + coll = mongo_db[coll_name] + table = coll_name + + logger.info( + f"[mongo] dumping xedocs.{coll_name} -> xedocs.sqlite table '{table}' (auto-discover)" + ) + + # --------- + # 1) Discover label columns from a sample of docs + # --------- + skip_keys = { + "_id", + "time", + "value", + "created_date", + "createdDate", + "comments", + "reviews", + } + + label_cols_set = set() + try: + sample_cursor = coll.find( + {}, no_cursor_timeout=True, batch_size=min(batch_size, 500) + ).limit(sample_n) + for d in sample_cursor: + for k in d.keys(): + if k in skip_keys: + continue + # We keep 'version' as a core column, but allow it in schema generation + # (it will be ignored if duplicated) + label_cols_set.add(k) + except Exception as e: + logger.warning( + f"[mongo] xedocs label discovery failed for {coll_name}: {type(e).__name__}: {e}" + ) + + # Deterministic order + label_cols = sorted(label_cols_set) + + # --------- + # 2) Create table schema (core + discovered labels) + # --------- + sql_x.executescript(_schema_sql_xedocs_table(table, extra_label_cols=label_cols)) + sql_x.commit() + + # --------- + # 3) Dump all docs + # --------- + t0 = now_s() + + # Build INSERT dynamically + # Core columns (must match schema) + core_cols = [ + "_id", + "version", + "time_ns", + "time_left_ns", + "time_right_ns", + "created_date_ns", + "value_num", + "value_json", + ] + + # Only keep label columns that are not core columns and are valid SQL identifiers when quoted + # (we always quote, so any name is okay) + extra_cols = [ + c + for c in label_cols + if c + not in { + "_id", + "version", + "time_ns", + "time_left_ns", + "time_right_ns", + "created_date_ns", + "value_num", + "value_json", + "doc_bson_z", + } + ] + + all_cols = core_cols + extra_cols + ["doc_bson_z"] + + def q(name: str) -> str: + return '"' + name.replace('"', '""') + '"' + + placeholders = ",".join(["?"] * len(all_cols)) + ins = ( + f"INSERT OR REPLACE INTO " + f"{q(table)}({','.join(q(c) for c in all_cols)}) " + f"VALUES ({placeholders})" + ) + + cur = coll.find({}, no_cursor_timeout=True, batch_size=batch_size) + + n = 0 + buf: List[Tuple[Any, ...]] = [] + + for doc in cur: + extracted = _xedocs_extract(doc, label_cols=extra_cols) + row = tuple(extracted.get(c) for c in all_cols) + buf.append(row) + n += 1 + + if len(buf) >= batch_size: + sql_x.executemany(ins, buf) + sql_x.commit() + buf.clear() + + if buf: + sql_x.executemany(ins, buf) + sql_x.commit() + + dt = now_s() - t0 + logger.info(f"[mongo] done xedocs.{coll_name}: {n} docs in {dt:.1f}s") + return n + + +# ------------------------- +# Main +# ------------------------- + + +def setup_logger(verbosity: int) -> logging.Logger: + lvl = logging.INFO if verbosity == 0 else (logging.DEBUG if verbosity >= 1 else logging.INFO) + logger = logging.getLogger("dump_mongo_offline") + logger.setLevel(lvl) + handler = logging.StreamHandler(sys.stdout) + handler.setLevel(lvl) + fmt = logging.Formatter("%(asctime)s | %(levelname)s | %(message)s") + 
handler.setFormatter(fmt) + logger.handlers.clear() + logger.addHandler(handler) + logger.propagate = False + return logger + + +def main() -> None: + ap = argparse.ArgumentParser() + ap.add_argument("--out", required=True, help="Output directory for offline cache") + ap.add_argument( + "--experiment", default="xent", choices=["xent", "xe1t"], help="utilix experiment" + ) + ap.add_argument( + "--mongo-uri", default=None, help="Override Mongo URI (otherwise uses utilix uconfig)" + ) + ap.add_argument( + "--spec", + required=True, + help="Spec file with lines like 'xenonnt:runs', 'xedocs:ALL', 'files:GRIDFS'", + ) + ap.add_argument( + "--sqlite-name", + default="rundb.sqlite", + help="SQLite filename under --out for runs/gridfs/kv", + ) + ap.add_argument( + "--xedocs-sqlite-name", + default="xedocs.sqlite", + help="SQLite filename under --out for xedocs tables", + ) + ap.add_argument( + "--batch-size", + type=int, + default=2000, + help="Batch size for Mongo cursor and SQLite inserts", + ) + ap.add_argument( + "-v", "--verbose", action="count", default=0, help="Increase logging verbosity (-v/-vv)" + ) + + ap.add_argument( + "--runs-drop-field", + action="append", + default=[], + help="Drop a field from xenonnt:runs docs before storing (repeatable).", + ) + + ap.add_argument( + "--gridfs-only-filenames", + default=None, + help="Text file with one filename per line to dump from GridFS", + ) + args = ap.parse_args() + + logger = setup_logger(args.verbose) + + out_root = Path(args.out).resolve() + ensure_dir(out_root) + + spec_path = Path(args.spec).resolve() + spec_items = parse_spec_lines(spec_path.read_text().splitlines()) + + logger.info( + f"Connecting to Mongo (experiment={args.experiment}, uri_override={bool(args.mongo_uri)})" + ) + client = get_mongo_client(args.experiment, uri_override=args.mongo_uri) + + # rundb.sqlite + sqlite_path = out_root / args.sqlite_name + logger.info(f"Opening rundb SQLite at {sqlite_path}") + sql = sqlite3.connect(str(sqlite_path)) + sql.executescript(SCHEMA_SQL_RUNDB) + sql.commit() + + # xedocs.sqlite (only opened if needed) + xedocs_sqlite_path = out_root / args.xedocs_sqlite_name + sql_x: Optional[sqlite3.Connection] = None + + gridfs_only = None + if args.gridfs_only_filenames: + gridfs_only = [ + ln.strip() + for ln in Path(args.gridfs_only_filenames).read_text().splitlines() + if ln.strip() + ] + + manifest = { + "format": "offline-mongo-sqlite-v2", + "created_at_unix": int(time.time()), + "compression": COMP_ALGO, + "experiment": args.experiment, + "spec_file": str(spec_path), + "spec": [{"db": x.db, "what": x.what} for x in spec_items], + "sqlite_rundb": str(sqlite_path.name), + "sqlite_xedocs": str(xedocs_sqlite_path.name), + } + (out_root / "manifest.json").write_text(json.dumps(manifest, indent=2)) + logger.info(f"Wrote manifest.json (compression={COMP_ALGO})") + + def _get_sql_x() -> sqlite3.Connection: + nonlocal sql_x + if sql_x is None: + logger.info(f"Opening xedocs SQLite at {xedocs_sqlite_path}") + sql_x = sqlite3.connect(str(xedocs_sqlite_path)) + # some pragmas for speed + sql_x.execute("PRAGMA journal_mode = WAL;") + sql_x.execute("PRAGMA synchronous = NORMAL;") + sql_x.execute("PRAGMA temp_store = MEMORY;") + sql_x.commit() + return sql_x + + for item in spec_items: + dbname = item.db + what = item.what + mongo_db = client[dbname] + + if what.upper() == "GRIDFS": + dump_gridfs_db( + mongo_db=mongo_db, + sql=sql, + out_root=out_root, + logger=logger, + batch_size=max(200, args.batch_size // 5), + only_configs=gridfs_only, + ) + continue 
+ + if what.upper() == "ALL": + names = list_collection_names_safe(mongo_db) + logger.info(f"[mongo] {dbname}:ALL expanded to {len(names)} collections") + + for cname in names: + if cname in ("fs.files", "fs.chunks"): + logger.info(f"[mongo] skipping {dbname}.{cname} (use {dbname}:GRIDFS instead)") + continue + + if dbname == "xedocs": + dump_xedocs_collection_to_tables( + mongo_db=mongo_db, + coll_name=cname, + sql_x=_get_sql_x(), + batch_size=args.batch_size, + logger=logger, + sample_n=1000, + ) + else: + dump_generic_collection( + mongo_db=mongo_db, + coll_name=cname, + sql=sql, + out_db_name=dbname, + batch_size=args.batch_size, + logger=logger, + ) + continue + + # Single collection + cname = what + + if dbname == "xedocs": + dump_xedocs_collection_to_tables( + mongo_db=mongo_db, + coll_name=cname, + sql_x=_get_sql_x(), + batch_size=args.batch_size, + logger=logger, + sample_n=1000, + ) + continue + + if dbname == "xenonnt" and cname == "runs": + dump_xenonnt_runs_index( + mongo_db=mongo_db, + runs_coll_name=cname, + sql=sql, + out_db_name=dbname, + batch_size=args.batch_size, + logger=logger, + drop_fields=args.runs_drop_field, + ) + else: + dump_generic_collection( + mongo_db=mongo_db, + coll_name=cname, + sql=sql, + out_db_name=dbname, + batch_size=args.batch_size, + logger=logger, + ) + + logger.info("ANALYZE (optional)...") + try: + sql.execute("ANALYZE;") + sql.commit() + except Exception: + logger.warning("ANALYZE failed for rundb.sqlite (continuing)") + + if sql_x is not None: + try: + sql_x.execute("ANALYZE;") + sql_x.commit() + except Exception: + logger.warning("ANALYZE failed for xedocs.sqlite (continuing)") + + logger.info("All done.") + logger.info(f"Offline cache written to: {out_root}") + logger.info(f"rundb.sqlite : {sqlite_path}") + if sql_x is not None: + logger.info(f"xedocs.sqlite: {xedocs_sqlite_path}") + + +if __name__ == "__main__": + main() diff --git a/utilix/rundb.py b/utilix/rundb.py index 6887653..7f357f0 100644 --- a/utilix/rundb.py +++ b/utilix/rundb.py @@ -10,7 +10,7 @@ import time from . import uconfig, logger, io - +from .sqlite_backend import OfflineSQLiteCollection, SQLiteConfig, _load_sqlite_config # Config the logger: if uconfig is not None: # type: ignore @@ -577,8 +577,28 @@ def _collection(experiment, collection, url=None, user=None, password=None, data return db[collection] +def _sqlite_collection( + experiment: str, sqlite_config: SQLiteConfig, collection: str = "runs", **kwargs +): + database = kwargs.pop("database", None) + if database is None: + database = uconfig.get("RunDB", f"{experiment}_database") + + assert sqlite_config.rundb_sqlite_path is not None # for mypy + return OfflineSQLiteCollection( + sqlite_path=sqlite_config.rundb_sqlite_path, + db_name=database, + coll_name=collection, + compression=sqlite_config.compression, + ) + + def xent_collection(collection="runs", **kwargs): - return _collection("xent", collection, **kwargs) + sqlite_config = _load_sqlite_config() + if sqlite_config.sqlite_active(): + return _sqlite_collection("xent", sqlite_config, collection, **kwargs) + else: + return _collection("xent", collection, **kwargs) def xent_collection_admin(collection="runs", **kwargs): diff --git a/utilix/sqlite_backend.py b/utilix/sqlite_backend.py new file mode 100644 index 0000000..2524644 --- /dev/null +++ b/utilix/sqlite_backend.py @@ -0,0 +1,559 @@ +from __future__ import annotations + +"""SQLite offline backend for utilix. + +This module provides offline access to XENON RunDB and GridFS data using local +SQLite databases. 
It allows analysis to continue when MongoDB is unreachable. + +Usage: + 1. Generate SQLite files using mongo_to_sqlite.py (requires MongoDB access) + 2. Set environment variables: + export RUNDB_SQLITE_PATH="/path/to/rundb.sqlite" + export XEDOCS_SQLITE_PATH="/path/to/xedocs.sqlite" + 3. Use utilix normally - offline mode activates automatically + +Example: + >>> import os + >>> os.environ["RUNDB_SQLITE_PATH"] = "/data/rundb.sqlite" + >>> os.environ["XEDOCS_SQLITE_PATH"] = "/data/xedocs.sqlite" + >>> + >>> from utilix import xent_collection + >>> runs = xent_collection("runs") # Uses SQLite if files exist + >>> doc = runs.find_one({"number": 12345}) + +Environment Variables: + RUNDB_SQLITE_PATH: Path to RunDB SQLite file (required) + XEDOCS_SQLITE_PATH: Path to xedocs SQLite file (required) + OFFLINE_COMP: Compression algorithm, 'zstd' or 'zlib' (default: 'zstd') + OFFLINE_DEBUG: Enable debug logging, '1' or '0' (default: '0') + +Classes: + SQLiteConfig: Configuration dataclass for offline mode + OfflineGridFS: GridFS-compatible offline file access + OfflineSQLiteCollection: pymongo-compatible offline collection access + +Functions: + _load_sqlite_config: Load configuration from environment variables +""" + +import os +import sqlite3 +import shutil +from dataclasses import dataclass +from pathlib import Path +from typing import Optional, Tuple +import traceback +import logging +import pymongo +from bson import BSON + +OFFLINE_DEBUG = os.environ.get("OFFLINE_DEBUG", "0") not in ("0", "", "false", "False") + + +def _env_bool(name: str, default: str = "0") -> bool: + v = os.environ.get(name, default) + return v not in ("0", "", "false", "False", "no", "No", "NO") + + +def _dbg(msg): + if OFFLINE_DEBUG: + logging.debug(f"[offline-debug] {msg}") + + +def _dbg_stack(tag, n=6): + if OFFLINE_DEBUG: + logging.debug(f"[offline-debug] --- stack ({tag}) ---") + logging.debug("".join(traceback.format_stack(limit=n))) + logging.debug(f"[offline-debug] --- end stack ({tag}) ---") + + +def block(msg: str, cfg: SQLiteConfig) -> None: + if cfg.hard: + raise RuntimeError(f"[offline-hard] blocked: {msg}") + _dbg(f"WARNING: {msg}") + _dbg_stack("blocked") + + +@dataclass(frozen=True) +class SQLiteConfig: + rundb_sqlite_path: Optional[Path] + xedocs_sqlite_path: Optional[Path] + offline_root: Optional[Path] + compression: str + debug: bool + hard: bool + stack: bool + spy: bool + + def rundb_active(self) -> bool: + return self.rundb_sqlite_path is not None and self.rundb_sqlite_path.exists() + + def xedocs_active(self) -> bool: + return self.xedocs_sqlite_path is not None and self.xedocs_sqlite_path.exists() + + def sqlite_active(self) -> bool: + return self.rundb_active() and self.xedocs_active() + + +def _load_sqlite_config() -> SQLiteConfig: + sqp = os.environ.get("RUNDB_SQLITE_PATH", "").strip() + rundb_sqlite_path = Path(sqp).expanduser().resolve() if sqp else None + + xsp = os.environ.get("XEDOCS_SQLITE_PATH", "").strip() + xedocs_sqlite_path = Path(xsp).expanduser().resolve() if xsp else None + + offline_root = ( + rundb_sqlite_path.parent if (rundb_sqlite_path and rundb_sqlite_path.exists()) else None + ) + + debug = _env_bool("OFFLINE_DEBUG") + hard = _env_bool("OFFLINE_HARD") + stack = _env_bool("OFFLINE_STACK") + spy = _env_bool("PYMONGO_SPY") + + return SQLiteConfig( + rundb_sqlite_path=rundb_sqlite_path, + xedocs_sqlite_path=xedocs_sqlite_path, + offline_root=offline_root, + compression="zstd", + debug=debug, + hard=hard, + stack=stack, + spy=spy, + ) + + +@dataclass(frozen=True) +class GridFSRow: 
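+    # One decoded row of the gridfs_files index table written by mongo_to_sqlite.py;
+    # blob_path is relative to the offline root and uploadDate is in unix seconds.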
+ db_name: str + file_id: str + config_name: str + md5: str + length: int + uploadDate: int + blob_path: str + + +class OfflineGridFS: + """Minimal offline replacement for utilix.mongo_storage.MongoDownloader / APIDownloader + behavior: + + - query SQLite table gridfs_files by config_name + - pick the latest by uploadDate + - stage/copy the blob into a local cache folder named by md5 + - return the staged path + + """ + + def __init__( + self, + sqlite_path: str | Path, + offline_root: str | Path, + cache_dirs: Tuple[str | Path, ...] = ("./resource_cache", "/tmp/straxen_resource_cache"), + gridfs_db_name: str = "files", + ): + self.sqlite_path = Path(sqlite_path).resolve() + self.offline_root = Path(offline_root).resolve() + self.cache_dirs = tuple(Path(p) for p in cache_dirs) + self.gridfs_db_name = gridfs_db_name + + self.conn = sqlite3.connect(str(self.sqlite_path)) + self.conn.row_factory = sqlite3.Row + + # ----------------- + # cache dir helpers + # ----------------- + def _pick_cache_dir(self) -> Path: + for d in self.cache_dirs: + try: + d.mkdir(parents=True, exist_ok=True) + except Exception: + continue + if os.access(d, os.W_OK): + return d + raise PermissionError(f"Cannot write to any cache dir: {self.cache_dirs}") + + # ----------------- + # sqlite queries + # ----------------- + def latest_by_config_name(self, config_name: str) -> Optional[GridFSRow]: + row = self.conn.execute( + "SELECT db_name, file_id, config_name, md5, length, uploadDate, blob_path " # noqa: E501 + "FROM gridfs_files WHERE db_name = ? AND config_name = ? " + "ORDER BY uploadDate DESC LIMIT 1", + (self.gridfs_db_name, config_name), + ).fetchone() + + if row is None: + return None + + # Some older entries might have NULL md5; that's not usable for caching-by-md5. + md5 = row["md5"] + if md5 is None: + raise RuntimeError( + f"Found GridFS entry for {config_name} but md5 is NULL in sqlite index" + ) + + return GridFSRow( + db_name=row["db_name"], + file_id=row["file_id"], + config_name=row["config_name"], + md5=str(md5), + length=int(row["length"] or 0), + uploadDate=int(row["uploadDate"] or 0), + blob_path=str(row["blob_path"]), + ) + + # ----------------- + # public API + # ----------------- + def download_single( + self, + config_name: str, + human_readable_file_name: bool = False, + write_to: Optional[str | Path] = None, + ) -> str: + """Return absolute path to a staged file. + + Default behavior matches utilix: store under md5 in a cache dir. + + """ + _dbg(f"OfflineGridFS.download_single('{config_name}') [SQLITE]") + + entry = self.latest_by_config_name(config_name) + if entry is None: + raise KeyError(f"Config '{config_name}' not found in offline gridfs_files index") + + blob_abs = (self.offline_root / entry.blob_path).resolve() + if not blob_abs.exists(): + raise FileNotFoundError(f"Blob missing on disk: {blob_abs} (from sqlite blob_path)") + + target_dir = Path(write_to).resolve() if write_to else self._pick_cache_dir() + target_dir.mkdir(parents=True, exist_ok=True) + + target_name = config_name if human_readable_file_name else entry.md5 + target_abs = (target_dir / target_name).resolve() + + # If already staged, trust it (fast path) + if target_abs.exists(): + return str(target_abs) + + # Copy in a safe-ish way (atomic replace) + tmp = target_abs.with_suffix(target_abs.suffix + ".tmp") + shutil.copyfile(blob_abs, tmp) + tmp.replace(target_abs) + + return str(target_abs) + + def list_files(self) -> list[str]: + rows = self.conn.execute( + "SELECT DISTINCT config_name FROM gridfs_files WHERE db_name=? 
ORDER BY config_name", + (self.gridfs_db_name,), + ).fetchall() + return [r["config_name"] for r in rows if r["config_name"] is not None] + + def close(self) -> None: + self.conn.close() + + +def smoke_test( + sqlite_path: str | Path, + offline_root: str | Path, + config_name: str, +) -> None: + g = OfflineGridFS(sqlite_path=sqlite_path, offline_root=offline_root) + p = g.download_single(config_name) + print("[OK] staged:", p) + g.close() + + +# ---- OFFLINE RUNDB COLLECTION (SQLite-backed) ---- + + +def _decompressor(algo: str): + if algo == "zstd": + import zstandard as zstd # type: ignore + + dctx = zstd.ZstdDecompressor() + return dctx.decompress + elif algo == "zlib": + import zlib + + return zlib.decompress + else: + raise ValueError(f"Unknown compression algo: {algo}") + + +class OfflineMongoClient: + """Dummy client to satisfy: collection.database.client.""" + + def close(self): + return + + +@dataclass +class OfflineMongoDatabase: + name: str + client: OfflineMongoClient + + +class OfflineSQLiteCollection: + """Minimal pymongo.collection.Collection-like wrapper backed by our sqlite cache. + + Provides the attribute chain expected by straxen.storage.rundb.RunDB: + collection.database.client + And a few commonly-used methods: find_one, find, count_documents. + + """ + + def __init__( + self, + sqlite_path: str | Path, + db_name: str, + coll_name: str, + compression: str = "zstd", + ): + self.sqlite_path = Path(sqlite_path).resolve() + self.db_name = str(db_name) + self.name = str(coll_name) # pymongo Collection has .name + self._coll_name = str(coll_name) + + self._conn = sqlite3.connect(str(self.sqlite_path)) + self._conn.row_factory = sqlite3.Row + self._decompress = _decompressor(compression) + + # mimic pymongo: collection.database.client + self.database = OfflineMongoDatabase(name=self.db_name, client=OfflineMongoClient()) + + def close(self): + try: + self._conn.close() + except Exception: + pass + + # --- internal helpers --- + + def _decode_row(self, row) -> dict: + raw = self._decompress(row["doc_bson_z"]) + return BSON(raw).decode() + + def _get_by_id(self, doc_id: str) -> dict: + row = self._conn.execute( + "SELECT doc_bson_z FROM kv_collections WHERE db_name=? AND coll_name=? AND doc_id=?", + (self.db_name, self._coll_name, str(doc_id)), + ).fetchone() + if row is None: + raise KeyError(f"Not found: {self.db_name}.{self._coll_name} _id={doc_id}") + return self._decode_row(row) + + # --- pymongo-ish public API --- + + def find_one(self, filter: dict | None = None, *args, **kwargs): + """ + Minimal behavior: + - if filter contains _id, return that doc + - if filter contains 'number' (for runs collection), look it up + - else return first doc (used as connectivity test) + """ + filter = filter or {} + + # _id special case + if "_id" in filter: + try: + return self._get_by_id(str(filter["_id"])) + except KeyError: + return None + + # Special case for runs collection with number filter + if self._coll_name == "runs" and "number" in filter: + number = int(filter["number"]) + row = self._conn.execute( + "SELECT doc_id FROM runs_index WHERE db_name=? AND number=? LIMIT 1", + (self.db_name, number), + ).fetchone() + if row is None: + return None + return self._get_by_id(row["doc_id"]) + + # Default: return first doc (connectivity test) + row = self._conn.execute( + "SELECT doc_bson_z FROM kv_collections WHERE db_name=? AND coll_name=? 
LIMIT 1", + (self.db_name, self._coll_name), + ).fetchone() + + if row is None: + return None + return self._decode_row(row) + + def find(self, filter: dict | None = None, *args, **kwargs): + filter = filter or {} + + # Special-case _id + if "_id" in filter: + try: + doc = self._get_by_id(str(filter["_id"])) + return _OfflineCursor([doc]) # small list OK + except KeyError: + return _OfflineCursor([]) + + # Special-case xenonnt.runs by number + if self._coll_name == "runs" and "number" in filter: + number = int(filter["number"]) + row = self._conn.execute( + "SELECT doc_id FROM runs_index WHERE db_name=? AND number=? LIMIT 1", + (self.db_name, number), + ).fetchone() + if row is None: + return _OfflineCursor([]) + doc = self._get_by_id(row["doc_id"]) + return _OfflineCursor([doc]) + + # Default: streaming cursor over all docs + return _OfflineStreamingCursor(self.iter_all()) + + def count_documents(self, filter: dict | None = None, *args, **kwargs) -> int: + filter = filter or {} + + if "_id" in filter: + row = self._conn.execute( + "SELECT COUNT(*) AS n FROM kv_collections " + "WHERE db_name=? AND coll_name=? AND doc_id=?", + (self.db_name, self._coll_name, str(filter["_id"])), + ).fetchone() + return int(row["n"]) if row else 0 + + if self._coll_name == "runs" and "number" in filter: + number = int(filter["number"]) + row = self._conn.execute( + "SELECT COUNT(*) AS n FROM runs_index WHERE db_name=? AND number=?", + (self.db_name, number), + ).fetchone() + return int(row["n"]) if row else 0 + + row = self._conn.execute( + "SELECT COUNT(*) AS n FROM kv_collections WHERE db_name=? AND coll_name=?", + (self.db_name, self._coll_name), + ).fetchone() + return int(row["n"]) if row else 0 + + def iter_all(self): + cur = self._conn.execute( + "SELECT doc_bson_z FROM kv_collections WHERE db_name=? AND coll_name=?", + (self.db_name, self._coll_name), + ) + for row in cur: + yield self._decode_row(row) + + def as_list(self, limit: int | None = None): + out = [] + for i, d in enumerate(self.iter_all()): + out.append(d) + if limit is not None and i + 1 >= limit: + break + return out + + +class _OfflineCursor: + """Small in-memory cursor (safe only for tiny result sets).""" + + def __init__(self, docs): + self._docs = list(docs) + + def sort(self, key, direction=1): + rev = direction == -1 + self._docs.sort(key=lambda d: d.get(key), reverse=rev) + return self + + def skip(self, n): + self._docs = self._docs[int(n) :] # noqa: E203 + return self + + def limit(self, n): + self._docs = self._docs[: int(n)] + return self + + def __iter__(self): + return iter(self._docs) + + +class _OfflineStreamingCursor: + """Streaming cursor: does NOT materialize docs.""" + + def __init__(self, iterator): + self._it = iterator + self._skip = 0 + self._limit = None + self._sort_key = None + self._sort_dir = 1 + + def sort(self, key, direction=1): + # WARNING: true sort requires materialization. + # Keep it conservative: only allow sort if limit is set (small-ish), + # otherwise do nothing or raise. 
+ self._sort_key = key + self._sort_dir = direction + return self + + def skip(self, n): + self._skip = int(n) + return self + + def limit(self, n): + self._limit = int(n) + return self + + def __iter__(self): + it = self._it + + # apply skip + for _ in range(self._skip): + try: + next(it) + except StopIteration: + return iter(()) + + # If no sort requested, stream directly + if self._sort_key is None: + if self._limit is None: + return it + else: + # stream with limit + def gen(): + for i, d in enumerate(it): + if i >= self._limit: + break + yield d + + return gen() + + # If sort requested, we must materialize. + # We materialize only up to limit if provided, else this is dangerous. + if self._limit is None: + raise RuntimeError( + "Offline streaming cursor cannot sort without limit (would load everything)." + ) + + docs = [] + for i, d in enumerate(it): + if i >= self._limit: + break + docs.append(d) + + rev = self._sort_dir == -1 + docs.sort(key=lambda d: d.get(self._sort_key), reverse=rev) + return iter(docs) + + +# Add pymongo spy +_orig_mc = pymongo.MongoClient + + +class MongoClientSpy(_orig_mc): # type: ignore[misc,valid-type] + def __init__(self, *args, **kwargs): + cfg = _load_sqlite_config() + if cfg.spy: + block(f"pymongo.MongoClient CREATED args={args} kwargs_keys={list(kwargs.keys())}", cfg) + super().__init__(*args, **kwargs) + + +pymongo.MongoClient = MongoClientSpy
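For completeness, a short end-to-end sketch of exercising the offline classes above against a freshly generated dump. It assumes a cache directory produced by `python -m utilix.mongo_to_sqlite --out /data/offline --spec dump_spec.txt`; the paths, the run number, and the `my_config` name are placeholders.

```python
from utilix.sqlite_backend import OfflineSQLiteCollection, smoke_test

# Stage one GridFS-backed config file from the local index into a cache dir
# ("my_config" must appear in the dumped gridfs_files index).
smoke_test(
    sqlite_path="/data/offline/rundb.sqlite",
    offline_root="/data/offline",
    config_name="my_config",
)

# Query the runs collection straight from the SQLite dump.
runs = OfflineSQLiteCollection(
    sqlite_path="/data/offline/rundb.sqlite",
    db_name="xenonnt",
    coll_name="runs",
    compression="zstd",  # use "zlib" if the dump was made without zstandard installed
)
print(runs.count_documents({}))
print(runs.find_one({"number": 12345}))  # resolved via the runs_index table
runs.close()
```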