From 33c3813d021cdb3778b60b9cc122ce331c990b4b Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 24 Mar 2025 10:26:31 +0100 Subject: [PATCH 1/9] fix: revise test to distinguish S2Message classes and S2MessageElement classes Signed-off-by: F.N. Claessen --- tests/unit/message_test.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/unit/message_test.py b/tests/unit/message_test.py index 06e2798..5f9e677 100644 --- a/tests/unit/message_test.py +++ b/tests/unit/message_test.py @@ -3,6 +3,7 @@ import importlib import inspect import pkgutil +from typing import get_args from s2python import message from s2python.validate_values_mixin import S2MessageComponent @@ -37,6 +38,10 @@ def _test_import_s2_messages(self, module_name): assert hasattr( message, _class.__name__ ), f"{_class} should be importable from s2_python.message" + if hasattr(_class, "message_id"): + assert _class in get_args(message.S2Message), f"{_class} should be typed as a s2_python.message.S2Message" + else: + assert _class in get_args(message.S2MessageElement), f"{_class} should be typed as a s2_python.message.S2MessageElement" def test_import_s2_messages__common(self): self._test_import_s2_messages("s2python.common") From fc2a60b00d8300109980f5b6d4cfa8478ba1e2c9 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 24 Mar 2025 10:42:08 +0100 Subject: [PATCH 2/9] fix: move some of the classes from the S2Message to the S2MessageElement type Signed-off-by: F.N. Claessen --- src/s2python/message.py | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/src/s2python/message.py b/src/s2python/message.py index 030d8e2..35045a6 100644 --- a/src/s2python/message.py +++ b/src/s2python/message.py @@ -52,20 +52,30 @@ ) S2Message = Union[ - FRBCActuatorDescription, FRBCActuatorStatus, - FRBCFillLevelTargetProfile, - FRBCFillLevelTargetProfileElement, FRBCInstruction, - FRBCLeakageBehaviour, - FRBCLeakageBehaviourElement, - FRBCOperationMode, - FRBCOperationModeElement, FRBCStorageDescription, FRBCStorageStatus, FRBCSystemDescription, FRBCTimerStatus, FRBCUsageForecast, + PPBCScheduleInstruction, + PPBCStartInterruptionInstruction, + ReceptionStatus, + ResourceManagerDetails, + RevokeObject, + SelectControlType, + SessionRequest, +] + +S2MessageElement = Union[ + FRBCActuatorDescription, + FRBCFillLevelTargetProfile, + FRBCFillLevelTargetProfileElement, + FRBCLeakageBehaviour, + FRBCLeakageBehaviourElement, + FRBCOperationMode, + FRBCOperationModeElement, FRBCUsageForecastElement, PPBCEndInterruptionInstruction, PPBCPowerProfileDefinition, @@ -74,8 +84,6 @@ PPBCPowerProfileStatus, PPBCPowerSequenceContainerStatus, PPBCPowerSequenceElement, - PPBCScheduleInstruction, - PPBCStartInterruptionInstruction, Duration, Handshake, HandshakeResponse, @@ -87,12 +95,7 @@ PowerMeasurement, PowerRange, PowerValue, - ReceptionStatus, - ResourceManagerDetails, - RevokeObject, Role, - SelectControlType, - SessionRequest, Timer, Transition, ] From a4df1b3a71cfc69d2655d55adc0f20a69d0dc686 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 24 Mar 2025 12:24:50 +0100 Subject: [PATCH 3/9] fix: test based on message_id attribute Signed-off-by: F.N. Claessen --- tests/unit/message_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/message_test.py b/tests/unit/message_test.py index 3c5dd37..7a61063 100644 --- a/tests/unit/message_test.py +++ b/tests/unit/message_test.py @@ -38,7 +38,7 @@ def _test_import_s2_messages(self, module_name): assert hasattr( message, _class.__name__ ), f"{_class} should be importable from s2_python.message" - if hasattr(_class, "message_id"): + if "message_id" in _class.model_fields: assert _class in get_args(message.S2Message), f"{_class} should be typed as a s2_python.message.S2Message" else: assert _class in get_args(message.S2MessageElement), f"{_class} should be typed as a s2_python.message.S2MessageElement" From 2d1edc9e4fe35daa6ed55193846f12cc5a49f1ed Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 24 Mar 2025 12:24:55 +0100 Subject: [PATCH 4/9] fix: move remaining classes Signed-off-by: F.N. Claessen --- src/s2python/message.py | 53 +++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/src/s2python/message.py b/src/s2python/message.py index 1e5adb5..8cb5f09 100644 --- a/src/s2python/message.py +++ b/src/s2python/message.py @@ -70,63 +70,64 @@ ) S2Message = Union[ + DDBCAverageDemandRateForecast, + DDBCInstruction, + DDBCSystemDescription, + DDBCTimerStatus, FRBCActuatorStatus, + FRBCFillLevelTargetProfile, FRBCInstruction, - FRBCStorageDescription, + FRBCLeakageBehaviour, FRBCStorageStatus, FRBCSystemDescription, FRBCTimerStatus, FRBCUsageForecast, + PEBCPowerConstraints, + PPBCEndInterruptionInstruction, + PPBCPowerProfileDefinition, + PPBCPowerProfileStatus, PPBCScheduleInstruction, PPBCStartInterruptionInstruction, - PEBCAllowedLimitRange, - PEBCEnergyConstraint, - PEBCInstruction, - PEBCPowerConstraints, - PEBCPowerEnvelope, - PEBCPowerEnvelopeElement, - DDBCActuatorDescription, - DDBCActuatorStatus, - DDBCAverageDemandRateForecast, - DDBCAverageDemandRateForecastElement, - DDBCInstruction, - DDBCOperationMode, - DDBCSystemDescription, - DDBCTimerStatus, - ReceptionStatus, ResourceManagerDetails, RevokeObject, SelectControlType, SessionRequest, + DDBCActuatorStatus, + FRBCInstruction, + PEBCEnergyConstraint, + PEBCInstruction, + Handshake, + HandshakeResponse, + InstructionStatusUpdate, + PowerForecast, + PowerMeasurement, ] S2MessageElement = Union[ + DDBCActuatorDescription, + DDBCAverageDemandRateForecastElement, + DDBCOperationMode, FRBCActuatorDescription, - FRBCFillLevelTargetProfile, FRBCFillLevelTargetProfileElement, - FRBCLeakageBehaviour, FRBCLeakageBehaviourElement, FRBCOperationMode, FRBCOperationModeElement, + FRBCStorageDescription, FRBCUsageForecastElement, - PPBCEndInterruptionInstruction, - PPBCPowerProfileDefinition, + PEBCAllowedLimitRange, + PEBCPowerEnvelope, + PEBCPowerEnvelopeElement, PPBCPowerSequenceContainer, PPBCPowerSequence, - PPBCPowerProfileStatus, PPBCPowerSequenceContainerStatus, PPBCPowerSequenceElement, Duration, - Handshake, - HandshakeResponse, - InstructionStatusUpdate, NumberRange, - PowerForecast, PowerForecastElement, PowerForecastValue, - PowerMeasurement, PowerRange, PowerValue, + ReceptionStatus, Role, Timer, Transition, From 094173c0f394845f2d7d4e8b62af9b12356fe089 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 24 Mar 2025 12:30:30 +0100 Subject: [PATCH 5/9] style: pylint Signed-off-by: F.N. Claessen --- tests/unit/message_test.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/unit/message_test.py b/tests/unit/message_test.py index 7a61063..2783ee5 100644 --- a/tests/unit/message_test.py +++ b/tests/unit/message_test.py @@ -39,9 +39,15 @@ def _test_import_s2_messages(self, module_name): message, _class.__name__ ), f"{_class} should be importable from s2_python.message" if "message_id" in _class.model_fields: - assert _class in get_args(message.S2Message), f"{_class} should be typed as a s2_python.message.S2Message" + assert ( + _class in get_args(message.S2Message), + f"{_class} should be typed as a s2_python.message.S2Message", + ) else: - assert _class in get_args(message.S2MessageElement), f"{_class} should be typed as a s2_python.message.S2MessageElement" + assert ( + _class in get_args(message.S2MessageElement), + f"{_class} should be typed as a s2_python.message.S2MessageElement", + ) def test_import_s2_messages__common(self): self._test_import_s2_messages("s2python.common") From 9b0652dbf561ad31fc764ee864181a717b8d74ef Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 24 Mar 2025 12:34:54 +0100 Subject: [PATCH 6/9] fix: pylint warned me about a bug I introduced due to pylint warning me about a style issue, ugh Signed-off-by: F.N. Claessen --- tests/unit/message_test.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/unit/message_test.py b/tests/unit/message_test.py index 2783ee5..20adaeb 100644 --- a/tests/unit/message_test.py +++ b/tests/unit/message_test.py @@ -39,13 +39,11 @@ def _test_import_s2_messages(self, module_name): message, _class.__name__ ), f"{_class} should be importable from s2_python.message" if "message_id" in _class.model_fields: - assert ( - _class in get_args(message.S2Message), + assert _class in get_args(message.S2Message), ( f"{_class} should be typed as a s2_python.message.S2Message", ) else: - assert ( - _class in get_args(message.S2MessageElement), + assert _class in get_args(message.S2MessageElement), ( f"{_class} should be typed as a s2_python.message.S2MessageElement", ) From 01829a1ed5b25a833df1c891e32ea9b7c2c7f659 Mon Sep 17 00:00:00 2001 From: Vlad Iftime Date: Mon, 24 Mar 2025 15:27:11 +0100 Subject: [PATCH 7/9] Switching from str to UUID Signed-off-by: Vlad Iftime --- src/s2python/s2_connection.py | 64 ++++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/src/s2python/s2_connection.py b/src/s2python/s2_connection.py index cfc915e..897344c 100644 --- a/src/s2python/s2_connection.py +++ b/src/s2python/s2_connection.py @@ -36,7 +36,7 @@ @dataclass class AssetDetails: # pylint: disable=too-many-instance-attributes - resource_id: str + resource_id: uuid.UUID provides_forecast: bool provides_power_measurements: List[CommodityQuantity] @@ -51,9 +51,13 @@ class AssetDetails: # pylint: disable=too-many-instance-attributes firmware_version: Optional[str] = None serial_number: Optional[str] = None - def to_resource_manager_details(self, control_types: List[S2ControlType]) -> ResourceManagerDetails: + def to_resource_manager_details( + self, control_types: List[S2ControlType] + ) -> ResourceManagerDetails: return ResourceManagerDetails( - available_control_types=[control_type.get_protocol_control_type() for control_type in control_types], + available_control_types=[ + control_type.get_protocol_control_type() for control_type in control_types + ], currency=self.currency, firmware_version=self.firmware_version, instruction_processing_delay=self.instruction_processing_delay, @@ -89,7 +93,7 @@ async def run_async(self) -> None: self.status_is_send.set() await self.connection.respond_with_reception_status( - subject_message_id=str(self.subject_message_id), + subject_message_id=self.subject_message_id, status=ReceptionStatusValues.OK, diagnostic_label="Processed okay.", ) @@ -98,7 +102,7 @@ def run_sync(self) -> None: self.status_is_send.set() self.connection.respond_with_reception_status_sync( - subject_message_id=str(self.subject_message_id), + subject_message_id=self.subject_message_id, status=ReceptionStatusValues.OK, diagnostic_label="Processed okay.", ) @@ -155,7 +159,7 @@ def do_message() -> None: except Exception: if not send_okay.status_is_send.is_set(): await connection.respond_with_reception_status( - subject_message_id=str(msg.message_id), # type: ignore[attr-defined, union-attr] + subject_message_id=msg.message_id, # type: ignore[attr-defined, union-attr] status=ReceptionStatusValues.PERMANENT_ERROR, diagnostic_label=f"While processing message {msg.message_id} " # type: ignore[attr-defined, union-attr] # pylint: disable=line-too-long f"an unrecoverable error occurred.", @@ -294,7 +298,9 @@ async def wait_till_connection_restart() -> None: self._eventloop.create_task(wait_till_connection_restart()), ] - (done, pending) = await asyncio.wait(background_tasks, return_when=asyncio.FIRST_COMPLETED) + (done, pending) = await asyncio.wait( + background_tasks, return_when=asyncio.FIRST_COMPLETED + ) if self._current_control_type: self._current_control_type.deactivate(self) self._current_control_type = None @@ -327,7 +333,9 @@ async def _connect_ws(self) -> None: connection_kwargs["ssl"].verify_mode = ssl.CERT_NONE if self._bearer_token: - connection_kwargs["additional_headers"] = {"Authorization": f"Bearer {self._bearer_token}"} + connection_kwargs["additional_headers"] = { + "Authorization": f"Bearer {self._bearer_token}" + } self.ws = await ws_connect(uri=self.url, **connection_kwargs) except (EOFError, OSError) as e: @@ -335,15 +343,21 @@ async def _connect_ws(self) -> None: async def _connect_as_rm(self) -> None: await self.send_msg_and_await_reception_status_async( - Handshake(message_id=uuid.uuid4(), role=self.role, supported_protocol_versions=[S2_VERSION]) + Handshake( + message_id=uuid.uuid4(), role=self.role, supported_protocol_versions=[S2_VERSION] + ) ) logger.debug("Send handshake to CEM. Expecting Handshake and HandshakeResponse from CEM.") await self._handle_received_messages() - async def handle_handshake(self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None]) -> None: + async def handle_handshake( + self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None] + ) -> None: if not isinstance(message, Handshake): - logger.error("Handler for Handshake received a message of the wrong type: %s", type(message)) + logger.error( + "Handler for Handshake received a message of the wrong type: %s", type(message) + ) return logger.debug( @@ -387,8 +401,12 @@ async def handle_select_control_type_as_rm( logger.debug("CEM selected control type %s. Activating control type.", message.control_type) - control_types_by_protocol_name = {c.get_protocol_control_type(): c for c in self.control_types} - selected_control_type: Optional[S2ControlType] = control_types_by_protocol_name.get(message.control_type) + control_types_by_protocol_name = { + c.get_protocol_control_type(): c for c in self.control_types + } + selected_control_type: Optional[S2ControlType] = control_types_by_protocol_name.get( + message.control_type + ) if self._current_control_type is not None: await self._eventloop.run_in_executor(None, self._current_control_type.deactivate, self) @@ -406,7 +424,9 @@ async def _receive_messages(self) -> None: to any calls of `send_msg_and_await_reception_status`. """ if self.ws is None: - raise RuntimeError("Cannot receive messages if websocket connection is not yet established.") + raise RuntimeError( + "Cannot receive messages if websocket connection is not yet established." + ) logger.info("S2 connection has started to receive messages.") @@ -416,7 +436,7 @@ async def _receive_messages(self) -> None: except json.JSONDecodeError: await self._send_and_forget( ReceptionStatus( - subject_message_id="00000000-0000-0000-0000-000000000000", + subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), status=ReceptionStatusValues.INVALID_DATA, diagnostic_label="Not valid json.", ) @@ -432,7 +452,7 @@ async def _receive_messages(self) -> None: ) else: await self.respond_with_reception_status( - subject_message_id="00000000-0000-0000-0000-000000000000", + subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), status=ReceptionStatusValues.INVALID_DATA, diagnostic_label="Message appears valid json but could not find a message_id field.", ) @@ -450,7 +470,9 @@ async def _receive_messages(self) -> None: async def _send_and_forget(self, s2_msg: S2Message) -> None: if self.ws is None: - raise RuntimeError("Cannot send messages if websocket connection is not yet established.") + raise RuntimeError( + "Cannot send messages if websocket connection is not yet established." + ) json_msg = s2_msg.to_json() logger.debug("Sending message %s", json_msg) @@ -461,7 +483,7 @@ async def _send_and_forget(self, s2_msg: S2Message) -> None: self._restart_connection_event.set() async def respond_with_reception_status( - self, subject_message_id: str, status: ReceptionStatusValues, diagnostic_label: str + self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str ) -> None: logger.debug("Responding to message %s with status %s", subject_message_id, status) await self._send_and_forget( @@ -473,7 +495,7 @@ async def respond_with_reception_status( ) def respond_with_reception_status_sync( - self, subject_message_id: str, status: ReceptionStatusValues, diagnostic_label: str + self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str ) -> None: asyncio.run_coroutine_threadsafe( self.respond_with_reception_status(subject_message_id, status, diagnostic_label), @@ -510,7 +532,9 @@ def send_msg_and_await_reception_status_sync( self, s2_msg: S2Message, timeout_reception_status: float = 5.0, raise_on_error: bool = True ) -> ReceptionStatus: return asyncio.run_coroutine_threadsafe( - self.send_msg_and_await_reception_status_async(s2_msg, timeout_reception_status, raise_on_error), + self.send_msg_and_await_reception_status_async( + s2_msg, timeout_reception_status, raise_on_error + ), self._eventloop, ).result() From 00ceb69384c78b62e1d1ef3109ca3ac2bfe71a1c Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Mon, 24 Mar 2025 17:33:21 +0100 Subject: [PATCH 8/9] fix: the subject_message_id attribute signals an S2Message, too Signed-off-by: F.N. Claessen --- src/s2python/message.py | 2 +- tests/unit/message_test.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/s2python/message.py b/src/s2python/message.py index 8cb5f09..36031e9 100644 --- a/src/s2python/message.py +++ b/src/s2python/message.py @@ -101,6 +101,7 @@ InstructionStatusUpdate, PowerForecast, PowerMeasurement, + ReceptionStatus, ] S2MessageElement = Union[ @@ -127,7 +128,6 @@ PowerForecastValue, PowerRange, PowerValue, - ReceptionStatus, Role, Timer, Transition, diff --git a/tests/unit/message_test.py b/tests/unit/message_test.py index 20adaeb..f667306 100644 --- a/tests/unit/message_test.py +++ b/tests/unit/message_test.py @@ -38,7 +38,7 @@ def _test_import_s2_messages(self, module_name): assert hasattr( message, _class.__name__ ), f"{_class} should be importable from s2_python.message" - if "message_id" in _class.model_fields: + if "message_id" in _class.model_fields or "subject_message_id" in _class.model_fields: assert _class in get_args(message.S2Message), ( f"{_class} should be typed as a s2_python.message.S2Message", ) From 7978e7f527a59759bf95fc8770271357082c5fb3 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Tue, 25 Mar 2025 09:11:12 +0100 Subject: [PATCH 9/9] Revert "Switching from str to UUID" This reverts commit 01829a1ed5b25a833df1c891e32ea9b7c2c7f659. --- src/s2python/s2_connection.py | 64 +++++++++++------------------------ 1 file changed, 20 insertions(+), 44 deletions(-) diff --git a/src/s2python/s2_connection.py b/src/s2python/s2_connection.py index 897344c..cfc915e 100644 --- a/src/s2python/s2_connection.py +++ b/src/s2python/s2_connection.py @@ -36,7 +36,7 @@ @dataclass class AssetDetails: # pylint: disable=too-many-instance-attributes - resource_id: uuid.UUID + resource_id: str provides_forecast: bool provides_power_measurements: List[CommodityQuantity] @@ -51,13 +51,9 @@ class AssetDetails: # pylint: disable=too-many-instance-attributes firmware_version: Optional[str] = None serial_number: Optional[str] = None - def to_resource_manager_details( - self, control_types: List[S2ControlType] - ) -> ResourceManagerDetails: + def to_resource_manager_details(self, control_types: List[S2ControlType]) -> ResourceManagerDetails: return ResourceManagerDetails( - available_control_types=[ - control_type.get_protocol_control_type() for control_type in control_types - ], + available_control_types=[control_type.get_protocol_control_type() for control_type in control_types], currency=self.currency, firmware_version=self.firmware_version, instruction_processing_delay=self.instruction_processing_delay, @@ -93,7 +89,7 @@ async def run_async(self) -> None: self.status_is_send.set() await self.connection.respond_with_reception_status( - subject_message_id=self.subject_message_id, + subject_message_id=str(self.subject_message_id), status=ReceptionStatusValues.OK, diagnostic_label="Processed okay.", ) @@ -102,7 +98,7 @@ def run_sync(self) -> None: self.status_is_send.set() self.connection.respond_with_reception_status_sync( - subject_message_id=self.subject_message_id, + subject_message_id=str(self.subject_message_id), status=ReceptionStatusValues.OK, diagnostic_label="Processed okay.", ) @@ -159,7 +155,7 @@ def do_message() -> None: except Exception: if not send_okay.status_is_send.is_set(): await connection.respond_with_reception_status( - subject_message_id=msg.message_id, # type: ignore[attr-defined, union-attr] + subject_message_id=str(msg.message_id), # type: ignore[attr-defined, union-attr] status=ReceptionStatusValues.PERMANENT_ERROR, diagnostic_label=f"While processing message {msg.message_id} " # type: ignore[attr-defined, union-attr] # pylint: disable=line-too-long f"an unrecoverable error occurred.", @@ -298,9 +294,7 @@ async def wait_till_connection_restart() -> None: self._eventloop.create_task(wait_till_connection_restart()), ] - (done, pending) = await asyncio.wait( - background_tasks, return_when=asyncio.FIRST_COMPLETED - ) + (done, pending) = await asyncio.wait(background_tasks, return_when=asyncio.FIRST_COMPLETED) if self._current_control_type: self._current_control_type.deactivate(self) self._current_control_type = None @@ -333,9 +327,7 @@ async def _connect_ws(self) -> None: connection_kwargs["ssl"].verify_mode = ssl.CERT_NONE if self._bearer_token: - connection_kwargs["additional_headers"] = { - "Authorization": f"Bearer {self._bearer_token}" - } + connection_kwargs["additional_headers"] = {"Authorization": f"Bearer {self._bearer_token}"} self.ws = await ws_connect(uri=self.url, **connection_kwargs) except (EOFError, OSError) as e: @@ -343,21 +335,15 @@ async def _connect_ws(self) -> None: async def _connect_as_rm(self) -> None: await self.send_msg_and_await_reception_status_async( - Handshake( - message_id=uuid.uuid4(), role=self.role, supported_protocol_versions=[S2_VERSION] - ) + Handshake(message_id=uuid.uuid4(), role=self.role, supported_protocol_versions=[S2_VERSION]) ) logger.debug("Send handshake to CEM. Expecting Handshake and HandshakeResponse from CEM.") await self._handle_received_messages() - async def handle_handshake( - self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None] - ) -> None: + async def handle_handshake(self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None]) -> None: if not isinstance(message, Handshake): - logger.error( - "Handler for Handshake received a message of the wrong type: %s", type(message) - ) + logger.error("Handler for Handshake received a message of the wrong type: %s", type(message)) return logger.debug( @@ -401,12 +387,8 @@ async def handle_select_control_type_as_rm( logger.debug("CEM selected control type %s. Activating control type.", message.control_type) - control_types_by_protocol_name = { - c.get_protocol_control_type(): c for c in self.control_types - } - selected_control_type: Optional[S2ControlType] = control_types_by_protocol_name.get( - message.control_type - ) + control_types_by_protocol_name = {c.get_protocol_control_type(): c for c in self.control_types} + selected_control_type: Optional[S2ControlType] = control_types_by_protocol_name.get(message.control_type) if self._current_control_type is not None: await self._eventloop.run_in_executor(None, self._current_control_type.deactivate, self) @@ -424,9 +406,7 @@ async def _receive_messages(self) -> None: to any calls of `send_msg_and_await_reception_status`. """ if self.ws is None: - raise RuntimeError( - "Cannot receive messages if websocket connection is not yet established." - ) + raise RuntimeError("Cannot receive messages if websocket connection is not yet established.") logger.info("S2 connection has started to receive messages.") @@ -436,7 +416,7 @@ async def _receive_messages(self) -> None: except json.JSONDecodeError: await self._send_and_forget( ReceptionStatus( - subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), + subject_message_id="00000000-0000-0000-0000-000000000000", status=ReceptionStatusValues.INVALID_DATA, diagnostic_label="Not valid json.", ) @@ -452,7 +432,7 @@ async def _receive_messages(self) -> None: ) else: await self.respond_with_reception_status( - subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), + subject_message_id="00000000-0000-0000-0000-000000000000", status=ReceptionStatusValues.INVALID_DATA, diagnostic_label="Message appears valid json but could not find a message_id field.", ) @@ -470,9 +450,7 @@ async def _receive_messages(self) -> None: async def _send_and_forget(self, s2_msg: S2Message) -> None: if self.ws is None: - raise RuntimeError( - "Cannot send messages if websocket connection is not yet established." - ) + raise RuntimeError("Cannot send messages if websocket connection is not yet established.") json_msg = s2_msg.to_json() logger.debug("Sending message %s", json_msg) @@ -483,7 +461,7 @@ async def _send_and_forget(self, s2_msg: S2Message) -> None: self._restart_connection_event.set() async def respond_with_reception_status( - self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str + self, subject_message_id: str, status: ReceptionStatusValues, diagnostic_label: str ) -> None: logger.debug("Responding to message %s with status %s", subject_message_id, status) await self._send_and_forget( @@ -495,7 +473,7 @@ async def respond_with_reception_status( ) def respond_with_reception_status_sync( - self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str + self, subject_message_id: str, status: ReceptionStatusValues, diagnostic_label: str ) -> None: asyncio.run_coroutine_threadsafe( self.respond_with_reception_status(subject_message_id, status, diagnostic_label), @@ -532,9 +510,7 @@ def send_msg_and_await_reception_status_sync( self, s2_msg: S2Message, timeout_reception_status: float = 5.0, raise_on_error: bool = True ) -> ReceptionStatus: return asyncio.run_coroutine_threadsafe( - self.send_msg_and_await_reception_status_async( - s2_msg, timeout_reception_status, raise_on_error - ), + self.send_msg_and_await_reception_status_async(s2_msg, timeout_reception_status, raise_on_error), self._eventloop, ).result()