diff --git a/pyproject.toml b/pyproject.toml index ee5d15d..345176d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "tws-sdk" -version = "0.3.0" +version = "0.4.0" description = "TWS client for Python." authors = ["Fireline Science "] homepage = "https://github.com/Fireline-Science/tws-py" diff --git a/tests/test_async_client.py b/tests/test_async_client.py index 9dd874a..0a13628 100644 --- a/tests/test_async_client.py +++ b/tests/test_async_client.py @@ -132,7 +132,15 @@ async def test_run_workflow_with_valid_tags(mock_request, mock_rpc, good_async_c # Mock successful completion mock_request.return_value = [ - {"status": "COMPLETED", "result": {"output": "success"}} + { + "id": "instance-123", + "workflow_definition_id": "workflow-id", + "created_at": "2024-01-01T00:00:00Z", + "updated_at": "2024-01-01T00:01:00Z", + "status": "COMPLETED", + "result": {"output": "success"}, + "instance_num": 1, + } ] valid_tags = {"userId": "someUserId", "lessonId": "someLessonId"} @@ -151,7 +159,9 @@ async def test_run_workflow_with_valid_tags(mock_request, mock_rpc, good_async_c "tags": valid_tags, }, ) - assert result == {"output": "success"} + assert result["result"] == {"output": "success"} + assert result["status"] == "COMPLETED" + assert result["id"] == "instance-123" @patch("tws._async.client.AsyncClient._upload_file") @@ -196,7 +206,7 @@ async def test_run_workflow_with_files( }, }, ) - assert result == {"output": "success with file"} + assert result["result"] == {"output": "success with file"} @patch("tws._async.client.AsyncClient._make_rpc_request") @@ -241,12 +251,24 @@ async def test_run_workflow_success(mock_request, mock_rpc, good_async_client): # Mock successful completion mock_request.return_value = [ - {"status": "COMPLETED", "result": {"output": "success"}} + { + "id": "instance-123", + "workflow_definition_id": "workflow-id", + "created_at": "2024-01-01T00:00:00Z", + "updated_at": "2024-01-01T00:01:00Z", + "status": "COMPLETED", + "result": {"output": "success"}, + "instance_num": 1, + } ] async with good_async_client: result = await good_async_client.run_workflow("workflow-id", {"arg": "value"}) - assert result == {"output": "success"} + assert result["result"] == {"output": "success"} + assert result["status"] == "COMPLETED" + assert result["id"] == "instance-123" + assert result["workflow_definition_id"] == "workflow-id" + assert result["instance_num"] == 1 @patch("tws._async.client.AsyncClient._make_rpc_request") @@ -273,7 +295,7 @@ async def test_run_workflow_success_after_polling( # Verify sleep was called once with retry_delay mock_sleep.assert_called_once_with(1) - assert result == {"output": "success after poll"} + assert result["result"] == {"output": "success after poll"} @patch("tws._async.client.AsyncClient._make_rpc_request") @@ -548,7 +570,7 @@ async def test_run_workflow_with_multiple_files( }, }, ) - assert result == {"output": "success with multiple files"} + assert result["result"] == {"output": "success with multiple files"} @patch("tws._async.client.AsyncClient._upload_file") @@ -595,4 +617,222 @@ async def test_run_workflow_with_files_and_tags( "tags": tags, }, ) - assert result == {"output": "success with file and tags"} + assert result["result"] == {"output": "success with file and tags"} + + +# Tests for rerun_workflow_with_step_slug + + +@patch("tws._async.client.AsyncClient._make_rpc_request") +@patch("tws._async.client.AsyncClient._make_request") +async def test_rerun_workflow_with_step_slug_success( + mock_request, 
mock_rpc, good_async_client +): + # Mock successful rerun start + mock_rpc.return_value = {"new_workflow_instance_id": "new-instance-456"} + + # Mock successful completion + mock_request.return_value = [ + { + "id": "new-instance-456", + "workflow_definition_id": "workflow-def-123", + "created_at": "2024-01-01T00:00:00Z", + "updated_at": "2024-01-01T00:01:00Z", + "status": "COMPLETED", + "result": {"output": "rerun success"}, + "instance_num": 2, + } + ] + + async with good_async_client: + result = await good_async_client.rerun_workflow_with_step_slug( + "instance-123", "step-slug-name" + ) + + mock_rpc.assert_called_once_with( + "rerun_workflow_instance", + { + "workflow_instance_id": "instance-123", + "start_from_slug_name": "step-slug-name", + }, + ) + assert result["result"] == {"output": "rerun success"} + assert result["id"] == "new-instance-456" + + +@patch("tws._async.client.AsyncClient._make_rpc_request") +@patch("tws._async.client.AsyncClient._make_request") +async def test_rerun_workflow_with_step_slug_with_overrides( + mock_request, mock_rpc, good_async_client +): + mock_rpc.return_value = {"new_workflow_instance_id": "new-instance-456"} + mock_request.return_value = [ + {"status": "COMPLETED", "result": {"output": "rerun with overrides"}} + ] + + step_overrides = {"step-slug": {"key": "new_value"}} + + async with good_async_client: + await good_async_client.rerun_workflow_with_step_slug( + "instance-123", + "step-slug-name", + step_state_overrides=step_overrides, + ) + + mock_rpc.assert_called_once_with( + "rerun_workflow_instance", + { + "workflow_instance_id": "instance-123", + "start_from_slug_name": "step-slug-name", + "step_state_overrides": step_overrides, + }, + ) + + +# Tests for rerun_workflow_with_step_id + + +@patch("tws._async.client.AsyncClient._make_rpc_request") +@patch("tws._async.client.AsyncClient._make_request") +async def test_rerun_workflow_with_step_id_success( + mock_request, mock_rpc, good_async_client +): + mock_rpc.return_value = {"new_workflow_instance_id": "new-instance-456"} + mock_request.return_value = [ + { + "id": "new-instance-456", + "workflow_definition_id": "workflow-def-123", + "created_at": "2024-01-01T00:00:00Z", + "updated_at": "2024-01-01T00:01:00Z", + "status": "COMPLETED", + "result": {"output": "rerun by id success"}, + "instance_num": 2, + } + ] + + async with good_async_client: + result = await good_async_client.rerun_workflow_with_step_id( + "instance-123", "step-id-456" + ) + + mock_rpc.assert_called_once_with( + "rerun_workflow_instance", + { + "workflow_instance_id": "instance-123", + "start_from_workflow_step_id": "step-id-456", + }, + ) + assert result["result"] == {"output": "rerun by id success"} + + +# Tests for rerun workflow error handling + + +@patch("tws._async.client.AsyncClient._make_rpc_request") +async def test_rerun_workflow_permission_denied(mock_rpc, good_async_client): + mock_request = Request("POST", "http://example.com") + mock_response = Response(400, request=mock_request) + mock_response._content = b'{"code": "DNIED", "message": "Permission denied"}' + + mock_rpc.side_effect = HTTPStatusError( + "400 Bad Request", request=mock_request, response=mock_response + ) + + with pytest.raises(ClientException) as exc_info: + async with good_async_client: + await good_async_client.rerun_workflow_with_step_slug( + "instance-123", "step-slug" + ) + assert "Permission denied to rerun this workflow instance" in str(exc_info.value) + + +@patch("tws._async.client.AsyncClient._make_rpc_request") +async def 
test_rerun_workflow_not_found(mock_rpc, good_async_client): + mock_request = Request("POST", "http://example.com") + mock_response = Response(400, request=mock_request) + mock_response._content = ( + b'{"code": "OTHER", "message": "Workflow instance not found"}' + ) + + mock_rpc.side_effect = HTTPStatusError( + "400 Bad Request", request=mock_request, response=mock_response + ) + + with pytest.raises(ClientException) as exc_info: + async with good_async_client: + await good_async_client.rerun_workflow_with_step_id( + "instance-123", "step-id" + ) + assert "Workflow instance or step not found" in str(exc_info.value) + + +@patch("tws._async.client.AsyncClient._make_rpc_request") +async def test_rerun_workflow_already_running(mock_rpc, good_async_client): + mock_request = Request("POST", "http://example.com") + mock_response = Response(400, request=mock_request) + mock_response._content = ( + b'{"code": "OTHER", "message": "Workflow is currently running"}' + ) + + mock_rpc.side_effect = HTTPStatusError( + "400 Bad Request", request=mock_request, response=mock_response + ) + + with pytest.raises(ClientException) as exc_info: + async with good_async_client: + await good_async_client.rerun_workflow_with_step_slug( + "instance-123", "step-slug" + ) + assert "Cannot rerun a workflow that is currently running or pending" in str( + exc_info.value + ) + + +@patch("tws._async.client.AsyncClient._make_rpc_request") +@patch("tws._async.client.AsyncClient._make_request") +@patch("time.time") +async def test_rerun_workflow_timeout( + mock_time, mock_request, mock_rpc, good_async_client +): + mock_rpc.return_value = {"new_workflow_instance_id": "new-instance-456"} + mock_request.return_value = [{"status": "RUNNING", "result": None}] + mock_time.side_effect = [0, 601] + + with pytest.raises(ClientException) as exc_info: + async with good_async_client: + await good_async_client.rerun_workflow_with_step_slug( + "instance-123", "step-slug", timeout=600 + ) + assert "Workflow execution timed out after 600 seconds" in str(exc_info.value) + + +@patch("tws._async.client.AsyncClient._make_rpc_request") +@patch("tws._async.client.AsyncClient._make_request") +async def test_rerun_workflow_instance_not_found_during_poll( + mock_request, mock_rpc, good_async_client +): + mock_rpc.return_value = {"new_workflow_instance_id": "new-instance-456"} + mock_request.return_value = [] + + with pytest.raises(ClientException) as exc_info: + async with good_async_client: + await good_async_client.rerun_workflow_with_step_slug( + "instance-123", "step-slug" + ) + assert "Workflow instance new-instance-456 not found" in str(exc_info.value) + + +@patch("tws._async.client.AsyncClient._make_rpc_request") +@patch("tws._async.client.AsyncClient._make_request") +async def test_rerun_workflow_failure(mock_request, mock_rpc, good_async_client): + mock_rpc.return_value = {"new_workflow_instance_id": "new-instance-456"} + mock_request.return_value = [ + {"status": "FAILED", "result": {"error": "workflow failed"}} + ] + + with pytest.raises(ClientException) as exc_info: + async with good_async_client: + await good_async_client.rerun_workflow_with_step_slug( + "instance-123", "step-slug" + ) + assert "Workflow execution failed" in str(exc_info.value) diff --git a/tests/test_sync_client.py b/tests/test_sync_client.py index 9461a28..7af5ed2 100644 --- a/tests/test_sync_client.py +++ b/tests/test_sync_client.py @@ -94,7 +94,15 @@ def test_run_workflow_with_valid_tags(mock_request, mock_rpc, good_client): # Mock successful completion 
mock_request.return_value = [ - {"status": "COMPLETED", "result": {"output": "success"}} + { + "id": "instance-123", + "workflow_definition_id": "workflow-id", + "created_at": "2024-01-01T00:00:00Z", + "updated_at": "2024-01-01T00:01:00Z", + "status": "COMPLETED", + "result": {"output": "success"}, + "instance_num": 1, + } ] valid_tags = {"userId": "someUserId", "lessonId": "someLessonId"} @@ -113,7 +121,9 @@ def test_run_workflow_with_valid_tags(mock_request, mock_rpc, good_client): "tags": valid_tags, }, ) - assert result == {"output": "success"} + assert result["result"] == {"output": "success"} + assert result["status"] == "COMPLETED" + assert result["id"] == "instance-123" @patch("tws._sync.client.SyncClient._make_rpc_request") @@ -156,12 +166,24 @@ def test_run_workflow_success(mock_request, mock_rpc, good_client): # Mock successful completion mock_request.return_value = [ - {"status": "COMPLETED", "result": {"output": "success"}} + { + "id": "instance-123", + "workflow_definition_id": "workflow-id", + "created_at": "2024-01-01T00:00:00Z", + "updated_at": "2024-01-01T00:01:00Z", + "status": "COMPLETED", + "result": {"output": "success"}, + "instance_num": 1, + } ] with good_client: result = good_client.run_workflow("workflow-id", {"arg": "value"}) - assert result == {"output": "success"} + assert result["result"] == {"output": "success"} + assert result["status"] == "COMPLETED" + assert result["id"] == "instance-123" + assert result["workflow_definition_id"] == "workflow-id" + assert result["instance_num"] == 1 @patch("tws._sync.client.SyncClient._make_rpc_request") @@ -188,7 +210,7 @@ def test_run_workflow_success_after_polling( # Verify sleep was called once with retry_delay mock_sleep.assert_called_once_with(1) - assert result == {"output": "success after poll"} + assert result["result"] == {"output": "success after poll"} @patch("tws._sync.client.SyncClient._make_rpc_request") @@ -387,7 +409,7 @@ def test_run_workflow_with_file_upload( workflow_args = mock_rpc.call_args[0][1]["request_body"] assert "file_arg" in workflow_args assert workflow_args["file_arg"] == "test-user-123/timestamp-test_file.txt" - assert result == {"output": "success"} + assert result["result"] == {"output": "success"} @patch("os.path.exists") @@ -456,3 +478,203 @@ def test_run_workflow_with_file_upload_error(mock_time, mock_exists, good_client ) assert "File not found: /path/to/nonexistent/file.txt" in str(exc_info.value) + + +# Tests for rerun_workflow_with_step_slug + + +@patch("tws._sync.client.SyncClient._make_rpc_request") +@patch("tws._sync.client.SyncClient._make_request") +def test_rerun_workflow_with_step_slug_success(mock_request, mock_rpc, good_client): + # Mock successful rerun start + mock_rpc.return_value = {"new_workflow_instance_id": "new-instance-456"} + + # Mock successful completion + mock_request.return_value = [ + { + "id": "new-instance-456", + "workflow_definition_id": "workflow-def-123", + "created_at": "2024-01-01T00:00:00Z", + "updated_at": "2024-01-01T00:01:00Z", + "status": "COMPLETED", + "result": {"output": "rerun success"}, + "instance_num": 2, + } + ] + + with good_client: + result = good_client.rerun_workflow_with_step_slug( + "instance-123", "step-slug-name" + ) + + mock_rpc.assert_called_once_with( + "rerun_workflow_instance", + { + "workflow_instance_id": "instance-123", + "start_from_slug_name": "step-slug-name", + }, + ) + assert result["result"] == {"output": "rerun success"} + assert result["id"] == "new-instance-456" + + 
+@patch("tws._sync.client.SyncClient._make_rpc_request") +@patch("tws._sync.client.SyncClient._make_request") +def test_rerun_workflow_with_step_slug_with_overrides( + mock_request, mock_rpc, good_client +): + mock_rpc.return_value = {"new_workflow_instance_id": "new-instance-456"} + mock_request.return_value = [ + {"status": "COMPLETED", "result": {"output": "rerun with overrides"}} + ] + + step_overrides = {"step-slug": {"key": "new_value"}} + + with good_client: + good_client.rerun_workflow_with_step_slug( + "instance-123", + "step-slug-name", + step_state_overrides=step_overrides, + ) + + mock_rpc.assert_called_once_with( + "rerun_workflow_instance", + { + "workflow_instance_id": "instance-123", + "start_from_slug_name": "step-slug-name", + "step_state_overrides": step_overrides, + }, + ) + + +# Tests for rerun_workflow_with_step_id + + +@patch("tws._sync.client.SyncClient._make_rpc_request") +@patch("tws._sync.client.SyncClient._make_request") +def test_rerun_workflow_with_step_id_success(mock_request, mock_rpc, good_client): + mock_rpc.return_value = {"new_workflow_instance_id": "new-instance-456"} + mock_request.return_value = [ + { + "id": "new-instance-456", + "workflow_definition_id": "workflow-def-123", + "created_at": "2024-01-01T00:00:00Z", + "updated_at": "2024-01-01T00:01:00Z", + "status": "COMPLETED", + "result": {"output": "rerun by id success"}, + "instance_num": 2, + } + ] + + with good_client: + result = good_client.rerun_workflow_with_step_id("instance-123", "step-id-456") + + mock_rpc.assert_called_once_with( + "rerun_workflow_instance", + { + "workflow_instance_id": "instance-123", + "start_from_workflow_step_id": "step-id-456", + }, + ) + assert result["result"] == {"output": "rerun by id success"} + + +# Tests for rerun workflow error handling + + +@patch("tws._sync.client.SyncClient._make_rpc_request") +def test_rerun_workflow_permission_denied(mock_rpc, good_client): + mock_request = Request("POST", "http://example.com") + mock_response = Response(400, request=mock_request) + mock_response._content = b'{"code": "DNIED", "message": "Permission denied"}' + + mock_rpc.side_effect = HTTPStatusError( + "400 Bad Request", request=mock_request, response=mock_response + ) + + with pytest.raises(ClientException) as exc_info: + with good_client: + good_client.rerun_workflow_with_step_slug("instance-123", "step-slug") + assert "Permission denied to rerun this workflow instance" in str(exc_info.value) + + +@patch("tws._sync.client.SyncClient._make_rpc_request") +def test_rerun_workflow_not_found(mock_rpc, good_client): + mock_request = Request("POST", "http://example.com") + mock_response = Response(400, request=mock_request) + mock_response._content = ( + b'{"code": "OTHER", "message": "Workflow instance not found"}' + ) + + mock_rpc.side_effect = HTTPStatusError( + "400 Bad Request", request=mock_request, response=mock_response + ) + + with pytest.raises(ClientException) as exc_info: + with good_client: + good_client.rerun_workflow_with_step_id("instance-123", "step-id") + assert "Workflow instance or step not found" in str(exc_info.value) + + +@patch("tws._sync.client.SyncClient._make_rpc_request") +def test_rerun_workflow_already_running(mock_rpc, good_client): + mock_request = Request("POST", "http://example.com") + mock_response = Response(400, request=mock_request) + mock_response._content = ( + b'{"code": "OTHER", "message": "Workflow is currently running"}' + ) + + mock_rpc.side_effect = HTTPStatusError( + "400 Bad Request", request=mock_request, 
response=mock_response + ) + + with pytest.raises(ClientException) as exc_info: + with good_client: + good_client.rerun_workflow_with_step_slug("instance-123", "step-slug") + assert "Cannot rerun a workflow that is currently running or pending" in str( + exc_info.value + ) + + +@patch("tws._sync.client.SyncClient._make_rpc_request") +@patch("tws._sync.client.SyncClient._make_request") +@patch("time.time") +def test_rerun_workflow_timeout(mock_time, mock_request, mock_rpc, good_client): + mock_rpc.return_value = {"new_workflow_instance_id": "new-instance-456"} + mock_request.return_value = [{"status": "RUNNING", "result": None}] + mock_time.side_effect = [0, 601] + + with pytest.raises(ClientException) as exc_info: + with good_client: + good_client.rerun_workflow_with_step_slug( + "instance-123", "step-slug", timeout=600 + ) + assert "Workflow execution timed out after 600 seconds" in str(exc_info.value) + + +@patch("tws._sync.client.SyncClient._make_rpc_request") +@patch("tws._sync.client.SyncClient._make_request") +def test_rerun_workflow_instance_not_found_during_poll( + mock_request, mock_rpc, good_client +): + mock_rpc.return_value = {"new_workflow_instance_id": "new-instance-456"} + mock_request.return_value = [] + + with pytest.raises(ClientException) as exc_info: + with good_client: + good_client.rerun_workflow_with_step_slug("instance-123", "step-slug") + assert "Workflow instance new-instance-456 not found" in str(exc_info.value) + + +@patch("tws._sync.client.SyncClient._make_rpc_request") +@patch("tws._sync.client.SyncClient._make_request") +def test_rerun_workflow_failure(mock_request, mock_rpc, good_client): + mock_rpc.return_value = {"new_workflow_instance_id": "new-instance-456"} + mock_request.return_value = [ + {"status": "FAILED", "result": {"error": "workflow failed"}} + ] + + with pytest.raises(ClientException) as exc_info: + with good_client: + good_client.rerun_workflow_with_step_slug("instance-123", "step-slug") + assert "Workflow execution failed" in str(exc_info.value) diff --git a/tws/_async/client.py b/tws/_async/client.py index fbed8d4..a31fdab 100644 --- a/tws/_async/client.py +++ b/tws/_async/client.py @@ -2,7 +2,7 @@ import mimetypes import os import time -from typing import cast, Dict, Optional +from typing import cast, Dict, Optional, Any import aiofiles import httpx @@ -218,7 +218,10 @@ async def run_workflow( while True: self._check_timeout(start_time, timeout) - params = {"select": "status,result", "id": f"eq.{workflow_instance_id}"} + params = { + "select": "id,workflow_definition_id,created_at,updated_at,status,result,instance_num", + "id": f"eq.{workflow_instance_id}", + } result = await self._make_request( "GET", "workflow_instances", params=params ) @@ -231,6 +234,123 @@ async def run_workflow( instance = result[0] workflow_result = self._handle_workflow_status(instance) if workflow_result is not None: - return workflow_result + return self._normalize_workflow_result(instance, workflow_result) + + await asyncio.sleep(retry_delay) + + async def _rerun_workflow( + self, + workflow_instance_id: str, + start_from_workflow_key: str, + start_from_workflow_step_value: str, + timeout: int, + retry_delay: int, + step_state_overrides: Optional[Dict[str, dict]] = None, + ): + """Rerun a workflow instance from a specific step. 
+ + Args: + workflow_instance_id: The unique identifier of the workflow instance to rerun + start_from_workflow_key: The workflow step key to start from ("start_from_slug_name" or "start_from_workflow_step_id") + start_from_workflow_step_value: The workflow step ID or slug name to start from + step_state_overrides: Optional dictionary mapping step slugs to state overrides + timeout: Maximum time in seconds to wait for workflow completion (1-3600) + retry_delay: Time in seconds between status checks (1-60) + + Returns: + The workflow execution result as a dictionary + + Raises: + ClientException: If the workflow fails, times out, or if invalid parameters are provided + """ + self._validate_workflow_params(timeout, retry_delay) + + payload: dict[str, Any] = { + "workflow_instance_id": workflow_instance_id, + start_from_workflow_key: start_from_workflow_step_value, + } + if step_state_overrides is not None: + payload["step_state_overrides"] = step_state_overrides + + try: + result = await self._make_rpc_request("rerun_workflow_instance", payload) + except httpx.HTTPStatusError as e: + if e.response.status_code == 400: + error_code = e.response.json().get("code") + error_message = e.response.json().get("message", "") + if error_code == "DNIED": + raise ClientException( + "Permission denied to rerun this workflow instance" + ) + elif "not found" in error_message.lower(): + raise ClientException("Workflow instance or step not found") + elif ( + "running" in error_message.lower() + or "pending" in error_message.lower() + ): + raise ClientException( + "Cannot rerun a workflow that is currently running or pending" + ) + raise ClientException(f"HTTP error occurred: {e}") + + new_workflow_instance_id = result["new_workflow_instance_id"] + start_time = time.time() + + while True: + self._check_timeout(start_time, timeout) + + params = { + "select": "id,workflow_definition_id,created_at,updated_at,status,result,instance_num", + "id": f"eq.{new_workflow_instance_id}", + } + result = await self._make_request( + "GET", "workflow_instances", params=params + ) + + if not result: + raise ClientException( + f"Workflow instance {new_workflow_instance_id} not found" + ) + + instance = result[0] + workflow_result = self._handle_workflow_status(instance) + if workflow_result is not None: + return self._normalize_workflow_result(instance, workflow_result) await asyncio.sleep(retry_delay) + + async def rerun_workflow_with_step_slug( + self, + workflow_instance_id: str, + start_from_slug_name: str, + step_state_overrides: Optional[Dict[str, dict]] = None, + timeout=600, + retry_delay=1, + ): + """Rerun a workflow instance from a specific step by slug.""" + return await self._rerun_workflow( + workflow_instance_id, + "start_from_slug_name", + start_from_slug_name, + timeout, + retry_delay, + step_state_overrides, + ) + + async def rerun_workflow_with_step_id( + self, + workflow_instance_id: str, + start_from_workflow_step_id: str, + step_state_overrides: Optional[Dict[str, dict]] = None, + timeout=600, + retry_delay=1, + ): + """Rerun a workflow instance from a specific step by ID.""" + return await self._rerun_workflow( + workflow_instance_id, + "start_from_workflow_step_id", + start_from_workflow_step_id, + timeout, + retry_delay, + step_state_overrides, + ) diff --git a/tws/_sync/client.py b/tws/_sync/client.py index a9e75dc..3aceccb 100644 --- a/tws/_sync/client.py +++ b/tws/_sync/client.py @@ -1,6 +1,6 @@ import os import time -from typing import Dict, cast, Optional +from typing import Any, Dict, cast, Optional 
import httpx from httpx import Client as SyncHttpClient @@ -203,7 +203,10 @@ def run_workflow( while True: self._check_timeout(start_time, timeout) - params = {"select": "status,result", "id": f"eq.{workflow_instance_id}"} + params = { + "select": "id,workflow_definition_id,created_at,updated_at,status,result,instance_num", + "id": f"eq.{workflow_instance_id}", + } result = self._make_request("GET", "workflow_instances", params=params) if not result: @@ -214,6 +217,121 @@ def run_workflow( instance = result[0] workflow_result = self._handle_workflow_status(instance) if workflow_result is not None: - return workflow_result + return self._normalize_workflow_result(instance, workflow_result) + + time.sleep(retry_delay) + + def _rerun_workflow( + self, + workflow_instance_id: str, + start_from_workflow_key: str, + start_from_workflow_step_value: str, + timeout: int, + retry_delay: int, + step_state_overrides: Optional[Dict[str, dict]] = None, + ): + """Rerun a workflow instance from a specific step. + + Args: + workflow_instance_id: The unique identifier of the workflow instance to rerun + start_from_workflow_key: The workflow step key to start from ("start_from_slug_name" or "start_from_workflow_step_id") + start_from_workflow_step_value: The workflow step ID or slug name to start from + step_state_overrides: Optional dictionary mapping step slugs to state overrides + timeout: Maximum time in seconds to wait for workflow completion (1-3600) + retry_delay: Time in seconds between status checks (1-60) + + Returns: + The workflow execution result as a dictionary + + Raises: + ClientException: If the workflow fails, times out, or if invalid parameters are provided + """ + self._validate_workflow_params(timeout, retry_delay) + + payload: dict[str, Any] = { + "workflow_instance_id": workflow_instance_id, + start_from_workflow_key: start_from_workflow_step_value, + } + if step_state_overrides is not None: + payload["step_state_overrides"] = step_state_overrides + + try: + result = self._make_rpc_request("rerun_workflow_instance", payload) + except httpx.HTTPStatusError as e: + if e.response.status_code == 400: + error_code = e.response.json().get("code") + error_message = e.response.json().get("message", "") + if error_code == "DNIED": + raise ClientException( + "Permission denied to rerun this workflow instance" + ) + elif "not found" in error_message.lower(): + raise ClientException("Workflow instance or step not found") + elif ( + "running" in error_message.lower() + or "pending" in error_message.lower() + ): + raise ClientException( + "Cannot rerun a workflow that is currently running or pending" + ) + raise ClientException(f"HTTP error occurred: {e}") + + new_workflow_instance_id = result["new_workflow_instance_id"] + start_time = time.time() + + while True: + self._check_timeout(start_time, timeout) + + params = { + "select": "id,workflow_definition_id,created_at,updated_at,status,result,instance_num", + "id": f"eq.{new_workflow_instance_id}", + } + result = self._make_request("GET", "workflow_instances", params=params) + + if not result: + raise ClientException( + f"Workflow instance {new_workflow_instance_id} not found" + ) + + instance = result[0] + workflow_result = self._handle_workflow_status(instance) + if workflow_result is not None: + return self._normalize_workflow_result(instance, workflow_result) time.sleep(retry_delay) + + def rerun_workflow_with_step_slug( + self, + workflow_instance_id: str, + start_from_slug_name: str, + step_state_overrides: Optional[Dict[str, dict]] = None, + 
timeout=600, + retry_delay=1, + ): + """Rerun a workflow instance from a specific step by slug.""" + return self._rerun_workflow( + workflow_instance_id, + "start_from_slug_name", + start_from_slug_name, + timeout, + retry_delay, + step_state_overrides, + ) + + def rerun_workflow_with_step_id( + self, + workflow_instance_id: str, + start_from_workflow_step_id: str, + step_state_overrides: Optional[Dict[str, dict]] = None, + timeout=600, + retry_delay=1, + ): + """Rerun a workflow instance from a specific step by ID.""" + return self._rerun_workflow( + workflow_instance_id, + "start_from_workflow_step_id", + start_from_workflow_step_id, + timeout, + retry_delay, + step_state_overrides, + ) diff --git a/tws/base/client.py b/tws/base/client.py index e982dde..c57811f 100644 --- a/tws/base/client.py +++ b/tws/base/client.py @@ -97,6 +97,27 @@ def _check_timeout(start_time: float, timeout: Union[int, float]) -> None: f"Workflow execution timed out after {timeout} seconds" ) + @staticmethod + def _normalize_workflow_result(instance: dict, workflow_result: dict) -> dict: + """Normalize workflow instance data into a standard response format. + + Args: + instance: The workflow instance data from the API + workflow_result: The processed workflow result + + Returns: + A normalized dictionary with standard workflow response fields + """ + return { + "id": instance.get("id"), + "workflow_definition_id": instance.get("workflow_definition_id"), + "created_at": instance.get("created_at"), + "updated_at": instance.get("updated_at"), + "result": workflow_result, + "status": instance.get("status"), + "instance_num": instance.get("instance_num"), + } + @staticmethod def _validate_tags(tags: Optional[Dict[str, str]]) -> None: if tags is not None: @@ -172,3 +193,55 @@ def run_workflow( ClientException: If the workflow fails, times out, or if invalid parameters are provided """ pass + + @abstractmethod + def rerun_workflow_with_step_id( + self, + workflow_instance_id: str, + start_from_workflow_step_id: str, + step_state_overrides: Optional[Dict[str, dict]] = None, + timeout=600, + retry_delay=1, + ) -> Union[dict, Coroutine[Any, Any, dict]]: + """Rerun a workflow instance from a specific step by ID. + + Args: + workflow_instance_id: The unique identifier of the workflow instance to rerun + start_from_workflow_step_id: The workflow step ID to start from + step_state_overrides: Optional dictionary mapping step slugs to state overrides + timeout: Maximum time in seconds to wait for workflow completion (1-3600) + retry_delay: Time in seconds between status checks (1-60) + + Returns: + The workflow execution result as a dictionary + + Raises: + ClientException: If the workflow fails, times out, or if invalid parameters are provided + """ + pass + + @abstractmethod + def rerun_workflow_with_step_slug( + self, + workflow_instance_id: str, + start_from_slug_name: str, + step_state_overrides: Optional[Dict[str, dict]] = None, + timeout=600, + retry_delay=1, + ) -> Union[dict, Coroutine[Any, Any, dict]]: + """Rerun a workflow instance from a specific step by slug. 
+ + Args: + workflow_instance_id: The unique identifier of the workflow instance to rerun + start_from_slug_name: The workflow slug name to start from + step_state_overrides: Optional dictionary mapping step slugs to state overrides + timeout: Maximum time in seconds to wait for workflow completion (1-3600) + retry_delay: Time in seconds between status checks (1-60) + + Returns: + The workflow execution result as a dictionary + + Raises: + ClientException: If the workflow fails, times out, or if invalid parameters are provided + """ + pass
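Usage sketch (not part of the diff above): the snippet below illustrates how the rerun helpers introduced by this PR might be called. Client construction is not shown in this changeset, so the example takes an already-configured SyncClient; the instance ID, step slug, and override values are placeholder assumptions. The method name, keyword arguments, and the normalized return shape (result["result"], result["status"], result["id"]) are taken from the diff itself.

def rerun_from_step(client) -> dict:
    # `client` is an already-configured SyncClient; how it is constructed is not
    # part of this PR, so it is passed in here rather than instantiated.
    with client:
        result = client.rerun_workflow_with_step_slug(
            "instance-123",    # placeholder: id of the workflow instance to rerun
            "step-slug-name",  # placeholder: slug of the step to restart from
            step_state_overrides={"step-slug-name": {"key": "new_value"}},  # optional
            timeout=600,       # max seconds to wait for completion (validated 1-3600)
            retry_delay=1,     # seconds between status polls (validated 1-60)
        )
    # As of this PR, run/rerun calls return the normalized instance record, so the
    # workflow output lives under result["result"] instead of being returned directly;
    # result["status"], result["id"], etc. are also available to callers.
    return result["result"]

rerun_workflow_with_step_id is analogous, taking a workflow step ID instead of a slug, and the AsyncClient exposes the same two methods as awaitable coroutines.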