-
Notifications
You must be signed in to change notification settings - Fork 50
Add IoT Metrics Support #710
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
bf5456e
d3d6352
21fe38d
13fe021
da7ed31
c167b43
2368721
c456258
03e5d98
9a9c89d
5f3868c
ec40b81
fb81ecc
0435c7f
3a7b509
a21e542
c51a273
2f31d9d
a6a3c30
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,7 +15,7 @@ | |
| from awscrt.http import HttpProxyOptions, HttpRequest | ||
| from awscrt.io import ClientBootstrap, ClientTlsContext, SocketOptions | ||
| from dataclasses import dataclass | ||
| from awscrt.mqtt5 import Client as Mqtt5Client | ||
| from awscrt.mqtt5 import Client as Mqtt5Client, SdkMetrics | ||
|
|
||
|
|
||
| class QoS(IntEnum): | ||
|
|
@@ -330,6 +330,10 @@ class Connection(NativeResource): | |
|
|
||
| proxy_options (Optional[awscrt.http.HttpProxyOptions]): | ||
| Optional proxy options for all connections. | ||
|
|
||
| enable_metrics (bool): If true, enable IoT SDK metrics in CONNECT packet username field, otherwise, disabled. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Trivial: I think we should work on the wording here. The initial sentence makes it sound like the default is false. Instead of the "If true" maybe we should just describe what the bool is for. "Enable or disable IoT SDK metrics in CONNECT packet username field." Also wondering if we should add a bit of extra text regarding why but that's debatable. e.g. "Our team uses these metrics to help determine what features and improvements to prioritize." We can leave the "Default to True" but I would remove the follow-up that provides a reason to set it to false. |
||
| Default to True. You may set it to false if you are not using AWS IoT services, and | ||
| using a custom authentication mechanism. | ||
| """ | ||
|
|
||
| def __init__(self, | ||
|
|
@@ -355,7 +359,8 @@ def __init__(self, | |
| proxy_options=None, | ||
| on_connection_success=None, | ||
| on_connection_failure=None, | ||
| on_connection_closed=None | ||
| on_connection_closed=None, | ||
| enable_metrics=True, | ||
| ): | ||
|
|
||
| assert isinstance(client, Client) or isinstance(client, Mqtt5Client) | ||
|
|
@@ -408,6 +413,10 @@ def __init__(self, | |
| self.password = password | ||
| self.socket_options = socket_options if socket_options else SocketOptions() | ||
| self.proxy_options = proxy_options if proxy_options else websocket_proxy_options | ||
| if enable_metrics: | ||
| self.metrics = SdkMetrics() | ||
| else: | ||
| self.metrics = None | ||
|
|
||
| self._binding = _awscrt.mqtt_client_connection_new( | ||
| self, | ||
|
|
@@ -524,7 +533,8 @@ def on_connect(error_code, return_code, session_present): | |
| self.password, | ||
| self.clean_session, | ||
| on_connect, | ||
| self.proxy_options | ||
| self.proxy_options, | ||
| self.metrics | ||
| ) | ||
|
|
||
| except Exception as e: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,18 @@ | |
| from inspect import signature | ||
|
|
||
|
|
||
| @dataclass | ||
| class SdkMetrics: | ||
| """ | ||
| Configuration for IoT SDK metrics that are embedded in MQTT username field. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Trivial/debatable: MQTT Connect Packet username field ? |
||
|
|
||
| Args: | ||
| library_name (str): The SDK library name (e.g., "IoTDeviceSDK/Python") | ||
|
|
||
| """ | ||
| library_name: str = "IoTDeviceSDK/Python" | ||
|
|
||
|
|
||
| class QoS(IntEnum): | ||
| """MQTT message delivery quality of service. | ||
|
|
||
|
|
@@ -1158,6 +1170,7 @@ class ConnectPacket: | |
| will_delay_interval_sec (int): A time interval, in seconds, that the server should wait (for a session reconnection) before sending the will message associated with the connection's session. If omitted or None, the server will send the will when the associated session is destroyed. If the session is destroyed before a will delay interval has elapsed, then the will must be sent at the time of session destruction. | ||
| will (PublishPacket): The definition of a message to be published when the connection's session is destroyed by the server or when the will delay interval has elapsed, whichever comes first. If None, then nothing will be sent. | ||
| user_properties (Sequence[UserProperty]): List of MQTT5 user properties included with the packet. | ||
|
|
||
| """ | ||
| keep_alive_interval_sec: int = None | ||
| client_id: str = None | ||
|
|
@@ -1338,6 +1351,8 @@ class ClientOptions: | |
| on_lifecycle_event_connection_success_fn (Callable[[LifecycleConnectSuccessData],]): Callback for Lifecycle Event Connection Success. | ||
| on_lifecycle_event_connection_failure_fn (Callable[[LifecycleConnectFailureData],]): Callback for Lifecycle Event Connection Failure. | ||
| on_lifecycle_event_disconnection_fn (Callable[[LifecycleDisconnectData],]): Callback for Lifecycle Event Disconnection. | ||
| enable_metrics (bool): If true, enable IoT SDK metrics in CONNECT packet username field, otherwise, disabled. Default to True. You may set it to false if you are not using AWS IoT services, and using a custom authentication mechanism. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as comment above in regards to rewording. |
||
|
|
||
| """ | ||
| host_name: str | ||
| port: int = None | ||
|
|
@@ -1364,6 +1379,7 @@ class ClientOptions: | |
| on_lifecycle_event_connection_success_fn: Callable[[LifecycleConnectSuccessData], None] = None | ||
| on_lifecycle_event_connection_failure_fn: Callable[[LifecycleConnectFailureData], None] = None | ||
| on_lifecycle_event_disconnection_fn: Callable[[LifecycleDisconnectData], None] = None | ||
| enable_metrics: bool = True | ||
|
|
||
|
|
||
| def _check_callback(callback): | ||
|
|
@@ -1392,6 +1408,7 @@ def __init__(self, client_options: ClientOptions): | |
| self._on_lifecycle_connection_failure_cb = _check_callback( | ||
| client_options.on_lifecycle_event_connection_failure_fn) | ||
| self._on_lifecycle_disconnection_cb = _check_callback(client_options.on_lifecycle_event_disconnection_fn) | ||
| self._enable_metrics = client_options.enable_metrics | ||
|
|
||
| def _ws_handshake_transform(self, http_request_binding, http_headers_binding, native_userdata): | ||
| if self._ws_handshake_transform_cb is None: | ||
|
|
@@ -1704,7 +1721,8 @@ def __init__( | |
| ping_timeout_ms: int, | ||
| keep_alive_secs: int, | ||
| ack_timeout_secs: int, | ||
| clean_session: int): | ||
| clean_session: int, | ||
| enable_metrics: bool): | ||
| self.host_name = host_name | ||
| self.port = port | ||
| self.client_id = "" if client_id is None else client_id | ||
|
|
@@ -1715,6 +1733,7 @@ def __init__( | |
| self.keep_alive_secs: int = 1200 if keep_alive_secs is None else keep_alive_secs | ||
| self.ack_timeout_secs: int = 0 if ack_timeout_secs is None else ack_timeout_secs | ||
| self.clean_session: bool = True if clean_session is None else clean_session | ||
| self.enable_metrics: bool = True if enable_metrics is None else enable_metrics | ||
|
|
||
|
|
||
| class Client(NativeResource): | ||
|
|
@@ -1728,7 +1747,6 @@ class Client(NativeResource): | |
| """ | ||
|
|
||
| def __init__(self, client_options: ClientOptions): | ||
|
|
||
| super().__init__() | ||
|
|
||
| core = _ClientCore(client_options) | ||
|
|
@@ -1746,6 +1764,12 @@ def __init__(self, client_options: ClientOptions): | |
| if not socket_options: | ||
| socket_options = SocketOptions() | ||
|
|
||
| # Handle metrics configuration | ||
| if client_options.enable_metrics: | ||
| self.metrics = SdkMetrics() | ||
| else: | ||
| self.metrics = None | ||
|
|
||
| if not connect_options.will: | ||
| is_will_none = True | ||
| will = PublishPacket() | ||
|
|
@@ -1785,6 +1809,8 @@ def __init__(self, client_options: ClientOptions): | |
| will.correlation_data_bytes or will.correlation_data, | ||
| will.content_type, | ||
| will.user_properties, | ||
| client_options.enable_metrics, | ||
| self.metrics.library_name if self.metrics else None, | ||
| client_options.session_behavior, | ||
| client_options.extended_validation_and_flow_control_options, | ||
| client_options.offline_queue_behavior, | ||
|
|
@@ -1811,7 +1837,8 @@ def __init__(self, client_options: ClientOptions): | |
| keep_alive_secs=connect_options.keep_alive_interval_sec, | ||
| ack_timeout_secs=client_options.ack_timeout_sec, | ||
| clean_session=( | ||
| client_options.session_behavior < ClientSessionBehaviorType.REJOIN_ALWAYS if client_options.session_behavior else True)) | ||
| client_options.session_behavior < ClientSessionBehaviorType.REJOIN_ALWAYS if client_options.session_behavior else True), | ||
| enable_metrics=client_options.enable_metrics) | ||
xiazhvera marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| def start(self): | ||
| """Notifies the MQTT5 client that you want it maintain connectivity to the configured endpoint. | ||
|
|
@@ -2043,5 +2070,6 @@ def new_connection(self, on_connection_interrupted=None, on_connection_resumed=N | |
| use_websockets=False, | ||
| websocket_proxy_options=None, | ||
| websocket_handshake_transform=None, | ||
| proxy_options=None | ||
| proxy_options=None, | ||
| enable_metrics=self.adapter_options.enable_metrics | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -831,6 +831,10 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) { | |
| PyObject *will_delay_interval_sec_py; /* optional uint32_t */ | ||
| PyObject *user_properties_py; /* optional */ | ||
|
|
||
| /* Metrics */ | ||
| PyObject *is_metrics_enabled_py; /* optional enable metrics */ | ||
| struct aws_byte_cursor metrics_library_name; /* optional IoT SDK metrics username */ | ||
|
|
||
| /* Will */ | ||
| PyObject *is_will_none_py; /* optional PublishPacket */ | ||
| PyObject *will_qos_val_py; | ||
|
|
@@ -862,7 +866,7 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) { | |
|
|
||
| if (!PyArg_ParseTuple( | ||
| args, | ||
| "Os#IOOOOz#Oz#z#OOOOOOOOOz*Oz#OOOz#z*z#OOOOOOOOOOOOOO", | ||
| "Os#IOOOOz#Oz#z#OOOOOOOOOz*Oz#OOOz#z*z#OOz#OOOOOOOOOOOOO", | ||
| /* O */ &self_py, | ||
| /* s */ &host_name.ptr, | ||
| /* # */ &host_name.len, | ||
|
|
@@ -904,6 +908,11 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) { | |
| /* # */ &will_content_type.len, | ||
| /* O */ &will_user_properties_py, | ||
|
|
||
| /* Metrics */ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Trivial: Why are we injecting the metrics into the middle here instead of adding it to the end? I feel like this has a good potential to need expansion as we add more metrics in which case it'd be easier/cleaner to add to the end instead of tracking down and continuing to add in the middle. |
||
| /* O */ &is_metrics_enabled_py, | ||
| /* z */ &metrics_library_name.ptr, | ||
| /* # */ &metrics_library_name.len, | ||
|
|
||
| /* O */ &session_behavior_py, | ||
| /* O */ &extended_validation_and_flow_control_options_py, | ||
| /* O */ &offline_queue_behavior_py, | ||
|
|
@@ -1279,6 +1288,14 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) { | |
| connect_options.will = &will; | ||
| } | ||
|
|
||
| /* METRICS */ | ||
| struct aws_mqtt_iot_sdk_metrics metrics_tmp; | ||
| AWS_ZERO_STRUCT(metrics_tmp); | ||
| if (PyObject_IsTrue(is_metrics_enabled_py)) { | ||
| metrics_tmp.library_name = metrics_library_name; | ||
| client_options.metrics = &metrics_tmp; | ||
| } | ||
|
|
||
| /* CALLBACKS */ | ||
|
|
||
| Py_INCREF(client_core_py); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -437,6 +437,39 @@ static void s_on_connect( | |
| PyGILState_Release(state); | ||
| } | ||
|
|
||
| /* If unsuccessful, false is returned and a Python error has been set */ | ||
| bool s_set_metrics(struct aws_mqtt_client_connection *connection, PyObject *metrics) { | ||
| assert(metrics && (metrics != Py_None)); | ||
|
|
||
| if (connection == NULL) { | ||
| return false; | ||
| } | ||
|
|
||
| bool success = false; | ||
|
|
||
| PyObject *library_name_py = PyObject_GetAttrString(metrics, "library_name"); | ||
| struct aws_byte_cursor library_name = aws_byte_cursor_from_pyunicode(library_name_py); | ||
| if (!library_name.ptr) { | ||
| PyErr_SetString(PyExc_TypeError, "metrics.library_name must be str type"); | ||
| goto done; | ||
| } | ||
|
|
||
| struct aws_mqtt_iot_sdk_metrics metrics_struct = { | ||
| .library_name = library_name, | ||
| }; | ||
|
|
||
| if (aws_mqtt_client_connection_set_metrics(connection, &metrics_struct)) { | ||
| PyErr_SetAwsLastError(); | ||
| goto done; | ||
| } | ||
|
|
||
| success = true; | ||
|
|
||
| done: | ||
| Py_DECREF(library_name_py); | ||
| return success; | ||
| } | ||
|
|
||
| /* If unsuccessful, false is returned and a Python error has been set */ | ||
| bool s_set_will(struct aws_mqtt_client_connection *connection, PyObject *will) { | ||
| assert(will && (will != Py_None)); | ||
|
|
@@ -668,9 +701,10 @@ PyObject *aws_py_mqtt_client_connection_connect(PyObject *self, PyObject *args) | |
| PyObject *is_clean_session; | ||
| PyObject *on_connect; | ||
| PyObject *proxy_options_py; | ||
| PyObject *metrics_py; | ||
| if (!PyArg_ParseTuple( | ||
| args, | ||
| "Os#s#IOOKKHIIOz#z#OOO", | ||
| "Os#s#IOOKKHIIOz#z#OOOO", | ||
| &impl_capsule, | ||
| &client_id, | ||
| &client_id_len, | ||
|
|
@@ -691,7 +725,8 @@ PyObject *aws_py_mqtt_client_connection_connect(PyObject *self, PyObject *args) | |
| &password_len, | ||
| &is_clean_session, | ||
| &on_connect, | ||
| &proxy_options_py)) { | ||
| &proxy_options_py, | ||
| &metrics_py)) { | ||
| return NULL; | ||
| } | ||
|
|
||
|
|
@@ -773,6 +808,13 @@ PyObject *aws_py_mqtt_client_connection_connect(PyObject *self, PyObject *args) | |
| } | ||
| } | ||
|
|
||
| // If metrics is None, we do not set metrics at all. | ||
| if (metrics_py != Py_None) { | ||
| if (!s_set_metrics(py_connection->native, metrics_py)) { | ||
| goto done; | ||
|
Comment on lines
+813
to
+814
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In crt-cpp, a failure on setting metrics is ignored. Here, it fails connection attempts. I think we should document the correct behavior and implement it everywhere.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is still inconsistency on dealing with a metrics failure between crt-cpp (just log failure and proceed with connection) and crt-python (fail connect attempt on failure). I think the cpp logic is the correct one because we don't want the metrics-related issues to break users.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me clarify the current state: The reason crt-cpp doesn't fail the connection is that it lacks a proper mechanism to propagate the configuration failure. In the other three SDKs (Python, Java, Node.js), I implemented it to fail the connection attempt when setting metrics fails. However, I think you raise a valid point. Since metrics are set internally by the SDK (not explicitly requested by the user), we probably shouldn't let metrics-related failures break the user's connection. The core functionality should remain working even if the optional metrics feature encounters issues. I will update it to keep it consistent with cpp.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm in agreement with the decision here. Let's not fail a connection attempt with anything related to metrics. At most we should log the issue but move on. All behavior should be identical across our SDKs. |
||
| } | ||
| } | ||
|
|
||
| if (on_connect != Py_None) { | ||
| Py_INCREF(on_connect); | ||
| py_connection->on_connect = on_connect; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of pulling SdkMetrics in from mqtt5, should we instead implement an mqtt3 version of SdkMetrics? It appears there's precedence for this with the double implementation of OperationStatisticsData. Unsure if this is a good or bad practice but it's one we've previously used. This could also potentially allow us to set different things by default for mqtt3 vs. mqtt5 or it could add confusion and a second place to update things when we make changes...