Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build-tools/requirements.docs.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
sphinx>=7.0,<8.0
sphinx
furo
m2r2
autodoc-pydantic
9 changes: 9 additions & 0 deletions documentation/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
'undoc-members': True,
'show-inheritance': True,
}
autodoc_member_order = 'bysource'


# -- Project information -----------------------------------------------------
Expand All @@ -55,8 +56,16 @@
'sphinx.ext.autodoc',
'sphinx.ext.linkcode',
'm2r2',
'sphinxcontrib.autodoc_pydantic',
]

autodoc_pydantic_model_show_json = True
autodoc_pydantic_model_show_field_summary = True
autodoc_pydantic_model_show_config_summary = False
autodoc_pydantic_model_show_validator_members = False
autodoc_pydantic_model_show_validator_summary = False
autodoc_pydantic_field_list_validators = False

templates_path = ['_templates']
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# dynamically generated files
Expand Down
140 changes: 118 additions & 22 deletions mmif/utils/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@
import io
import os
import sys
from typing import Iterator, Optional, TextIO, cast
from typing import Iterator, Optional, TextIO, Type, Union, cast, get_args, get_origin

from pydantic import BaseModel


@contextlib.contextmanager
def open_cli_io_arg(path_or_dash: Optional[str],
mode: str = 'r',
encoding: Optional[str] = None,
errors: Optional[str] = None,
default_stdin: bool = False,
) -> Iterator[TextIO]:
def open_cli_io_arg(
path_or_dash: Optional[str],
mode: str = "r",
encoding: Optional[str] = None,
errors: Optional[str] = None,
default_stdin: bool = False,
) -> Iterator[TextIO]:
"""
Context manager for opening files with stdin/stdout support.

Expand All @@ -28,6 +31,7 @@ def open_cli_io_arg(path_or_dash: Optional[str],
manager.

Handles the common CLI pattern where:

- '-' means stdin (read mode) or stdout (write mode)
- None means "argument not provided"; when default_stdin=True, it falls back
to stdin/stdout
Expand All @@ -54,10 +58,10 @@ def open_cli_io_arg(path_or_dash: Optional[str],
f.write(content)
"""
# Valid text modes for file operations
_READ_FLAGS = frozenset({'r', '+'})
_WRITE_FLAGS = frozenset({'w', 'a', 'x', '+'})
_READ_FLAGS = frozenset({"r", "+"})
_WRITE_FLAGS = frozenset({"w", "a", "x", "+"})

if 'b' in mode:
if "b" in mode:
raise ValueError(
f"Binary mode '{mode}' is not supported. "
"Use text modes ('r', 'w', 'a', 'x') instead."
Expand All @@ -66,9 +70,7 @@ def open_cli_io_arg(path_or_dash: Optional[str],
needs_read = bool(set(mode) & _READ_FLAGS)
needs_write = bool(set(mode) & _WRITE_FLAGS)

should_use_stdio = path_or_dash == '-' or (
path_or_dash is None and default_stdin
)
should_use_stdio = path_or_dash == "-" or (path_or_dash is None and default_stdin)

file_handle: Optional[TextIO] = None
should_close = False
Expand All @@ -83,11 +85,7 @@ def open_cli_io_arg(path_or_dash: Optional[str],

if needs_read:
# Check for missing input when stdin is a terminal
if (
path_or_dash is None
and default_stdin
and sys.stdin.isatty()
):
if path_or_dash is None and default_stdin and sys.stdin.isatty():
raise SystemExit("error: No input provided.")
file_handle = sys.stdin

Expand All @@ -96,14 +94,15 @@ def open_cli_io_arg(path_or_dash: Optional[str],

else:
raise ValueError(
f"Mode '{mode}' not supported with stdin/stdout "
"(use 'r' or 'w')"
f"Mode '{mode}' not supported with stdin/stdout (use 'r' or 'w')"
)

elif isinstance(path_or_dash, str):
if needs_read and not os.path.exists(path_or_dash):
raise FileNotFoundError(f"Input path does not exist: {path_or_dash}")
file_handle = cast(TextIO, io.open(path_or_dash, mode, encoding=encoding, errors=errors))
file_handle = cast(
TextIO, io.open(path_or_dash, mode, encoding=encoding, errors=errors)
)
should_close = True

elif path_or_dash is None:
Expand All @@ -117,13 +116,110 @@ def open_cli_io_arg(path_or_dash: Optional[str],
"Expected str or None."
)

yield file_handle
if file_handle is not None:
yield file_handle

finally:
if should_close and file_handle is not None:
file_handle.close()


def _unwrap_optional(annotation):
    """Strip a single ``Optional`` wrapper from *annotation*.

    Recognizes both ``typing.Optional[X]`` / ``typing.Union[X, None]`` and the
    PEP 604 spelling ``X | None`` (Python 3.10+), which reports
    ``types.UnionType`` rather than ``typing.Union`` from ``get_origin``.

    :return: ``(inner_type, True)`` when the annotation is exactly one type
        unioned with ``None``; otherwise ``(annotation, False)`` unchanged.
    """
    import types as _types  # local import: only needed for PEP 604 detection

    # getattr fallback keeps this working on Pythons without types.UnionType
    union_kinds = {Union, getattr(_types, "UnionType", Union)}
    if get_origin(annotation) in union_kinds:
        members = get_args(annotation)
        if type(None) in members:
            non_none = [m for m in members if m is not type(None)]
            if len(non_none) == 1:
                return non_none[0], True
    return annotation, False


def _format_type(annotation) -> str:
    """Render *annotation* as a short human-readable type tag.

    Conventions: ``Optional[X]`` -> ``"X, optional"``; ``list[X]`` ->
    ``"[X]"``; dicts and pydantic models -> ``"obj"``; anything else falls
    back to a cleaned-up ``str()`` of the annotation (class/module prefixes
    stripped).
    """
    inner, is_optional = _unwrap_optional(annotation)
    if is_optional:
        return f"{_format_type(inner)}, optional"

    origin = get_origin(annotation)
    args = get_args(annotation)

    if origin is list:
        return f"[{_format_type(args[0])}]" if args else "[]"
    if origin is dict:
        return "obj"
    if isinstance(annotation, type) and issubclass(annotation, BaseModel):
        return "obj"

    # Fallback: textual cleanup of e.g. "<class 'int'>" or "typing.Any"
    text = str(annotation)
    if text.startswith("<class '"):
        text = text[8:-2]
    if text.startswith("typing."):
        text = text[7:]
    # Drop any remaining module qualification ("mymod.Foo" -> "Foo")
    if "." in text:
        text = text.split(".")[-1]
    return text


def _nested_model(annotation) -> Optional[Type[BaseModel]]:
    """Find a pydantic model nested inside *annotation*, if any.

    Looks through one ``Optional`` wrapper, then at the annotation itself,
    a list's element type, or a dict's value type.

    :return: the ``BaseModel`` subclass, or ``None`` when there is none.
    """
    inner, _ = _unwrap_optional(annotation)
    if isinstance(inner, type) and issubclass(inner, BaseModel):
        return inner

    origin = get_origin(inner)
    args = get_args(inner)
    if origin is list and args:
        candidate = args[0]
    elif origin is dict and len(args) > 1:
        candidate = args[1]
    else:
        return None
    if isinstance(candidate, type) and issubclass(candidate, BaseModel):
        return candidate
    return None


def generate_model_summary(model: Type[BaseModel], indent: int = 0) -> str:
    """Render a plain-text, indented field summary of a pydantic model.

    Each field is listed as ``- name (type): description``; fields whose
    annotation is (or contains, via ``Optional``/list/dict) another pydantic
    model are followed by that model's summary indented four extra spaces.

    :param model: a pydantic v2 model class (read via ``model_fields``)
    :param indent: number of leading spaces for this nesting level
    :return: the summary lines joined with newlines (no trailing newline)
    """
    prefix = " " * indent
    lines = []
    for name, field in model.model_fields.items():
        # Prefer the declared alias over the attribute name when present.
        shown_name = field.alias if field.alias else name
        entry = f"{prefix}- {shown_name} ({_format_type(field.annotation)})"
        if field.description:
            entry += f": {field.description}"
        lines.append(entry)

        # Recurse into nested models so their fields appear indented below.
        nested = _nested_model(field.annotation)
        if nested is not None:
            lines.append(generate_model_summary(nested, indent + 4))

    return "\n".join(lines)


# keep imports of CLI modules for historical reasons
# keep them here in the bottom to avoid circular imports
from mmif.utils.cli import rewind
Expand Down
80 changes: 44 additions & 36 deletions mmif/utils/cli/describe.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import argparse
import json
import os
import sys
import textwrap
from pathlib import Path
from typing import Union, cast

from mmif.utils.cli import open_cli_io_arg
from mmif.utils.workflow_helper import generate_workflow_identifier, describe_single_mmif, \
describe_mmif_collection
from mmif.utils.cli import open_cli_io_arg, generate_model_summary

# gen_param_hash is imported for backward compatibility
from mmif.utils.workflow_helper import generate_param_hash
from mmif.utils.workflow_helper import (
CollectionMmifDesc,
SingleMmifDesc,
describe_mmif_collection,
describe_single_mmif,
generate_workflow_identifier,
)


def get_pipeline_specs(mmif_file: Union[str, Path]):
Expand All @@ -33,30 +37,19 @@ def describe_argparser():
'collection of MMIF files.'
)

# get and clean docstrings
def _extract_describe_docstring(func):
doc = func.__doc__.split(':param')[0]
# then cut off all lines after `---`
doc = doc.split('---')[0]
return textwrap.dedent(doc).strip()

single_doc = _extract_describe_docstring(describe_single_mmif)
collection_doc = _extract_describe_docstring(describe_mmif_collection)

additional = textwrap.dedent(f"""
This command extracts workflow information from a single MMIF file or
summarizes a directory of MMIF files. The output is serialized as JSON and
includes:
a directory of MMIF files. The output is serialized as JSON.

Output Schemas:

=========================
Single MMIF file as input
=========================
{single_doc}

==================================
A directory of MMIF files as input
==================================
{collection_doc}
1. Single MMIF File (mmif-file):
{generate_model_summary(SingleMmifDesc, indent=4)}

2. MMIF Collection (mmif-dir):
{generate_model_summary(CollectionMmifDesc, indent=4)}

Use `--help-schema` to inspect the full JSON schema for a specific output type.
""")
return oneliner, additional

Expand All @@ -67,6 +60,7 @@ def prep_argparser(**kwargs):
formatter_class=argparse.RawDescriptionHelpFormatter,
**kwargs
)

parser.add_argument(
"MMIF_FILE",
nargs="?",
Expand All @@ -84,24 +78,37 @@ def prep_argparser(**kwargs):
action="store_true",
help="Pretty-print JSON output"
)
parser.add_argument(
"--help-schema",
nargs=1,
choices=["mmif-file", "mmif-dir"],
metavar="SCHEMA_NAME",
help="Print the JSON schema for the output. Options: mmif-file, mmif-dir."
)
return parser


def main(args):
"""
Main entry point for the describe CLI command.

Reads a MMIF file and outputs a JSON summary containing:

- workflow_id: unique identifier for the source and app sequence
- stats: view counts, annotation counts (total/per-view/per-type), and lists of error/warning/empty view IDs
- views: map of view IDs to app configurations and profiling data

:param args: Parsed command-line arguments
Main block for the describe CLI command.
This function basically works as a wrapper around
:func:`describe_single_mmif` (for single file input) or
:func:`describe_mmif_collection` (for directory input).
"""
if hasattr(args, 'help_schema') and args.help_schema is not None:
schema_name = args.help_schema[0]
if schema_name == 'mmif-file':
model_cls = SingleMmifDesc
elif schema_name == 'mmif-dir':
model_cls = CollectionMmifDesc

schema = model_cls.model_json_schema()
print(json.dumps(schema, indent=2))
sys.exit(0)

output = {}
# if input is a directory
if isinstance(args.MMIF_FILE, (str, os.PathLike)) and Path(args.MMIF_FILE).is_dir():
if Path(str(args.MMIF_FILE)).is_dir():
output = describe_mmif_collection(args.MMIF_FILE)
# if input is a file or stdin
else:
Expand All @@ -125,6 +132,7 @@ def main(args):
tmp_path.unlink()

if output:
# Convert Pydantic models to dicts
with open_cli_io_arg(args.output, 'w', default_stdin=True) as output_file:
json.dump(output, output_file, indent=2 if args.pretty else None)
output_file.write('\n')
Expand Down
Loading
Loading