From 93ca742cf228e0d7401475d3670e9dfd03193e13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20St=C4=99pie=C5=84?= Date: Sun, 12 Jan 2025 06:01:34 +0100 Subject: [PATCH 1/2] Synchronise room settings over websockets --- internal/infra/ws/wshub.go | 39 ++++++- internal/lib/interfaces.go | 5 + internal/lib/messages/client.go | 14 ++- internal/lib/messages/server.go | 1 + internal/lib/room/processMessages.go | 7 ++ internal/lib/room/room.go | 100 ++++++++++++++---- .../roomrepository/inmemoryroomrepository.go | 10 +- 7 files changed, 139 insertions(+), 37 deletions(-) create mode 100644 internal/lib/interfaces.go diff --git a/internal/infra/ws/wshub.go b/internal/infra/ws/wshub.go index beb7006..6a991b2 100644 --- a/internal/infra/ws/wshub.go +++ b/internal/infra/ws/wshub.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" + "github.com/buger/jsonparser" "github.com/feelbeatapp/feelbeatserver/internal/infra/fblog" "github.com/feelbeatapp/feelbeatserver/internal/lib/component" "github.com/feelbeatapp/feelbeatserver/internal/lib/feelbeaterror" @@ -122,20 +123,50 @@ func (h *WSHub) passMessages(ctx context.Context, wg *sync.WaitGroup, from strin h.unregister <- from return } - var message messages.ClientMessage - err := json.Unmarshal(bytes, &message) + + msgType, err := jsonparser.GetString(bytes, "type") + if err != nil { + fblog.Error(component.Hub, "Failed to parse client message", "err", err) + continue + } + payload, _, _, err := jsonparser.Get(bytes, "payload") if err != nil { fblog.Error(component.Hub, "Failed to parse client message", "err", err) + continue } - message.From = from - h.rcv <- message + h.rcv <- decodeMessage(from, msgType, payload) case <-ctx.Done(): return } } } +func decodeMessage(from string, msgType string, payload []byte) messages.ClientMessage { + var settingsUpdate messages.SettingsUpdatePayload + + var err error + var result interface{} + switch msgType { + case messages.SettingsUpdate: + err = json.Unmarshal(payload, &settingsUpdate) + result = settingsUpdate + } + + if err != nil { + return messages.ClientMessage{ + From: from, + Type: messages.ClientMessageType(msgType), + } + } else { + return messages.ClientMessage{ + From: from, + Type: messages.ClientMessageType(msgType), + Payload: result, + } + } +} + func (h *WSHub) sendMessage(message messages.ServerMessage) { bytes, err := json.Marshal(message.ToUnit()) if err != nil { diff --git a/internal/lib/interfaces.go b/internal/lib/interfaces.go new file mode 100644 index 0000000..af9ee70 --- /dev/null +++ b/internal/lib/interfaces.go @@ -0,0 +1,5 @@ +package lib + +type SpotifyApi interface { + FetchPlaylistData(playlistId string, token string) (PlaylistData, error) +} diff --git a/internal/lib/messages/client.go b/internal/lib/messages/client.go index 2646dc7..9437f6e 100644 --- a/internal/lib/messages/client.go +++ b/internal/lib/messages/client.go @@ -5,16 +5,22 @@ import "github.com/feelbeatapp/feelbeatserver/internal/lib" type ClientMessageType string const ( - JoiningPlayer = "JOIN" - LeavingPlayer = "LEAVE" + JoiningPlayer = "JOIN" + LeavingPlayer = "LEAVE" + SettingsUpdate = "SETTINGS_UPDATE" ) type ClientMessage struct { Type ClientMessageType `json:"type"` - From string - Payload interface{} `json:"payload"` + From string `json:"-"` + Payload interface{} `json:"payload"` } type JoiningPlayerPayload struct { User lib.UserProfile } + +type SettingsUpdatePayload struct { + Token string `json:"token"` + Settings lib.RoomSettings `json:"settings"` +} diff --git a/internal/lib/messages/server.go b/internal/lib/messages/server.go index f38ee8d..b8bb3b9 100644 --- a/internal/lib/messages/server.go +++ b/internal/lib/messages/server.go @@ -28,6 +28,7 @@ const ( InitialMessage = "INITIAL" NewPlayer = "NEW_PLAYER" PlayerLeft = "PLAYER_LEFT" + ServerError = "SERVER_ERROR" ) type InitialGameState struct { diff --git a/internal/lib/room/processMessages.go b/internal/lib/room/processMessages.go index 6c6f548..121bd36 100644 --- a/internal/lib/room/processMessages.go +++ b/internal/lib/room/processMessages.go @@ -18,6 +18,13 @@ func (r *Room) processMessages() { } case messages.LeavingPlayer: r.removePlayer(message.From) + case messages.SettingsUpdate: + payload, ok := message.Payload.(messages.SettingsUpdatePayload) + if !ok { + logIncorrectPayload("Incorrect payload in settings update", message.Payload, message.From) + } else { + r.updateSettings(message.From, payload) + } default: fblog.Warn(component.Room, "Received unexpected message", "room", r.id, "from", message.From, "type", message.Type, "payload", message.Payload) } diff --git a/internal/lib/room/room.go b/internal/lib/room/room.go index 5bdcc17..ada89a3 100644 --- a/internal/lib/room/room.go +++ b/internal/lib/room/room.go @@ -1,38 +1,50 @@ package room import ( + "errors" + "github.com/feelbeatapp/feelbeatserver/internal/infra/fblog" "github.com/feelbeatapp/feelbeatserver/internal/lib" "github.com/feelbeatapp/feelbeatserver/internal/lib/component" + "github.com/feelbeatapp/feelbeatserver/internal/lib/feelbeaterror" "github.com/feelbeatapp/feelbeatserver/internal/lib/messages" ) type Room struct { - id string - playlist lib.PlaylistData - owner lib.UserProfile - settings lib.RoomSettings - players map[string]Player - hub messages.Hub - snd chan messages.ServerMessage - rcv <-chan messages.ClientMessage - onCleanup func(*Room) + id string + playlist lib.PlaylistData + owner lib.UserProfile + settings lib.RoomSettings + players map[string]Player + hub messages.Hub + snd chan messages.ServerMessage + rcv <-chan messages.ClientMessage + onCleanup func(*Room) + spotifyApi lib.SpotifyApi } type Player struct { profile lib.UserProfile } -func NewRoom(id string, playlist lib.PlaylistData, owner lib.UserProfile, settings lib.RoomSettings, hub messages.Hub, onCleanup func(*Room)) *Room { +func NewRoom(id string, + playlist lib.PlaylistData, + owner lib.UserProfile, + settings lib.RoomSettings, + hub messages.Hub, + spotifyApi lib.SpotifyApi, + onCleanup func(*Room), +) *Room { return &Room{ - id: id, - playlist: playlist, - owner: owner, - settings: settings, - players: make(map[string]Player), - hub: hub, - snd: make(chan messages.ServerMessage), - onCleanup: onCleanup, + id: id, + playlist: playlist, + owner: owner, + settings: settings, + players: make(map[string]Player), + hub: hub, + snd: make(chan messages.ServerMessage), + onCleanup: onCleanup, + spotifyApi: spotifyApi, } } @@ -77,6 +89,11 @@ func (r *Room) addPlayer(profile lib.UserProfile) { fblog.Info(component.Room, "new player", "roomId", r.id, "userId", profile.Id) + r.snd <- r.packIntialState(profile.Id) + r.sendToAllExcept(profile.Id, messages.NewPlayer, profile) +} + +func (r *Room) packIntialState(me string) messages.ServerMessage { playerProfiles := make([]lib.UserProfile, 0) for _, p := range r.players { playerProfiles = append(playerProfiles, p.profile) @@ -93,12 +110,12 @@ func (r *Room) addPlayer(profile lib.UserProfile) { }) } - r.snd <- messages.ServerMessage{ - To: []string{profile.Id}, + return messages.ServerMessage{ + To: []string{me}, Type: messages.InitialMessage, Payload: messages.InitialGameState{ Id: r.id, - Me: profile.Id, + Me: me, Admin: r.owner.Id, Playlist: messages.PlaylistState{ Name: r.Name(), @@ -109,7 +126,7 @@ func (r *Room) addPlayer(profile lib.UserProfile) { Settings: r.settings, }, } - r.sendToAllExcept(profile.Id, messages.NewPlayer, profile) + } func (r *Room) removePlayer(id string) { @@ -145,6 +162,45 @@ func (r *Room) removePlayer(id string) { } } +func (r *Room) updateSettings(from string, settingsPayload messages.SettingsUpdatePayload) { + if settingsPayload.Settings.MaxPlayers < len(r.players) { + return + } + + ok := true + if settingsPayload.Settings.PlaylistId != r.settings.PlaylistId { + playlistData, err := r.spotifyApi.FetchPlaylistData(settingsPayload.Settings.PlaylistId, settingsPayload.Token) + if err != nil { + ok = false + fblog.Error(component.Room, "failed to change playlist", "roomId", r.id, "err", err) + var fbErr *feelbeaterror.FeelBeatError + if errors.As(err, &fbErr) { + + r.snd <- messages.ServerMessage{ + Type: messages.ServerError, + To: []string{from}, + Payload: fbErr.UserMessage, + } + } + } + + r.playlist = playlistData + + } + + if ok { + r.settings = settingsPayload.Settings + fblog.Info(component.Room, "settings updated", "roomId", r.id, "settings", r.settings) + + for _, player := range r.players { + r.snd <- r.packIntialState(player.profile.Id) + } + } else { + r.snd <- r.packIntialState(from) + } + +} + func (r *Room) sendToAllExcept(id string, messageType messages.ServerMessageType, payload interface{}) { recipents := make([]string, 0) for _, p := range r.players { diff --git a/internal/lib/roomrepository/inmemoryroomrepository.go b/internal/lib/roomrepository/inmemoryroomrepository.go index e2d5be9..16d7a29 100644 --- a/internal/lib/roomrepository/inmemoryroomrepository.go +++ b/internal/lib/roomrepository/inmemoryroomrepository.go @@ -12,18 +12,14 @@ import ( "github.com/google/uuid" ) -type SpotifyApi interface { - FetchPlaylistData(playlistId string, token string) (lib.PlaylistData, error) -} - type InMemoryRoomRepository struct { createHub func() messages.Hub - spotify SpotifyApi + spotify lib.SpotifyApi rooms map[string]*room.Room m sync.RWMutex } -func NewInMemoryRoomRepository(spotify SpotifyApi, createHub func() messages.Hub) *InMemoryRoomRepository { +func NewInMemoryRoomRepository(spotify lib.SpotifyApi, createHub func() messages.Hub) *InMemoryRoomRepository { return &InMemoryRoomRepository{ createHub: createHub, spotify: spotify, @@ -37,7 +33,7 @@ func (r *InMemoryRoomRepository) CreateRoom(user auth.User, settings lib.RoomSet return "", err } - newRoom := room.NewRoom(uuid.NewString(), playlistData, user.Profile, settings, r.createHub(), func(room *room.Room) { + newRoom := room.NewRoom(uuid.NewString(), playlistData, user.Profile, settings, r.createHub(), r.spotify, func(room *room.Room) { r.m.Lock() delete(r.rooms, room.Id()) r.m.Unlock() From 3902fc0f64560dff1db74b90f0cc5c1df6ba97dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20St=C4=99pie=C5=84?= Date: Sun, 12 Jan 2025 16:30:17 +0100 Subject: [PATCH 2/2] Add readiness handling and stage transition --- internal/infra/ws/servewebsockets.go | 7 +++ internal/infra/ws/wshub.go | 2 + internal/lib/feelbeaterror/errorcode.go | 3 ++ internal/lib/messages/client.go | 1 + internal/lib/messages/server.go | 8 +++ internal/lib/room/processMessages.go | 6 +++ internal/lib/room/room.go | 51 +++++++++++++++++++ internal/lib/room/roomstage.go | 8 +++ .../roomrepository/inmemoryroomrepository.go | 6 ++- 9 files changed, 90 insertions(+), 2 deletions(-) create mode 100644 internal/lib/room/roomstage.go diff --git a/internal/infra/ws/servewebsockets.go b/internal/infra/ws/servewebsockets.go index 554f120..61a3064 100644 --- a/internal/infra/ws/servewebsockets.go +++ b/internal/infra/ws/servewebsockets.go @@ -8,6 +8,7 @@ import ( "github.com/feelbeatapp/feelbeatserver/internal/infra/auth" "github.com/feelbeatapp/feelbeatserver/internal/lib/feelbeaterror" "github.com/feelbeatapp/feelbeatserver/internal/lib/messages" + roomLib "github.com/feelbeatapp/feelbeatserver/internal/lib/room" "github.com/gorilla/websocket" ) @@ -32,6 +33,12 @@ func (w WSHandler) websocketHandler(user auth.User, res http.ResponseWriter, req return } + if room.Stage() != roomLib.LobbyStage { + http.Error(res, feelbeaterror.RoomGameStage, feelbeaterror.StatusCode(feelbeaterror.RoomGameStage)) + api.LogApiError("user rejected, room in game stage", nil, user.Profile.Id, req) + return + } + conn, err := upgrader.Upgrade(res, req, nil) if err != nil { http.Error(res, feelbeaterror.Default, feelbeaterror.StatusCode(feelbeaterror.Default)) diff --git a/internal/infra/ws/wshub.go b/internal/infra/ws/wshub.go index 6a991b2..057aa85 100644 --- a/internal/infra/ws/wshub.go +++ b/internal/infra/ws/wshub.go @@ -151,6 +151,8 @@ func decodeMessage(from string, msgType string, payload []byte) messages.ClientM case messages.SettingsUpdate: err = json.Unmarshal(payload, &settingsUpdate) result = settingsUpdate + case messages.ReadyStatus: + result, err = jsonparser.GetBoolean(payload) } if err != nil { diff --git a/internal/lib/feelbeaterror/errorcode.go b/internal/lib/feelbeaterror/errorcode.go index d9ea3cd..561bbbe 100644 --- a/internal/lib/feelbeaterror/errorcode.go +++ b/internal/lib/feelbeaterror/errorcode.go @@ -11,6 +11,7 @@ const ( RoomNotFound = "Room not found" RoomFull = "Room is full" EncodingMessageFailed = "Encoding message failed" + RoomGameStage = "Room is already in game mode" ) func StatusCode(code ErrorCode) int { @@ -21,6 +22,8 @@ func StatusCode(code ErrorCode) int { return http.StatusForbidden case AuthFailed: return http.StatusForbidden + case RoomGameStage: + return http.StatusBadRequest default: return http.StatusInternalServerError } diff --git a/internal/lib/messages/client.go b/internal/lib/messages/client.go index 9437f6e..9e3c157 100644 --- a/internal/lib/messages/client.go +++ b/internal/lib/messages/client.go @@ -8,6 +8,7 @@ const ( JoiningPlayer = "JOIN" LeavingPlayer = "LEAVE" SettingsUpdate = "SETTINGS_UPDATE" + ReadyStatus = "READY_STATUS" ) type ClientMessage struct { diff --git a/internal/lib/messages/server.go b/internal/lib/messages/server.go index b8bb3b9..3dc3f7e 100644 --- a/internal/lib/messages/server.go +++ b/internal/lib/messages/server.go @@ -29,6 +29,8 @@ const ( NewPlayer = "NEW_PLAYER" PlayerLeft = "PLAYER_LEFT" ServerError = "SERVER_ERROR" + RoomStage = "ROOM_STAGE" + PlayerReady = "PLAYER_READY" ) type InitialGameState struct { @@ -38,6 +40,7 @@ type InitialGameState struct { Playlist PlaylistState `json:"playlist"` Players []lib.UserProfile `json:"players"` Settings lib.RoomSettings `json:"settings"` + ReadyMap map[string]bool `json:"readyMap"` } type PlaylistState struct { @@ -58,3 +61,8 @@ type PlayerLeftPayload struct { Left string `json:"left"` Admin string `json:"admin"` } + +type PlayerReadyPayload struct { + Player string `json:"player"` + Ready bool `json:"ready"` +} diff --git a/internal/lib/room/processMessages.go b/internal/lib/room/processMessages.go index 121bd36..3c7f5f0 100644 --- a/internal/lib/room/processMessages.go +++ b/internal/lib/room/processMessages.go @@ -25,6 +25,12 @@ func (r *Room) processMessages() { } else { r.updateSettings(message.From, payload) } + case messages.ReadyStatus: + if ready, ok := message.Payload.(bool); ok { + r.updateReady(message.From, ready) + } else { + logIncorrectPayload("Incorrect paylod in ready status", message.Payload, message.From) + } default: fblog.Warn(component.Room, "Received unexpected message", "room", r.id, "from", message.From, "type", message.Type, "payload", message.Payload) } diff --git a/internal/lib/room/room.go b/internal/lib/room/room.go index ada89a3..708dc80 100644 --- a/internal/lib/room/room.go +++ b/internal/lib/room/room.go @@ -16,6 +16,8 @@ type Room struct { owner lib.UserProfile settings lib.RoomSettings players map[string]Player + readyMap map[string]bool + stage RoomStage hub messages.Hub snd chan messages.ServerMessage rcv <-chan messages.ClientMessage @@ -41,6 +43,8 @@ func NewRoom(id string, owner: owner, settings: settings, players: make(map[string]Player), + readyMap: make(map[string]bool), + stage: LobbyStage, hub: hub, snd: make(chan messages.ServerMessage), onCleanup: onCleanup, @@ -73,6 +77,10 @@ func (r *Room) Settings() lib.RoomSettings { return r.settings } +func (r *Room) Stage() RoomStage { + return r.stage +} + func (r *Room) Start() { r.rcv = r.hub.Run(r.snd) go r.processMessages() @@ -83,6 +91,10 @@ func (r *Room) Hub() messages.Hub { } func (r *Room) addPlayer(profile lib.UserProfile) { + if r.stage != LobbyStage { + return + } + r.players[profile.Id] = Player{ profile: profile, } @@ -123,6 +135,7 @@ func (r *Room) packIntialState(me string) messages.ServerMessage { Songs: packedSongs, }, Players: playerProfiles, + ReadyMap: r.readyMap, Settings: r.settings, }, } @@ -160,6 +173,10 @@ func (r *Room) removePlayer(id string) { Admin: r.owner.Id, }, } + + if r.allReady() { + r.broadcastRoomStage(GameStage) + } } func (r *Room) updateSettings(from string, settingsPayload messages.SettingsUpdatePayload) { @@ -201,6 +218,40 @@ func (r *Room) updateSettings(from string, settingsPayload messages.SettingsUpda } +func (r *Room) updateReady(from string, ready bool) { + r.readyMap[from] = ready + + if r.allReady() { + r.broadcastRoomStage(GameStage) + } else { + r.sendToAllExcept(from, messages.PlayerReady, messages.PlayerReadyPayload{ + Player: from, + Ready: ready, + }) + } +} + +func (r *Room) allReady() bool { + allReady := true + for _, p := range r.players { + allReady = allReady && r.readyMap[p.profile.Id] + } + return allReady +} + +func (r *Room) broadcastRoomStage(stage RoomStage) { + recipents := make([]string, 0) + for _, p := range r.players { + recipents = append(recipents, p.profile.Id) + } + + r.snd <- messages.ServerMessage{ + To: recipents, + Type: messages.RoomStage, + Payload: stage, + } +} + func (r *Room) sendToAllExcept(id string, messageType messages.ServerMessageType, payload interface{}) { recipents := make([]string, 0) for _, p := range r.players { diff --git a/internal/lib/room/roomstage.go b/internal/lib/room/roomstage.go new file mode 100644 index 0000000..7d48e52 --- /dev/null +++ b/internal/lib/room/roomstage.go @@ -0,0 +1,8 @@ +package room + +type RoomStage string + +const ( + LobbyStage = "LOBBY" + GameStage = "GAME" +) diff --git a/internal/lib/roomrepository/inmemoryroomrepository.go b/internal/lib/roomrepository/inmemoryroomrepository.go index 16d7a29..175b6cf 100644 --- a/internal/lib/roomrepository/inmemoryroomrepository.go +++ b/internal/lib/roomrepository/inmemoryroomrepository.go @@ -53,8 +53,10 @@ func (r *InMemoryRoomRepository) CreateRoom(user auth.User, settings lib.RoomSet func (r *InMemoryRoomRepository) GetAllRooms() []*room.Room { result := make([]*room.Room, 0) r.m.RLock() - for _, room := range r.rooms { - result = append(result, room) + for _, r := range r.rooms { + if r.Stage() == room.LobbyStage { + result = append(result, r) + } } r.m.RUnlock()