Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions src/s2python/message.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,98 @@
from typing import Union

from s2python.frbc import (
FRBCActuatorDescription,
FRBCActuatorStatus,
FRBCFillLevelTargetProfile,
FRBCFillLevelTargetProfileElement,
FRBCInstruction,
FRBCLeakageBehaviour,
FRBCLeakageBehaviourElement,
FRBCOperationMode,
FRBCOperationModeElement,
FRBCStorageDescription,
FRBCStorageStatus,
FRBCSystemDescription,
FRBCTimerStatus,
FRBCUsageForecast
FRBCUsageForecast,
FRBCUsageForecastElement,
)
from s2python.ppbc import (
PPBCEndInterruptionInstruction,
PPBCPowerProfileDefinition,
PPBCPowerSequenceContainer,
PPBCPowerSequence,
PPBCPowerProfileStatus,
PPBCPowerSequenceContainerStatus,
PPBCPowerSequenceElement,
PPBCScheduleInstruction,
PPBCStartInterruptionInstruction,
)

from s2python.common import (
Duration,
Handshake,
HandshakeResponse,
InstructionStatusUpdate,
NumberRange,
PowerForecast,
PowerForecastElement,
PowerForecastValue,
PowerMeasurement,
PowerRange,
PowerValue,
ReceptionStatus,
ResourceManagerDetails,
RevokeObject,
Role,
SelectControlType,
SessionRequest
SessionRequest,
Timer,
Transition,
)

S2Message = Union[
FRBCActuatorDescription,
FRBCActuatorStatus,
FRBCFillLevelTargetProfile,
FRBCFillLevelTargetProfileElement,
FRBCInstruction,
FRBCLeakageBehaviour,
FRBCLeakageBehaviourElement,
FRBCOperationMode,
FRBCOperationModeElement,
FRBCStorageDescription,
FRBCStorageStatus,
FRBCSystemDescription,
FRBCTimerStatus,
FRBCUsageForecast,
FRBCUsageForecastElement,
PPBCEndInterruptionInstruction,
PPBCPowerProfileDefinition,
PPBCPowerSequenceContainer,
PPBCPowerSequence,
PPBCPowerProfileStatus,
PPBCPowerSequenceContainerStatus,
PPBCPowerSequenceElement,
PPBCScheduleInstruction,
PPBCStartInterruptionInstruction,
Duration,
Handshake,
HandshakeResponse,
InstructionStatusUpdate,
NumberRange,
PowerForecast,
PowerForecastElement,
PowerForecastValue,
PowerMeasurement,
PowerRange,
PowerValue,
ReceptionStatus,
ResourceManagerDetails,
RevokeObject,
Role,
SelectControlType,
SessionRequest,
Timer,
Transition,
]
1 change: 1 addition & 0 deletions src/s2python/ppbc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
PPBCPowerSequenceContainerStatus,
)
from s2python.ppbc.ppbc_power_sequence_element import PPBCPowerSequenceElement
from s2python.ppbc.ppbc_start_interruption_instruction import PPBCStartInterruptionInstruction
56 changes: 56 additions & 0 deletions tests/unit/message_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import unittest

import importlib
import inspect
import pkgutil

from s2python import message
from s2python.validate_values_mixin import S2MessageComponent


class S2MessageTest(unittest.TestCase):
"""Check importing S2Message classes from s2_python.message."""

def _test_import_s2_messages(self, module_name):
"""Check each S2MessageComponent subclass in the given module is importable."""
module = importlib.import_module(module_name)

# Find all submodules
all_subclasses = []
for _, name, _ in pkgutil.iter_modules(module.__path__, module.__name__ + "."):
submodule = importlib.import_module(name)

# Find all classes in the submodule that subclass BaseClass
subclasses = [
obj
for _, obj in inspect.getmembers(submodule, inspect.isclass)
if issubclass(obj, S2MessageComponent) and obj is not S2MessageComponent
]
all_subclasses.extend(subclasses)

# Ensure we found at least one subclass
self.assertGreater(
len(all_subclasses), 0, f"No subclasses found in {module_name}"
)

for _class in all_subclasses:
assert hasattr(
message, _class.__name__
), f"{_class} should be importable from s2_python.message"

def test_import_s2_messages__common(self):
self._test_import_s2_messages("s2python.common")

@unittest.skip("Work in progress")
def test_import_s2_messages__ddbc(self):
self._test_import_s2_messages("s2python.ddbc")

def test_import_s2_messages__frbc(self):
self._test_import_s2_messages("s2python.frbc")

@unittest.skip("Work in progress")
def test_import_s2_messages__pebc(self):
self._test_import_s2_messages("s2python.pebc")

def test_import_s2_messages__ppbc(self):
self._test_import_s2_messages("s2python.ppbc")