Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ def _process_page_elements(df_to_process: pd.DataFrame, ocr_results: List[Tuple]
return df_to_process

for result_idx, df_idx in enumerate(valid_indices):
# Preserve the original base64 image before overwriting with OCR text.
# This enables text_image modality for multimodal embeddings.
original_image = df_to_process.loc[df_idx, "metadata"]["content"]
df_to_process.loc[df_idx, "metadata"]["text_metadata"]["source_image"] = original_image

# Unpack result: (bounding_boxes, text_predictions, confidence_scores)
bboxes, texts, _ = ocr_results[result_idx]
if not bboxes or not texts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ def pdfium_extractor(
text_extraction_method = extractor_config.get("extract_method", "pdfium")
extract_images_method = extractor_config.get("extract_images_method", "group")
extract_images_params = extractor_config.get("extract_images_params", {})
page_image_max_dimension = extractor_config.get("page_image_max_dimension", 1024)

# Extract metadata_column
metadata_column = extractor_config.get("metadata_column", "metadata")
Expand Down Expand Up @@ -555,7 +556,11 @@ def pdfium_extractor(
page_text = ""
else:
page_text = _extract_page_text(page)
image, _ = pdfium_pages_to_numpy([page], scale_tuple=(16384, 16384), trace_info=execution_trace_log)
image, _ = pdfium_pages_to_numpy(
[page],
scale_tuple=(page_image_max_dimension, page_image_max_dimension),
trace_info=execution_trace_log,
)
base64_image = numpy_to_base64(image[0])
if len(base64_image) > 2**24 - 1:
base64_image, _ = scale_image_to_encoding_size(base64_image, max_base64_size=2**24 - 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,13 @@ class IngestTaskEmbedSchema(BaseModelNoExt):
model_name: Optional[str] = None
api_key: Optional[str] = Field(default=None, repr=False)
filter_errors: bool = False
embed_text_elements: Optional[bool] = None
embed_structured_elements: Optional[bool] = None
embed_image_elements: Optional[bool] = None
embed_audio_elements: Optional[bool] = None
text_elements_modality: Optional[str] = None
image_elements_modality: Optional[str] = None
image_elements_aggregate_page_content: Optional[bool] = None
structured_elements_modality: Optional[str] = None
audio_elements_modality: Optional[str] = None
custom_content_field: Optional[str] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ class TextEmbeddingSchema(BaseModel):
input_type: str = Field(default="passage")
raise_on_failure: bool = Field(default=False)
truncate: str = Field(default="END")
embed_text_elements: bool = Field(default=True)
embed_structured_elements: bool = Field(default=True)
embed_image_elements: bool = Field(default=True)
embed_audio_elements: bool = Field(default=True)
text_elements_modality: str = Field(default="text")
image_elements_modality: str = Field(default="text")
image_elements_aggregate_page_content: bool = Field(default=False)
structured_elements_modality: str = Field(default="text")
audio_elements_modality: str = Field(default="text")
custom_content_field: Optional[str] = None
Expand Down
191 changes: 184 additions & 7 deletions api/src/nv_ingest_api/internal/transform/embed_text.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
logging.getLogger("httpcore").setLevel(logging.ERROR)


# Names of the embedding models this module treats as multimodal.
# Defined once: the earlier single-entry assignment was dead code that was
# immediately overwritten by this list.
MULTI_MODAL_MODELS = [
    "llama-3.2-nemoretriever-1b-vlm-embed-v1",
    "nvidia/llama-nemotron-embed-vl-1b-v2",
]


# ------------------------------------------------------------------------------
Expand Down Expand Up @@ -315,7 +318,9 @@ def _add_custom_embeddings(row, embeddings, result_target_field):
def _format_image_input_string(image_b64: Optional[str]) -> str:
if not image_b64:
return
return f"data:image/png;base64,{image_b64}"
# Detect format from base64 magic bytes: JPEG starts with /9j/, PNG starts with iVBORw
mime_type = "image/jpeg" if image_b64.startswith("/9j/") else "image/png"
return f"data:{mime_type};base64,{image_b64}"


def _format_text_image_pair_input_string(text: Optional[str], image_b64: Optional[str]) -> str:
Expand All @@ -326,19 +331,29 @@ def _format_text_image_pair_input_string(text: Optional[str], image_b64: Optiona

def _get_pandas_text_content(row, modality="text"):
"""
Extracts text content from a DataFrame row.
Extracts text content from a DataFrame row's metadata.

Parameters
----------
row : pandas.Series
A row containing the 'content' key.
row : dict
The metadata dictionary containing 'content' and optionally 'text_metadata.source_image'.

Returns
-------
str
The text content from the row.
The text content, image content, or combined text+image content based on modality.
"""
return row["content"]
if modality == "text":
content = row.get("content")
elif modality == "image":
source_image = row.get("text_metadata", {}).get("source_image")
content = _format_image_input_string(source_image)
elif modality == "text_image":
text = row.get("content")
source_image = row.get("text_metadata", {}).get("source_image")
content = _format_text_image_pair_input_string(text, source_image)

return content


def _get_pandas_table_content(row, modality="text"):
Expand Down Expand Up @@ -431,6 +446,96 @@ def _get_pandas_custom_content(row, custom_content_field):
return None


def _cleanup_source_images(row):
"""
Removes source_image from text_metadata to reduce metadata size.

The source_image field is used during embedding for text_image modality
but should be removed afterward to avoid exceeding storage limits
(e.g., Milvus JSON field 64KB limit).

Parameters
----------
row : pandas.Series
A DataFrame row containing 'metadata'.

Returns
-------
pandas.Series
The row with source_image removed from text_metadata.
"""
text_metadata = row.get("metadata", {}).get("text_metadata")
if text_metadata and "source_image" in text_metadata:
del text_metadata["source_image"]
return row


def _aggregate_page_content(df: pd.DataFrame) -> pd.DataFrame:
    """
    Aggregates text content from TEXT and STRUCTURED elements into PAGE_IMAGE entries.

    For each page, collects text from:
    - TEXT elements: content field
    - STRUCTURED elements (tables/charts): table_metadata.table_content field

    The aggregated text (page fragments joined by blank lines, in row order)
    is stored in image_metadata["text"] of every PAGE_IMAGE entry on that
    page, enabling text_image modality embedding with full page context.
    Metadata dictionaries are updated in place.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame containing extracted content with metadata.

    Returns
    -------
    pd.DataFrame
        The same DataFrame, with PAGE_IMAGE entries updated to include aggregated page text.
    """
    # Mapping of page_number -> text fragments found on that page, in row order.
    page_text_map: Dict[int, List[str]] = {}

    for _, row in df.iterrows():
        metadata = row.get("metadata") or {}
        content_metadata = metadata.get("content_metadata") or {}

        page_number = content_metadata.get("page_number")
        if page_number is None:
            continue

        content_type = content_metadata.get("type")
        if content_type == ContentTypeEnum.TEXT.value:
            fragment = metadata.get("content")
        elif content_type == ContentTypeEnum.STRUCTURED.value:
            # table_metadata may be present but None; treat that as "no table content".
            fragment = (metadata.get("table_metadata") or {}).get("table_content")
        else:
            continue

        if isinstance(fragment, str) and fragment.strip():
            page_text_map.setdefault(page_number, []).append(fragment.strip())

    # Apply aggregated text to PAGE_IMAGE entries.
    for _, row in df.iterrows():
        metadata = row.get("metadata") or {}
        content_metadata = metadata.get("content_metadata") or {}

        is_page_image = (
            content_metadata.get("type") == ContentTypeEnum.IMAGE.value
            and content_metadata.get("subtype") == ContentTypeEnum.PAGE_IMAGE.value
        )
        if not is_page_image:
            continue

        page_fragments = page_text_map.get(content_metadata.get("page_number"))
        if not page_fragments:
            continue

        image_metadata = metadata.get("image_metadata")
        if image_metadata is None:
            # Attach a fresh dict so the aggregated text is not written to a
            # temporary object and silently discarded.
            image_metadata = {}
            metadata["image_metadata"] = image_metadata
        image_metadata["text"] = "\n\n".join(page_fragments)

    return df


# ------------------------------------------------------------------------------
# Batch Processing Utilities
# ------------------------------------------------------------------------------
Expand Down Expand Up @@ -579,6 +684,42 @@ def transform_create_text_embeddings_internal(
if df_transform_ledger.empty:
return df_transform_ledger, {"trace_info": execution_trace_log}

# Determine if page content aggregation should be enabled
image_elements_modality = task_config.get("image_elements_modality") or transform_config.image_elements_modality

# Check if user explicitly set the aggregation flag
explicit_aggregate_setting = task_config.get("image_elements_aggregate_page_content")
image_elements_aggregate_page_content = (
explicit_aggregate_setting
if explicit_aggregate_setting is not None
else transform_config.image_elements_aggregate_page_content
)

# Auto-enable aggregation when using text_image modality with PAGE_IMAGE entries
# Only auto-enable if user hasn't explicitly set the flag
if explicit_aggregate_setting is None and not image_elements_aggregate_page_content:
if image_elements_modality == "text_image":
# Check if PAGE_IMAGE entries exist
def _has_page_images(df):
    """Return True if any row of ``df`` is an IMAGE element with the PAGE_IMAGE subtype."""

    def _is_page_image(entry):
        info = entry.get("metadata", {}).get("content_metadata", {})
        return (
            info.get("type") == ContentTypeEnum.IMAGE.value
            and info.get("subtype") == ContentTypeEnum.PAGE_IMAGE.value
        )

    # any() stops at the first matching row, like the explicit early return.
    return any(_is_page_image(entry) for _, entry in df.iterrows())

if _has_page_images(df_transform_ledger):
image_elements_aggregate_page_content = True
logger.debug("Auto-enabled page content aggregation for text_image modality with PAGE_IMAGE entries")

# Aggregate text content from TEXT and STRUCTURED elements into PAGE_IMAGE entries
if image_elements_aggregate_page_content:
df_transform_ledger = _aggregate_page_content(df_transform_ledger)
logger.debug("Aggregated page content into PAGE_IMAGE entries for text_image embedding")

embedding_dataframes = []
content_masks = []

Expand All @@ -599,6 +740,33 @@ def transform_create_text_embeddings_internal(
ContentTypeEnum.VIDEO: lambda x: None, # Not supported yet.
}

# Determine which content types to embed
# When aggregating page content, automatically skip TEXT and STRUCTURED unless explicitly set
def _get_embed_flag(content_type: ContentTypeEnum) -> bool:
    """
    Decide whether elements of ``content_type`` should be embedded.

    Resolution order: an explicit (non-None) task-level flag wins; otherwise
    TEXT and STRUCTURED are skipped while page content aggregation is active;
    otherwise the transform-config default applies. Content types without a
    configured flag are embedded.
    """
    # content type -> (task-level override, transform-config default)
    settings = {
        ContentTypeEnum.TEXT: (
            task_config.get("embed_text_elements"),
            transform_config.embed_text_elements,
        ),
        ContentTypeEnum.STRUCTURED: (
            task_config.get("embed_structured_elements"),
            transform_config.embed_structured_elements,
        ),
        ContentTypeEnum.IMAGE: (
            task_config.get("embed_image_elements"),
            transform_config.embed_image_elements,
        ),
        ContentTypeEnum.AUDIO: (
            task_config.get("embed_audio_elements"),
            transform_config.embed_audio_elements,
        ),
    }
    task_override, configured_default = settings.get(content_type, (None, True))

    if task_override is not None:
        return task_override

    # When aggregating page content, skip TEXT and STRUCTURED by default
    # since their content is already included in PAGE_IMAGE entries
    covered_by_page_image = content_type in (ContentTypeEnum.TEXT, ContentTypeEnum.STRUCTURED)
    if image_elements_aggregate_page_content and covered_by_page_image:
        return False

    return configured_default

def _content_type_getter(row):
return row["content_metadata"]["type"]

Expand All @@ -607,6 +775,11 @@ def _content_type_getter(row):
logger.warning(f"Skipping text_embedding generation for unsupported content type: {content_type}")
continue

# Check if this content type should be embedded
if not _get_embed_flag(content_type):
logger.debug(f"Skipping embedding for content type {content_type} (disabled by configuration)")
continue

# Get rows matching the content type
content_mask = df_transform_ledger["metadata"].apply(_content_type_getter) == content_type.value
if not content_mask.any():
Expand Down Expand Up @@ -705,4 +878,8 @@ def _content_type_getter(row):
_add_custom_embeddings, embeddings=custom_embeddings_dict, result_target_field=result_target_field, axis=1
)

# Clean up source_image from text_metadata to avoid exceeding Milvus JSON field limits.
# The source_image is only needed during embedding and can be safely removed afterward.
combined_df = combined_df.apply(_cleanup_source_images, axis=1)

return combined_df, {"trace_info": execution_trace_log}
Loading
Loading