Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 54 additions & 38 deletions src/py/extra/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,38 +354,48 @@ async def Serve(
app: Application,
options: ServerOptions = ServerOptions(),
) -> None:
"""Main server coroutine."""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
server.bind((options.host, options.port))
# The argument is the backlog of connections that will be accepted before
# they are refused.
server.listen(options.backlog)
# This is what we need to use it with asyncio
"""Main server coroutine (Optimized)."""

loop = asyncio.get_running_loop()

# Use socket.create_server (Python 3.8+)
# Handles IPv6/IPv4, reuse_port, and cleanup automatically.
server = socket.create_server(
(options.host, options.port),
family=socket.AF_INET,
backlog=options.backlog,
reuse_port=True,
dualstack_ipv6=False
)
server.setblocking(False)
server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

tasks: set[asyncio.Task[None]] = set()
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()

# Manage server state
state = ServerState()
# Registers handlers for signals and exception (so that we log them). Note
# that we'll get a `set_wakeup_fd only works in main thread of the main interpreter`
# when this is not run out of the main thread.

# Signal Handling
# We need to cancel the current task to break out of the await loop.sock_accept
main_task = asyncio.current_task(loop)

def signal_handler():
state.stop()
if main_task:
main_task.cancel()

if (
options.stopSignals
and threading.current_thread() is threading.main_thread()
):
loop.add_signal_handler(SIGINT, lambda: state.stop())
loop.add_signal_handler(SIGTERM, lambda: state.stop())
for sig in (SIGINT, SIGTERM):
try:
loop.add_signal_handler(sig, signal_handler)
except NotImplementedError:
pass

loop.set_exception_handler(state.onException)

info(
"Extra AIO Server listening",
"Extra AIO Server listening (Optimized)",
icon="🚀",
Host=options.host,
Port=options.port,
Expand All @@ -395,35 +405,41 @@ async def Serve(
while state.isRunning:
if options.condition and not options.condition():
break

try:
res = await asyncio.wait_for(
loop.sock_accept(server), timeout=options.polling or 1.0
)
if res is None:
continue
else:
client = res[0]
# NOTE: Should do something with the tasks
# Direct await - no wait_for. This removes the race condition causing InvalidStateError.
client, _ = await loop.sock_accept(server)

task = loop.create_task(
cls.OnRequest(app, client, loop=loop, options=options)
)
tasks.add(task)
task.add_done_callback(tasks.discard)
except asyncio.TimeoutError:
continue

except asyncio.CancelledError:
state.stop()
break
except OSError as e:
# This can be: [OSError] [Errno 24] Too many open files
if e.errno == 24:
# Implement backpressure or wait mechanism here
await asyncio.sleep(0.1) # Short delay before retrying
if e.errno == 24: # Too many open files
warning("Too many open files. Pausing accept loop for 1s.")
await asyncio.sleep(1.0)
else:
exception(e)
await asyncio.sleep(0.1)
except Exception as e:
exception(e)

finally:
server.close()
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)

if tasks:
info(f"Cancelling {len(tasks)} active tasks...")
for task in tasks:
task.cancel()

await asyncio.gather(*tasks, return_exceptions=True)

info("Server shutdown complete.")


def run(
Expand Down
Loading