From 0670a2213363003c72e25a82d91027584fda5228 Mon Sep 17 00:00:00 2001 From: Yusuf Ali Date: Sun, 2 Feb 2025 11:03:21 -0500 Subject: [PATCH] fix(worker): refactor input processor to garuntee exit --- servc/svc/com/worker/__init__.py | 134 +++++++++++++------------------ servc/svc/com/worker/methods.py | 54 +++++++++++++ servc/svc/config/__init__.py | 12 ++- 3 files changed, 120 insertions(+), 80 deletions(-) create mode 100644 servc/svc/com/worker/methods.py diff --git a/servc/svc/com/worker/__init__.py b/servc/svc/com/worker/__init__.py index eb9d335..ed89432 100644 --- a/servc/svc/com/worker/__init__.py +++ b/servc/svc/com/worker/__init__.py @@ -4,6 +4,7 @@ from servc.svc.com.bus import BusComponent, OnConsuming from servc.svc.com.cache import CacheComponent from servc.svc.com.worker.hooks import evaluate_post_hooks, evaluate_pre_hooks +from servc.svc.com.worker.methods import evaluate_exit, get_artifact from servc.svc.com.worker.types import RESOLVER, RESOLVER_CONTEXT, RESOLVER_MAPPING from servc.svc.config import Config from servc.svc.io.input import InputType @@ -105,7 +106,7 @@ def connect(self): def run_resolver( self, method: RESOLVER, context: RESOLVER_CONTEXT, args: Tuple[str, Any] - ) -> Tuple[StatusCode, ResponseArtifact | None]: + ) -> Tuple[StatusCode, ResponseArtifact | None, Any | None]: id, payload = args statuscode: StatusCode = StatusCode.OK response: ResponseArtifact | None = None @@ -132,20 +133,10 @@ def run_resolver( statuscode = StatusCode.SERVER_ERROR response = getErrorArtifact(id, str(e), StatusCode.SERVER_ERROR) - if self._config.get(f"conf.{self.name}.exiton5xx") and statuscode.value >= 500: - print("Exiting due to 5xx error", error, flush=True) - exit(1) - if ( - self._config.get(f"conf.{self.name}.exiton4xx") - and statuscode.value >= 400 - and statuscode.value < 500 - ): - print("Exiting due to 4xx error", error, flush=True) - exit(1) - - return statuscode, response + return statuscode, response, error def inputProcessor(self, message: Any) -> StatusCode: + workerConfig = self._config.get(f"conf.{self.name}") bus = self._busClass( self._config.get(f"conf.{self._bus.name}"), ) @@ -157,6 +148,10 @@ def inputProcessor(self, message: Any) -> StatusCode: "config": self._config, } + status_code: StatusCode = StatusCode.OK + response: ResponseArtifact | None = None + error: Any | None = None + if "type" not in message or "route" not in message: return StatusCode.INVALID_INPUTS @@ -166,85 +161,66 @@ def inputProcessor(self, message: Any) -> StatusCode: or "details" not in message or "instanceId" not in message ): - return StatusCode.INVALID_INPUTS + status_code = StatusCode.INVALID_INPUTS + response = getErrorArtifact( + message["id"] if "id" in message else "", + "Invalid input type for event. event, details or instanceId not specified", + StatusCode.INVALID_INPUTS, + ) if message["event"] not in self._eventResolvers: return StatusCode.METHOD_NOT_FOUND - status_code, response = self.run_resolver( + status_code, response, error = self.run_resolver( self._eventResolvers[message["event"]], context, ("", {**message}), ) - return status_code - - if message["type"] in [InputType.INPUT.value, InputType.INPUT]: - if "id" not in message: - return StatusCode.INVALID_INPUTS - if "argumentId" not in message: - cache.setKey( - message["id"], - getErrorArtifact( - message["id"], - "Invalid input type. Id and argumentId not specified", - StatusCode.INVALID_INPUTS, - ), + elif message["type"] in [InputType.INPUT.value, InputType.INPUT]: + if "id" not in message or "argumentId" not in message: + status_code = StatusCode.INVALID_INPUTS + response = getErrorArtifact( + message["id"] if "id" in message else "", + "Invalid input type. Id and argumentId not specified", + StatusCode.INVALID_INPUTS, ) - return StatusCode.INVALID_INPUTS + status_code = StatusCode.INVALID_INPUTS if "instanceId" in message and message["instanceId"] != bus.instanceId: return StatusCode.NO_PROCESSING - if message["argumentId"] in ["raw", "plain"] and message["inputs"]: - artifact = message["argument"] + # get the artifact from the message + artifact = get_artifact(message, cache) + if isinstance(artifact, tuple): + status_code, response = artifact else: - artifact = cache.getKey(message["argumentId"]) - if artifact is None or "method" not in artifact or "inputs" not in artifact: - cache.setKey( - message["id"], - getErrorArtifact( - message["id"], - "Invalid argument. Need to specify method and inputs in payload", - StatusCode.USER_ERROR, - ), - ) - return StatusCode.USER_ERROR - if artifact["method"] not in self._resolvers: - cache.setKey( - message["id"], - getErrorArtifact( + if artifact["method"] not in self._resolvers: + status_code = StatusCode.METHOD_NOT_FOUND + response = getErrorArtifact( message["id"], "Method not found", StatusCode.METHOD_NOT_FOUND - ), - ) - if self._config.get(f"conf.{self.name}.exiton4xx"): - print("Exiting due to 4xx error:", "Method not found", flush=True) - exit(1) - return StatusCode.METHOD_NOT_FOUND - - continueExecution = evaluate_pre_hooks( - self._resolvers, - message, - artifact, - context, - ) - if not continueExecution: - return StatusCode.OK - - statusCode, response = self.run_resolver( - self._resolvers[artifact["method"]], - context, - (message["id"], artifact["inputs"]), - ) - if statusCode == StatusCode.NO_PROCESSING: - return StatusCode.NO_PROCESSING + ) + else: + continueExecution = evaluate_pre_hooks( + self._resolvers, + message, + artifact, + context, + ) + if not continueExecution: + return StatusCode.OK + + status_code, response, error = self.run_resolver( + self._resolvers[artifact["method"]], + context, + (message["id"], artifact["inputs"]), + ) + if status_code == StatusCode.NO_PROCESSING: + return StatusCode.NO_PROCESSING + + evaluate_exit( + message, response, cache, status_code, workerConfig, error + ) + evaluate_post_hooks(bus, cache, message, artifact) + + evaluate_exit(message, response, cache, status_code, workerConfig, error) - cache.setKey(message["id"], response) - evaluate_post_hooks(bus, cache, message, artifact) - return statusCode - - cache.setKey( - message["id"], - getErrorArtifact( - message["id"], "Invalid input type", StatusCode.INVALID_INPUTS - ), - ) return StatusCode.INVALID_INPUTS diff --git a/servc/svc/com/worker/methods.py b/servc/svc/com/worker/methods.py new file mode 100644 index 0000000..857892f --- /dev/null +++ b/servc/svc/com/worker/methods.py @@ -0,0 +1,54 @@ +from typing import Any, Tuple + +from servc.svc.com.cache import CacheComponent +from servc.svc.config import Config +from servc.svc.io.input import ArgumentArtifact, InputPayload +from servc.svc.io.output import ResponseArtifact, StatusCode +from servc.svc.io.response import getErrorArtifact + + +def evaluate_exit( + message: InputPayload, + response: ResponseArtifact | None, + cache: CacheComponent, + statusCode: StatusCode, + config: Config, + error: Any | None, +): + if config.get("exiton5xx") and statusCode.value >= 500: + print("Exiting due to 5xx error: ", error, flush=True) + exit(1) + if config.get("exiton4xx") and statusCode.value >= 400 and statusCode.value < 500: + print("Exiting due to 4xx error: ", error, flush=True) + exit(1) + + # allow specific exit to an error code + error_str: str = str(statusCode.value) + if config.get(f"exiton{error_str}"): + print(f"Exiting due to {error_str} error: ", error, flush=True) + exit(1) + + if response is not None and "id" in message and message["id"]: + cache.setKey(message["id"], response) + + +def get_artifact( + message: InputPayload, cache: CacheComponent +) -> ArgumentArtifact | Tuple[StatusCode, ResponseArtifact]: + artifact = ( + cache.getKey(message["argumentId"]) + if message["argumentId"] not in ["raw", "plain"] + else message["argument"] + ) + + if artifact is None or "method" not in artifact or "inputs" not in artifact: + return ( + StatusCode.USER_ERROR, + getErrorArtifact( + message["id"], + "Invalid argument. Need to specify method and inputs in payload", + StatusCode.USER_ERROR, + ), + ) + + return artifact diff --git a/servc/svc/config/__init__.py b/servc/svc/config/__init__.py index 27c6133..f291e53 100644 --- a/servc/svc/config/__init__.py +++ b/servc/svc/config/__init__.py @@ -24,7 +24,17 @@ BOOLEAN_CONFIGS = os.getenv( "SERVC_BOOLEAN_CONFIGS", - "conf.worker.exiton4xx,conf.worker.exiton5xx,conf.worker.bindtoeventexchange", + ",".join( + [ + "conf.worker.exiton400", + "conf.worker.exiton404", + "conf.worker.exiton401", + "conf.worker.exiton422", + "conf.worker.exiton4xx", + "conf.worker.exiton5xx", + "conf.worker.bindtoeventexchange", + ] + ), ).split(",") DOT_MARKER = os.getenv("SERVC_DOT_MARKER", "_DOT_") DASH_MARKER = os.getenv("SERVC_DASH_MARKER", "_DASH_")