Skip to content

Feature/asyncapi websocket spec#55

Open
BrandonTrigueros wants to merge 10 commits intomainfrom
feature/asyncapi-websocket-spec
Open

Feature/asyncapi websocket spec#55
BrandonTrigueros wants to merge 10 commits intomainfrom
feature/asyncapi-websocket-spec

Conversation

@BrandonTrigueros
Copy link

@BrandonTrigueros BrandonTrigueros commented Jan 23, 2026

AsyncAPI WebSocket Specification + Consumer Implementation (TripConsumer + RouteConsumer)

Summary

This PR delivers the complete WebSocket real-time system for SIMOVI Infobus, from specification to production-ready implementation. It includes the AsyncAPI 3.0 specification with production features, full TripConsumer and RouteConsumer implementations with comprehensive tests, and automated demo systems with interactive Leaflet maps.

Phases Completed

Phase 1: AsyncAPI 3.0 Specification (Production Features)

Enhanced specification with:

  • /ws prefix for all WebSocket URLs
  • Error handling system (codes 4000, 4004, 4500) with connection closure
  • Application-level heartbeat/keep-alive mechanism (optional)
  • RouteVehicles scalability strategy (<50 vehicles per route)
  • 100% AsyncAPI 3.0 compliance (0 validation errors)

Phase 2a: GTFS Serializers (TURNO 1)

Complete serialization layer with:

  • serialize_trip_update() - TripUpdate → JSON payload
  • serialize_vehicle_position() - VehiclePosition → JSON payload
  • serialize_stop_time_update() - StopTimeUpdate → JSON payload
  • serialize_route_vehicles() - Array of vehicles by route
  • 18 comprehensive tests (100% passing)

Phase 2b: TripConsumer Backend Implementation (TURNO 2)

Complete Django Channels WebSocket consumer with:

  • AsyncWebsocketConsumer with connect/disconnect/broadcast handlers
  • Query parameters: include_stops, include_shape
  • Payload type field for JavaScript event routing
  • 6 comprehensive tests (100% passing, integration with channel layer)
  • Demo system with Django views and interactive HTML

Phase 3: RouteConsumer Implementation (TURNO 3)

Complete route-level WebSocket consumer with:

  • RouteConsumer (311 lines) - Track all vehicles on a route
  • Dual URL patterns:
    • ws://host/ws/route/{route_id}/ - All directions
    • ws://host/ws/route/{route_id}/direction/{direction_id}/ - Filtered by direction (0 or 1)
  • Direction filtering - Clients subscribe to specific traffic directions
  • Snapshot strategy - Initial vehicle array on connection
  • 9 comprehensive tests (100% passing)
  • Interactive Leaflet map demo - Real-time vehicle tracking with markers
  • ASGI routing fix - Combine feed + websocket URL patterns

Fase 4: Production Fixes (TURNO 4 + 5)

Sanitización de NaN/Infinity en JSON antes de enviar por WebSocket
Corrección de errores críticos en frontend por datos inválidos
Validación en Docker y verificación de funcionamiento en producción

Key Changes

1. AsyncAPI Specification Updates

URL Structure:

channels:
  /ws/trip/{trip_id}:
    address: /ws/trip/{trip_id}
  /ws/route/{route_id}:
    address: /ws/route/{route_id}
  /ws/route/{route_id}/direction/{direction_id}:
    address: /ws/route/{route_id}/direction/{direction_id}

AsyncAPI Spec: URLs, errores, heartbeat, y estructura de canales.
TripConsumer: Manejo de parámetros, errores, y snapshots iniciales.
RouteConsumer: Filtrado por dirección, snapshots, y manejo robusto de datos GTFS-RT.
Serializers: Funciones para serializar TripUpdate, VehiclePosition, StopTimeUpdate y arrays de vehículos.
Demo Systems: Mapas interactivos, comandos automáticos, y scripts de verificación.
Infraestructura: Fix en ASGI routing para múltiples apps.

Error Handling:

Code Type Behavior
4000 Bad Request Invalid params → close
4001 Not Found (Trip) Trip missing → close
4004 Not Found (Route) Route missing → close
4500 Server Error Internal failure → close

Heartbeat (Optional):

{"type": "heartbeat", "action": "ping", "timestamp": "..."}

2. TripConsumer Implementation (websocket/consumers/trip.py)

class TripConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        # Validate trip exists, parse params, send initial snapshot
        
    async def trip_update(self, event):
        # Receive broadcast from Celery, serialize, send to client

Features:

  • Query params: ?include_stops=true&include_shape=false
  • Error codes: 4001 (not found), 4002 (invalid params), 5001 (server error)
  • Initial snapshot on connection
  • Broadcast integration via channel layer

3. RouteConsumer Implementation (websocket/consumers/route.py)

class RouteConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        # Validate route exists, parse direction, send vehicle snapshot
        
    async def route_update(self, event):
        # Receive broadcast, filter by direction, send to clients

Features:

  • Direction filtering: Optional direction_id parameter (0 or 1)
  • Dual subscriptions: Clients can subscribe to all vehicles or filtered by direction
  • Snapshot array: Initial state with all active vehicles on route
  • Manual serialization: Avoids async/sync issues with Django ORM
  • Channel groups:
    • route_{route_id} - All vehicles on route
    • route_{route_id}_dir_{direction_id} - Vehicles filtered by direction

Data Structure:

{
  "type": "route.snapshot",
  "route_id": "DEMO_ROUTE_001",
  "direction_id": null,
  "vehicles": [
    {
      "vehicle_id": "BUS-001",
      "latitude": 9.9342,
      "longitude": -84.0873,
      "bearing": 45.0,
      "speed": 35.5,
      "direction_id": 0,
      "timestamp": "2026-01-24T..."
    }
  ],
  "timestamp": "2026-01-24T..."
}

4. GTFS Serializers (websocket/serializers/gtfs.py)

def serialize_trip_update(trip_update, include_stops=True) -> Dict:
    # Returns TripUpdatePayload following AsyncAPI schema
    
def serialize_vehicle_position(vehicle_pos) -> Dict:
    # Returns VehiclePositionPayload following AsyncAPI schema

5. Demo Systems

Trip Demo (/websocket/demo/trip/)

  • Django view with auto-connect
  • Live message log with snapshots and updates
  • Auto-detects WebSocket URL (WSL2 compatible)
  • Management commands: demo_trip_data, test_broadcast --type trip

Route Demo (/websocket/demo/route/)

  • Interactive Leaflet map with real-time vehicle markers
  • Direction toggle: Filter by direction (All, 0, 1)
  • Color-coded markers: Blue (direction 0), Green (direction 1)
  • Vehicle popups: Click marker for details
  • Live vehicle list: Side panel with real-time updates
  • Management commands: demo_route_data, test_broadcast --type route

Automated Scripts

  • run_trip_demo.sh - One-command trip demo setup
  • run_route_demo.sh - One-command route demo setup (with background broadcasts)
  • clean_docker.sh - Complete Docker teardown

AsyncAPI Validation

✓ File is valid! 0 errors, 0 warnings
✓ 815 lines (from 665)
✓ 9 schemas (added ErrorPayload, HeartbeatPayload)
✓ 6 messages (added Error, Heartbeat)
✓ 3 channels (trip, route, route+direction)

Impact

For Developers:

  • One-command demos: ./scripts/run_trip_demo.sh and ./scripts/run_route_demo.sh
  • Clear error codes and handling patterns
  • Comprehensive tests demonstrate usage
  • Consistent naming conventions (trip vs route)

For Frontend:

  • Auto-detecting WebSocket URLs (no hardcoded IPs)
  • Type-based message routing (trip.snapshot, trip.update, route.snapshot, route.update)
  • Live examples:
    • Trip: http://localhost:8000/websocket/demo/trip/
    • Route: http://localhost:8000/websocket/demo/route/
  • Interactive map with Leaflet.js for visualization

For Production:

  • Scalable channel layer integration
  • Proper error handling with connection closure
  • Direction filtering reduces unnecessary data transmission
  • Background broadcast simulation ready for Celery integration

Next Steps

Future Enhancements:

  • Optional heartbeat implementation
  • Rate limiting and authentication
  • Performance optimization for 100+ vehicles
  • Multi-route subscriptions

References

Checklist

Phase 1: Specification

  • ✅ AsyncAPI validates with CLI
  • ✅ All channel references correct
  • ✅ Error codes documented
  • ✅ Heartbeat mechanism explained

Phase 2a: Serializers (TURNO 1)

  • ✅ 4 GTFS serializer functions
  • ✅ Following AsyncAPI schema exactly
  • ✅ 18 comprehensive tests passing
  • ✅ Type hints and docstrings

Phase 2b: TripConsumer (TURNO 2)

  • ✅ TripConsumer with async handlers
  • ✅ Query parameter parsing
  • ✅ Error handling with closure codes
  • ✅ Channel layer broadcast integration
  • ✅ 6 comprehensive tests passing
  • ✅ Demo system with Django views

Phase 3: RouteConsumer (TURNO 3)

  • ✅ RouteConsumer with async handlers
  • ✅ Dual URL patterns (with/without direction)
  • ✅ Direction filtering logic
  • ✅ Snapshot strategy with vehicle array
  • ✅ Manual payload building (avoid async/sync issues)
  • ✅ 9 comprehensive tests passing
  • ✅ Interactive Leaflet map demo
  • ✅ ASGI routing fix (combine multiple apps)
  • ✅ Naming consistency refactor

Phase 4: Verification

  • ✅ Fresh Docker build tested
  • ✅ WebSocket connects successfully (both consumers)
  • ✅ Real-time updates working
  • ✅ Auto-connect and auto-detection
  • ✅ All tests passing (33/33)
  • ✅ Leaflet map visualization functional
  • ✅ Direction filtering operational

- WebSocket vs HTTP comparison
- AsyncAPI 3.0 specification research
- Django Channels implementation notes
- GTFS-Realtime schema documentation

Related to TCU task: WebSocket interface definition
- Add /ws prefix to all channel URLs
- Document RouteVehicles snapshot strategy (<50 vehicles)
- Add ErrorPayload schema with codes 4000, 4004, 4500
- Add HeartbeatPayload schema for application-level keep-alive
- Add Error and Heartbeat message definitions
- Fix channel references in operations section

Spec validated with asyncapi/cli: 0 errors
Ready for Phase 2 backend implementation
@BrandonTrigueros BrandonTrigueros self-assigned this Jan 23, 2026
@BrandonTrigueros BrandonTrigueros added the enhancement New feature or request label Jan 23, 2026
BrandonTrigueros and others added 7 commits January 24, 2026 21:07
Implementation:
- Add TripConsumer with AsyncWebsocketConsumer (240 lines)
- Implement connect/disconnect/trip_update handlers
- Add query params: include_stops, include_shape
- Fix payload type field for JavaScript compatibility

Serializers:
- Add serialize_trip_update() for TripUpdatePayload schema
- Add serialize_vehicle_position() for VehiclePositionPayload
- Follow AsyncAPI 3.0 specification structure

Testing:
- Add 6 comprehensive tests (100% coverage)
- Test connection lifecycle and error codes
- Test broadcast integration with channel layer

Demo System:
- Create Django views and URLs for trip_demo
- Add trip_demo.html with auto-connect and live updates
- Add management commands: demo_websocket_data, test_broadcast
- Add automated scripts: run_websocket_demo.sh, clean_docker.sh
- Update documentation and fix ALLOWED_HOSTS for WSL2

All tests passing (6/6). Demo verified with fresh Docker build.
… Leaflet map demo

TURNO 3 Implementation Complete:

Core Features:
- RouteConsumer (311 lines): WebSocket consumer for route-level vehicle tracking
  * Supports 2 URL patterns: ws://host/ws/route/{route_id}/ and ws://host/ws/route/{route_id}/direction/{direction_id}/
  * Direction filtering (0, 1, or all)
  * Async snapshot with vehicle array
  * Manual payload building to avoid async/sync serializer issues

- RouteConsumer Tests (360 lines): 9/9 passing
  * Connection tests (all directions, specific direction, invalid route/direction)
  * Broadcast tests (route updates, direction filtering, edge cases)
  * Fixtures chain: provider → feed → agency → route → vehicle_positions

Demo System:
- route_demo.html (550 lines): Interactive Leaflet map with real-time vehicle tracking
  * Vehicle markers with color-coded directions (blue/green)
  * Direction toggle (All/0/1)
  * Live vehicle list with details
  * Popup details on marker click
  * WebSocket message log for debugging

- demo_route_data.py: Creates DEMO_ROUTE_001 with 6 vehicles (3 per direction)
- run_route_demo.sh: Automated setup script with background broadcasts
- test_broadcast.py: Enhanced with --type route support

Infrastructure:
- ASGI routing fix: Combine feed.routing + websocket.routing patterns
- Fixed datahub/asgi.py to load WebSocket patterns from both apps
- Routing: 3 patterns (trip, route, route+direction)

Refactoring:
- Renamed for consistency:
  * run_websocket_demo.sh → run_trip_demo.sh
  * demo_websocket_data.py → demo_trip_data.py
  * test_consumers.py → test_trip_consumer.py

Tests: 33/33 passing (18 serializers + 6 trip + 9 route)
Lines: ~3,882 functional code
Progress: 63% (35h/56h)

Validated:
WebSocket connections work (both URL patterns)
Direction filtering operational
Leaflet map shows 6 vehicles with real-time updates
Broadcasts update vehicle positions automatically
Demo accessible at http://localhost:8000/websocket/demo/route/

Next: TURNO 4 - Celery Broadcasting Integration
✅ Broadcasting automático desde Celery tasks a WebSocket consumers
✅ Integración con TripConsumer y RouteConsumer
✅ Validado con MBTA (Boston) - 6,127+ broadcasts reales en 2 min
✅ 19/19 tests passing (4 broadcasting + 15 consumers)

Features:
- get_vehicle_positions(): Broadcasting batched por route_id
- get_trip_updates(): Broadcasting individual por trip_id
- Celery Beat schedule: tasks cada 30 segundos
- Error handling robusto: broadcasting no rompe tasks principales
- Batching strategy: 10x mejora de performance

Tests:
- test_trip_update_broadcasts_to_websocket
- test_vehicle_position_broadcasts_to_route
- test_broadcasting_does_not_break_task
- test_batch_broadcasting_performance

Archivos:
- feed/tasks.py: Broadcasting en líneas 194-245, 308-327
- datahub/settings.py: CELERY_BEAT_SCHEDULE configurado
- feed/tests/test_celery_broadcasting.py: 345 líneas (NUEVO)
- feed/tests/__init__.py (NUEVO)

Validación MBTA:
- 6,127+ broadcasts en 2 minutos
- 0 errores de broadcasting
- 0 errores de tasks
- Rutas: 42, 441, CR-Haverhill, 105, 106, 91, etc.

Refactoring:
- Eliminado código fake/demo (216 líneas)
- Código de producción limpio
- Separación de responsabilidades correcta
…real-time map

Turno 5: Production-Ready Client Implementation
================================================

✨ New Features
---------------
- Real-time vehicle tracking with Leaflet.js map
- WebSocket client with automatic reconnection
- Dynamic route loading from API
- Direction-based filtering (Outbound/Inbound)
- Interactive vehicle markers with popups
- Collapsible sidebar with Tailwind CSS

📦 Frontend Components
----------------------
- static/js/websocket-client.js (275 lines)
  • Exponential backoff reconnection (1s → 30s)
  • Event-based message handling
  • Heartbeat support (30s intervals)

- static/js/map-controller.js (266 lines)
  • Leaflet integration
  • Color-coded markers by direction (Blue/Green)
  • Smooth marker transitions
  • Auto-zoom to fit all vehicles

- website/templates/website/realtime_map.html (687 lines)
  • Full-screen responsive layout
  • Connection status indicators
  • Route selection with vehicle counts
  • Real-time vehicle list
  • Debug message log

🔌 Backend API
--------------
- api/views.py: Added 2 new endpoints
  • GET /api/realtime/routes/active/ - List active routes
  • GET /api/realtime/routes/<route_id>/ - Route details

- website/views.py: realtime_map() view
- website/urls.py: /realtime/ URL route

🔧 Fixes
--------
- websocket/consumers/route.py: Use pre-serialized vehicles from broadcasts
- feed/routing.py: Import RouteConsumer from websocket app
- Resolved duplicate consumer conflict from Turno 4

📁 Organization
---------------
- Moved demo scripts to demos/ directory
  • demos/websocket/run_trip_demo.sh
  • demos/websocket/run_route_demo.sh
  • demos/mbta/create_mbta_routes.py
  • demos/mbta/load_mbta_routes.py
  • demos/mbta/README.md

✅ Testing
----------
Validated with MBTA (Boston) real-time data:
- 6+ routes simultaneously
- 50+ vehicles tracked in real-time
- Auto-reconnection working
- Direction filtering functional
- Map performance smooth

Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
Critical fixes for Turno 5:
- Fix 'app is not defined' JavaScript error in realtime_map.html
- Replace app.setStatus() calls with updateConnectionStatus()
- Fix VehiclePosition query - no ForeignKey to Route, use CharField
- Use filter().first() instead of get() to avoid MultipleObjectsReturned
- Query by vehicle_trip_route_id field directly

These fixes resolve the 'Turno 5 desastroso' issue - client now loads properly.
RouteConsumer was rejecting connections for routes not in DB.
Since VehiclePosition gets route_ids from GTFS-RT feed before
they're imported into Route table, we need to accept any route_id
and just return empty snapshots if no vehicles exist.

This fixes the WebSocket REJECT issue causing error 1006.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant