diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 59e9847..23f3b8d 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -60,4 +60,4 @@ jobs: - name: Test with pytest run: | source .venv/bin/activate - pytest + pytest -v diff --git a/aura/__init__.py b/aura/__init__.py index a3e07f0..eb077ce 100644 --- a/aura/__init__.py +++ b/aura/__init__.py @@ -2,7 +2,6 @@ from dotenv import load_dotenv from aura.aura import SignalingServer, VideoStreamer - load_dotenv() if not os.environ.get("STORAGE_PATH"): os.makedirs("output", exist_ok=True) diff --git a/aura/queue/__init__.py b/aura/queue/__init__.py new file mode 100644 index 0000000..26c1f2c --- /dev/null +++ b/aura/queue/__init__.py @@ -0,0 +1,4 @@ +from .queue import QueueManager, Priority, UserState, QueueError + +__version__ = "0.1.0" +__all__ = ["QueueManager", "Priority", "UserState", "QueueError"] diff --git a/aura/queue/queue.py b/aura/queue/queue.py new file mode 100644 index 0000000..4fdb35d --- /dev/null +++ b/aura/queue/queue.py @@ -0,0 +1,147 @@ +from dataclasses import dataclass +from enum import Enum +import time +from typing import Dict, List, Optional, Deque +from collections import deque +import json + +class QueueError(Exception): + pass + +class UserState(Enum): + WAITING = "WAITING" + CONNECTING = "CONNECTING" + CONNECTED = "CONNECTED" + DISCONNECTED = "DISCONNECTED" + TIMEOUT = "TIMEOUT" + ERROR = "ERROR" + +class Priority(Enum): + HIGH = 3 + NORMAL = 2 + LOW = 1 + + @classmethod + def high(cls): + return cls.HIGH + + @classmethod + def normal(cls): + return cls.NORMAL + + @classmethod + def low(cls): + return cls.LOW + +@dataclass +class QueuedUser: + id: str + state: UserState + join_time: float + priority: Priority + last_activity: float + connection_attempts: int + metadata: Dict[str, str] + + def __init__(self, id: str, priority: Optional[Priority] = None): + self.id = id + self.state = UserState.WAITING + self.join_time = time.time() + self.priority = priority or Priority.NORMAL + self.last_activity = time.time() + self.connection_attempts = 0 + self.metadata = {} + +class QueueStats: + def __init__(self): + self.total_users = 0 + self.waiting_users = 0 + self.connected_users = 0 + self.average_wait_time = 0 + self.max_wait_time = 0 + +class QueueManager: + def __init__(self, max_session_minutes: int = 5, max_queue_size: int = 100, max_reconnect_attempts: int = 3): + self.queue: Deque[QueuedUser] = deque() + self.current_user: Optional[QueuedUser] = None + self.max_session_time = max_session_minutes * 60 + self.max_queue_size = max_queue_size + self.max_reconnect_attempts = max_reconnect_attempts + self.user_timeouts: Dict[str, float] = {} + self.stats = QueueStats() + + def add_to_queue(self, user_id: str, priority: Optional[Priority] = None) -> int: + if len(self.queue) >= self.max_queue_size: + raise QueueError("Queue is full") + + if any(u.id == user_id for u in self.queue): + raise QueueError(f"User {user_id} already in queue") + + user = QueuedUser(user_id, priority) + self._update_stats(user) + + insert_pos = len(self.queue) + for i, u in enumerate(self.queue): + if u.priority.value <= user.priority.value: + insert_pos = i + break + + self.queue.insert(insert_pos, user) + return insert_pos + 1 + + def remove_from_queue(self, user_id: str) -> bool: + for i, user in enumerate(self.queue): + if user.id == user_id: + self.queue.remove(user) + return True + return False + + def update_user_state(self, user_id: str, new_state: UserState): + for user in self.queue: + if user.id == user_id: + if not self._is_valid_state_transition(user.state, new_state): + raise QueueError(f"Invalid state transition from {user.state} to {new_state}") + user.state = new_state + user.last_activity = time.time() + return + raise QueueError(f"User {user_id} not found") + + def cleanup_timeouts(self) -> List[str]: + timeout_duration = 30 + current_time = time.time() + timed_out = [] + + self.queue = deque([user for user in self.queue if not ( + current_time - user.last_activity > timeout_duration and + not timed_out.append(user.id) + )]) + + return timed_out + + def get_queue_stats(self) -> str: + return json.dumps({ + "total_users": self.stats.total_users, + "waiting_users": self.stats.waiting_users, + "connected_users": self.stats.connected_users, + "average_wait_time": self.stats.average_wait_time, + "max_wait_time": self.stats.max_wait_time + }) + + def _update_stats(self, user: QueuedUser): + self.stats.total_users += 1 + self.stats.waiting_users = len(self.queue) + wait_time = time.time() - user.join_time + self.stats.average_wait_time = (self.stats.average_wait_time + wait_time) / 2 + if wait_time > self.stats.max_wait_time: + self.stats.max_wait_time = wait_time + + def _is_valid_state_transition(self, from_state: UserState, to_state: UserState) -> bool: + valid_transitions = { + UserState.WAITING: [UserState.CONNECTING], + UserState.CONNECTING: [UserState.CONNECTED, UserState.ERROR], + UserState.CONNECTED: [UserState.DISCONNECTED, UserState.TIMEOUT], + UserState.DISCONNECTED: [], + UserState.TIMEOUT: [], + UserState.ERROR: [] + } + return to_state in valid_transitions.get(from_state, []) diff --git a/aura/tests/test_queue.py b/aura/tests/test_queue.py new file mode 100644 index 0000000..3f0fdb0 --- /dev/null +++ b/aura/tests/test_queue.py @@ -0,0 +1,70 @@ +import pytest +import time +from aura.queue import QueueManager, Priority, UserState, QueueError + +@pytest.fixture +def queue_manager(): + return QueueManager(5, 100, 3) + +def test_queue_initialization(queue_manager): + assert queue_manager.max_session_time == 300 + assert queue_manager.max_queue_size == 100 + assert queue_manager.max_reconnect_attempts == 3 + +def test_add_to_queue(queue_manager): + position = queue_manager.add_to_queue("user1", Priority.normal()) + assert position == 1 + +def test_priority_ordering(queue_manager): + queue_manager.add_to_queue("user1", Priority.normal()) + queue_manager.add_to_queue("user2", Priority.high()) + queue_manager.add_to_queue("user3", Priority.low()) + + queue_list = list(queue_manager.queue) + assert queue_list[0].id == "user2" # High priority first + assert queue_list[1].id == "user1" # Normal priority second + assert queue_list[2].id == "user3" # Low priority last + +def test_duplicate_user(queue_manager): + queue_manager.add_to_queue("user1", None) + with pytest.raises(QueueError) as exc_info: + queue_manager.add_to_queue("user1", None) + assert "already in queue" in str(exc_info.value).lower() + +def test_queue_full(): + queue = QueueManager(5, 2, 3) + queue.add_to_queue("user1", None) + queue.add_to_queue("user2", None) + with pytest.raises(QueueError) as exc_info: + queue.add_to_queue("user3", None) + assert "queue is full" in str(exc_info.value).lower() + +def test_state_transitions(queue_manager): + queue_manager.add_to_queue("user1", None) + queue_manager.update_user_state("user1", UserState.CONNECTING) + queue_manager.update_user_state("user1", UserState.CONNECTED) + queue_manager.update_user_state("user1", UserState.DISCONNECTED) + +def test_cleanup_timeouts(queue_manager): + queue_manager.add_to_queue("user1", None) + # Force timeout by waiting + time.sleep(31) + timed_out = queue_manager.cleanup_timeouts() + assert len(timed_out) == 1 + assert timed_out[0] == "user1" + +def test_remove_from_queue(queue_manager): + queue_manager.add_to_queue("user1", None) + assert queue_manager.remove_from_queue("user1") is True + assert queue_manager.remove_from_queue("nonexistent") is False + +def test_invalid_state_transition(queue_manager): + queue_manager.add_to_queue("user1", None) + with pytest.raises(QueueError) as exc_info: + queue_manager.update_user_state("user1", UserState.DISCONNECTED) + assert "invalid state transition" in str(exc_info.value).lower() + +def test_user_not_found(queue_manager): + with pytest.raises(QueueError) as exc_info: + queue_manager.update_user_state("nonexistent", UserState.CONNECTED) + assert "not found" in str(exc_info.value).lower() diff --git a/aura/tests/test_signaling.py b/aura/tests/test_signaling.py index d216243..72d0d6e 100644 --- a/aura/tests/test_signaling.py +++ b/aura/tests/test_signaling.py @@ -19,10 +19,21 @@ def unused_tcp_port(): def signaling_server(unused_tcp_port): server = SignalingServer(port=unused_tcp_port) server.start() - # Allow some time for the server to start - time.sleep(1) + + async def is_server_ready(): + uri = f'ws://localhost:{unused_tcp_port}/signaling' + for _ in range(10): + try: + async with websockets.connect(uri): + return True + except (ConnectionRefusedError, OSError): + await asyncio.sleep(0.1) + return False + + if not asyncio.run(is_server_ready()): + raise RuntimeError("Server did not start in time") + yield server - # No explicit cleanup needed as the server runs in a separate thread @pytest.fixture async def websocket_clients(signaling_server, unused_tcp_port): @@ -35,7 +46,6 @@ async def websocket_clients(signaling_server, unused_tcp_port): client2 = await websockets.connect(uri) yield client1, client2 finally: - # Cleanup if client1: await client1.close() if client2: