From edc612961f2597f2081e41d3a0b25893b923189e Mon Sep 17 00:00:00 2001 From: Matic Babnik Date: Sat, 10 Jan 2026 03:04:44 +0100 Subject: [PATCH 1/2] feat: WS Framework done --- backend/index.ts | 1 - backend/src/constants.ts | 2 + backend/src/handlers/common.ts | 10 +++ backend/src/handlers/index.ts | 17 ++++ backend/src/handlers/owner.ts | 14 +++ backend/src/models/Room.ts | 3 +- backend/src/reaper.ts | 50 +++++++++++ backend/src/server.ts | 114 +++++++++++++++++++++---- backend/src/services/RoomManager.ts | 17 ++-- backend/src/services/SessionManager.ts | 5 ++ wire/package.json | 2 +- wire/readme.md | 11 +-- wire/src/backend/index.ts | 42 +++++---- wire/src/types/error.ts | 8 +- wire/src/types/msg.ts | 2 +- 15 files changed, 243 insertions(+), 55 deletions(-) delete mode 100644 backend/index.ts create mode 100644 backend/src/constants.ts create mode 100644 backend/src/handlers/common.ts create mode 100644 backend/src/handlers/index.ts create mode 100644 backend/src/handlers/owner.ts create mode 100644 backend/src/reaper.ts diff --git a/backend/index.ts b/backend/index.ts deleted file mode 100644 index f67b2c6..0000000 --- a/backend/index.ts +++ /dev/null @@ -1 +0,0 @@ -console.log("Hello via Bun!"); \ No newline at end of file diff --git a/backend/src/constants.ts b/backend/src/constants.ts new file mode 100644 index 0000000..d6967e8 --- /dev/null +++ b/backend/src/constants.ts @@ -0,0 +1,2 @@ +export const REAPER_INTERVAL_MS = 15_000 +export const REAPER_TIMEOUT_MS = 30_000 diff --git a/backend/src/handlers/common.ts b/backend/src/handlers/common.ts new file mode 100644 index 0000000..5eb110e --- /dev/null +++ b/backend/src/handlers/common.ts @@ -0,0 +1,10 @@ +import type { ClientMessages } from '@sync/wire' +import type { HandlerMap } from '.' +import type { SyncServer, SyncWS } from '../server' + +export const COMMON_HANDLERS = { + ping: (ws: SyncWS, msg: ClientMessages['ping'], server: SyncServer) => {}, + struggle: (ws: SyncWS, msg: ClientMessages['struggle'], server: SyncServer) => {}, + message: (ws: SyncWS, msg: ClientMessages['message'], server: SyncServer) => {}, + playbackStats: (ws: SyncWS, msg: ClientMessages['playbackStats'], server: SyncServer) => {}, +} satisfies Partial diff --git a/backend/src/handlers/index.ts b/backend/src/handlers/index.ts new file mode 100644 index 0000000..67ac9c4 --- /dev/null +++ b/backend/src/handlers/index.ts @@ -0,0 +1,17 @@ +import type { ClientMessage } from '@sync/wire' +import type { SyncServer, SyncWS } from '../server' +import { OWNER_HANDLERS } from './owner' +import { COMMON_HANDLERS } from './common' + +export type HandlerMap = { + [TKey in ClientMessage['type']]: ( + ws: SyncWS, + msg: Extract, + server: SyncServer, + ) => void | Promise +} + +export const HANDLERS = { + ...OWNER_HANDLERS, + ...COMMON_HANDLERS, +} satisfies HandlerMap diff --git a/backend/src/handlers/owner.ts b/backend/src/handlers/owner.ts new file mode 100644 index 0000000..cb8cad7 --- /dev/null +++ b/backend/src/handlers/owner.ts @@ -0,0 +1,14 @@ +import type { ClientMessages } from '@sync/wire' +import type { HandlerMap } from '.' +import type { SyncServer, SyncWS } from '../server' + +export const OWNER_HANDLERS = { + sync: (ws: SyncWS, msg: ClientMessages['sync'], server: SyncServer) => {}, + updateRoom: (ws: SyncWS, msg: ClientMessages['updateRoom'], server: SyncServer) => {}, + clearChat: (ws: SyncWS, msg: ClientMessages['clearChat'], server: SyncServer) => {}, + destroyRoom: (ws: SyncWS, msg: ClientMessages['destroyRoom'], server: SyncServer) => {}, + kick: (ws: SyncWS, msg: ClientMessages['kick'], server: SyncServer) => {}, + kickAll: (ws: SyncWS, msg: ClientMessages['kickAll'], server: SyncServer) => {}, + updatePlaylist: (ws: SyncWS, msg: ClientMessages['updatePlaylist'], server: SyncServer) => {}, + queryPlayback: (ws: SyncWS, msg: ClientMessages['queryPlayback'], server: SyncServer) => {}, +} satisfies Partial diff --git a/backend/src/models/Room.ts b/backend/src/models/Room.ts index d2e82df..8ef8ad8 100644 --- a/backend/src/models/Room.ts +++ b/backend/src/models/Room.ts @@ -55,11 +55,12 @@ export class Room { * Remove a user from this room and sets a new random owner or `undefined` if the room is empty * @param user The user */ - public removeUser(user: User) { + public removeUser(user: User, newOwnerCallback: (ownerId?: string) => void) { this.users.delete(user) if (this._owner === user) { this.assignNewOwner() + if (this._owner) newOwnerCallback(this._owner.id) } } diff --git a/backend/src/reaper.ts b/backend/src/reaper.ts new file mode 100644 index 0000000..25d1b8a --- /dev/null +++ b/backend/src/reaper.ts @@ -0,0 +1,50 @@ +import { serializeMsg } from '@sync/wire/backend' +import { REAPER_TIMEOUT_MS } from './constants' +import type { User } from './models' +import type { SyncServer } from './server' +import { RoomManager } from './services/RoomManager' +import { SessionManager } from './services/SessionManager' + +export function reap(server: SyncServer) { + const roomsToDestroy = new Set() + const usersToRemove = new Set() + + let userCount = 0 + let roomCount = 0 + + for (const room of RoomManager.roomsIterator()) { + usersToRemove.clear() + + for (const user of room.users.values()) { + if ( + user.state !== 'present' && + Date.now() - user.lastStateChangeTimestamp > REAPER_TIMEOUT_MS + ) { + usersToRemove.add(user) + SessionManager.destroy(user.sessionId) + } + } + + for (const user of usersToRemove) { + userCount++ + + room.removeUser(user, (ownerId) => { + server.publish(room.topic, serializeMsg('roomUpdated', { ownerId })) + }) + + if (user.state !== 'new') + server.publish(room.topic, serializeMsg('userLeft', { userId: user.id })) + } + + if (room.users.size === 0) { + roomsToDestroy.add(room.slug) + } + } + + for (const roomSlug of roomsToDestroy) { + roomCount++ + RoomManager.deleteRoom(roomSlug) + } + + console.log(`[Reaper] Removed ${userCount} users and ${roomCount} rooms`) +} diff --git a/backend/src/server.ts b/backend/src/server.ts index 0e73551..e0d2b31 100644 --- a/backend/src/server.ts +++ b/backend/src/server.ts @@ -2,13 +2,25 @@ import { Elysia, t } from 'elysia' import { Room, User } from './models' import { RoomManager } from './services/RoomManager' import { SessionManager } from './services/SessionManager' -import { CloseCode, CloseReason, serializeMsg } from '@sync/wire/backend' +import { + CloseCode, + CloseReason, + MalformedMsgError, + parseMessage, + serializeMsg, + type ClientMessage, +} from '@sync/wire/backend' +import { HANDLERS } from './handlers' +import { REAPER_INTERVAL_MS } from './constants' +import { reap } from './reaper' export type WSData = { user: User + closedByServer?: boolean } export type SyncWS = Bun.ServerWebSocket +export type SyncServer = Bun.Server const app = new Elysia() .post('/room/canCreate/:id', ({ params: { id }, status }) => { @@ -92,7 +104,7 @@ const app = new Elysia() }), }, ) - .post('/media/check', (req) => {}, { + .post('/media/check', ({ body }) => {}, { body: t.Object({}), }) @@ -122,53 +134,123 @@ export const server = Bun.serve({ console.log(`[${user.room.slug}:${user.displayName}] Open`) + // Set the user as present user.state = 'present' user.lastStateChangeTimestamp = Date.now() if (user.webSocket) { + ws.data.closedByServer = true user.webSocket.close(CloseCode.ConnectedElsewhere, CloseReason.ConnectedElsewhere) user.webSocket = ws } else { server.publish( user.room.topic, - serializeMsg('userState', { userId: user.id, state: 'present' }), + serializeMsg('userState', { + userId: user.id, + timestamp: Date.now(), + state: 'present', + }), ) } + // Hello the new connection user.webSocket = ws ws.subscribe(user.room.topic) - - //TODO: Send hello ws.send(serializeMsg('roomHello', { you: user.toWire(), ...user.room.toWire() })) }, - message: (ws, msg) => {}, + message: async (ws, msg) => { + if (typeof msg !== 'string') { + ws.send(serializeMsg('error', { type: 'binaryData', message: 'No buffers pls' })) + return + } + + let parsedMsg: ClientMessage + + try { + parsedMsg = parseMessage(msg) + } catch (e: unknown) { + let errorMsg = 'Unknown error', + replyTo: number | undefined = undefined + + if (e instanceof MalformedMsgError) { + errorMsg = e.message + replyTo = e.messageId + } + + ws.send(serializeMsg('error', { type: 'malformedMsg', message: errorMsg }, replyTo)) + return + } + + const handler = HANDLERS[parsedMsg.type] + + if (!handler) { + ws.send( + serializeMsg( + 'error', + { type: 'nobodyCared', message: `Who? Asked`, cause: parsedMsg.type }, + parsedMsg.id, + ), + ) + return + } + + try { + await handler(ws, parsedMsg as never, server) + } catch (e) { + console.error('Error handling message:', e) + + ws.send( + serializeMsg( + 'error', + { + type: 'serverError', + message: 'Server error while handling message', + cause: parsedMsg.type, + }, + parsedMsg.id, + ), + ) + } + }, close: (ws, code, reason) => { const { user } = ws.data console.log(`[${user.room.slug}:${user.displayName}] Close: ${code} (${reason})`) - if (code === CloseCode.ConnectedElsewhere) return + // When setting closedByServer, we don't want to run the normal close logic + if (ws.data.closedByServer) return if (code === CloseCode.Leave) { - user.room.removeUser(user) + // User intentionally left + user.room.removeUser(user, (ownerId) => { + server.publish(user.room.topic, serializeMsg('roomUpdated', { ownerId })) + }) + user.webSocket = undefined server.publish(user.room.topic, serializeMsg('userLeft', { userId: user.id })) return - } else { - user.state = 'reconnecting' - user.lastStateChangeTimestamp = Date.now() - - server.publish( - user.room.topic, - serializeMsg('userState', { userId: user.id, state: 'disconnected' }), - ) } + // User disconnected unexpectedly (or wrongly) user.state = 'reconnecting' user.lastStateChangeTimestamp = Date.now() + + server.publish( + user.room.topic, + serializeMsg('userState', { + userId: user.id, + timestamp: Date.now(), + state: 'disconnected', + }), + ) + // The user will either reconnect or be reapt }, }, }) + +setInterval(() => { + reap(server) +}, REAPER_INTERVAL_MS) diff --git a/backend/src/services/RoomManager.ts b/backend/src/services/RoomManager.ts index f17e937..7d4d6ce 100644 --- a/backend/src/services/RoomManager.ts +++ b/backend/src/services/RoomManager.ts @@ -6,13 +6,6 @@ import { Room } from '../models' export namespace RoomManager { const rooms: Map = new Map() - /** - * Returns a shallow clone of all rooms - */ - export function allRooms(): Map { - return new Map(rooms) - } - /** * Create a new room * @param id @@ -41,11 +34,11 @@ export namespace RoomManager { * @returns true if room was successfully deleted, false otherwise */ export function deleteRoom(id: string): boolean { - getRoom(id) - ?.users.values() - .forEach((user) => { - user.webSocket?.close(4000, 'Room deleted.') - }) + console.log(`[RoomManager] Deleted ${id}`) return rooms.delete(id) } + + export function roomsIterator() { + return rooms.values() + } } diff --git a/backend/src/services/SessionManager.ts b/backend/src/services/SessionManager.ts index e3af1dd..cd55512 100644 --- a/backend/src/services/SessionManager.ts +++ b/backend/src/services/SessionManager.ts @@ -16,4 +16,9 @@ export namespace SessionManager { console.log(`[SessionManager] Created user ${user.id}`) } + + export function destroy(id: string) { + sessions.delete(id) + console.log(`[SessionManager] Destroyed session ${id}`) + } } diff --git a/wire/package.json b/wire/package.json index 4cc4d3d..6c24b30 100644 --- a/wire/package.json +++ b/wire/package.json @@ -14,7 +14,7 @@ ], "scripts": { "build": "tsc", - "prepublish": "tsc", + "dev": "tsc --watch", "check:all": "bun run check:type && bun run check:format", "check:type": "tsc --noEmit", "check:format": "prettier --check --experimental-cli src/" diff --git a/wire/readme.md b/wire/readme.md index cb7d59c..34bc502 100644 --- a/wire/readme.md +++ b/wire/readme.md @@ -89,7 +89,7 @@ type WireUser = { ``` -### `Media` JWT body +### `Media` JWS body ```ts @@ -177,7 +177,7 @@ or 404, 400 on error. ### `POST /media/check` -Checks a media file and signs a JWT media token. Request body: +Checks a media file and signs a JWS media token. Request body: ```ts @@ -191,7 +191,7 @@ type CheckMediaRequest = { Returns `200 OK` with media token: ```ts -type CheckMediaResponse = string; //JWT, see MediaBody +type CheckMediaResponse = string; //JWS, see MediaBody ``` or 400 with error message. @@ -237,7 +237,7 @@ type PingPayload = null Sent by the client to send a chat message to the room. ```ts -// a JWT token of a validated video source +// a JWS token of a validated video source type Media = string; type MessagePayload = { @@ -255,7 +255,7 @@ Replies/related: `chatMessage`, `error` Sent by the client to update the room state (video playback position, etc.) ```ts -// a JWT token of a validated video source +// a JWS token of a validated video source type Media = string; type SyncIdle = { @@ -484,6 +484,7 @@ Sent by the server to all clients when a user's state changes. ```ts type UserStatePayload = { userId: string; + timestamp: number; state: 'present' | 'disconnected'; } ``` diff --git a/wire/src/backend/index.ts b/wire/src/backend/index.ts index 0187bac..9a934a3 100644 --- a/wire/src/backend/index.ts +++ b/wire/src/backend/index.ts @@ -7,7 +7,7 @@ const CHAT_TEXT_VALIDATOR = Type.String({ maxLength: 250, }) -const MEDIA_JWT_VALIDATOR = Type.String({ +const MEDIA_JWS_VALIDATOR = Type.String({ minLength: 8, }) @@ -20,7 +20,7 @@ const MESSAGE_VALIDATORS_UNCOMPILED = { }), Type.Object({ text: Type.Optional(CHAT_TEXT_VALIDATOR), - recommendation: MEDIA_JWT_VALIDATOR, + recommendation: MEDIA_JWS_VALIDATOR, }), ]), @@ -31,13 +31,13 @@ const MESSAGE_VALIDATORS_UNCOMPILED = { Type.Object({ state: Type.Literal('paused'), - media: MEDIA_JWT_VALIDATOR, + media: MEDIA_JWS_VALIDATOR, position: Type.Number({ minimum: 0 }), }), Type.Object({ state: Type.Literal('playing'), - media: MEDIA_JWT_VALIDATOR, + media: MEDIA_JWS_VALIDATOR, offset: Type.Number(/* i'm too dumb to figgure out what the range is here */), rate: Type.Number({ minimum: 0 }), }), @@ -63,7 +63,7 @@ const MESSAGE_VALIDATORS_UNCOMPILED = { userId: Type.String({ format: 'uuid' }), }), - updatePlaylist: Type.Array(MEDIA_JWT_VALIDATOR), + updatePlaylist: Type.Array(MEDIA_JWS_VALIDATOR), playbackStats: Type.Object({ latency: Type.Number({ minimum: 0 }), @@ -100,48 +100,60 @@ export type ClientMessages = { export type ClientMessage = ClientMessages[keyof ClientMessages] +export class MalformedMsgError extends Error { + constructor( + message: string, + public messageId?: number, + ) { + super(message) + } +} + export function parseMessage(message: string): ClientMessage { let parsed: unknown try { parsed = JSON.parse(message) } catch { - throw new Error('Invalid JSON') + throw new MalformedMsgError('Invalid JSON') } if (typeof parsed !== 'object' || parsed === null) { - throw new Error('Not a SyncMsg object') + throw new MalformedMsgError('Not a SyncMsg object') } if ('id' in parsed && typeof parsed.id !== 'number') { - throw new Error('Invalid id type') + throw new MalformedMsgError('Invalid id type') } + const id = (parsed as SyncMsg).id + if ('replyTo' in parsed && typeof parsed.replyTo !== 'number') { - throw new Error('Invalid replyTo type') + throw new MalformedMsgError('Invalid replyTo type', id) } if (!('type' in parsed) || !('body' in parsed)) { - throw new Error('Not a SyncMsg object') + throw new MalformedMsgError('Missing type or body', id) } - if (typeof parsed.type !== 'string' || !(parsed.type in COMPILED_MESSAGE_VALIDATORS)) { - throw new Error('Unknown message type') + if (typeof parsed.type !== 'string') { + throw new MalformedMsgError('Invalid message type', id) } const validator = _validators[parsed.type] if (!validator) { - throw new Error('Invalid message type') + throw new MalformedMsgError('Unknown message type', id) } if (!validator.Check(parsed.body)) { - throw new Error( + throw new MalformedMsgError( 'Invalid message body: ' + validator .Errors(parsed.body) .map((e) => e.message) .join(', '), + id, ) } @@ -151,8 +163,8 @@ export function parseMessage(message: string): ClientMessage { export function serializeMsg( type: Tkey, body: ServerMsgMap[Tkey], - id?: number, replyTo?: number, + id?: number, ): string { const obj: SyncMsg = { type, diff --git a/wire/src/types/error.ts b/wire/src/types/error.ts index b829798..360cf6f 100644 --- a/wire/src/types/error.ts +++ b/wire/src/types/error.ts @@ -1,14 +1,16 @@ type ErrorType = - | 'nobodyCared' + | 'binaryData' | 'malformedMsg' + | 'nobodyCared' + | 'serverError' | 'unauthorized' | 'ratelimit' - | 'invalidMedia' | 'badSync' + | 'invalidMedia' | 'userNotFound' | 'badRoomUpdate' - | 'playlistCurrentMediaMissing' | 'playlistDuplicates' + | 'playlistCurrentMediaMissing' export interface SyncError { cause?: string // message type that caused the error (helping clients out) diff --git a/wire/src/types/msg.ts b/wire/src/types/msg.ts index d21aae9..5e0d80a 100644 --- a/wire/src/types/msg.ts +++ b/wire/src/types/msg.ts @@ -24,7 +24,7 @@ export interface ServerMsgMap { userJoined: WireUser userLeft: { userId: string } - userState: { userId: string; state: WireUserState } + userState: { userId: string; timestamp: number; state: WireUserState } playbackQuery: null playbackReport: { userId: string; stats: PlaybackStats } From a657afe003e5b43c51a7ece66feb67658c82e6f9 Mon Sep 17 00:00:00 2001 From: Matic Babnik Date: Sat, 10 Jan 2026 06:16:56 +0100 Subject: [PATCH 2/2] feat: Implemented WS handlers --- backend/src/constants.ts | 2 + backend/src/handlers/common.ts | 80 +++++++++++- backend/src/handlers/owner.ts | 168 +++++++++++++++++++++++-- backend/src/models/Room.ts | 112 ++++++++++++++--- backend/src/reaper.ts | 5 +- backend/src/server.ts | 37 +++++- backend/src/services/MediaManager.ts | 27 ++++ backend/src/services/SessionManager.ts | 6 + backend/src/util/guards.ts | 79 ++++++++++++ backend/src/util/msg.ts | 38 ++++++ wire/src/backend/index.ts | 16 +-- wire/src/types/error.ts | 10 +- wire/src/types/msg.ts | 1 + 13 files changed, 536 insertions(+), 45 deletions(-) create mode 100644 backend/src/services/MediaManager.ts create mode 100644 backend/src/util/guards.ts create mode 100644 backend/src/util/msg.ts diff --git a/backend/src/constants.ts b/backend/src/constants.ts index d6967e8..f3d4313 100644 --- a/backend/src/constants.ts +++ b/backend/src/constants.ts @@ -1,2 +1,4 @@ export const REAPER_INTERVAL_MS = 15_000 export const REAPER_TIMEOUT_MS = 30_000 + +export const ROOM_MAX_CHAT_HISTORY = 50 diff --git a/backend/src/handlers/common.ts b/backend/src/handlers/common.ts index 5eb110e..8446f4a 100644 --- a/backend/src/handlers/common.ts +++ b/backend/src/handlers/common.ts @@ -1,10 +1,80 @@ -import type { ClientMessages } from '@sync/wire' +import type { ClientMessages, UserMessage } from '@sync/wire' import type { HandlerMap } from '.' import type { SyncServer, SyncWS } from '../server' +import { serializeMsg } from '@sync/wire/backend' +import { MediaManager } from '../services/MediaManager' +import { notOwnerGuard } from '../util/guards' +import { reply, replyError } from '../util/msg' export const COMMON_HANDLERS = { - ping: (ws: SyncWS, msg: ClientMessages['ping'], server: SyncServer) => {}, - struggle: (ws: SyncWS, msg: ClientMessages['struggle'], server: SyncServer) => {}, - message: (ws: SyncWS, msg: ClientMessages['message'], server: SyncServer) => {}, - playbackStats: (ws: SyncWS, msg: ClientMessages['playbackStats'], server: SyncServer) => {}, + ping: (ws: SyncWS, msg: ClientMessages['ping']) => { + reply(ws, msg, 'pong', { + timestamp: Date.now(), + }) + }, + + struggle: (ws: SyncWS, msg: ClientMessages['struggle']) => { + const me = ws.data.user + const owner = me.room.owner + if (!owner) return + + if (notOwnerGuard(ws, msg)) return + + // TODO: Rate limit. + + ws.data.user.room.owner?.webSocket?.send( + serializeMsg('userStruggle', { userId: ws.data.user.id }), + ) + }, + + message: async (ws: SyncWS, msg: ClientMessages['message'], server: SyncServer) => { + let text = msg.body.text?.trim() + if (text?.length === 0) text = undefined + + //TODO: Rate limit + + if (!text && !msg.body.recommendation) { + replyError(ws, msg, { + type: 'invalidChatMessage', + message: 'Message must contain text or a media recommendation.', + }) + return + } + + const recommendation = msg.body.recommendation + + if (recommendation && !(await MediaManager.verifyMedia(recommendation))) { + replyError(ws, msg, { + type: 'invalidMedia', + message: 'The recommended media is invalid.', + }) + return + } + + const cmsg: UserMessage = { + type: 'user', + userId: ws.data.user.id, + timestamp: Date.now(), + text, + recommendation: msg.body.recommendation, + } + + ws.data.user.room.addMessage(cmsg) + server.publish(ws.data.user.room.topic, serializeMsg('chatMessage', cmsg)) + }, + + playbackStats: (ws: SyncWS, msg: ClientMessages['playbackStats']) => { + const me = ws.data.user + const owner = me.room.owner + + if (!owner) return + + if (notOwnerGuard(ws, msg)) return + + // TODO: Rate limit. + + ws.data.user.room.owner?.webSocket?.send( + serializeMsg('playbackReport', { userId: ws.data.user.id, stats: msg.body }), + ) + }, } satisfies Partial diff --git a/backend/src/handlers/owner.ts b/backend/src/handlers/owner.ts index cb8cad7..75f6020 100644 --- a/backend/src/handlers/owner.ts +++ b/backend/src/handlers/owner.ts @@ -1,14 +1,166 @@ import type { ClientMessages } from '@sync/wire' import type { HandlerMap } from '.' import type { SyncServer, SyncWS } from '../server' +import { CloseCode, CloseReason, serializeMsg } from '@sync/wire/backend' +import { Room } from '../models' +import { SessionManager } from '../services/SessionManager' +import { RoomManager } from '../services/RoomManager' +import { notSelfGuard, ownerGuard, targetUserExistsGuard } from '../util/guards' +import { broadcast, reply, replyError, replyOk } from '../util/msg' export const OWNER_HANDLERS = { - sync: (ws: SyncWS, msg: ClientMessages['sync'], server: SyncServer) => {}, - updateRoom: (ws: SyncWS, msg: ClientMessages['updateRoom'], server: SyncServer) => {}, - clearChat: (ws: SyncWS, msg: ClientMessages['clearChat'], server: SyncServer) => {}, - destroyRoom: (ws: SyncWS, msg: ClientMessages['destroyRoom'], server: SyncServer) => {}, - kick: (ws: SyncWS, msg: ClientMessages['kick'], server: SyncServer) => {}, - kickAll: (ws: SyncWS, msg: ClientMessages['kickAll'], server: SyncServer) => {}, - updatePlaylist: (ws: SyncWS, msg: ClientMessages['updatePlaylist'], server: SyncServer) => {}, - queryPlayback: (ws: SyncWS, msg: ClientMessages['queryPlayback'], server: SyncServer) => {}, + sync: (ws: SyncWS, msg: ClientMessages['sync'], server: SyncServer) => { + if (ownerGuard(ws, msg)) return + + const room = ws.data.user.room + + room.setSync(msg.body) + + broadcast(server, ws, 'ssync', msg.body) + }, + + updateRoom: (ws: SyncWS, msg: ClientMessages['updateRoom'], server: SyncServer) => { + if (ownerGuard(ws, msg)) return + + const room = ws.data.user.room + + let changed = false + + if (msg.body.name) { + if (Room.checkName) { + const nameError = Room.checkName(msg.body.name) + + if (nameError) { + return replyError(ws, msg, { + type: 'badRoomUpdate', + message: nameError, + }) + } + } + + room.name = msg.body.name.trim() + changed = true + } + + replyOk(ws, msg) + + if (changed) { + broadcast(server, ws, 'roomUpdated', { room: { name: room.name, slug: room.slug } }) + } + }, + + clearChat: (ws: SyncWS, msg: ClientMessages['clearChat'], server: SyncServer) => { + if (ownerGuard(ws, msg)) return + + const room = ws.data.user.room + + room.chat.length = 0 + + broadcast(server, ws, 'chatCleared', null) + }, + + destroyRoom: (ws: SyncWS, msg: ClientMessages['destroyRoom']) => { + if (ownerGuard(ws, msg)) return + + const room = ws.data.user.room + + for (const user of room.users.values()) { + if (!user.webSocket) continue + + user.webSocket.data.closedByServer = true + user.webSocket.close(CloseCode.RoomClosed, CloseReason.RoomClosed) + user.webSocket = undefined + SessionManager.destroy(user.sessionId) + } + + room.users.clear() + RoomManager.deleteRoom(room.slug) + }, + + promote: (ws: SyncWS, msg: ClientMessages['promote'], server: SyncServer) => { + if (ownerGuard(ws, msg)) return + if (notSelfGuard(ws, msg)) return + if (targetUserExistsGuard(ws, msg)) return + + const room = ws.data.user.room + + const targetUser = room.users.get(msg.body.userId) + if (!targetUser) return // TODO: Assert/errror/... + + room.promote(targetUser) + + broadcast(server, ws, 'roomUpdated', { ownerId: targetUser.id }) + }, + + kick: (ws: SyncWS, msg: ClientMessages['kick'], server: SyncServer) => { + if (ownerGuard(ws, msg)) return + if (notSelfGuard(ws, msg)) return + if (targetUserExistsGuard(ws, msg)) return + + const room = ws.data.user.room + + const targetUser = room.users.get(msg.body.userId) + + if (!targetUser) return // TODO: Assert/errror/... + + if (targetUser.webSocket) { + targetUser.webSocket.data.closedByServer = true + targetUser.webSocket.close(CloseCode.Kicked, CloseReason.Kicked) + targetUser.webSocket = undefined + } + + room.removeUser(targetUser, () => {}) + SessionManager.destroy(targetUser.sessionId) + + broadcast(server, ws, 'userLeft', { userId: targetUser.id }) + }, + + kickAll: (ws: SyncWS, msg: ClientMessages['kickAll']) => { + if (ownerGuard(ws, msg)) return + + const room = ws.data.user.room + + for (const user of room.users.values()) { + if (!user.webSocket || user.webSocket === ws) continue + + user.webSocket.data.closedByServer = true + user.webSocket.close(CloseCode.Kicked, CloseReason.Kicked) + user.webSocket = undefined + SessionManager.destroy(user.sessionId) + } + + room.users.clear() + room.users.set(ws.data.user.id, ws.data.user) + + reply(ws, msg, 'roomUpdated', { users: [ws.data.user.toWire()] }) + }, + + updatePlaylist: (ws: SyncWS, msg: ClientMessages['updatePlaylist'], server: SyncServer) => { + if (ownerGuard(ws, msg)) return + + const room = ws.data.user.room + + const error = room.updatePlaylist(msg.body) + + if (error) { + return replyError(ws, msg, { + type: 'badPlaylist', + message: error, + }) + } + + replyOk(ws, msg) + broadcast(server, ws, 'roomUpdated', { playlist: room.playlist }) + }, + + queryPlayback: (ws: SyncWS, msg: ClientMessages['queryPlayback']) => { + if (ownerGuard(ws, msg)) return + if (notSelfGuard(ws, msg)) return + if (targetUserExistsGuard(ws, msg)) return + + const room = ws.data.user.room + const targetUser = room.users.get(msg.body.userId) + + targetUser?.webSocket?.send(serializeMsg('playbackQuery', null)) + }, } satisfies Partial diff --git a/backend/src/models/Room.ts b/backend/src/models/Room.ts index 8ef8ad8..2ebd323 100644 --- a/backend/src/models/Room.ts +++ b/backend/src/models/Room.ts @@ -1,5 +1,7 @@ +import { ROOM_MAX_CHAT_HISTORY } from '../constants.ts' +import { MediaManager } from '../services/MediaManager.ts' import { type User } from './User.ts' -import type { WireRoom } from '@sync/wire/types' +import type { ChatMessage, SyncState, WireRoom } from '@sync/wire/types' const SLUG_REGEX = /^[a-zA-Z0-9-_]{3,64}$/ @@ -11,13 +13,9 @@ export class Room { return SLUG_REGEX.test(slug) } - public static check(slug: string, name: string): string | undefined { + public static checkName(name: string): string | undefined { name = name.trim() - if (!Room.isValidSlug(slug)) { - return 'Invalid room slug' - } - if (name.length === 0) { return 'Room name cannot be empty' } @@ -27,11 +25,29 @@ export class Room { } } + public static check(slug: string, name: string): string | undefined { + name = name.trim() + + if (!Room.isValidSlug(slug)) { + return 'Invalid room slug' + } + + const nameError = Room.checkName(name) + if (nameError) return nameError + } + public readonly topic: string - public readonly users = new Set() + public readonly users = new Map() + public _owner: User | undefined + public readonly chat: ChatMessage[] = [] + + public playlist: string[] = [] + + private sync: SyncState = { state: 'idle' } + public constructor( public readonly slug: string, public name: string, @@ -46,7 +62,7 @@ export class Room { * @param user The user */ public addUser(user: User) { - this.users.add(user) + this.users.set(user.id, user) if (!this._owner) this._owner = user } @@ -56,15 +72,20 @@ export class Room { * @param user The user */ public removeUser(user: User, newOwnerCallback: (ownerId?: string) => void) { - this.users.delete(user) + this.users.delete(user.id) if (this._owner === user) { - this.assignNewOwner() + this.findNewOwner() if (this._owner) newOwnerCallback(this._owner.id) } } - private assignNewOwner() { + private findNewOwner() { + if (this.users.size === 0) { + this._owner = undefined + return + } + const candidates = Array.from(this.users.values()) candidates.sort((a, b) => { @@ -84,6 +105,14 @@ export class Room { this._owner = candidates[0] } + /** + * Promote a user to be the owner of this room + */ + public promote(user: User): void { + if (!this.users.has(user.id)) throw new Error('User is not in this room') + this._owner = user + } + /** * Get the owner of this room */ @@ -95,6 +124,60 @@ export class Room { return this.users.size === 0 } + /** + * Stores a message in the room's history + */ + public addMessage(msg: ChatMessage): void { + this.chat.push(msg) + + if (this.chat.length > ROOM_MAX_CHAT_HISTORY) { + this.chat.shift() + } + } + + public setSync(sync: SyncState): void { + this.sync = sync + } + + /** + * Attempts to update the room's playlist, returns an error message on failure + */ + public updatePlaylist(newPlaylist: string[]): string | undefined { + const toValidate: string[] = [] + + const pm = new Map() + + for (const m of this.playlist) { + pm.set(m, 0) + } + + for (const m of newPlaylist) { + const c = pm.get(m) + + if (c === undefined) { + toValidate.push(m) + pm.set(m, 1) + } else if (c === 0) { + pm.set(m, 1) + } else { + return 'Playlist contains duplicate media IDs' + } + } + + for (const m of toValidate) { + if (!MediaManager.validateMedia(m)) { + return `Media ID ${m} is invalid` + } + } + + // every media either: + // - was already in the playlist + // - is new and valid + // - got removed + + this.playlist = newPlaylist + } + public toWire(): WireRoom { return { room: { @@ -104,10 +187,9 @@ export class Room { users: Array.from(this.users.values()).map((u) => u.toWire()), ownerId: this._owner?.id ?? '', - // TODO: impl - chat: [], - playlist: [], - sync: { state: 'idle' }, + chat: this.chat, + playlist: this.playlist, + sync: this.sync, } } } diff --git a/backend/src/reaper.ts b/backend/src/reaper.ts index 25d1b8a..8d482de 100644 --- a/backend/src/reaper.ts +++ b/backend/src/reaper.ts @@ -36,7 +36,7 @@ export function reap(server: SyncServer) { server.publish(room.topic, serializeMsg('userLeft', { userId: user.id })) } - if (room.users.size === 0) { + if (room.isEmpty) { roomsToDestroy.add(room.slug) } } @@ -46,5 +46,6 @@ export function reap(server: SyncServer) { RoomManager.deleteRoom(roomSlug) } - console.log(`[Reaper] Removed ${userCount} users and ${roomCount} rooms`) + if (userCount || roomCount) + console.log(`[Reaper] Removed ${userCount} users and ${roomCount} rooms`) } diff --git a/backend/src/server.ts b/backend/src/server.ts index e0d2b31..df0d784 100644 --- a/backend/src/server.ts +++ b/backend/src/server.ts @@ -13,6 +13,7 @@ import { import { HANDLERS } from './handlers' import { REAPER_INTERVAL_MS } from './constants' import { reap } from './reaper' +import { MediaManager, MediaValidationError } from './services/MediaManager' export type WSData = { user: User @@ -104,9 +105,38 @@ const app = new Elysia() }), }, ) - .post('/media/check', ({ body }) => {}, { - body: t.Object({}), - }) + .post( + '/media/check', + async ({ body, status, headers }) => { + const s = SessionManager.authenticateFromHeader(headers.authorization) + + if (!s) { + return status(401) + } + + try { + const media = await MediaManager.verifyMedia(body.source) + return { media } + } catch (e) { + if (e instanceof MediaValidationError) { + return status(400, { + type: e.type, + message: e.message, + }) + } else { + return status(500, { type: 'ise', message: 'Internal server error' }) + } + } + }, + { + body: t.Object({ + source: t.String({ format: 'uri' }), + }), + headers: t.Object({ + authorization: t.Optional(t.String()), + }), + }, + ) export const server = Bun.serve({ fetch: app.handle.bind(app), @@ -227,6 +257,7 @@ export const server = Bun.serve({ user.room.removeUser(user, (ownerId) => { server.publish(user.room.topic, serializeMsg('roomUpdated', { ownerId })) }) + SessionManager.destroy(user.sessionId) user.webSocket = undefined server.publish(user.room.topic, serializeMsg('userLeft', { userId: user.id })) diff --git a/backend/src/services/MediaManager.ts b/backend/src/services/MediaManager.ts new file mode 100644 index 0000000..e3f65e9 --- /dev/null +++ b/backend/src/services/MediaManager.ts @@ -0,0 +1,27 @@ +export namespace MediaManager { + export async function verifyMedia(source: string): Promise { + // TODO: Implement media verification logic + return source + } + + export function validateMedia(_jws: string): boolean { + // TODO: Implement media validation logic + return true + } +} + +export type MediaValidationErrorType = + | 'invalidSourceFormat' + | 'invalidScheme' + | 'timeout' + | 'badResponse' + | 'unsupportedMime' + +export class MediaValidationError extends Error { + constructor( + public type: MediaValidationErrorType, + message: string, + ) { + super(message) + } +} diff --git a/backend/src/services/SessionManager.ts b/backend/src/services/SessionManager.ts index cd55512..e3c18d2 100644 --- a/backend/src/services/SessionManager.ts +++ b/backend/src/services/SessionManager.ts @@ -21,4 +21,10 @@ export namespace SessionManager { sessions.delete(id) console.log(`[SessionManager] Destroyed session ${id}`) } + + export function authenticateFromHeader(authorization?: string) { + if (!authorization) return undefined + const token = authorization?.replace('Bearer ', '') + return get(token) + } } diff --git a/backend/src/util/guards.ts b/backend/src/util/guards.ts new file mode 100644 index 0000000..42e459a --- /dev/null +++ b/backend/src/util/guards.ts @@ -0,0 +1,79 @@ +import type { SyncMsg } from '@sync/wire' +import type { SyncWS } from '../server' +import { replyError } from './msg' + +/** + * Returns true, if the user is NOT the owner of the room. (it sent an error back to the user) + * @param ws Message sender + * @param msg Message we're guarding against + */ +export function ownerGuard(ws: SyncWS, msg: SyncMsg): boolean { + const me = ws.data.user + + if (me === me.room.owner) { + return false + } + + replyError(ws, msg, { + type: 'unauthorized', + message: 'You must be the owner of the room to perform this action.', + }) + + return true +} + +/** + * Returns true if the target user does IS the owner (sends an error back to the user) + * @param ws Message sender + * @param msg Message we're guarding against + */ +export function notOwnerGuard(ws: SyncWS, msg: SyncMsg): boolean { + const me = ws.data.user + + if (me !== me.room.owner) { + return false + } + + replyError(ws, msg, { + type: 'unauthorized', + message: "You can't perform this action as owner!", + }) + return true +} + +/** + * Returns true if the target user does NOT exist in the room (sends an error back to the user) + * @param ws Message sender + * @param msg Message we're guarding against + */ +export function targetUserExistsGuard( + ws: SyncWS, + msg: SyncMsg, +): boolean { + const me = ws.data.user + if (me.room.users.has(msg.body.userId)) return false + + replyError(ws, msg, { + type: 'userNotFound', + message: 'The specified user does not exist in the room.', + }) + + return true +} + +/** + * Returns true if sender IS also the target user (sends an error back to the user) + * @param ws Message sender + * @param msg Message we're guarding against + */ +export function notSelfGuard(ws: SyncWS, msg: SyncMsg): boolean { + const me = ws.data.user + + if (msg.body.userId !== me.id) return false + + replyError(ws, msg, { + type: 'selfSuckIncident', + message: "You can't perform this action on yourself!", + }) + return true +} diff --git a/backend/src/util/msg.ts b/backend/src/util/msg.ts new file mode 100644 index 0000000..38029ae --- /dev/null +++ b/backend/src/util/msg.ts @@ -0,0 +1,38 @@ +import type { ClientMessage, ServerMessage, SyncError, SyncMsg } from '@sync/wire' +import { serializeMsg, type BodyTypeFromKey } from '@sync/wire/backend' +import type { SyncServer, SyncWS } from '../server' + +export function replyError(ws: SyncWS, msg: SyncMsg, err: SyncError) { + ws.send( + serializeMsg( + 'error', + { + ...err, + cause: msg.type, + }, + msg.id, + ), + ) +} + +export function replyOk(ws: SyncWS, msg: SyncMsg) { + ws.send(serializeMsg('ok', null, msg.id)) +} + +export function broadcast( + server: SyncServer, + ws: SyncWS, + replyType: T, + replyBody: BodyTypeFromKey, +) { + server.publish(ws.data.user.room.topic, serializeMsg(replyType, replyBody)) +} + +export function reply( + ws: SyncWS, + msg: ClientMessage, + replyType: T, + replyBody: BodyTypeFromKey, +) { + ws.send(serializeMsg(replyType, replyBody, msg.id)) +} diff --git a/wire/src/backend/index.ts b/wire/src/backend/index.ts index 9a934a3..8e14696 100644 --- a/wire/src/backend/index.ts +++ b/wire/src/backend/index.ts @@ -14,16 +14,12 @@ const MEDIA_JWS_VALIDATOR = Type.String({ const MESSAGE_VALIDATORS_UNCOMPILED = { ping: Type.Null(), - message: Type.Union([ + message: Type.Partial( Type.Object({ text: CHAT_TEXT_VALIDATOR, - }), - Type.Object({ - text: Type.Optional(CHAT_TEXT_VALIDATOR), recommendation: MEDIA_JWS_VALIDATOR, }), - ]), - + ), sync: Type.Union([ Type.Object({ state: Type.Literal('idle'), @@ -38,7 +34,7 @@ const MESSAGE_VALIDATORS_UNCOMPILED = { Type.Object({ state: Type.Literal('playing'), media: MEDIA_JWS_VALIDATOR, - offset: Type.Number(/* i'm too dumb to figgure out what the range is here */), + offset: Type.Number(/* i'm too dumb to figure out what the range is here */), rate: Type.Number({ minimum: 0 }), }), ]), @@ -47,6 +43,10 @@ const MESSAGE_VALIDATORS_UNCOMPILED = { userId: Type.String({ format: 'uuid' }), }), + promote: Type.Object({ + userId: Type.String({ format: 'uuid' }), + }), + clearChat: Type.Null(), updateRoom: Type.Partial( @@ -160,6 +160,8 @@ export function parseMessage(message: string): ClientMessage { return parsed as ClientMessage } +export type BodyTypeFromKey = ServerMsgMap[Tkey] + export function serializeMsg( type: Tkey, body: ServerMsgMap[Tkey], diff --git a/wire/src/types/error.ts b/wire/src/types/error.ts index 360cf6f..0660f22 100644 --- a/wire/src/types/error.ts +++ b/wire/src/types/error.ts @@ -5,13 +5,13 @@ type ErrorType = | 'serverError' | 'unauthorized' | 'ratelimit' - | 'badSync' - | 'invalidMedia' | 'userNotFound' + | 'selfSuckIncident' + | 'invalidChatMessage' + | 'invalidMedia' + | 'badSync' + | 'badPlaylist' | 'badRoomUpdate' - | 'playlistDuplicates' - | 'playlistCurrentMediaMissing' - export interface SyncError { cause?: string // message type that caused the error (helping clients out) diff --git a/wire/src/types/msg.ts b/wire/src/types/msg.ts index 5e0d80a..94c6cff 100644 --- a/wire/src/types/msg.ts +++ b/wire/src/types/msg.ts @@ -31,6 +31,7 @@ export interface ServerMsgMap { userStruggle: { userId: string } + ok: null error: SyncError }