diff --git a/src/py/extra/server.py b/src/py/extra/server.py index e1cd13b..c7503a3 100644 --- a/src/py/extra/server.py +++ b/src/py/extra/server.py @@ -354,38 +354,48 @@ async def Serve( app: Application, options: ServerOptions = ServerOptions(), ) -> None: - """Main server coroutine.""" - server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - server.bind((options.host, options.port)) - # The argument is the backlog of connections that will be accepted before - # they are refused. - server.listen(options.backlog) - # This is what we need to use it with asyncio + """Main server coroutine (Optimized).""" + + loop = asyncio.get_running_loop() + + # Use socket.create_server (Python 3.8+) + # Handles IPv6/IPv4, reuse_port, and cleanup automatically. + server = socket.create_server( + (options.host, options.port), + family=socket.AF_INET, + backlog=options.backlog, + reuse_port=True, + dualstack_ipv6=False + ) server.setblocking(False) + server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) tasks: set[asyncio.Task[None]] = set() - try: - loop = asyncio.get_running_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - - # Manage server state state = ServerState() - # Registers handlers for signals and exception (so that we log them). Note - # that we'll get a `set_wakeup_fd only works in main thread of the main interpreter` - # when this is not run out of the main thread. + + # Signal Handling + # We need to cancel the current task to break out of the await loop.sock_accept + main_task = asyncio.current_task(loop) + + def signal_handler(): + state.stop() + if main_task: + main_task.cancel() + if ( options.stopSignals and threading.current_thread() is threading.main_thread() ): - loop.add_signal_handler(SIGINT, lambda: state.stop()) - loop.add_signal_handler(SIGTERM, lambda: state.stop()) + for sig in (SIGINT, SIGTERM): + try: + loop.add_signal_handler(sig, signal_handler) + except NotImplementedError: + pass + loop.set_exception_handler(state.onException) info( - "Extra AIO Server listening", + "Extra AIO Server listening (Optimized)", icon="🚀", Host=options.host, Port=options.port, @@ -395,35 +405,41 @@ async def Serve( while state.isRunning: if options.condition and not options.condition(): break + try: - res = await asyncio.wait_for( - loop.sock_accept(server), timeout=options.polling or 1.0 - ) - if res is None: - continue - else: - client = res[0] - # NOTE: Should do something with the tasks + # Direct await - no wait_for. This removes the race condition causing InvalidStateError. + client, _ = await loop.sock_accept(server) + task = loop.create_task( cls.OnRequest(app, client, loop=loop, options=options) ) tasks.add(task) task.add_done_callback(tasks.discard) - except asyncio.TimeoutError: - continue + + except asyncio.CancelledError: + state.stop() + break except OSError as e: - # This can be: [OSError] [Errno 24] Too many open files - if e.errno == 24: - # Implement backpressure or wait mechanism here - await asyncio.sleep(0.1) # Short delay before retrying + if e.errno == 24: # Too many open files + warning("Too many open files. Pausing accept loop for 1s.") + await asyncio.sleep(1.0) else: exception(e) + await asyncio.sleep(0.1) + except Exception as e: + exception(e) finally: server.close() - for task in tasks: - task.cancel() - await asyncio.gather(*tasks, return_exceptions=True) + + if tasks: + info(f"Cancelling {len(tasks)} active tasks...") + for task in tasks: + task.cancel() + + await asyncio.gather(*tasks, return_exceptions=True) + + info("Server shutdown complete.") def run(