From 2f562895384cde71167c97cf9728fea07e4a5e2b Mon Sep 17 00:00:00 2001 From: Lucas Giordano Date: Sun, 2 Nov 2025 18:27:04 +0100 Subject: [PATCH 1/6] add notte cli --- makefile | 10 +- .../src/notte_core/browser/observation.py | 2 +- .../src/notte_core/credentials/base.py | 8 + packages/notte-sdk/pyproject.toml | 4 + packages/notte-sdk/src/notte_sdk/__init__.py | 4 + .../notte-sdk/src/notte_sdk/cli/__init__.py | 407 ++++++++++++++++++ .../notte-sdk/src/notte_sdk/cli/metadata.py | 366 ++++++++++++++++ .../src/notte_sdk/cli/workflow_cli.py | 55 +++ .../notte-sdk/src/notte_sdk/decorators.py | 102 +++++ uv.lock | 2 + 10 files changed, 954 insertions(+), 6 deletions(-) create mode 100644 packages/notte-sdk/src/notte_sdk/cli/__init__.py create mode 100644 packages/notte-sdk/src/notte_sdk/cli/metadata.py create mode 100644 packages/notte-sdk/src/notte_sdk/cli/workflow_cli.py create mode 100644 packages/notte-sdk/src/notte_sdk/decorators.py diff --git a/makefile b/makefile index 50994004a..83af13a5c 100644 --- a/makefile +++ b/makefile @@ -13,17 +13,17 @@ test-cicd: .PHONY: test-sdk test-sdk: - uv run pytest -n logical tests/sdk - uv run pytest -n logical tests/integration/sdk + uv run pytest -n 4 tests/sdk + uv run pytest -n 4 tests/integration/sdk .PHONY: test-docs test-docs: - uv run pytest -n logical tests/docs + uv run pytest -n 4 tests/docs .PHONY: test-agent test-agent: - uv run pytest -n logical tests/agent - uv run pytest -n logical tests/integration/sdk/test_vault.py + uv run pytest -n 4 tests/agent + uv run pytest -n 4 tests/integration/sdk/test_vault.py .PHONY: test-sdk-staging test-sdk-staging: diff --git a/packages/notte-core/src/notte_core/browser/observation.py b/packages/notte-core/src/notte_core/browser/observation.py index 215e32811..3161137d2 100644 --- a/packages/notte-core/src/notte_core/browser/observation.py +++ b/packages/notte-core/src/notte_core/browser/observation.py @@ -248,4 +248,4 @@ def validate_exception(cls, v: Any) -> NotteBaseError | Exception | None: def model_post_init(self, context: Any, /) -> None: if self.success: if self.exception is not None: - raise ValueError("Exception should be None if success is True") + raise ValueError(f"Exception should be None if success is True: {self.exception}") diff --git a/packages/notte-core/src/notte_core/credentials/base.py b/packages/notte-core/src/notte_core/credentials/base.py index 01dc6745b..c359b0326 100644 --- a/packages/notte-core/src/notte_core/credentials/base.py +++ b/packages/notte-core/src/notte_core/credentials/base.py @@ -486,6 +486,14 @@ async def add_credentials_from_env_async(self, url: str) -> None: def get_credentials(self, url: str) -> CredentialsDict | None: return asyncio.run(self.get_credentials_async(url=url)) + def get_mfa_code(self, url: str) -> str: + cred = self.get_credentials(url) + if cred is None: + raise ValueError(f"No credentials found for {url}") + if "mfa_secret" not in cred: + raise ValueError(f"No mfa secret found for {url}. Please update your credentials to include an mfa secret.") + return TOTP(cred["mfa_secret"]).now() + @profiler.profiled() # noqa: F821 async def get_credentials_async(self, url: str) -> CredentialsDict | None: """Get credentials for a given URL. 
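The new `get_mfa_code` helper turns a stored `mfa_secret` into a fresh TOTP code. A minimal usage sketch (the URL is illustrative, and `vault` stands for an instance of the vault class defined in `credentials/base.py`):

```python
# Assumes credentials for this URL were saved with an "mfa_secret" field;
# get_mfa_code raises ValueError otherwise.
code = vault.get_mfa_code("https://github.com")  # e.g. "492731", time-based
print(code)
```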
diff --git a/packages/notte-sdk/pyproject.toml b/packages/notte-sdk/pyproject.toml index 0aa2a3b2e..3ed60c85d 100644 --- a/packages/notte-sdk/pyproject.toml +++ b/packages/notte-sdk/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "halo>=0.0.28", "notte-core==1.4.4.dev", "websockets>=13.1", + "typer>=0.9.0", ] [project.optional-dependencies] @@ -22,6 +23,9 @@ playwright = [ "playwright~=1.55", ] +[project.scripts] +notte-workflow = "notte_sdk.cli:main" + [build-system] requires = ["hatchling"] build-backend = "hatchling.build" diff --git a/packages/notte-sdk/src/notte_sdk/__init__.py b/packages/notte-sdk/src/notte_sdk/__init__.py index 82d32961e..40d4bba31 100644 --- a/packages/notte-sdk/src/notte_sdk/__init__.py +++ b/packages/notte-sdk/src/notte_sdk/__init__.py @@ -28,7 +28,9 @@ UploadFile, Wait, ) +from notte_sdk.cli.workflow_cli import workflow_cli from notte_sdk.client import NotteClient +from notte_sdk.decorators import workflow from notte_sdk.endpoints.agents import RemoteAgent from notte_sdk.endpoints.sessions import RemoteSession from notte_sdk.errors import retry @@ -42,6 +44,8 @@ "RemoteAgent", "retry", "generate_cookies", + "workflow", + "workflow_cli", "FormFill", "Goto", "GotoNewTab", diff --git a/packages/notte-sdk/src/notte_sdk/cli/__init__.py b/packages/notte-sdk/src/notte_sdk/cli/__init__.py new file mode 100644 index 000000000..024a1d5b6 --- /dev/null +++ b/packages/notte-sdk/src/notte_sdk/cli/__init__.py @@ -0,0 +1,407 @@ +from __future__ import annotations + +import importlib.util +import json +import os +import sys +from pathlib import Path +from typing import Annotated, Any + +import typer +from notte_core.common.logging import logger + +from notte_sdk.cli.metadata import ( + WorkflowMetadata, + comment_main_block, + get_git_author, + insert_metadata_block, + uncomment_main_block, +) +from notte_sdk.client import NotteClient +from notte_sdk.decorators import get_workflow_description, get_workflow_name, is_workflow + +app = typer.Typer( + name="notte-workflow", + help="Notte Workflow CLI - Manage workflow lifecycle from your workflow files", + add_completion=False, + no_args_is_help=True, +) + + +def get_default_file_path() -> Path: + """Get the default file path from sys.argv[0] if running from a workflow file.""" + if len(sys.argv) > 1 and sys.argv[1] in ["create", "update", "run"]: + # Running as: python workflow_file.py create + return Path(sys.argv[0]).resolve() + # Default fallback - will raise error if not provided + raise typer.BadParameter("File path is required when not running from a workflow file") + + +def find_workflow_function(module: Any) -> tuple[Any, str] | None: + """ + Find the workflow function in a module. + + Args: + module: The imported module. + + Returns: + Tuple of (function, function_name) if found, None otherwise. + """ + for name in dir(module): + obj = getattr(module, name) + if callable(obj) and is_workflow(obj): + return obj, name + return None + + +def load_workflow_file(file_path: Path) -> tuple[Any, str, Any]: + """ + Load a workflow file and find the workflow function. + + Args: + file_path: Path to the workflow Python file. + + Returns: + Tuple of (module, function_name, function). + + Raises: + ValueError: If no workflow function is found. 
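+
+    Example (the file name is illustrative):
+        ```python
+        module, func_name, func = load_workflow_file(Path("my_workflow.py"))
+        print(f"Found workflow function: {func_name}")
+        ```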
+ """ + # Set __name__ to something other than "__main__" to prevent execution of if __name__ == "__main__" block + import sys + + original_argv = sys.argv.copy() + try: + # Temporarily modify sys.argv to prevent the workflow from thinking it's being run directly + # Save original to restore later + spec = importlib.util.spec_from_file_location("workflow_module", file_path) + if spec is None or spec.loader is None: + raise ValueError(f"Could not load module from {file_path}") + + module = importlib.util.module_from_spec(spec) + # Set __name__ to the module name (not "__main__") so if __name__ == "__main__" blocks don't execute + module.__name__ = spec.name + spec.loader.exec_module(module) + + result = find_workflow_function(module) + if result is None: + raise ValueError( + f"No workflow function found in {file_path}. Make sure to decorate a function with @workflow." + ) + + func, func_name = result + return module, func_name, func + finally: + # Restore original sys.argv + sys.argv = original_argv + + +def get_api_key(api_key: str | None = None) -> str: + """ + Get API key from argument or environment variable. + + Args: + api_key: Optional API key from CLI argument. + + Returns: + The API key. + + Raises: + ValueError: If no API key is found. + """ + if api_key: + return api_key + api_key = os.getenv("NOTTE_API_KEY") + if not api_key: + raise ValueError("NOTTE_API_KEY not found. Set it in environment or use --api-key flag.") + return api_key + + +def prepare_workflow_file_for_upload(file_path: Path) -> tuple[Path, bool]: + """ + Prepare a workflow file for upload by commenting out the __main__ block. + + Args: + file_path: Path to the workflow file. + + Returns: + Tuple of (temp_file_path, was_commented) where temp_file_path is the path to + the temporary file with commented __main__ block, and was_commented indicates + if the block was found and commented. + """ + # Read current content + content = file_path.read_text(encoding="utf-8") + + # Comment out __main__ block + content, commented = comment_main_block(content) + if commented: + logger.debug("Commented out __main__ block for upload") + + # Write temporary file with .py extension (API requires .py files) + temp_file = file_path.parent / f".{file_path.stem}_temp{file_path.suffix}" + _ = temp_file.write_text(content, encoding="utf-8") + + return temp_file, commented + + +def restore_workflow_file(file_path: Path, was_commented: bool) -> None: + """ + Restore the workflow file by uncommenting the __main__ block if needed. + + Args: + file_path: Path to the workflow file. + was_commented: Whether the __main__ block was commented out. 
+ """ + if was_commented: + # Read content again + content = file_path.read_text(encoding="utf-8") + + # Uncomment __main__ block + content, _ = uncomment_main_block(content) + + # Write back to file + _ = file_path.write_text(content, encoding="utf-8") + + +@app.command() +def create( + file: Annotated[ + Path, + typer.Argument( + help="Path to the workflow Python file", + default_factory=get_default_file_path, + ), + ], + api_key: Annotated[ + str | None, typer.Option("--api-key", help="Notte API key (defaults to NOTTE_API_KEY environment variable") + ] = None, +) -> None: + """Create a new workflow.""" + try: + api_key = get_api_key(api_key) + except ValueError as e: + logger.error(str(e)) + raise typer.Exit(1) + + logger.info(f"Creating workflow from {file}") + + # Load the workflow function + _module, _func_name, func = load_workflow_file(file) + + # Get workflow metadata from decorator + name = get_workflow_name(func) + description = get_workflow_description(func) + + if not name: + logger.error("Workflow name is required. Set it in the @workflow decorator.") + raise typer.Exit(1) + + # Check if metadata already exists + existing_metadata = WorkflowMetadata.from_file(file) + if existing_metadata and existing_metadata.workflow_id: + logger.error( + f"Workflow already exists with ID: {existing_metadata.workflow_id}. Use 'update' command to update it." + ) + raise typer.Exit(1) + + try: + # Prepare file for upload (comment out __main__ block) + temp_file, was_commented = prepare_workflow_file_for_upload(file) + + try: + # Create client and workflow + client = NotteClient(api_key=api_key) + workflow_obj = client.Workflow( + workflow_path=str(temp_file), name=name, description=description, _client=client + ) + + logger.info(f"Workflow created with ID: {workflow_obj.workflow_id}") + + # Create metadata and insert into file + metadata = WorkflowMetadata( + workflow_id=workflow_obj.workflow_id, + name=name, + description=description, + author=get_git_author(file), + creation_date=workflow_obj.response.created_at.isoformat(), + last_update_date=workflow_obj.response.updated_at.isoformat(), + ) + + # Read current content + content = file.read_text(encoding="utf-8") + + # Insert metadata block + content = insert_metadata_block(content, metadata) + + # Write back to file + _ = file.write_text(content, encoding="utf-8") + + # Restore __main__ block if it was commented + restore_workflow_file(file, was_commented) + + logger.info(f"Metadata block added to {file}") + logger.info(f"You can reference this workflow using: notte.Workflow('{workflow_obj.workflow_id}')") + finally: + # Clean up temp file + if temp_file.exists(): + temp_file.unlink() + except Exception as e: + logger.error(f"Error: {e}") + raise typer.Exit(1) + + +@app.command() +def update( + file: Annotated[ + Path, + typer.Argument( + help="Path to the workflow Python file", + default_factory=get_default_file_path, + ), + ], + api_key: Annotated[ + str | None, typer.Option("--api-key", help="Notte API key (defaults to NOTTE_API_KEY environment variable") + ] = None, + restricted: Annotated[ + bool, typer.Option("--restricted/--no-restricted", help="Run workflow in restricted mode") + ] = True, +) -> None: + """Update an existing workflow.""" + try: + api_key = get_api_key(api_key) + except ValueError as e: + logger.error(str(e)) + raise typer.Exit(1) + + logger.info(f"Updating workflow from {file}") + + # Read metadata + metadata = WorkflowMetadata.from_file(file) + if not metadata or not metadata.workflow_id: + logger.error("No 
workflow metadata found. Run 'create' command first to create the workflow.") + raise typer.Exit(1) + + # Prepare file for upload (comment out __main__ block) + temp_file, was_commented = prepare_workflow_file_for_upload(file) + + try: + # Update workflow + client = NotteClient(api_key=api_key) + workflow_obj = client.Workflow(workflow_id=metadata.workflow_id, _client=client) + workflow_obj.update(workflow_path=str(temp_file), restricted=restricted) + + logger.info(f"Workflow {metadata.workflow_id} updated successfully") + + # Update metadata + metadata.last_update_date = workflow_obj.response.updated_at.isoformat() + + # Read content again (may have been modified) + content = file.read_text(encoding="utf-8") + + # Insert metadata block + content = insert_metadata_block(content, metadata) + + # Write back to file + _ = file.write_text(content, encoding="utf-8") + + # Restore __main__ block if it was commented + restore_workflow_file(file, was_commented) + + logger.info(f"Metadata updated in {file}") + except Exception as e: + logger.error(f"Error: {e}") + raise typer.Exit(1) + finally: + # Clean up temp file + if temp_file.exists(): + temp_file.unlink() + + +@app.command() +def run( + file: Annotated[ + Path, + typer.Argument( + help="Path to the workflow Python file", + default_factory=get_default_file_path, + ), + ], + api_key: Annotated[ + str | None, typer.Option("--api-key", help="Notte API key (defaults to NOTTE_API_KEY environment variable") + ] = None, + local: Annotated[bool, typer.Option("--local", help="Run workflow locally instead of on cloud")] = False, + variables: Annotated[ + Path | None, typer.Option("--variables", help="JSON file containing workflow variables") + ] = None, + timeout: Annotated[int | None, typer.Option("--timeout", help="Timeout in seconds for cloud runs")] = None, + stream: Annotated[ + bool, typer.Option("--stream/--no-stream", help="Enable/disable streaming logs for cloud runs") + ] = True, + raise_on_failure: Annotated[ + bool, typer.Option("--raise-on-failure/--no-raise-on-failure", help="Raise exception on workflow failure") + ] = True, +) -> None: + """Run a workflow.""" + if local: + logger.info(f"Running workflow locally from {file}") + # For local runs, just execute the file + # This mimics: uv run python workflow_code.py + import subprocess + + result = subprocess.run([sys.executable, str(file)], check=False) + raise typer.Exit(result.returncode) + else: + try: + api_key = get_api_key(api_key) + except ValueError as e: + logger.error(str(e)) + raise typer.Exit(1) + + logger.info(f"Running workflow on cloud from {file}") + + # Read metadata + metadata = WorkflowMetadata.from_file(file) + if not metadata or not metadata.workflow_id: + logger.error("No workflow metadata found. 
Run 'create' command first to create the workflow.") + raise typer.Exit(1) + + # Load variables if provided + variables_dict: dict[str, Any] = {} + if variables: + if not variables.exists(): + logger.error(f"Variables file not found: {variables}") + raise typer.Exit(1) + variables_dict = json.loads(variables.read_text(encoding="utf-8")) + + try: + # Run workflow + client = NotteClient(api_key=api_key) + workflow_obj = client.Workflow(workflow_id=metadata.workflow_id, _client=client) + result = workflow_obj.run( + timeout=timeout, + stream=stream, + raise_on_failure=raise_on_failure, + **variables_dict, + ) + + logger.info(f"Workflow completed with status: {result.status}") + logger.info(f"Result: {result.result}") + except Exception as e: + logger.error(f"Error: {e}") + raise typer.Exit(1) + + +def main(_file_path: Path | None = None) -> None: + """ + Main CLI entry point. + + Args: + _file_path: Optional path to workflow file. If None, will be auto-detected from sys.argv. + Currently unused, kept for compatibility with workflow_cli(). + """ + # Run typer app directly - typer handles help, argument parsing, etc. + app() + + +if __name__ == "__main__": + main() diff --git a/packages/notte-sdk/src/notte_sdk/cli/metadata.py b/packages/notte-sdk/src/notte_sdk/cli/metadata.py new file mode 100644 index 000000000..418394af9 --- /dev/null +++ b/packages/notte-sdk/src/notte_sdk/cli/metadata.py @@ -0,0 +1,366 @@ +from __future__ import annotations + +import datetime as dt +import re +import subprocess +from pathlib import Path + +from notte_core.common.logging import logger + + +class WorkflowMetadata: + """Represents workflow metadata stored in a Python file.""" + + workflow_id: str | None + name: str | None + description: str | None + author: str | None + creation_date: str | None + last_update_date: str | None + + def __init__( + self, + workflow_id: str | None = None, + name: str | None = None, + description: str | None = None, + author: str | None = None, + creation_date: str | None = None, + last_update_date: str | None = None, + ): + self.workflow_id = workflow_id + self.name = name + self.description = description + self.author = author + self.creation_date = creation_date + self.last_update_date = last_update_date + + @classmethod + def from_file(cls, file_path: Path) -> WorkflowMetadata | None: + """ + Parse metadata from a Python file. + + Args: + file_path: Path to the Python file. + + Returns: + WorkflowMetadata if found, None otherwise. + """ + try: + content = file_path.read_text(encoding="utf-8") + return cls.from_string(content) + except Exception as e: + logger.debug(f"Failed to read metadata from {file_path}: {e}") + return None + + @classmethod + def from_string(cls, content: str) -> WorkflowMetadata | None: + """ + Parse metadata from file content string. + + Args: + content: The file content. + + Returns: + WorkflowMetadata if found, None otherwise. 
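+
+        Example (the workflow ID is illustrative):
+            ```python
+            content = "#!/notte/workflow\n#\n# workflow_id: wf_123\n# name: My Workflow\n"
+            metadata = WorkflowMetadata.from_string(content)
+            assert metadata is not None and metadata.workflow_id == "wf_123"
+            ```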
+ """ + # Look for the metadata block - match from start of file + lines = content.split("\n") + metadata_start = None + for i, line in enumerate(lines): + if line.strip() == "#!/notte/workflow": + metadata_start = i + break + + if metadata_start is None: + return None + + metadata = cls() + + # Parse lines after the metadata marker + for i in range(metadata_start + 1, len(lines)): + line = lines[i].strip() + + # Stop at first non-comment, non-empty line after metadata block + if line and not line.startswith("#"): + # Check if we've seen any metadata fields + if metadata.workflow_id or metadata.name: + break + continue + + # Skip empty comment lines + if line in ("#", ""): + continue + + # Parse field lines: # key: value + if line.startswith("#"): + # Remove leading # + content_line = line[1:].strip() + if ":" in content_line: + key, value = content_line.split(":", 1) + key = key.strip() + value = value.strip() + + if key == "workflow_id": + metadata.workflow_id = value + elif key == "name": + metadata.name = value + elif key == "description": + metadata.description = value + elif key == "author": + metadata.author = value + elif key == "creation_date": + metadata.creation_date = value + elif key == "last_update_date": + metadata.last_update_date = value + + return metadata if metadata.workflow_id or metadata.name else None + + def to_block(self) -> str: + """ + Convert metadata to a metadata block string. + + Returns: + The metadata block as a string. + """ + lines = ["#!/notte/workflow", "#"] + if self.workflow_id: + lines.append(f"# workflow_id: {self.workflow_id}") + if self.name: + lines.append(f"# name: {self.name}") + if self.description: + lines.append(f"# description: {self.description}") + if self.author: + lines.append(f"# author: {self.author}") + if self.creation_date: + lines.append(f"# creation_date: {self.creation_date}") + if self.last_update_date: + lines.append(f"# last_update_date: {self.last_update_date}") + return "\n".join(lines) + "\n" + + def update_from_api(self, workflow_id: str, name: str | None = None) -> None: + """ + Update metadata from API response. + + Args: + workflow_id: The workflow ID from the API. + name: Optional workflow name from the API. + """ + self.workflow_id = workflow_id + if name: + self.name = name + now = dt.datetime.now(dt.timezone.utc).isoformat() + if not self.creation_date: + self.creation_date = now + self.last_update_date = now + + +def get_git_author(file_path: Path | None = None) -> str | None: + """ + Get git author information for the current repository. + + Args: + file_path: Optional path to a file in the git repository. + + Returns: + Author string in format "Name " or None if not available. 
+ """ + try: + # Try to get author from git config + result = subprocess.run( + ["git", "config", "user.name"], + capture_output=True, + text=True, + check=True, + timeout=5, + ) + name = result.stdout.strip() + + result = subprocess.run( + ["git", "config", "user.email"], + capture_output=True, + text=True, + check=True, + timeout=5, + ) + email = result.stdout.strip() + + if name and email: + return f"{name} <{email}>" + elif name: + return name + except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired): + pass + + # Fallback: try to get from git log if file_path is provided + if file_path: + try: + result = subprocess.run( + ["git", "log", "-1", "--format=%an <%ae>", "--", str(file_path)], + capture_output=True, + text=True, + check=True, + timeout=5, + ) + author = result.stdout.strip() + if author: + return author + except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired): + pass + + return None + + +def insert_metadata_block(content: str, metadata: WorkflowMetadata) -> str: + """ + Insert or update metadata block in file content. + + Args: + content: The file content. + metadata: The metadata to insert. + + Returns: + The content with metadata block inserted/updated. + """ + metadata_block = metadata.to_block() + + # Check if metadata block already exists + existing_metadata = WorkflowMetadata.from_string(content) + if existing_metadata: + # Replace existing metadata block + pattern = r"#!/notte/workflow\s*\n(?:#\s*\n)?(?:#\s*(.+?)\s*\n)*" + content = re.sub(pattern, metadata_block + "\n", content, flags=re.MULTILINE) + else: + # Insert metadata block at the beginning + # If there's a shebang, insert after it, otherwise at the start + shebang_pattern = r"^#![^\n]+\n" + shebang_match = re.match(shebang_pattern, content) + + if shebang_match: + # Insert after shebang + shebang = shebang_match.group(0) + rest = content[len(shebang) :] + content = shebang + metadata_block + "\n" + rest + else: + # Insert at the beginning + content = metadata_block + "\n" + content + + return content + + +def comment_main_block(content: str) -> tuple[str, bool]: + """ + Comment out the `if __name__ == "__main__"` block. + + Args: + content: The file content. + + Returns: + Tuple of (modified content, whether block was found and commented). 
+ """ + lines = content.split("\n") + main_start = None + + # Find the line with `if __name__ == "__main__":` + for i, line in enumerate(lines): + if re.match(r'^\s*if\s+__name__\s*==\s*["\']__main__["\']\s*:', line): + main_start = i + break + + if main_start is None: + return content, False + + # Find the end of the block (all indented lines after the if statement) + main_end = main_start + 1 + if main_start < len(lines): + # Get the indentation level of the if statement itself + if_line = lines[main_start] + if_indent = len(if_line) - len(if_line.lstrip()) + + # Find all subsequent lines that are indented more than the if statement + # (i.e., the body of the if block) + for i in range(main_start + 1, len(lines)): + line = lines[i] + if not line.strip(): # Empty line, include it + main_end = i + 1 + continue + line_indent = len(line) - len(line.lstrip()) + # Include lines that are indented more than the if statement + if line_indent > if_indent: + main_end = i + 1 + else: + # Hit a line at same or less indentation - end of block + break + + # Comment out all lines in the block + new_lines = lines.copy() + for i in range(main_start, main_end): + if new_lines[i].strip(): # Only comment non-empty lines + new_lines[i] = "# " + new_lines[i] + + return "\n".join(new_lines), True + + +def uncomment_main_block(content: str) -> tuple[str, bool]: + """ + Uncomment the `if __name__ == "__main__"` block. + + Args: + content: The file content. + + Returns: + Tuple of (modified content, whether block was found and uncommented). + """ + lines = content.split("\n") + main_start = None + + # Find the commented line with `if __name__ == "__main__":` + for i, line in enumerate(lines): + stripped = line.strip() + if stripped.startswith("#") and re.match(r'#\s*if\s+__name__\s*==\s*["\']__main__["\']\s*:', stripped): + main_start = i + break + + if main_start is None: + return content, False + + # Find the end of the commented block + main_end = main_start + 1 + if main_start < len(lines): + # Get the indentation level of the commented if statement + commented_if_line = lines[main_start] + # Uncomment temporarily to get original indentation + uncommented_if = commented_if_line.lstrip("# ").lstrip("#") + if_indent = len(uncommented_if) - len(uncommented_if.lstrip()) + + # Find all subsequent commented lines that are indented more than the if statement + for i in range(main_start + 1, len(lines)): + line = lines[i] + if not line.strip(): # Empty line, include it + main_end = i + 1 + continue + if line.strip().startswith("#"): + # Uncomment temporarily to check indentation + uncommented = line.lstrip("# ").lstrip("#") + if uncommented.strip(): + uncommented_indent = len(uncommented) - len(uncommented.lstrip()) + if uncommented_indent > if_indent: + main_end = i + 1 + else: + break + else: + main_end = i + 1 + else: + # Hit a non-commented line - end of block + break + + # Uncomment all lines in the block + new_lines = lines.copy() + for i in range(main_start, main_end): + if new_lines[i].strip().startswith("#"): + # Remove leading "# " or "#" + if new_lines[i].startswith("# "): + new_lines[i] = new_lines[i][2:] + elif new_lines[i].startswith("#"): + new_lines[i] = new_lines[i][1:] + + return "\n".join(new_lines), True diff --git a/packages/notte-sdk/src/notte_sdk/cli/workflow_cli.py b/packages/notte-sdk/src/notte_sdk/cli/workflow_cli.py new file mode 100644 index 000000000..60f3aa7bf --- /dev/null +++ b/packages/notte-sdk/src/notte_sdk/cli/workflow_cli.py @@ -0,0 +1,55 @@ +""" +Helper function for workflow files 
to enable CLI commands. + +Call this in your `if __name__ == "__main__"` block. If CLI arguments are detected, +it handles them and exits. Otherwise, it returns immediately and your code continues normally. + +Example: +```python +from notte_sdk import NotteClient, workflow, workflow_cli + +notte = NotteClient() + +@workflow(name="My Workflow") +def run(): + ... + +if __name__ == "__main__": + workflow_cli() # Handles CLI or does nothing + # Your custom code continues here if no CLI args + run() +``` + +This allows you to run commands like: +- python workflow_file.py create +- python workflow_file.py run --local +- python workflow_file.py update +- python workflow_file.py run --variables variables.json +""" + +from __future__ import annotations + +import sys + +from notte_sdk.cli import main + + +def workflow_cli() -> None: + """ + CLI entry point for workflow files. + + Call this anywhere in your `if __name__ == "__main__"` block. + If CLI arguments are detected, it handles them and exits. + Otherwise, it returns immediately and your code continues normally. + """ + # Only handle CLI if args are present + if len(sys.argv) > 1: + first_arg = sys.argv[1] + if first_arg in ["create", "update", "run", "--help", "-h"]: + # Handle CLI and exit + # Note: file_path is auto-detected from sys.argv by typer, so we don't need to pass it + main() + sys.exit(0) + + # No CLI args - return immediately, let normal execution continue + return diff --git a/packages/notte-sdk/src/notte_sdk/decorators.py b/packages/notte-sdk/src/notte_sdk/decorators.py new file mode 100644 index 000000000..cca86ad69 --- /dev/null +++ b/packages/notte-sdk/src/notte_sdk/decorators.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +import functools +from typing import TYPE_CHECKING, Any, Callable, ParamSpec, TypeVar + +if TYPE_CHECKING: + pass + +P = ParamSpec("P") +R = TypeVar("R") + + +def workflow(name: str, description: str | None = None) -> Callable[[Callable[P, R]], Callable[P, R]]: + """ + Decorator to mark a function as a Notte workflow. + + This decorator stores metadata on the function that can be used by the CLI + to manage the workflow lifecycle (create, update, run). + + Args: + name: The name of the workflow. + description: Optional description of the workflow. 
+ + Example: + ```python + from notte_sdk import workflow + + @workflow(name="My Workflow", description="Does something useful") + def run(url: str, query: str) -> str: + # workflow code here + return "result" + ``` + """ + + def decorator(func: Callable[P, R]) -> Callable[P, R]: + # Store metadata on the function + func.__workflow_name__ = name # type: ignore[attr-defined] + func.__workflow_description__ = description # type: ignore[attr-defined] + func.__is_workflow__ = True # type: ignore[attr-defined] + + # Preserve function signature and behavior + @functools.wraps(func) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + return func(*args, **kwargs) + + # Handle async functions + import inspect + + if inspect.iscoroutinefunction(func): + + @functools.wraps(func) + async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + return await func(*args, **kwargs) + + # Copy metadata to async wrapper + async_wrapper.__workflow_name__ = name # type: ignore[attr-defined] + async_wrapper.__workflow_description__ = description # type: ignore[attr-defined] + async_wrapper.__is_workflow__ = True # type: ignore[attr-defined] + return async_wrapper # type: ignore[return-value] + + return wrapper + + return decorator + + +def is_workflow(func: Callable[..., Any]) -> bool: + """ + Check if a function is decorated with @workflow. + + Args: + func: The function to check. + + Returns: + True if the function is a workflow, False otherwise. + """ + return hasattr(func, "__is_workflow__") and getattr(func, "__is_workflow__", False) + + +def get_workflow_name(func: Callable[..., Any]) -> str | None: + """ + Get the workflow name from a decorated function. + + Args: + func: The workflow function. + + Returns: + The workflow name, or None if not found. + """ + return getattr(func, "__workflow_name__", None) + + +def get_workflow_description(func: Callable[..., Any]) -> str | None: + """ + Get the workflow description from a decorated function. + + Args: + func: The workflow function. + + Returns: + The workflow description, or None if not found. 
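+
+    Example (names are illustrative):
+        ```python
+        @workflow(name="Demo", description="Scrapes a page")
+        def run() -> None: ...
+
+        assert get_workflow_name(run) == "Demo"
+        assert get_workflow_description(run) == "Scrapes a page"
+        ```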
+ """ + return getattr(func, "__workflow_description__", None) diff --git a/uv.lock b/uv.lock index ad8c171e2..18c63bdbe 100644 --- a/uv.lock +++ b/uv.lock @@ -3115,6 +3115,7 @@ source = { editable = "packages/notte-sdk" } dependencies = [ { name = "halo" }, { name = "notte-core" }, + { name = "typer" }, { name = "websockets" }, ] @@ -3128,6 +3129,7 @@ requires-dist = [ { name = "halo", specifier = ">=0.0.28" }, { name = "notte-core", editable = "packages/notte-core" }, { name = "playwright", marker = "extra == 'playwright'", specifier = "~=1.55" }, + { name = "typer", specifier = ">=0.9.0" }, { name = "websockets", specifier = ">=13.1" }, ] provides-extras = ["playwright"] From cb1655fadc6a8e0190a1848ad0e4cd2890e8d05a Mon Sep 17 00:00:00 2001 From: Lucas Giordano Date: Sun, 2 Nov 2025 18:34:37 +0100 Subject: [PATCH 2/6] simplfy code --- .../notte-sdk/src/notte_sdk/cli/__init__.py | 399 +++++++----------- 1 file changed, 142 insertions(+), 257 deletions(-) diff --git a/packages/notte-sdk/src/notte_sdk/cli/__init__.py b/packages/notte-sdk/src/notte_sdk/cli/__init__.py index 024a1d5b6..d55e11ad7 100644 --- a/packages/notte-sdk/src/notte_sdk/cli/__init__.py +++ b/packages/notte-sdk/src/notte_sdk/cli/__init__.py @@ -1,8 +1,8 @@ from __future__ import annotations +import contextlib import importlib.util import json -import os import sys from pathlib import Path from typing import Annotated, Any @@ -28,25 +28,34 @@ ) +# Common argument definitions def get_default_file_path() -> Path: """Get the default file path from sys.argv[0] if running from a workflow file.""" if len(sys.argv) > 1 and sys.argv[1] in ["create", "update", "run"]: - # Running as: python workflow_file.py create return Path(sys.argv[0]).resolve() - # Default fallback - will raise error if not provided raise typer.BadParameter("File path is required when not running from a workflow file") -def find_workflow_function(module: Any) -> tuple[Any, str] | None: - """ - Find the workflow function in a module. +FILE_ARG = Annotated[ + Path, + typer.Argument( + help="Path to the workflow Python file", + default_factory=get_default_file_path, + ), +] - Args: - module: The imported module. +API_KEY_ARG = Annotated[ + str | None, + typer.Option( + "--api-key", + help="Notte API key (defaults to NOTTE_API_KEY environment variable)", + envvar="NOTTE_API_KEY", + ), +] - Returns: - Tuple of (function, function_name) if found, None otherwise. - """ + +def find_workflow_function(module: Any) -> tuple[Any, str] | None: + """Find the workflow function in a module.""" for name in dir(module): obj = getattr(module, name) if callable(obj) and is_workflow(obj): @@ -58,130 +67,83 @@ def load_workflow_file(file_path: Path) -> tuple[Any, str, Any]: """ Load a workflow file and find the workflow function. - Args: - file_path: Path to the workflow Python file. - - Returns: - Tuple of (module, function_name, function). - - Raises: - ValueError: If no workflow function is found. + Sets __name__ to prevent execution of if __name__ == "__main__" blocks. 
""" - # Set __name__ to something other than "__main__" to prevent execution of if __name__ == "__main__" block - import sys - - original_argv = sys.argv.copy() - try: - # Temporarily modify sys.argv to prevent the workflow from thinking it's being run directly - # Save original to restore later - spec = importlib.util.spec_from_file_location("workflow_module", file_path) - if spec is None or spec.loader is None: - raise ValueError(f"Could not load module from {file_path}") - - module = importlib.util.module_from_spec(spec) - # Set __name__ to the module name (not "__main__") so if __name__ == "__main__" blocks don't execute - module.__name__ = spec.name - spec.loader.exec_module(module) - - result = find_workflow_function(module) - if result is None: - raise ValueError( - f"No workflow function found in {file_path}. Make sure to decorate a function with @workflow." - ) - - func, func_name = result - return module, func_name, func - finally: - # Restore original sys.argv - sys.argv = original_argv - - -def get_api_key(api_key: str | None = None) -> str: - """ - Get API key from argument or environment variable. - - Args: - api_key: Optional API key from CLI argument. - - Returns: - The API key. + spec = importlib.util.spec_from_file_location("workflow_module", file_path) + if spec is None or spec.loader is None: + raise typer.BadParameter(f"Could not load module from {file_path}") + + module = importlib.util.module_from_spec(spec) + # Set __name__ to module name (not "__main__") to prevent __main__ block execution + module.__name__ = spec.name + spec.loader.exec_module(module) + + result = find_workflow_function(module) + if result is None: + raise typer.BadParameter( + f"No workflow function found in {file_path}. Make sure to decorate a function with @workflow." + ) - Raises: - ValueError: If no API key is found. - """ - if api_key: - return api_key - api_key = os.getenv("NOTTE_API_KEY") - if not api_key: - raise ValueError("NOTTE_API_KEY not found. Set it in environment or use --api-key flag.") - return api_key + func, func_name = result + return module, func_name, func -def prepare_workflow_file_for_upload(file_path: Path) -> tuple[Path, bool]: +@contextlib.contextmanager +def workflow_file_for_upload(file_path: Path): """ - Prepare a workflow file for upload by commenting out the __main__ block. - - Args: - file_path: Path to the workflow file. + Context manager for preparing workflow file for upload. - Returns: - Tuple of (temp_file_path, was_commented) where temp_file_path is the path to - the temporary file with commented __main__ block, and was_commented indicates - if the block was found and commented. + Comments out __main__ block, yields temp file, then restores original. """ - # Read current content content = file_path.read_text(encoding="utf-8") - - # Comment out __main__ block - content, commented = comment_main_block(content) - if commented: + content, was_commented = comment_main_block(content) + if was_commented: logger.debug("Commented out __main__ block for upload") - # Write temporary file with .py extension (API requires .py files) temp_file = file_path.parent / f".{file_path.stem}_temp{file_path.suffix}" _ = temp_file.write_text(content, encoding="utf-8") - return temp_file, commented - - -def restore_workflow_file(file_path: Path, was_commented: bool) -> None: - """ - Restore the workflow file by uncommenting the __main__ block if needed. - - Args: - file_path: Path to the workflow file. - was_commented: Whether the __main__ block was commented out. 
- """ - if was_commented: - # Read content again - content = file_path.read_text(encoding="utf-8") - - # Uncomment __main__ block - content, _ = uncomment_main_block(content) - - # Write back to file - _ = file_path.write_text(content, encoding="utf-8") + try: + yield temp_file + finally: + # Clean up temp file + if temp_file.exists(): + temp_file.unlink() + # Restore __main__ block if it was commented + if was_commented: + content = file_path.read_text(encoding="utf-8") + content, _ = uncomment_main_block(content) + _ = file_path.write_text(content, encoding="utf-8") + + +def get_workflow_metadata(file_path: Path, require_id: bool = False) -> WorkflowMetadata: + """Get workflow metadata from file, raising typer errors if invalid.""" + metadata = WorkflowMetadata.from_file(file_path) + if not metadata: + raise typer.BadParameter("No workflow metadata found. Run 'create' command first.") + if require_id: + if not metadata.workflow_id: + raise typer.BadParameter("No workflow ID found. Run 'create' command first.") + # Type narrowing: at this point workflow_id is guaranteed to be str + assert metadata.workflow_id is not None + return metadata + + +def update_metadata_in_file(file_path: Path, metadata: WorkflowMetadata) -> None: + """Update metadata block in workflow file.""" + content = file_path.read_text(encoding="utf-8") + content = insert_metadata_block(content, metadata) + _ = file_path.write_text(content, encoding="utf-8") @app.command() def create( - file: Annotated[ - Path, - typer.Argument( - help="Path to the workflow Python file", - default_factory=get_default_file_path, - ), - ], - api_key: Annotated[ - str | None, typer.Option("--api-key", help="Notte API key (defaults to NOTTE_API_KEY environment variable") - ] = None, + file: FILE_ARG, + api_key: API_KEY_ARG = None, ) -> None: """Create a new workflow.""" - try: - api_key = get_api_key(api_key) - except ValueError as e: - logger.error(str(e)) - raise typer.Exit(1) + if not api_key: + raise typer.BadParameter("NOTTE_API_KEY not found. Set it in environment or use --api-key flag.") logger.info(f"Creating workflow from {file}") @@ -193,98 +155,58 @@ def create( description = get_workflow_description(func) if not name: - logger.error("Workflow name is required. Set it in the @workflow decorator.") - raise typer.Exit(1) + raise typer.BadParameter("Workflow name is required. Set it in the @workflow decorator.") # Check if metadata already exists existing_metadata = WorkflowMetadata.from_file(file) if existing_metadata and existing_metadata.workflow_id: - logger.error( + raise typer.BadParameter( f"Workflow already exists with ID: {existing_metadata.workflow_id}. Use 'update' command to update it." 
) - raise typer.Exit(1) - try: - # Prepare file for upload (comment out __main__ block) - temp_file, was_commented = prepare_workflow_file_for_upload(file) - - try: - # Create client and workflow - client = NotteClient(api_key=api_key) - workflow_obj = client.Workflow( - workflow_path=str(temp_file), name=name, description=description, _client=client - ) - - logger.info(f"Workflow created with ID: {workflow_obj.workflow_id}") - - # Create metadata and insert into file - metadata = WorkflowMetadata( - workflow_id=workflow_obj.workflow_id, - name=name, - description=description, - author=get_git_author(file), - creation_date=workflow_obj.response.created_at.isoformat(), - last_update_date=workflow_obj.response.updated_at.isoformat(), - ) - - # Read current content - content = file.read_text(encoding="utf-8") - - # Insert metadata block - content = insert_metadata_block(content, metadata) - - # Write back to file - _ = file.write_text(content, encoding="utf-8") - - # Restore __main__ block if it was commented - restore_workflow_file(file, was_commented) - - logger.info(f"Metadata block added to {file}") - logger.info(f"You can reference this workflow using: notte.Workflow('{workflow_obj.workflow_id}')") - finally: - # Clean up temp file - if temp_file.exists(): - temp_file.unlink() - except Exception as e: - logger.error(f"Error: {e}") - raise typer.Exit(1) + with workflow_file_for_upload(file) as temp_file: + # Create client and workflow + client = NotteClient(api_key=api_key) + workflow_obj = client.Workflow(workflow_path=str(temp_file), name=name, description=description, _client=client) + + logger.info(f"Workflow created with ID: {workflow_obj.workflow_id}") + + # Create metadata and insert into file + metadata = WorkflowMetadata( + workflow_id=workflow_obj.workflow_id, + name=name, + description=description, + author=get_git_author(file), + creation_date=workflow_obj.response.created_at.isoformat(), + last_update_date=workflow_obj.response.updated_at.isoformat(), + ) + + update_metadata_in_file(file, metadata) + + logger.info(f"Metadata block added to {file}") + logger.info(f"You can reference this workflow using: notte.Workflow('{workflow_obj.workflow_id}')") @app.command() def update( - file: Annotated[ - Path, - typer.Argument( - help="Path to the workflow Python file", - default_factory=get_default_file_path, - ), - ], - api_key: Annotated[ - str | None, typer.Option("--api-key", help="Notte API key (defaults to NOTTE_API_KEY environment variable") - ] = None, + file: FILE_ARG, + api_key: API_KEY_ARG = None, restricted: Annotated[ bool, typer.Option("--restricted/--no-restricted", help="Run workflow in restricted mode") ] = True, ) -> None: """Update an existing workflow.""" - try: - api_key = get_api_key(api_key) - except ValueError as e: - logger.error(str(e)) - raise typer.Exit(1) + if not api_key: + raise typer.BadParameter("NOTTE_API_KEY not found. Set it in environment or use --api-key flag.") logger.info(f"Updating workflow from {file}") # Read metadata - metadata = WorkflowMetadata.from_file(file) - if not metadata or not metadata.workflow_id: - logger.error("No workflow metadata found. 
Run 'create' command first to create the workflow.") - raise typer.Exit(1) + metadata = get_workflow_metadata(file, require_id=True) + # Type narrowing: workflow_id is guaranteed to be str when require_id=True + assert metadata.workflow_id is not None - # Prepare file for upload (comment out __main__ block) - temp_file, was_commented = prepare_workflow_file_for_upload(file) - - try: + with workflow_file_for_upload(file) as temp_file: # Update workflow client = NotteClient(api_key=api_key) workflow_obj = client.Workflow(workflow_id=metadata.workflow_id, _client=client) @@ -294,41 +216,15 @@ def update( # Update metadata metadata.last_update_date = workflow_obj.response.updated_at.isoformat() - - # Read content again (may have been modified) - content = file.read_text(encoding="utf-8") - - # Insert metadata block - content = insert_metadata_block(content, metadata) - - # Write back to file - _ = file.write_text(content, encoding="utf-8") - - # Restore __main__ block if it was commented - restore_workflow_file(file, was_commented) + update_metadata_in_file(file, metadata) logger.info(f"Metadata updated in {file}") - except Exception as e: - logger.error(f"Error: {e}") - raise typer.Exit(1) - finally: - # Clean up temp file - if temp_file.exists(): - temp_file.unlink() @app.command() def run( - file: Annotated[ - Path, - typer.Argument( - help="Path to the workflow Python file", - default_factory=get_default_file_path, - ), - ], - api_key: Annotated[ - str | None, typer.Option("--api-key", help="Notte API key (defaults to NOTTE_API_KEY environment variable") - ] = None, + file: FILE_ARG, + api_key: API_KEY_ARG = None, local: Annotated[bool, typer.Option("--local", help="Run workflow locally instead of on cloud")] = False, variables: Annotated[ Path | None, typer.Option("--variables", help="JSON file containing workflow variables") @@ -344,51 +240,40 @@ def run( """Run a workflow.""" if local: logger.info(f"Running workflow locally from {file}") - # For local runs, just execute the file - # This mimics: uv run python workflow_code.py import subprocess result = subprocess.run([sys.executable, str(file)], check=False) raise typer.Exit(result.returncode) - else: - try: - api_key = get_api_key(api_key) - except ValueError as e: - logger.error(str(e)) - raise typer.Exit(1) - - logger.info(f"Running workflow on cloud from {file}") - - # Read metadata - metadata = WorkflowMetadata.from_file(file) - if not metadata or not metadata.workflow_id: - logger.error("No workflow metadata found. Run 'create' command first to create the workflow.") - raise typer.Exit(1) - - # Load variables if provided - variables_dict: dict[str, Any] = {} - if variables: - if not variables.exists(): - logger.error(f"Variables file not found: {variables}") - raise typer.Exit(1) - variables_dict = json.loads(variables.read_text(encoding="utf-8")) - - try: - # Run workflow - client = NotteClient(api_key=api_key) - workflow_obj = client.Workflow(workflow_id=metadata.workflow_id, _client=client) - result = workflow_obj.run( - timeout=timeout, - stream=stream, - raise_on_failure=raise_on_failure, - **variables_dict, - ) - - logger.info(f"Workflow completed with status: {result.status}") - logger.info(f"Result: {result.result}") - except Exception as e: - logger.error(f"Error: {e}") - raise typer.Exit(1) + + if not api_key: + raise typer.BadParameter("NOTTE_API_KEY not found. 
Set it in environment or use --api-key flag.") + + logger.info(f"Running workflow on cloud from {file}") + + # Read metadata + metadata = get_workflow_metadata(file, require_id=True) + # Type narrowing: workflow_id is guaranteed to be str when require_id=True + assert metadata.workflow_id is not None + + # Load variables if provided + variables_dict: dict[str, Any] = {} + if variables: + if not variables.exists(): + raise typer.BadParameter(f"Variables file not found: {variables}") + variables_dict = json.loads(variables.read_text(encoding="utf-8")) + + # Run workflow + client = NotteClient(api_key=api_key) + workflow_obj = client.Workflow(workflow_id=metadata.workflow_id, _client=client) + result = workflow_obj.run( + timeout=timeout, + stream=stream, + raise_on_failure=raise_on_failure, + **variables_dict, + ) + + logger.info(f"Workflow completed with status: {result.status}") + logger.info(f"Result: {result.result}") def main(_file_path: Path | None = None) -> None: From 0424ba79f6183eaeef71f9615ff6b1bdc56f6bad Mon Sep 17 00:00:00 2001 From: Lucas Giordano Date: Sun, 2 Nov 2025 20:33:19 +0100 Subject: [PATCH 3/6] add benchmark // and server url dectection --- packages/notte-core/src/notte_core/ast.py | 2 + packages/notte-sdk/pyproject.toml | 1 + .../notte-sdk/src/notte_sdk/cli/__init__.py | 375 +++++++++++++++++- .../src/notte_sdk/cli/workflow_cli.py | 7 +- .../notte-sdk/src/notte_sdk/decorators.py | 10 +- uv.lock | 2 + 6 files changed, 385 insertions(+), 12 deletions(-) diff --git a/packages/notte-core/src/notte_core/ast.py b/packages/notte-core/src/notte_core/ast.py index 6001ea586..289286b31 100644 --- a/packages/notte-core/src/notte_core/ast.py +++ b/packages/notte-core/src/notte_core/ast.py @@ -71,6 +71,7 @@ class ScriptValidator(RestrictingNodeTransformer): "notte_sdk", "notte_agent", "notte_core", + "notte_llm", # Safe third-party "pydantic", # Data validation library "loguru", # Logging library @@ -80,6 +81,7 @@ class ScriptValidator(RestrictingNodeTransformer): "gspread", "google", "litellm", + "tqdm", # Safe standard library modules - data processing and utilities "types", "json", # JSON parsing diff --git a/packages/notte-sdk/pyproject.toml b/packages/notte-sdk/pyproject.toml index 3ed60c85d..42406f4f1 100644 --- a/packages/notte-sdk/pyproject.toml +++ b/packages/notte-sdk/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "notte-core==1.4.4.dev", "websockets>=13.1", "typer>=0.9.0", + "tqdm>=4.66.0", ] [project.optional-dependencies] diff --git a/packages/notte-sdk/src/notte_sdk/cli/__init__.py b/packages/notte-sdk/src/notte_sdk/cli/__init__.py index d55e11ad7..f0e4fc6dc 100644 --- a/packages/notte-sdk/src/notte_sdk/cli/__init__.py +++ b/packages/notte-sdk/src/notte_sdk/cli/__init__.py @@ -4,11 +4,14 @@ import importlib.util import json import sys +import time +from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path -from typing import Annotated, Any +from typing import Annotated, Any, Callable import typer from notte_core.common.logging import logger +from tqdm import tqdm from notte_sdk.cli.metadata import ( WorkflowMetadata, @@ -31,7 +34,7 @@ # Common argument definitions def get_default_file_path() -> Path: """Get the default file path from sys.argv[0] if running from a workflow file.""" - if len(sys.argv) > 1 and sys.argv[1] in ["create", "update", "run"]: + if len(sys.argv) > 1 and sys.argv[1] in ["create", "update", "run", "benchmark"]: return Path(sys.argv[0]).resolve() raise typer.BadParameter("File path is required when not running 
from a workflow file") @@ -53,13 +56,51 @@ def get_default_file_path() -> Path: ), ] +SERVER_URL_ARG = Annotated[ + str | None, + typer.Option( + "--server-url", + help="Notte API server URL (defaults to NOTTE_API_URL environment variable)", + envvar="NOTTE_API_URL", + ), +] + def find_workflow_function(module: Any) -> tuple[Any, str] | None: - """Find the workflow function in a module.""" + """Find the workflow function in a module. + + First tries to find a function with @workflow decorator. + If not found, tries to find a function named 'run' or the only function in the module. + """ + # First, try to find a function with @workflow decorator for name in dir(module): obj = getattr(module, name) if callable(obj) and is_workflow(obj): return obj, name + + # If no decorated function found, try to find 'run' function + if hasattr(module, "run"): + obj = getattr(module, "run") + if callable(obj) and not obj.__name__.startswith("_"): # Skip private functions + return obj, "run" + + # Last resort: if there's only one callable function, use it + callable_functions: list[tuple[Callable[..., Any], str]] = [] + for name in dir(module): + obj = getattr(module, name) + if ( + callable(obj) + and not name.startswith("_") + and not isinstance(obj, type) # Skip classes + and not isinstance(obj, type(__builtins__)) # Skip builtins + ): + # Skip common non-workflow functions + if name not in ["main", "app", "cli"]: + callable_functions.append((obj, name)) + + if len(callable_functions) == 1: + return callable_functions[0] + return None @@ -81,7 +122,8 @@ def load_workflow_file(file_path: Path) -> tuple[Any, str, Any]: result = find_workflow_function(module) if result is None: raise typer.BadParameter( - f"No workflow function found in {file_path}. Make sure to decorate a function with @workflow." + f"No workflow function found in {file_path}. " + + "Either decorate a function with @workflow, name it 'run', or ensure there's only one function in the file." ) func, func_name = result @@ -140,6 +182,7 @@ def update_metadata_in_file(file_path: Path, metadata: WorkflowMetadata) -> None def create( file: FILE_ARG, api_key: API_KEY_ARG = None, + server_url: SERVER_URL_ARG = None, ) -> None: """Create a new workflow.""" if not api_key: @@ -150,12 +193,20 @@ def create( # Load the workflow function _module, _func_name, func = load_workflow_file(file) - # Get workflow metadata from decorator + # Get workflow metadata from decorator (if present) name = get_workflow_name(func) description = get_workflow_description(func) + # If no decorator found, prompt interactively if not name: - raise typer.BadParameter("Workflow name is required. Set it in the @workflow decorator.") + # Suggest a default name based on file name + default_name = file.stem.replace("_", " ").title() + + logger.info("No @workflow decorator found. 
Please provide workflow metadata:") + name = typer.prompt("Workflow name", default=default_name) + description = typer.prompt("Workflow description (optional)", default="", show_default=False) + if not description.strip(): + description = None # Check if metadata already exists existing_metadata = WorkflowMetadata.from_file(file) @@ -166,7 +217,7 @@ def create( with workflow_file_for_upload(file) as temp_file: # Create client and workflow - client = NotteClient(api_key=api_key) + client = NotteClient(api_key=api_key, server_url=server_url) workflow_obj = client.Workflow(workflow_path=str(temp_file), name=name, description=description, _client=client) logger.info(f"Workflow created with ID: {workflow_obj.workflow_id}") @@ -191,6 +242,7 @@ def create( def update( file: FILE_ARG, api_key: API_KEY_ARG = None, + server_url: SERVER_URL_ARG = None, restricted: Annotated[ bool, typer.Option("--restricted/--no-restricted", help="Run workflow in restricted mode") ] = True, @@ -208,7 +260,7 @@ def update( with workflow_file_for_upload(file) as temp_file: # Update workflow - client = NotteClient(api_key=api_key) + client = NotteClient(api_key=api_key, server_url=server_url) workflow_obj = client.Workflow(workflow_id=metadata.workflow_id, _client=client) workflow_obj.update(workflow_path=str(temp_file), restricted=restricted) @@ -225,6 +277,7 @@ def update( def run( file: FILE_ARG, api_key: API_KEY_ARG = None, + server_url: SERVER_URL_ARG = None, local: Annotated[bool, typer.Option("--local", help="Run workflow locally instead of on cloud")] = False, variables: Annotated[ Path | None, typer.Option("--variables", help="JSON file containing workflow variables") @@ -263,7 +316,7 @@ def run( variables_dict = json.loads(variables.read_text(encoding="utf-8")) # Run workflow - client = NotteClient(api_key=api_key) + client = NotteClient(api_key=api_key, server_url=server_url) workflow_obj = client.Workflow(workflow_id=metadata.workflow_id, _client=client) result = workflow_obj.run( timeout=timeout, @@ -276,6 +329,310 @@ def run( logger.info(f"Result: {result.result}") +@app.command() +def benchmark( + file: FILE_ARG, + api_key: API_KEY_ARG = None, + server_url: SERVER_URL_ARG = None, + local: Annotated[bool, typer.Option("--local", help="Run workflow locally instead of on cloud")] = False, + iterations: Annotated[int, typer.Option("--iterations", help="Maximum number of iterations to run")] = 10, + timeout: Annotated[int, typer.Option("--timeout", help="Timeout in minutes for the entire benchmark")] = 20, + parallelism: Annotated[int, typer.Option("--parallelism", help="Number of parallel runs (default: 1)")] = 1, + variables: Annotated[ + Path | None, typer.Option("--variables", help="JSON file containing workflow variables") + ] = None, +) -> None: + """Run a benchmark test with multiple iterations of the workflow.""" + timeout_seconds = timeout * 60 # Convert minutes to seconds + + # Interactive prompts if running without flags + # Check if any benchmark-specific flags were provided + benchmark_flags = ["--local", "--iterations", "--timeout", "--variables", "--parallelism"] + has_flags = any(flag in sys.argv for flag in benchmark_flags) + + if not has_flags: + logger.info("Running benchmark interactively. Press Enter to use defaults.") + logger.info("") + + # Prompt for local vs cloud + while True: + local_input = ( + typer.prompt( + "Run locally or on cloud? 
[local/cloud]", + default="cloud", + ) + .strip() + .lower() + ) + if local_input in ["local", "cloud"]: + local = local_input == "local" + break + logger.error("Please enter 'local' or 'cloud'") + + # Prompt for iterations + iterations = typer.prompt("Number of iterations", default=10, type=int) + + # Prompt for timeout + timeout = typer.prompt("Timeout in minutes", default=20, type=int) + + # Prompt for parallelism + parallelism = typer.prompt("Parallelism level (number of parallel runs)", default=1, type=int) + if parallelism < 1: + parallelism = 1 + if parallelism > iterations: + parallelism = iterations + logger.warning(f"Parallelism reduced to {iterations} (cannot exceed number of iterations)") + + # Prompt for variables file (optional) + variables_input = typer.prompt( + "Variables file path (optional, press Enter to skip)", + default="", + show_default=False, + ) + if variables_input.strip(): + variables = Path(variables_input.strip()) + if not variables.exists(): + raise typer.BadParameter(f"Variables file not found: {variables}") + else: + variables = None + + logger.info("") + + # Validate parallelism + if parallelism < 1: + parallelism = 1 + if parallelism > iterations: + parallelism = iterations + logger.warning(f"Parallelism reduced to {iterations} (cannot exceed number of iterations)") + + if local: + logger.info( + f"Running benchmark locally from {file} ({iterations} iterations, {timeout} min timeout, parallelism={parallelism})" + ) + else: + if not api_key: + raise typer.BadParameter("NOTTE_API_KEY not found. Set it in environment or use --api-key flag.") + logger.info( + f"Running benchmark on cloud from {file} ({iterations} iterations, {timeout} min timeout, parallelism={parallelism})" + ) + + # Read metadata for cloud runs + metadata: WorkflowMetadata | None = None + workflow_obj: Any | None = None + if not local: + metadata = get_workflow_metadata(file, require_id=True) + assert metadata.workflow_id is not None + client = NotteClient(api_key=api_key, server_url=server_url) + workflow_obj = client.Workflow(workflow_id=metadata.workflow_id, _client=client) + + # Load variables if provided + variables_dict: dict[str, Any] = {} + if variables: + if not variables.exists(): + raise typer.BadParameter(f"Variables file not found: {variables}") + variables_dict = json.loads(variables.read_text(encoding="utf-8")) + + # Helper function to run a single iteration + def run_iteration(iteration_num: int) -> dict[str, Any]: + """Run a single benchmark iteration.""" + iteration_start = time.time() + try: + if local: + # Run locally + import subprocess + + result = subprocess.run( + [sys.executable, str(file)], + check=False, + capture_output=True, + text=True, + ) + iteration_end = time.time() + execution_time = iteration_end - iteration_start + success = result.returncode == 0 + run_id = f"local-{iteration_num}" + status = "closed" if success else "failed" + else: + # Run on cloud + assert workflow_obj is not None + assert metadata is not None + result = workflow_obj.run( + timeout=None, # Use default timeout per run + stream=False, + raise_on_failure=False, # Don't raise for benchmark + **variables_dict, + ) + iteration_end = time.time() + execution_time = iteration_end - iteration_start + success = result.status == "closed" + run_id = result.workflow_run_id + status = result.status + + return { + "iteration": iteration_num, + "success": success, + "execution_time": execution_time, + "run_id": run_id, + "status": status, + "workflow_id": metadata.workflow_id if metadata else None, + 
} + except Exception as e: + iteration_end = time.time() + execution_time = iteration_end - iteration_start + logger.error(f"\nIteration {iteration_num} failed with exception: {e}") + return { + "iteration": iteration_num, + "success": False, + "execution_time": execution_time, + "run_id": f"error-{iteration_num}", + "status": "failed", + "workflow_id": metadata.workflow_id if metadata else None, + } + + # Benchmark results + results: list[dict[str, Any]] = [] + start_time = time.time() + + # Create progress bar + pbar = tqdm( + total=iterations, + desc="Benchmark progress", + unit="run", + bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]", + ) + + try: + if parallelism == 1: + # Sequential execution (original behavior) + for i in range(iterations): + # Check if we've exceeded the timeout + elapsed = time.time() - start_time + if elapsed >= timeout_seconds: + pbar.close() + logger.info(f"\nTimeout reached ({timeout} min). Stopping benchmark after {i} iterations.") + break + + # Update progress bar description with current iteration + pbar.set_description(f"Running iteration {i + 1}/{iterations}") + result = run_iteration(i + 1) + results.append(result) + + # Update progress bar with result + status_icon = "✅" if result["success"] else "❌" + pbar.set_postfix_str(f"{status_icon} {result['execution_time']:.2f}s") + _ = pbar.update(1) + else: + # Parallel execution + with ThreadPoolExecutor(max_workers=parallelism) as executor: + # Submit all tasks + future_to_iteration = {executor.submit(run_iteration, i + 1): i + 1 for i in range(iterations)} + + # Process completed futures as they finish + for future in as_completed(future_to_iteration): + # Check if we've exceeded the timeout + elapsed = time.time() - start_time + if elapsed >= timeout_seconds: + pbar.close() + logger.info(f"\nTimeout reached ({timeout} min). 
Cancelling remaining tasks...") + # Cancel remaining futures + for f in future_to_iteration: + _ = f.cancel() + break + + try: + result = future.result() + results.append(result) + + # Update progress bar with result + status_icon = "✅" if result["success"] else "❌" + pbar.set_postfix_str(f"{status_icon} {result['execution_time']:.2f}s") + _ = pbar.update(1) + except Exception as e: + iteration_num = future_to_iteration[future] + logger.error(f"\nIteration {iteration_num} failed with exception: {e}") + results.append( + { + "iteration": iteration_num, + "success": False, + "execution_time": 0.0, + "run_id": f"error-{iteration_num}", + "status": "failed", + "workflow_id": metadata.workflow_id if metadata else None, + } + ) + _ = pbar.update(1) + + # Sort results by iteration number for consistent display + results.sort(key=lambda x: x["iteration"]) + + finally: + pbar.close() + logger.info("") # New line after progress bar + + # Calculate summary statistics + total_runs = len(results) + successful_runs = sum(1 for r in results if r["success"]) + failed_runs = total_runs - successful_runs + success_rate = (successful_runs / total_runs * 100) if total_runs > 0 else 0.0 + + # Calculate average execution time for successful runs, or failed runs if all failed + if successful_runs > 0: + avg_execution_time = sum(r["execution_time"] for r in results if r["success"]) / successful_runs + elif failed_runs > 0: + avg_execution_time = sum(r["execution_time"] for r in results if not r["success"]) / failed_runs + else: + avg_execution_time = 0.0 + + # Use consistent width for all separators + # Table columns: Status (8) + Time (10) + Run ID (40) + Console URL (40) + 3 spaces = 101 chars + separator_width = 101 + separator_double = "=" * separator_width + separator_single = "-" * separator_width + + # Display summary + logger.info("") + logger.info(separator_double) + logger.info("BENCHMARK SUMMARY") + logger.info(separator_double) + logger.info(f"Total runs: {total_runs}") + logger.info(f"Successful: {successful_runs}") + logger.info(f"Failed: {failed_runs}") + logger.info(f"Success rate: {success_rate:.1f}%") + logger.info(f"Average execution time: {avg_execution_time:.2f}s") + logger.info(f"Total benchmark time: {time.time() - start_time:.2f}s") + logger.info(separator_double) + + # Display results table + logger.info("") + logger.info("Detailed Results:") + logger.info(separator_single) + + # Table header + header = f"{'Status':<8} {'Time':<10} {'Run ID':<40} {'Console URL':<40}" + logger.info(header) + logger.info(separator_single) + + for r in results: + status_icon = "✅" if r["success"] else "❌" + execution_time_str = f"{r['execution_time']:.2f}s" + run_id_str = r["run_id"][:38] # Truncate if too long + + # Build console URL + if r["workflow_id"] and not local: + console_url = f"https://console.notte.cc/logs/workflows/{r['workflow_id']}/runs/{r['run_id']}" + else: + console_url = "N/A (local run)" + + row = f"{status_icon:<8} {execution_time_str:<10} {run_id_str:<40} {console_url[:38]:<40}" + logger.info(row) + + logger.info(separator_single) + + # Exit with error code if all runs failed + if failed_runs == total_runs and total_runs > 0: + raise typer.Exit(1) + + def main(_file_path: Path | None = None) -> None: """ Main CLI entry point. 
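[Reviewer note] The concurrency pattern the `benchmark` command relies on is a `ThreadPoolExecutor` drained with `as_completed` under a wall-clock budget. The following standalone sketch isolates just that pattern; the toy `run_iteration`, its `time.sleep` workload, and the default arguments are illustrative stand-ins, not part of the SDK:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_iteration(i: int) -> dict:
    """Stand-in for one workflow run; returns the same record shape as the CLI."""
    start = time.time()
    time.sleep(0.1)  # placeholder workload
    return {"iteration": i, "success": True, "execution_time": time.time() - start}


def run_benchmark(iterations: int = 4, parallelism: int = 2, budget_s: float = 60.0) -> list[dict]:
    results: list[dict] = []
    start = time.time()
    with ThreadPoolExecutor(max_workers=parallelism) as executor:
        futures = {executor.submit(run_iteration, i + 1): i + 1 for i in range(iterations)}
        for future in as_completed(futures):
            if time.time() - start >= budget_s:
                # Best effort: cancel() only affects futures that have not started yet.
                for f in futures:
                    _ = f.cancel()
                break
            results.append(future.result())
    # as_completed yields in completion order, so restore iteration order for display.
    results.sort(key=lambda r: r["iteration"])
    return results


if __name__ == "__main__":
    for record in run_benchmark():
        print(record)
```

As in the patch, futures that are already executing cannot be cancelled, so exiting the `with` block still waits for in-flight runs when the budget expires.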
diff --git a/packages/notte-sdk/src/notte_sdk/cli/workflow_cli.py b/packages/notte-sdk/src/notte_sdk/cli/workflow_cli.py index 60f3aa7bf..54b02c586 100644 --- a/packages/notte-sdk/src/notte_sdk/cli/workflow_cli.py +++ b/packages/notte-sdk/src/notte_sdk/cli/workflow_cli.py @@ -25,6 +25,11 @@ def run(): - python workflow_file.py run --local - python workflow_file.py update - python workflow_file.py run --variables variables.json +- python workflow_file.py benchmark --iterations 10 --timeout 20 +- python workflow_file.py benchmark --local --iterations 5 + +Note: The @workflow decorator is optional. If you don't use it, the CLI will +prompt for workflow name and description during creation. """ from __future__ import annotations @@ -45,7 +50,7 @@ def workflow_cli() -> None: # Only handle CLI if args are present if len(sys.argv) > 1: first_arg = sys.argv[1] - if first_arg in ["create", "update", "run", "--help", "-h"]: + if first_arg in ["create", "update", "run", "benchmark", "--help", "-h"]: # Handle CLI and exit # Note: file_path is auto-detected from sys.argv by typer, so we don't need to pass it main() diff --git a/packages/notte-sdk/src/notte_sdk/decorators.py b/packages/notte-sdk/src/notte_sdk/decorators.py index cca86ad69..2e1834ed5 100644 --- a/packages/notte-sdk/src/notte_sdk/decorators.py +++ b/packages/notte-sdk/src/notte_sdk/decorators.py @@ -14,8 +14,9 @@ def workflow(name: str, description: str | None = None) -> Callable[[Callable[P, """ Decorator to mark a function as a Notte workflow. - This decorator stores metadata on the function that can be used by the CLI - to manage the workflow lifecycle (create, update, run). + This decorator is optional. If present, it stores metadata on the function that can be used + by the CLI to manage the workflow lifecycle (create, update, run). If not present, the CLI + will prompt for workflow name and description during creation. Args: name: The name of the workflow. @@ -30,6 +31,11 @@ def run(url: str, query: str) -> str: # workflow code here return "result" ``` + + Note: + The decorator is optional. You can also create workflows without it - the CLI will + prompt for name and description interactively, or detect the workflow function by + looking for a function named 'run' or the only function in the file. 
""" def decorator(func: Callable[P, R]) -> Callable[P, R]: diff --git a/uv.lock b/uv.lock index 18c63bdbe..f0497509f 100644 --- a/uv.lock +++ b/uv.lock @@ -3115,6 +3115,7 @@ source = { editable = "packages/notte-sdk" } dependencies = [ { name = "halo" }, { name = "notte-core" }, + { name = "tqdm" }, { name = "typer" }, { name = "websockets" }, ] @@ -3129,6 +3130,7 @@ requires-dist = [ { name = "halo", specifier = ">=0.0.28" }, { name = "notte-core", editable = "packages/notte-core" }, { name = "playwright", marker = "extra == 'playwright'", specifier = "~=1.55" }, + { name = "tqdm", specifier = ">=4.66.0" }, { name = "typer", specifier = ">=0.9.0" }, { name = "websockets", specifier = ">=13.1" }, ] From a7dd8ed34c15022b34840a30c24b2564597b67e5 Mon Sep 17 00:00:00 2001 From: Lucas Giordano Date: Sun, 2 Nov 2025 22:31:13 +0100 Subject: [PATCH 4/6] improve few stuff --- packages/notte-sdk/src/notte_sdk/cli/__init__.py | 15 +++++++++------ .../src/notte_sdk/endpoints/workflows.py | 8 ++++++-- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/packages/notte-sdk/src/notte_sdk/cli/__init__.py b/packages/notte-sdk/src/notte_sdk/cli/__init__.py index f0e4fc6dc..bffbf06b8 100644 --- a/packages/notte-sdk/src/notte_sdk/cli/__init__.py +++ b/packages/notte-sdk/src/notte_sdk/cli/__init__.py @@ -436,6 +436,7 @@ def benchmark( def run_iteration(iteration_num: int) -> dict[str, Any]: """Run a single benchmark iteration.""" iteration_start = time.time() + workflow_id: str | None = None try: if local: # Run locally @@ -452,6 +453,7 @@ def run_iteration(iteration_num: int) -> dict[str, Any]: success = result.returncode == 0 run_id = f"local-{iteration_num}" status = "closed" if success else "failed" + workflow_id = metadata.workflow_id if metadata else None else: # Run on cloud assert workflow_obj is not None @@ -466,6 +468,7 @@ def run_iteration(iteration_num: int) -> dict[str, Any]: execution_time = iteration_end - iteration_start success = result.status == "closed" run_id = result.workflow_run_id + workflow_id = result.workflow_id # Get workflow_id from response status = result.status return { @@ -474,7 +477,7 @@ def run_iteration(iteration_num: int) -> dict[str, Any]: "execution_time": execution_time, "run_id": run_id, "status": status, - "workflow_id": metadata.workflow_id if metadata else None, + "workflow_id": workflow_id, } except Exception as e: iteration_end = time.time() @@ -584,8 +587,8 @@ def run_iteration(iteration_num: int) -> dict[str, Any]: avg_execution_time = 0.0 # Use consistent width for all separators - # Table columns: Status (8) + Time (10) + Run ID (40) + Console URL (40) + 3 spaces = 101 chars - separator_width = 101 + # Table columns: Status (8) + Time (12) + Run ID (40) + Console URL (80) + 3 spaces = 143 chars + separator_width = 143 separator_double = "=" * separator_width separator_single = "-" * separator_width @@ -608,7 +611,7 @@ def run_iteration(iteration_num: int) -> dict[str, Any]: logger.info(separator_single) # Table header - header = f"{'Status':<8} {'Time':<10} {'Run ID':<40} {'Console URL':<40}" + header = f"{'Status':<8} {'Time':<12} {'Run ID':<40} {'Console URL':<80}" logger.info(header) logger.info(separator_single) @@ -617,13 +620,13 @@ def run_iteration(iteration_num: int) -> dict[str, Any]: execution_time_str = f"{r['execution_time']:.2f}s" run_id_str = r["run_id"][:38] # Truncate if too long - # Build console URL + # Build console URL using workflow_id and run_id from response if r["workflow_id"] and not local: console_url = 
f"https://console.notte.cc/logs/workflows/{r['workflow_id']}/runs/{r['run_id']}" else: console_url = "N/A (local run)" - row = f"{status_icon:<8} {execution_time_str:<10} {run_id_str:<40} {console_url[:38]:<40}" + row = f"{status_icon:<8} {execution_time_str:<12} {run_id_str:<40} {console_url:<80}" logger.info(row) logger.info(separator_single) diff --git a/packages/notte-sdk/src/notte_sdk/endpoints/workflows.py b/packages/notte-sdk/src/notte_sdk/endpoints/workflows.py index 12f1dd471..b233ffdf3 100644 --- a/packages/notte-sdk/src/notte_sdk/endpoints/workflows.py +++ b/packages/notte-sdk/src/notte_sdk/endpoints/workflows.py @@ -520,9 +520,13 @@ def run( elif message["type"] == "session_start": session_id = log_msg - logger.info( - f"Live viewer for session available at: https://api.notte.cc/sessions/viewer/index.html?ws=wss://api.notte.cc/sessions/{session_id}/debug/recording?token={self.token}" + base_url = self.server_url.rstrip("/") + base_ws_url = ( + self.server_url.replace("https://", "wss://").replace("http://", "ws://").rstrip("/") ) + viewer_url = f"{base_url}/sessions/viewer/index.html?ws={base_ws_url}/sessions/{session_id}/debug/recording?token={self.token}" + + logger.info(f"Live viewer for session available at: {viewer_url}") except json.JSONDecodeError: continue From fa8d53fb8bb28820608a42f19cf1fe99830aa245 Mon Sep 17 00:00:00 2001 From: Lucas Giordano Date: Mon, 3 Nov 2025 10:25:55 +0100 Subject: [PATCH 5/6] remove mfa coed --- packages/notte-core/src/notte_core/credentials/base.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/packages/notte-core/src/notte_core/credentials/base.py b/packages/notte-core/src/notte_core/credentials/base.py index c359b0326..01dc6745b 100644 --- a/packages/notte-core/src/notte_core/credentials/base.py +++ b/packages/notte-core/src/notte_core/credentials/base.py @@ -486,14 +486,6 @@ async def add_credentials_from_env_async(self, url: str) -> None: def get_credentials(self, url: str) -> CredentialsDict | None: return asyncio.run(self.get_credentials_async(url=url)) - def get_mfa_code(self, url: str) -> str: - cred = self.get_credentials(url) - if cred is None: - raise ValueError(f"No credentials found for {url}") - if "mfa_secret" not in cred: - raise ValueError(f"No mfa secret found for {url}. Please update your credentials to include an mfa secret.") - return TOTP(cred["mfa_secret"]).now() - @profiler.profiled() # noqa: F821 async def get_credentials_async(self, url: str) -> CredentialsDict | None: """Get credentials for a given URL. 
From a2c2df45f466826cf9752243982004a79dfcbc21 Mon Sep 17 00:00:00 2001 From: Lucas Giordano Date: Mon, 3 Nov 2025 10:51:26 +0100 Subject: [PATCH 6/6] simplify cli --- packages/notte-sdk/pyproject.toml | 2 +- .../notte-sdk/src/notte_sdk/cli/README.md | 68 ++ .../notte-sdk/src/notte_sdk/cli/__init__.py | 647 +--------------- .../notte-sdk/src/notte_sdk/cli/workflow.py | 721 ++++++++++++++++++ .../src/notte_sdk/cli/workflow_cli.py | 22 +- 5 files changed, 832 insertions(+), 628 deletions(-) create mode 100644 packages/notte-sdk/src/notte_sdk/cli/README.md create mode 100644 packages/notte-sdk/src/notte_sdk/cli/workflow.py diff --git a/packages/notte-sdk/pyproject.toml b/packages/notte-sdk/pyproject.toml index 42406f4f1..dbc2abd77 100644 --- a/packages/notte-sdk/pyproject.toml +++ b/packages/notte-sdk/pyproject.toml @@ -25,7 +25,7 @@ playwright = [ ] [project.scripts] -notte-workflow = "notte_sdk.cli:main" +notte = "notte_sdk.cli:main" [build-system] requires = ["hatchling"] diff --git a/packages/notte-sdk/src/notte_sdk/cli/README.md b/packages/notte-sdk/src/notte_sdk/cli/README.md new file mode 100644 index 000000000..da3c426de --- /dev/null +++ b/packages/notte-sdk/src/notte_sdk/cli/README.md @@ -0,0 +1,68 @@ +# Notte Workflow CLI + +Manage Notte workflow lifecycle from the command line. + +## Usage + +```bash +notte workflow [--workflow-path FILE] COMMAND [OPTIONS] +``` + +## Commands + +### Create +```bash +notte workflow --workflow-path my_workflow.py create +``` + +### Update +```bash +notte workflow --workflow-path my_workflow.py update +``` + +### Run +```bash +# Run locally +notte workflow --workflow-path my_workflow.py run --local + +# Run on cloud +notte workflow --workflow-path my_workflow.py run --variables vars.json +``` + +### Benchmark +```bash +# Run 10 iterations locally +notte workflow --workflow-path my_workflow.py benchmark --local --iterations 10 + +# Run on cloud with parallelism +notte workflow --workflow-path my_workflow.py benchmark --iterations 50 --parallelism 4 +``` + +## Auto-Detection + +When running from a workflow file, `--workflow-path` is optional: + +```python +# my_workflow.py +from notte_sdk import NotteClient, workflow_cli + +def run(url: str) -> str: + # ... workflow code ... + return result + +if __name__ == "__main__": + workflow_cli() # Enables CLI commands +``` + +```bash +# These work without --workflow-path +python my_workflow.py create +python my_workflow.py run --local +python my_workflow.py benchmark --iterations 10 +``` + +## Environment Variables + +- `NOTTE_API_KEY` - API key (required for cloud operations) +- `NOTTE_API_URL` - API server URL (optional) + diff --git a/packages/notte-sdk/src/notte_sdk/cli/__init__.py b/packages/notte-sdk/src/notte_sdk/cli/__init__.py index bffbf06b8..3f6226d8a 100644 --- a/packages/notte-sdk/src/notte_sdk/cli/__init__.py +++ b/packages/notte-sdk/src/notte_sdk/cli/__init__.py @@ -1,639 +1,40 @@ +""" +Main CLI module for Notte. + +This module aggregates all CLI subcommands (workflow, session, agent, etc.) +into a single unified CLI interface. 
+ +Usage: + notte workflow create + notte workflow run + notte workflow benchmark +""" + from __future__ import annotations -import contextlib -import importlib.util -import json -import sys -import time -from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path -from typing import Annotated, Any, Callable import typer -from notte_core.common.logging import logger -from tqdm import tqdm -from notte_sdk.cli.metadata import ( - WorkflowMetadata, - comment_main_block, - get_git_author, - insert_metadata_block, - uncomment_main_block, -) -from notte_sdk.client import NotteClient -from notte_sdk.decorators import get_workflow_description, get_workflow_name, is_workflow +from notte_sdk.cli import workflow +# Main CLI app app = typer.Typer( - name="notte-workflow", - help="Notte Workflow CLI - Manage workflow lifecycle from your workflow files", + name="notte", + help="Notte CLI - Manage workflows, sessions, agents, and more", add_completion=False, no_args_is_help=True, ) +# Add workflow subcommand +app.add_typer(workflow.workflow_app, name="workflow") -# Common argument definitions -def get_default_file_path() -> Path: - """Get the default file path from sys.argv[0] if running from a workflow file.""" - if len(sys.argv) > 1 and sys.argv[1] in ["create", "update", "run", "benchmark"]: - return Path(sys.argv[0]).resolve() - raise typer.BadParameter("File path is required when not running from a workflow file") - - -FILE_ARG = Annotated[ - Path, - typer.Argument( - help="Path to the workflow Python file", - default_factory=get_default_file_path, - ), -] - -API_KEY_ARG = Annotated[ - str | None, - typer.Option( - "--api-key", - help="Notte API key (defaults to NOTTE_API_KEY environment variable)", - envvar="NOTTE_API_KEY", - ), -] - -SERVER_URL_ARG = Annotated[ - str | None, - typer.Option( - "--server-url", - help="Notte API server URL (defaults to NOTTE_API_URL environment variable)", - envvar="NOTTE_API_URL", - ), -] - - -def find_workflow_function(module: Any) -> tuple[Any, str] | None: - """Find the workflow function in a module. - - First tries to find a function with @workflow decorator. - If not found, tries to find a function named 'run' or the only function in the module. - """ - # First, try to find a function with @workflow decorator - for name in dir(module): - obj = getattr(module, name) - if callable(obj) and is_workflow(obj): - return obj, name - - # If no decorated function found, try to find 'run' function - if hasattr(module, "run"): - obj = getattr(module, "run") - if callable(obj) and not obj.__name__.startswith("_"): # Skip private functions - return obj, "run" - - # Last resort: if there's only one callable function, use it - callable_functions: list[tuple[Callable[..., Any], str]] = [] - for name in dir(module): - obj = getattr(module, name) - if ( - callable(obj) - and not name.startswith("_") - and not isinstance(obj, type) # Skip classes - and not isinstance(obj, type(__builtins__)) # Skip builtins - ): - # Skip common non-workflow functions - if name not in ["main", "app", "cli"]: - callable_functions.append((obj, name)) - - if len(callable_functions) == 1: - return callable_functions[0] - - return None - - -def load_workflow_file(file_path: Path) -> tuple[Any, str, Any]: - """ - Load a workflow file and find the workflow function. - - Sets __name__ to prevent execution of if __name__ == "__main__" blocks. 
- """ - spec = importlib.util.spec_from_file_location("workflow_module", file_path) - if spec is None or spec.loader is None: - raise typer.BadParameter(f"Could not load module from {file_path}") - - module = importlib.util.module_from_spec(spec) - # Set __name__ to module name (not "__main__") to prevent __main__ block execution - module.__name__ = spec.name - spec.loader.exec_module(module) - - result = find_workflow_function(module) - if result is None: - raise typer.BadParameter( - f"No workflow function found in {file_path}. " - + "Either decorate a function with @workflow, name it 'run', or ensure there's only one function in the file." - ) - - func, func_name = result - return module, func_name, func - - -@contextlib.contextmanager -def workflow_file_for_upload(file_path: Path): - """ - Context manager for preparing workflow file for upload. - - Comments out __main__ block, yields temp file, then restores original. - """ - content = file_path.read_text(encoding="utf-8") - content, was_commented = comment_main_block(content) - if was_commented: - logger.debug("Commented out __main__ block for upload") - - temp_file = file_path.parent / f".{file_path.stem}_temp{file_path.suffix}" - _ = temp_file.write_text(content, encoding="utf-8") - - try: - yield temp_file - finally: - # Clean up temp file - if temp_file.exists(): - temp_file.unlink() - # Restore __main__ block if it was commented - if was_commented: - content = file_path.read_text(encoding="utf-8") - content, _ = uncomment_main_block(content) - _ = file_path.write_text(content, encoding="utf-8") - - -def get_workflow_metadata(file_path: Path, require_id: bool = False) -> WorkflowMetadata: - """Get workflow metadata from file, raising typer errors if invalid.""" - metadata = WorkflowMetadata.from_file(file_path) - if not metadata: - raise typer.BadParameter("No workflow metadata found. Run 'create' command first.") - if require_id: - if not metadata.workflow_id: - raise typer.BadParameter("No workflow ID found. Run 'create' command first.") - # Type narrowing: at this point workflow_id is guaranteed to be str - assert metadata.workflow_id is not None - return metadata - - -def update_metadata_in_file(file_path: Path, metadata: WorkflowMetadata) -> None: - """Update metadata block in workflow file.""" - content = file_path.read_text(encoding="utf-8") - content = insert_metadata_block(content, metadata) - _ = file_path.write_text(content, encoding="utf-8") - - -@app.command() -def create( - file: FILE_ARG, - api_key: API_KEY_ARG = None, - server_url: SERVER_URL_ARG = None, -) -> None: - """Create a new workflow.""" - if not api_key: - raise typer.BadParameter("NOTTE_API_KEY not found. Set it in environment or use --api-key flag.") - - logger.info(f"Creating workflow from {file}") - - # Load the workflow function - _module, _func_name, func = load_workflow_file(file) - - # Get workflow metadata from decorator (if present) - name = get_workflow_name(func) - description = get_workflow_description(func) - - # If no decorator found, prompt interactively - if not name: - # Suggest a default name based on file name - default_name = file.stem.replace("_", " ").title() - - logger.info("No @workflow decorator found. 
Please provide workflow metadata:") - name = typer.prompt("Workflow name", default=default_name) - description = typer.prompt("Workflow description (optional)", default="", show_default=False) - if not description.strip(): - description = None - - # Check if metadata already exists - existing_metadata = WorkflowMetadata.from_file(file) - if existing_metadata and existing_metadata.workflow_id: - raise typer.BadParameter( - f"Workflow already exists with ID: {existing_metadata.workflow_id}. Use 'update' command to update it." - ) - - with workflow_file_for_upload(file) as temp_file: - # Create client and workflow - client = NotteClient(api_key=api_key, server_url=server_url) - workflow_obj = client.Workflow(workflow_path=str(temp_file), name=name, description=description, _client=client) - - logger.info(f"Workflow created with ID: {workflow_obj.workflow_id}") - - # Create metadata and insert into file - metadata = WorkflowMetadata( - workflow_id=workflow_obj.workflow_id, - name=name, - description=description, - author=get_git_author(file), - creation_date=workflow_obj.response.created_at.isoformat(), - last_update_date=workflow_obj.response.updated_at.isoformat(), - ) - - update_metadata_in_file(file, metadata) - - logger.info(f"Metadata block added to {file}") - logger.info(f"You can reference this workflow using: notte.Workflow('{workflow_obj.workflow_id}')") - - -@app.command() -def update( - file: FILE_ARG, - api_key: API_KEY_ARG = None, - server_url: SERVER_URL_ARG = None, - restricted: Annotated[ - bool, typer.Option("--restricted/--no-restricted", help="Run workflow in restricted mode") - ] = True, -) -> None: - """Update an existing workflow.""" - if not api_key: - raise typer.BadParameter("NOTTE_API_KEY not found. Set it in environment or use --api-key flag.") - - logger.info(f"Updating workflow from {file}") - - # Read metadata - metadata = get_workflow_metadata(file, require_id=True) - # Type narrowing: workflow_id is guaranteed to be str when require_id=True - assert metadata.workflow_id is not None - - with workflow_file_for_upload(file) as temp_file: - # Update workflow - client = NotteClient(api_key=api_key, server_url=server_url) - workflow_obj = client.Workflow(workflow_id=metadata.workflow_id, _client=client) - workflow_obj.update(workflow_path=str(temp_file), restricted=restricted) - - logger.info(f"Workflow {metadata.workflow_id} updated successfully") - - # Update metadata - metadata.last_update_date = workflow_obj.response.updated_at.isoformat() - update_metadata_in_file(file, metadata) - - logger.info(f"Metadata updated in {file}") - - -@app.command() -def run( - file: FILE_ARG, - api_key: API_KEY_ARG = None, - server_url: SERVER_URL_ARG = None, - local: Annotated[bool, typer.Option("--local", help="Run workflow locally instead of on cloud")] = False, - variables: Annotated[ - Path | None, typer.Option("--variables", help="JSON file containing workflow variables") - ] = None, - timeout: Annotated[int | None, typer.Option("--timeout", help="Timeout in seconds for cloud runs")] = None, - stream: Annotated[ - bool, typer.Option("--stream/--no-stream", help="Enable/disable streaming logs for cloud runs") - ] = True, - raise_on_failure: Annotated[ - bool, typer.Option("--raise-on-failure/--no-raise-on-failure", help="Raise exception on workflow failure") - ] = True, -) -> None: - """Run a workflow.""" - if local: - logger.info(f"Running workflow locally from {file}") - import subprocess - - result = subprocess.run([sys.executable, str(file)], check=False) - raise 
typer.Exit(result.returncode) - - if not api_key: - raise typer.BadParameter("NOTTE_API_KEY not found. Set it in environment or use --api-key flag.") - - logger.info(f"Running workflow on cloud from {file}") - - # Read metadata - metadata = get_workflow_metadata(file, require_id=True) - # Type narrowing: workflow_id is guaranteed to be str when require_id=True - assert metadata.workflow_id is not None - - # Load variables if provided - variables_dict: dict[str, Any] = {} - if variables: - if not variables.exists(): - raise typer.BadParameter(f"Variables file not found: {variables}") - variables_dict = json.loads(variables.read_text(encoding="utf-8")) - - # Run workflow - client = NotteClient(api_key=api_key, server_url=server_url) - workflow_obj = client.Workflow(workflow_id=metadata.workflow_id, _client=client) - result = workflow_obj.run( - timeout=timeout, - stream=stream, - raise_on_failure=raise_on_failure, - **variables_dict, - ) - - logger.info(f"Workflow completed with status: {result.status}") - logger.info(f"Result: {result.result}") - - -@app.command() -def benchmark( - file: FILE_ARG, - api_key: API_KEY_ARG = None, - server_url: SERVER_URL_ARG = None, - local: Annotated[bool, typer.Option("--local", help="Run workflow locally instead of on cloud")] = False, - iterations: Annotated[int, typer.Option("--iterations", help="Maximum number of iterations to run")] = 10, - timeout: Annotated[int, typer.Option("--timeout", help="Timeout in minutes for the entire benchmark")] = 20, - parallelism: Annotated[int, typer.Option("--parallelism", help="Number of parallel runs (default: 1)")] = 1, - variables: Annotated[ - Path | None, typer.Option("--variables", help="JSON file containing workflow variables") - ] = None, -) -> None: - """Run a benchmark test with multiple iterations of the workflow.""" - timeout_seconds = timeout * 60 # Convert minutes to seconds - - # Interactive prompts if running without flags - # Check if any benchmark-specific flags were provided - benchmark_flags = ["--local", "--iterations", "--timeout", "--variables", "--parallelism"] - has_flags = any(flag in sys.argv for flag in benchmark_flags) - - if not has_flags: - logger.info("Running benchmark interactively. Press Enter to use defaults.") - logger.info("") - - # Prompt for local vs cloud - while True: - local_input = ( - typer.prompt( - "Run locally or on cloud? 
[local/cloud]", - default="cloud", - ) - .strip() - .lower() - ) - if local_input in ["local", "cloud"]: - local = local_input == "local" - break - logger.error("Please enter 'local' or 'cloud'") - - # Prompt for iterations - iterations = typer.prompt("Number of iterations", default=10, type=int) - - # Prompt for timeout - timeout = typer.prompt("Timeout in minutes", default=20, type=int) - - # Prompt for parallelism - parallelism = typer.prompt("Parallelism level (number of parallel runs)", default=1, type=int) - if parallelism < 1: - parallelism = 1 - if parallelism > iterations: - parallelism = iterations - logger.warning(f"Parallelism reduced to {iterations} (cannot exceed number of iterations)") - - # Prompt for variables file (optional) - variables_input = typer.prompt( - "Variables file path (optional, press Enter to skip)", - default="", - show_default=False, - ) - if variables_input.strip(): - variables = Path(variables_input.strip()) - if not variables.exists(): - raise typer.BadParameter(f"Variables file not found: {variables}") - else: - variables = None - - logger.info("") - - # Validate parallelism - if parallelism < 1: - parallelism = 1 - if parallelism > iterations: - parallelism = iterations - logger.warning(f"Parallelism reduced to {iterations} (cannot exceed number of iterations)") - - if local: - logger.info( - f"Running benchmark locally from {file} ({iterations} iterations, {timeout} min timeout, parallelism={parallelism})" - ) - else: - if not api_key: - raise typer.BadParameter("NOTTE_API_KEY not found. Set it in environment or use --api-key flag.") - logger.info( - f"Running benchmark on cloud from {file} ({iterations} iterations, {timeout} min timeout, parallelism={parallelism})" - ) - - # Read metadata for cloud runs - metadata: WorkflowMetadata | None = None - workflow_obj: Any | None = None - if not local: - metadata = get_workflow_metadata(file, require_id=True) - assert metadata.workflow_id is not None - client = NotteClient(api_key=api_key, server_url=server_url) - workflow_obj = client.Workflow(workflow_id=metadata.workflow_id, _client=client) - - # Load variables if provided - variables_dict: dict[str, Any] = {} - if variables: - if not variables.exists(): - raise typer.BadParameter(f"Variables file not found: {variables}") - variables_dict = json.loads(variables.read_text(encoding="utf-8")) - - # Helper function to run a single iteration - def run_iteration(iteration_num: int) -> dict[str, Any]: - """Run a single benchmark iteration.""" - iteration_start = time.time() - workflow_id: str | None = None - try: - if local: - # Run locally - import subprocess - - result = subprocess.run( - [sys.executable, str(file)], - check=False, - capture_output=True, - text=True, - ) - iteration_end = time.time() - execution_time = iteration_end - iteration_start - success = result.returncode == 0 - run_id = f"local-{iteration_num}" - status = "closed" if success else "failed" - workflow_id = metadata.workflow_id if metadata else None - else: - # Run on cloud - assert workflow_obj is not None - assert metadata is not None - result = workflow_obj.run( - timeout=None, # Use default timeout per run - stream=False, - raise_on_failure=False, # Don't raise for benchmark - **variables_dict, - ) - iteration_end = time.time() - execution_time = iteration_end - iteration_start - success = result.status == "closed" - run_id = result.workflow_run_id - workflow_id = result.workflow_id # Get workflow_id from response - status = result.status - - return { - "iteration": iteration_num, - 
"success": success, - "execution_time": execution_time, - "run_id": run_id, - "status": status, - "workflow_id": workflow_id, - } - except Exception as e: - iteration_end = time.time() - execution_time = iteration_end - iteration_start - logger.error(f"\nIteration {iteration_num} failed with exception: {e}") - return { - "iteration": iteration_num, - "success": False, - "execution_time": execution_time, - "run_id": f"error-{iteration_num}", - "status": "failed", - "workflow_id": metadata.workflow_id if metadata else None, - } - - # Benchmark results - results: list[dict[str, Any]] = [] - start_time = time.time() - - # Create progress bar - pbar = tqdm( - total=iterations, - desc="Benchmark progress", - unit="run", - bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]", - ) - - try: - if parallelism == 1: - # Sequential execution (original behavior) - for i in range(iterations): - # Check if we've exceeded the timeout - elapsed = time.time() - start_time - if elapsed >= timeout_seconds: - pbar.close() - logger.info(f"\nTimeout reached ({timeout} min). Stopping benchmark after {i} iterations.") - break - - # Update progress bar description with current iteration - pbar.set_description(f"Running iteration {i + 1}/{iterations}") - result = run_iteration(i + 1) - results.append(result) - - # Update progress bar with result - status_icon = "✅" if result["success"] else "❌" - pbar.set_postfix_str(f"{status_icon} {result['execution_time']:.2f}s") - _ = pbar.update(1) - else: - # Parallel execution - with ThreadPoolExecutor(max_workers=parallelism) as executor: - # Submit all tasks - future_to_iteration = {executor.submit(run_iteration, i + 1): i + 1 for i in range(iterations)} - - # Process completed futures as they finish - for future in as_completed(future_to_iteration): - # Check if we've exceeded the timeout - elapsed = time.time() - start_time - if elapsed >= timeout_seconds: - pbar.close() - logger.info(f"\nTimeout reached ({timeout} min). 
Cancelling remaining tasks...") - # Cancel remaining futures - for f in future_to_iteration: - _ = f.cancel() - break - - try: - result = future.result() - results.append(result) - - # Update progress bar with result - status_icon = "✅" if result["success"] else "❌" - pbar.set_postfix_str(f"{status_icon} {result['execution_time']:.2f}s") - _ = pbar.update(1) - except Exception as e: - iteration_num = future_to_iteration[future] - logger.error(f"\nIteration {iteration_num} failed with exception: {e}") - results.append( - { - "iteration": iteration_num, - "success": False, - "execution_time": 0.0, - "run_id": f"error-{iteration_num}", - "status": "failed", - "workflow_id": metadata.workflow_id if metadata else None, - } - ) - _ = pbar.update(1) - - # Sort results by iteration number for consistent display - results.sort(key=lambda x: x["iteration"]) - - finally: - pbar.close() - logger.info("") # New line after progress bar - - # Calculate summary statistics - total_runs = len(results) - successful_runs = sum(1 for r in results if r["success"]) - failed_runs = total_runs - successful_runs - success_rate = (successful_runs / total_runs * 100) if total_runs > 0 else 0.0 - - # Calculate average execution time for successful runs, or failed runs if all failed - if successful_runs > 0: - avg_execution_time = sum(r["execution_time"] for r in results if r["success"]) / successful_runs - elif failed_runs > 0: - avg_execution_time = sum(r["execution_time"] for r in results if not r["success"]) / failed_runs - else: - avg_execution_time = 0.0 - - # Use consistent width for all separators - # Table columns: Status (8) + Time (12) + Run ID (40) + Console URL (80) + 3 spaces = 143 chars - separator_width = 143 - separator_double = "=" * separator_width - separator_single = "-" * separator_width - - # Display summary - logger.info("") - logger.info(separator_double) - logger.info("BENCHMARK SUMMARY") - logger.info(separator_double) - logger.info(f"Total runs: {total_runs}") - logger.info(f"Successful: {successful_runs}") - logger.info(f"Failed: {failed_runs}") - logger.info(f"Success rate: {success_rate:.1f}%") - logger.info(f"Average execution time: {avg_execution_time:.2f}s") - logger.info(f"Total benchmark time: {time.time() - start_time:.2f}s") - logger.info(separator_double) - - # Display results table - logger.info("") - logger.info("Detailed Results:") - logger.info(separator_single) - - # Table header - header = f"{'Status':<8} {'Time':<12} {'Run ID':<40} {'Console URL':<80}" - logger.info(header) - logger.info(separator_single) - - for r in results: - status_icon = "✅" if r["success"] else "❌" - execution_time_str = f"{r['execution_time']:.2f}s" - run_id_str = r["run_id"][:38] # Truncate if too long - - # Build console URL using workflow_id and run_id from response - if r["workflow_id"] and not local: - console_url = f"https://console.notte.cc/logs/workflows/{r['workflow_id']}/runs/{r['run_id']}" - else: - console_url = "N/A (local run)" - - row = f"{status_icon:<8} {execution_time_str:<12} {run_id_str:<40} {console_url:<80}" - logger.info(row) - - logger.info(separator_single) - - # Exit with error code if all runs failed - if failed_runs == total_runs and total_runs > 0: - raise typer.Exit(1) +# Future subcommands can be added here: +# from notte_sdk.cli import session +# app.add_typer(session.session_app, name="session") +# +# from notte_sdk.cli import agent +# app.add_typer(agent.agent_app, name="agent") def main(_file_path: Path | None = None) -> None: diff --git 
a/packages/notte-sdk/src/notte_sdk/cli/workflow.py b/packages/notte-sdk/src/notte_sdk/cli/workflow.py new file mode 100644 index 000000000..bf01feecd --- /dev/null +++ b/packages/notte-sdk/src/notte_sdk/cli/workflow.py @@ -0,0 +1,721 @@ +from __future__ import annotations + +import contextlib +import importlib.util +import json +import sys +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path +from typing import Annotated, Any, Callable + +import typer +from notte_core.common.logging import logger +from tqdm import tqdm + +from notte_sdk.cli.metadata import ( + WorkflowMetadata, + comment_main_block, + get_git_author, + insert_metadata_block, + uncomment_main_block, +) +from notte_sdk.client import NotteClient +from notte_sdk.decorators import get_workflow_description, get_workflow_name, is_workflow + +# Create workflow subcommand group +workflow_app = typer.Typer( + name="workflow", + help="Manage workflow lifecycle (create, update, run, benchmark)", + add_completion=False, + no_args_is_help=True, +) + + +@workflow_app.callback(invoke_without_command=True) +def workflow_callback( + ctx: typer.Context, + workflow_path: Annotated[ + Path | None, + typer.Option( + "--workflow-path", + help="Path to the workflow Python file (required unless running from workflow file)", + ), + ] = None, +) -> None: + """Workflow management commands.""" + # Store workflow_path in context for subcommands to access + _ = ctx.ensure_object(dict) # type: ignore[assignment] + ctx.obj["workflow_path"] = workflow_path + + # If no command provided, show help + if ctx.invoked_subcommand is None: + raise typer.Exit() + + +def get_workflow_path_from_context(ctx: typer.Context, workflow_path: Path | None) -> Path: + """ + Get workflow_path from context (parent) or parameter (subcommand), with fallback to auto-detection. + + Priority: + 1. Subcommand --workflow-path parameter (highest priority) + 2. Parent (workflow) --workflow-path option + 3. Auto-detection from sys.argv[0] when running from workflow file + + Raises: + typer.BadParameter: If no path provided and auto-detection fails + """ + # Priority 1: subcommand parameter + if workflow_path is not None: + return workflow_path.resolve() + + # Priority 2: parent context (from callback) + if ctx.parent and isinstance(ctx.parent.obj, dict): + parent_obj: dict[str, Any] = ctx.parent.obj # type: ignore[assignment] + parent_path = parent_obj.get("workflow_path") + if parent_path is not None and isinstance(parent_path, Path): + return parent_path.resolve() + + # Priority 3: auto-detection + auto_detected = get_default_file_path() + if auto_detected is not None: + return auto_detected + + raise typer.BadParameter("File path is required. Provide it using --workflow-path option.") + + +# Common argument definitions +def get_default_file_path() -> Path | None: + """Get the default file path from sys.argv[0] if running from a workflow file.""" + # Check for workflow subcommand followed by a command + # sys.argv format: ['script.py', 'workflow', 'create', ...] or ['script.py', 'create', ...] 
+ if len(sys.argv) > 1: + # Handle both "workflow create" and direct "create" (for backward compatibility) + cmd_index = 1 + if sys.argv[1] == "workflow" and len(sys.argv) > 2: + cmd_index = 2 + if sys.argv[cmd_index] in ["create", "update", "run", "benchmark"]: + return Path(sys.argv[0]).resolve() + return None + + +API_KEY_ARG = Annotated[ + str | None, + typer.Option( + "--api-key", + help="Notte API key (defaults to NOTTE_API_KEY environment variable)", + envvar="NOTTE_API_KEY", + ), +] + +SERVER_URL_ARG = Annotated[ + str | None, + typer.Option( + "--server-url", + help="Notte API server URL (defaults to NOTTE_API_URL environment variable)", + envvar="NOTTE_API_URL", + ), +] + + +def find_workflow_function(module: Any) -> tuple[Any, str] | None: + """Find the workflow function in a module. + + First tries to find a function with @workflow decorator. + If not found, tries to find a function named 'run' or the only function in the module. + """ + # First, try to find a function with @workflow decorator + for name in dir(module): + obj = getattr(module, name) + if callable(obj) and is_workflow(obj): + return obj, name + + # If no decorated function found, try to find 'run' function + if hasattr(module, "run"): + obj = getattr(module, "run") + if callable(obj) and not obj.__name__.startswith("_"): # Skip private functions + return obj, "run" + + # Last resort: if there's only one callable function, use it + callable_functions: list[tuple[Callable[..., Any], str]] = [] + for name in dir(module): + obj = getattr(module, name) + if ( + callable(obj) + and not name.startswith("_") + and not isinstance(obj, type) # Skip classes + and not isinstance(obj, type(__builtins__)) # Skip builtins + ): + # Skip common non-workflow functions + if name not in ["main", "app", "cli"]: + callable_functions.append((obj, name)) + + if len(callable_functions) == 1: + return callable_functions[0] + + return None + + +def load_workflow_file(file_path: Path) -> tuple[Any, str, Any]: + """ + Load a workflow file and find the workflow function. + + Sets __name__ to prevent execution of if __name__ == "__main__" blocks. + """ + spec = importlib.util.spec_from_file_location("workflow_module", file_path) + if spec is None or spec.loader is None: + raise typer.BadParameter(f"Could not load module from {file_path}") + + module = importlib.util.module_from_spec(spec) + # Set __name__ to module name (not "__main__") to prevent __main__ block execution + module.__name__ = spec.name + spec.loader.exec_module(module) + + result = find_workflow_function(module) + if result is None: + raise typer.BadParameter( + f"No workflow function found in {file_path}. " + + "Either decorate a function with @workflow, name it 'run', or ensure there's only one function in the file." + ) + + func, func_name = result + return module, func_name, func + + +@contextlib.contextmanager +def workflow_file_for_upload(file_path: Path): + """ + Context manager for preparing workflow file for upload. + + Comments out __main__ block, yields temp file, then restores original. 
+ """ + content = file_path.read_text(encoding="utf-8") + content, was_commented = comment_main_block(content) + if was_commented: + logger.debug("Commented out __main__ block for upload") + + temp_file = file_path.parent / f".{file_path.stem}_temp{file_path.suffix}" + _ = temp_file.write_text(content, encoding="utf-8") + + try: + yield temp_file + finally: + # Clean up temp file + if temp_file.exists(): + temp_file.unlink() + # Restore __main__ block if it was commented + if was_commented: + content = file_path.read_text(encoding="utf-8") + content, _ = uncomment_main_block(content) + _ = file_path.write_text(content, encoding="utf-8") + + +def get_workflow_metadata(file_path: Path, require_id: bool = False) -> WorkflowMetadata: + """Get workflow metadata from file, raising typer errors if invalid.""" + metadata = WorkflowMetadata.from_file(file_path) + if not metadata: + raise typer.BadParameter("No workflow metadata found. Run 'create' command first.") + if require_id: + if not metadata.workflow_id: + raise typer.BadParameter("No workflow ID found. Run 'create' command first.") + # Type narrowing: at this point workflow_id is guaranteed to be str + assert metadata.workflow_id is not None + return metadata + + +def update_metadata_in_file(file_path: Path, metadata: WorkflowMetadata) -> None: + """Update metadata block in workflow file.""" + content = file_path.read_text(encoding="utf-8") + content = insert_metadata_block(content, metadata) + _ = file_path.write_text(content, encoding="utf-8") + + +@workflow_app.command() +def create( + ctx: typer.Context, + workflow_path: Annotated[ + Path | None, + typer.Option( + "--workflow-path", + help="Path to the workflow Python file (required unless running from workflow file)", + ), + ] = None, + api_key: API_KEY_ARG = None, + server_url: SERVER_URL_ARG = None, +) -> None: + """Create a new workflow.""" + if not api_key: + raise typer.BadParameter("NOTTE_API_KEY not found. Set it in environment or use --api-key flag.") + + resolved_file = get_workflow_path_from_context(ctx, workflow_path) + logger.info(f"Creating workflow from {resolved_file}") + + # Load the workflow function + _module, _func_name, func = load_workflow_file(resolved_file) + + # Get workflow metadata from decorator (if present) + name = get_workflow_name(func) + description = get_workflow_description(func) + + # If no decorator found, prompt interactively + if not name: + # Suggest a default name based on file name + default_name = resolved_file.stem.replace("_", " ").title() + + logger.info("No @workflow decorator found. Please provide workflow metadata:") + name = typer.prompt("Workflow name", default=default_name) + description = typer.prompt("Workflow description (optional)", default="", show_default=False) + if not description.strip(): + description = None + + # Check if metadata already exists + existing_metadata = WorkflowMetadata.from_file(resolved_file) + if existing_metadata and existing_metadata.workflow_id: + raise typer.BadParameter( + f"Workflow already exists with ID: {existing_metadata.workflow_id}. Use 'update' command to update it." 
+ ) + + with workflow_file_for_upload(resolved_file) as temp_file: + # Create client and workflow + client = NotteClient(api_key=api_key, server_url=server_url) + workflow_obj = client.Workflow(workflow_path=str(temp_file), name=name, description=description, _client=client) + + logger.info(f"Workflow created with ID: {workflow_obj.workflow_id}") + + # Create metadata and insert into file + metadata = WorkflowMetadata( + workflow_id=workflow_obj.workflow_id, + name=name, + description=description, + author=get_git_author(resolved_file), + creation_date=workflow_obj.response.created_at.isoformat(), + last_update_date=workflow_obj.response.updated_at.isoformat(), + ) + + update_metadata_in_file(resolved_file, metadata) + + logger.info(f"Metadata block added to {resolved_file}") + logger.info(f"You can reference this workflow using: notte.Workflow('{workflow_obj.workflow_id}')") + + +@workflow_app.command() +def update( + ctx: typer.Context, + workflow_path: Annotated[ + Path | None, + typer.Option( + "--workflow-path", + help="Path to the workflow Python file (required unless running from workflow file)", + ), + ] = None, + api_key: API_KEY_ARG = None, + server_url: SERVER_URL_ARG = None, + restricted: Annotated[ + bool, typer.Option("--restricted/--no-restricted", help="Run workflow in restricted mode") + ] = True, +) -> None: + """Update an existing workflow.""" + if not api_key: + raise typer.BadParameter("NOTTE_API_KEY not found. Set it in environment or use --api-key flag.") + + resolved_file = get_workflow_path_from_context(ctx, workflow_path) + logger.info(f"Updating workflow from {resolved_file}") + + # Read metadata + metadata = get_workflow_metadata(resolved_file, require_id=True) + # Type narrowing: workflow_id is guaranteed to be str when require_id=True + assert metadata.workflow_id is not None + + with workflow_file_for_upload(resolved_file) as temp_file: + # Update workflow + client = NotteClient(api_key=api_key, server_url=server_url) + workflow_obj = client.Workflow(workflow_id=metadata.workflow_id, _client=client) + workflow_obj.update(workflow_path=str(temp_file), restricted=restricted) + + logger.info(f"Workflow {metadata.workflow_id} updated successfully") + + # Update metadata + metadata.last_update_date = workflow_obj.response.updated_at.isoformat() + update_metadata_in_file(resolved_file, metadata) + + logger.info(f"Metadata updated in {resolved_file}") + + +@workflow_app.command() +def run( + ctx: typer.Context, + workflow_path: Annotated[ + Path | None, + typer.Option( + "--workflow-path", + help="Path to the workflow Python file (required unless running from workflow file)", + ), + ] = None, + api_key: API_KEY_ARG = None, + server_url: SERVER_URL_ARG = None, + local: Annotated[bool, typer.Option("--local", help="Run workflow locally instead of on cloud")] = False, + variables: Annotated[ + Path | None, typer.Option("--variables", help="JSON file containing workflow variables") + ] = None, + timeout: Annotated[int | None, typer.Option("--timeout", help="Timeout in seconds for cloud runs")] = None, + stream: Annotated[ + bool, typer.Option("--stream/--no-stream", help="Enable/disable streaming logs for cloud runs") + ] = True, + raise_on_failure: Annotated[ + bool, typer.Option("--raise-on-failure/--no-raise-on-failure", help="Raise exception on workflow failure") + ] = True, +) -> None: + """Run a workflow.""" + resolved_file = get_workflow_path_from_context(ctx, workflow_path) + + if local: + logger.info(f"Running workflow locally from {resolved_file}") + import 
subprocess + + result = subprocess.run([sys.executable, str(resolved_file)], check=False) + raise typer.Exit(result.returncode) + + if not api_key: + raise typer.BadParameter("NOTTE_API_KEY not found. Set it in environment or use --api-key flag.") + + logger.info(f"Running workflow on cloud from {resolved_file}") + + # Read metadata + metadata = get_workflow_metadata(resolved_file, require_id=True) + # Type narrowing: workflow_id is guaranteed to be str when require_id=True + assert metadata.workflow_id is not None + + # Load variables if provided + variables_dict: dict[str, Any] = {} + if variables: + if not variables.exists(): + raise typer.BadParameter(f"Variables file not found: {variables}") + variables_dict = json.loads(variables.read_text(encoding="utf-8")) + + # Run workflow + client = NotteClient(api_key=api_key, server_url=server_url) + workflow_obj = client.Workflow(workflow_id=metadata.workflow_id, _client=client) + result = workflow_obj.run( + timeout=timeout, + stream=stream, + raise_on_failure=raise_on_failure, + **variables_dict, + ) + + logger.info(f"Workflow completed with status: {result.status}") + logger.info(f"Result: {result.result}") + + +@workflow_app.command() +def benchmark( + ctx: typer.Context, + workflow_path: Annotated[ + Path | None, + typer.Option( + "--workflow-path", + help="Path to the workflow Python file (required unless running from workflow file)", + ), + ] = None, + api_key: API_KEY_ARG = None, + server_url: SERVER_URL_ARG = None, + local: Annotated[bool, typer.Option("--local", help="Run workflow locally instead of on cloud")] = False, + iterations: Annotated[int, typer.Option("--iterations", help="Maximum number of iterations to run")] = 10, + timeout: Annotated[int, typer.Option("--timeout", help="Timeout in minutes for the entire benchmark")] = 20, + parallelism: Annotated[int, typer.Option("--parallelism", help="Number of parallel runs (default: 1)")] = 1, + variables: Annotated[ + Path | None, typer.Option("--variables", help="JSON file containing workflow variables") + ] = None, +) -> None: + """Run a benchmark test with multiple iterations of the workflow.""" + resolved_file = get_workflow_path_from_context(ctx, workflow_path) + timeout_seconds = timeout * 60 # Convert minutes to seconds + + # Interactive prompts if running without flags + # Check if any benchmark-specific flags were provided + benchmark_flags = ["--local", "--iterations", "--timeout", "--variables", "--parallelism"] + has_flags = any(flag in sys.argv for flag in benchmark_flags) + + if not has_flags: + logger.info("Running benchmark interactively. Press Enter to use defaults.") + logger.info("") + + # Prompt for local vs cloud + while True: + local_input = ( + typer.prompt( + "Run locally or on cloud? 
+@workflow_app.command()
+def benchmark(
+    ctx: typer.Context,
+    workflow_path: Annotated[
+        Path | None,
+        typer.Option(
+            "--workflow-path",
+            help="Path to the workflow Python file (required unless running from workflow file)",
+        ),
+    ] = None,
+    api_key: API_KEY_ARG = None,
+    server_url: SERVER_URL_ARG = None,
+    local: Annotated[bool, typer.Option("--local", help="Run workflow locally instead of on cloud")] = False,
+    iterations: Annotated[int, typer.Option("--iterations", help="Maximum number of iterations to run")] = 10,
+    timeout: Annotated[int, typer.Option("--timeout", help="Timeout in minutes for the entire benchmark")] = 20,
+    parallelism: Annotated[int, typer.Option("--parallelism", help="Number of parallel runs (default: 1)")] = 1,
+    variables: Annotated[
+        Path | None, typer.Option("--variables", help="JSON file containing workflow variables")
+    ] = None,
+) -> None:
+    """Run a benchmark test with multiple iterations of the workflow."""
+    # Benchmark-only dependencies, imported locally in the same style as the
+    # local `import subprocess` used elsewhere in this file.
+    import time
+    from concurrent.futures import ThreadPoolExecutor, as_completed
+
+    from tqdm import tqdm
+
+    resolved_file = get_workflow_path_from_context(ctx, workflow_path)
+
+    # Interactive prompts if running without flags
+    # Check if any benchmark-specific flags were provided
+    benchmark_flags = ["--local", "--iterations", "--timeout", "--variables", "--parallelism"]
+    has_flags = any(flag in sys.argv for flag in benchmark_flags)
+
+    if not has_flags:
+        logger.info("Running benchmark interactively. Press Enter to use defaults.")
+        logger.info("")
+
+        # Prompt for local vs cloud
+        while True:
+            local_input = (
+                typer.prompt(
+                    "Run locally or on cloud? [local/cloud]",
+                    default="cloud",
+                )
+                .strip()
+                .lower()
+            )
+            if local_input in ["local", "cloud"]:
+                local = local_input == "local"
+                break
+            logger.error("Please enter 'local' or 'cloud'")
+
+        # Prompt for iterations
+        iterations = typer.prompt("Number of iterations", default=10, type=int)
+
+        # Prompt for timeout
+        timeout = typer.prompt("Timeout in minutes", default=20, type=int)
+
+        # Prompt for parallelism
+        parallelism = typer.prompt("Parallelism level (number of parallel runs)", default=1, type=int)
+        if parallelism < 1:
+            parallelism = 1
+        if parallelism > iterations:
+            parallelism = iterations
+            logger.warning(f"Parallelism reduced to {iterations} (cannot exceed number of iterations)")
+
+        # Prompt for variables file (optional)
+        variables_input = typer.prompt(
+            "Variables file path (optional, press Enter to skip)",
+            default="",
+            show_default=False,
+        )
+        if variables_input.strip():
+            variables = Path(variables_input.strip())
+            if not variables.exists():
+                raise typer.BadParameter(f"Variables file not found: {variables}")
+        else:
+            variables = None
+
+        logger.info("")
+
+    # Validate parallelism
+    if parallelism < 1:
+        parallelism = 1
+    if parallelism > iterations:
+        parallelism = iterations
+        logger.warning(f"Parallelism reduced to {iterations} (cannot exceed number of iterations)")
+
+    # Convert minutes to seconds after the prompts, so an interactively
+    # entered timeout actually takes effect.
+    timeout_seconds = timeout * 60
+
+    if local:
+        logger.info(
+            f"Running benchmark locally from {resolved_file} ({iterations} iterations, {timeout} min timeout, parallelism={parallelism})"
+        )
+    else:
+        if not api_key:
+            raise typer.BadParameter("NOTTE_API_KEY not found. Set it in environment or use --api-key flag.")
+        logger.info(
+            f"Running benchmark on cloud from {resolved_file} ({iterations} iterations, {timeout} min timeout, parallelism={parallelism})"
+        )
+
+    # Read metadata for cloud runs
+    metadata: WorkflowMetadata | None = None
+    workflow_obj: Any | None = None
+    if not local:
+        metadata = get_workflow_metadata(resolved_file, require_id=True)
+        assert metadata.workflow_id is not None
+        client = NotteClient(api_key=api_key, server_url=server_url)
+        workflow_obj = client.Workflow(workflow_id=metadata.workflow_id, _client=client)
+
+    # Load variables if provided
+    variables_dict: dict[str, Any] = {}
+    if variables:
+        if not variables.exists():
+            raise typer.BadParameter(f"Variables file not found: {variables}")
+        variables_dict = json.loads(variables.read_text(encoding="utf-8"))
+
+    # Helper function to run a single iteration
+    def run_iteration(iteration_num: int) -> dict[str, Any]:
+        """Run a single benchmark iteration."""
+        iteration_start = time.time()
+        workflow_id: str | None = None
+        try:
+            if local:
+                # Run locally
+                import subprocess
+
+                result = subprocess.run(
+                    [sys.executable, str(resolved_file)],
+                    check=False,
+                    capture_output=True,
+                    text=True,
+                )
+                iteration_end = time.time()
+                execution_time = iteration_end - iteration_start
+                success = result.returncode == 0
+                run_id = f"local-{iteration_num}"
+                status = "closed" if success else "failed"
+                workflow_id = metadata.workflow_id if metadata else None
+            else:
+                # Run on cloud
+                assert workflow_obj is not None
+                assert metadata is not None
+                result = workflow_obj.run(
+                    timeout=None,  # Use default timeout per run
+                    stream=False,
+                    raise_on_failure=False,  # Don't raise for benchmark
+                    **variables_dict,
+                )
+                iteration_end = time.time()
+                execution_time = iteration_end - iteration_start
+                success = result.status == "closed"
+                run_id = result.workflow_run_id
+                workflow_id = result.workflow_id  # Get workflow_id from response
+                status = result.status
+
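+            # Both branches produce the same record shape, which the summary
+            # and results table below consume (values here are illustrative):
+            #   {"iteration": 1, "success": True, "execution_time": 12.3,
+            #    "run_id": "...", "status": "closed", "workflow_id": "..."}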
+            return {
+                "iteration": iteration_num,
+                "success": success,
+                "execution_time": execution_time,
+                "run_id": run_id,
+                "status": status,
+                "workflow_id": workflow_id,
+            }
+        except Exception as e:
+            iteration_end = time.time()
+            execution_time = iteration_end - iteration_start
+            logger.error(f"\nIteration {iteration_num} failed with exception: {e}")
+            return {
+                "iteration": iteration_num,
+                "success": False,
+                "execution_time": execution_time,
+                "run_id": f"error-{iteration_num}",
+                "status": "failed",
+                "workflow_id": metadata.workflow_id if metadata else None,
+            }
+
+    # Benchmark results
+    results: list[dict[str, Any]] = []
+    start_time = time.time()
+
+    # Create progress bar
+    pbar = tqdm(
+        total=iterations,
+        desc="Benchmark progress",
+        unit="run",
+        bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
+    )
+
+    try:
+        if parallelism == 1:
+            # Sequential execution (original behavior)
+            for i in range(iterations):
+                # Check if we've exceeded the timeout
+                elapsed = time.time() - start_time
+                if elapsed >= timeout_seconds:
+                    pbar.close()
+                    logger.info(f"\nTimeout reached ({timeout} min). Stopping benchmark after {i} iterations.")
+                    break
+
+                # Update progress bar description with current iteration
+                pbar.set_description(f"Running iteration {i + 1}/{iterations}")
+                result = run_iteration(i + 1)
+                results.append(result)
+
+                # Update progress bar with result
+                status_icon = "✅" if result["success"] else "❌"
+                pbar.set_postfix_str(f"{status_icon} {result['execution_time']:.2f}s")
+                _ = pbar.update(1)
+        else:
+            # Parallel execution
+            with ThreadPoolExecutor(max_workers=parallelism) as executor:
+                # Submit all tasks
+                future_to_iteration = {executor.submit(run_iteration, i + 1): i + 1 for i in range(iterations)}
+
+                # Process completed futures as they finish
+                for future in as_completed(future_to_iteration):
+                    # Check if we've exceeded the timeout
+                    elapsed = time.time() - start_time
+                    if elapsed >= timeout_seconds:
+                        pbar.close()
+                        logger.info(f"\nTimeout reached ({timeout} min). Cancelling remaining tasks...")
+                        # Cancel remaining futures
+                        for f in future_to_iteration:
+                            _ = f.cancel()
+                        break
+
+                    try:
+                        result = future.result()
+                        results.append(result)
+
+                        # Update progress bar with result
+                        status_icon = "✅" if result["success"] else "❌"
+                        pbar.set_postfix_str(f"{status_icon} {result['execution_time']:.2f}s")
+                        _ = pbar.update(1)
+                    except Exception as e:
+                        iteration_num = future_to_iteration[future]
+                        logger.error(f"\nIteration {iteration_num} failed with exception: {e}")
+                        results.append(
+                            {
+                                "iteration": iteration_num,
+                                "success": False,
+                                "execution_time": 0.0,
+                                "run_id": f"error-{iteration_num}",
+                                "status": "failed",
+                                "workflow_id": metadata.workflow_id if metadata else None,
+                            }
+                        )
+                        _ = pbar.update(1)
+
+            # Sort results by iteration number for consistent display
+            results.sort(key=lambda x: x["iteration"])
+
+    finally:
+        pbar.close()
+        logger.info("")  # New line after progress bar
+
+    # Calculate summary statistics
+    total_runs = len(results)
+    successful_runs = sum(1 for r in results if r["success"])
+    failed_runs = total_runs - successful_runs
+    success_rate = (successful_runs / total_runs * 100) if total_runs > 0 else 0.0
+
+    # Calculate average execution time for successful runs, or failed runs if all failed
+    if successful_runs > 0:
+        avg_execution_time = sum(r["execution_time"] for r in results if r["success"]) / successful_runs
+    elif failed_runs > 0:
+        avg_execution_time = sum(r["execution_time"] for r in results if not r["success"]) / failed_runs
+    else:
+        avg_execution_time = 0.0
+
+    # Use consistent width for all separators
+    # Table columns: Status (8) + Time (12) + Run ID (40) + Console URL (80) + 3 spaces = 143 chars
+    separator_width = 143
+    separator_double = "=" * separator_width
+    separator_single = "-" * separator_width
+
+    # Display summary
+    logger.info("")
+    logger.info(separator_double)
+    logger.info("BENCHMARK SUMMARY")
+    logger.info(separator_double)
+    logger.info(f"Total runs: {total_runs}")
+    logger.info(f"Successful: {successful_runs}")
+    logger.info(f"Failed: {failed_runs}")
+    logger.info(f"Success rate: {success_rate:.1f}%")
+    logger.info(f"Average execution time: {avg_execution_time:.2f}s")
+    logger.info(f"Total benchmark time: {time.time() - start_time:.2f}s")
+    logger.info(separator_double)
+
+    # Display results table
+    logger.info("")
+    logger.info("Detailed Results:")
+    logger.info(separator_single)
+
+    # Table header
+    header = f"{'Status':<8} {'Time':<12} {'Run ID':<40} {'Console URL':<80}"
+    logger.info(header)
+    logger.info(separator_single)
+
+    for r in results:
+        status_icon = "✅" if r["success"] else "❌"
+        execution_time_str = f"{r['execution_time']:.2f}s"
+        run_id_str = r["run_id"][:38]  # Truncate if too long
+
+        # Build console URL using workflow_id and run_id from response
+        if r["workflow_id"] and not local:
+            console_url = f"https://console.notte.cc/logs/workflows/{r['workflow_id']}/runs/{r['run_id']}"
+        else:
+            console_url = "N/A (local run)"
+
+        row = f"{status_icon:<8} {execution_time_str:<12} {run_id_str:<40} {console_url:<80}"
+        logger.info(row)
+
+    logger.info(separator_single)
+
+    # Exit with error code if all runs failed
+    if failed_runs == total_runs and total_runs > 0:
+        raise typer.Exit(1)
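For reviewers, the cloud path exercised by `run` and `benchmark` above reduces to a few SDK calls. A minimal sketch, assuming a valid API key and an already-created workflow (both placeholders here), with the same per-iteration settings the benchmark uses:

```python
# Minimal sketch of the cloud path above; the key and id are placeholders.
from notte_sdk import NotteClient

client = NotteClient(api_key="nt-...")  # placeholder; normally read from NOTTE_API_KEY
wf = client.Workflow(workflow_id="<workflow-id>", _client=client)
result = wf.run(
    timeout=None,  # default timeout per run, as in run_iteration
    stream=False,
    raise_on_failure=False,  # the benchmark never raises per iteration
)
# Fields the benchmark table reads from the response:
print(result.status, result.workflow_run_id, result.workflow_id)
```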
diff --git a/packages/notte-sdk/src/notte_sdk/cli/workflow_cli.py b/packages/notte-sdk/src/notte_sdk/cli/workflow_cli.py
index 54b02c586..a88b6c697 100644
--- a/packages/notte-sdk/src/notte_sdk/cli/workflow_cli.py
+++ b/packages/notte-sdk/src/notte_sdk/cli/workflow_cli.py
@@ -21,12 +21,17 @@ def run():
 ```
 
 This allows you to run commands like:
+- python workflow_file.py workflow create
+- python workflow_file.py workflow run --local
+- python workflow_file.py workflow update
+- python workflow_file.py workflow run --variables variables.json
+- python workflow_file.py workflow benchmark --iterations 10 --timeout 20
+- python workflow_file.py workflow benchmark --local --iterations 5
+
+Or directly (backward compatible):
 - python workflow_file.py create
 - python workflow_file.py run --local
 - python workflow_file.py update
-- python workflow_file.py run --variables variables.json
-- python workflow_file.py benchmark --iterations 10 --timeout 20
-- python workflow_file.py benchmark --local --iterations 5
 
 Note: The @workflow decorator is optional. If you don't use it, the CLI will
 prompt for workflow name and description during creation.
@@ -50,8 +55,17 @@ def workflow_cli() -> None:
     # Only handle CLI if args are present
     if len(sys.argv) > 1:
         first_arg = sys.argv[1]
-        if first_arg in ["create", "update", "run", "benchmark", "--help", "-h"]:
+        # Check for workflow commands (with or without "workflow" subcommand prefix)
+        if first_arg in ["workflow", "create", "update", "run", "benchmark", "--help", "-h"]:
             # Handle CLI and exit
+            # If first arg is a workflow command (not "workflow"), prepend "workflow" subcommand
+            if first_arg in ["create", "update", "run", "benchmark"]:
+                sys.argv.insert(1, "workflow")
+            # After inserting "workflow", auto-detect file path if not provided
+            # Check if a file path is already provided (after "workflow" and the command)
+            if len(sys.argv) <= 3:  # Only ["script.py", "workflow", "command"] - no file path yet
+                # Insert the current script path as the file argument
+                sys.argv.insert(3, sys.argv[0])
+            # Note: the script path inserted above is what the commands fall back to when --workflow-path is not given
             main()
             sys.exit(0)
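
For completeness, a minimal sketch of a workflow file wired to this dispatcher; the `@workflow` arguments and the function body are illustrative assumptions (per the docstring above, the decorator is optional, in which case `create` prompts for name and description):

```python
# Hypothetical workflow file; decorator arguments are assumed for illustration.
from notte_sdk import workflow, workflow_cli


@workflow(name="example-workflow", description="Illustrative workflow")
def run() -> None:
    ...  # actual session/agent logic goes here


if __name__ == "__main__":
    workflow_cli()  # dispatches create/update/run/benchmark (bare or under "workflow") and exits
    run()  # no CLI args: fall through to a plain local execution
```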