Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def _run_chart_inference(
model_name="paddle",
max_batch_size=1 if ocr_client.protocol == "grpc" else 2,
)
elif ocr_model_name in {"scene_text_ensemble", "scene_text_wrapper", "scene_text_python"}:
elif ocr_model_name in {"pipeline", "scene_text_ensemble", "scene_text_wrapper", "scene_text_python"}:
future_ocr_kwargs.update(
model_name=ocr_model_name,
input_names=["INPUT_IMAGE_URLS", "MERGE_LEVELS"],
Expand Down Expand Up @@ -241,7 +241,7 @@ def _create_ocr_client(
) -> NimClient:
ocr_model_interface = (
NemoRetrieverOCRModelInterface()
if ocr_model_name in {"scene_text_ensemble", "scene_text_wrapper", "scene_text_python"}
if ocr_model_name in {"pipeline", "scene_text_ensemble", "scene_text_wrapper", "scene_text_python"}
else PaddleOCRModelInterface()
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def _update_infographic_metadata(
model_name="paddle",
max_batch_size=1 if ocr_client.protocol == "grpc" else 2,
)
elif ocr_model_name in {"scene_text_ensemble", "scene_text_wrapper", "scene_text_python"}:
elif ocr_model_name in {"pipeline", "scene_text_ensemble", "scene_text_wrapper", "scene_text_python"}:
infer_kwargs.update(
model_name=ocr_model_name,
input_names=["INPUT_IMAGE_URLS", "MERGE_LEVELS"],
Expand Down Expand Up @@ -153,7 +153,7 @@ def _create_ocr_client(
) -> NimClient:
ocr_model_interface = (
NemoRetrieverOCRModelInterface()
if ocr_model_name in {"scene_text_ensemble", "scene_text_wrapper", "scene_text_python"}
if ocr_model_name in {"pipeline", "scene_text_ensemble", "scene_text_wrapper", "scene_text_python"}
else PaddleOCRModelInterface()
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def _update_text_metadata(
model_name="paddle",
max_batch_size=1 if ocr_client.protocol == "grpc" else 2,
)
elif ocr_model_name in {"scene_text_ensemble", "scene_text_wrapper", "scene_text_python"}:
elif ocr_model_name in {"pipeline", "scene_text_ensemble", "scene_text_wrapper", "scene_text_python"}:
infer_kwargs.update(
model_name=ocr_model_name,
input_names=["INPUT_IMAGE_URLS", "MERGE_LEVELS"],
Expand Down Expand Up @@ -143,7 +143,7 @@ def _create_ocr_client(
) -> NimClient:
ocr_model_interface = (
NemoRetrieverOCRModelInterface()
if ocr_model_name in {"scene_text_ensemble", "scene_text_wrapper", "scene_text_python"}
if ocr_model_name in {"pipeline", "scene_text_ensemble", "scene_text_wrapper", "scene_text_python"}
else PaddleOCRModelInterface()
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def _run_inference(
model_name="paddle",
max_batch_size=1 if ocr_client.protocol == "grpc" else 2,
)
elif ocr_model_name in {"scene_text_ensemble", "scene_text_wrapper", "scene_text_python"}:
elif ocr_model_name in {"pipeline", "scene_text_ensemble", "scene_text_wrapper", "scene_text_python"}:
future_ocr_kwargs.update(
model_name=ocr_model_name,
input_names=["INPUT_IMAGE_URLS", "MERGE_LEVELS"],
Expand Down Expand Up @@ -250,7 +250,7 @@ def _create_ocr_client(
) -> NimClient:
ocr_model_interface = (
NemoRetrieverOCRModelInterface()
if ocr_model_name in {"scene_text_ensemble", "scene_text_wrapper", "scene_text_python"}
if ocr_model_name in {"pipeline", "scene_text_ensemble", "scene_text_wrapper", "scene_text_python"}
else PaddleOCRModelInterface()
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# Copyright (c) 2024, NVIDIA CORPORATION.


YOLOX_MAX_BATCH_SIZE = 8
YOLOX_MAX_BATCH_SIZE = 16
YOLOX_MAX_WIDTH = 1536
YOLOX_MAX_HEIGHT = 1536
YOLOX_CONF_THRESHOLD = 0.01
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
from nv_ingest_api.internal.primitives.nim.model_interface.helpers import preprocess_image_for_paddle
from nv_ingest_api.util.image_processing.transforms import base64_to_numpy

DEFAULT_OCR_MODEL_NAME = "scene_text_ensemble"
NEMORETRIEVER_OCR_MODEL_NAME = "scene_text_wrapper"
NEMORETRIEVER_OCR_ENSEMBLE_MODEL_NAME = "scene_text_ensemble"
NEMORETRIEVER_OCR_BLS_MODEL_NAME = "scene_text_python"
DEFAULT_OCR_MODEL_NAME = "pipeline"
NEMORETRIEVER_OCR_MODEL_NAME = "pipeline"
NEMORETRIEVER_OCR_ENSEMBLE_MODEL_NAME = "pipeline"
NEMORETRIEVER_OCR_BLS_MODEL_NAME = "pipeline"


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -202,64 +202,57 @@ def _extract_content_from_ocr_grpc_response(
model_name: str = DEFAULT_OCR_MODEL_NAME,
) -> List[Tuple[str, str]]:
"""
Parse a gRPC response for one or more images. The response can have two possible shapes:
- (3,) for batch_size=1
- (3, n) for batch_size=n
Parse a gRPC response for one or more images from the BLS pipeline.

In either case:
response[0, i]: byte string containing bounding box data
response[1, i]: byte string containing text prediction data
response[2, i]: (Optional) additional data/metadata (ignored here)
For BLS pipeline, output shape is [1, N*3] which gets reshaped to [N, 3] where:
- response[i, 0]: byte string containing bounding box data
- response[i, 1]: byte string containing text prediction data
- response[i, 2]: byte string containing confidence scores

Parameters
----------
response : np.ndarray
The raw NumPy array from gRPC. Expected shape: (3,) or (3, n).
table_content_format : str
The format of the output text content, e.g. 'simple' or 'pseudo_markdown'.
dims : list of dict, optional
The raw NumPy array from gRPC. Expected shape: (1, N*3) for BLS pipeline.
model_name : str
The name of the model used for inference.
dimensions : list of dict, optional
A list of dict for each corresponding image, used for bounding box scaling.

Returns
-------
list of (str, str)
A list of (content, table_content_format) for each image.
list of [bounding_boxes, text_predictions, conf_scores]
A list of results for each image.

Raises
------
ValueError
If the response is not a NumPy array or has an unexpected shape,
or if the `table_content_format` is unrecognized.
If the response is not a NumPy array or has an unexpected shape.
"""
if not isinstance(response, np.ndarray):
raise ValueError("Unexpected response format: response is not a NumPy array.")

if model_name in [
NEMORETRIEVER_OCR_MODEL_NAME,
NEMORETRIEVER_OCR_ENSEMBLE_MODEL_NAME,
NEMORETRIEVER_OCR_BLS_MODEL_NAME,
]:
response = response.transpose((1, 0))

# If we have shape (3,), convert to (3, 1)
if response.ndim == 1 and response.shape == (3,):
response = response.reshape(3, 1)
elif response.ndim != 2 or response.shape[0] != 3:
raise ValueError(f"Unexpected response shape: {response.shape}. Expecting (3,) or (3, n).")
batch_size = response.shape[1]
# BLS pipeline returns shape [1, N*3] - flatten and reshape to [N, 3]
# Each row contains [bboxes_json, texts_json, scores_json] as byte strings
flat_response = response.flatten()
if flat_response.size % 3 != 0:
raise ValueError(f"Unexpected response size: {flat_response.size}. Expected multiple of 3.")

num_images = flat_response.size // 3
response = flat_response.reshape(num_images, 3)

results: List[Tuple[str, str]] = []

for i in range(batch_size):
for i in range(num_images):
# 1) Parse bounding boxes
bboxes_bytestr: bytes = response[0, i]
bboxes_bytestr: bytes = response[i, 0]
bounding_boxes = json.loads(bboxes_bytestr.decode("utf8"))

# 2) Parse text predictions
texts_bytestr: bytes = response[1, i]
texts_bytestr: bytes = response[i, 1]
text_predictions = json.loads(texts_bytestr.decode("utf8"))

# 3) Parse confidence scores
confs_bytestr: bytes = response[2, i]
confs_bytestr: bytes = response[i, 2]
conf_scores = json.loads(confs_bytestr.decode("utf8"))

# Some gRPC responses nest single-item lists; flatten them if needed
Expand Down Expand Up @@ -699,25 +692,26 @@ def _format_single_batch(
merge_level = kwargs.get("merge_level", "paragraph")

if protocol == "grpc":
logger.debug("Formatting input for gRPC OCR model (batched).")
processed: List[np.ndarray] = []
logger.debug("Formatting input for gRPC OCR BLS pipeline model (batched).")

# Build image URLs with data URL prefix as expected by NIM BLS pipeline
image_urls = []
for img, shape in zip(batch_images, batch_dims):
_dims = {"new_width": shape[1], "new_height": shape[0]}
dims.append(_dims)
image_url = f"data:image/png;base64,{img}"
image_urls.append(image_url)

arr = np.array([img], dtype=np.object_)
arr = np.expand_dims(arr, axis=0)
processed.append(arr)

batched_input = np.concatenate(processed, axis=0)
# Create input arrays with shape [1, N] for BLS pipeline
# This matches the NIM's expected format: batch of 1 request containing N images
num_images = len(image_urls)
image_array = np.array(image_urls, dtype=np.object_).reshape(1, num_images)

batch_size = batched_input.shape[0]
# Merge levels with shape [1, N] for BLS pipeline
merge_levels_list = [merge_level] * num_images
merge_levels_array = np.array(merge_levels_list, dtype=np.object_).reshape(1, num_images)

merge_levels_list = [[merge_level] for _ in range(batch_size)]
merge_levels = np.array(merge_levels_list, dtype="object")

final_batch = [batched_input, merge_levels]
final_batch = [image_array, merge_levels_array]
batch_data = {"image_dims": dims}

return final_batch, batch_data
Expand Down Expand Up @@ -768,7 +762,14 @@ def get_ocr_model_name(ocr_grpc_endpoint=None, default_model_name=DEFAULT_OCR_MO
client = grpcclient.InferenceServerClient(ocr_grpc_endpoint)
model_index = client.get_model_repository_index(as_json=True)
model_names = [x["name"] for x in model_index.get("models", [])]
ocr_model_name = model_names[0]

# Prefer 'pipeline' model if available (BLS model)
if "pipeline" in model_names:
ocr_model_name = "pipeline"
elif "scene_text_ensemble" in model_names:
ocr_model_name = "scene_text_ensemble"
else:
ocr_model_name = model_names[0] if model_names else default_model_name
except Exception:
logger.warning(f"Failed to get ocr model name after 30 seconds. Falling back to '{default_model_name}'.")
ocr_model_name = default_model_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,12 @@ def __init__(
self.class_labels = class_labels

if endpoints:
self.model_name = get_yolox_model_name(endpoints[0], default_model_name="yolox_ensemble")
self._grpc_uses_bls = self.model_name == "pipeline"
self.model_name = get_yolox_model_name(endpoints[0], default_model_name="pipeline")
else:
self._grpc_uses_bls = False
self.model_name = "pipeline"

# Always use BLS format for gRPC - NIMs use the pipeline model
self._grpc_uses_bls = True

def prepare_data_for_inference(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Expand Down Expand Up @@ -213,26 +215,25 @@ def chunk_list_geometrically(lst: list, max_size: int) -> List[list]:
return chunks

if protocol == "grpc":
logger.debug("Formatting input for gRPC Yolox Ensemble model")
logger.debug("Formatting input for gRPC Yolox BLS pipeline model")

# Convert images to base64 data URLs as expected by the NIM BLS pipeline
b64_images = [numpy_to_base64(image, format=YOLOX_PAGE_IMAGE_FORMAT) for image in data["images"]]
b64_chunks = chunk_list_geometrically(b64_images, max_batch_size)
b64_data_urls = [f"data:image/{YOLOX_PAGE_IMAGE_FORMAT.lower()};base64,{b64}" for b64 in b64_images]

b64_chunks = chunk_list_geometrically(b64_data_urls, max_batch_size)
original_chunks = chunk_list_geometrically(data["images"], max_batch_size)
shape_chunks = chunk_list_geometrically(data["original_image_shapes"], max_batch_size)

batched_inputs = []
formatted_batch_data = []
for b64_chunk, orig_chunk, shapes in zip(b64_chunks, original_chunks, shape_chunks):
input_array = np.array(b64_chunk, dtype=np.object_)
# Create input array with shape [1, N] for BLS pipeline
# This matches the NIM's expected format: batch of 1 request containing N images
input_array = np.array(b64_chunk, dtype=np.object_).reshape(1, -1)

if self._grpc_uses_bls:
# For BLS with dynamic batching (max_batch_size > 0), we need to add explicit batch dimension
# Shape [N] becomes [1, N] to indicate: batch of 1, containing N images
input_array = input_array.reshape(1, -1)
thresholds = np.array([[self.conf_threshold, self.iou_threshold]], dtype=np.float32)
else:
current_batch_size = input_array.shape[0]
single_threshold_pair = [self.conf_threshold, self.iou_threshold]
thresholds = np.tile(single_threshold_pair, (current_batch_size, 1)).astype(np.float32)
# Thresholds with shape [1, 2] for BLS pipeline
thresholds = np.array([[self.conf_threshold, self.iou_threshold]], dtype=np.float32)

batched_inputs.append([input_array, thresholds])
formatted_batch_data.append({"images": orig_chunk, "original_image_shapes": shapes})
Expand Down Expand Up @@ -336,7 +337,7 @@ def process_inference_results(self, output: Any, protocol: str, **kwargs) -> Lis
Parameters
----------
output_array : np.ndarray
The raw output from the Yolox model.
The raw output from the Yolox model. For BLS pipeline, shape is [1, N].
kwargs : dict
Additional parameters for processing, including thresholds and number of classes.

Expand All @@ -351,11 +352,13 @@ def process_inference_results(self, output: Any, protocol: str, **kwargs) -> Lis

elif protocol == "grpc":
results = []
# For grpc, apply the same NIM postprocessing.
for out in output:
# For BLS pipeline, output shape is [1, N] - flatten to iterate over individual results
flat_output = output.flatten() if hasattr(output, "flatten") else output
for out in flat_output:
if isinstance(out, bytes):
out = out.decode("utf-8")
if isinstance(out, dict):
results.append(out)
continue
results.append(json.loads(out))
inference_results = self.postprocess_annotations(results, **kwargs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def infer(self, data: dict, model_name: str, **kwargs) -> Any:
)

# Check for a custom maximum pool worker count, and remove it from kwargs.
max_pool_workers = kwargs.pop("max_pool_workers", 16)
max_pool_workers = kwargs.pop("max_pool_workers", 24)

# 4. Process each batch concurrently using a thread pool.
# We enumerate the batches so that we can later reassemble results in order.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ class ChartExtractorConfigSchema(LowercaseProtocolMixin):
ocr_endpoints: Tuple[Optional[str], Optional[str]] = (None, None)
ocr_infer_protocol: str = ""

nim_batch_size: int = 2
workers_per_progress_engine: int = 5
nim_batch_size: int = 8
workers_per_progress_engine: int = 12

@model_validator(mode="before")
@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class InfographicExtractorConfigSchema(LowercaseProtocolMixin):
ocr_endpoints: Tuple[Optional[str], Optional[str]] = (None, None)
ocr_infer_protocol: str = ""

nim_batch_size: int = 2
workers_per_progress_engine: int = 5
nim_batch_size: int = 8
workers_per_progress_engine: int = 12

@model_validator(mode="before")
@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class OCRExtractorConfigSchema(LowercaseProtocolMixin):
ocr_endpoints: Tuple[Optional[str], Optional[str]] = (None, None)
ocr_infer_protocol: str = ""

nim_batch_size: int = 2
workers_per_progress_engine: int = 5
nim_batch_size: int = 8
workers_per_progress_engine: int = 12

@model_validator(mode="before")
@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ class PDFiumConfigSchema(LowercaseProtocolMixin):
yolox_endpoints: Tuple[Optional[str], Optional[str]] = (None, None)
yolox_infer_protocol: str = ""

nim_batch_size: int = 4
workers_per_progress_engine: int = 5
nim_batch_size: int = 8
workers_per_progress_engine: int = 12

@model_validator(mode="before")
@classmethod
Expand Down Expand Up @@ -137,7 +137,7 @@ class NemotronParseConfigSchema(LowercaseProtocolMixin):

timeout: float = 300.0

workers_per_progress_engine: int = 5
workers_per_progress_engine: int = 12

@model_validator(mode="before")
@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class TableExtractorConfigSchema(LowercaseProtocolMixin):
ocr_endpoints: Tuple[Optional[str], Optional[str]] = (None, None)
ocr_infer_protocol: str = ""

nim_batch_size: int = 2
workers_per_progress_engine: int = 5
nim_batch_size: int = 8
workers_per_progress_engine: int = 12

@model_validator(mode="before")
@classmethod
Expand Down
Loading
Loading