From b7ad1cd5dbe95061407ac3ec2d16b47ed8b8483d Mon Sep 17 00:00:00 2001 From: Jordan Danford Date: Thu, 2 Nov 2023 22:04:11 -0700 Subject: [PATCH 1/3] Add type annotations, remove `timeout` parameter from `send_one_ping()` --- aioping/__init__.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/aioping/__init__.py b/aioping/__init__.py index 9221d91..242083c 100644 --- a/aioping/__init__.py +++ b/aioping/__init__.py @@ -88,6 +88,7 @@ import random import platform import os +import typing logger = logging.getLogger("aioping") default_timer = time.perf_counter @@ -106,7 +107,7 @@ proto_icmp6 = socket.getprotobyname("ipv6-icmp") -def checksum(buffer): +def checksum(buffer: bytes) -> int: """ I'm not too confident that this is right but testing seems to suggest that it gives the same answers as in_cksum in ping.c @@ -138,7 +139,7 @@ def checksum(buffer): return answer -async def receive_one_ping(my_socket, id_, timeout): +async def receive_one_ping(my_socket: socket.socket, id_: int, timeout: float) -> float: """ receive the ping from the socket. :param my_socket: @@ -153,7 +154,7 @@ async def receive_one_ping(my_socket, id_, timeout): while True: rec_packet = await loop.sock_recv(my_socket, 1024) - # No IP Header when unpriviledged on Linux + # No IP Header when unprivileged on Linux has_ip_header = ( (os.name != "posix") or (platform.system() == "Darwin") @@ -176,7 +177,7 @@ async def receive_one_ping(my_socket, id_, timeout): if type != ICMP_ECHO_REPLY and type != ICMP6_ECHO_REPLY: continue - if not has_ip_header: + if not has_ip_header: # When unprivileged on Linux, ICMP ID is rewrited by kernel # According to https://stackoverflow.com/a/14023878/4528364 id_ = int.from_bytes(my_socket.getsockname()[1].to_bytes(2, "big"), "little") @@ -195,7 +196,7 @@ async def receive_one_ping(my_socket, id_, timeout): raise TimeoutError("Ping timeout") -def sendto_ready(packet, socket, future, dest): +def sendto_ready(packet: bytes, socket: socket.socket, future: asyncio.Future, dest: socket._Address): try: socket.sendto(packet, dest) except (BlockingIOError, InterruptedError): @@ -208,13 +209,12 @@ def sendto_ready(packet, socket, future, dest): future.set_result(None) -async def send_one_ping(my_socket, dest_addr, id_, timeout, family): +async def send_one_ping(my_socket: socket.socket, dest_addr: socket._Address, id_: int, family: typing.Optional[socket.AddressFamily] = None): """ Send one ping to the given >dest_addr<. :param my_socket: :param dest_addr: :param id_: - :param timeout: :return: """ icmp_type = ICMP_ECHO_REQUEST if family == socket.AddressFamily.AF_INET\ @@ -246,7 +246,7 @@ async def send_one_ping(my_socket, dest_addr, id_, timeout, family): await future -async def ping(dest_addr, timeout=10, family=None): +async def ping(dest_addr: str | bytes, timeout=10.0, family: typing.Optional[socket.AddressFamily] = None) -> float: """ Returns either the delay (in seconds) or raises an exception. :param dest_addr: @@ -292,14 +292,14 @@ async def ping(dest_addr, timeout=10, family=None): my_id = uuid.uuid4().int & 0xFFFF - await send_one_ping(my_socket, addr, my_id, timeout, family) + await send_one_ping(my_socket, addr, my_id, family) delay = await receive_one_ping(my_socket, my_id, timeout) my_socket.close() return delay -async def verbose_ping(dest_addr, timeout=2, count=3, family=None): +async def verbose_ping(dest_addr: str | bytes, timeout=2.0, count=3, family: typing.Union[socket.AddressFamily, None] = None): """ Send >count< ping to >dest_addr< with the given >timeout< and display the result. @@ -313,7 +313,7 @@ async def verbose_ping(dest_addr, timeout=2, count=3, family=None): try: delay = await ping(dest_addr, timeout, family) - except TimeoutError as e: + except TimeoutError: logger.error("%s timed out after %ss" % (dest_addr, timeout)) except Exception as e: logger.error("%s failed: %s" % (dest_addr, str(e))) From f0e3aef3ebc42710931926cbd42a44bd8f111f1e Mon Sep 17 00:00:00 2001 From: Jordan Danford Date: Sat, 4 Nov 2023 10:15:29 -0700 Subject: [PATCH 2/3] Add `SocketAddress` type --- aioping/__init__.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/aioping/__init__.py b/aioping/__init__.py index 242083c..3f59004 100644 --- a/aioping/__init__.py +++ b/aioping/__init__.py @@ -88,7 +88,7 @@ import random import platform import os -import typing +from typing import Optional, Tuple, Union logger = logging.getLogger("aioping") default_timer = time.perf_counter @@ -106,6 +106,8 @@ proto_icmp = socket.getprotobyname("icmp") proto_icmp6 = socket.getprotobyname("ipv6-icmp") +SocketAddress = Union[Tuple[str, int], Tuple[str, int, int, int]] + def checksum(buffer: bytes) -> int: """ @@ -196,9 +198,9 @@ async def receive_one_ping(my_socket: socket.socket, id_: int, timeout: float) - raise TimeoutError("Ping timeout") -def sendto_ready(packet: bytes, socket: socket.socket, future: asyncio.Future, dest: socket._Address): +def sendto_ready(packet: bytes, socket: socket.socket, future: asyncio.Future, dest_addr: SocketAddress): try: - socket.sendto(packet, dest) + socket.sendto(packet, dest_addr) except (BlockingIOError, InterruptedError): return # The callback will be retried except Exception as exc: @@ -209,7 +211,7 @@ def sendto_ready(packet: bytes, socket: socket.socket, future: asyncio.Future, d future.set_result(None) -async def send_one_ping(my_socket: socket.socket, dest_addr: socket._Address, id_: int, family: typing.Optional[socket.AddressFamily] = None): +async def send_one_ping(my_socket: socket.socket, dest_addr: SocketAddress, id_: int, family: Optional[socket.AddressFamily] = None): """ Send one ping to the given >dest_addr<. :param my_socket: @@ -240,13 +242,13 @@ async def send_one_ping(my_socket: socket.socket, dest_addr: socket._Address, id packet = header + data future = asyncio.get_event_loop().create_future() - callback = functools.partial(sendto_ready, packet=packet, socket=my_socket, dest=dest_addr, future=future) + callback = functools.partial(sendto_ready, packet=packet, socket=my_socket, dest_addr=dest_addr, future=future) asyncio.get_event_loop().add_writer(my_socket, callback) await future -async def ping(dest_addr: str | bytes, timeout=10.0, family: typing.Optional[socket.AddressFamily] = None) -> float: +async def ping(dest_addr: Union[str, bytes], timeout=10.0, family: Optional[socket.AddressFamily] = None) -> float: """ Returns either the delay (in seconds) or raises an exception. :param dest_addr: @@ -299,7 +301,7 @@ async def ping(dest_addr: str | bytes, timeout=10.0, family: typing.Optional[soc return delay -async def verbose_ping(dest_addr: str | bytes, timeout=2.0, count=3, family: typing.Union[socket.AddressFamily, None] = None): +async def verbose_ping(dest_addr: Union[str, bytes], timeout=2.0, count=3, family: Optional[socket.AddressFamily] = None): """ Send >count< ping to >dest_addr< with the given >timeout< and display the result. From bde60b515eb7b44afca070303dcc304e13b9b713 Mon Sep 17 00:00:00 2001 From: Jordan Danford Date: Fri, 10 Nov 2023 12:06:04 -0700 Subject: [PATCH 3/3] Allow int for timeout parameter --- aioping/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aioping/__init__.py b/aioping/__init__.py index 3f59004..070809b 100644 --- a/aioping/__init__.py +++ b/aioping/__init__.py @@ -141,7 +141,7 @@ def checksum(buffer: bytes) -> int: return answer -async def receive_one_ping(my_socket: socket.socket, id_: int, timeout: float) -> float: +async def receive_one_ping(my_socket: socket.socket, id_: int, timeout: Union[int, float]) -> float: """ receive the ping from the socket. :param my_socket: @@ -248,7 +248,7 @@ async def send_one_ping(my_socket: socket.socket, dest_addr: SocketAddress, id_: await future -async def ping(dest_addr: Union[str, bytes], timeout=10.0, family: Optional[socket.AddressFamily] = None) -> float: +async def ping(dest_addr: Union[str, bytes], timeout: Union[int, float] = 10.0, family: Optional[socket.AddressFamily] = None) -> float: """ Returns either the delay (in seconds) or raises an exception. :param dest_addr: