diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..11643a83 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,40 @@ +import os +import sys +from unittest.mock import MagicMock, patch + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + + +def make_room(name="testroom", dbhandle=None): + from syncplay.server import Room + return Room(name, dbhandle) + + +def make_controlled_room(name="+testroom:AABBCCDDEE11", dbhandle=None): + from syncplay.server import ControlledRoom + return ControlledRoom(name, dbhandle) + + +def make_watcher(server=None, connector=None, name="testuser"): + from syncplay.server import Watcher + if server is None: + server = MagicMock() + server.disableReady = False + if connector is None: + connector = MagicMock() + connector.isLogged.return_value = True + connector.getFeatures.return_value = {"uiMode": "GUI"} + connector.getVersion.return_value = "1.7.4" + connector.meetsMinVersion.return_value = True + with patch('syncplay.server.reactor'): + watcher = Watcher(server, connector, name) + return watcher + + +def make_room_manager(roomsdbfile=None, permanentRooms=None): + from syncplay.server import RoomManager + if permanentRooms is None: + permanentRooms = [] + with patch('syncplay.server.RoomDBManager'): + manager = RoomManager(roomsdbfile=roomsdbfile, permanentRooms=permanentRooms) + return manager diff --git a/tests/test_constants.py b/tests/test_constants.py new file mode 100644 index 00000000..81b28f07 --- /dev/null +++ b/tests/test_constants.py @@ -0,0 +1,175 @@ +import re +import unittest + + +class TestKeyConstants(unittest.TestCase): + def test_default_port(self): + from syncplay.constants import DEFAULT_PORT + self.assertIsInstance(DEFAULT_PORT, int) + self.assertGreater(DEFAULT_PORT, 0) + self.assertLess(DEFAULT_PORT, 65536) + + def test_protocol_timeout(self): + from syncplay.constants import PROTOCOL_TIMEOUT + self.assertIsInstance(PROTOCOL_TIMEOUT, (int, float)) + self.assertGreater(PROTOCOL_TIMEOUT, 0) + + def test_server_state_interval(self): + from syncplay.constants import SERVER_STATE_INTERVAL + self.assertIsInstance(SERVER_STATE_INTERVAL, (int, float)) + self.assertGreater(SERVER_STATE_INTERVAL, 0) + + def test_ping_moving_average_weight(self): + from syncplay.constants import PING_MOVING_AVERAGE_WEIGHT + self.assertGreater(PING_MOVING_AVERAGE_WEIGHT, 0) + self.assertLess(PING_MOVING_AVERAGE_WEIGHT, 1) + + def test_max_lengths_positive(self): + from syncplay.constants import ( + MAX_CHAT_MESSAGE_LENGTH, MAX_USERNAME_LENGTH, + MAX_ROOM_NAME_LENGTH, MAX_FILENAME_LENGTH + ) + for val in [MAX_CHAT_MESSAGE_LENGTH, MAX_USERNAME_LENGTH, + MAX_ROOM_NAME_LENGTH, MAX_FILENAME_LENGTH]: + self.assertIsInstance(val, int) + self.assertGreater(val, 0) + + def test_playlist_limits(self): + from syncplay.constants import PLAYLIST_MAX_CHARACTERS, PLAYLIST_MAX_ITEMS + self.assertIsInstance(PLAYLIST_MAX_CHARACTERS, int) + self.assertIsInstance(PLAYLIST_MAX_ITEMS, int) + self.assertGreater(PLAYLIST_MAX_CHARACTERS, 0) + self.assertGreater(PLAYLIST_MAX_ITEMS, 0) + + def test_sync_thresholds(self): + from syncplay.constants import ( + DEFAULT_REWIND_THRESHOLD, DEFAULT_FASTFORWARD_THRESHOLD, + DIFFERENT_DURATION_THRESHOLD, SEEK_THRESHOLD + ) + self.assertGreater(DEFAULT_REWIND_THRESHOLD, 0) + self.assertGreater(DEFAULT_FASTFORWARD_THRESHOLD, 0) + self.assertGreater(DIFFERENT_DURATION_THRESHOLD, 0) + self.assertGreater(SEEK_THRESHOLD, 0) + + def test_version_strings_format(self): + from syncplay.constants import ( + RECENT_CLIENT_THRESHOLD, CONTROLLED_ROOMS_MIN_VERSION, + USER_READY_MIN_VERSION, SHARED_PLAYLIST_MIN_VERSION, + CHAT_MIN_VERSION + ) + for ver in [RECENT_CLIENT_THRESHOLD, CONTROLLED_ROOMS_MIN_VERSION, + USER_READY_MIN_VERSION, SHARED_PLAYLIST_MIN_VERSION, + CHAT_MIN_VERSION]: + parts = ver.split(".") + self.assertEqual(len(parts), 3) + for part in parts: + int(part) + + def test_ui_modes(self): + from syncplay.constants import CONSOLE_UI_MODE, GRAPHICAL_UI_MODE, UNKNOWN_UI_MODE + self.assertEqual(CONSOLE_UI_MODE, "CLI") + self.assertEqual(GRAPHICAL_UI_MODE, "GUI") + self.assertEqual(UNKNOWN_UI_MODE, "Unknown") + + def test_privacy_modes(self): + from syncplay.constants import ( + PRIVACY_SENDRAW_MODE, PRIVACY_SENDHASHED_MODE, + PRIVACY_DONTSEND_MODE, PRIVACY_HIDDENFILENAME + ) + self.assertIsInstance(PRIVACY_SENDRAW_MODE, str) + self.assertIsInstance(PRIVACY_SENDHASHED_MODE, str) + self.assertIsInstance(PRIVACY_DONTSEND_MODE, str) + self.assertIsInstance(PRIVACY_HIDDENFILENAME, str) + + +class TestGetValueForOS(unittest.TestCase): + def test_returns_value(self): + from syncplay.constants import getValueForOS + result = getValueForOS({"win": "a", "linux": "b", "darwin": "c", "default": "d"}) + self.assertIsNotNone(result) + + def test_default_fallback(self): + from syncplay.constants import getValueForOS + result = getValueForOS({"default": "fallback"}) + self.assertIsNotNone(result) + + def test_monospace_font(self): + from syncplay.constants import MONOSPACE_FONT + self.assertIsInstance(MONOSPACE_FONT, str) + self.assertGreater(len(MONOSPACE_FONT), 0) + + +class TestRegexPatterns(unittest.TestCase): + def test_parse_time_regex_compiles(self): + from syncplay.constants import PARSE_TIME_REGEX + compiled = re.compile(PARSE_TIME_REGEX) + self.assertIsNotNone(compiled) + + def test_filename_strip_regex_compiles(self): + from syncplay.constants import FILENAME_STRIP_REGEX + compiled = re.compile(FILENAME_STRIP_REGEX) + self.assertIsNotNone(compiled) + + def test_room_name_strip_regex_compiles(self): + from syncplay.constants import ROOM_NAME_STRIP_REGEX + compiled = re.compile(ROOM_NAME_STRIP_REGEX) + self.assertIsNotNone(compiled) + + def test_argument_split_regex_compiles(self): + from syncplay.constants import ARGUMENT_SPLIT_REGEX + compiled = re.compile(ARGUMENT_SPLIT_REGEX) + self.assertIsNotNone(compiled) + + def test_ui_command_regex_compiles(self): + from syncplay.constants import UI_COMMAND_REGEX + compiled = re.compile(UI_COMMAND_REGEX) + self.assertIsNotNone(compiled) + + def test_ui_offset_regex_compiles(self): + from syncplay.constants import UI_OFFSET_REGEX + compiled = re.compile(UI_OFFSET_REGEX) + self.assertIsNotNone(compiled) + + def test_ui_seek_regex_compiles(self): + from syncplay.constants import UI_SEEK_REGEX + compiled = re.compile(UI_SEEK_REGEX) + self.assertIsNotNone(compiled) + + def test_mplayer_answer_regex_compiles(self): + from syncplay.constants import MPLAYER_ANSWER_REGEX + compiled = re.compile(MPLAYER_ANSWER_REGEX) + self.assertIsNotNone(compiled) + + def test_vlc_answer_regex_compiles(self): + from syncplay.constants import VLC_ANSWER_REGEX + compiled = re.compile(VLC_ANSWER_REGEX) + self.assertIsNotNone(compiled) + + def test_message_with_username_regex_compiles(self): + from syncplay.constants import MESSAGE_WITH_USERNAME_REGEX + compiled = re.compile(MESSAGE_WITH_USERNAME_REGEX) + self.assertIsNotNone(compiled) + + def test_parse_time_regex_matches_basic(self): + from syncplay.constants import PARSE_TIME_REGEX + compiled = re.compile(PARSE_TIME_REGEX) + m = compiled.match("1:30:00") + self.assertIsNotNone(m) + + def test_room_name_strip_regex_matches(self): + from syncplay.constants import ROOM_NAME_STRIP_REGEX + compiled = re.compile(ROOM_NAME_STRIP_REGEX) + m = compiled.match("+myroom:AABBCCDDEE11") + self.assertIsNotNone(m) + self.assertEqual(m.group("roomnamebase"), "myroom") + + +class TestTLSConstants(unittest.TestCase): + def test_tls_cert_rotation_retries(self): + from syncplay.constants import TLS_CERT_ROTATION_MAX_RETRIES + self.assertIsInstance(TLS_CERT_ROTATION_MAX_RETRIES, int) + self.assertGreater(TLS_CERT_ROTATION_MAX_RETRIES, 0) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_protocols.py b/tests/test_protocols.py new file mode 100644 index 00000000..8e875fb6 --- /dev/null +++ b/tests/test_protocols.py @@ -0,0 +1,319 @@ +import time +import unittest +from unittest.mock import MagicMock, patch, PropertyMock + + +class TestJSONCommandProtocolHandleMessages(unittest.TestCase): + def _make_protocol(self): + from syncplay.protocols import JSONCommandProtocol + proto = JSONCommandProtocol() + proto.handleHello = MagicMock() + proto.handleSet = MagicMock() + proto.handleState = MagicMock() + proto.handleList = MagicMock() + proto.handleChat = MagicMock() + proto.handleError = MagicMock() + proto.handleTLS = MagicMock() + proto.dropWithError = MagicMock() + proto.showDebugMessage = MagicMock() + return proto + + def test_hello_dispatch(self): + proto = self._make_protocol() + proto.handleMessages({"Hello": {"username": "alice"}}) + proto.handleHello.assert_called_once_with({"username": "alice"}) + + def test_set_dispatch(self): + proto = self._make_protocol() + proto.handleMessages({"Set": {"room": {"name": "test"}}}) + proto.handleSet.assert_called_once() + + def test_state_dispatch(self): + proto = self._make_protocol() + proto.handleMessages({"State": {"playstate": {}}}) + proto.handleState.assert_called_once() + + def test_list_dispatch(self): + proto = self._make_protocol() + proto.handleMessages({"List": None}) + proto.handleList.assert_called_once() + + def test_chat_dispatch(self): + proto = self._make_protocol() + proto.handleMessages({"Chat": {"message": "hi"}}) + proto.handleChat.assert_called_once() + + def test_error_dispatch(self): + proto = self._make_protocol() + proto.handleMessages({"Error": {"message": "oops"}}) + proto.handleError.assert_called_once() + + def test_tls_dispatch(self): + proto = self._make_protocol() + proto.handleMessages({"TLS": {"startTLS": "send"}}) + proto.handleTLS.assert_called_once() + + def test_unknown_command_drops(self): + proto = self._make_protocol() + proto.handleMessages({"UnknownCmd": {}}) + proto.dropWithError.assert_called_once() + + +class TestSyncClientProtocolExtractHello(unittest.TestCase): + def _make_client_protocol(self): + from syncplay.protocols import SyncClientProtocol + client = MagicMock() + client.ui = MagicMock() + proto = SyncClientProtocol(client) + return proto + + def test_full_extraction(self): + proto = self._make_client_protocol() + hello = { + "username": "alice", + "room": {"name": "testroom"}, + "version": "1.2.255", + "realversion": "1.7.4", + "motd": "Welcome", + "features": {"chat": True} + } + username, roomName, version, motd, features = proto._extractHelloArguments(hello) + self.assertEqual(username, "alice") + self.assertEqual(roomName, "testroom") + self.assertEqual(version, "1.7.4") + self.assertEqual(motd, "Welcome") + self.assertEqual(features, {"chat": True}) + + def test_missing_fields(self): + proto = self._make_client_protocol() + hello = {} + username, roomName, version, motd, features = proto._extractHelloArguments(hello) + self.assertIsNone(username) + self.assertIsNone(roomName) + self.assertIsNone(version) + self.assertIsNone(motd) + self.assertIsNone(features) + + def test_realversion_overrides_version(self): + proto = self._make_client_protocol() + hello = {"version": "1.2.255", "realversion": "1.7.4"} + _, _, version, _, _ = proto._extractHelloArguments(hello) + self.assertEqual(version, "1.7.4") + + +class TestSyncClientProtocolExtractStatePlaystate(unittest.TestCase): + def _make_client_protocol(self): + from syncplay.protocols import SyncClientProtocol + proto = SyncClientProtocol(MagicMock()) + return proto + + def test_full_extraction(self): + proto = self._make_client_protocol() + state = { + "playstate": { + "position": 42.5, + "paused": True, + "doSeek": False, + "setBy": "alice" + } + } + position, paused, doSeek, setBy = proto._extractStatePlaystateArguments(state) + self.assertEqual(position, 42.5) + self.assertTrue(paused) + self.assertFalse(doSeek) + self.assertEqual(setBy, "alice") + + def test_missing_fields_have_defaults(self): + proto = self._make_client_protocol() + state = {"playstate": {}} + position, paused, doSeek, setBy = proto._extractStatePlaystateArguments(state) + self.assertEqual(position, 0) + self.assertIsNone(paused) + self.assertIsNone(doSeek) + self.assertIsNone(setBy) + + +class TestSyncClientProtocolSendHello(unittest.TestCase): + def test_sends_hello_structure(self): + from syncplay.protocols import SyncClientProtocol + client = MagicMock() + client.getUsername.return_value = "alice" + client.getPassword.return_value = None + client.getRoom.return_value = "testroom" + client.getFeatures.return_value = {"chat": True} + proto = SyncClientProtocol(client) + proto.sendMessage = MagicMock() + proto.sendHello() + call_args = proto.sendMessage.call_args[0][0] + self.assertIn("Hello", call_args) + hello = call_args["Hello"] + self.assertEqual(hello["username"], "alice") + self.assertEqual(hello["room"], {"name": "testroom"}) + self.assertEqual(hello["version"], "1.2.255") + self.assertIn("realversion", hello) + self.assertIn("features", hello) + + def test_includes_password_when_set(self): + from syncplay.protocols import SyncClientProtocol + client = MagicMock() + client.getUsername.return_value = "alice" + client.getPassword.return_value = "secret" + client.getRoom.return_value = "testroom" + client.getFeatures.return_value = {} + proto = SyncClientProtocol(client) + proto.sendMessage = MagicMock() + proto.sendHello() + hello = proto.sendMessage.call_args[0][0]["Hello"] + self.assertEqual(hello["password"], "secret") + + +class TestSyncClientProtocolSetUser(unittest.TestCase): + def test_joined_event(self): + from syncplay.protocols import SyncClientProtocol + client = MagicMock() + proto = SyncClientProtocol(client) + users = {"alice": {"room": {"name": "testroom"}, "event": {"joined": True}}} + proto._SetUser(users) + client.userlist.addUser.assert_called_once_with("alice", "testroom", None) + + def test_left_event(self): + from syncplay.protocols import SyncClientProtocol + client = MagicMock() + proto = SyncClientProtocol(client) + users = {"alice": {"room": {"name": "testroom"}, "event": {"left": True}}} + proto._SetUser(users) + client.removeUser.assert_called_once_with("alice") + + def test_mod_event(self): + from syncplay.protocols import SyncClientProtocol + client = MagicMock() + proto = SyncClientProtocol(client) + users = {"alice": {"room": {"name": "testroom"}, "file": {"name": "video.mkv"}}} + proto._SetUser(users) + client.userlist.modUser.assert_called_once() + + +class TestSyncServerProtocolExtractHello(unittest.TestCase): + def _make_server_protocol(self): + from syncplay.protocols import SyncServerProtocol + factory = MagicMock() + proto = SyncServerProtocol(factory) + proto.transport = MagicMock() + proto.transport.getPeer.return_value = MagicMock(host="127.0.0.1") + return proto + + def test_full_extraction(self): + proto = self._make_server_protocol() + hello = { + "username": "alice", + "password": "secret", + "room": {"name": "testroom"}, + "version": "1.2.255", + "realversion": "1.7.4", + "features": {"chat": True} + } + username, password, roomName, version, features = proto._extractHelloArguments(hello) + self.assertEqual(username, "alice") + self.assertEqual(password, "secret") + self.assertEqual(roomName, "testroom") + self.assertEqual(version, "1.7.4") + self.assertEqual(features, {"chat": True}) + + def test_missing_fields(self): + proto = self._make_server_protocol() + hello = {} + username, password, roomName, version, features = proto._extractHelloArguments(hello) + self.assertIsNone(username) + self.assertIsNone(password) + self.assertIsNone(roomName) + self.assertIsNone(version) + self.assertIsNone(features) + + def test_username_stripped(self): + proto = self._make_server_protocol() + hello = {"username": " alice ", "room": {"name": "testroom"}, "version": "1.7.4"} + username, _, _, _, _ = proto._extractHelloArguments(hello) + self.assertEqual(username, "alice") + + +class TestSyncServerProtocolGetFeatures(unittest.TestCase): + def _make_server_protocol(self): + from syncplay.protocols import SyncServerProtocol + factory = MagicMock() + proto = SyncServerProtocol(factory) + proto.transport = MagicMock() + return proto + + def test_features_for_modern_client(self): + proto = self._make_server_protocol() + proto._version = "1.7.4" + features = proto.getFeatures() + self.assertTrue(features["sharedPlaylists"]) + self.assertTrue(features["chat"]) + self.assertTrue(features["readiness"]) + self.assertTrue(features["managedRooms"]) + + def test_features_for_old_client(self): + proto = self._make_server_protocol() + proto._version = "1.2.0" + features = proto.getFeatures() + self.assertFalse(features["sharedPlaylists"]) + self.assertFalse(features["chat"]) + self.assertFalse(features["readiness"]) + self.assertFalse(features["managedRooms"]) + + +class TestPingService(unittest.TestCase): + def test_initial_state(self): + from syncplay.protocols import PingService + ps = PingService() + self.assertEqual(ps.getRtt(), 0) + self.assertEqual(ps.getLastForwardDelay(), 0) + + def test_new_timestamp(self): + from syncplay.protocols import PingService + ps = PingService() + ts = ps.newTimestamp() + self.assertAlmostEqual(ts, time.time(), delta=1.0) + + def test_receive_message_updates_rtt(self): + from syncplay.protocols import PingService + ps = PingService() + ts = time.time() - 0.1 + ps.receiveMessage(ts, 0.05) + self.assertGreater(ps.getRtt(), 0) + self.assertGreater(ps.getLastForwardDelay(), 0) + + def test_receive_message_no_timestamp(self): + from syncplay.protocols import PingService + ps = PingService() + ps.receiveMessage(None, 0.05) + self.assertEqual(ps.getRtt(), 0) + + def test_negative_rtt_ignored(self): + from syncplay.protocols import PingService + ps = PingService() + ps.receiveMessage(time.time() + 100, 0.05) + self.assertLessEqual(ps.getRtt(), 0) + self.assertEqual(ps.getLastForwardDelay(), 0) + + def test_moving_average(self): + from syncplay.protocols import PingService + ps = PingService() + for _ in range(5): + ts = time.time() - 0.1 + ps.receiveMessage(ts, 0.05) + self.assertGreater(ps._avrRtt, 0) + + def test_forward_delay_when_sender_behind(self): + """When senderRtt < self._rtt, fd includes the difference.""" + from syncplay.protocols import PingService + ps = PingService() + ts = time.time() - 0.2 + ps.receiveMessage(ts, 0.05) + fd = ps.getLastForwardDelay() + self.assertGreater(fd, ps._avrRtt / 2) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_room.py b/tests/test_room.py new file mode 100644 index 00000000..ececd9e7 --- /dev/null +++ b/tests/test_room.py @@ -0,0 +1,301 @@ +import time +import unittest +from unittest.mock import MagicMock, patch + +from tests.conftest import make_room, make_controlled_room, make_watcher + + +class TestRoomCreation(unittest.TestCase): + def test_defaults(self): + """Room starts paused at position 0.""" + room = make_room("testroom") + self.assertTrue(room.isPaused()) + self.assertFalse(room.isPlaying()) + self.assertEqual(room.getName(), "testroom") + self.assertTrue(room.isEmpty()) + self.assertEqual(room.getPlaylist(), []) + self.assertIsNone(room.getPlaylistIndex()) + self.assertIsNone(room.getSetBy()) + + def test_str_returns_name(self): + room = make_room("myroom") + self.assertEqual(str(room), "myroom") + + +class TestRoomPersistence(unittest.TestCase): + def test_not_persistent_without_db(self): + room = make_room("testroom", dbhandle=None) + self.assertFalse(room.isPersistent()) + self.assertFalse(room.roomsCanPersist()) + + def test_persistent_with_db(self): + room = make_room("testroom", dbhandle=MagicMock()) + self.assertTrue(room.isPersistent()) + + def test_temp_room_not_persistent(self): + room = make_room("testroom-temp", dbhandle=MagicMock()) + self.assertTrue(room.isMarkedAsTemporary()) + self.assertFalse(room.isPersistent()) + + def test_temp_in_controlled_name(self): + room = make_room("testroom-temp:AABBCCDDEE11", dbhandle=MagicMock()) + self.assertTrue(room.isMarkedAsTemporary()) + + def test_permanent_flag(self): + room = make_room("testroom") + self.assertFalse(room.isPermanent()) + self.assertTrue(room.isNotPermanent()) + room.setPermanent(True) + self.assertTrue(room.isPermanent()) + self.assertFalse(room.isNotPermanent()) + + +class TestRoomPlayState(unittest.TestCase): + def test_set_paused(self): + from syncplay.server import Room + room = make_room() + room.setPaused(Room.STATE_PLAYING) + self.assertTrue(room.isPlaying()) + self.assertFalse(room.isPaused()) + + def test_set_paused_back(self): + from syncplay.server import Room + room = make_room() + room.setPaused(Room.STATE_PLAYING) + room.setPaused(Room.STATE_PAUSED) + self.assertTrue(room.isPaused()) + + def test_set_paused_tracks_setby(self): + room = make_room() + watcher = make_watcher(name="alice") + room.setPaused(setBy=watcher) + self.assertEqual(room.getSetBy(), watcher) + + +class TestRoomPosition(unittest.TestCase): + def test_initial_position(self): + room = make_room() + pos = room.getPosition() + self.assertAlmostEqual(pos, 0, delta=0.5) + + def test_set_position(self): + room = make_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + room.setPosition(42.0) + self.assertAlmostEqual(room._position, 42.0) + + def test_position_advances_when_playing(self): + """When playing with no watchers, position should advance with time.""" + from syncplay.server import Room + room = make_room() + room._position = 10.0 + room._lastUpdate = time.time() - 2.0 + room.setPaused(Room.STATE_PLAYING) + pos = room.getPosition() + self.assertGreater(pos, 10.0) + + def test_position_static_when_paused(self): + room = make_room() + room._position = 10.0 + room._lastUpdate = time.time() - 5.0 + pos = room.getPosition() + self.assertAlmostEqual(pos, 10.0, delta=0.5) + + +class TestRoomWatchers(unittest.TestCase): + def test_add_watcher(self): + room = make_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + self.assertFalse(room.isEmpty()) + self.assertEqual(len(room.getWatchers()), 1) + self.assertEqual(watcher.getRoom(), room) + + def test_remove_watcher(self): + room = make_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + room.removeWatcher(watcher) + self.assertTrue(room.isEmpty()) + self.assertIsNone(watcher.getRoom()) + + def test_remove_resets_position_when_empty_non_persistent(self): + room = make_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + room._position = 50.0 + room.removeWatcher(watcher) + self.assertEqual(room._position, 0) + + def test_add_watcher_syncs_position_when_others_present(self): + room = make_room() + w1 = make_watcher(name="alice") + room.addWatcher(w1) + room._position = 30.0 + w2 = make_watcher(name="bob") + room.addWatcher(w2) + self.assertIsNotNone(w2._position) + + def test_remove_nonexistent_watcher(self): + """Removing a watcher not in the room should be a no-op.""" + room = make_room() + watcher = make_watcher(name="alice") + room.removeWatcher(watcher) + + +class TestRoomPlaylist(unittest.TestCase): + def test_set_and_get_playlist(self): + room = make_room() + room.setPlaylist(["file1.mkv", "file2.mkv"]) + self.assertEqual(room.getPlaylist(), ["file1.mkv", "file2.mkv"]) + + def test_set_and_get_playlist_index(self): + room = make_room() + room.setPlaylistIndex(2) + self.assertEqual(room.getPlaylistIndex(), 2) + + def test_playlist_empty(self): + room = make_room() + self.assertTrue(room.isPlaylistEmpty()) + room.setPlaylist(["file1.mkv"]) + self.assertFalse(room.isPlaylistEmpty()) + + +class TestRoomSanitizeFilename(unittest.TestCase): + def test_replaces_blacklisted_chars(self): + room = make_room() + result = room.sanitizeFilename('file<>:name.mkv') + self.assertNotIn("<", result) + self.assertNotIn(">", result) + self.assertNotIn(":", result) + self.assertIn("_", result) + + def test_replaces_control_chars(self): + room = make_room() + result = room.sanitizeFilename("file\x01name.mkv") + self.assertNotIn("\x01", result) + + +class TestRoomLoadRoom(unittest.TestCase): + def test_load_room_from_tuple(self): + room = make_room() + room.loadRoom(("loaded_room", "file1\nfile2", 1, 42.5, 1000)) + self.assertEqual(room.getName(), "loaded_room") + self.assertEqual(room.getPlaylist(), ["file1", "file2"]) + self.assertEqual(room.getPlaylistIndex(), 1) + self.assertEqual(room._position, 42.5) + + def test_load_room_empty_playlist(self): + room = make_room() + room.loadRoom(("loaded_room", "", 0, 0, 0)) + self.assertEqual(room.getPlaylist(), []) + + +class TestRoomCanControl(unittest.TestCase): + def test_regular_room_anyone_can_control(self): + room = make_room() + watcher = make_watcher(name="alice") + self.assertTrue(room.canControl(watcher)) + + def test_get_controllers_empty(self): + room = make_room() + self.assertEqual(room.getControllers(), []) + + +class TestControlledRoom(unittest.TestCase): + def test_creation(self): + room = make_controlled_room() + self.assertTrue(room.isPaused()) + self.assertTrue(room.isEmpty()) + + def test_can_control_requires_controller(self): + room = make_controlled_room() + watcher = make_watcher(name="alice") + self.assertFalse(room.canControl(watcher)) + + def test_add_controller(self): + room = make_controlled_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + room.addController(watcher) + self.assertTrue(room.canControl(watcher)) + + def test_set_paused_requires_controller(self): + from syncplay.server import Room + room = make_controlled_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + room.setPaused(Room.STATE_PLAYING, setBy=watcher) + self.assertTrue(room.isPaused()) + + def test_set_paused_by_controller(self): + from syncplay.server import Room + room = make_controlled_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + room.addController(watcher) + room.setPaused(Room.STATE_PLAYING, setBy=watcher) + self.assertTrue(room.isPlaying()) + + def test_set_position_requires_controller(self): + room = make_controlled_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + room.setPosition(50.0, setBy=watcher) + self.assertAlmostEqual(room._position, 0, delta=0.5) + + def test_set_position_by_controller(self): + room = make_controlled_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + room.addController(watcher) + room.setPosition(50.0, setBy=watcher) + self.assertAlmostEqual(room._position, 50.0) + + def test_set_playlist_requires_controller(self): + room = make_controlled_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + room.setPlaylist(["file.mkv"], setBy=watcher) + self.assertEqual(room.getPlaylist(), []) + + def test_set_playlist_by_controller(self): + room = make_controlled_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + room.addController(watcher) + room.setPlaylist(["file.mkv"], setBy=watcher) + self.assertEqual(room.getPlaylist(), ["file.mkv"]) + + def test_set_playlist_index_requires_controller(self): + room = make_controlled_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + room.setPlaylistIndex(3, setBy=watcher) + self.assertIsNone(room.getPlaylistIndex()) + + def test_set_playlist_index_by_controller(self): + room = make_controlled_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + room.addController(watcher) + room.setPlaylistIndex(3, setBy=watcher) + self.assertEqual(room.getPlaylistIndex(), 3) + + def test_remove_watcher_removes_controller(self): + room = make_controlled_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + room.addController(watcher) + self.assertTrue(room.canControl(watcher)) + room.removeWatcher(watcher) + self.assertFalse(room.canControl(watcher)) + + def test_get_controllers_returns_dict(self): + room = make_controlled_room() + self.assertIsInstance(room.getControllers(), dict) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_room_manager.py b/tests/test_room_manager.py new file mode 100644 index 00000000..d56a3ccd --- /dev/null +++ b/tests/test_room_manager.py @@ -0,0 +1,190 @@ +import unittest +from unittest.mock import MagicMock, patch + +from tests.conftest import make_room_manager, make_watcher + + +class TestFindFreeUsername(unittest.TestCase): + def test_unique_name_unchanged(self): + manager = make_room_manager() + self.assertEqual(manager.findFreeUsername("alice"), "alice") + + def test_collision_appends_underscore(self): + manager = make_room_manager() + from syncplay.server import Room + room = Room("testroom", None) + w1 = make_watcher(name="alice") + room.addWatcher(w1) + manager._rooms["testroom"] = room + result = manager.findFreeUsername("alice") + self.assertEqual(result, "alice_") + + def test_multiple_collisions(self): + manager = make_room_manager() + from syncplay.server import Room + room = Room("testroom", None) + w1 = make_watcher(name="alice") + w2 = make_watcher(name="alice_") + room.addWatcher(w1) + room.addWatcher(w2) + manager._rooms["testroom"] = room + result = manager.findFreeUsername("alice") + self.assertEqual(result, "alice__") + + def test_trailing_underscores_stripped_before_collision(self): + """If username ends with _ and collides, trailing underscores are stripped first.""" + manager = make_room_manager() + from syncplay.server import Room + room = Room("testroom", None) + w1 = make_watcher(name="alice_") + room.addWatcher(w1) + manager._rooms["testroom"] = room + result = manager.findFreeUsername("alice_") + self.assertEqual(result, "alice") + + def test_truncation(self): + manager = make_room_manager() + long_name = "a" * 100 + result = manager.findFreeUsername(long_name, maxUsernameLength=16) + self.assertLessEqual(len(result), 17) + + +class TestMoveWatcher(unittest.TestCase): + def test_move_creates_room(self): + manager = make_room_manager() + watcher = make_watcher(name="alice") + watcher._room = None + manager.moveWatcher(watcher, "newroom") + self.assertIn("newroom", manager._rooms) + self.assertEqual(watcher.getRoom().getName(), "newroom") + + def test_move_between_rooms(self): + manager = make_room_manager() + watcher = make_watcher(name="alice") + watcher._room = None + manager.moveWatcher(watcher, "room1") + manager.moveWatcher(watcher, "room2") + self.assertEqual(watcher.getRoom().getName(), "room2") + + def test_creates_controlled_room(self): + manager = make_room_manager() + watcher = make_watcher(name="alice") + watcher._room = None + from syncplay.server import ControlledRoom + manager.moveWatcher(watcher, "+myroom:AABBCCDDEE11") + self.assertIsInstance(manager._rooms["+myroom:AABBCCDDEE11"], ControlledRoom) + + +class TestRemoveWatcher(unittest.TestCase): + def test_remove_cleans_empty_room(self): + manager = make_room_manager() + watcher = make_watcher(name="alice") + watcher._room = None + manager.moveWatcher(watcher, "testroom") + manager.removeWatcher(watcher) + self.assertNotIn("testroom", manager._rooms) + + def test_remove_keeps_room_with_watchers(self): + manager = make_room_manager() + w1 = make_watcher(name="alice") + w2 = make_watcher(name="bob") + w1._room = None + w2._room = None + manager.moveWatcher(w1, "testroom") + manager.moveWatcher(w2, "testroom") + manager.removeWatcher(w1) + self.assertIn("testroom", manager._rooms) + self.assertEqual(len(manager._rooms["testroom"].getWatchers()), 1) + + +class TestGetRoom(unittest.TestCase): + def test_creates_room_on_demand(self): + manager = make_room_manager() + from syncplay.server import Room + room = manager._getRoom("newroom") + self.assertIsInstance(room, Room) + self.assertIn("newroom", manager._rooms) + + def test_returns_existing_room(self): + manager = make_room_manager() + room1 = manager._getRoom("myroom") + room2 = manager._getRoom("myroom") + self.assertIs(room1, room2) + + def test_creates_controlled_room_for_controlled_name(self): + manager = make_room_manager() + from syncplay.server import ControlledRoom + room = manager._getRoom("+myroom:AABBCCDDEE11") + self.assertIsInstance(room, ControlledRoom) + + +class TestBroadcast(unittest.TestCase): + def test_broadcast_reaches_all_rooms(self): + manager = make_room_manager() + w1 = make_watcher(name="alice") + w2 = make_watcher(name="bob") + w1._room = None + w2._room = None + manager.moveWatcher(w1, "room1") + manager.moveWatcher(w2, "room2") + received = [] + manager.broadcast(w1, lambda w: received.append(w.getName())) + self.assertIn("alice", received) + self.assertIn("bob", received) + + def test_broadcast_room_only_reaches_same_room(self): + manager = make_room_manager() + w1 = make_watcher(name="alice") + w2 = make_watcher(name="bob") + w1._room = None + w2._room = None + manager.moveWatcher(w1, "room1") + manager.moveWatcher(w2, "room2") + received = [] + manager.broadcastRoom(w1, lambda w: received.append(w.getName())) + self.assertIn("alice", received) + self.assertNotIn("bob", received) + + +class TestPublicRoomManager(unittest.TestCase): + def test_broadcast_isolated_to_room(self): + from syncplay.server import PublicRoomManager + manager = PublicRoomManager() + w1 = make_watcher(name="alice") + w2 = make_watcher(name="bob") + w1._room = None + w2._room = None + manager.moveWatcher(w1, "room1") + manager.moveWatcher(w2, "room2") + received = [] + manager.broadcast(w1, lambda w: received.append(w.getName())) + self.assertIn("alice", received) + self.assertNotIn("bob", received) + + def test_get_all_watchers_isolated(self): + from syncplay.server import PublicRoomManager + manager = PublicRoomManager() + w1 = make_watcher(name="alice") + w2 = make_watcher(name="bob") + w1._room = None + w2._room = None + manager.moveWatcher(w1, "room1") + manager.moveWatcher(w2, "room2") + watchers = manager.getAllWatchersForUser(w1) + names = [w.getName() for w in watchers] + self.assertIn("alice", names) + self.assertNotIn("bob", names) + + +class TestExportRooms(unittest.TestCase): + def test_export_returns_rooms_dict(self): + manager = make_room_manager() + watcher = make_watcher(name="alice") + watcher._room = None + manager.moveWatcher(watcher, "testroom") + rooms = manager.exportRooms() + self.assertIn("testroom", rooms) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 00000000..e194fba2 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,419 @@ +import re +import unittest + + +class TestParseTime(unittest.TestCase): + def test_plain_seconds(self): + from syncplay.utils import parseTime + self.assertEqual(parseTime("90"), 90.0) + + def test_plain_float_seconds(self): + from syncplay.utils import parseTime + self.assertEqual(parseTime("1.5"), 1.5) + + def test_minutes_seconds(self): + from syncplay.utils import parseTime + self.assertEqual(parseTime("1:30"), 90.0) + + def test_hours_minutes_seconds(self): + from syncplay.utils import parseTime + self.assertEqual(parseTime("1:30:00"), 5400.0) + + def test_with_milliseconds(self): + from syncplay.utils import parseTime + result = parseTime("0:01.500") + self.assertIsNotNone(result) + self.assertAlmostEqual(result, 1.5, places=1) + + def test_no_match_returns_none(self): + from syncplay.utils import parseTime + self.assertIsNone(parseTime(":")) + + +class TestFormatTime(unittest.TestCase): + def test_zero(self): + from syncplay.utils import formatTime + self.assertEqual(formatTime(0), "00:00") + + def test_seconds_only(self): + from syncplay.utils import formatTime + self.assertEqual(formatTime(65), "01:05") + + def test_hours(self): + from syncplay.utils import formatTime + self.assertEqual(formatTime(3661), "01:01:01") + + def test_negative(self): + from syncplay.utils import formatTime + result = formatTime(-65) + self.assertTrue(result.startswith("-")) + self.assertIn("01:05", result) + + def test_days(self): + from syncplay.utils import formatTime + result = formatTime(90000) + self.assertIn("d,", result) + + def test_weeks_as_titles(self): + from syncplay.utils import formatTime + result = formatTime(604800 + 60, weeksAsTitles=True) + self.assertIn("Title 1", result) + + def test_weeks_not_as_titles(self): + from syncplay.utils import formatTime + result = formatTime(604800 + 60, weeksAsTitles=False) + self.assertIn("w,", result) + + +class TestFormatSize(unittest.TestCase): + def test_zero_returns_unknown(self): + from syncplay.utils import formatSize + self.assertEqual(formatSize(0), "???") + + def test_megabytes(self): + from syncplay.utils import formatSize + result = formatSize(10 * 1048576) + self.assertIn("10", result) + + def test_precise(self): + from syncplay.utils import formatSize + result = formatSize(int(1.5 * 1048576), precise=True) + self.assertIn("1.5", result) + + def test_hashed_returns_unknown(self): + from syncplay.utils import formatSize + self.assertEqual(formatSize("abc123abc123"), "???") + + +class TestIsASCII(unittest.TestCase): + def test_ascii_string(self): + from syncplay.utils import isASCII + self.assertTrue(isASCII("hello world")) + + def test_non_ascii(self): + from syncplay.utils import isASCII + self.assertFalse(isASCII("héllo")) + + def test_empty(self): + from syncplay.utils import isASCII + self.assertTrue(isASCII("")) + + +class TestIsURL(unittest.TestCase): + def test_http_url(self): + from syncplay.utils import isURL + self.assertTrue(isURL("http://example.com/video.mp4")) + + def test_https_url(self): + from syncplay.utils import isURL + self.assertTrue(isURL("https://example.com/video.mp4")) + + def test_none(self): + from syncplay.utils import isURL + self.assertFalse(isURL(None)) + + def test_regular_path(self): + from syncplay.utils import isURL + self.assertFalse(isURL("/home/user/video.mp4")) + + +class TestMeetsMinVersion(unittest.TestCase): + def test_equal_versions(self): + from syncplay.utils import meetsMinVersion + self.assertTrue(meetsMinVersion("1.7.3", "1.7.3")) + + def test_higher_version(self): + from syncplay.utils import meetsMinVersion + self.assertTrue(meetsMinVersion("1.7.4", "1.7.3")) + + def test_lower_version(self): + from syncplay.utils import meetsMinVersion + self.assertFalse(meetsMinVersion("1.7.2", "1.7.3")) + + def test_major_version_higher(self): + from syncplay.utils import meetsMinVersion + self.assertTrue(meetsMinVersion("2.0.0", "1.9.9")) + + def test_major_version_lower(self): + from syncplay.utils import meetsMinVersion + self.assertFalse(meetsMinVersion("1.0.0", "2.0.0")) + + +class TestStripFilename(unittest.TestCase): + def test_basic_strip(self): + from syncplay.utils import stripfilename + result = stripfilename("my_file (2).mkv", stripURL=False) + self.assertNotIn("_", result) + self.assertNotIn("(", result) + self.assertNotIn(")", result) + self.assertNotIn(" ", result) + + def test_url_strip(self): + from syncplay.utils import stripfilename + result = stripfilename("http://example.com/path/video.mp4", stripURL=True) + self.assertNotIn("http", result) + + def test_none_returns_empty(self): + from syncplay.utils import stripfilename + self.assertEqual(stripfilename(None, stripURL=False), "") + + def test_empty_returns_empty(self): + from syncplay.utils import stripfilename + self.assertEqual(stripfilename("", stripURL=False), "") + + +class TestStripRoomName(unittest.TestCase): + def test_controlled_room_name(self): + from syncplay.utils import stripRoomName + result = stripRoomName("+myroom:AABBCCDDEE11") + self.assertEqual(result, "myroom") + + def test_regular_room_name(self): + from syncplay.utils import stripRoomName + result = stripRoomName("regularroom") + self.assertEqual(result, "regularroom") + + def test_none_returns_empty(self): + from syncplay.utils import stripRoomName + self.assertEqual(stripRoomName(None), "") + + def test_empty_returns_empty(self): + from syncplay.utils import stripRoomName + self.assertEqual(stripRoomName(""), "") + + +class TestHashFilename(unittest.TestCase): + def test_deterministic(self): + from syncplay.utils import hashFilename + h1 = hashFilename("test.mkv") + h2 = hashFilename("test.mkv") + self.assertEqual(h1, h2) + + def test_length(self): + from syncplay.utils import hashFilename + h = hashFilename("test.mkv") + self.assertEqual(len(h), 12) + + def test_url_forces_strip(self): + from syncplay.utils import hashFilename + h = hashFilename("http://example.com/video.mp4") + self.assertEqual(len(h), 12) + + +class TestHashFilesize(unittest.TestCase): + def test_deterministic(self): + from syncplay.utils import hashFilesize + h1 = hashFilesize(12345) + h2 = hashFilesize(12345) + self.assertEqual(h1, h2) + + def test_length(self): + from syncplay.utils import hashFilesize + self.assertEqual(len(hashFilesize(12345)), 12) + + def test_different_sizes_different_hashes(self): + from syncplay.utils import hashFilesize + self.assertNotEqual(hashFilesize(100), hashFilesize(200)) + + +class TestSameFilename(unittest.TestCase): + def test_identical(self): + from syncplay.utils import sameFilename + self.assertTrue(sameFilename("video.mkv", "video.mkv")) + + def test_different(self): + from syncplay.utils import sameFilename + self.assertFalse(sameFilename("video1.mkv", "video2.mkv")) + + def test_hidden_filename_matches_anything(self): + from syncplay.utils import sameFilename + from syncplay.constants import PRIVACY_HIDDENFILENAME + self.assertTrue(sameFilename(PRIVACY_HIDDENFILENAME, "anything.mkv")) + self.assertTrue(sameFilename("anything.mkv", PRIVACY_HIDDENFILENAME)) + + +class TestSameFilesize(unittest.TestCase): + def test_identical(self): + from syncplay.utils import sameFilesize + self.assertTrue(sameFilesize(12345, 12345)) + + def test_different(self): + from syncplay.utils import sameFilesize + self.assertFalse(sameFilesize(12345, 67890)) + + def test_zero_matches_anything(self): + from syncplay.utils import sameFilesize + self.assertTrue(sameFilesize(0, 12345)) + self.assertTrue(sameFilesize(12345, 0)) + + +class TestSameFileduration(unittest.TestCase): + def test_same_duration(self): + from syncplay.utils import sameFileduration + self.assertTrue(sameFileduration(100.0, 100.0)) + + def test_within_threshold(self): + from syncplay.utils import sameFileduration + self.assertTrue(sameFileduration(100.0, 102.0)) + + def test_beyond_threshold(self): + from syncplay.utils import sameFileduration + self.assertFalse(sameFileduration(100.0, 103.0)) + + +class TestTruncateText(unittest.TestCase): + def test_short_text(self): + from syncplay.utils import truncateText + self.assertEqual(truncateText("hello", 10), "hello") + + def test_truncation(self): + from syncplay.utils import truncateText + self.assertEqual(truncateText("hello world", 5), "hello") + + def test_bytes_input(self): + from syncplay.utils import truncateText + result = truncateText(b"hello world", 5) + self.assertEqual(result, "hello") + + +class TestSplitText(unittest.TestCase): + def test_basic_split(self): + from syncplay.utils import splitText + result = splitText("abcdef", 3) + self.assertEqual(result, ["abc", "def"]) + + def test_no_split_needed(self): + from syncplay.utils import splitText + result = splitText("abc", 10) + self.assertEqual(result, ["abc"]) + + +class TestPlaylistIsValid(unittest.TestCase): + def test_valid_playlist(self): + from syncplay.utils import playlistIsValid + self.assertTrue(playlistIsValid(["file1.mkv", "file2.mkv"])) + + def test_too_many_items(self): + from syncplay.utils import playlistIsValid + files = ["file{}.mkv".format(i) for i in range(251)] + self.assertFalse(playlistIsValid(files)) + + def test_too_many_characters(self): + from syncplay.utils import playlistIsValid + files = ["a" * 5001, "b" * 5001] + self.assertFalse(playlistIsValid(files)) + + +class TestGetDomainFromURL(unittest.TestCase): + def test_basic_url(self): + from syncplay.utils import getDomainFromURL + self.assertEqual(getDomainFromURL("https://example.com/page"), "example.com") + + def test_www_prefix_stripped(self): + from syncplay.utils import getDomainFromURL + self.assertEqual(getDomainFromURL("https://www.example.com/page"), "example.com") + + def test_no_domain(self): + from syncplay.utils import getDomainFromURL + self.assertIsNone(getDomainFromURL("not-a-url")) + + def test_none_like(self): + from syncplay.utils import getDomainFromURL + self.assertIsNone(getDomainFromURL("")) + + +class TestRoomPasswordProvider(unittest.TestCase): + def test_is_controlled_room_true(self): + from syncplay.utils import RoomPasswordProvider + self.assertTrue(RoomPasswordProvider.isControlledRoom("+myroom:AABBCCDDEE11")) + + def test_is_controlled_room_false(self): + from syncplay.utils import RoomPasswordProvider + self.assertFalse(RoomPasswordProvider.isControlledRoom("regularroom")) + self.assertFalse(RoomPasswordProvider.isControlledRoom("+nopasshash")) + + def test_get_controlled_room_name_format(self): + from syncplay.utils import RoomPasswordProvider + name = RoomPasswordProvider.getControlledRoomName("myroom", "AB-123-456", "SALTSALTSALT") + self.assertTrue(name.startswith("+myroom:")) + self.assertEqual(len(name), len("+myroom:") + 12) + + def test_check_valid_password(self): + from syncplay.utils import RoomPasswordProvider + salt = "SALTSALTSALT" + password = "AB-123-456" + roomName = RoomPasswordProvider.getControlledRoomName("myroom", password, salt) + self.assertTrue(RoomPasswordProvider.check(roomName, password, salt)) + + def test_check_wrong_password(self): + from syncplay.utils import RoomPasswordProvider + salt = "SALTSALTSALT" + password = "AB-123-456" + roomName = RoomPasswordProvider.getControlledRoomName("myroom", password, salt) + self.assertFalse(RoomPasswordProvider.check(roomName, "CD-789-012", salt)) + + def test_check_invalid_password_format(self): + from syncplay.utils import RoomPasswordProvider + with self.assertRaises(ValueError): + RoomPasswordProvider.check("+room:AABBCCDDEE11", "badformat", "SALT") + + def test_check_not_controlled_room(self): + from syncplay.utils import RoomPasswordProvider, NotControlledRoom + with self.assertRaises(NotControlledRoom): + RoomPasswordProvider.check("regularroom", "AB-123-456", "SALT") + + def test_check_empty_password(self): + from syncplay.utils import RoomPasswordProvider + with self.assertRaises(ValueError): + RoomPasswordProvider.check("+room:AABBCCDDEE11", "", "SALT") + + +class TestRandomStringGenerator(unittest.TestCase): + def test_password_format(self): + from syncplay.utils import RandomStringGenerator + password = RandomStringGenerator.generate_room_password() + self.assertRegex(password, r"^[A-Z]{2}-\d{3}-\d{3}$") + + def test_salt_format(self): + from syncplay.utils import RandomStringGenerator + salt = RandomStringGenerator.generate_server_salt() + self.assertEqual(len(salt), 10) + self.assertTrue(salt.isalpha()) + self.assertTrue(salt.isupper()) + + def test_password_uniqueness(self): + from syncplay.utils import RandomStringGenerator + passwords = {RandomStringGenerator.generate_room_password() for _ in range(20)} + self.assertGreater(len(passwords), 1) + + +class TestListConversions(unittest.TestCase): + def test_list_to_multiline(self): + from syncplay.utils import getListAsMultilineString + result = getListAsMultilineString(["a", "b", "c"]) + self.assertEqual(result, "a\nb\nc") + + def test_empty_list(self): + from syncplay.utils import getListAsMultilineString + self.assertEqual(getListAsMultilineString([]), "") + + def test_none_list(self): + from syncplay.utils import getListAsMultilineString + self.assertEqual(getListAsMultilineString(None), "") + + def test_multiline_to_list(self): + from syncplay.utils import convertMultilineStringToList + result = convertMultilineStringToList("a\nb\nc") + self.assertEqual(result, ["a", "b", "c"]) + + def test_empty_multiline(self): + from syncplay.utils import convertMultilineStringToList + self.assertEqual(convertMultilineStringToList(""), []) + + def test_none_multiline(self): + from syncplay.utils import convertMultilineStringToList + self.assertEqual(convertMultilineStringToList(None), []) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_watcher.py b/tests/test_watcher.py new file mode 100644 index 00000000..026619dd --- /dev/null +++ b/tests/test_watcher.py @@ -0,0 +1,213 @@ +import time +import unittest +from unittest.mock import MagicMock, patch + +from tests.conftest import make_watcher, make_room + + +class TestWatcherCreation(unittest.TestCase): + def test_initial_state(self): + watcher = make_watcher(name="alice") + self.assertEqual(watcher.getName(), "alice") + self.assertIsNone(watcher.getRoom()) + self.assertIsNone(watcher.getFile()) + self.assertIsNone(watcher._position) + self.assertIsNone(watcher._ready) + + def test_connector_set_watcher_called(self): + connector = MagicMock() + connector.isLogged.return_value = True + connector.getFeatures.return_value = {"uiMode": "GUI"} + connector.getVersion.return_value = "1.7.4" + with patch('syncplay.server.reactor'): + from syncplay.server import Watcher + watcher = Watcher(MagicMock(), connector, "alice") + connector.setWatcher.assert_called_once_with(watcher) + + +class TestWatcherPosition(unittest.TestCase): + def test_position_none_when_unset(self): + watcher = make_watcher(name="alice") + self.assertIsNone(watcher.getPosition()) + + def test_position_set(self): + room = make_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + watcher.setPosition(42.0) + self.assertIsNotNone(watcher.getPosition()) + + def test_position_paused_stays_still(self): + room = make_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + watcher.setPosition(10.0) + watcher._lastUpdatedOn = time.time() - 2.0 + pos = watcher.getPosition() + self.assertAlmostEqual(pos, 10.0, delta=0.5) + + def test_position_playing_advances(self): + from syncplay.server import Room + room = make_room() + room.setPaused(Room.STATE_PLAYING) + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + watcher.setPosition(10.0) + watcher._lastUpdatedOn = time.time() - 2.0 + pos = watcher.getPosition() + self.assertGreater(pos, 10.0) + + +class TestWatcherComparison(unittest.TestCase): + def test_lt_by_position(self): + room = make_room() + w1 = make_watcher(name="alice") + w2 = make_watcher(name="bob") + room.addWatcher(w1) + room.addWatcher(w2) + w1.setPosition(10.0) + w1._file = {"name": "file.mkv"} + w2.setPosition(20.0) + w2._file = {"name": "file.mkv"} + self.assertTrue(w1 < w2) + self.assertFalse(w2 < w1) + + def test_no_position_not_less(self): + """Watcher with no position is never less than another.""" + room = make_room() + w1 = make_watcher(name="alice") + w2 = make_watcher(name="bob") + room.addWatcher(w1) + room.addWatcher(w2) + w2.setPosition(10.0) + w2._file = {"name": "file.mkv"} + self.assertFalse(w1 < w2) + + def test_no_file_not_less(self): + room = make_room() + w1 = make_watcher(name="alice") + w2 = make_watcher(name="bob") + room.addWatcher(w1) + room.addWatcher(w2) + w1.setPosition(5.0) + w2.setPosition(10.0) + w2._file = {"name": "file.mkv"} + self.assertFalse(w1 < w2) + + def test_other_no_position_means_self_is_less(self): + room = make_room() + w1 = make_watcher(name="alice") + w2 = make_watcher(name="bob") + room.addWatcher(w1) + room.addWatcher(w2) + w1.setPosition(10.0) + w1._file = {"name": "file.mkv"} + self.assertTrue(w1 < w2) + + +class TestWatcherReady(unittest.TestCase): + def test_set_ready(self): + watcher = make_watcher(name="alice") + watcher.setReady(True) + self.assertTrue(watcher.isReady()) + + def test_ready_none_when_disabled(self): + server = MagicMock() + server.disableReady = True + watcher = make_watcher(server=server, name="alice") + watcher.setReady(True) + self.assertIsNone(watcher.isReady()) + + +class TestWatcherIsController(unittest.TestCase): + def test_controller_in_controlled_room(self): + from tests.conftest import make_controlled_room + room = make_controlled_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + room.addController(watcher) + self.assertTrue(watcher.isController()) + + def test_not_controller_in_controlled_room(self): + from tests.conftest import make_controlled_room + room = make_controlled_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + self.assertFalse(watcher.isController()) + + def test_not_controller_in_regular_room(self): + room = make_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + self.assertFalse(watcher.isController()) + + +class TestWatcherUpdateState(unittest.TestCase): + def test_pause_triggers_room_pause(self): + from syncplay.server import Room + room = make_room() + room.setPaused(Room.STATE_PLAYING) + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + watcher.updateState(position=10.0, paused=True, doSeek=False, messageAge=0.0) + self.assertTrue(room.isPaused()) + + def test_unpause_triggers_room_play(self): + from syncplay.server import Room + room = make_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + watcher.updateState(position=10.0, paused=False, doSeek=False, messageAge=0.0) + self.assertTrue(room.isPlaying()) + + def test_position_updated(self): + room = make_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + watcher.updateState(position=25.0, paused=True, doSeek=False, messageAge=0.0) + self.assertAlmostEqual(watcher._position, 25.0, delta=0.5) + + def test_message_age_adjusts_position(self): + room = make_room() + watcher = make_watcher(name="alice") + room.addWatcher(watcher) + watcher.updateState(position=25.0, paused=False, doSeek=False, messageAge=1.0) + self.assertAlmostEqual(watcher._position, 26.0, delta=0.5) + + +class TestWatcherDelegation(unittest.TestCase): + def test_get_version(self): + connector = MagicMock() + connector.getVersion.return_value = "1.7.4" + watcher = make_watcher(connector=connector, name="alice") + self.assertEqual(watcher.getVersion(), "1.7.4") + + def test_get_features(self): + connector = MagicMock() + connector.getFeatures.return_value = {"chat": True, "uiMode": "GUI"} + watcher = make_watcher(connector=connector, name="alice") + features = watcher.getFeatures() + self.assertIn("chat", features) + + +class TestWatcherFile(unittest.TestCase): + def test_set_file(self): + watcher = make_watcher(name="alice") + room = make_room() + room.addWatcher(watcher) + file_info = {"name": "video.mkv", "size": 1000} + watcher.setFile(file_info) + self.assertEqual(watcher.getFile()["name"], "video.mkv") + + def test_set_file_truncates_name(self): + from syncplay import constants + watcher = make_watcher(name="alice") + room = make_room() + room.addWatcher(watcher) + long_name = "a" * 500 + ".mkv" + watcher.setFile({"name": long_name}) + self.assertLessEqual(len(watcher.getFile()["name"]), constants.MAX_FILENAME_LENGTH) + + +if __name__ == "__main__": + unittest.main()