From 87302a714561e674341b9e303b983e651f976e6d Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Fri, 20 Feb 2026 16:51:21 +0000 Subject: [PATCH 1/2] First pass at txt extraction support --- retriever/src/retriever/__main__.py | 2 + .../src/retriever/examples/batch_pipeline.py | 54 +++-- .../retriever/examples/inprocess_pipeline.py | 61 ++++-- retriever/src/retriever/ingest-config.yaml | 7 + retriever/src/retriever/ingest_modes/batch.py | 37 ++++ .../src/retriever/ingest_modes/inprocess.py | 101 ++++----- retriever/src/retriever/pdf/split.py | 32 +++ retriever/src/retriever/txt/__init__.py | 21 ++ retriever/src/retriever/txt/__main__.py | 101 +++++++++ retriever/src/retriever/txt/ray_data.py | 44 ++++ retriever/src/retriever/txt/split.py | 204 ++++++++++++++++++ retriever/tests/__init__.py | 1 + retriever/tests/test_txt_split.py | 81 +++++++ 13 files changed, 645 insertions(+), 101 deletions(-) create mode 100644 retriever/src/retriever/txt/__init__.py create mode 100644 retriever/src/retriever/txt/__main__.py create mode 100644 retriever/src/retriever/txt/ray_data.py create mode 100644 retriever/src/retriever/txt/split.py create mode 100644 retriever/tests/__init__.py create mode 100644 retriever/tests/test_txt_split.py diff --git a/retriever/src/retriever/__main__.py b/retriever/src/retriever/__main__.py index cf3abd9d2..a8116b7d0 100644 --- a/retriever/src/retriever/__main__.py +++ b/retriever/src/retriever/__main__.py @@ -10,6 +10,7 @@ from .compare import app as compare_app from .vector_store import app as vector_store_app from .recall import app as recall_app +from .txt import __main__ as txt_main app = typer.Typer(help="Retriever") app.add_typer(image_app, name="image") @@ -19,6 +20,7 @@ app.add_typer(compare_app, name="compare") app.add_typer(vector_store_app, name="vector-store") app.add_typer(recall_app, name="recall") +app.add_typer(txt_main.app, name="txt") def main(): diff --git a/retriever/src/retriever/examples/batch_pipeline.py b/retriever/src/retriever/examples/batch_pipeline.py index 89666cc34..686b3dde9 100644 --- a/retriever/src/retriever/examples/batch_pipeline.py +++ b/retriever/src/retriever/examples/batch_pipeline.py @@ -57,10 +57,15 @@ def _hit_key_and_distance(hit: dict) -> tuple[str | None, float | None]: def main( input_dir: Path = typer.Argument( ..., - help="Directory containing PDFs to ingest.", + help="Directory containing PDFs or .txt files to ingest.", path_type=Path, exists=True, ), + input_type: str = typer.Option( + "pdf", + "--input-type", + help="Input format: 'pdf' or 'txt'. Use 'txt' for a directory of .txt files (tokenizer-based chunking).", + ), ray_address: Optional[str] = typer.Option( None, "--ray-address", @@ -92,20 +97,29 @@ def main( # else: use ray_address as-is (None → in-process, or URL to existing cluster) input_dir = Path(input_dir) - pdf_glob = str(input_dir / "*.pdf") - - ingestor = create_ingestor(run_mode="batch", ray_address=ray_address) - ingestor = ( - ingestor.files(pdf_glob) - .extract( - extract_text=True, - extract_tables=True, - extract_charts=True, - extract_infographics=False, + if input_type == "txt": + glob_pattern = str(input_dir / "*.txt") + ingestor = create_ingestor(run_mode="batch", ray_address=ray_address) + ingestor = ( + ingestor.files(glob_pattern) + .extract_txt(max_tokens=512, overlap_tokens=0) + .embed(model_name="nemo_retriever_v1") + .vdb_upload(lancedb_uri=LANCEDB_URI, table_name=LANCEDB_TABLE, overwrite=True, create_index=True) + ) + else: + glob_pattern = str(input_dir / "*.pdf") + ingestor = create_ingestor(run_mode="batch", ray_address=ray_address) + ingestor = ( + ingestor.files(glob_pattern) + .extract( + extract_text=True, + extract_tables=True, + extract_charts=True, + extract_infographics=False, + ) + .embed(model_name="nemo_retriever_v1") + .vdb_upload(lancedb_uri=LANCEDB_URI, table_name=LANCEDB_TABLE, overwrite=True, create_index=True) ) - .embed(model_name="nemo_retriever_v1") - .vdb_upload(lancedb_uri=LANCEDB_URI, table_name=LANCEDB_TABLE, overwrite=True, create_index=True) - ) print("Running extraction...") ingestor.ingest() @@ -158,8 +172,9 @@ def main( hit = _is_hit_at_k(g, top_keys, cfg.top_k) if not no_recall_details: + ext = ".txt" if input_type == "txt" else ".pdf" print(f"\nQuery {i}: {q}") - print(f" Gold: {g} (file: {doc}.pdf, page: {page})") + print(f" Gold: {g} (file: {doc}{ext}, page: {page})") print(f" Hit@{cfg.top_k}: {hit}") print(" Top hits:") if not scored_hits: @@ -172,15 +187,16 @@ def main( print(f" {rank:02d}. {key} distance={dist:.6f}") if not hit: - missed_gold.append((f"{doc}.pdf", str(page))) + ext = ".txt" if input_type == "txt" else ".pdf" + missed_gold.append((f"{doc}{ext}", str(page))) missed_unique = sorted(set(missed_gold), key=lambda x: (x[0], x[1])) - print("\nMissed gold (unique pdf/page):") + print("\nMissed gold (unique doc/page):") if not missed_unique: print(" (none)") else: - for pdf, page in missed_unique: - print(f" {pdf} page {page}") + for doc_page, page in missed_unique: + print(f" {doc_page} page {page}") print(f"\nTotal missed: {len(missed_unique)} / {len(_gold)}") print("\nRecall metrics (matching retriever.recall.core):") diff --git a/retriever/src/retriever/examples/inprocess_pipeline.py b/retriever/src/retriever/examples/inprocess_pipeline.py index f06b41eba..e47cba2e3 100644 --- a/retriever/src/retriever/examples/inprocess_pipeline.py +++ b/retriever/src/retriever/examples/inprocess_pipeline.py @@ -54,10 +54,15 @@ def _hit_key_and_distance(hit: dict) -> tuple[str | None, float | None]: def main( input_dir: Path = typer.Argument( ..., - help="Directory containing PDFs to ingest.", + help="Directory containing PDFs or .txt files to ingest.", path_type=Path, exists=True, ), + input_type: str = typer.Option( + "pdf", + "--input-type", + help="Input format: 'pdf' or 'txt'. Use 'txt' for a directory of .txt files (tokenizer-based chunking).", + ), query_csv: Path = typer.Option( "bo767_query_gt.csv", "--query-csv", @@ -70,24 +75,36 @@ def main( help="Do not print per-query retrieval details (query, gold, hits). Only the missed-gold summary and recall metrics are printed.", ), ) -> None: - os.environ.setdefault("NEMOTRON_OCR_MODEL_DIR", str(Path.cwd() / "nemotron-ocr-v1")) + if input_type == "txt": + pass # No NEMOTRON_OCR_MODEL_DIR needed for .txt + else: + os.environ.setdefault("NEMOTRON_OCR_MODEL_DIR", str(Path.cwd() / "nemotron-ocr-v1")) input_dir = Path(input_dir) - pdf_glob = str(input_dir / "*.pdf") - - ingestor = create_ingestor(run_mode="inprocess") - ingestor = ( - ingestor.files(pdf_glob) - .extract( - method="pdfium", - extract_text=True, - extract_tables=True, - extract_charts=True, - extract_infographics=False, + if input_type == "txt": + glob_pattern = str(input_dir / "*.txt") + ingestor = create_ingestor(run_mode="inprocess") + ingestor = ( + ingestor.files(glob_pattern) + .extract_txt(max_tokens=512, overlap_tokens=0) + .embed(model_name="nemo_retriever_v1") + .vdb_upload(lancedb_uri=LANCEDB_URI, table_name=LANCEDB_TABLE, overwrite=False, create_index=True) + ) + else: + glob_pattern = str(input_dir / "*.pdf") + ingestor = create_ingestor(run_mode="inprocess") + ingestor = ( + ingestor.files(glob_pattern) + .extract( + method="pdfium", + extract_text=True, + extract_tables=True, + extract_charts=True, + extract_infographics=False, + ) + .embed(model_name="nemo_retriever_v1") + .vdb_upload(lancedb_uri=LANCEDB_URI, table_name=LANCEDB_TABLE, overwrite=False, create_index=True) ) - .embed(model_name="nemo_retriever_v1") - .vdb_upload(lancedb_uri=LANCEDB_URI, table_name=LANCEDB_TABLE, overwrite=False, create_index=True) - ) print("Running extraction...") ingestor.ingest(show_progress=True) @@ -138,8 +155,9 @@ def main( hit = _is_hit_at_k(g, top_keys, cfg.top_k) if not no_recall_details: + ext = ".txt" if input_type == "txt" else ".pdf" print(f"\nQuery {i}: {q}") - print(f" Gold: {g} (file: {doc}.pdf, page: {page})") + print(f" Gold: {g} (file: {doc}{ext}, page: {page})") print(f" Hit@{cfg.top_k}: {hit}") print(" Top hits:") if not scored_hits: @@ -152,15 +170,16 @@ def main( print(f" {rank:02d}. {key} distance={dist:.6f}") if not hit: - missed_gold.append((f"{doc}.pdf", str(page))) + ext = ".txt" if input_type == "txt" else ".pdf" + missed_gold.append((f"{doc}{ext}", str(page))) missed_unique = sorted(set(missed_gold), key=lambda x: (x[0], x[1])) - print("\nMissed gold (unique pdf/page):") + print("\nMissed gold (unique doc/page):") if not missed_unique: print(" (none)") else: - for pdf, page in missed_unique: - print(f" {pdf} page {page}") + for doc_page, page in missed_unique: + print(f" {doc_page} page {page}") print(f"\nTotal missed: {len(missed_unique)} / {len(_gold)}") print("\nRecall metrics (matching retriever.recall.core):") diff --git a/retriever/src/retriever/ingest-config.yaml b/retriever/src/retriever/ingest-config.yaml index d5bf0d0e1..91441ce2b 100644 --- a/retriever/src/retriever/ingest-config.yaml +++ b/retriever/src/retriever/ingest-config.yaml @@ -59,6 +59,13 @@ pdf: # Optionally limit number of PDFs processed limit: null +# Optional config for `retriever txt run` and .extract_txt() API +txt: + max_tokens: 512 + overlap_tokens: 0 + tokenizer_model_id: nvidia/llama-3.2-nv-embedqa-1b-v2 + encoding: utf-8 + table: # Example config for: # - `retriever table stage run --config --input ` diff --git a/retriever/src/retriever/ingest_modes/batch.py b/retriever/src/retriever/ingest_modes/batch.py index 72f8992a6..8a265da5f 100644 --- a/retriever/src/retriever/ingest_modes/batch.py +++ b/retriever/src/retriever/ingest_modes/batch.py @@ -250,6 +250,8 @@ def __init__(self, documents: Optional[List[str]] = None, ray_address: Optional[ self._rd_dataset: rd.Dataset = None # Ray Data dataset created from input documents. self._tasks: List[tuple[str, dict[str, Any]]] = [] self._intermediate_output_dir: Optional[str] = None + self._pipeline_type: str = "pdf" # "pdf" | "txt" + self._extract_txt_kwargs: Dict[str, Any] = {} def files(self, documents: Union[str, List[str]]) -> "BatchIngestor": """ @@ -373,6 +375,7 @@ def extract(self, **kwargs: Any) -> "BatchIngestor": # memory usage by ~30-40% vs 300 DPI. kwargs.setdefault("dpi", 200) + self._pipeline_type = "pdf" self._tasks.append(("extract", dict(kwargs))) # Stage-specific kwargs: upstream PDF stages accept many options (dpi, extract_*), @@ -444,6 +447,40 @@ def extract(self, **kwargs: Any) -> "BatchIngestor": return self + def extract_txt( + self, + max_tokens: int = 512, + overlap_tokens: int = 0, + encoding: str = "utf-8", + **kwargs: Any, + ) -> "BatchIngestor": + """ + Configure txt-only pipeline: read_binary_files -> TxtSplitActor (bytes -> chunk rows). + + Use with .files("*.txt").extract_txt(...).embed().vdb_upload().ingest(). + Do not call .extract() when using .extract_txt(). + """ + from retriever.txt.ray_data import TxtSplitActor + + self._pipeline_type = "txt" + self._extract_txt_kwargs = { + "max_tokens": max_tokens, + "overlap_tokens": overlap_tokens, + "encoding": encoding, + **kwargs, + } + self._tasks.append(("extract_txt", dict(self._extract_txt_kwargs))) + + self._rd_dataset = self._rd_dataset.map_batches( + TxtSplitActor, + batch_size=4, + batch_format="pandas", + num_cpus=1, + num_gpus=0, + fn_constructor_kwargs=dict(self._extract_txt_kwargs), + ) + return self + def embed(self, **kwargs: Any) -> "BatchIngestor": """ Add a text-embedding stage to the batch pipeline. diff --git a/retriever/src/retriever/ingest_modes/inprocess.py b/retriever/src/retriever/ingest_modes/inprocess.py index ea6b3866e..82cfef3fc 100644 --- a/retriever/src/retriever/ingest_modes/inprocess.py +++ b/retriever/src/retriever/ingest_modes/inprocess.py @@ -16,7 +16,7 @@ from io import BytesIO from collections.abc import Callable from pathlib import Path -from typing import Any, Dict, List, Optional, Sequence, Tuple, Union +from typing import Any, Dict, List, Literal, Optional, Sequence, Tuple, Union from nemotron_page_elements_v3.model import define_model @@ -27,14 +27,6 @@ from retriever.ocr.ocr import ocr_page_elements from retriever.text_embed.main_text_embed import TextEmbeddingConfig, create_text_embeddings_for_df -try: - import pypdfium2 as pdfium -except Exception as e: # pragma: no cover - pdfium = None # type: ignore[assignment] - _PDFIUM_IMPORT_ERROR = e -else: # pragma: no cover - _PDFIUM_IMPORT_ERROR = None - try: from tqdm.auto import tqdm except Exception: # pragma: no cover @@ -42,6 +34,8 @@ from ..ingest import Ingestor from ..pdf.extract import pdf_extraction +from ..pdf.split import pdf_path_to_pages_df +from ..txt import txt_file_to_chunks_df _CONTENT_COLUMNS = ("table", "chart", "infographic") @@ -602,6 +596,10 @@ def __init__(self, documents: Optional[List[str]] = None, **kwargs: Any) -> None # Builder-style configuration recorded for later execution (TBD). self._tasks: List[tuple[Callable[..., Any], dict[str, Any]]] = [] + # Pipeline type: "pdf" (extract) or "txt" (extract_txt). Loader dispatch in ingest(). + self._pipeline_type: Literal["pdf", "txt"] = "pdf" + self._extract_txt_kwargs: Dict[str, Any] = {} + def files(self, documents: Union[str, List[str]]) -> "InProcessIngestor": """ Add local files for in-process execution. @@ -658,6 +656,7 @@ def extract(self, **kwargs: Any) -> "InProcessIngestor": for k in ("extract_text", "extract_images", "extract_tables", "extract_charts", "extract_infographics") ): extract_kwargs["extract_page_as_image"] = True + self._pipeline_type = "pdf" self._tasks.append((pdf_extraction, extract_kwargs)) # Common, optional knobs shared by our detect_* helpers. @@ -714,12 +713,34 @@ def _detect_kwargs_with_model(model_obj: Any) -> dict[str, Any]: return self + def extract_txt( + self, + max_tokens: int = 512, + overlap_tokens: int = 0, + encoding: str = "utf-8", + **kwargs: Any, + ) -> "InProcessIngestor": + """ + Configure txt ingestion: tokenizer-based chunking only (no PDF extraction). + + Use with .files("*.txt").extract_txt(...).embed().vdb_upload().ingest(). + Do not call .extract() when using .extract_txt(). + """ + self._pipeline_type = "txt" + self._extract_txt_kwargs = { + "max_tokens": max_tokens, + "overlap_tokens": overlap_tokens, + "encoding": encoding, + **kwargs, + } + return self + def embed(self, **kwargs: Any) -> "InProcessIngestor": """ Configure embedding for in-process execution. This records an embedding task so call sites can chain `.embed(...)` - after `.extract(...)`. + after `.extract(...)` or `.extract_txt()`. When ``embedding_endpoint`` is provided (e.g. ``"http://embedding:8000/v1"``), a remote NIM endpoint is used for @@ -820,55 +841,6 @@ def ingest( **_: Any, ) -> list[Any]: - if pdfium is None: # pragma: no cover - raise ImportError("pypdfium2 is required for inprocess ingestion.") from _PDFIUM_IMPORT_ERROR - - def _pdf_to_pages_df(path: str) -> pd.DataFrame: - """ - Convert a multi-page PDF at `path` into a DataFrame where each row - contains a *single-page* PDF's raw bytes. - - Columns: - - bytes: single-page PDF bytes - - path: original input path - - page_number: 1-indexed page number - """ - abs_path = os.path.abspath(path) - out_rows: list[dict[str, Any]] = [] - doc = None - try: - doc = pdfium.PdfDocument(abs_path) - for page_idx in range(len(doc)): - single = pdfium.PdfDocument.new() - try: - single.import_pages(doc, pages=[page_idx]) - buf = BytesIO() - single.save(buf) - out_rows.append( - { - "bytes": buf.getvalue(), - "path": abs_path, - "page_number": page_idx + 1, - } - ) - finally: - try: - single.close() - except Exception: - pass - except BaseException as e: - # Preserve shape expected downstream (pdf_extraction emits error - # records per-row, so we return a single row to trigger that). - out_rows.append({"bytes": b"", "path": abs_path, "page_number": 0, "error": str(e)}) - finally: - try: - if doc is not None: - doc.close() - except Exception: - pass - - return pd.DataFrame(out_rows) - # Tasks that run once on combined results (after all docs). All others run per-doc. _post_tasks = (upload_embeddings_to_lancedb_inprocess, save_dataframe_to_disk_json) per_doc_tasks = [(f, k) for f, k in self._tasks if f not in _post_tasks] @@ -882,10 +854,17 @@ def _pdf_to_pages_df(path: str) -> pd.DataFrame: if show_progress and tqdm is not None: doc_iter = tqdm(docs, desc="Processing documents", unit="doc") + if self._pipeline_type == "pdf": + def _loader(p: str) -> pd.DataFrame: + return pdf_path_to_pages_df(p) + else: + def _loader(p: str) -> pd.DataFrame: + return txt_file_to_chunks_df(p, **self._extract_txt_kwargs) + for doc_path in doc_iter: - pages_df = _pdf_to_pages_df(doc_path) + initial_df = _loader(doc_path) - current: Any = pages_df + current: Any = initial_df for func, kwargs in per_doc_tasks: if func is pdf_extraction: current = func(pdf_binary=current, **kwargs) diff --git a/retriever/src/retriever/pdf/split.py b/retriever/src/retriever/pdf/split.py index 40c7f1ea2..cc245e54d 100644 --- a/retriever/src/retriever/pdf/split.py +++ b/retriever/src/retriever/pdf/split.py @@ -2,6 +2,7 @@ from dataclasses import dataclass from io import BytesIO +from pathlib import Path from typing import Any, Dict, List, Optional import traceback @@ -78,6 +79,37 @@ def _split_pdf_to_single_page_bytes(pdf_binary: Any) -> List[bytes]: return out +def pdf_path_to_pages_df(path: str) -> pd.DataFrame: + """ + Convert a multi-page PDF at `path` into a DataFrame where each row + contains a single-page PDF's raw bytes. + + Columns: bytes, path, page_number (1-indexed). + Compatible with pdf_extraction and downstream inprocess/batch stages. + """ + if pdfium is None: # pragma: no cover + raise ImportError("pypdfium2 is required for PDF splitting but could not be imported.") from _PDFIUM_IMPORT_ERROR + + abs_path = str(Path(path).resolve()) + out_rows: List[Dict[str, Any]] = [] + try: + raw_bytes = Path(abs_path).read_bytes() + pages = _split_pdf_to_single_page_bytes(raw_bytes) + for page_idx, page_bytes in enumerate(pages): + out_rows.append( + { + "bytes": page_bytes, + "path": abs_path, + "page_number": page_idx + 1, + } + ) + except BaseException as e: + out_rows.append( + {"bytes": b"", "path": abs_path, "page_number": 0, "error": str(e)} + ) + return pd.DataFrame(out_rows) + + def split_pdf_batch(pdf_batch: Any, **kwargs: Any) -> pd.DataFrame: """ Split a batch of PDFs into per-page single-page PDFs (bytes), without rendering. diff --git a/retriever/src/retriever/txt/__init__.py b/retriever/src/retriever/txt/__init__.py new file mode 100644 index 000000000..8085fdcc0 --- /dev/null +++ b/retriever/src/retriever/txt/__init__.py @@ -0,0 +1,21 @@ +""" +Txt ingestion: tokenizer-based split and chunk DataFrame builder. + +Compatible with the same embed and LanceDB stages as PDF primitives. +""" + +from .split import ( + DEFAULT_MAX_TOKENS, + DEFAULT_OVERLAP_TOKENS, + DEFAULT_TOKENIZER_MODEL_ID, + split_text_by_tokens, + txt_file_to_chunks_df, +) + +__all__ = [ + "DEFAULT_MAX_TOKENS", + "DEFAULT_OVERLAP_TOKENS", + "DEFAULT_TOKENIZER_MODEL_ID", + "split_text_by_tokens", + "txt_file_to_chunks_df", +] diff --git a/retriever/src/retriever/txt/__main__.py b/retriever/src/retriever/txt/__main__.py new file mode 100644 index 000000000..ae7e8f486 --- /dev/null +++ b/retriever/src/retriever/txt/__main__.py @@ -0,0 +1,101 @@ +""" +CLI for .txt extraction: tokenizer-based split, write *.txt_extraction.json. + +Use with: retriever local stage5 run --pattern "*.txt_extraction.json" then stage6. +""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any, Dict, Optional + +import typer +from rich.console import Console + +from . import txt_file_to_chunks_df + +console = Console() +app = typer.Typer(help="Txt extraction: tokenizer-based split, write primitives-like JSON.") + + +def _to_jsonable(val: Any) -> Any: + """Convert to JSON-serializable (e.g. numpy int64 -> int).""" + if hasattr(val, "item"): + return val.item() + if hasattr(val, "tolist"): + return val.tolist() + if isinstance(val, dict): + return {k: _to_jsonable(v) for k, v in val.items()} + if isinstance(val, list): + return [_to_jsonable(x) for x in val] + return val + + +@app.command("run") +def run( + input_dir: Path = typer.Option( + ..., + "--input-dir", + exists=True, + file_okay=False, + dir_okay=True, + help="Directory to scan for *.txt files.", + ), + output_dir: Optional[Path] = typer.Option( + None, + "--output-dir", + file_okay=False, + dir_okay=True, + help="Directory to write *.txt_extraction.json. Default: alongside each input file.", + ), + max_tokens: int = typer.Option(512, "--max-tokens", min=1, help="Max tokens per chunk."), + overlap: int = typer.Option(0, "--overlap", min=0, help="Overlap tokens between consecutive chunks."), + encoding: str = typer.Option("utf-8", "--encoding", help="File encoding for reading .txt."), + limit: Optional[int] = typer.Option(None, "--limit", min=1, help="Limit number of .txt files processed."), +) -> None: + """ + Scan input_dir for *.txt, tokenizer-split each into chunks, write .txt_extraction.json. + + Output JSON has the same primitives-like shape as stage5 input (text, path, page_number, metadata). + Then run: retriever local stage5 run --input-dir --pattern "*.txt_extraction.json" + and retriever local stage6 run --input-dir . + """ + input_dir = Path(input_dir) + txt_files = sorted(input_dir.glob("*.txt")) + if not txt_files: + console.print(f"[red]No *.txt files found in[/red] {input_dir}") + raise typer.Exit(code=2) + if limit is not None: + txt_files = txt_files[:limit] + + for path in txt_files: + try: + df = txt_file_to_chunks_df( + str(path), + max_tokens=max_tokens, + overlap_tokens=overlap, + encoding=encoding, + ) + except Exception as e: + console.print(f"[red]Failed[/red] {path}: {e}") + continue + + out_path: Path + if output_dir is not None: + output_dir.mkdir(parents=True, exist_ok=True) + out_path = Path(output_dir) / f"{path.stem}.txt_extraction.json" + else: + out_path = path.with_suffix(".txt_extraction.json") + + records = df.to_dict(orient="records") + jsonable = _to_jsonable(records) + # Write as JSON array so stage5's pd.read_json(in_path) returns a DataFrame + out_path.write_text(json.dumps(jsonable, ensure_ascii=False, indent=2), encoding="utf-8") + console.print(f"[green]Wrote[/green] {out_path} ({len(df)} chunks)") + + console.print(f"[green]Done[/green] processed={len(txt_files)}") + + +def main() -> None: + app() diff --git a/retriever/src/retriever/txt/ray_data.py b/retriever/src/retriever/txt/ray_data.py new file mode 100644 index 000000000..221619af0 --- /dev/null +++ b/retriever/src/retriever/txt/ray_data.py @@ -0,0 +1,44 @@ +""" +Ray Data adapter for .txt: TxtSplitActor turns bytes+path batches into chunk rows. +""" + +from __future__ import annotations + +from typing import Any, Dict, List + +import pandas as pd + +from .split import txt_bytes_to_chunks_df + + +class TxtSplitActor: + """ + Ray Data map_batches callable: DataFrame with bytes, path -> DataFrame of chunks. + + Each output row has: text, path, page_number, metadata (same shape as txt_file_to_chunks_df). + """ + + def __init__(self, **kwargs: Any) -> None: + self._kwargs = dict(kwargs) + + def __call__(self, batch_df: pd.DataFrame, **override_kwargs: Any) -> pd.DataFrame: + if not isinstance(batch_df, pd.DataFrame) or batch_df.empty: + return pd.DataFrame(columns=["text", "path", "page_number", "metadata"]) + + kwargs = {**self._kwargs, **override_kwargs} + out_dfs: List[pd.DataFrame] = [] + for _, row in batch_df.iterrows(): + raw = row.get("bytes") + path = row.get("path") + if raw is None or path is None: + continue + path_str = str(path) if path is not None else "" + try: + chunk_df = txt_bytes_to_chunks_df(raw, path_str, **kwargs) + if not chunk_df.empty: + out_dfs.append(chunk_df) + except Exception: + continue + if not out_dfs: + return pd.DataFrame(columns=["text", "path", "page_number", "metadata"]) + return pd.concat(out_dfs, ignore_index=True) diff --git a/retriever/src/retriever/txt/split.py b/retriever/src/retriever/txt/split.py new file mode 100644 index 000000000..7d31ca378 --- /dev/null +++ b/retriever/src/retriever/txt/split.py @@ -0,0 +1,204 @@ +""" +Tokenizer-based text splitting for .txt ingestion. + +Produces chunk DataFrames compatible with embed_text_from_primitives_df +and the LanceDB row builder (text, path, page_number, metadata). +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any, Dict, List, Optional + +import pandas as pd + +DEFAULT_TOKENIZER_MODEL_ID = "nvidia/llama-3.2-nv-embedqa-1b-v2" +DEFAULT_MAX_TOKENS = 512 +DEFAULT_OVERLAP_TOKENS = 0 + + +def _get_tokenizer(model_id: str, cache_dir: Optional[str] = None): # noqa: ANN201 + """Lazy-load HuggingFace tokenizer.""" + from transformers import AutoTokenizer + + return AutoTokenizer.from_pretrained( + model_id, + cache_dir=cache_dir, + ) + + +def split_text_by_tokens( + text: str, + *, + tokenizer: Any, + max_tokens: int, + overlap_tokens: int = 0, +) -> List[str]: + """ + Split text into chunks by token count with optional overlap. + + Chunk boundaries align with tokenizer token boundaries. Uses a sliding + window: each chunk has at most max_tokens tokens; consecutive chunks + overlap by overlap_tokens tokens. + + Parameters + ---------- + text : str + Input text to split. + tokenizer + HuggingFace tokenizer (e.g. AutoTokenizer) with encode/decode. + max_tokens : int + Maximum tokens per chunk. + overlap_tokens : int + Number of tokens to overlap between consecutive chunks (default 0). + + Returns + ------- + list[str] + Chunk strings in order. + """ + if not text or not text.strip(): + return [] + if max_tokens <= 0: + raise ValueError("max_tokens must be positive") + + enc = tokenizer.encode(text, add_special_tokens=False) + if not enc: + return [] + + step = max(1, max_tokens - overlap_tokens) + chunks: List[str] = [] + start = 0 + while start < len(enc): + end = min(start + max_tokens, len(enc)) + chunk_ids = enc[start:end] + chunk_text = tokenizer.decode(chunk_ids, skip_special_tokens=True) + if chunk_text.strip(): + chunks.append(chunk_text) + start += step + if end >= len(enc): + break + + return chunks if chunks else [text] + + +def txt_file_to_chunks_df( + path: str, + *, + max_tokens: int = DEFAULT_MAX_TOKENS, + overlap_tokens: int = DEFAULT_OVERLAP_TOKENS, + tokenizer_model_id: Optional[str] = None, + encoding: str = "utf-8", + tokenizer_cache_dir: Optional[str] = None, + **kwargs: Any, +) -> pd.DataFrame: + """ + Read a .txt file and return a DataFrame of chunks (one row per chunk). + + Columns: text, path, page_number (chunk index, 1-based), metadata. + Shape is compatible with embed_text_from_primitives_df and LanceDB row build. + + Parameters + ---------- + path : str + Path to the .txt file. + max_tokens : int + Max tokens per chunk (default 512). + overlap_tokens : int + Overlap between consecutive chunks (default 0). + tokenizer_model_id : str, optional + HuggingFace model id for tokenizer (default: same as embedder). + encoding : str + File encoding (default utf-8). + tokenizer_cache_dir : str, optional + HuggingFace cache directory for tokenizer. + + Returns + ------- + pd.DataFrame + Columns: text, path, page_number, metadata. + """ + path = str(Path(path).resolve()) + raw = Path(path).read_text(encoding=encoding, errors="replace") + model_id = tokenizer_model_id or DEFAULT_TOKENIZER_MODEL_ID + tokenizer = _get_tokenizer(model_id, cache_dir=tokenizer_cache_dir) + chunk_texts = split_text_by_tokens( + raw, + tokenizer=tokenizer, + max_tokens=max_tokens, + overlap_tokens=overlap_tokens, + ) + + if not chunk_texts: + return pd.DataFrame( + columns=["text", "path", "page_number", "metadata"], + ).astype({"page_number": "int64"}) + + rows: List[Dict[str, Any]] = [] + for i, chunk in enumerate(chunk_texts): + rows.append( + { + "text": chunk, + "content": chunk, + "path": path, + "page_number": i + 1, + "metadata": { + "source_path": path, + "chunk_index": i, + "content_metadata": {"type": "text"}, + "content": chunk, + }, + } + ) + return pd.DataFrame(rows) + + +def txt_bytes_to_chunks_df( + content_bytes: bytes, + path: str, + *, + max_tokens: int = DEFAULT_MAX_TOKENS, + overlap_tokens: int = DEFAULT_OVERLAP_TOKENS, + tokenizer_model_id: Optional[str] = None, + encoding: str = "utf-8", + tokenizer_cache_dir: Optional[str] = None, + **kwargs: Any, +) -> pd.DataFrame: + """ + Decode bytes to text and return a DataFrame of chunks (same shape as txt_file_to_chunks_df). + + Used by batch TxtSplitActor when input is bytes + path from read_binary_files. + """ + path = str(Path(path).resolve()) + raw = content_bytes.decode(encoding, errors="replace") + model_id = tokenizer_model_id or DEFAULT_TOKENIZER_MODEL_ID + tokenizer = _get_tokenizer(model_id, cache_dir=tokenizer_cache_dir) + chunk_texts = split_text_by_tokens( + raw, + tokenizer=tokenizer, + max_tokens=max_tokens, + overlap_tokens=overlap_tokens, + ) + + if not chunk_texts: + return pd.DataFrame( + columns=["text", "path", "page_number", "metadata"], + ).astype({"page_number": "int64"}) + + rows: List[Dict[str, Any]] = [] + for i, chunk in enumerate(chunk_texts): + rows.append( + { + "text": chunk, + "content": chunk, + "path": path, + "page_number": i + 1, + "metadata": { + "source_path": path, + "chunk_index": i, + "content_metadata": {"type": "text"}, + "content": chunk, + }, + } + ) + return pd.DataFrame(rows) diff --git a/retriever/tests/__init__.py b/retriever/tests/__init__.py new file mode 100644 index 000000000..6e195a8eb --- /dev/null +++ b/retriever/tests/__init__.py @@ -0,0 +1 @@ +# Tests for retriever diff --git a/retriever/tests/test_txt_split.py b/retriever/tests/test_txt_split.py new file mode 100644 index 000000000..c74ca2afd --- /dev/null +++ b/retriever/tests/test_txt_split.py @@ -0,0 +1,81 @@ +""" +Unit tests for retriever.txt.split: split_text_by_tokens and txt_file_to_chunks_df. +""" + +import tempfile +from pathlib import Path + +import pandas as pd +import pytest + +from retriever.txt.split import split_text_by_tokens, txt_file_to_chunks_df + + +class _MockTokenizer: + """Minimal tokenizer: encode = split on spaces, decode = join.""" + + def encode(self, text: str, add_special_tokens: bool = False): + return text.split() + + def decode(self, ids, skip_special_tokens: bool = True): + if isinstance(ids, (list, range)): + return " ".join(str(i) for i in ids) + return str(ids) + + +def test_split_text_by_tokens_empty(): + tokenizer = _MockTokenizer() + assert split_text_by_tokens("", tokenizer=tokenizer, max_tokens=10) == [] + assert split_text_by_tokens(" \n ", tokenizer=tokenizer, max_tokens=10) == [] + + +def test_split_text_by_tokens_no_overlap(): + tokenizer = _MockTokenizer() + # "a b c d e f g h i j" -> 10 tokens, max_tokens=3 -> 4 chunks + text = "a b c d e f g h i j" + chunks = split_text_by_tokens(text, tokenizer=tokenizer, max_tokens=3, overlap_tokens=0) + assert len(chunks) >= 1 + joined = " ".join(chunks) + assert "a" in joined and "j" in joined + + +def test_split_text_by_tokens_single_chunk(): + tokenizer = _MockTokenizer() + text = "one two three" + chunks = split_text_by_tokens(text, tokenizer=tokenizer, max_tokens=10, overlap_tokens=0) + assert len(chunks) == 1 + assert chunks[0] == "one two three" + + +def test_split_text_by_tokens_max_tokens_positive(): + tokenizer = _MockTokenizer() + with pytest.raises(ValueError, match="max_tokens must be positive"): + split_text_by_tokens("hello", tokenizer=tokenizer, max_tokens=0) + + +def test_txt_file_to_chunks_df(tmp_path: Path): + pytest.importorskip("transformers") + f = tmp_path / "doc.txt" + f.write_text("First paragraph here. Second paragraph there.", encoding="utf-8") + df = txt_file_to_chunks_df( + str(f), + max_tokens=512, + overlap_tokens=0, + ) + assert isinstance(df, pd.DataFrame) + assert list(df.columns) == ["text", "path", "page_number", "metadata"] + assert len(df) >= 1 + assert df["path"].iloc[0] == str(f.resolve()) + assert df["page_number"].iloc[0] >= 1 + assert "source_path" in df["metadata"].iloc[0] + assert "chunk_index" in df["metadata"].iloc[0] + + +def test_txt_file_to_chunks_df_empty_file(tmp_path: Path): + pytest.importorskip("transformers") + f = tmp_path / "empty.txt" + f.write_text("", encoding="utf-8") + df = txt_file_to_chunks_df(str(f), max_tokens=512) + assert isinstance(df, pd.DataFrame) + assert list(df.columns) == ["text", "path", "page_number", "metadata"] + assert len(df) == 0 From ee0e84914aa51c03063af33f5d2ba804212f079e Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Tue, 24 Feb 2026 17:55:33 +0000 Subject: [PATCH 2/2] Add docx/pptx support to batch/inprocess examples --- .../src/retriever/examples/batch_pipeline.py | 43 ++++++++++++++++++- .../retriever/examples/inprocess_pipeline.py | 22 ++++++++-- 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/retriever/src/retriever/examples/batch_pipeline.py b/retriever/src/retriever/examples/batch_pipeline.py index 4f0c67a96..1b8cd03f2 100644 --- a/retriever/src/retriever/examples/batch_pipeline.py +++ b/retriever/src/retriever/examples/batch_pipeline.py @@ -338,7 +338,7 @@ def main( input_type: str = typer.Option( "pdf", "--input-type", - help="Input format: 'pdf' or 'txt'. Use 'txt' for a directory of .txt files (tokenizer-based chunking).", + help="Input format: 'pdf', 'txt', or 'doc'. Use 'txt' for .txt files (tokenizer chunking). Use 'doc' for .docx/.pptx (converted to PDF via LibreOffice).", ), ray_address: Optional[str] = typer.Option( None, @@ -568,6 +568,47 @@ def main( .embed(model_name="nemo_retriever_v1", embed_invoke_url=embed_invoke_url) .vdb_upload(lancedb_uri=lancedb_uri, table_name=LANCEDB_TABLE, overwrite=True, create_index=True) ) + elif input_type == "doc": + # DOCX/PPTX: same pipeline as PDF; DocToPdfConversionActor converts before split. + doc_globs = [str(input_dir / "*.docx"), str(input_dir / "*.pptx")] + ingestor = create_ingestor( + run_mode="batch", + ray_address=ray_address, + ray_log_to_driver=ray_log_to_driver, + ) + ingestor = ( + ingestor.files(doc_globs) + .extract( + extract_text=True, + extract_tables=True, + extract_charts=True, + extract_infographics=False, + debug_run_id=str(runtime_metrics_prefix or "unknown"), + pdf_extract_workers=int(pdf_extract_workers), + pdf_extract_num_cpus=float(pdf_extract_num_cpus), + pdf_split_batch_size=int(pdf_split_batch_size), + pdf_extract_batch_size=int(pdf_extract_batch_size), + page_elements_batch_size=int(page_elements_batch_size), + page_elements_workers=int(page_elements_workers), + detect_workers=int(ocr_workers), + detect_batch_size=int(ocr_batch_size), + page_elements_cpus_per_actor=float(page_elements_cpus_per_actor), + ocr_cpus_per_actor=float(ocr_cpus_per_actor), + gpu_page_elements=float(gpu_page_elements), + gpu_ocr=float(gpu_ocr), + gpu_embed=float(gpu_embed), + page_elements_invoke_url=page_elements_invoke_url, + ocr_invoke_url=ocr_invoke_url, + ) + .embed( + model_name="nemo_retriever_v1", + embed_workers=int(embed_workers), + embed_batch_size=int(embed_batch_size), + embed_cpus_per_actor=float(embed_cpus_per_actor), + embed_invoke_url=embed_invoke_url, + ) + .vdb_upload(lancedb_uri=lancedb_uri, table_name=LANCEDB_TABLE, overwrite=True, create_index=True) + ) else: pdf_glob = str(input_dir / "*.pdf") ingestor = create_ingestor( diff --git a/retriever/src/retriever/examples/inprocess_pipeline.py b/retriever/src/retriever/examples/inprocess_pipeline.py index e47cba2e3..a94516274 100644 --- a/retriever/src/retriever/examples/inprocess_pipeline.py +++ b/retriever/src/retriever/examples/inprocess_pipeline.py @@ -61,7 +61,7 @@ def main( input_type: str = typer.Option( "pdf", "--input-type", - help="Input format: 'pdf' or 'txt'. Use 'txt' for a directory of .txt files (tokenizer-based chunking).", + help="Input format: 'pdf', 'txt', or 'doc'. Use 'txt' for .txt files. Use 'doc' for .docx/.pptx (converted to PDF via LibreOffice).", ), query_csv: Path = typer.Option( "bo767_query_gt.csv", @@ -90,6 +90,22 @@ def main( .embed(model_name="nemo_retriever_v1") .vdb_upload(lancedb_uri=LANCEDB_URI, table_name=LANCEDB_TABLE, overwrite=False, create_index=True) ) + elif input_type == "doc": + # DOCX/PPTX: same pipeline as PDF; inprocess loader converts to PDF then splits. + doc_globs = [str(input_dir / "*.docx"), str(input_dir / "*.pptx")] + ingestor = create_ingestor(run_mode="inprocess") + ingestor = ( + ingestor.files(doc_globs) + .extract( + method="pdfium", + extract_text=True, + extract_tables=True, + extract_charts=True, + extract_infographics=False, + ) + .embed(model_name="nemo_retriever_v1") + .vdb_upload(lancedb_uri=LANCEDB_URI, table_name=LANCEDB_TABLE, overwrite=False, create_index=True) + ) else: glob_pattern = str(input_dir / "*.pdf") ingestor = create_ingestor(run_mode="inprocess") @@ -155,7 +171,7 @@ def main( hit = _is_hit_at_k(g, top_keys, cfg.top_k) if not no_recall_details: - ext = ".txt" if input_type == "txt" else ".pdf" + ext = ".txt" if input_type == "txt" else (".docx" if input_type == "doc" else ".pdf") print(f"\nQuery {i}: {q}") print(f" Gold: {g} (file: {doc}{ext}, page: {page})") print(f" Hit@{cfg.top_k}: {hit}") @@ -170,7 +186,7 @@ def main( print(f" {rank:02d}. {key} distance={dist:.6f}") if not hit: - ext = ".txt" if input_type == "txt" else ".pdf" + ext = ".txt" if input_type == "txt" else (".docx" if input_type == "doc" else ".pdf") missed_gold.append((f"{doc}{ext}", str(page))) missed_unique = sorted(set(missed_gold), key=lambda x: (x[0], x[1]))