Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions src/s2python/s2_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def __init__(self, connection: "S2Connection", subject_message_id: uuid.UUID):
async def run_async(self) -> None:
self.status_is_send.set()

await self.connection.respond_with_reception_status(
await self.connection._respond_with_reception_status( # pylint: disable=protected-access
subject_message_id=self.subject_message_id,
status=ReceptionStatusValues.OK,
diagnostic_label="Processed okay.",
Expand Down Expand Up @@ -168,7 +168,7 @@ def do_message() -> None:
await eventloop.run_in_executor(executor=None, func=do_message)
except Exception:
if not send_okay.status_is_send.is_set():
await connection.respond_with_reception_status(
await connection._respond_with_reception_status( # pylint: disable=protected-access
subject_message_id=msg.message_id, # type: ignore[attr-defined, union-attr]
status=ReceptionStatusValues.PERMANENT_ERROR,
diagnostic_label=f"While processing message {msg.message_id} " # type: ignore[attr-defined, union-attr] # pylint: disable=line-too-long
Expand Down Expand Up @@ -241,10 +241,10 @@ def __init__( # pylint: disable=too-many-arguments
self._verify_certificate = verify_certificate

self._handlers.register_handler(
SelectControlType, self.handle_select_control_type_as_rm
SelectControlType, self._handle_select_control_type_as_rm
)
self._handlers.register_handler(Handshake, self.handle_handshake)
self._handlers.register_handler(HandshakeResponse, self.handle_handshake_response_as_rm)
self._handlers.register_handler(Handshake, self._handle_handshake)
self._handlers.register_handler(HandshakeResponse, self._handle_handshake_response_as_rm)
self._bearer_token = bearer_token

def start_as_rm(self) -> None:
Expand Down Expand Up @@ -359,7 +359,7 @@ async def _connect_ws(self) -> None:
logger.info("Could not connect due to: %s", str(e))

async def _connect_as_rm(self) -> None:
await self.send_msg_and_await_reception_status_async(
await self._send_msg_and_await_reception_status_async(
Handshake(
message_id=uuid.uuid4(),
role=self.role,
Expand All @@ -372,7 +372,7 @@ async def _connect_as_rm(self) -> None:

await self._handle_received_messages()

async def handle_handshake(
async def _handle_handshake(
self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None]
) -> None:
if not isinstance(message, Handshake):
Expand All @@ -389,7 +389,7 @@ async def handle_handshake(
)
await send_okay

async def handle_handshake_response_as_rm(
async def _handle_handshake_response_as_rm(
self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None]
) -> None:
if not isinstance(message, HandshakeResponse):
Expand All @@ -407,11 +407,11 @@ async def handle_handshake_response_as_rm(
await send_okay
logger.debug("Handshake complete. Sending first ResourceManagerDetails.")

await self.send_msg_and_await_reception_status_async(
await self._send_msg_and_await_reception_status_async(
self.asset_details.to_resource_manager_details(self.control_types)
)

async def handle_select_control_type_as_rm(
async def _handle_select_control_type_as_rm(
self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None]
) -> None:
if not isinstance(message, SelectControlType):
Expand Down Expand Up @@ -476,13 +476,13 @@ async def _receive_messages(self) -> None:
json_msg = json.loads(message)
message_id = json_msg.get("message_id")
if message_id:
await self.respond_with_reception_status(
await self._respond_with_reception_status(
subject_message_id=message_id,
status=ReceptionStatusValues.INVALID_MESSAGE,
diagnostic_label=str(e),
)
else:
await self.respond_with_reception_status(
await self._respond_with_reception_status(
subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"),
status=ReceptionStatusValues.INVALID_DATA,
diagnostic_label="Message appears valid json but could not find a message_id field.",
Expand Down Expand Up @@ -513,7 +513,7 @@ async def _send_and_forget(self, s2_msg: S2Message) -> None:
logger.error("Unable to send message %s due to %s", s2_msg, str(e))
self._restart_connection_event.set()

async def respond_with_reception_status(
async def _respond_with_reception_status(
self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str
) -> None:
logger.debug(
Expand All @@ -531,13 +531,13 @@ def respond_with_reception_status_sync(
self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str
) -> None:
asyncio.run_coroutine_threadsafe(
self.respond_with_reception_status(
self._respond_with_reception_status(
subject_message_id, status, diagnostic_label
),
self._eventloop,
).result()

async def send_msg_and_await_reception_status_async(
async def _send_msg_and_await_reception_status_async(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the sync functions are supposed to be used in the ControlType implementations, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently yes. I am going to work on adding an async interface.

self,
s2_msg: S2Message,
timeout_reception_status: float = 5.0,
Expand Down Expand Up @@ -575,7 +575,7 @@ def send_msg_and_await_reception_status_sync(
raise_on_error: bool = True,
) -> ReceptionStatus:
return asyncio.run_coroutine_threadsafe(
self.send_msg_and_await_reception_status_async(
self._send_msg_and_await_reception_status_async(
s2_msg, timeout_reception_status, raise_on_error
),
self._eventloop,
Expand Down