Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/example_frbc_rm.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def start_s2_session(url, client_node_id=str(uuid.uuid4())):

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="A simple S2 reseource manager example.")
parser.add_argument('endpoint', type=str, help="WebSocket endpoint uri for the server (CEM) e.g. ws://localhost:8080/backend/rm/s2python-frbc/cem/dummy_model/ws")
parser.add_argument('endpoint', type=str, help="WebSocket endpoint uri for the server (CEM) e.h. ws://localhost:8080/websocket/s2/my-first-websocket-rm")
args = parser.parse_args()

start_s2_session(args.endpoint)
37 changes: 37 additions & 0 deletions examples/example_with_pairing_frbc_rm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import argparse
import re
import uuid
import logging

from example_frbc_rm import start_s2_session
from s2python.s2_pairing import PairingDetails, S2Pairing
from s2python.generated.gen_s2_pairing import S2NodeDescription, Deployment
from s2python.generated.gen_s2 import EnergyManagementRole

logger = logging.getLogger("s2python")


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="A simple S2 reseource manager example.")
parser.add_argument('endpoint', type=str, help="Rest endpoint to start S2 pairing. E.g. https://localhost/requestPairing")
parser.add_argument('pairing_token', type=str, help="The pairing toekn for teh endpoint. You should get this from the S2 server e.g. ca14fda4")
args = parser.parse_args()

nodeDescription: S2NodeDescription = S2NodeDescription(brand="TNO",
logoUri = "https://www.tno.nl/publish/pages/5604/tno-logo-1484x835_003_.jpg",
type = "demo frbc example",
modelName = "S2 pairing example stub",
userDefinedName = "TNO S2 pairing example for frbc",
role = EnergyManagementRole.RM,
deployment = Deployment.LAN)
client_node_id: str = str(uuid.uuid4())

pairing: S2Pairing = S2Pairing(request_pairing_endpoint = args.endpoint,
token = args.pairing_token,
s2_client_node_description = nodeDescription,
client_node_id = client_node_id )

logger.info(f'Pairing details: \n{pairing.pairing_details}')
#print(pairing.pairing_details)

start_s2_session(pairing.pairing_details.connection_details.connectionUri)
14 changes: 8 additions & 6 deletions src/s2python/generated/gen_s2_pairing.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@
from typing import List

from pydantic import BaseModel, ConfigDict, Field
from s2python.common import EnergyManagementRole as S2Role



class Deployment(Enum):
class S2Role(str, Enum):
CEM = 'CEM'
RM = 'RM'

class Deployment(str, Enum):
WAN = 'WAN'
LAN = 'LAN'


class Protocols(Enum):
class Protocols(str, Enum):
WebSocketSecure = 'WebSocketSecure'


Expand Down Expand Up @@ -50,7 +52,7 @@ class PairingRequest(BaseModel):
token: str
publicKey: str
s2ClientNodeId: str
s2ClientNodeDescription: str
s2ClientNodeDescription: S2NodeDescription
supportedProtocols: List[Protocols]


Expand All @@ -59,7 +61,7 @@ class PairingResponse(BaseModel):
extra='forbid',
)
s2ServerNodeId: str
serverNodeDescription: str
serverNodeDescription: S2NodeDescription
requestConnectionUri: str


Expand Down
16 changes: 8 additions & 8 deletions src/s2python/s2_pairing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Tuple, Union
import requests

from jwskate import JweCompact, Jwk
from jwskate import JweCompact, Jwk, Jwt
from binapy.binapy import BinaPy

from s2python.generated.gen_s2_pairing import (Protocols,
Expand Down Expand Up @@ -77,19 +77,18 @@ def _pair(self) -> None:
self._paring_timestamp = datetime.datetime.now()

rsa_key_pair = Jwk.generate_for_alg(KEY_ALGORITHM).with_kid_thumbprint()

pairing_request: PairingRequest = PairingRequest(token=self._token,
publicKey=rsa_key_pair.public_jwk().to_pem(),
s2ClientNodeId=self._client_node_id,
s2ClientNodeDescription=self._s2_client_node_description,
supportedProtocols=self._supported_protocols)

response = requests.post(self._request_pairing_endpoint,
json=pairing_request.model_dump_json(),
timeout=REQTEST_TIMEOUT,
json = pairing_request.dict(),
timeout = REQTEST_TIMEOUT,
verify = self._verify_certificate)
response.raise_for_status()
pairing_response: PairingResponse = PairingResponse.parse_raw(response.json())
pairing_response: PairingResponse = PairingResponse.parse_raw(response.text)

connection_request: ConnectionRequest = ConnectionRequest(s2ClientNodeId=self._client_node_id,
supportedProtocols=self._supported_protocols)
Expand All @@ -102,12 +101,13 @@ def _pair(self) -> None:
'requestConnection')

response = requests.post(restest_pairing_uri,
json=connection_request.model_dump_json(),
timeout=REQTEST_TIMEOUT,
json = connection_request.dict(),
timeout = REQTEST_TIMEOUT,
verify = self._verify_certificate)
response.raise_for_status()
connection_details: ConnectionDetails = ConnectionDetails.parse_raw(response.json())
connection_details: ConnectionDetails = ConnectionDetails.parse_raw(response.text)
challenge = JweCompact(connection_details.challenge).decrypt(rsa_key_pair)
challenge = Jwt.unprotected(JweCompact(connection_details.challenge).decrypt(rsa_key_pair))
self._pairing_details = PairingDetails(pairing_response, connection_details, challenge)


Expand Down
5 changes: 4 additions & 1 deletion src/s2python/validate_values_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ def inner(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:
return inner


def catch_and_convert_exceptions(input_class: Type[S2MessageComponent[B_co]]) -> Type[S2MessageComponent[B_co]]:
S = TypeVar("S", bound=S2MessageComponent)


def catch_and_convert_exceptions(input_class: Type[S]) -> Type[S]:
input_class.__init__ = convert_to_s2exception(input_class.__init__) # type: ignore[method-assign]
input_class.__setattr__ = convert_to_s2exception(input_class.__setattr__) # type: ignore[method-assign]
input_class.model_validate_json = convert_to_s2exception( # type: ignore[method-assign]
Expand Down
Loading