Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ jobs:
- name: Test with pytest
run: |
source .venv/bin/activate
pytest
pytest -v
1 change: 0 additions & 1 deletion aura/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from dotenv import load_dotenv

from aura.aura import SignalingServer, VideoStreamer

load_dotenv()
if not os.environ.get("STORAGE_PATH"):
os.makedirs("output", exist_ok=True)
Expand Down
4 changes: 4 additions & 0 deletions aura/queue/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .queue import QueueManager, Priority, UserState, QueueError

__version__ = "0.1.0"
__all__ = ["QueueManager", "Priority", "UserState", "QueueError"]
147 changes: 147 additions & 0 deletions aura/queue/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
from dataclasses import dataclass
from enum import Enum
import time
from typing import Dict, List, Optional, Deque
from collections import deque
import json

class QueueError(Exception):
pass

class UserState(Enum):
WAITING = "WAITING"
CONNECTING = "CONNECTING"
CONNECTED = "CONNECTED"
DISCONNECTED = "DISCONNECTED"
TIMEOUT = "TIMEOUT"
ERROR = "ERROR"

class Priority(Enum):
HIGH = 3
NORMAL = 2
LOW = 1

@classmethod
def high(cls):
return cls.HIGH

@classmethod
def normal(cls):
return cls.NORMAL

@classmethod
def low(cls):
return cls.LOW

@dataclass
class QueuedUser:
id: str
state: UserState
join_time: float
priority: Priority
last_activity: float
connection_attempts: int
metadata: Dict[str, str]

def __init__(self, id: str, priority: Optional[Priority] = None):
self.id = id
self.state = UserState.WAITING
self.join_time = time.time()
self.priority = priority or Priority.NORMAL
self.last_activity = time.time()
self.connection_attempts = 0
self.metadata = {}

class QueueStats:
def __init__(self):
self.total_users = 0
self.waiting_users = 0
self.connected_users = 0
self.average_wait_time = 0
self.max_wait_time = 0

class QueueManager:
def __init__(self, max_session_minutes: int = 5, max_queue_size: int = 100, max_reconnect_attempts: int = 3):
self.queue: Deque[QueuedUser] = deque()
self.current_user: Optional[QueuedUser] = None
self.max_session_time = max_session_minutes * 60
self.max_queue_size = max_queue_size
self.max_reconnect_attempts = max_reconnect_attempts
self.user_timeouts: Dict[str, float] = {}
self.stats = QueueStats()

def add_to_queue(self, user_id: str, priority: Optional[Priority] = None) -> int:
if len(self.queue) >= self.max_queue_size:
raise QueueError("Queue is full")

if any(u.id == user_id for u in self.queue):
raise QueueError(f"User {user_id} already in queue")

user = QueuedUser(user_id, priority)
self._update_stats(user)

insert_pos = len(self.queue)
for i, u in enumerate(self.queue):
if u.priority.value <= user.priority.value:
insert_pos = i
break

self.queue.insert(insert_pos, user)
return insert_pos + 1

def remove_from_queue(self, user_id: str) -> bool:
for i, user in enumerate(self.queue):
if user.id == user_id:
self.queue.remove(user)
return True
return False

def update_user_state(self, user_id: str, new_state: UserState):
for user in self.queue:
if user.id == user_id:
if not self._is_valid_state_transition(user.state, new_state):
raise QueueError(f"Invalid state transition from {user.state} to {new_state}")
user.state = new_state
user.last_activity = time.time()
return
raise QueueError(f"User {user_id} not found")

def cleanup_timeouts(self) -> List[str]:
timeout_duration = 30
current_time = time.time()
timed_out = []

self.queue = deque([user for user in self.queue if not (
current_time - user.last_activity > timeout_duration and
not timed_out.append(user.id)
)])

return timed_out

def get_queue_stats(self) -> str:
return json.dumps({
"total_users": self.stats.total_users,
"waiting_users": self.stats.waiting_users,
"connected_users": self.stats.connected_users,
"average_wait_time": self.stats.average_wait_time,
"max_wait_time": self.stats.max_wait_time
})

def _update_stats(self, user: QueuedUser):
self.stats.total_users += 1
self.stats.waiting_users = len(self.queue)
wait_time = time.time() - user.join_time
self.stats.average_wait_time = (self.stats.average_wait_time + wait_time) / 2
if wait_time > self.stats.max_wait_time:
self.stats.max_wait_time = wait_time

def _is_valid_state_transition(self, from_state: UserState, to_state: UserState) -> bool:
valid_transitions = {
UserState.WAITING: [UserState.CONNECTING],
UserState.CONNECTING: [UserState.CONNECTED, UserState.ERROR],
UserState.CONNECTED: [UserState.DISCONNECTED, UserState.TIMEOUT],
UserState.DISCONNECTED: [],
UserState.TIMEOUT: [],
UserState.ERROR: []
}
return to_state in valid_transitions.get(from_state, [])
70 changes: 70 additions & 0 deletions aura/tests/test_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import pytest
import time
from aura.queue import QueueManager, Priority, UserState, QueueError

@pytest.fixture
def queue_manager():
return QueueManager(5, 100, 3)

def test_queue_initialization(queue_manager):
assert queue_manager.max_session_time == 300
assert queue_manager.max_queue_size == 100
assert queue_manager.max_reconnect_attempts == 3

def test_add_to_queue(queue_manager):
position = queue_manager.add_to_queue("user1", Priority.normal())
assert position == 1

def test_priority_ordering(queue_manager):
queue_manager.add_to_queue("user1", Priority.normal())
queue_manager.add_to_queue("user2", Priority.high())
queue_manager.add_to_queue("user3", Priority.low())

queue_list = list(queue_manager.queue)
assert queue_list[0].id == "user2" # High priority first
assert queue_list[1].id == "user1" # Normal priority second
assert queue_list[2].id == "user3" # Low priority last

def test_duplicate_user(queue_manager):
queue_manager.add_to_queue("user1", None)
with pytest.raises(QueueError) as exc_info:
queue_manager.add_to_queue("user1", None)
assert "already in queue" in str(exc_info.value).lower()

def test_queue_full():
queue = QueueManager(5, 2, 3)
queue.add_to_queue("user1", None)
queue.add_to_queue("user2", None)
with pytest.raises(QueueError) as exc_info:
queue.add_to_queue("user3", None)
assert "queue is full" in str(exc_info.value).lower()

def test_state_transitions(queue_manager):
queue_manager.add_to_queue("user1", None)
queue_manager.update_user_state("user1", UserState.CONNECTING)
queue_manager.update_user_state("user1", UserState.CONNECTED)
queue_manager.update_user_state("user1", UserState.DISCONNECTED)

def test_cleanup_timeouts(queue_manager):
queue_manager.add_to_queue("user1", None)
# Force timeout by waiting
time.sleep(31)
timed_out = queue_manager.cleanup_timeouts()
assert len(timed_out) == 1
assert timed_out[0] == "user1"

def test_remove_from_queue(queue_manager):
queue_manager.add_to_queue("user1", None)
assert queue_manager.remove_from_queue("user1") is True
assert queue_manager.remove_from_queue("nonexistent") is False

def test_invalid_state_transition(queue_manager):
queue_manager.add_to_queue("user1", None)
with pytest.raises(QueueError) as exc_info:
queue_manager.update_user_state("user1", UserState.DISCONNECTED)
assert "invalid state transition" in str(exc_info.value).lower()

def test_user_not_found(queue_manager):
with pytest.raises(QueueError) as exc_info:
queue_manager.update_user_state("nonexistent", UserState.CONNECTED)
assert "not found" in str(exc_info.value).lower()
18 changes: 14 additions & 4 deletions aura/tests/test_signaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,21 @@ def unused_tcp_port():
def signaling_server(unused_tcp_port):
server = SignalingServer(port=unused_tcp_port)
server.start()
# Allow some time for the server to start
time.sleep(1)

async def is_server_ready():
uri = f'ws://localhost:{unused_tcp_port}/signaling'
for _ in range(10):
try:
async with websockets.connect(uri):
return True
except (ConnectionRefusedError, OSError):
await asyncio.sleep(0.1)
return False

if not asyncio.run(is_server_ready()):
raise RuntimeError("Server did not start in time")

yield server
# No explicit cleanup needed as the server runs in a separate thread

@pytest.fixture
async def websocket_clients(signaling_server, unused_tcp_port):
Expand All @@ -35,7 +46,6 @@ async def websocket_clients(signaling_server, unused_tcp_port):
client2 = await websockets.connect(uri)
yield client1, client2
finally:
# Cleanup
if client1:
await client1.close()
if client2:
Expand Down
Loading