diff --git a/aioping/__init__.py b/aioping/__init__.py index 9221d91..070809b 100644 --- a/aioping/__init__.py +++ b/aioping/__init__.py @@ -88,6 +88,7 @@ import random import platform import os +from typing import Optional, Tuple, Union logger = logging.getLogger("aioping") default_timer = time.perf_counter @@ -105,8 +106,10 @@ proto_icmp = socket.getprotobyname("icmp") proto_icmp6 = socket.getprotobyname("ipv6-icmp") +SocketAddress = Union[Tuple[str, int], Tuple[str, int, int, int]] -def checksum(buffer): + +def checksum(buffer: bytes) -> int: """ I'm not too confident that this is right but testing seems to suggest that it gives the same answers as in_cksum in ping.c @@ -138,7 +141,7 @@ def checksum(buffer): return answer -async def receive_one_ping(my_socket, id_, timeout): +async def receive_one_ping(my_socket: socket.socket, id_: int, timeout: Union[int, float]) -> float: """ receive the ping from the socket. :param my_socket: @@ -153,7 +156,7 @@ async def receive_one_ping(my_socket, id_, timeout): while True: rec_packet = await loop.sock_recv(my_socket, 1024) - # No IP Header when unpriviledged on Linux + # No IP Header when unprivileged on Linux has_ip_header = ( (os.name != "posix") or (platform.system() == "Darwin") @@ -176,7 +179,7 @@ async def receive_one_ping(my_socket, id_, timeout): if type != ICMP_ECHO_REPLY and type != ICMP6_ECHO_REPLY: continue - if not has_ip_header: + if not has_ip_header: # When unprivileged on Linux, ICMP ID is rewrited by kernel # According to https://stackoverflow.com/a/14023878/4528364 id_ = int.from_bytes(my_socket.getsockname()[1].to_bytes(2, "big"), "little") @@ -195,9 +198,9 @@ async def receive_one_ping(my_socket, id_, timeout): raise TimeoutError("Ping timeout") -def sendto_ready(packet, socket, future, dest): +def sendto_ready(packet: bytes, socket: socket.socket, future: asyncio.Future, dest_addr: SocketAddress): try: - socket.sendto(packet, dest) + socket.sendto(packet, dest_addr) except (BlockingIOError, InterruptedError): return # The callback will be retried except Exception as exc: @@ -208,13 +211,12 @@ def sendto_ready(packet, socket, future, dest): future.set_result(None) -async def send_one_ping(my_socket, dest_addr, id_, timeout, family): +async def send_one_ping(my_socket: socket.socket, dest_addr: SocketAddress, id_: int, family: Optional[socket.AddressFamily] = None): """ Send one ping to the given >dest_addr<. :param my_socket: :param dest_addr: :param id_: - :param timeout: :return: """ icmp_type = ICMP_ECHO_REQUEST if family == socket.AddressFamily.AF_INET\ @@ -240,13 +242,13 @@ async def send_one_ping(my_socket, dest_addr, id_, timeout, family): packet = header + data future = asyncio.get_event_loop().create_future() - callback = functools.partial(sendto_ready, packet=packet, socket=my_socket, dest=dest_addr, future=future) + callback = functools.partial(sendto_ready, packet=packet, socket=my_socket, dest_addr=dest_addr, future=future) asyncio.get_event_loop().add_writer(my_socket, callback) await future -async def ping(dest_addr, timeout=10, family=None): +async def ping(dest_addr: Union[str, bytes], timeout: Union[int, float] = 10.0, family: Optional[socket.AddressFamily] = None) -> float: """ Returns either the delay (in seconds) or raises an exception. :param dest_addr: @@ -292,14 +294,14 @@ async def ping(dest_addr, timeout=10, family=None): my_id = uuid.uuid4().int & 0xFFFF - await send_one_ping(my_socket, addr, my_id, timeout, family) + await send_one_ping(my_socket, addr, my_id, family) delay = await receive_one_ping(my_socket, my_id, timeout) my_socket.close() return delay -async def verbose_ping(dest_addr, timeout=2, count=3, family=None): +async def verbose_ping(dest_addr: Union[str, bytes], timeout=2.0, count=3, family: Optional[socket.AddressFamily] = None): """ Send >count< ping to >dest_addr< with the given >timeout< and display the result. @@ -313,7 +315,7 @@ async def verbose_ping(dest_addr, timeout=2, count=3, family=None): try: delay = await ping(dest_addr, timeout, family) - except TimeoutError as e: + except TimeoutError: logger.error("%s timed out after %ss" % (dest_addr, timeout)) except Exception as e: logger.error("%s failed: %s" % (dest_addr, str(e)))