From 1e77ea45ad6cb1a8a37b468692bbe6dea81b0986 Mon Sep 17 00:00:00 2001 From: Yusuf Ali Date: Fri, 24 Jan 2025 04:46:13 +0000 Subject: [PATCH] fix(rabbitmq): reestablish consuming upon connection loss --- servc/svc/com/bus/rabbitmq.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/servc/svc/com/bus/rabbitmq.py b/servc/svc/com/bus/rabbitmq.py index 0d5c8c9..ad99794 100644 --- a/servc/svc/com/bus/rabbitmq.py +++ b/servc/svc/com/bus/rabbitmq.py @@ -37,6 +37,8 @@ class BusRabbitMQ(BusComponent): _conn: AsyncioConnection | BlockingConnection | None = None + _consumingArgs: Tuple[str, InputProcessor, OnConsuming | None, bool] | None = None + @property def isReady(self) -> bool: return ( @@ -83,7 +85,13 @@ def on_connection_closed(self, _conn: AsyncioConnection, reason: pika.exceptions if reason == pika.exceptions.StreamLostError: print(str(reason), flush=True) self._conn = None - self._connect() + + # if this happens while consuming, then we need to re establish the on-consuming method + if self._consumingArgs: + route, inputProcessor, onConsuming, bindEventExchange = ( + self._consumingArgs + ) + self.subscribe(route, inputProcessor, onConsuming, bindEventExchange) def get_channel(self, method: Callable | None, args: Tuple | None): if not self.isReady: @@ -197,6 +205,7 @@ def subscribe( # type: ignore return self.get_channel( self.subscribe, (route, inputProcessor, onConsuming, bindEventExchange) ) + self._consumingArgs = (route, inputProcessor, onConsuming, bindEventExchange) channel.add_on_close_callback(lambda _c, _r: self.close()) channel.add_on_cancel_callback(lambda _c: self.close())