diff --git a/makefile b/makefile
index 50994004a..83af13a5c 100644
--- a/makefile
+++ b/makefile
@@ -13,17 +13,17 @@ test-cicd:
 
 .PHONY: test-sdk
 test-sdk:
-	uv run pytest -n logical tests/sdk
-	uv run pytest -n logical tests/integration/sdk
+	uv run pytest -n 4 tests/sdk
+	uv run pytest -n 4 tests/integration/sdk
 
 .PHONY: test-docs
 test-docs:
-	uv run pytest -n logical tests/docs
+	uv run pytest -n 4 tests/docs
 
 .PHONY: test-agent
 test-agent:
-	uv run pytest -n logical tests/agent
-	uv run pytest -n logical tests/integration/sdk/test_vault.py
+	uv run pytest -n 4 tests/agent
+	uv run pytest -n 4 tests/integration/sdk/test_vault.py
 
 .PHONY: test-sdk-staging
 test-sdk-staging:
diff --git a/packages/notte-core/src/notte_core/ast.py b/packages/notte-core/src/notte_core/ast.py
index 6001ea586..289286b31 100644
--- a/packages/notte-core/src/notte_core/ast.py
+++ b/packages/notte-core/src/notte_core/ast.py
@@ -71,6 +71,7 @@ class ScriptValidator(RestrictingNodeTransformer):
         "notte_sdk",
         "notte_agent",
         "notte_core",
+        "notte_llm",
         # Safe third-party
         "pydantic",  # Data validation library
         "loguru",  # Logging library
@@ -80,6 +81,7 @@ class ScriptValidator(RestrictingNodeTransformer):
         "gspread",
         "google",
         "litellm",
+        "tqdm",
         # Safe standard library modules - data processing and utilities
         "types",
         "json",  # JSON parsing
diff --git a/packages/notte-core/src/notte_core/browser/observation.py b/packages/notte-core/src/notte_core/browser/observation.py
index 215e32811..3161137d2 100644
--- a/packages/notte-core/src/notte_core/browser/observation.py
+++ b/packages/notte-core/src/notte_core/browser/observation.py
@@ -248,4 +248,4 @@ def validate_exception(cls, v: Any) -> NotteBaseError | Exception | None:
     def model_post_init(self, context: Any, /) -> None:
         if self.success:
             if self.exception is not None:
-                raise ValueError("Exception should be None if success is True")
+                raise ValueError(f"Exception should be None if success is True: {self.exception}")
diff --git a/packages/notte-sdk/pyproject.toml b/packages/notte-sdk/pyproject.toml
index 0aa2a3b2e..dbc2abd77 100644
--- a/packages/notte-sdk/pyproject.toml
+++ b/packages/notte-sdk/pyproject.toml
@@ -15,6 +15,8 @@ dependencies = [
     "halo>=0.0.28",
     "notte-core==1.4.4.dev",
     "websockets>=13.1",
+    "typer>=0.9.0",
+    "tqdm>=4.66.0",
 ]
 
 [project.optional-dependencies]
@@ -22,6 +24,9 @@ playwright = [
     "playwright~=1.55",
 ]
 
+[project.scripts]
+notte = "notte_sdk.cli:main"
+
 [build-system]
 requires = ["hatchling"]
 build-backend = "hatchling.build"
diff --git a/packages/notte-sdk/src/notte_sdk/__init__.py b/packages/notte-sdk/src/notte_sdk/__init__.py
index 82d32961e..40d4bba31 100644
--- a/packages/notte-sdk/src/notte_sdk/__init__.py
+++ b/packages/notte-sdk/src/notte_sdk/__init__.py
@@ -28,7 +28,9 @@
     UploadFile,
     Wait,
 )
+from notte_sdk.cli.workflow_cli import workflow_cli
 from notte_sdk.client import NotteClient
+from notte_sdk.decorators import workflow
 from notte_sdk.endpoints.agents import RemoteAgent
 from notte_sdk.endpoints.sessions import RemoteSession
 from notte_sdk.errors import retry
@@ -42,6 +44,8 @@
     "RemoteAgent",
     "retry",
     "generate_cookies",
+    "workflow",
+    "workflow_cli",
     "FormFill",
     "Goto",
     "GotoNewTab",
diff --git a/packages/notte-sdk/src/notte_sdk/cli/README.md b/packages/notte-sdk/src/notte_sdk/cli/README.md
new file mode 100644
index 000000000..da3c426de
--- /dev/null
+++ b/packages/notte-sdk/src/notte_sdk/cli/README.md
@@ -0,0 +1,68 @@
+# Notte Workflow CLI
+
+Manage the Notte workflow lifecycle from the command line.
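+
+The `notte` entry point is registered under `[project.scripts]` in
+`packages/notte-sdk/pyproject.toml`, so the command becomes available on your
+PATH once the `notte-sdk` package is installed.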
+
+## Usage
+
+```bash
+notte workflow [--workflow-path FILE] COMMAND [OPTIONS]
+```
+
+## Commands
+
+### Create
+```bash
+notte workflow --workflow-path my_workflow.py create
+```
+
+### Update
+```bash
+notte workflow --workflow-path my_workflow.py update
+```
+
+### Run
+```bash
+# Run locally
+notte workflow --workflow-path my_workflow.py run --local
+
+# Run on cloud
+notte workflow --workflow-path my_workflow.py run --variables vars.json
+```
+
+### Benchmark
+```bash
+# Run 10 iterations locally
+notte workflow --workflow-path my_workflow.py benchmark --local --iterations 10
+
+# Run on cloud with parallelism
+notte workflow --workflow-path my_workflow.py benchmark --iterations 50 --parallelism 4
+```
+
+## Auto-Detection
+
+When running from a workflow file, `--workflow-path` is optional:
+
+```python
+# my_workflow.py
+from notte_sdk import NotteClient, workflow_cli
+
+def run(url: str) -> str:
+    # ... workflow code ...
+    return result
+
+if __name__ == "__main__":
+    workflow_cli()  # Enables CLI commands
+```
+
+```bash
+# These work without --workflow-path
+python my_workflow.py create
+python my_workflow.py run --local
+python my_workflow.py benchmark --iterations 10
+```
+
+## Environment Variables
+
+- `NOTTE_API_KEY` - API key (required for cloud operations)
+- `NOTTE_API_URL` - API server URL (optional)
+
diff --git a/packages/notte-sdk/src/notte_sdk/cli/__init__.py b/packages/notte-sdk/src/notte_sdk/cli/__init__.py
new file mode 100644
index 000000000..3f6226d8a
--- /dev/null
+++ b/packages/notte-sdk/src/notte_sdk/cli/__init__.py
@@ -0,0 +1,53 @@
+"""
+Main CLI module for Notte.
+
+This module aggregates all CLI subcommands (workflow, session, agent, etc.)
+into a single unified CLI interface.
+
+Usage:
+    notte workflow create
+    notte workflow run
+    notte workflow benchmark
+"""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+import typer
+
+from notte_sdk.cli import workflow
+
+# Main CLI app
+app = typer.Typer(
+    name="notte",
+    help="Notte CLI - Manage workflows, sessions, agents, and more",
+    add_completion=False,
+    no_args_is_help=True,
+)
+
+# Add workflow subcommand
+app.add_typer(workflow.workflow_app, name="workflow")
+
+# Future subcommands can be added here:
+# from notte_sdk.cli import session
+# app.add_typer(session.session_app, name="session")
+#
+# from notte_sdk.cli import agent
+# app.add_typer(agent.agent_app, name="agent")
+
+
+def main(_file_path: Path | None = None) -> None:
+    """
+    Main CLI entry point.
+
+    Args:
+        _file_path: Optional path to workflow file. If None, will be auto-detected from sys.argv.
+            Currently unused, kept for compatibility with workflow_cli().
+    """
+    # Run typer app directly - typer handles help, argument parsing, etc.
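+    # Note: the app runs in click's standalone mode here, so this call parses
+    # sys.argv and exits the process via SystemExit once the command finishes.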
+    app()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/packages/notte-sdk/src/notte_sdk/cli/metadata.py b/packages/notte-sdk/src/notte_sdk/cli/metadata.py
new file mode 100644
index 000000000..418394af9
--- /dev/null
+++ b/packages/notte-sdk/src/notte_sdk/cli/metadata.py
@@ -0,0 +1,366 @@
+from __future__ import annotations
+
+import datetime as dt
+import re
+import subprocess
+from pathlib import Path
+
+from notte_core.common.logging import logger
+
+
+class WorkflowMetadata:
+    """Represents workflow metadata stored in a Python file."""
+
+    workflow_id: str | None
+    name: str | None
+    description: str | None
+    author: str | None
+    creation_date: str | None
+    last_update_date: str | None
+
+    def __init__(
+        self,
+        workflow_id: str | None = None,
+        name: str | None = None,
+        description: str | None = None,
+        author: str | None = None,
+        creation_date: str | None = None,
+        last_update_date: str | None = None,
+    ):
+        self.workflow_id = workflow_id
+        self.name = name
+        self.description = description
+        self.author = author
+        self.creation_date = creation_date
+        self.last_update_date = last_update_date
+
+    @classmethod
+    def from_file(cls, file_path: Path) -> WorkflowMetadata | None:
+        """
+        Parse metadata from a Python file.
+
+        Args:
+            file_path: Path to the Python file.
+
+        Returns:
+            WorkflowMetadata if found, None otherwise.
+        """
+        try:
+            content = file_path.read_text(encoding="utf-8")
+            return cls.from_string(content)
+        except Exception as e:
+            logger.debug(f"Failed to read metadata from {file_path}: {e}")
+            return None
+
+    @classmethod
+    def from_string(cls, content: str) -> WorkflowMetadata | None:
+        """
+        Parse metadata from file content string.
+
+        Args:
+            content: The file content.
+
+        Returns:
+            WorkflowMetadata if found, None otherwise.
+        """
+        # Look for the metadata marker line (the block may appear anywhere in the file)
+        lines = content.split("\n")
+        metadata_start = None
+        for i, line in enumerate(lines):
+            if line.strip() == "#!/notte/workflow":
+                metadata_start = i
+                break
+
+        if metadata_start is None:
+            return None
+
+        metadata = cls()
+
+        # Parse lines after the metadata marker
+        for i in range(metadata_start + 1, len(lines)):
+            line = lines[i].strip()
+
+            # Stop at first non-comment, non-empty line after metadata block
+            if line and not line.startswith("#"):
+                # Check if we've seen any metadata fields
+                if metadata.workflow_id or metadata.name:
+                    break
+                continue
+
+            # Skip empty comment lines
+            if line in ("#", ""):
+                continue
+
+            # Parse field lines: # key: value
+            if line.startswith("#"):
+                # Remove leading #
+                content_line = line[1:].strip()
+                if ":" in content_line:
+                    key, value = content_line.split(":", 1)
+                    key = key.strip()
+                    value = value.strip()
+
+                    if key == "workflow_id":
+                        metadata.workflow_id = value
+                    elif key == "name":
+                        metadata.name = value
+                    elif key == "description":
+                        metadata.description = value
+                    elif key == "author":
+                        metadata.author = value
+                    elif key == "creation_date":
+                        metadata.creation_date = value
+                    elif key == "last_update_date":
+                        metadata.last_update_date = value
+
+        return metadata if metadata.workflow_id or metadata.name else None
+
+    def to_block(self) -> str:
+        """
+        Convert metadata to a metadata block string.
+
+        Returns:
+            The metadata block as a string.
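+
+        Example (illustrative values):
+            #!/notte/workflow
+            #
+            # workflow_id: wf_abc123
+            # name: My Workflow
+            # author: Jane Doe <jane@example.com>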
+ """ + lines = ["#!/notte/workflow", "#"] + if self.workflow_id: + lines.append(f"# workflow_id: {self.workflow_id}") + if self.name: + lines.append(f"# name: {self.name}") + if self.description: + lines.append(f"# description: {self.description}") + if self.author: + lines.append(f"# author: {self.author}") + if self.creation_date: + lines.append(f"# creation_date: {self.creation_date}") + if self.last_update_date: + lines.append(f"# last_update_date: {self.last_update_date}") + return "\n".join(lines) + "\n" + + def update_from_api(self, workflow_id: str, name: str | None = None) -> None: + """ + Update metadata from API response. + + Args: + workflow_id: The workflow ID from the API. + name: Optional workflow name from the API. + """ + self.workflow_id = workflow_id + if name: + self.name = name + now = dt.datetime.now(dt.timezone.utc).isoformat() + if not self.creation_date: + self.creation_date = now + self.last_update_date = now + + +def get_git_author(file_path: Path | None = None) -> str | None: + """ + Get git author information for the current repository. + + Args: + file_path: Optional path to a file in the git repository. + + Returns: + Author string in format "Name " or None if not available. + """ + try: + # Try to get author from git config + result = subprocess.run( + ["git", "config", "user.name"], + capture_output=True, + text=True, + check=True, + timeout=5, + ) + name = result.stdout.strip() + + result = subprocess.run( + ["git", "config", "user.email"], + capture_output=True, + text=True, + check=True, + timeout=5, + ) + email = result.stdout.strip() + + if name and email: + return f"{name} <{email}>" + elif name: + return name + except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired): + pass + + # Fallback: try to get from git log if file_path is provided + if file_path: + try: + result = subprocess.run( + ["git", "log", "-1", "--format=%an <%ae>", "--", str(file_path)], + capture_output=True, + text=True, + check=True, + timeout=5, + ) + author = result.stdout.strip() + if author: + return author + except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired): + pass + + return None + + +def insert_metadata_block(content: str, metadata: WorkflowMetadata) -> str: + """ + Insert or update metadata block in file content. + + Args: + content: The file content. + metadata: The metadata to insert. + + Returns: + The content with metadata block inserted/updated. + """ + metadata_block = metadata.to_block() + + # Check if metadata block already exists + existing_metadata = WorkflowMetadata.from_string(content) + if existing_metadata: + # Replace existing metadata block + pattern = r"#!/notte/workflow\s*\n(?:#\s*\n)?(?:#\s*(.+?)\s*\n)*" + content = re.sub(pattern, metadata_block + "\n", content, flags=re.MULTILINE) + else: + # Insert metadata block at the beginning + # If there's a shebang, insert after it, otherwise at the start + shebang_pattern = r"^#![^\n]+\n" + shebang_match = re.match(shebang_pattern, content) + + if shebang_match: + # Insert after shebang + shebang = shebang_match.group(0) + rest = content[len(shebang) :] + content = shebang + metadata_block + "\n" + rest + else: + # Insert at the beginning + content = metadata_block + "\n" + content + + return content + + +def comment_main_block(content: str) -> tuple[str, bool]: + """ + Comment out the `if __name__ == "__main__"` block. + + Args: + content: The file content. + + Returns: + Tuple of (modified content, whether block was found and commented). 
+ """ + lines = content.split("\n") + main_start = None + + # Find the line with `if __name__ == "__main__":` + for i, line in enumerate(lines): + if re.match(r'^\s*if\s+__name__\s*==\s*["\']__main__["\']\s*:', line): + main_start = i + break + + if main_start is None: + return content, False + + # Find the end of the block (all indented lines after the if statement) + main_end = main_start + 1 + if main_start < len(lines): + # Get the indentation level of the if statement itself + if_line = lines[main_start] + if_indent = len(if_line) - len(if_line.lstrip()) + + # Find all subsequent lines that are indented more than the if statement + # (i.e., the body of the if block) + for i in range(main_start + 1, len(lines)): + line = lines[i] + if not line.strip(): # Empty line, include it + main_end = i + 1 + continue + line_indent = len(line) - len(line.lstrip()) + # Include lines that are indented more than the if statement + if line_indent > if_indent: + main_end = i + 1 + else: + # Hit a line at same or less indentation - end of block + break + + # Comment out all lines in the block + new_lines = lines.copy() + for i in range(main_start, main_end): + if new_lines[i].strip(): # Only comment non-empty lines + new_lines[i] = "# " + new_lines[i] + + return "\n".join(new_lines), True + + +def uncomment_main_block(content: str) -> tuple[str, bool]: + """ + Uncomment the `if __name__ == "__main__"` block. + + Args: + content: The file content. + + Returns: + Tuple of (modified content, whether block was found and uncommented). + """ + lines = content.split("\n") + main_start = None + + # Find the commented line with `if __name__ == "__main__":` + for i, line in enumerate(lines): + stripped = line.strip() + if stripped.startswith("#") and re.match(r'#\s*if\s+__name__\s*==\s*["\']__main__["\']\s*:', stripped): + main_start = i + break + + if main_start is None: + return content, False + + # Find the end of the commented block + main_end = main_start + 1 + if main_start < len(lines): + # Get the indentation level of the commented if statement + commented_if_line = lines[main_start] + # Uncomment temporarily to get original indentation + uncommented_if = commented_if_line.lstrip("# ").lstrip("#") + if_indent = len(uncommented_if) - len(uncommented_if.lstrip()) + + # Find all subsequent commented lines that are indented more than the if statement + for i in range(main_start + 1, len(lines)): + line = lines[i] + if not line.strip(): # Empty line, include it + main_end = i + 1 + continue + if line.strip().startswith("#"): + # Uncomment temporarily to check indentation + uncommented = line.lstrip("# ").lstrip("#") + if uncommented.strip(): + uncommented_indent = len(uncommented) - len(uncommented.lstrip()) + if uncommented_indent > if_indent: + main_end = i + 1 + else: + break + else: + main_end = i + 1 + else: + # Hit a non-commented line - end of block + break + + # Uncomment all lines in the block + new_lines = lines.copy() + for i in range(main_start, main_end): + if new_lines[i].strip().startswith("#"): + # Remove leading "# " or "#" + if new_lines[i].startswith("# "): + new_lines[i] = new_lines[i][2:] + elif new_lines[i].startswith("#"): + new_lines[i] = new_lines[i][1:] + + return "\n".join(new_lines), True diff --git a/packages/notte-sdk/src/notte_sdk/cli/workflow.py b/packages/notte-sdk/src/notte_sdk/cli/workflow.py new file mode 100644 index 000000000..bf01feecd --- /dev/null +++ b/packages/notte-sdk/src/notte_sdk/cli/workflow.py @@ -0,0 +1,721 @@ +from __future__ import annotations + +import 
+import importlib.util
+import json
+import sys
+import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from pathlib import Path
+from typing import Annotated, Any, Callable
+
+import typer
+from notte_core.common.logging import logger
+from tqdm import tqdm
+
+from notte_sdk.cli.metadata import (
+    WorkflowMetadata,
+    comment_main_block,
+    get_git_author,
+    insert_metadata_block,
+    uncomment_main_block,
+)
+from notte_sdk.client import NotteClient
+from notte_sdk.decorators import get_workflow_description, get_workflow_name, is_workflow
+
+# Create workflow subcommand group
+workflow_app = typer.Typer(
+    name="workflow",
+    help="Manage workflow lifecycle (create, update, run, benchmark)",
+    add_completion=False,
+    no_args_is_help=True,
+)
+
+
+@workflow_app.callback(invoke_without_command=True)
+def workflow_callback(
+    ctx: typer.Context,
+    workflow_path: Annotated[
+        Path | None,
+        typer.Option(
+            "--workflow-path",
+            help="Path to the workflow Python file (required unless running from workflow file)",
+        ),
+    ] = None,
+) -> None:
+    """Workflow management commands."""
+    # Store workflow_path in context for subcommands to access
+    _ = ctx.ensure_object(dict)  # type: ignore[assignment]
+    ctx.obj["workflow_path"] = workflow_path
+
+    # If no command provided, show help
+    if ctx.invoked_subcommand is None:
+        raise typer.Exit()
+
+
+def get_workflow_path_from_context(ctx: typer.Context, workflow_path: Path | None) -> Path:
+    """
+    Get workflow_path from context (parent) or parameter (subcommand), with fallback to auto-detection.
+
+    Priority:
+    1. Subcommand --workflow-path parameter (highest priority)
+    2. Parent (workflow) --workflow-path option
+    3. Auto-detection from sys.argv[0] when running from workflow file
+
+    Raises:
+        typer.BadParameter: If no path provided and auto-detection fails
+    """
+    # Priority 1: subcommand parameter
+    if workflow_path is not None:
+        return workflow_path.resolve()
+
+    # Priority 2: parent context (from callback)
+    if ctx.parent and isinstance(ctx.parent.obj, dict):
+        parent_obj: dict[str, Any] = ctx.parent.obj  # type: ignore[assignment]
+        parent_path = parent_obj.get("workflow_path")
+        if parent_path is not None and isinstance(parent_path, Path):
+            return parent_path.resolve()
+
+    # Priority 3: auto-detection
+    auto_detected = get_default_file_path()
+    if auto_detected is not None:
+        return auto_detected
+
+    raise typer.BadParameter("File path is required. Provide it using the --workflow-path option.")
+
+
+# Common argument definitions
+def get_default_file_path() -> Path | None:
+    """Get the default file path from sys.argv[0] if running from a workflow file."""
+    # Check for workflow subcommand followed by a command
+    # sys.argv format: ['script.py', 'workflow', 'create', ...] or ['script.py', 'create', ...]
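+    # e.g. `python my_workflow.py run --local` gives sys.argv == ['my_workflow.py', 'run', '--local']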
+    if len(sys.argv) > 1:
+        # Handle both "workflow create" and direct "create" (for backward compatibility)
+        cmd_index = 1
+        if sys.argv[1] == "workflow" and len(sys.argv) > 2:
+            cmd_index = 2
+        if sys.argv[cmd_index] in ["create", "update", "run", "benchmark"]:
+            return Path(sys.argv[0]).resolve()
+    return None
+
+
+API_KEY_ARG = Annotated[
+    str | None,
+    typer.Option(
+        "--api-key",
+        help="Notte API key (defaults to NOTTE_API_KEY environment variable)",
+        envvar="NOTTE_API_KEY",
+    ),
+]
+
+SERVER_URL_ARG = Annotated[
+    str | None,
+    typer.Option(
+        "--server-url",
+        help="Notte API server URL (defaults to NOTTE_API_URL environment variable)",
+        envvar="NOTTE_API_URL",
+    ),
+]
+
+
+def find_workflow_function(module: Any) -> tuple[Any, str] | None:
+    """Find the workflow function in a module.
+
+    First tries to find a function with the @workflow decorator.
+    If not found, tries to find a function named 'run' or the only function in the module.
+    """
+    # First, try to find a function with the @workflow decorator
+    for name in dir(module):
+        obj = getattr(module, name)
+        if callable(obj) and is_workflow(obj):
+            return obj, name
+
+    # If no decorated function found, try to find a 'run' function
+    if hasattr(module, "run"):
+        obj = getattr(module, "run")
+        if callable(obj) and not obj.__name__.startswith("_"):  # Skip private functions
+            return obj, "run"
+
+    # Last resort: if there's only one callable function, use it
+    callable_functions: list[tuple[Callable[..., Any], str]] = []
+    for name in dir(module):
+        obj = getattr(module, name)
+        if (
+            callable(obj)
+            and not name.startswith("_")
+            and not isinstance(obj, type)  # Skip classes
+            and not isinstance(obj, type(__builtins__))  # Skip builtins
+        ):
+            # Skip common non-workflow functions
+            if name not in ["main", "app", "cli"]:
+                callable_functions.append((obj, name))
+
+    if len(callable_functions) == 1:
+        return callable_functions[0]
+
+    return None
+
+
+def load_workflow_file(file_path: Path) -> tuple[Any, str, Any]:
+    """
+    Load a workflow file and find the workflow function.
+
+    Sets __name__ to prevent execution of `if __name__ == "__main__"` blocks.
+    """
+    spec = importlib.util.spec_from_file_location("workflow_module", file_path)
+    if spec is None or spec.loader is None:
+        raise typer.BadParameter(f"Could not load module from {file_path}")
+
+    module = importlib.util.module_from_spec(spec)
+    # Set __name__ to the module name (not "__main__") to prevent __main__ block execution
+    module.__name__ = spec.name
+    spec.loader.exec_module(module)
+
+    result = find_workflow_function(module)
+    if result is None:
+        raise typer.BadParameter(
+            f"No workflow function found in {file_path}. "
+            + "Either decorate a function with @workflow, name it 'run', or ensure there's only one function in the file."
+        )
+
+    func, func_name = result
+    return module, func_name, func
+
+
+@contextlib.contextmanager
+def workflow_file_for_upload(file_path: Path):
+    """
+    Context manager for preparing a workflow file for upload.
+
+    Comments out the __main__ block, yields a temp file, then restores the original.
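+
+    Example (mirrors how the `create` and `update` commands use it):
+        with workflow_file_for_upload(resolved_file) as temp_file:
+            workflow_obj.update(workflow_path=str(temp_file))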
+ """ + content = file_path.read_text(encoding="utf-8") + content, was_commented = comment_main_block(content) + if was_commented: + logger.debug("Commented out __main__ block for upload") + + temp_file = file_path.parent / f".{file_path.stem}_temp{file_path.suffix}" + _ = temp_file.write_text(content, encoding="utf-8") + + try: + yield temp_file + finally: + # Clean up temp file + if temp_file.exists(): + temp_file.unlink() + # Restore __main__ block if it was commented + if was_commented: + content = file_path.read_text(encoding="utf-8") + content, _ = uncomment_main_block(content) + _ = file_path.write_text(content, encoding="utf-8") + + +def get_workflow_metadata(file_path: Path, require_id: bool = False) -> WorkflowMetadata: + """Get workflow metadata from file, raising typer errors if invalid.""" + metadata = WorkflowMetadata.from_file(file_path) + if not metadata: + raise typer.BadParameter("No workflow metadata found. Run 'create' command first.") + if require_id: + if not metadata.workflow_id: + raise typer.BadParameter("No workflow ID found. Run 'create' command first.") + # Type narrowing: at this point workflow_id is guaranteed to be str + assert metadata.workflow_id is not None + return metadata + + +def update_metadata_in_file(file_path: Path, metadata: WorkflowMetadata) -> None: + """Update metadata block in workflow file.""" + content = file_path.read_text(encoding="utf-8") + content = insert_metadata_block(content, metadata) + _ = file_path.write_text(content, encoding="utf-8") + + +@workflow_app.command() +def create( + ctx: typer.Context, + workflow_path: Annotated[ + Path | None, + typer.Option( + "--workflow-path", + help="Path to the workflow Python file (required unless running from workflow file)", + ), + ] = None, + api_key: API_KEY_ARG = None, + server_url: SERVER_URL_ARG = None, +) -> None: + """Create a new workflow.""" + if not api_key: + raise typer.BadParameter("NOTTE_API_KEY not found. Set it in environment or use --api-key flag.") + + resolved_file = get_workflow_path_from_context(ctx, workflow_path) + logger.info(f"Creating workflow from {resolved_file}") + + # Load the workflow function + _module, _func_name, func = load_workflow_file(resolved_file) + + # Get workflow metadata from decorator (if present) + name = get_workflow_name(func) + description = get_workflow_description(func) + + # If no decorator found, prompt interactively + if not name: + # Suggest a default name based on file name + default_name = resolved_file.stem.replace("_", " ").title() + + logger.info("No @workflow decorator found. Please provide workflow metadata:") + name = typer.prompt("Workflow name", default=default_name) + description = typer.prompt("Workflow description (optional)", default="", show_default=False) + if not description.strip(): + description = None + + # Check if metadata already exists + existing_metadata = WorkflowMetadata.from_file(resolved_file) + if existing_metadata and existing_metadata.workflow_id: + raise typer.BadParameter( + f"Workflow already exists with ID: {existing_metadata.workflow_id}. Use 'update' command to update it." 
+ ) + + with workflow_file_for_upload(resolved_file) as temp_file: + # Create client and workflow + client = NotteClient(api_key=api_key, server_url=server_url) + workflow_obj = client.Workflow(workflow_path=str(temp_file), name=name, description=description, _client=client) + + logger.info(f"Workflow created with ID: {workflow_obj.workflow_id}") + + # Create metadata and insert into file + metadata = WorkflowMetadata( + workflow_id=workflow_obj.workflow_id, + name=name, + description=description, + author=get_git_author(resolved_file), + creation_date=workflow_obj.response.created_at.isoformat(), + last_update_date=workflow_obj.response.updated_at.isoformat(), + ) + + update_metadata_in_file(resolved_file, metadata) + + logger.info(f"Metadata block added to {resolved_file}") + logger.info(f"You can reference this workflow using: notte.Workflow('{workflow_obj.workflow_id}')") + + +@workflow_app.command() +def update( + ctx: typer.Context, + workflow_path: Annotated[ + Path | None, + typer.Option( + "--workflow-path", + help="Path to the workflow Python file (required unless running from workflow file)", + ), + ] = None, + api_key: API_KEY_ARG = None, + server_url: SERVER_URL_ARG = None, + restricted: Annotated[ + bool, typer.Option("--restricted/--no-restricted", help="Run workflow in restricted mode") + ] = True, +) -> None: + """Update an existing workflow.""" + if not api_key: + raise typer.BadParameter("NOTTE_API_KEY not found. Set it in environment or use --api-key flag.") + + resolved_file = get_workflow_path_from_context(ctx, workflow_path) + logger.info(f"Updating workflow from {resolved_file}") + + # Read metadata + metadata = get_workflow_metadata(resolved_file, require_id=True) + # Type narrowing: workflow_id is guaranteed to be str when require_id=True + assert metadata.workflow_id is not None + + with workflow_file_for_upload(resolved_file) as temp_file: + # Update workflow + client = NotteClient(api_key=api_key, server_url=server_url) + workflow_obj = client.Workflow(workflow_id=metadata.workflow_id, _client=client) + workflow_obj.update(workflow_path=str(temp_file), restricted=restricted) + + logger.info(f"Workflow {metadata.workflow_id} updated successfully") + + # Update metadata + metadata.last_update_date = workflow_obj.response.updated_at.isoformat() + update_metadata_in_file(resolved_file, metadata) + + logger.info(f"Metadata updated in {resolved_file}") + + +@workflow_app.command() +def run( + ctx: typer.Context, + workflow_path: Annotated[ + Path | None, + typer.Option( + "--workflow-path", + help="Path to the workflow Python file (required unless running from workflow file)", + ), + ] = None, + api_key: API_KEY_ARG = None, + server_url: SERVER_URL_ARG = None, + local: Annotated[bool, typer.Option("--local", help="Run workflow locally instead of on cloud")] = False, + variables: Annotated[ + Path | None, typer.Option("--variables", help="JSON file containing workflow variables") + ] = None, + timeout: Annotated[int | None, typer.Option("--timeout", help="Timeout in seconds for cloud runs")] = None, + stream: Annotated[ + bool, typer.Option("--stream/--no-stream", help="Enable/disable streaming logs for cloud runs") + ] = True, + raise_on_failure: Annotated[ + bool, typer.Option("--raise-on-failure/--no-raise-on-failure", help="Raise exception on workflow failure") + ] = True, +) -> None: + """Run a workflow.""" + resolved_file = get_workflow_path_from_context(ctx, workflow_path) + + if local: + logger.info(f"Running workflow locally from {resolved_file}") + import 
+
+        result = subprocess.run([sys.executable, str(resolved_file)], check=False)
+        raise typer.Exit(result.returncode)
+
+    if not api_key:
+        raise typer.BadParameter("NOTTE_API_KEY not found. Set it in environment or use --api-key flag.")
+
+    logger.info(f"Running workflow on cloud from {resolved_file}")
+
+    # Read metadata
+    metadata = get_workflow_metadata(resolved_file, require_id=True)
+    # Type narrowing: workflow_id is guaranteed to be str when require_id=True
+    assert metadata.workflow_id is not None
+
+    # Load variables if provided
+    variables_dict: dict[str, Any] = {}
+    if variables:
+        if not variables.exists():
+            raise typer.BadParameter(f"Variables file not found: {variables}")
+        variables_dict = json.loads(variables.read_text(encoding="utf-8"))
+
+    # Run workflow
+    client = NotteClient(api_key=api_key, server_url=server_url)
+    workflow_obj = client.Workflow(workflow_id=metadata.workflow_id, _client=client)
+    result = workflow_obj.run(
+        timeout=timeout,
+        stream=stream,
+        raise_on_failure=raise_on_failure,
+        **variables_dict,
+    )
+
+    logger.info(f"Workflow completed with status: {result.status}")
+    logger.info(f"Result: {result.result}")
+
+
+@workflow_app.command()
+def benchmark(
+    ctx: typer.Context,
+    workflow_path: Annotated[
+        Path | None,
+        typer.Option(
+            "--workflow-path",
+            help="Path to the workflow Python file (required unless running from workflow file)",
+        ),
+    ] = None,
+    api_key: API_KEY_ARG = None,
+    server_url: SERVER_URL_ARG = None,
+    local: Annotated[bool, typer.Option("--local", help="Run workflow locally instead of on cloud")] = False,
+    iterations: Annotated[int, typer.Option("--iterations", help="Maximum number of iterations to run")] = 10,
+    timeout: Annotated[int, typer.Option("--timeout", help="Timeout in minutes for the entire benchmark")] = 20,
+    parallelism: Annotated[int, typer.Option("--parallelism", help="Number of parallel runs (default: 1)")] = 1,
+    variables: Annotated[
+        Path | None, typer.Option("--variables", help="JSON file containing workflow variables")
+    ] = None,
+) -> None:
+    """Run a benchmark test with multiple iterations of the workflow."""
+    resolved_file = get_workflow_path_from_context(ctx, workflow_path)
+    timeout_seconds = timeout * 60  # Convert minutes to seconds
+
+    # Interactive prompts if running without flags
+    # Check if any benchmark-specific flags were provided
+    benchmark_flags = ["--local", "--iterations", "--timeout", "--variables", "--parallelism"]
+    has_flags = any(flag in sys.argv for flag in benchmark_flags)
+
+    if not has_flags:
+        logger.info("Running benchmark interactively. Press Enter to use defaults.")
+        logger.info("")
+
+        # Prompt for local vs cloud
+        while True:
+            local_input = (
+                typer.prompt(
+                    "Run locally or on cloud? [local/cloud]",
+                    default="cloud",
+                )
+                .strip()
+                .lower()
+            )
+            if local_input in ["local", "cloud"]:
+                local = local_input == "local"
+                break
+            logger.error("Please enter 'local' or 'cloud'")
+
+        # Prompt for iterations
+        iterations = typer.prompt("Number of iterations", default=10, type=int)
+
+        # Prompt for timeout
+        timeout = typer.prompt("Timeout in minutes", default=20, type=int)
+
+        # Prompt for parallelism
+        parallelism = typer.prompt("Parallelism level (number of parallel runs)", default=1, type=int)
+        if parallelism < 1:
+            parallelism = 1
+        if parallelism > iterations:
+            parallelism = iterations
+            logger.warning(f"Parallelism reduced to {iterations} (cannot exceed number of iterations)")
+
+        # Prompt for variables file (optional)
+        variables_input = typer.prompt(
+            "Variables file path (optional, press Enter to skip)",
+            default="",
+            show_default=False,
+        )
+        if variables_input.strip():
+            variables = Path(variables_input.strip())
+            if not variables.exists():
+                raise typer.BadParameter(f"Variables file not found: {variables}")
+        else:
+            variables = None
+
+        logger.info("")
+
+    # Validate parallelism
+    if parallelism < 1:
+        parallelism = 1
+    if parallelism > iterations:
+        parallelism = iterations
+        logger.warning(f"Parallelism reduced to {iterations} (cannot exceed number of iterations)")
+
+    if local:
+        logger.info(
+            f"Running benchmark locally from {resolved_file} ({iterations} iterations, {timeout} min timeout, parallelism={parallelism})"
+        )
+    else:
+        if not api_key:
+            raise typer.BadParameter("NOTTE_API_KEY not found. Set it in environment or use --api-key flag.")
+        logger.info(
+            f"Running benchmark on cloud from {resolved_file} ({iterations} iterations, {timeout} min timeout, parallelism={parallelism})"
+        )
+
+    # Read metadata for cloud runs
+    metadata: WorkflowMetadata | None = None
+    workflow_obj: Any | None = None
+    if not local:
+        metadata = get_workflow_metadata(resolved_file, require_id=True)
+        assert metadata.workflow_id is not None
+        client = NotteClient(api_key=api_key, server_url=server_url)
+        workflow_obj = client.Workflow(workflow_id=metadata.workflow_id, _client=client)
+
+    # Load variables if provided
+    variables_dict: dict[str, Any] = {}
+    if variables:
+        if not variables.exists():
+            raise typer.BadParameter(f"Variables file not found: {variables}")
+        variables_dict = json.loads(variables.read_text(encoding="utf-8"))
+
+    # Helper function to run a single iteration
+    def run_iteration(iteration_num: int) -> dict[str, Any]:
+        """Run a single benchmark iteration."""
+        iteration_start = time.time()
+        workflow_id: str | None = None
+        try:
+            if local:
+                # Run locally
+                import subprocess
+
+                result = subprocess.run(
+                    [sys.executable, str(resolved_file)],
+                    check=False,
+                    capture_output=True,
+                    text=True,
+                )
+                iteration_end = time.time()
+                execution_time = iteration_end - iteration_start
+                success = result.returncode == 0
+                run_id = f"local-{iteration_num}"
+                status = "closed" if success else "failed"
+                workflow_id = metadata.workflow_id if metadata else None
+            else:
+                # Run on cloud
+                assert workflow_obj is not None
+                assert metadata is not None
+                result = workflow_obj.run(
+                    timeout=None,  # Use default timeout per run
+                    stream=False,
+                    raise_on_failure=False,  # Don't raise for benchmark
+                    **variables_dict,
+                )
+                iteration_end = time.time()
+                execution_time = iteration_end - iteration_start
+                success = result.status == "closed"
+                run_id = result.workflow_run_id
+                workflow_id = result.workflow_id  # Get workflow_id from response
+                status = result.status
+
+            return {
+                "iteration": iteration_num,
+                "success": success,
+                "execution_time": execution_time,
+                "run_id": run_id,
+                "status": status,
+                "workflow_id": workflow_id,
+            }
+        except Exception as e:
+            iteration_end = time.time()
+            execution_time = iteration_end - iteration_start
+            logger.error(f"\nIteration {iteration_num} failed with exception: {e}")
+            return {
+                "iteration": iteration_num,
+                "success": False,
+                "execution_time": execution_time,
+                "run_id": f"error-{iteration_num}",
+                "status": "failed",
+                "workflow_id": metadata.workflow_id if metadata else None,
+            }
+
+    # Benchmark results
+    results: list[dict[str, Any]] = []
+    start_time = time.time()
+
+    # Create progress bar
+    pbar = tqdm(
+        total=iterations,
+        desc="Benchmark progress",
+        unit="run",
+        bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
+    )
+
+    try:
+        if parallelism == 1:
+            # Sequential execution (original behavior)
+            for i in range(iterations):
+                # Check if we've exceeded the timeout
+                elapsed = time.time() - start_time
+                if elapsed >= timeout_seconds:
+                    pbar.close()
+                    logger.info(f"\nTimeout reached ({timeout} min). Stopping benchmark after {i} iterations.")
+                    break
+
+                # Update progress bar description with current iteration
+                pbar.set_description(f"Running iteration {i + 1}/{iterations}")
+                result = run_iteration(i + 1)
+                results.append(result)
+
+                # Update progress bar with result
+                status_icon = "✅" if result["success"] else "❌"
+                pbar.set_postfix_str(f"{status_icon} {result['execution_time']:.2f}s")
+                _ = pbar.update(1)
+        else:
+            # Parallel execution
+            with ThreadPoolExecutor(max_workers=parallelism) as executor:
+                # Submit all tasks
+                future_to_iteration = {executor.submit(run_iteration, i + 1): i + 1 for i in range(iterations)}
+
+                # Process completed futures as they finish
+                for future in as_completed(future_to_iteration):
+                    # Check if we've exceeded the timeout
+                    elapsed = time.time() - start_time
+                    if elapsed >= timeout_seconds:
+                        pbar.close()
+                        logger.info(f"\nTimeout reached ({timeout} min). Cancelling remaining tasks...")
+                        # Cancel remaining futures
+                        for f in future_to_iteration:
+                            _ = f.cancel()
+                        break
+
+                    try:
+                        result = future.result()
+                        results.append(result)
+
+                        # Update progress bar with result
+                        status_icon = "✅" if result["success"] else "❌"
+                        pbar.set_postfix_str(f"{status_icon} {result['execution_time']:.2f}s")
+                        _ = pbar.update(1)
+                    except Exception as e:
+                        iteration_num = future_to_iteration[future]
+                        logger.error(f"\nIteration {iteration_num} failed with exception: {e}")
+                        results.append(
+                            {
+                                "iteration": iteration_num,
+                                "success": False,
+                                "execution_time": 0.0,
+                                "run_id": f"error-{iteration_num}",
+                                "status": "failed",
+                                "workflow_id": metadata.workflow_id if metadata else None,
+                            }
+                        )
+                        _ = pbar.update(1)
+
+                # Sort results by iteration number for consistent display
+                results.sort(key=lambda x: x["iteration"])
+
+    finally:
+        pbar.close()
+        logger.info("")  # New line after progress bar
+
+    # Calculate summary statistics
+    total_runs = len(results)
+    successful_runs = sum(1 for r in results if r["success"])
+    failed_runs = total_runs - successful_runs
+    success_rate = (successful_runs / total_runs * 100) if total_runs > 0 else 0.0
+
+    # Calculate average execution time for successful runs, or failed runs if all failed
+    if successful_runs > 0:
+        avg_execution_time = sum(r["execution_time"] for r in results if r["success"]) / successful_runs
+    elif failed_runs > 0:
+        avg_execution_time = sum(r["execution_time"] for r in results if not r["success"]) / failed_runs
+    else:
+        avg_execution_time = 0.0
+
+    # Use consistent width for all separators
+    # Table columns: Status (8) + Time (12) + Run ID (40) + Console URL (80) + 3 spaces = 143 chars
+    separator_width = 143
+    separator_double = "=" * separator_width
+    separator_single = "-" * separator_width
+
+    # Display summary
+    logger.info("")
+    logger.info(separator_double)
+    logger.info("BENCHMARK SUMMARY")
+    logger.info(separator_double)
+    logger.info(f"Total runs: {total_runs}")
+    logger.info(f"Successful: {successful_runs}")
+    logger.info(f"Failed: {failed_runs}")
+    logger.info(f"Success rate: {success_rate:.1f}%")
+    logger.info(f"Average execution time: {avg_execution_time:.2f}s")
+    logger.info(f"Total benchmark time: {time.time() - start_time:.2f}s")
+    logger.info(separator_double)
+
+    # Display results table
+    logger.info("")
+    logger.info("Detailed Results:")
+    logger.info(separator_single)
+
+    # Table header
+    header = f"{'Status':<8} {'Time':<12} {'Run ID':<40} {'Console URL':<80}"
+    logger.info(header)
+    logger.info(separator_single)
+
+    for r in results:
+        status_icon = "✅" if r["success"] else "❌"
+        execution_time_str = f"{r['execution_time']:.2f}s"
+        run_id_str = r["run_id"][:38]  # Truncate if too long
+
+        # Build console URL using workflow_id and run_id from response
+        if r["workflow_id"] and not local:
+            console_url = f"https://console.notte.cc/logs/workflows/{r['workflow_id']}/runs/{r['run_id']}"
+        else:
+            console_url = "N/A (local run)"
+
+        row = f"{status_icon:<8} {execution_time_str:<12} {run_id_str:<40} {console_url:<80}"
+        logger.info(row)
+
+    logger.info(separator_single)
+
+    # Exit with error code if all runs failed
+    if failed_runs == total_runs and total_runs > 0:
+        raise typer.Exit(1)
diff --git a/packages/notte-sdk/src/notte_sdk/cli/workflow_cli.py b/packages/notte-sdk/src/notte_sdk/cli/workflow_cli.py
new file mode 100644
index 000000000..a88b6c697
--- /dev/null
+++ b/packages/notte-sdk/src/notte_sdk/cli/workflow_cli.py
@@ -0,0 +1,74 @@
+"""
+Helper function for workflow files to enable CLI commands.
+
+Call this in your `if __name__ == "__main__"` block. If CLI arguments are detected,
+it handles them and exits. Otherwise, it returns immediately and your code continues normally.
+
+Example:
+```python
+from notte_sdk import NotteClient, workflow, workflow_cli
+
+notte = NotteClient()
+
+@workflow(name="My Workflow")
+def run():
+    ...
+
+if __name__ == "__main__":
+    workflow_cli()  # Handles CLI or does nothing
+    # Your custom code continues here if no CLI args
+    run()
+```
+
+This allows you to run commands like:
+- python workflow_file.py workflow create
+- python workflow_file.py workflow run --local
+- python workflow_file.py workflow update
+- python workflow_file.py workflow run --variables variables.json
+- python workflow_file.py workflow benchmark --iterations 10 --timeout 20
+- python workflow_file.py workflow benchmark --local --iterations 5
+
+Or directly (backward compatible):
+- python workflow_file.py create
+- python workflow_file.py run --local
+- python workflow_file.py update
+
+Note: The @workflow decorator is optional. If you don't use it, the CLI will
+prompt for workflow name and description during creation.
+"""
+
+from __future__ import annotations
+
+import sys
+
+from notte_sdk.cli import main
+
+
+def workflow_cli() -> None:
+    """
+    CLI entry point for workflow files.
+
+    Call this anywhere in your `if __name__ == "__main__"` block.
+    If CLI arguments are detected, it handles them and exits.
+    Otherwise, it returns immediately and your code continues normally.
+    """
+    # Only handle CLI if args are present
+    if len(sys.argv) > 1:
+        first_arg = sys.argv[1]
+        # Check for workflow commands (with or without the "workflow" subcommand prefix)
+        if first_arg in ["workflow", "create", "update", "run", "benchmark", "--help", "-h"]:
+            # Handle CLI and exit
+            # If the first arg is a workflow command (not "workflow"), prepend the "workflow" subcommand
+            if first_arg in ["create", "update", "run", "benchmark"]:
+                sys.argv.insert(1, "workflow")
+                # After inserting "workflow", auto-detect the file path if not provided
+                # Check if a file path is already provided (after "workflow" and the command)
+                if len(sys.argv) <= 3:  # Only ["script.py", "workflow", "command"] - no file path yet
+                    # Insert the current script path as the file argument
+                    sys.argv.insert(3, sys.argv[0])
+            # Note: file_path is auto-detected from sys.argv by typer, so we don't need to pass it
+            main()
+            sys.exit(0)
+
+    # No CLI args - return immediately, let normal execution continue
+    return
diff --git a/packages/notte-sdk/src/notte_sdk/decorators.py b/packages/notte-sdk/src/notte_sdk/decorators.py
new file mode 100644
index 000000000..2e1834ed5
--- /dev/null
+++ b/packages/notte-sdk/src/notte_sdk/decorators.py
@@ -0,0 +1,108 @@
+from __future__ import annotations
+
+import functools
+from typing import TYPE_CHECKING, Any, Callable, ParamSpec, TypeVar
+
+if TYPE_CHECKING:
+    pass
+
+P = ParamSpec("P")
+R = TypeVar("R")
+
+
+def workflow(name: str, description: str | None = None) -> Callable[[Callable[P, R]], Callable[P, R]]:
+    """
+    Decorator to mark a function as a Notte workflow.
+
+    This decorator is optional. If present, it stores metadata on the function that can be used
+    by the CLI to manage the workflow lifecycle (create, update, run). If not present, the CLI
+    will prompt for the workflow name and description during creation.
+
+    Args:
+        name: The name of the workflow.
+        description: Optional description of the workflow.
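+
+    Returns:
+        A decorator that returns the function wrapped unchanged, with
+        `__workflow_name__`, `__workflow_description__` and `__is_workflow__`
+        attributes attached for the CLI to read.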
+
+    Example:
+    ```python
+    from notte_sdk import workflow
+
+    @workflow(name="My Workflow", description="Does something useful")
+    def run(url: str, query: str) -> str:
+        # workflow code here
+        return "result"
+    ```
+
+    Note:
+        The decorator is optional. You can also create workflows without it - the CLI will
+        prompt for name and description interactively, or detect the workflow function by
+        looking for a function named 'run' or the only function in the file.
+    """
+
+    def decorator(func: Callable[P, R]) -> Callable[P, R]:
+        # Store metadata on the function
+        func.__workflow_name__ = name  # type: ignore[attr-defined]
+        func.__workflow_description__ = description  # type: ignore[attr-defined]
+        func.__is_workflow__ = True  # type: ignore[attr-defined]
+
+        # Preserve function signature and behavior
+        @functools.wraps(func)
+        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
+            return func(*args, **kwargs)
+
+        # Handle async functions
+        import inspect
+
+        if inspect.iscoroutinefunction(func):
+
+            @functools.wraps(func)
+            async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
+                return await func(*args, **kwargs)
+
+            # Copy metadata to the async wrapper
+            async_wrapper.__workflow_name__ = name  # type: ignore[attr-defined]
+            async_wrapper.__workflow_description__ = description  # type: ignore[attr-defined]
+            async_wrapper.__is_workflow__ = True  # type: ignore[attr-defined]
+            return async_wrapper  # type: ignore[return-value]
+
+        return wrapper
+
+    return decorator
+
+
+def is_workflow(func: Callable[..., Any]) -> bool:
+    """
+    Check if a function is decorated with @workflow.
+
+    Args:
+        func: The function to check.
+
+    Returns:
+        True if the function is a workflow, False otherwise.
+    """
+    return hasattr(func, "__is_workflow__") and getattr(func, "__is_workflow__", False)
+
+
+def get_workflow_name(func: Callable[..., Any]) -> str | None:
+    """
+    Get the workflow name from a decorated function.
+
+    Args:
+        func: The workflow function.
+
+    Returns:
+        The workflow name, or None if not found.
+    """
+    return getattr(func, "__workflow_name__", None)
+
+
+def get_workflow_description(func: Callable[..., Any]) -> str | None:
+    """
+    Get the workflow description from a decorated function.
+
+    Args:
+        func: The workflow function.
+
+    Returns:
+        The workflow description, or None if not found.
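+
+    Example (illustrative):
+        @workflow(name="Demo", description="Does something useful")
+        def run() -> None: ...
+
+        get_workflow_description(run)  # -> 'Does something useful'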
+ """ + return getattr(func, "__workflow_description__", None) diff --git a/packages/notte-sdk/src/notte_sdk/endpoints/workflows.py b/packages/notte-sdk/src/notte_sdk/endpoints/workflows.py index 12f1dd471..b233ffdf3 100644 --- a/packages/notte-sdk/src/notte_sdk/endpoints/workflows.py +++ b/packages/notte-sdk/src/notte_sdk/endpoints/workflows.py @@ -520,9 +520,13 @@ def run( elif message["type"] == "session_start": session_id = log_msg - logger.info( - f"Live viewer for session available at: https://api.notte.cc/sessions/viewer/index.html?ws=wss://api.notte.cc/sessions/{session_id}/debug/recording?token={self.token}" + base_url = self.server_url.rstrip("/") + base_ws_url = ( + self.server_url.replace("https://", "wss://").replace("http://", "ws://").rstrip("/") ) + viewer_url = f"{base_url}/sessions/viewer/index.html?ws={base_ws_url}/sessions/{session_id}/debug/recording?token={self.token}" + + logger.info(f"Live viewer for session available at: {viewer_url}") except json.JSONDecodeError: continue diff --git a/uv.lock b/uv.lock index ad8c171e2..f0497509f 100644 --- a/uv.lock +++ b/uv.lock @@ -3115,6 +3115,8 @@ source = { editable = "packages/notte-sdk" } dependencies = [ { name = "halo" }, { name = "notte-core" }, + { name = "tqdm" }, + { name = "typer" }, { name = "websockets" }, ] @@ -3128,6 +3130,8 @@ requires-dist = [ { name = "halo", specifier = ">=0.0.28" }, { name = "notte-core", editable = "packages/notte-core" }, { name = "playwright", marker = "extra == 'playwright'", specifier = "~=1.55" }, + { name = "tqdm", specifier = ">=4.66.0" }, + { name = "typer", specifier = ">=0.9.0" }, { name = "websockets", specifier = ">=13.1" }, ] provides-extras = ["playwright"]