Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/s2python/ddbc/ddbc_operation_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class DDBCOperationMode(GenDDBCOperationMode, S2MessageComponent["DDBCOperationM
# ? Id vs id
id: uuid.UUID = GenDDBCOperationMode.model_fields["Id"] # type: ignore[assignment]
power_ranges: List[PowerRange] = GenDDBCOperationMode.model_fields["power_ranges"] # type: ignore[assignment]
supply_ranges: List[NumberRange] = GenDDBCOperationMode.model_fields["supply_ranges"] # type: ignore[assignment]
supply_range: List[NumberRange] = GenDDBCOperationMode.model_fields["supply_range"] # type: ignore[assignment]
abnormal_condition_only: bool = GenDDBCOperationMode.model_fields[
"abnormal_condition_only"
] # type: ignore[assignment]
32 changes: 32 additions & 0 deletions src/s2python/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,25 @@
PPBCScheduleInstruction,
PPBCStartInterruptionInstruction,
)
from s2python.ddbc import (
DDBCActuatorDescription,
DDBCActuatorStatus,
DDBCAverageDemandRateForecast,
DDBCAverageDemandRateForecastElement,
DDBCInstruction,
DDBCOperationMode,
DDBCSystemDescription,
DDBCTimerStatus,
)

from s2python.pebc import (
PEBCAllowedLimitRange,
PEBCEnergyConstraint,
PEBCInstruction,
PEBCPowerConstraints,
PEBCPowerEnvelope,
PEBCPowerEnvelopeElement,
)
from s2python.common import (
Duration,
Handshake,
Expand Down Expand Up @@ -76,6 +94,20 @@
PPBCPowerSequenceElement,
PPBCScheduleInstruction,
PPBCStartInterruptionInstruction,
PEBCAllowedLimitRange,
PEBCEnergyConstraint,
PEBCInstruction,
PEBCPowerConstraints,
PEBCPowerEnvelope,
PEBCPowerEnvelopeElement,
DDBCActuatorDescription,
DDBCActuatorStatus,
DDBCAverageDemandRateForecast,
DDBCAverageDemandRateForecastElement,
DDBCInstruction,
DDBCOperationMode,
DDBCSystemDescription,
DDBCTimerStatus,
Duration,
Handshake,
HandshakeResponse,
Expand Down
2 changes: 1 addition & 1 deletion src/s2python/pebc/pebc_instruction.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ class PEBCInstruction(GenPEBCInstruction, S2MessageComponent["PEBCInstruction"])
power_envelopes: List[PEBCPowerEnvelope] = [
GenPEBCInstruction.model_fields["power_envelopes"] # type: ignore[assignment]
]
abnormal_conditions: bool = GenPEBCInstruction.model_fields["abnormal_conditions"] # type: ignore[assignment]
abnormal_condition: bool = GenPEBCInstruction.model_fields["abnormal_condition"] # type: ignore[assignment]
57 changes: 17 additions & 40 deletions src/s2python/s2_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,9 @@ class AssetDetails: # pylint: disable=too-many-instance-attributes
firmware_version: Optional[str] = None
serial_number: Optional[str] = None

def to_resource_manager_details(
self, control_types: List[S2ControlType]
) -> ResourceManagerDetails:
def to_resource_manager_details(self, control_types: List[S2ControlType]) -> ResourceManagerDetails:
return ResourceManagerDetails(
available_control_types=[
control_type.get_protocol_control_type() for control_type in control_types
],
available_control_types=[control_type.get_protocol_control_type() for control_type in control_types],
currency=self.currency,
firmware_version=self.firmware_version,
instruction_processing_delay=self.instruction_processing_delay,
Expand Down Expand Up @@ -210,7 +206,7 @@ def __init__( # pylint: disable=too-many-arguments
asset_details: AssetDetails,
reconnect: bool = False,
verify_certificate: bool = True,
bearer_token: Optional[str] = None
bearer_token: Optional[str] = None,
) -> None:
self.url = url
self.reconnect = reconnect
Expand Down Expand Up @@ -254,8 +250,7 @@ def stop(self) -> None:
"""
if threading.current_thread() == self._thread:
raise RuntimeError(
"Do not call stop from the thread running the S2 connection. This results in an "
"infinite block!"
"Do not call stop from the thread running the S2 connection. This results in an infinite block!"
)
if self._eventloop.is_running():
asyncio.run_coroutine_threadsafe(self._do_stop(), self._eventloop).result()
Expand Down Expand Up @@ -299,9 +294,7 @@ async def wait_till_connection_restart() -> None:
self._eventloop.create_task(wait_till_connection_restart()),
]

(done, pending) = await asyncio.wait(
background_tasks, return_when=asyncio.FIRST_COMPLETED
)
(done, pending) = await asyncio.wait(background_tasks, return_when=asyncio.FIRST_COMPLETED)
if self._current_control_type:
self._current_control_type.deactivate(self)
self._current_control_type = None
Expand Down Expand Up @@ -329,34 +322,28 @@ async def _connect_ws(self) -> None:
# set up connection arguments for SSL and bearer token, if required
connection_kwargs: Dict[str, Any] = {}
if self.url.startswith("wss://") and not self._verify_certificate:
connection_kwargs['ssl'] = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
connection_kwargs['ssl'].check_hostname = False
connection_kwargs['ssl'].verify_mode = ssl.CERT_NONE
connection_kwargs["ssl"] = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
connection_kwargs["ssl"].check_hostname = False
connection_kwargs["ssl"].verify_mode = ssl.CERT_NONE

if self._bearer_token:
connection_kwargs['additional_headers'] = {"Authorization": f"Bearer {self._bearer_token}"}
connection_kwargs["additional_headers"] = {"Authorization": f"Bearer {self._bearer_token}"}

self.ws = await ws_connect(uri=self.url, **connection_kwargs)
except (EOFError, OSError) as e:
logger.info("Could not connect due to: %s", str(e))

async def _connect_as_rm(self) -> None:
await self.send_msg_and_await_reception_status_async(
Handshake(
message_id=uuid.uuid4(), role=self.role, supported_protocol_versions=[S2_VERSION]
)
Handshake(message_id=uuid.uuid4(), role=self.role, supported_protocol_versions=[S2_VERSION])
)
logger.debug("Send handshake to CEM. Expecting Handshake and HandshakeResponse from CEM.")

await self._handle_received_messages()

async def handle_handshake(
self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None]
) -> None:
async def handle_handshake(self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None]) -> None:
if not isinstance(message, Handshake):
logger.error(
"Handler for Handshake received a message of the wrong type: %s", type(message)
)
logger.error("Handler for Handshake received a message of the wrong type: %s", type(message))
return

logger.debug(
Expand Down Expand Up @@ -400,12 +387,8 @@ async def handle_select_control_type_as_rm(

logger.debug("CEM selected control type %s. Activating control type.", message.control_type)

control_types_by_protocol_name = {
c.get_protocol_control_type(): c for c in self.control_types
}
selected_control_type: Optional[S2ControlType] = control_types_by_protocol_name.get(
message.control_type
)
control_types_by_protocol_name = {c.get_protocol_control_type(): c for c in self.control_types}
selected_control_type: Optional[S2ControlType] = control_types_by_protocol_name.get(message.control_type)

if self._current_control_type is not None:
await self._eventloop.run_in_executor(None, self._current_control_type.deactivate, self)
Expand All @@ -423,9 +406,7 @@ async def _receive_messages(self) -> None:
to any calls of `send_msg_and_await_reception_status`.
"""
if self.ws is None:
raise RuntimeError(
"Cannot receive messages if websocket connection is not yet established."
)
raise RuntimeError("Cannot receive messages if websocket connection is not yet established.")

logger.info("S2 connection has started to receive messages.")

Expand Down Expand Up @@ -469,9 +450,7 @@ async def _receive_messages(self) -> None:

async def _send_and_forget(self, s2_msg: S2Message) -> None:
if self.ws is None:
raise RuntimeError(
"Cannot send messages if websocket connection is not yet established."
)
raise RuntimeError("Cannot send messages if websocket connection is not yet established.")

json_msg = s2_msg.to_json()
logger.debug("Sending message %s", json_msg)
Expand Down Expand Up @@ -531,9 +510,7 @@ def send_msg_and_await_reception_status_sync(
self, s2_msg: S2Message, timeout_reception_status: float = 5.0, raise_on_error: bool = True
) -> ReceptionStatus:
return asyncio.run_coroutine_threadsafe(
self.send_msg_and_await_reception_status_async(
s2_msg, timeout_reception_status, raise_on_error
),
self.send_msg_and_await_reception_status_async(s2_msg, timeout_reception_status, raise_on_error),
self._eventloop,
).result()

Expand Down
4 changes: 1 addition & 3 deletions src/s2python/s2_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,7 @@ def parse_as_any_message(unparsed_message: Union[dict, str, bytes]) -> S2Message
return TYPE_TO_MESSAGE_CLASS[message_type].model_validate(message_json)

@staticmethod
def parse_as_message(
unparsed_message: Union[dict, str, bytes], as_message: Type[M]
) -> M:
def parse_as_message(unparsed_message: Union[dict, str, bytes], as_message: Type[M]) -> M:
"""Parse the message to a specific S2 python message.

:param unparsed_message: The message as a JSON-formatted string or as a JSON-parsed dictionary.
Expand Down
2 changes: 0 additions & 2 deletions tests/unit/message_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,12 @@ def _test_import_s2_messages(self, module_name):
def test_import_s2_messages__common(self):
self._test_import_s2_messages("s2python.common")

@unittest.skip("Work in progress")
def test_import_s2_messages__ddbc(self):
self._test_import_s2_messages("s2python.ddbc")

def test_import_s2_messages__frbc(self):
self._test_import_s2_messages("s2python.frbc")

@unittest.skip("Work in progress")
def test_import_s2_messages__pebc(self):
self._test_import_s2_messages("s2python.pebc")

Expand Down