From fbf9a083707a4d7f88ae0eeffc3205db643be645 Mon Sep 17 00:00:00 2001 From: Yusuf Ali Date: Sat, 12 Oct 2024 23:31:08 -0400 Subject: [PATCH] feat: servc compliant BREAKING CHANGE: compliant with servc service spec --- .coveragerc | 3 +- .github/workflows/pypi.yml | 2 + .github/workflows/servc.yml | 2 +- .github/workflows/test.yml | 29 +-- .github/workflows/version.yml | 9 - README.md | 1 + docker-compose.yml | 6 +- main.py | 8 +- requirements.txt | 1 + servc/{com => }/__init__.py | 0 servc/com/bus/__init__.py | 35 ---- servc/com/bus/rabbitmq/__init__.py | 196 ------------------ servc/com/server/http.py | 132 ------------ servc/com/server/server.py | 142 ------------- servc/config/__init__.py | 14 -- servc/io/client/__init__.py | 4 - servc/io/client/get.py | 56 ----- servc/io/idgenerator/__init__.py | 6 - servc/io/idgenerator/simple.py | 14 -- servc/server.py | 113 ++++++++++ servc/{com/service.py => svc/__init__.py} | 38 ++-- servc/svc/client/get.py | 21 ++ servc/{io => svc}/client/poll.py | 6 +- servc/{io => svc}/client/send.py | 44 ++-- servc/{com/server => svc/com}/__init__.py | 0 servc/svc/com/bus/__init__.py | 55 +++++ servc/svc/com/bus/rabbitmq.py | 184 ++++++++++++++++ servc/{ => svc}/com/cache/__init__.py | 16 +- servc/{ => svc}/com/cache/redis.py | 29 +-- servc/svc/com/http/__init__.py | 186 +++++++++++++++++ .../consumer => svc/com/worker}/__init__.py | 95 ++++----- servc/svc/config/__init__.py | 86 ++++++++ servc/svc/idgen/__init__.py | 6 + servc/svc/idgen/simple.py | 8 + servc/{ => svc}/io/__init__.py | 0 servc/{ => svc}/io/input.py | 3 +- servc/{ => svc}/io/output.py | 0 servc/{ => svc}/io/response.py | 4 +- tests/__init__.py | 0 tests/svc/test_config.py | 32 +++ tests/svc/test_idgen.py | 13 ++ tests/svc/test_rabbitmq.py | 104 ++++++++++ tests/svc/test_redis.py | 59 ++++++ tests/test_com_cache_redis.py | 48 ----- tests/test_io_idgenerator.py | 13 -- tests/test_server.py | 150 -------------- 46 files changed, 988 insertions(+), 985 deletions(-) delete mode 100644 .github/workflows/version.yml rename servc/{com => }/__init__.py (100%) delete mode 100644 servc/com/bus/__init__.py delete mode 100644 servc/com/bus/rabbitmq/__init__.py delete mode 100644 servc/com/server/http.py delete mode 100644 servc/com/server/server.py delete mode 100644 servc/config/__init__.py delete mode 100644 servc/io/client/__init__.py delete mode 100644 servc/io/client/get.py delete mode 100644 servc/io/idgenerator/__init__.py delete mode 100644 servc/io/idgenerator/simple.py create mode 100644 servc/server.py rename servc/{com/service.py => svc/__init__.py} (60%) create mode 100644 servc/svc/client/get.py rename servc/{io => svc}/client/poll.py (73%) rename servc/{io => svc}/client/send.py (53%) rename servc/{com/server => svc/com}/__init__.py (100%) create mode 100644 servc/svc/com/bus/__init__.py create mode 100644 servc/svc/com/bus/rabbitmq.py rename servc/{ => svc}/com/cache/__init__.py (68%) rename servc/{ => svc}/com/cache/redis.py (65%) create mode 100644 servc/svc/com/http/__init__.py rename servc/{com/consumer => svc/com/worker}/__init__.py (69%) create mode 100644 servc/svc/config/__init__.py create mode 100644 servc/svc/idgen/__init__.py create mode 100644 servc/svc/idgen/simple.py rename servc/{ => svc}/io/__init__.py (100%) rename servc/{ => svc}/io/input.py (88%) rename servc/{ => svc}/io/output.py (100%) rename servc/{ => svc}/io/response.py (87%) create mode 100644 tests/__init__.py create mode 100644 tests/svc/test_config.py create mode 100644 tests/svc/test_idgen.py create mode 100644 tests/svc/test_rabbitmq.py create mode 100644 tests/svc/test_redis.py delete mode 100644 tests/test_com_cache_redis.py delete mode 100644 tests/test_io_idgenerator.py delete mode 100644 tests/test_server.py diff --git a/.coveragerc b/.coveragerc index 0028c9d..372529f 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,4 +1,3 @@ [run] omit = - tests/* - servc/com/bus/rabbitmq/rpc.py \ No newline at end of file + tests/* \ No newline at end of file diff --git a/.github/workflows/pypi.yml b/.github/workflows/pypi.yml index 46f5855..9b53313 100644 --- a/.github/workflows/pypi.yml +++ b/.github/workflows/pypi.yml @@ -6,6 +6,8 @@ jobs: packagepublish: runs-on: ubuntu-latest steps: + - uses: https://git.yusufali.ca/actions/commit@main + - uses: https://git.yusufali.ca/actions/pythonpip@main with: pypiptoken: ${{ secrets.PYPI_TOKEN }} \ No newline at end of file diff --git a/.github/workflows/servc.yml b/.github/workflows/servc.yml index f98b062..c7ea62c 100644 --- a/.github/workflows/servc.yml +++ b/.github/workflows/servc.yml @@ -1,6 +1,6 @@ name: 'Serv-C Unit Test' on: - - push + - pull_request jobs: servc: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 38ae6d0..0fdd049 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -29,19 +29,6 @@ jobs: ports: - 6379/tcp - postgres: - image: postgres - env: - POSTGRES_PASSWORD: postgres - POSTGRES_DB: postgres - options: >- - --health-cmd pg_isready - --health-interval 10s - --health-timeout 5s - --health-retries 5 - ports: - - 5432/tcp - steps: - if: github.server_url != 'https://github.com' run: sleep 20s @@ -54,14 +41,11 @@ jobs: with: python-version: ${{ matrix.python-version }} - - name: Lint + - name: Type check run: | - pip install isort black flake8 coverage - isort servc - black servc - flake8 --ignore=E501,W503 servc - git status - git diff --exit-code servc + pip install mypy + pip install types-PyYAML types-simplejson + mypy servc - name: Install dependencies run: | @@ -76,10 +60,9 @@ jobs: - name: Run tests env: - DATABASE_URL: postgres://postgres:postgres@postgres/postgres CACHE_URL: redis://redis BUS_URL: amqp://guest:guest@rabbitmq run: | - coverage run --concurrency=multiprocessing -m unittest tests/*.py - coverage combine + pip install coverage + coverage run -m unittest tests/**/*.py coverage report -m --fail-under=60 \ No newline at end of file diff --git a/.github/workflows/version.yml b/.github/workflows/version.yml deleted file mode 100644 index 3997e87..0000000 --- a/.github/workflows/version.yml +++ /dev/null @@ -1,9 +0,0 @@ -name: 'Changelong' -on: - - push - -jobs: - version: - runs-on: ubuntu-latest - steps: - - uses: https://git.yusufali.ca/actions/commit@main \ No newline at end of file diff --git a/README.md b/README.md index edb02aa..3c558eb 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,7 @@ Serv-C implmentation for Python. Documentation can be found https://docs.servc.io +[![PyPI version](https://badge.fury.io/py/servc.svg)](https://badge.fury.io/py/servc) ![Serv-C](https://git.yusufali.ca/serv-c/servc-python/actions/workflows/servc.yml/badge.svg) ## Example diff --git a/docker-compose.yml b/docker-compose.yml index 26f1fe9..5cbcd87 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,14 +8,13 @@ services: command: sleep infinity environment: CACHE_URL: redis://redis:6379/0 - BUS_URL: amqp://rabbitmq:rabbitmq@rabbitmq + BUS_URL: amqp://rabbitmq:rabbitmq@rabbitmq/ links: - rabbitmq - redis redis: image: redis - container_name: redis restart: unless-stopped expose: - "6379" @@ -24,11 +23,10 @@ services: rabbitmq: image: rabbitmq:3-management - container_name: rabbitmq environment: RABBITMQ_DEFAULT_USER: rabbitmq RABBITMQ_DEFAULT_PASS: rabbitmq - RABBITMQ_DEFAULT_VHOST: / + RABBITMQ_DEFAULT_VHOST: / ports: - "5672:5672" - "15672:15672" diff --git a/main.py b/main.py index 93ced1b..0d001bc 100644 --- a/main.py +++ b/main.py @@ -1,10 +1,12 @@ -from servc.com.server.server import start_server +from servc.server import start_server + def main(): return start_server( resolver={}, - eventResolver={}, + # route="test", ) + if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/requirements.txt b/requirements.txt index de13c2a..766f6fa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ pika==1.3.2 redis==5.0.1 simplejson==3.19.2 flask==3.0.1 +pyyaml==6.0.2 \ No newline at end of file diff --git a/servc/com/__init__.py b/servc/__init__.py similarity index 100% rename from servc/com/__init__.py rename to servc/__init__.py diff --git a/servc/com/bus/__init__.py b/servc/com/bus/__init__.py deleted file mode 100644 index 55282ed..0000000 --- a/servc/com/bus/__init__.py +++ /dev/null @@ -1,35 +0,0 @@ -from typing import Any, Callable, Union - -from servc.com.service import ComponentType, ServiceComponent -from servc.io.output import StatusCode - -EmitFunction = Union[Callable[[Any, str, StatusCode | int], None], None] - -InputProcessor = Callable[[Any], StatusCode] - -OnConsuming = Union[Callable[[str], None], None] - - -class BusComponent(ServiceComponent): - _type: ComponentType = ComponentType.BUS - - _url: str - - def __init__(self, url: str): - super().__init__() - - self._url = url - - def publishMessage( - self, route: str, message: Any, emitFunction: EmitFunction = None - ) -> bool: - return True - - def subscribe( - self, - route: str, - inputProcessor: InputProcessor, - emitFunction: EmitFunction = None, - onConsuming: OnConsuming = None, - ) -> bool: - return True diff --git a/servc/com/bus/rabbitmq/__init__.py b/servc/com/bus/rabbitmq/__init__.py deleted file mode 100644 index 1e7a076..0000000 --- a/servc/com/bus/rabbitmq/__init__.py +++ /dev/null @@ -1,196 +0,0 @@ -from __future__ import annotations - -import functools -import json -import os -import threading -from enum import Enum -from typing import Any, Callable, Tuple, TypedDict - -import pika -import simplejson - -from servc.com.bus import BusComponent, EmitFunction, InputProcessor, OnConsuming -from servc.com.cache.redis import decimal_default -from servc.io.input import InputType -from servc.io.output import StatusCode - - -class ExchangeTypes(Enum): - DIRECT = "direct" - FANOUT = "fanout" - - -class DeliveryMethod(TypedDict): - delivery_tag: str - - -PayloadHandler = Callable[ - [ - pika.BlockingConnection, - InputProcessor, - EmitFunction, - Any, - bytes, - DeliveryMethod, - ], - None, -] - - -def reply( - channel, - basic_deliver: DeliveryMethod, - payload: Any, - result: StatusCode, - emitFunction: EmitFunction, -): - if channel and channel.is_open: - if result == StatusCode.NO_PROCESSING: - channel.basic_nack(basic_deliver.delivery_tag) - else: - channel.basic_ack(basic_deliver.delivery_tag) - - if emitFunction: - emitFunction(payload, payload["route"] if "route" in payload else "", result) - - -def payload_non_block( - connection: pika.BlockingConnection, - inputProcessor: InputProcessor, - emitFunction: EmitFunction, - channel: Any, - body: bytes, - basic_deliver: DeliveryMethod, -): - payload = json.loads(body.decode("utf-8")) - result = inputProcessor(payload) - - callback = functools.partial( - reply, channel, basic_deliver, payload, result, emitFunction - ) - connection.add_callback_threadsafe(callback) - - -def consume_non_block( - arg: Tuple[ - pika.BlockingConnection, - InputProcessor, - EmitFunction, - ], - channel: Any, - method: DeliveryMethod, - properties: Any, - body: Any, -): - function_to_handle: PayloadHandler = payload_non_block - (connection, inputProcessor, emitFunction) = arg - thread = threading.Thread( - target=function_to_handle, - args=(connection, inputProcessor, emitFunction, channel, body, method), - ) - thread.start() - - -class BusRabbitMQ(BusComponent): - _url: str - - _conn: pika.BlockingConnection | None = None - - @BusComponent.isReady.getter - def isReady(self) -> bool: - return self._conn is not None and self._conn.is_open - - @BusComponent.isOpen.getter - def isOpen(self) -> bool: - return self.isReady - - def _connect(self): - if not self.isOpen: - params = pika.URLParameters(self._url) - self._conn = pika.BlockingConnection(params) - - def _close(self): - if self.isOpen or self._isReady: - try: - self._channel.stop_consuming() - self._channel.close() - except Exception as e: - print(e, flush=True) - try: - self._conn.close() - except Exception as e: - print(e, flush=True) - self._conn = None - return True - return False - - def get_channel(self): - try: - return self._conn.channel() - except pika.exceptions.StreamLostError as e: - print(str(e), flush=True) - self._conn = None - self._connect() - return self.get_channel() - - def queue_declare(self, channel: Any, queueName: str): - channel.queue_declare( - queue=queueName, durable=True, exclusive=False, auto_delete=False - ) - channel.queue_bind( - exchange=os.environ.get("FANOUT_EXCHANGE", "amq.fanout"), queue=queueName - ) - - def publishMessage( - self, - route: str, - message: Any, - emitFunction: EmitFunction = None, - ) -> bool: - if not self.isReady: - self._connect() - return self.publishMessage(route, message, emitFunction) - - channel = self.get_channel() - exchangeName = ( - os.environ.get("FANOUT_EXCHANGE", "amq.fanout") - if "type" in message - and message["type"] in [InputType.EVENT.value, InputType.EVENT] - else "" - ) - - channel.basic_publish( - exchange=exchangeName, - routing_key=route, - properties=None, - body=simplejson.dumps(message, default=decimal_default, ignore_nan=True), - ) - channel.close() - - if emitFunction: - emitFunction(message, route, 0) - return super().publishMessage(route, message, emitFunction) - - def subscribe( - self, - route: str, - inputProcessor: InputProcessor, - emitFunction: EmitFunction = None, - onConsuming: OnConsuming = None, - ) -> bool: - channel = self.get_channel() - - self.queue_declare(channel, route) - channel.basic_qos(prefetch_count=1) - msg_cb = functools.partial( - consume_non_block, (self._conn, inputProcessor, emitFunction) - ) - channel.basic_consume(queue=route, on_message_callback=msg_cb, auto_ack=False) - self._channel = channel - channel.start_consuming() - - if onConsuming: - onConsuming(route) - - return super().subscribe(route, inputProcessor, emitFunction, onConsuming) diff --git a/servc/com/server/http.py b/servc/com/server/http.py deleted file mode 100644 index ebb8d26..0000000 --- a/servc/com/server/http.py +++ /dev/null @@ -1,132 +0,0 @@ -import os -import uuid -from multiprocessing import Process - -from flask import Flask, jsonify, request - -from servc.com.bus import BusComponent -from servc.com.cache import CacheComponent -from servc.com.service import ComponentType, ServiceComponent -from servc.io.client.send import sendMessage -from servc.io.idgenerator.simple import simpleIDGenerator -from servc.io.input import InputPayload, InputType - - -class HTTPInterface(ServiceComponent): - _type: ComponentType = ComponentType.INTERFACE - - _port: int - - _server: Flask - - _bus: BusComponent - - _cache: CacheComponent - - _consumer: Process - - _route: str - - _instanceId: str - - def __init__( - self, - port: int, - bus: BusComponent, - cache: CacheComponent, - route: str, - instanceId: str, - consumerthread: Process, - ): - super().__init__() - self._port = port - self._server = Flask(__name__) - - self._bus = bus - self._cache = cache - self._children.append(self._bus) - self._children.append(self._cache) - - self._route = route - self._instanceId = instanceId - self._consumer = consumerthread - - def _connect(self): - self.bindRoutes() - self._isOpen = True - self._isReady = True - print("Listening on port", self._port, flush=True) - self._server.run(port=self._port, host="0.0.0.0") - - def _close(self): - self._consumer.terminate() - self._consumer.close() - func = request.environ.get("werkzeug.server.shutdown") - if func is None: - raise RuntimeError("Not running with the Werkzeug Server") - func() - self._isOpen = False - self._isReady = False - return True - - def start(self): - self.connect() - - def _health(self): - consumerAlive = False - try: - consumerAlive = self._consumer.is_alive() - except AssertionError: - pid = self._consumer.pid - try: - os.kill(pid, 0) - except OSError: - consumerAlive = False - else: - consumerAlive = True - print("health check:", self.isReady, consumerAlive) - if self.isReady and consumerAlive: - return "OK" - else: - return "Not OK", 500 - - def _getResponse(self, id: str): - return jsonify(self._cache.getKey(id)) - - def _postMessage(self): - content_type = request.headers["Content-Type"] - if content_type == "application/json": - body = request.json - if body and body["route"] and body["inputs"]: - force = False if "force" not in body else body["force"] - no_id = ("id" not in body) or ("id" in body and body["id"] in ("", "0")) - - value: InputPayload = { - "id": str(uuid.uuid4()) if no_id else body["id"], - "route": body["route"], - "inputs": body["inputs"], - "argumentId": body["argumentId"] if body["argumentId"] else "plain", - "type": InputType.INPUT.value, - } - if "instanceId" in body: - value["instanceId"] = body["instanceId"] - else: - return "bad request", 400 - id = sendMessage( - value, - self._bus, - self._cache, - simpleIDGenerator, - force, - [], - ) - return id - return "Content-Type not supported" - - def bindRoutes(self): - self._server.add_url_rule("/healthz", "healthz", self._health, methods=["GET"]) - self._server.add_url_rule("/readyz", "readyz", self._health, methods=["GET"]) - self._server.add_url_rule( - "/id/", "_getResponse", self._getResponse, methods=["GET"] - ) - self._server.add_url_rule("/", "", self._postMessage, methods=["POST"]) diff --git a/servc/com/server/server.py b/servc/com/server/server.py deleted file mode 100644 index d0629b7..0000000 --- a/servc/com/server/server.py +++ /dev/null @@ -1,142 +0,0 @@ -from multiprocessing import Process -from typing import Any, List - -from servc.com.bus import BusComponent, EmitFunction, OnConsuming -from servc.com.bus.rabbitmq import BusRabbitMQ -from servc.com.cache import CacheComponent -from servc.com.cache.redis import CacheRedis -from servc.com.consumer import RESOLVER_MAPPING, ConsumerComponent -from servc.com.server.http import HTTPInterface -from servc.com.service import ComponentType -from servc.config import bus_url as default_bus_url -from servc.config import cache_url as default_cache_url -from servc.config import instance_id as default_instance_id -from servc.config import port as default_port - -blankEmitFunction: EmitFunction = lambda route, message, code: None -blankOnConsuming: OnConsuming = lambda route: print( - "Consuming on route ", route, flush=True -) - - -def compose_components( - component_list: List[[type[ComponentType], List[Any]]] -) -> List[ComponentType]: - components: List[ComponentType] = [] - for [componentClass, args] in component_list: - components.append(componentClass(*args)) - return components - - -def start_consumer( - route: str, - resolver: RESOLVER_MAPPING, - eventResolver: RESOLVER_MAPPING, - busClass: type[BusComponent], - cacheClass: type[CacheComponent], - consumerClass: type[ConsumerComponent], - instance_id: str, - cache_url: str, - bus_url: str, - emitFunction: EmitFunction, - onConsuming: OnConsuming, - components: List[[type[ComponentType], List[Any]]], -): - bus = busClass(bus_url) - cache = cacheClass(cache_url) - consumer = consumerClass( - route, - instance_id, - resolver, - eventResolver, - emitFunction, - onConsuming, - bus, - cache, - busClass, - [bus_url], - compose_components(components), - ) - consumer.connect() - - -def test_start_http( - route: str, - busClass: type[BusComponent], - cacheClass: type[CacheComponent], - consumerProcess: Process, - httpClass: type[HTTPInterface], - cache_url: str, - bus_url: str, - port: int, - instance_id: str, -): - bus = busClass(bus_url) - cache = cacheClass(cache_url) - http = httpClass(port, bus, cache, route, instance_id, consumerProcess) - http.start() - - -def start_server( - route: str, - resolver: RESOLVER_MAPPING, - eventResolver: RESOLVER_MAPPING = {}, - port: int = default_port, - busClass=BusRabbitMQ, - cacheClass=CacheRedis, - consumerClass=ConsumerComponent, - httpClass=HTTPInterface, - instance_id: str = default_instance_id, - cache_url: str = default_cache_url, - bus_url: str = default_bus_url, - emitFunction: EmitFunction = blankEmitFunction, - onConsuming: OnConsuming = blankOnConsuming, - components: List[[type[ComponentType], List[Any]]] = [], - start=True, - returnProcess=False, -): - consumer = Process( - target=start_consumer, - args=( - route, - resolver, - eventResolver, - busClass, - cacheClass, - consumerClass, - instance_id, - cache_url, - bus_url, - emitFunction, - onConsuming, - components, - ), - ) - consumer.start() - - if returnProcess: - http = Process( - target=test_start_http, - args=( - route, - busClass, - cacheClass, - consumer, - httpClass, - cache_url, - bus_url, - port, - instance_id, - ), - ) - http.start() - return [http, consumer] - - bus = busClass(bus_url) - cache = cacheClass(cache_url) - interface = httpClass(port, bus, cache, route, instance_id, consumer) - - if start: - interface.start() - - return interface diff --git a/servc/config/__init__.py b/servc/config/__init__.py deleted file mode 100644 index 475821d..0000000 --- a/servc/config/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -import os -import socket - -redis_url = os.getenv("REDIS_URL", "redis://localhost:6379") -cache_url = os.getenv("CACHE_URL", redis_url) - -cloudamqp_url = os.getenv("CLOUDAMQP_URL", "amqp://localhost:5672") -bus_url = os.getenv("BUS_URL", cloudamqp_url) - -postgres_url = os.getenv("POSTGRES_URL", "postgresql://localhost:5432") -db_url = os.getenv("DATABASE_URL", postgres_url) - -port = int(os.getenv("PORT", 3000)) -instance_id = os.getenv("INSTANCE_ID", socket.gethostname()) diff --git a/servc/io/client/__init__.py b/servc/io/client/__init__.py deleted file mode 100644 index 066f9da..0000000 --- a/servc/io/client/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -import os - -BULK_JOB_SEPARATOR = os.getenv("SERVC_BULK_JOB_SEPARATOR", "__") -BULK_JOB_DELIMITER = os.getenv("SERVC_BULK_JOB_DELIMITER", "||") diff --git a/servc/io/client/get.py b/servc/io/client/get.py deleted file mode 100644 index 758252b..0000000 --- a/servc/io/client/get.py +++ /dev/null @@ -1,56 +0,0 @@ -from servc.com.cache import CacheComponent -from servc.io.client import BULK_JOB_DELIMITER, BULK_JOB_SEPARATOR -from servc.io.output import ResponseArtifact - - -def get_result(id: str, cache: CacheComponent) -> ResponseArtifact: - if BULK_JOB_DELIMITER and BULK_JOB_SEPARATOR in id: - return get_bulk_result(id, cache) - cacheResult = cache.getKey(id) - if ( - cacheResult - and "progress" in cacheResult - and "responseBody" in cacheResult - and "statusCode" in cacheResult - ): - return cacheResult - - return { - "id": id, - "progress": 0, - "responseBody": "Starting", - "statusCode": 200, - "isError": False, - } - - -def get_bulk_result(bulk_id: str, cache: CacheComponent) -> ResponseArtifact: - results = { - "id": bulk_id, - "progress": 0, - "statusCode": 200, - "isError": False, - "responseBody": {}, - } - completed_jobs = 0 - - key_ids = bulk_id.split(BULK_JOB_DELIMITER) - for key_id in key_ids: - key, id = key_id.split(BULK_JOB_SEPARATOR) - - result = get_result(id, cache) - if result["progress"] == 100: - completed_jobs += 1 - - results["progress"] += result["progress"] / len(key_ids) - results["responseBody"][key] = result["responseBody"] - results["statusCode"] = ( - result["statusCode"] - if result["statusCode"] > results["statusCode"] - else results["statusCode"] - ) - results["isError"] = result["isError"] or results["isError"] - - if completed_jobs == len(key_ids): - results["progress"] = 100 - return results diff --git a/servc/io/idgenerator/__init__.py b/servc/io/idgenerator/__init__.py deleted file mode 100644 index 3672180..0000000 --- a/servc/io/idgenerator/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -from typing import Callable, List - -from servc.com.service import ServiceComponent -from servc.io.input import ArgumentArtifact - -ID_GENERATOR = Callable[[str, List[ServiceComponent], ArgumentArtifact], str] diff --git a/servc/io/idgenerator/simple.py b/servc/io/idgenerator/simple.py deleted file mode 100644 index 4232b52..0000000 --- a/servc/io/idgenerator/simple.py +++ /dev/null @@ -1,14 +0,0 @@ -import json -from hashlib import sha256 -from typing import List - -from servc.com.service import ServiceComponent -from servc.io.input import ArgumentArtifact - - -def simpleIDGenerator( - route: str, _c: List[ServiceComponent], message: ArgumentArtifact -) -> str: - input_string: str = "".join([route, json.dumps(message)]) - input_sha: str = sha256(input_string.encode("utf-8")).hexdigest() - return input_sha diff --git a/servc/server.py b/servc/server.py new file mode 100644 index 0000000..cd4320d --- /dev/null +++ b/servc/server.py @@ -0,0 +1,113 @@ +from multiprocessing import Process +from typing import Any, List, Tuple + +from servc.svc import Middleware +from servc.svc.com.bus import BusComponent, OnConsuming +from servc.svc.com.bus.rabbitmq import BusRabbitMQ +from servc.svc.com.cache import CacheComponent +from servc.svc.com.cache.redis import CacheRedis +from servc.svc.com.http import HTTPInterface +from servc.svc.com.worker import RESOLVER_MAPPING, WorkerComponent +from servc.svc.config import Config + + +def blankOnConsuming(route: str): + print("Consuming on route", route, flush=True) + + +COMPONENT_ARRAY = List[Tuple[type[Middleware], List[Any]]] + + +def compose_components(component_list: COMPONENT_ARRAY) -> List[Middleware]: + components: List[Middleware] = [] + for [componentClass, args] in component_list: + components.append(componentClass(*args)) + return components + + +def start_consumer( + configDictionary: dict, + resolver: RESOLVER_MAPPING, + eventResolver: RESOLVER_MAPPING, + configClass: type[Config], + busClass: type[BusComponent], + cacheClass: type[CacheComponent], + workerClass: type[WorkerComponent], + onConsuming: OnConsuming, + components: COMPONENT_ARRAY, +): + config = configClass() + config.setAll(configDictionary) + bus = busClass( + config.get("conf.bus.url"), + config.get("conf.bus.routemap"), + config.get("conf.bus.prefix"), + ) + cache = cacheClass(config.get("conf.cache.url")) + consumer = workerClass( + config.get("conf.bus.route"), + config.get("conf.instanceid"), + resolver, + eventResolver, + onConsuming, + bus, + cache, + config, + compose_components(components), + ) + consumer.connect() + + +def start_server( + resolver: RESOLVER_MAPPING, + route: str | None = None, + eventResolver: RESOLVER_MAPPING = {}, + configClass=Config, + busClass=BusRabbitMQ, + cacheClass=CacheRedis, + workerClass=WorkerComponent, + httpClass=HTTPInterface, + onConsuming: OnConsuming = blankOnConsuming, + components: COMPONENT_ARRAY = [], + start=True, +): + config = configClass() + if route is not None: + config.setValue("conf.bus.route", route) + + consumer = Process( + target=start_consumer, + args=( + config.getAll(), + resolver, + eventResolver, + configClass, + busClass, + cacheClass, + workerClass, + onConsuming, + components, + ), + ) + consumer.start() + + bus = busClass( + config.get("conf.bus.url"), + config.get("conf.bus.routemap"), + config.get("conf.bus.prefix"), + ) + cache = cacheClass(config.get("conf.cache.url")) + http = httpClass( + int(config.get("conf.http.port")), + bus, + cache, + config.get("conf.bus.route"), + config.get("conf.instanceid"), + consumer, + resolver, + eventResolver, + ) + if start: + http.start() + + return http diff --git a/servc/com/service.py b/servc/svc/__init__.py similarity index 60% rename from servc/com/service.py rename to servc/svc/__init__.py index aae8755..b264d08 100644 --- a/servc/com/service.py +++ b/servc/svc/__init__.py @@ -7,14 +7,16 @@ class ComponentType(Enum): BUS = "bus" CACHE = "cache" - CONSUMER = "consumer" + WORKER = "worker" INTERFACE = "interface" DATABASE = "database" STORAGE = "storage" -class ServiceComponent: - _children: List[ServiceComponent] +class Middleware: + _children: List[Middleware] + + _name: str _isReady: bool @@ -31,13 +33,17 @@ def __init__(self): self._isReady = False self._isOpen = False + @property + def name(self) -> str: + return self._name + @property def isReady(self) -> bool: - isReady = self._isReady + isReadyCheck = self._isReady for child in self._children: - isReady = isReady and child.isReady + isReadyCheck = isReadyCheck and child.isReady - return isReady + return isReadyCheck @property def isOpen(self) -> bool: @@ -47,30 +53,20 @@ def isOpen(self) -> bool: return isOpen - @isReady.setter - def isReady(self, value: bool): - self._isReady = value - - @isOpen.setter - def isOpen(self, value: bool): - self._isOpen = value - @property def type(self) -> ComponentType: return self._type def connect(self): - for child in self._children: - child.connect() return self._connect() def close(self): - for child in self._children: - child.close() return self._close() - def getChild(self, filter: ComponentType) -> ServiceComponent: + def getChild( + self, filter: ComponentType | None = None, name: str | None = None + ) -> Middleware: for child in self._children: - if child.type == filter: + if (filter and child.type == filter) or (name and child.name == name): return child - raise Exception(f"Child of type {filter} not found") + raise Exception("Child of not found") diff --git a/servc/svc/client/get.py b/servc/svc/client/get.py new file mode 100644 index 0000000..7d34904 --- /dev/null +++ b/servc/svc/client/get.py @@ -0,0 +1,21 @@ +from servc.svc.com.cache import CacheComponent +from servc.svc.io.output import ResponseArtifact, StatusCode + + +def get_result(id: str, cache: CacheComponent) -> ResponseArtifact: + cacheResult = cache.getKey(id) + if ( + cacheResult + and "progress" in cacheResult + and "responseBody" in cacheResult + and "statusCode" in cacheResult + ): + return cacheResult + + return { + "id": id, + "progress": 0, + "responseBody": "Starting", + "statusCode": StatusCode.OK.value, + "isError": False, + } diff --git a/servc/io/client/poll.py b/servc/svc/client/poll.py similarity index 73% rename from servc/io/client/poll.py rename to servc/svc/client/poll.py index 818669d..660cae9 100644 --- a/servc/io/client/poll.py +++ b/servc/svc/client/poll.py @@ -1,8 +1,8 @@ import time -from servc.com.cache import CacheComponent -from servc.io.client.get import get_result -from servc.io.output import ResponseArtifact +from servc.svc.client.get import get_result +from servc.svc.com.cache import CacheComponent +from servc.svc.io.output import ResponseArtifact def pollMessage(id: str, cache: CacheComponent, timeout: int = 30) -> ResponseArtifact: diff --git a/servc/io/client/send.py b/servc/svc/client/send.py similarity index 53% rename from servc/io/client/send.py rename to servc/svc/client/send.py index 71a9d60..306529f 100644 --- a/servc/io/client/send.py +++ b/servc/svc/client/send.py @@ -1,11 +1,10 @@ -from typing import Dict, List +from typing import List -from servc.com.bus import BusComponent -from servc.com.cache import CacheComponent -from servc.com.service import ServiceComponent -from servc.io.client import BULK_JOB_DELIMITER, BULK_JOB_SEPARATOR -from servc.io.idgenerator import ID_GENERATOR -from servc.io.input import InputPayload, InputType +from servc.svc import Middleware +from servc.svc.com.bus import BusComponent +from servc.svc.com.cache import CacheComponent +from servc.svc.idgen import ID_GENERATOR +from servc.svc.io.input import InputPayload, InputType def sendMessage( @@ -14,16 +13,16 @@ def sendMessage( cache: CacheComponent, idGenerator: ID_GENERATOR, force: bool = False, - services: List[ServiceComponent] = [], + services: List[Middleware] = [], ) -> str: - if "inputs" not in message: + if "argument" not in message: raise Exception("InputPayload must have inputs") id = ( idGenerator( "-".join(["svc", message["route"]]), [bus, cache, *services], - message["inputs"], + message["argument"], ) if "id" not in message or message["id"] in ["", None] else message["id"] @@ -46,7 +45,7 @@ def sendMessage( "route": message["route"], "argumentId": message["argumentId"] if "argumentId" in message else "", "id": id, - "inputs": message["inputs"], + "argument": message["argument"], } if "instanceId" in message and message["instanceId"]: @@ -57,27 +56,12 @@ def sendMessage( argumentId = idGenerator( "-".join(["arg", message["route"]]), [bus, cache, *services], - message["inputs"], + message["argument"], ) - cache.setKey(argumentId, message["inputs"]) + cache.setKey(argumentId, message["argument"]) inputObject["argumentId"] = argumentId - del inputObject["inputs"] + del inputObject["argument"] - bus.publishMessage(message["route"], inputObject, lambda *x: True) + bus.publishMessage(message["route"], inputObject) return id - - -def sendBulkMessage( - message: Dict[str, InputPayload], - bus: BusComponent, - cache: CacheComponent, - idGenerator: ID_GENERATOR, - force: bool = False, - services: List[ServiceComponent] = [], -) -> str: - ids: List[str] = [] - for key, payload in message.items(): - job_id = sendMessage(payload, bus, cache, idGenerator, force, services) - ids.append(BULK_JOB_SEPARATOR.join([key, job_id])) - return BULK_JOB_DELIMITER.join(ids) diff --git a/servc/com/server/__init__.py b/servc/svc/com/__init__.py similarity index 100% rename from servc/com/server/__init__.py rename to servc/svc/com/__init__.py diff --git a/servc/svc/com/bus/__init__.py b/servc/svc/com/bus/__init__.py new file mode 100644 index 0000000..b1ac4b4 --- /dev/null +++ b/servc/svc/com/bus/__init__.py @@ -0,0 +1,55 @@ +from typing import Any, Callable, Union + +from servc.svc import ComponentType, Middleware +from servc.svc.io.input import EventPayload, InputPayload, InputType +from servc.svc.io.output import StatusCode + +InputProcessor = Callable[..., StatusCode] + +OnConsuming = Union[Callable[[str], None], None] + + +class BusComponent(Middleware): + _type: ComponentType = ComponentType.BUS + + _url: str + + _routeMap: dict + + _prefix: str + + def __init__(self, url: str, routeMap: dict, prefix: str): + super().__init__() + + self._url = url + self._routeMap = routeMap + self._prefix = prefix + + def getRoute(self, route: str) -> str: + if route in self._routeMap: + return "".join([self._prefix, self._routeMap[route]]) + return "".join([self._prefix, route]) + + def publishMessage(self, route: str, message: InputPayload | EventPayload) -> bool: + return True + + def emitEvent(self, event: str, instanceId: str, details: Any) -> bool: + return self.publishMessage( + self.getRoute(event), + { + "type": InputType.EVENT.value, + "route": self.getRoute(event), + "event": event, + "details": details, + "instanceId": instanceId, + }, + ) + + def subscribe( + self, + route: str, + inputProcessor: InputProcessor, + onConsuming: OnConsuming | None, + bindEventExchange: bool = True, + ) -> bool: + return True diff --git a/servc/svc/com/bus/rabbitmq.py b/servc/svc/com/bus/rabbitmq.py new file mode 100644 index 0000000..d2606cb --- /dev/null +++ b/servc/svc/com/bus/rabbitmq.py @@ -0,0 +1,184 @@ +from __future__ import annotations + +import json +from typing import Any, Callable, Tuple + +import pika # type: ignore +import pika.channel # type: ignore +import pika.exceptions # type: ignore +import simplejson +from pika.adapters.blocking_connection import BlockingConnection # type: ignore +from pika.adapters.asyncio_connection import AsyncioConnection # type: ignore + +from servc.svc.com.bus import BusComponent, InputProcessor, OnConsuming +from servc.svc.com.cache.redis import decimal_default +from servc.svc.io.input import EventPayload, InputPayload, InputType +from servc.svc.io.output import StatusCode + +EVENT_EXCHANGE = "amq.fanout" + + +def queue_declare( + channel: pika.channel.Channel, queueName: str, bindEventExchange: bool +): + channel.queue_declare( + queue=queueName, durable=True, exclusive=False, auto_delete=False + ) + if bindEventExchange: + channel.queue_bind(exchange=EVENT_EXCHANGE, queue=queueName) + + +def on_channel_open(channel: pika.channel.Channel, method: Callable, args: Tuple): + method(*args, channel) + + +class BusRabbitMQ(BusComponent): + _url: str + + _conn: AsyncioConnection | BlockingConnection | None = None + + @property + def isReady(self) -> bool: + return ( + self._conn is not None and self._conn.is_open and ( + self.isBlockingConnection() or + not self._conn.is_closing + ) + ) + + @ property + def isOpen(self) -> bool: + return self.isReady + + def isBlockingConnection(self) -> bool: + return isinstance(self._conn, BlockingConnection) + + def _connect(self, method=None | Callable, args=None | Tuple, blocking=True): + if not self.isOpen: + if blocking: + self._conn = BlockingConnection(pika.URLParameters(self._url)) + self.get_channel(method, args) + else: + self._conn = AsyncioConnection( + parameters=pika.URLParameters(self._url), + on_open_callback=lambda _c: self.get_channel(method, args), + on_close_callback=self.on_connection_closed, + ) + + def _close(self): + if self.isOpen or self.isReady: + if self._conn and not self._conn.is_closed and ( + self.isBlockingConnection() or + not self._conn.is_closing + ): + self._conn.close() + self._conn = None + + return True + return False + + def on_connection_closed(self, _conn: AsyncioConnection, reason: pika.exceptions): + if reason == pika.exceptions.StreamLostError: + print(str(reason), flush=True) + self._conn = None + self._connect() + + def get_channel(self, method: Callable | None, args: Tuple | None): + if not self.isReady: + self._connect(method, args) + elif method and args and self._conn: + if self.isBlockingConnection(): + channel = self._conn.channel() + on_channel_open(channel, method, args) + else: + self._conn.channel( + on_open_callback=lambda c: on_channel_open(c, method, args) + ) + + def publishMessage( # type: ignore + self, + route: str, + message: InputPayload | EventPayload, + channel: pika.channel.Channel | None = None, + ) -> bool: + if not self.isReady: + return self._connect(self.publishMessage, (route, message)) + if not channel: + return self.get_channel(self.publishMessage, (route, message)) + + exchangeName = ( + EVENT_EXCHANGE + if "type" in message + and message["type"] in [InputType.EVENT.value, InputType.EVENT] + else "" + ) + + channel.basic_publish( + exchange=exchangeName, + routing_key=self.getRoute(route), + properties=None, + body=simplejson.dumps( + message, default=decimal_default, ignore_nan=True), + ) + channel.close() + + return super().publishMessage(route, message) + + def subscribe( # type: ignore + self, + route: str, + inputProcessor: InputProcessor, + onConsuming: OnConsuming | None, + bindEventExchange: bool = True, + channel: pika.channel.Channel | None = None, + ) -> bool: + if not self.isReady: + self._connect( + self.subscribe, + (route, inputProcessor, + onConsuming, bindEventExchange), + blocking=False + ) + self._conn.ioloop.run_forever() # type: ignore + elif self.isBlockingConnection(): + self.close() + return self.subscribe(route, inputProcessor, onConsuming, bindEventExchange) + if not channel: + return self.get_channel( + self.subscribe, (route, inputProcessor, + onConsuming, bindEventExchange) + ) + channel.add_on_close_callback(lambda _c, _r: self.close()) + channel.add_on_cancel_callback(lambda _c: self.close()) + + queue_declare(channel, self.getRoute(route), bindEventExchange) + channel.basic_qos(prefetch_count=1) + + channel.basic_consume( + self.getRoute(route), + on_message_callback=lambda c, m, p, b: self.on_message( + c, m, p, b, inputProcessor + ), + auto_ack=False, + ) + + if onConsuming: + onConsuming(self.getRoute(route)) + + def on_message( + self, + channel: pika.channel.Channel, + method, + properties: Any, + body: Any, + inputProcessor: InputProcessor, + ): + if not body: + channel.basic_ack(method.delivery_tag) + payload = json.loads(body.decode("utf-8")) + result = inputProcessor(payload) + + if result == StatusCode.NO_PROCESSING: + channel.basic_nack(method.delivery_tag) + else: + channel.basic_ack(method.delivery_tag) diff --git a/servc/com/cache/__init__.py b/servc/svc/com/cache/__init__.py similarity index 68% rename from servc/com/cache/__init__.py rename to servc/svc/com/cache/__init__.py index 8249c25..ad29574 100644 --- a/servc/com/cache/__init__.py +++ b/servc/svc/com/cache/__init__.py @@ -1,11 +1,11 @@ from typing import Any -from servc.com.service import ComponentType, ServiceComponent -from servc.io.output import StatusCode -from servc.io.response import generateResponseArtifact +from servc.svc import ComponentType, Middleware +from servc.svc.io.output import StatusCode +from servc.svc.io.response import generateResponseArtifact -class CacheComponent(ServiceComponent): +class CacheComponent(Middleware): _type: ComponentType = ComponentType.CACHE _url: str @@ -16,16 +16,16 @@ def __init__(self, url: str): self._url = url def setKey(self, id: str, value: Any) -> str: - pass + return "" def getKey(self, id: str) -> Any | None: - pass + return None def deleteKey(self, id: str) -> bool: - pass + return False def setProgress(self, id: str, progress: float, message: str) -> bool: - return self.setKey( + return not not self.setKey( id, generateResponseArtifact( id, diff --git a/servc/com/cache/redis.py b/servc/svc/com/cache/redis.py similarity index 65% rename from servc/com/cache/redis.py rename to servc/svc/com/cache/redis.py index 2cbe8a7..e7cefc8 100644 --- a/servc/com/cache/redis.py +++ b/servc/svc/com/cache/redis.py @@ -1,38 +1,42 @@ import datetime import decimal import json -from typing import Any, Union +from typing import Any import simplejson from redis import Redis -from servc.com.cache import CacheComponent +from servc.svc.com.cache import CacheComponent -def decimal_default(obj: Any): +def decimal_default(obj: Any) -> None | str | float: if isinstance(obj, decimal.Decimal): return float(obj) if isinstance(obj, (datetime.date, datetime.datetime)): return obj.isoformat() - return + return None class CacheRedis(CacheComponent): - _redisClient: Union[Redis.client, None] = None + _redisClient: Redis + + @property + def conn(self): + return self._redisClient def _connect(self): if self.isOpen: return None self._redisClient = Redis.from_url(self._url) - self.isReady = self._redisClient.ping() - self.isOpen = self._redisClient.ping() + self._isReady = self._redisClient.ping() + self._isOpen = self._redisClient.ping() return None def _close(self): if self._isOpen: self._redisClient.close() - self.isReady = False - self.isOpen = False + self._isReady = False + self._isOpen = False return True return False @@ -41,7 +45,8 @@ def setKey(self, id: str, value: Any) -> str: self.connect() return self.setKey(id, value) self._redisClient.set( - id, simplejson.dumps(value, default=decimal_default, ignore_nan=True) + id, simplejson.dumps( + value, default=decimal_default, ignore_nan=True) ) return id @@ -51,11 +56,11 @@ def getKey(self, id: str) -> Any | None: return self.getKey(id) value = self._redisClient.get(id) if value: - return json.loads(value) + return json.loads(value) # type: ignore return value def deleteKey(self, id: str) -> bool: if not self.isReady: self.connect() return self.deleteKey(id) - return self._redisClient.delete(id) > 0 + return self.conn.delete(id) > 0 diff --git a/servc/svc/com/http/__init__.py b/servc/svc/com/http/__init__.py new file mode 100644 index 0000000..2d0c76e --- /dev/null +++ b/servc/svc/com/http/__init__.py @@ -0,0 +1,186 @@ +import os +from multiprocessing import Process +from typing import Dict, Tuple, TypedDict + +from flask import Flask, jsonify, request + +from servc.svc import ComponentType, Middleware +from servc.svc.client.send import sendMessage +from servc.svc.com.bus import BusComponent +from servc.svc.com.cache import CacheComponent +from servc.svc.com.worker import RESOLVER_MAPPING +from servc.svc.idgen.simple import simple +from servc.svc.io.input import InputPayload, InputType +from servc.svc.io.output import StatusCode + + +class ServiceInformation(TypedDict): + instanceId: str + queue: str + methods: Dict[str, Tuple[str, ...]] + eventHandlers: Dict[str, Tuple[str, ...]] + + +def methodGrabber(m: RESOLVER_MAPPING) -> Dict[str, Tuple[str, ...]]: + j: Dict[str, Tuple[str, ...]] = {} + for key, value in m.items(): + j[key] = value.__code__.co_varnames + return j + + +class HTTPInterface(Middleware): + _type: ComponentType = ComponentType.INTERFACE + + _port: int + + _server: Flask + + _bus: BusComponent + + _cache: CacheComponent + + _consumer: Process + + _route: str + + _instanceId: str + + _info: ServiceInformation + + def __init__( + self, + port: int, + bus: BusComponent, + cache: CacheComponent, + route: str, + instanceId: str, + consumerthread: Process, + resolvers: RESOLVER_MAPPING, + eventResolvers: RESOLVER_MAPPING, + ): + super().__init__() + self._port = port + self._server = Flask(__name__) + + self._bus = bus + self._cache = cache + self._children.append(self._bus) + self._children.append(self._cache) + + self._route = route + self._instanceId = instanceId + self._consumer = consumerthread + + self._info = { + "instanceId": self._instanceId, + "queue": self._bus.getRoute(self._route), + "methods": methodGrabber(resolvers), + "eventHandlers": methodGrabber(eventResolvers), + } + + def _connect(self): + self.bindRoutes() + self._isOpen = True + self._isReady = True + print("Listening on port", self._port, flush=True) + self._server.run(port=self._port, host="0.0.0.0") + + def _close(self): + self._consumer.terminate() + self._consumer.close() + func = request.environ.get("werkzeug.server.shutdown") + if func is None: + raise RuntimeError("Not running with the Werkzeug Server") + func() + self._isOpen = False + self._isReady = False + return True + + def start(self): + self.connect() + + def _health(self): + consumerAlive = False + try: + consumerAlive = self._consumer.is_alive() + except AssertionError: + pid = self._consumer.pid + try: + os.kill(pid, 0) + except OSError: + consumerAlive = False + else: + consumerAlive = True + print("health check:", self.isReady, consumerAlive) + if self.isReady and consumerAlive: + return "OK" + else: + return "Not OK", StatusCode.SERVER_ERROR.value + + def _getResponse(self, id: str): + return jsonify(self._cache.getKey(id)) + + def _postMessage(self): + content_type = request.headers.get("Content-Type", None) + if request.method == "GET": + return self._getInformation() + if content_type == "application/json": + body = request.json + + must_have_keys = ("type",) + for key in must_have_keys: + if key not in body: + return f"missing key {key}", StatusCode.INVALID_INPUTS.value + + if body["type"] == InputType.EVENT.value: + must_have_keys = ("event", "details") + for key in must_have_keys: + if key not in body: + return f"missing key {key}", StatusCode.INVALID_INPUTS.value + instanceId = ( + body["instanceId"] if "instanceId" in body else self._instanceId + ) + + id = self._bus.emitEvent( + body["event"], instanceId, body["details"]) + return id + elif body["type"] == InputType.INPUT.value: + must_have_keys = ("route", "argument") + for key in must_have_keys: + if key not in body: + return f"missing key {key}", StatusCode.INVALID_INPUTS.value + payload: InputPayload = { + "type": body["type"], + "route": body["route"], + "argumentId": "", + "id": body["id"] if "id" in body else "", + "argument": body["argument"], + } + + id = sendMessage( + payload, + self._bus, + self._cache, + simple, + True if "force" in body and body["force"] else False, + [], + ) + return id + else: + return "bad request", StatusCode.INVALID_INPUTS.value + + return "Content-Type not supported" + + def _getInformation(self): + return jsonify(self._info) + + def bindRoutes(self): + self._server.add_url_rule( + "/healthz", "healthz", self._health, methods=["GET"]) + self._server.add_url_rule( + "/readyz", "readyz", self._health, methods=["GET"]) + self._server.add_url_rule( + "/id/", "_getResponse", self._getResponse, methods=["GET"] + ) + self._server.add_url_rule( + "/", "", self._postMessage, methods=["POST", "GET"]) diff --git a/servc/com/consumer/__init__.py b/servc/svc/com/worker/__init__.py similarity index 69% rename from servc/com/consumer/__init__.py rename to servc/svc/com/worker/__init__.py index 83a8d85..e03e709 100644 --- a/servc/com/consumer/__init__.py +++ b/servc/svc/com/worker/__init__.py @@ -1,44 +1,48 @@ from typing import Any, Callable, Dict, List, Union -from servc.com.bus import BusComponent, EmitFunction, OnConsuming -from servc.com.cache import CacheComponent -from servc.com.service import ComponentType, ServiceComponent -from servc.io.input import InputPayload, InputType -from servc.io.output import StatusCode -from servc.io.response import getAnswerArtifact, getErrorArtifact - -EMIT_EVENT = Callable[[str, Any], None] +from servc.svc import ComponentType, Middleware +from servc.svc.com.bus import BusComponent, OnConsuming +from servc.svc.com.cache import CacheComponent +from servc.svc.config import Config +from servc.svc.io.input import EventPayload, InputType +from servc.svc.io.output import StatusCode +from servc.svc.io.response import getAnswerArtifact, getErrorArtifact RESOLVER = Callable[ - [str, BusComponent, CacheComponent, List[ComponentType], Any, EMIT_EVENT], + [str, BusComponent, CacheComponent, Any, List[Middleware]], Union[StatusCode, Any, None], ] RESOLVER_MAPPING = Dict[str, RESOLVER] -class ConsumerComponent(ServiceComponent): - _type: ComponentType = ComponentType.CONSUMER +def HEALTHZ( + _id: str, bus: BusComponent, cache: CacheComponent, _any: Any, c: List[Middleware] +) -> StatusCode: + for component in [bus, cache, *c]: + if not component.isReady: + return StatusCode.SERVER_ERROR + return StatusCode.OK + + +class WorkerComponent(Middleware): + _type: ComponentType = ComponentType.WORKER _route: str _instanceId: str - _resolvers: RESOLVER + _resolvers: RESOLVER_MAPPING - _eventResolvers: RESOLVER + _eventResolvers: RESOLVER_MAPPING _bus: BusComponent _cache: CacheComponent - _emitFunction: EmitFunction - _onConsuming: OnConsuming - _busClass: type[BusComponent] - - _busArgs: List[Any] + _config: Config def __init__( self, @@ -46,39 +50,35 @@ def __init__( instanceId: str, resolvers: RESOLVER_MAPPING, eventResolvers: RESOLVER_MAPPING, - emitFunction: EmitFunction, onConsuming: OnConsuming, bus: BusComponent, cache: CacheComponent, - busClass: type[BusComponent], - busArgs: List[Any], - otherComponents: List[ServiceComponent] = [], + config: Config, + otherComponents: List[Middleware] = [], ): super().__init__() self._route = route self._instanceId = instanceId self._resolvers = resolvers self._eventResolvers = eventResolvers - self._emitFunction = emitFunction self._onConsuming = onConsuming self._bus = bus self._cache = cache - self._busClass = busClass - self._busArgs = busArgs + self._config = config - self._resolvers["healthz"] = lambda *args: self.healthz(self, *args) + self._resolvers["healthz"] = lambda *args: HEALTHZ(*args) self._children.extend(otherComponents) self._children.append(bus) self._children.append(cache) def _connect(self): - self.isReady = True - self.isOpen = True + self._isReady = True + self._isOpen = True def _close(self): - self.isReady = False - self.isOpen = False + self._isReady = False + self._isOpen = False return True def connect(self): @@ -87,27 +87,11 @@ def connect(self): self._bus.subscribe( self._route, self.inputProcessor, - self._emitFunction, self._onConsuming, ) - def healthz( - self, - route: str, - bus: BusComponent, - cache: CacheComponent, - otherComponents: List[ServiceComponent], - inputs: Any, - emitEvent: EMIT_EVENT, - ): - for component in [bus, cache, *otherComponents]: - if not component.isReady: - return StatusCode.SERVER_ERROR - return StatusCode.OK - def emitEvent(self, eventName: str, details: Any): - bus = self._busClass(*self._busArgs) - eventMessage: InputPayload = { + eventMessage: EventPayload = { "type": InputType.EVENT.value, "event": eventName, "details": details, @@ -115,11 +99,10 @@ def emitEvent(self, eventName: str, details: Any): "instanceId": self._instanceId, } - bus.publishMessage(self._route, eventMessage, self._emitFunction) - bus.close() + self._bus.publishMessage(self._route, eventMessage) def inputProcessor(self, message: Any) -> StatusCode: - bus = self._busClass(*self._busArgs) + bus = self._bus cache = self._cache if "type" not in message or "route" not in message: @@ -138,9 +121,8 @@ def inputProcessor(self, message: Any) -> StatusCode: "", bus, cache, - self._children, {**message}, - self.emitEvent, + self._children, ) return StatusCode.OK @@ -188,16 +170,17 @@ def inputProcessor(self, message: Any) -> StatusCode: message["id"], bus, cache, - self._children, artifact["inputs"], - self.emitEvent, + self._children, ) - cache.setKey(message["id"], getAnswerArtifact(message["id"], response)) + cache.setKey(message["id"], getAnswerArtifact( + message["id"], response)) return StatusCode.OK except Exception as e: cache.setKey( message["id"], - getErrorArtifact(message["id"], str(e), StatusCode.SERVER_ERROR), + getErrorArtifact(message["id"], str( + e), StatusCode.SERVER_ERROR), ) return StatusCode.SERVER_ERROR diff --git a/servc/svc/config/__init__.py b/servc/svc/config/__init__.py new file mode 100644 index 0000000..4ed27f8 --- /dev/null +++ b/servc/svc/config/__init__.py @@ -0,0 +1,86 @@ +import os +import socket +import json +from typing import Any + +import yaml + +defaults = { + "conf.http.port": int(os.getenv("PORT", 3000)), + "conf.instanceid": os.getenv("INSTANCE_ID", socket.gethostname()), + "conf.cache.url": os.getenv( + "CACHE_URL", os.getenv("REDIS_URL", "redis://localhost:6379") + ), + "conf.bus.url": os.getenv( + "BUS_URL", os.getenv("CLOUDAMQP_URL", "amqp://localhost:5672") + ), + "conf.bus.route": os.getenv("CONF__BUS__QUEUE", os.getenv("QUEUE_NAME", "test")), + "conf.bus.routemap": json.loads( + os.getenv("CONF__BUS__ROUTEMAP", json.dumps({})) + ), + "conf.bus.prefix": "", +} + + +class Config: + _configDictionary: dict = {} + + def __init__(self, config_path: str | None = None): + if config_path is None: + config_path = os.getenv("CONF__FILE", "/config/config.yaml") + elif not os.path.exists(config_path): + raise FileNotFoundError(f"Configuration file not found: {config_path}") + + self.setValue("conf.file", config_path) + if os.path.exists(config_path): + with open(config_path) as stream: + self._configDictionary = yaml.safe_load(stream) + if self._configDictionary is None: + self._configDictionary = {} + if self.get("conf.file") is None: + self.setValue("conf.file", config_path) + + # set certain defaults + for key, value in defaults.items(): + if self.get(key) is None: + self.setValue(key, value) + + # parse the environment variables and override the configuration file + for key, value in os.environ.items(): + if key.startswith("CONF__") and key not in ("CONF__FILE", "CONF__BUS__ROUTEMAP"): + self.setValue(key.replace("__", ".").lower(), value) + + # validate conf.file matches config_path. Otherwise, raise an exception because we are not able to load the configuration file + if self.get("conf.file") != config_path: + raise Exception("Configuration file does not match the configuration path") + + def get(self, key: str) -> Any: + keys = key.lower().split(".") + subconfig = self._configDictionary + + for index, subkey in enumerate(keys): + is_last = index == len(keys) - 1 + + if is_last: + return subconfig.get(subkey) + else: + subconfig = subconfig.get(subkey, {}) + + def setValue(self, key: str, value: Any): + keys = key.lower().split(".") + subconfig = self._configDictionary + + for index, subkey in enumerate(keys): + is_last = index == len(keys) - 1 + if is_last: + subconfig[subkey] = value + elif subkey not in subconfig: + subconfig[subkey] = {} + + subconfig = subconfig[subkey] + + def getAll(self) -> dict: + return self._configDictionary + + def setAll(self, config: dict): + self._configDictionary = config diff --git a/servc/svc/idgen/__init__.py b/servc/svc/idgen/__init__.py new file mode 100644 index 0000000..28fd088 --- /dev/null +++ b/servc/svc/idgen/__init__.py @@ -0,0 +1,6 @@ +from typing import Callable, List + +from servc.svc import Middleware +from servc.svc.io.input import ArgumentArtifact + +ID_GENERATOR = Callable[[str, List[Middleware], ArgumentArtifact], str] diff --git a/servc/svc/idgen/simple.py b/servc/svc/idgen/simple.py new file mode 100644 index 0000000..1ef7328 --- /dev/null +++ b/servc/svc/idgen/simple.py @@ -0,0 +1,8 @@ +import json +from hashlib import sha256 + +from servc.svc.idgen import ID_GENERATOR + +simple: ID_GENERATOR = lambda route, _c, message: sha256( + "".join([route, json.dumps(message)]).encode("utf-8") +).hexdigest() diff --git a/servc/io/__init__.py b/servc/svc/io/__init__.py similarity index 100% rename from servc/io/__init__.py rename to servc/svc/io/__init__.py diff --git a/servc/io/input.py b/servc/svc/io/input.py similarity index 88% rename from servc/io/input.py rename to servc/svc/io/input.py index ee2ecba..85a27e4 100644 --- a/servc/io/input.py +++ b/servc/svc/io/input.py @@ -10,13 +10,14 @@ class InputType(Enum): class GenericInput(TypedDict): type: str route: str + force: NotRequired[bool] class InputPayload(GenericInput): id: str argumentId: str instanceId: NotRequired[str] - inputs: NotRequired[Any] + argument: NotRequired[Any] class EventPayload(GenericInput): diff --git a/servc/io/output.py b/servc/svc/io/output.py similarity index 100% rename from servc/io/output.py rename to servc/svc/io/output.py diff --git a/servc/io/response.py b/servc/svc/io/response.py similarity index 87% rename from servc/io/response.py rename to servc/svc/io/response.py index 38280a4..8706309 100644 --- a/servc/io/response.py +++ b/servc/svc/io/response.py @@ -1,6 +1,6 @@ from typing import Any -from servc.io.output import ResponseArtifact, StatusCode +from servc.svc.io.output import ResponseArtifact, StatusCode def generateResponseArtifact( @@ -23,7 +23,7 @@ def generateResponseArtifact( def getErrorArtifact( - id: str, errorMessage: str, statusCode: StatusCode = StatusCode.USER_ERROR + id: str, errorMessage: str, statusCode: StatusCode ) -> ResponseArtifact: return generateResponseArtifact(id, 100, errorMessage, statusCode, True) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/svc/test_config.py b/tests/svc/test_config.py new file mode 100644 index 0000000..0f123e4 --- /dev/null +++ b/tests/svc/test_config.py @@ -0,0 +1,32 @@ +import unittest +from servc.svc.config import Config + + +class TestConfig(unittest.TestCase): + @classmethod + def setUpClass(cls) -> None: + cls.config = Config() + + def test_get_defaults(self): + self.assertEqual(self.config.get("conf.file"), "/config/config.yaml") + self.assertEqual(self.config.get("conf.bus.routemap"), {}) + self.assertEqual(self.config.get("conf.bus.prefix"), "") + + def test_value(self): + self.config.setValue("conf.bus.prefix", "test") + self.assertEqual(self.config.get("conf.bus.prefix"), "test") + + self.config.setValue("conf.bus.routemap.api", "test_route") + self.assertEqual(self.config.get( + "conf.bus.routemap.api"), "test_route") + + def test_wrong_location(self): + try: + Config("config.test.yaml") + except FileNotFoundError: + return self.assertTrue(True) + self.assertTrue(False) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/svc/test_idgen.py b/tests/svc/test_idgen.py new file mode 100644 index 0000000..17c1376 --- /dev/null +++ b/tests/svc/test_idgen.py @@ -0,0 +1,13 @@ +import unittest +from servc.svc.idgen.simple import simple + + +class TestIDGen(unittest.TestCase): + def test_simple_idgen(self): + route = "/test" + message = {"test": "test"} + self.assertIsInstance(simple(route, [], message), str) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/svc/test_rabbitmq.py b/tests/svc/test_rabbitmq.py new file mode 100644 index 0000000..e8a1ce5 --- /dev/null +++ b/tests/svc/test_rabbitmq.py @@ -0,0 +1,104 @@ +import unittest +import pika +from servc.svc.com.bus.rabbitmq import BusRabbitMQ, queue_declare +from servc.svc.config import Config +from servc.svc.io.input import EventPayload, InputType + + +class TestRabbitMQ(unittest.TestCase): + @classmethod + def setUpClass(cls) -> None: + config = Config() + cls.bus = BusRabbitMQ(config.get("conf.bus.url"), {}, "") + + params = pika.URLParameters(config.get("conf.bus.url")) + cls.conn = pika.BlockingConnection(params) + cls.channel = cls.conn.channel() + + @classmethod + def tearDownClass(cls) -> None: + cls.bus.close() + cls.channel.close() + cls.conn.close() + + def setUp(self) -> None: + self.bus._prefix = "" + self.bus._routeMap = {} + + def test_get_route(self): + route = "test_route" + prefix = "prefix" + mapPrefix = { + "test_route": "test_route2", + "my_route": "my_route2", + } + + self.assertEqual(self.bus.getRoute(route), route) + + self.bus._prefix = prefix + self.assertTrue(self.bus.getRoute(route).startswith(prefix)) + + self.bus._routeMap = mapPrefix + self.assertEqual(self.bus.getRoute(route), + "".join([prefix, mapPrefix[route]])) + self.assertEqual(self.bus.getRoute("fake_route"), + "".join([prefix, "fake_route"])) + + def test_send_message_no_existence(self): + route = "test_route" + message = "test_message" + + self.channel.queue_delete(queue=route) + self.bus.publishMessage(route, message) + queue = self.channel.queue_declare( + queue=route, passive=False, durable=True, exclusive=False + ) + self.assertEqual(queue.method.message_count, 0) + + def test_send_message_existence(self): + route = "test_route" + message = "test_message" + self.channel.queue_delete(queue=route) + queue_declare(self.channel, route, True) + + self.bus.publishMessage(route, message) + queue = self.channel.queue_declare( + queue=route, passive=True, durable=True, exclusive=False + ) + self.assertEqual(queue.method.message_count, 1) + + def test_fanout_exchange(self): + routes = ["test_route1", "test_route2", "test_route3"] + message: EventPayload = { + "type": InputType.EVENT, + "route": "test_route", + "event": "test_event", + "details": { + "test": "test", + }, + "instanceId": "test_instanceId", + } + + for route in routes: + self.channel.queue_delete(queue=route) + queue_declare(self.channel, route, True) + self.bus.publishMessage("testing", message) + + for route in routes: + queue = self.channel.queue_declare( + queue=route, passive=True, durable=True, exclusive=False + ) + self.assertEqual(queue.method.message_count, 1) + self.channel.queue_delete(queue=route) + + def test_close_twice(self): + self.bus.close() + self.bus.close() + + def test_get_fresh_channel(self): + self.bus.close() + self.bus.get_channel(None, None) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/svc/test_redis.py b/tests/svc/test_redis.py new file mode 100644 index 0000000..2f11253 --- /dev/null +++ b/tests/svc/test_redis.py @@ -0,0 +1,59 @@ +import datetime +import decimal +import unittest +from servc.svc.com.cache.redis import CacheRedis +from servc.svc.config import Config + + +class TestRedis(unittest.TestCase): + @classmethod + def setUpClass(cls) -> None: + config = Config() + cls.cache = CacheRedis(config.get("conf.cache.url")) + + @classmethod + def tearDownClass(cls) -> None: + cls.cache.close() + + def test_get_key(self): + key = "test_key" + value = datetime.datetime.now() + self.cache.setKey(key, value) + self.assertEqual(self.cache.getKey(key), value.isoformat()) + + def test_set_key(self): + key = "test_key2" + value = decimal.Decimal(10.5) + self.cache.connect() + self.cache.setKey(key, value) + + self.cache.close() + self.assertEqual(self.cache.getKey(key), value) + + def test_connect_twice(self): + self.cache.connect() + self.cache.connect() + self.assertTrue(self.cache.isReady) + + def test_fake_key(self): + self.assertIsNone(self.cache.getKey("fake_key")) + + def test_delete_key(self): + key = "test_key3" + value = "test_value" + self.cache.close() + self.cache.setKey(key, value) + self.assertIsNotNone(self.cache.getKey(key)) + + self.cache.close() + self.assertTrue(self.cache.deleteKey(key)) + self.assertIsNone(self.cache.getKey(key)) + + def test_close_twice(self): + self.cache.close() + self.cache.close() + self.assertFalse(self.cache.isReady) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_com_cache_redis.py b/tests/test_com_cache_redis.py deleted file mode 100644 index 11f0f23..0000000 --- a/tests/test_com_cache_redis.py +++ /dev/null @@ -1,48 +0,0 @@ -import unittest - -from servc.com.cache.redis import CacheRedis -from servc.com.service import ComponentType -from servc.config import cache_url - -redis = CacheRedis(cache_url) -id = "1234" -value = { - "key": "value", - "nested": [ - { - "key": "value", - }, - ], -} - - -class TestRedis(unittest.TestCase): - def test_simple_set(self): - result = redis.setKey(id, value) - self.assertEqual(result, id) - - def test_simple_get(self): - redis.close() - redis.setKey(id, value) - result = redis.getKey(id) - self.assertEqual(result["nested"][0]["key"], value["nested"][0]["key"]) - - def test_blank_get(self): - result = redis.getKey("12345") - self.assertEqual(result, None) - - def test_connect_ready(self): - result = redis.isReady - self.assertEqual(result, True) - - def test_type(self): - result = redis.type - self.assertEqual(result, ComponentType.CACHE) - - def test_connect(self): - result = redis.connect() - self.assertEqual(result, None) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_io_idgenerator.py b/tests/test_io_idgenerator.py deleted file mode 100644 index 4a1f61b..0000000 --- a/tests/test_io_idgenerator.py +++ /dev/null @@ -1,13 +0,0 @@ -import unittest - -from servc.io.idgenerator.simple import simpleIDGenerator - - -class TestIdGenerator(unittest.TestCase): - def test_generator_simple(self): - id = simpleIDGenerator("test", [], {"test": "test"}) - self.assertEqual(type(id), str) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_server.py b/tests/test_server.py deleted file mode 100644 index 520dc02..0000000 --- a/tests/test_server.py +++ /dev/null @@ -1,150 +0,0 @@ -import json -import os -import signal -import time -import unittest -import urllib.request - -from servc.com.bus.rabbitmq import BusRabbitMQ -from servc.com.cache.redis import CacheRedis -from servc.com.server.server import start_server -from servc.config import bus_url, cache_url -from servc.io.client.poll import pollMessage -from servc.io.client.send import sendMessage -from servc.io.idgenerator.simple import simpleIDGenerator -from servc.io.input import InputType -from servc.io.output import StatusCode - - -def inputProcessor(id, bus, cache, components, message, emit): - return message - - -class TestServer(unittest.TestCase): - @classmethod - def setUpClass(cls) -> None: - cls.route = "test_consumer" - cls.instance_id = "test_consumer_instance_id" - cls.bus = BusRabbitMQ(bus_url) - cls.cache = CacheRedis(cache_url) - cls.server = start_server( - cls.route, - { - "passthrough": inputProcessor, - }, - components=[ - [CacheRedis, [cache_url]] - ], - returnProcess=True, - port=5000, - ) - - def setUp(self): - time.sleep(5) - - @classmethod - def tearDownClass(cls) -> None: - if cls.server is not None: - for process in cls.server: - if process is not None and process.pid is not None: - os.kill(process.pid, signal.SIGTERM) - - def tearDown(self): - time.sleep(5) - - def test_health_http(self): - time.sleep(5) - contents = urllib.request.urlopen("http://localhost:5000/healthz").read() - self.assertEqual(contents, b"OK") - - def test_plaintext_ack(self): - message = { - "type": InputType.INPUT.value, - "route": self.route, - "id": "", - "argumentId": "plain", - "inputs": { - "method": "passthrough", - "inputs": { - "test": "test", - }, - }, - } - id = sendMessage(message, self.bus, self.cache, simpleIDGenerator, force=True) - result = pollMessage(id, self.cache) - self.assertEqual(result["progress"], 100) - self.assertEqual(result["isError"], False) - self.assertEqual(result["statusCode"], StatusCode.OK.value) - self.assertEqual(result["responseBody"]["test"], "test") - - def test_simple_ack(self): - argumentId = "asdasd" - self.cache.setKey( - argumentId, - { - "method": "passthrough", - "inputs": 10, - }, - ) - message = { - "type": InputType.INPUT.value, - "route": self.route, - "id": "", - "argumentId": argumentId, - "inputs": "", - } - id = sendMessage(message, self.bus, self.cache, simpleIDGenerator, force=True) - result = pollMessage(id, self.cache) - self.assertEqual(result["progress"], 100) - self.assertEqual(result["isError"], False) - self.assertEqual(result["statusCode"], StatusCode.OK.value) - self.assertEqual(result["responseBody"], 10) - - def test_non_existing_method(self): - message = { - "type": InputType.INPUT.value, - "route": self.route, - "id": "", - "argumentId": "plain", - "inputs": { - "method": "nonExistingMethod", - "inputs": { - "test": "test", - }, - }, - } - id = sendMessage(message, self.bus, self.cache, simpleIDGenerator, force=True) - result = pollMessage(id, self.cache) - self.assertEqual(result["progress"], 100) - self.assertEqual(result["isError"], True) - self.assertEqual(result["statusCode"], StatusCode.METHOD_NOT_FOUND.value) - self.assertEqual(result["responseBody"], "Method not found") - - def test_get_response(self): - message = { - "type": InputType.INPUT.value, - "route": self.route, - "id": "", - "argumentId": "", - "inputs": { - "method": "passthrough", - "inputs": 100, - }, - } - id = sendMessage(message, self.bus, self.cache, simpleIDGenerator) - result = pollMessage(id, self.cache) - response = ( - urllib.request.urlopen(f"http://localhost:5000/id/{id}") - .read() - .decode("utf-8") - ) - result = json.loads(response) - - self.assertEqual(result["progress"], 100) - self.assertEqual(result["isError"], False) - self.assertEqual(result["statusCode"], StatusCode.OK.value) - self.assertEqual(result["responseBody"], 100) - - -if __name__ == "__main__": - unittest.main()