From 5439659d27bfd990a6635dea8b6393e2c02ee5f2 Mon Sep 17 00:00:00 2001 From: Yusuf Ali Date: Thu, 24 Apr 2025 03:43:34 -0400 Subject: [PATCH 1/3] feat(bus): adding in asb support --- servc/svc/com/bus/asb | 137 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 servc/svc/com/bus/asb diff --git a/servc/svc/com/bus/asb b/servc/svc/com/bus/asb new file mode 100644 index 0000000..70e8339 --- /dev/null +++ b/servc/svc/com/bus/asb @@ -0,0 +1,137 @@ +from __future__ import annotations + +import json +import threading +import time +from typing import Any + +import simplejson +from azure.servicebus import ServiceBusClient, ServiceBusMessage, ServiceBusReceiver +from azure.servicebus.management import ServiceBusAdministrationClient +from servc.svc.com.bus import BusComponent, InputProcessor, OnConsuming +from servc.svc.com.cache.redis import decimal_default +from servc.svc.io.input import EventPayload, InputPayload, InputType +from servc.svc.io.output import StatusCode + + +class AzureServiceBus(BusComponent): + _url: str + + _conn: ServiceBusClient | None = None + + @property + def isReady(self) -> bool: + return self._conn is not None + + @property + def isOpen(self) -> bool: + return self.isReady + + def isBlockingConnection(self) -> bool: + return isinstance(self._conn, ServiceBusClient) + + def _connect(self): + if not self.isOpen: + self._conn = ServiceBusClient.from_connection_string(self._url) + + def _close(self, expected=True, reason: Any = None): + print("Close method called", flush=True) + if not expected: + print("Unexpected close: ", reason, flush=True) + exit(1) + if self.isOpen or self.isReady: + if ( + self._conn + # and not self._conn.is_closed + # and (self.isBlockingConnection() or not self._conn.is_closing) + ): + self._conn.close() + self._conn = None + + return True + return False + + def publishMessage(self, route: str, message: InputPayload | EventPayload) -> bool: + if not self.isReady or not self._conn: + self._connect() + if not self._conn: + raise Exception("Service Bus connection is not established") + + isEvent = ( + True + if "type" in message + and message["type"] in [InputType.EVENT.value, InputType.EVENT] + else False + ) + asb_message = ServiceBusMessage( + simplejson.dumps(message, default=decimal_default, ignore_nan=True) + ) + + # NOTE: azure service bus does not support event routing. thus, we must + # manually handle the event routing + if isEvent: + with ServiceBusAdministrationClient.from_connection_string( + self._url + ) as admin_client: + for queue_properties in admin_client.list_queues(): + sender = self._conn.get_queue_sender( + queue_name=self.getRoute(queue_properties.name) + ) + with sender: + sender.send_messages(asb_message) + + return super().publishMessage(route, message) + + sender = self._conn.get_queue_sender(queue_name=self.getRoute(route)) + with sender: + sender.send_messages(asb_message) + + return super().publishMessage(route, message) + + def subscribe( + self, + route: str, + inputProcessor: InputProcessor, + onConsuming: OnConsuming | None, + bindEventExchange: bool, + ) -> bool: + if not self.isReady or not self._conn: + self._connect() + if not self._conn: + raise Exception("Service Bus connection is not established") + + receiver = self._conn.get_queue_receiver(queue_name=self.getRoute(route)) + with receiver: + received_msgs = receiver.receive_messages(max_message_count=1) + for msg in received_msgs: + thread = threading.Thread( + target=self.on_message, + args=(msg, receiver, inputProcessor), + ) + thread.start() + thread.join() + + time.sleep(1) + self.subscribe( + route, + inputProcessor, + onConsuming, + bindEventExchange, + ) + + return True + + def on_message( + self, + body: Any, + receiver: ServiceBusReceiver, + inputProcessor: InputProcessor, + ): + payload = json.loads(str(body)) + result = inputProcessor(payload) + + if result == StatusCode.NO_PROCESSING: + receiver.abandon_message(body) + else: + receiver.complete_message(body) + print("Processed message", flush=True) From 01015a29a06c351e75fa0116fc02152713f2a16a Mon Sep 17 00:00:00 2001 From: Yusuf Ali Date: Thu, 24 Apr 2025 03:44:05 -0400 Subject: [PATCH 2/3] chore: adding in asb depends --- requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index e2d9de8..55817c6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,5 @@ simplejson==3.20.1 flask==3.1.0 pyyaml==6.0.2 pyiceberg[sql-sqlite,pyarrow]==0.8.1 -deltalake==0.25.5 \ No newline at end of file +deltalake==0.25.5 +azure-servicebus==7.14.2 From 180aa2fd3e6330b0cd74bb7dbbd08d5b1ebc0b14 Mon Sep 17 00:00:00 2001 From: Yusuf Ali Date: Thu, 24 Apr 2025 03:44:59 -0400 Subject: [PATCH 3/3] chore: rename file --- servc/svc/com/bus/{asb => asb.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename servc/svc/com/bus/{asb => asb.py} (100%) diff --git a/servc/svc/com/bus/asb b/servc/svc/com/bus/asb.py similarity index 100% rename from servc/svc/com/bus/asb rename to servc/svc/com/bus/asb.py