Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions aioping/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import random
import platform
import os
from typing import Optional, Tuple, Union

logger = logging.getLogger("aioping")
default_timer = time.perf_counter
Expand All @@ -105,8 +106,10 @@
proto_icmp = socket.getprotobyname("icmp")
proto_icmp6 = socket.getprotobyname("ipv6-icmp")

SocketAddress = Union[Tuple[str, int], Tuple[str, int, int, int]]

def checksum(buffer):

def checksum(buffer: bytes) -> int:
"""
I'm not too confident that this is right but testing seems
to suggest that it gives the same answers as in_cksum in ping.c
Expand Down Expand Up @@ -138,7 +141,7 @@ def checksum(buffer):
return answer


async def receive_one_ping(my_socket, id_, timeout):
async def receive_one_ping(my_socket: socket.socket, id_: int, timeout: Union[int, float]) -> float:
"""
receive the ping from the socket.
:param my_socket:
Expand All @@ -153,7 +156,7 @@ async def receive_one_ping(my_socket, id_, timeout):
while True:
rec_packet = await loop.sock_recv(my_socket, 1024)

# No IP Header when unpriviledged on Linux
# No IP Header when unprivileged on Linux
has_ip_header = (
(os.name != "posix")
or (platform.system() == "Darwin")
Expand All @@ -176,7 +179,7 @@ async def receive_one_ping(my_socket, id_, timeout):
if type != ICMP_ECHO_REPLY and type != ICMP6_ECHO_REPLY:
continue

if not has_ip_header:
if not has_ip_header:
# When unprivileged on Linux, ICMP ID is rewrited by kernel
# According to https://stackoverflow.com/a/14023878/4528364
id_ = int.from_bytes(my_socket.getsockname()[1].to_bytes(2, "big"), "little")
Expand All @@ -195,9 +198,9 @@ async def receive_one_ping(my_socket, id_, timeout):
raise TimeoutError("Ping timeout")


def sendto_ready(packet, socket, future, dest):
def sendto_ready(packet: bytes, socket: socket.socket, future: asyncio.Future, dest_addr: SocketAddress):
try:
socket.sendto(packet, dest)
socket.sendto(packet, dest_addr)
except (BlockingIOError, InterruptedError):
return # The callback will be retried
except Exception as exc:
Expand All @@ -208,13 +211,12 @@ def sendto_ready(packet, socket, future, dest):
future.set_result(None)


async def send_one_ping(my_socket, dest_addr, id_, timeout, family):
async def send_one_ping(my_socket: socket.socket, dest_addr: SocketAddress, id_: int, family: Optional[socket.AddressFamily] = None):
"""
Send one ping to the given >dest_addr<.
:param my_socket:
:param dest_addr:
:param id_:
:param timeout:
:return:
"""
icmp_type = ICMP_ECHO_REQUEST if family == socket.AddressFamily.AF_INET\
Expand All @@ -240,13 +242,13 @@ async def send_one_ping(my_socket, dest_addr, id_, timeout, family):
packet = header + data

future = asyncio.get_event_loop().create_future()
callback = functools.partial(sendto_ready, packet=packet, socket=my_socket, dest=dest_addr, future=future)
callback = functools.partial(sendto_ready, packet=packet, socket=my_socket, dest_addr=dest_addr, future=future)
asyncio.get_event_loop().add_writer(my_socket, callback)

await future


async def ping(dest_addr, timeout=10, family=None):
async def ping(dest_addr: Union[str, bytes], timeout: Union[int, float] = 10.0, family: Optional[socket.AddressFamily] = None) -> float:
"""
Returns either the delay (in seconds) or raises an exception.
:param dest_addr:
Expand Down Expand Up @@ -292,14 +294,14 @@ async def ping(dest_addr, timeout=10, family=None):

my_id = uuid.uuid4().int & 0xFFFF

await send_one_ping(my_socket, addr, my_id, timeout, family)
await send_one_ping(my_socket, addr, my_id, family)
delay = await receive_one_ping(my_socket, my_id, timeout)
my_socket.close()

return delay


async def verbose_ping(dest_addr, timeout=2, count=3, family=None):
async def verbose_ping(dest_addr: Union[str, bytes], timeout=2.0, count=3, family: Optional[socket.AddressFamily] = None):
"""
Send >count< ping to >dest_addr< with the given >timeout< and display
the result.
Expand All @@ -313,7 +315,7 @@ async def verbose_ping(dest_addr, timeout=2, count=3, family=None):

try:
delay = await ping(dest_addr, timeout, family)
except TimeoutError as e:
except TimeoutError:
logger.error("%s timed out after %ss" % (dest_addr, timeout))
except Exception as e:
logger.error("%s failed: %s" % (dest_addr, str(e)))
Expand Down