diff --git a/.coverage b/.coverage new file mode 100644 index 0000000..45e1207 Binary files /dev/null and b/.coverage differ diff --git a/README.md b/README.md index c66fb4a..6b6029c 100644 --- a/README.md +++ b/README.md @@ -155,6 +155,24 @@ hack-id/ └── permissions.json # API permissions ``` + +## ✅ Testing + +Run the full automated suite (unit + route + Playwright API interaction tests): + +```bash +python -m pytest -q +``` + +Run the auth/OAuth interaction coverage gate at 100%: + +```bash +coverage run --source="routes,models" -m pytest -q tests/test_auth_routes.py tests/test_oauth_routes.py +coverage report -m --fail-under=100 +``` + +> Note: Playwright is included for interaction tests via API request contexts (no local browser binary required for these tests). + ## 🔧 Configuration ### Environment Variables diff --git a/requirements.txt b/requirements.txt index bdb0283..df3b7a4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -71,3 +71,6 @@ workos==5.32.0 wrapt==1.17.2 WTForms==3.2.1 yarl==1.20.0 +playwright==1.56.0 +pytest==8.3.5 +pytest-cov==6.0.0 diff --git a/routes/auth.py b/routes/auth.py index b2c7970..256886d 100644 --- a/routes/auth.py +++ b/routes/auth.py @@ -403,7 +403,11 @@ def oauth_token(): client_secret = request.form.get("client_secret") if DEBUG_MODE: - print(f"OAuth token request: grant_type={grant_type}, code={code[:20]}..., client_id={client_id}, redirect_uri={redirect_uri}") + code_preview = (code[:20] + "...") if code else "None" + print( + f"OAuth token request: grant_type={grant_type}, code={code_preview}, " + f"client_id={client_id}, redirect_uri={redirect_uri}" + ) # Validate grant_type if grant_type != "authorization_code": diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..9a0e9a0 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,11 @@ +import os +import sys +from pathlib import Path + +os.environ.setdefault("SECRET_KEY", "test-secret") +os.environ.setdefault("WORKOS_API_KEY", "test-workos-key") +os.environ.setdefault("WORKOS_CLIENT_ID", "test-workos-client") + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..cf7951a --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,116 @@ +import os + +os.environ.setdefault("SECRET_KEY", "test-secret") + +from flask import Flask, jsonify +import routes.api as api_routes + + +def create_test_app(): + app = Flask(__name__) + app.register_blueprint(api_routes.api_bp) + return app + + +def test_require_api_key_rejects_missing_authorization_header(): + app = Flask(__name__) + + @app.route("/protected") + @api_routes.require_api_key("users.read") + def protected(): + return jsonify({"success": True}) + + client = app.test_client() + response = client.get("/protected") + + assert response.status_code == 401 + assert response.get_json()["error"] == "Missing or invalid Authorization header" + + +def test_require_api_key_rejects_insufficient_permissions(monkeypatch): + app = Flask(__name__) + + @app.route("/protected") + @api_routes.require_api_key("users.read") + def protected(): + return jsonify({"success": True}) + + monkeypatch.setattr(api_routes, "get_key_permissions", lambda _api_key: ["events.register"]) + + client = app.test_client() + response = client.get( + "/protected", headers={"Authorization": "Bearer test-key"} + ) + + assert response.status_code == 403 + assert response.get_json()["error"] == "Insufficient permissions" + + +def test_require_api_key_allows_valid_key_and_logs_usage(monkeypatch): + app = Flask(__name__) + + @app.route("/protected") + @api_routes.require_api_key("users.read") + def protected(): + return jsonify({"success": True}) + + log_calls = [] + + monkeypatch.setattr(api_routes, "get_key_permissions", lambda _api_key: ["users.read"]) + monkeypatch.setattr( + api_routes, + "log_api_key_usage", + lambda api_key, action, metadata: log_calls.append( + {"api_key": api_key, "action": action, "metadata": metadata} + ), + ) + + client = app.test_client() + response = client.get( + "/protected", headers={"Authorization": "Bearer test-key"} + ) + + assert response.status_code == 200 + assert response.get_json()["success"] is True + assert len(log_calls) == 1 + assert log_calls[0]["action"] == "protected" + + +def test_api_current_event_returns_404_when_no_current_event(monkeypatch): + app = create_test_app() + monkeypatch.setattr(api_routes, "get_current_event", lambda: None) + + client = app.test_client() + response = client.get("/api/current-event") + + assert response.status_code == 404 + payload = response.get_json() + assert payload["success"] is False + + +def test_api_user_status_returns_data_for_valid_request(monkeypatch): + app = create_test_app() + + monkeypatch.setattr(api_routes, "get_key_permissions", lambda _api_key: ["users.read"]) + monkeypatch.setattr(api_routes, "log_api_key_usage", lambda *_args, **_kwargs: None) + monkeypatch.setattr( + api_routes, + "get_user_event_status", + lambda user_email, event_id: { + "success": True, + "user_email": user_email, + "event_id": event_id, + "registered": True, + }, + ) + + client = app.test_client() + response = client.get( + "/api/user-status?user_email=test@example.com&event_id=hackathon-1", + headers={"Authorization": "Bearer valid-key"}, + ) + + assert response.status_code == 200 + payload = response.get_json() + assert payload["success"] is True + assert payload["registered"] is True diff --git a/tests/test_api_routes_comprehensive.py b/tests/test_api_routes_comprehensive.py new file mode 100644 index 0000000..aa04d7e --- /dev/null +++ b/tests/test_api_routes_comprehensive.py @@ -0,0 +1,1088 @@ +import os + +os.environ.setdefault("SECRET_KEY", "test-secret") + +from flask import Flask, jsonify +import pytest + +import routes.api as api_routes + + +def create_api_app(): + app = Flask(__name__) + app.secret_key = "test-secret" + app.register_blueprint(api_routes.api_bp) + return app + + +def auth_header(token="valid-key"): + return {"Authorization": f"Bearer {token}"} + + +@pytest.fixture +def api_client(monkeypatch): + app = create_api_app() + monkeypatch.setattr( + api_routes, + "get_key_permissions", + lambda _key: ["users.read", "events.register", "oauth", "discord.manage"], + ) + monkeypatch.setattr(api_routes, "log_api_key_usage", lambda *_args, **_kwargs: None) + return app.test_client() + + +def test_require_api_key_rejects_invalid_api_key(monkeypatch): + app = Flask(__name__) + + @app.route("/protected") + @api_routes.require_api_key("users.read") + def protected(): + return jsonify({"success": True}) + + monkeypatch.setattr(api_routes, "get_key_permissions", lambda _key: []) + client = app.test_client() + + response = client.get("/protected", headers=auth_header("bad-key")) + + assert response.status_code == 403 + assert response.get_json()["error"] == "Invalid API key" + + +def test_require_api_key_without_required_permissions_allows_valid_key(monkeypatch): + app = Flask(__name__) + calls = [] + + @app.route("/open") + @api_routes.require_api_key() + def open_route(): + return jsonify({"success": True}) + + monkeypatch.setattr(api_routes, "get_key_permissions", lambda _key: ["any.permission"]) + monkeypatch.setattr( + api_routes, + "log_api_key_usage", + lambda key, action, _metadata: calls.append((key, action)), + ) + client = app.test_client() + + response = client.get("/open", headers=auth_header("k1")) + + assert response.status_code == 200 + assert response.get_json()["success"] is True + assert calls == [("k1", "open_route")] + + +def test_api_current_event_success(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, + "get_current_event", + lambda: {"id": "hack-1", "name": "Hack 1", "description": "Desc", "discord-role-id": "123"}, + ) + + response = api_client.get("/api/current-event") + + assert response.status_code == 200 + payload = response.get_json() + assert payload["success"] is True + assert payload["current_event"]["id"] == "hack-1" + + +def test_api_current_event_exception_returns_500(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "DEBUG_MODE", False) + + def raise_exc(): + raise RuntimeError("boom") + + monkeypatch.setattr(api_routes, "get_current_event", raise_exc) + + response = api_client.get("/api/current-event") + + assert response.status_code == 500 + assert response.get_json()["error"] == "Internal server error" + + +def test_api_all_events_success(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, + "get_all_events", + lambda: { + "hack-1": {"name": "Hack 1", "description": "Desc", "discord-role-id": "123"}, + "hack-2": {"name": "Hack 2"}, + }, + ) + monkeypatch.setattr(api_routes, "get_current_event", lambda: {"id": "hack-1"}) + + response = api_client.get("/api/events") + + assert response.status_code == 200 + payload = response.get_json() + assert payload["success"] is True + assert len(payload["events"]) == 2 + assert any(event["is_current"] for event in payload["events"]) + + +def test_api_all_events_exception_returns_500(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "DEBUG_MODE", False) + monkeypatch.setattr(api_routes, "get_all_events", lambda: (_ for _ in ()).throw(RuntimeError("boom"))) + + response = api_client.get("/api/events") + + assert response.status_code == 500 + assert response.get_json()["error"] == "Internal server error" + + +def test_api_register_event_requires_json(api_client): + response = api_client.post( + "/api/register-event", + data="null", + content_type="application/json", + headers=auth_header(), + ) + + assert response.status_code == 400 + assert response.get_json()["error"] == "JSON data required" + + +def test_api_register_event_validation_failure(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "validate_api_request", lambda *_args: {"valid": False, "errors": ["bad"]}) + monkeypatch.setattr( + api_routes, + "handle_validation_error", + lambda _result: (api_routes.jsonify({"success": False, "error": "validation"}), 400), + ) + + response = api_client.post("/api/register-event", json={"foo": "bar"}, headers=auth_header()) + + assert response.status_code == 400 + assert response.get_json()["error"] == "validation" + + +def test_api_register_event_success_and_optional_fields_passed(api_client, monkeypatch): + captured = {} + + monkeypatch.setattr( + api_routes, + "validate_api_request", + lambda data, _required: {"valid": True, "data": data}, + ) + + def fake_register(**kwargs): + captured.update(kwargs) + return {"success": True, "registered": True} + + monkeypatch.setattr(api_routes, "register_user_for_event", fake_register) + + payload = { + "user_email": "test@example.com", + "event_id": "hack-1", + "phone_number": "123", + "address": "addr", + "emergency_contact_name": "name", + "emergency_contact_email": "ec@example.com", + "emergency_contact_phone": "321", + "dietary_restrictions": "none", + "tshirt_size": "M", + } + response = api_client.post("/api/register-event", json=payload, headers=auth_header()) + + assert response.status_code == 200 + assert response.get_json()["registered"] is True + assert captured["user_email"] == "test@example.com" + assert captured["tshirt_size"] == "M" + + +def test_api_register_event_service_failure_returns_400(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, + "validate_api_request", + lambda data, _required: {"valid": True, "data": data}, + ) + monkeypatch.setattr( + api_routes, + "register_user_for_event", + lambda **_kwargs: {"success": False, "error": "failed"}, + ) + + response = api_client.post( + "/api/register-event", json={"user_email": "test@example.com"}, headers=auth_header() + ) + + assert response.status_code == 400 + assert response.get_json()["error"] == "failed" + + +def test_api_register_event_exception_uses_handle_api_error(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, + "validate_api_request", + lambda data, _required: {"valid": True, "data": data}, + ) + monkeypatch.setattr( + api_routes, + "register_user_for_event", + lambda **_kwargs: (_ for _ in ()).throw(RuntimeError("boom")), + ) + monkeypatch.setattr( + api_routes, + "handle_api_error", + lambda _exc, ctx: (api_routes.jsonify({"success": False, "error": ctx}), 500), + ) + + response = api_client.post( + "/api/register-event", json={"user_email": "test@example.com"}, headers=auth_header() + ) + + assert response.status_code == 500 + assert response.get_json()["error"] == "api_register_event" + + +def test_api_user_status_requires_user_email(api_client): + response = api_client.get("/api/user-status", headers=auth_header()) + + assert response.status_code == 400 + assert "user_email parameter is required" in response.get_json()["error"] + + +def test_api_user_status_service_failure(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, "get_user_event_status", lambda *_args: {"success": False, "error": "not found"} + ) + + response = api_client.get("/api/user-status?user_email=test@example.com", headers=auth_header()) + + assert response.status_code == 400 + assert response.get_json()["error"] == "not found" + + +def test_api_user_status_exception_returns_500(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "DEBUG_MODE", False) + monkeypatch.setattr( + api_routes, "get_user_event_status", lambda *_args: (_ for _ in ()).throw(RuntimeError("boom")) + ) + + response = api_client.get("/api/user-status?user_email=test@example.com", headers=auth_header()) + + assert response.status_code == 500 + assert response.get_json()["error"] == "Internal server error" + + +def test_oauth2_user_info_invalid_header(api_client): + response = api_client.get("/api/oauth/user-info") + + assert response.status_code == 401 + assert response.get_json()["error"] == "invalid_request" + + +def test_oauth2_user_info_invalid_token(api_client, monkeypatch): + monkeypatch.setattr("models.oauth.verify_access_token", lambda _token: None) + + response = api_client.get("/api/oauth/user-info", headers=auth_header("oauth-token")) + + assert response.status_code == 401 + assert response.get_json()["error"] == "invalid_token" + + +def test_oauth2_user_info_user_not_found(api_client, monkeypatch): + monkeypatch.setattr( + "models.oauth.verify_access_token", + lambda _token: {"user_email": "test@example.com", "scope": "profile email"}, + ) + monkeypatch.setattr(api_routes, "get_user_by_email", lambda _email: None) + + response = api_client.get("/api/oauth/user-info", headers=auth_header("oauth-token")) + + assert response.status_code == 404 + assert response.get_json()["error"] == "not_found" + + +def test_oauth2_user_info_success_includes_scope_based_fields(api_client, monkeypatch): + monkeypatch.setattr( + "models.oauth.verify_access_token", + lambda _token: { + "user_email": "test@example.com", + "scope": "profile email dob events discord", + }, + ) + monkeypatch.setattr( + api_routes, + "get_user_by_email", + lambda _email: { + "email": "test@example.com", + "legal_name": "Legal Name", + "preferred_name": "Nick", + "pronouns": "they/them", + "dob": "01/01/2000", + "events": ["hack-1"], + "discord_id": "123", + "discord_username": "tester", + }, + ) + monkeypatch.setattr(api_routes, "is_admin", lambda _email: True) + + response = api_client.get("/api/oauth/user-info", headers=auth_header("oauth-token")) + + assert response.status_code == 200 + payload = response.get_json() + assert payload["email"] == "test@example.com" + assert payload["discord_id"] == "123" + assert payload["is_admin"] is True + + +def test_oauth2_user_info_with_no_granted_scopes_returns_is_admin_only(api_client, monkeypatch): + monkeypatch.setattr( + "models.oauth.verify_access_token", + lambda _token: {"user_email": "test@example.com", "scope": ""}, + ) + monkeypatch.setattr( + api_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com"}, + ) + monkeypatch.setattr(api_routes, "is_admin", lambda _email: False) + + response = api_client.get("/api/oauth/user-info", headers=auth_header("oauth-token")) + + assert response.status_code == 200 + assert response.get_json() == {"is_admin": False} + + +def test_oauth2_user_info_exception_returns_server_error(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "DEBUG_MODE", False) + monkeypatch.setattr("models.oauth.verify_access_token", lambda _token: (_ for _ in ()).throw(RuntimeError("boom"))) + + response = api_client.get("/api/oauth/user-info", headers=auth_header("oauth-token")) + + assert response.status_code == 500 + assert response.get_json()["error"] == "server_error" + + +def test_oauth_legacy_user_info_token_required(api_client): + response = api_client.post("/api/oauth/user-info", json={}, headers=auth_header()) + + assert response.status_code == 400 + assert response.get_json()["error"] == "Token is required" + + +def test_oauth_legacy_user_info_invalid_token(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "verify_oauth_token", lambda _token: None) + + response = api_client.post("/api/oauth/user-info", json={"token": "bad"}, headers=auth_header()) + + assert response.status_code == 401 + assert response.get_json()["error"] == "Invalid or expired token" + + +def test_oauth_legacy_user_info_user_not_found(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "verify_oauth_token", lambda _token: "test@example.com") + monkeypatch.setattr(api_routes, "get_user_by_email", lambda _email: None) + + response = api_client.post("/api/oauth/user-info", json={"token": "good"}, headers=auth_header()) + + assert response.status_code == 404 + assert response.get_json()["error"] == "User not found" + + +def test_oauth_legacy_user_info_success(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "verify_oauth_token", lambda _token: "test@example.com") + monkeypatch.setattr( + api_routes, + "get_user_by_email", + lambda _email: { + "email": "test@example.com", + "legal_name": "Legal Name", + "preferred_name": "Nick", + "pronouns": "they/them", + "dob": "01/01/2000", + }, + ) + monkeypatch.setattr(api_routes, "is_admin", lambda _email: False) + + response = api_client.post("/api/oauth/user-info", json={"token": "good"}, headers=auth_header()) + + assert response.status_code == 200 + payload = response.get_json() + assert payload["success"] is True + assert payload["user"]["email"] == "test@example.com" + + +def test_oauth_legacy_user_info_exception_returns_500(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "DEBUG_MODE", False) + monkeypatch.setattr(api_routes, "verify_oauth_token", lambda _token: (_ for _ in ()).throw(RuntimeError("boom"))) + + response = api_client.post("/api/oauth/user-info", json={"token": "good"}, headers=auth_header()) + + assert response.status_code == 500 + assert response.get_json()["error"] == "Internal server error" + + +def test_api_test_endpoint_success(api_client): + response = api_client.get("/api/test", headers=auth_header()) + + assert response.status_code == 200 + assert response.get_json()["success"] is True + + +def test_discord_user_not_found(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "get_user_by_discord_id", lambda _id: None) + + response = api_client.get("/api/discord/user/123", headers=auth_header()) + + assert response.status_code == 404 + assert response.get_json()["error"] == "User not found" + + +def test_discord_user_success(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, + "get_user_by_discord_id", + lambda _id: { + "id": "user-1", + "email": "test@example.com", + "legal_name": "Legal Name", + "preferred_name": "Nick", + "pronouns": "they/them", + "dob": "01/01/2000", + "discord_id": "123", + "events": ["hack-1"], + }, + ) + monkeypatch.setattr("models.admin.is_admin", lambda _email: True) + + response = api_client.get("/api/discord/user/123", headers=auth_header()) + + assert response.status_code == 200 + payload = response.get_json() + assert payload["success"] is True + assert payload["user"]["is_admin"] is True + + +def test_discord_user_exception_uses_handle_api_error(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, "get_user_by_discord_id", lambda _id: (_ for _ in ()).throw(RuntimeError("boom")) + ) + monkeypatch.setattr( + api_routes, + "handle_api_error", + lambda _exc, ctx: (api_routes.jsonify({"success": False, "error": ctx}), 500), + ) + + response = api_client.get("/api/discord/user/123", headers=auth_header()) + + assert response.status_code == 500 + assert response.get_json()["error"] == "api_discord_user" + + +def test_create_verification_token_requires_json(api_client): + response = api_client.post( + "/api/discord/verification-token", + data="null", + content_type="application/json", + headers=auth_header(), + ) + + assert response.status_code == 400 + assert response.get_json()["error"] == "JSON data required" + + +def test_create_verification_token_requires_fields(api_client): + response = api_client.post( + "/api/discord/verification-token", + json={"discord_username": "tester"}, + headers=auth_header(), + ) + + assert response.status_code == 400 + assert "Missing field: discord_id" in response.get_json()["error"] + + +def test_create_verification_token_success(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "save_verification_token", lambda *_args: "tok_1") + + response = api_client.post( + "/api/discord/verification-token", + json={"discord_id": 123, "discord_username": "tester", "message_id": "m1"}, + headers=auth_header(), + ) + + assert response.status_code == 200 + assert response.get_json()["token"] == "tok_1" + + +def test_create_verification_token_exception(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, "save_verification_token", lambda *_args: (_ for _ in ()).throw(RuntimeError("boom")) + ) + monkeypatch.setattr( + api_routes, + "handle_api_error", + lambda _exc, ctx: (api_routes.jsonify({"success": False, "error": ctx}), 500), + ) + + response = api_client.post( + "/api/discord/verification-token", + json={"discord_id": 123, "discord_username": "tester"}, + headers=auth_header(), + ) + + assert response.status_code == 500 + assert response.get_json()["error"] == "api_create_verification_token" + + +def test_get_verification_token_not_found(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "get_verification_token", lambda _token: None) + + response = api_client.get("/api/discord/verification-token/tok_1", headers=auth_header()) + + assert response.status_code == 404 + + +def test_get_verification_token_success(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, + "get_verification_token", + lambda _token: { + "token": "tok_1", + "discord_id": "123", + "discord_username": "tester", + "message_id": "m1", + "expires_at": "2026-01-01", + "used": 1, + }, + ) + + response = api_client.get("/api/discord/verification-token/tok_1", headers=auth_header()) + + assert response.status_code == 200 + assert response.get_json()["token_data"]["used"] is True + + +def test_get_verification_token_exception(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, "get_verification_token", lambda _token: (_ for _ in ()).throw(RuntimeError("boom")) + ) + monkeypatch.setattr( + api_routes, + "handle_api_error", + lambda _exc, ctx: (api_routes.jsonify({"success": False, "error": ctx}), 500), + ) + + response = api_client.get("/api/discord/verification-token/tok_1", headers=auth_header()) + + assert response.status_code == 500 + assert response.get_json()["error"] == "api_get_verification_token" + + +def test_mark_token_used_not_found(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "get_verification_token", lambda _token: None) + + response = api_client.delete("/api/discord/verification-token/tok_1", headers=auth_header()) + + assert response.status_code == 404 + + +def test_mark_token_used_success(api_client, monkeypatch): + used = {} + monkeypatch.setattr(api_routes, "get_verification_token", lambda _token: {"token": "tok_1"}) + monkeypatch.setattr(api_routes, "mark_token_used", lambda token: used.setdefault("token", token)) + + response = api_client.delete("/api/discord/verification-token/tok_1", headers=auth_header()) + + assert response.status_code == 200 + assert used["token"] == "tok_1" + + +def test_mark_token_used_exception(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, "get_verification_token", lambda _token: (_ for _ in ()).throw(RuntimeError("boom")) + ) + monkeypatch.setattr( + api_routes, + "handle_api_error", + lambda _exc, ctx: (api_routes.jsonify({"success": False, "error": ctx}), 500), + ) + + response = api_client.delete("/api/discord/verification-token/tok_1", headers=auth_header()) + + assert response.status_code == 500 + assert response.get_json()["error"] == "api_mark_token_used" + + +def test_discord_role_mappings_success(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, + "get_all_events", + lambda: {"hack-1": {"discord-role-id": "123"}, "hack-2": {"name": "No role"}}, + ) + + response = api_client.get("/api/discord/role-mappings", headers=auth_header()) + + assert response.status_code == 200 + payload = response.get_json() + assert payload["role_mappings"] == {"hack-1": "123"} + + +def test_discord_role_mappings_exception(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "get_all_events", lambda: (_ for _ in ()).throw(RuntimeError("boom"))) + monkeypatch.setattr( + api_routes, + "handle_api_error", + lambda _exc, ctx: (api_routes.jsonify({"success": False, "error": ctx}), 500), + ) + + response = api_client.get("/api/discord/role-mappings", headers=auth_header()) + + assert response.status_code == 500 + assert response.get_json()["error"] == "api_discord_role_mappings" + + +def test_discord_user_roles_not_found(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "get_user_by_discord_id", lambda _id: None) + + response = api_client.get("/api/discord/user-roles/123", headers=auth_header()) + + assert response.status_code == 404 + + +def test_discord_user_roles_success(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "get_user_by_discord_id", lambda _id: {"events": ["hack-1", "hack-2"]}) + monkeypatch.setattr( + api_routes, + "get_all_events", + lambda: { + "hack-1": {"discord-role-id": "123", "name": "Hack 1"}, + "hack-2": {"name": "Hack 2"}, + }, + ) + + response = api_client.get("/api/discord/user-roles/123", headers=auth_header()) + + assert response.status_code == 200 + payload = response.get_json() + assert payload["roles_to_assign"] == [{"event_id": "hack-1", "role_id": "123", "event_name": "Hack 1"}] + + +def test_discord_user_roles_exception(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, "get_user_by_discord_id", lambda _id: (_ for _ in ()).throw(RuntimeError("boom")) + ) + monkeypatch.setattr( + api_routes, + "handle_api_error", + lambda _exc, ctx: (api_routes.jsonify({"success": False, "error": ctx}), 500), + ) + + response = api_client.get("/api/discord/user-roles/123", headers=auth_header()) + + assert response.status_code == 500 + assert response.get_json()["error"] == "api_discord_user_roles" + + +def test_discord_verified_users_success(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, + "get_all_users", + lambda: [ + {"id": "1", "email": "a@example.com", "discord_id": "123", "events": [], "preferred_name": "A", "legal_name": "A"}, + {"id": "2", "email": "b@example.com", "discord_id": "", "events": []}, + ], + ) + + response = api_client.get("/api/discord/verified-users", headers=auth_header()) + + assert response.status_code == 200 + payload = response.get_json() + assert payload["count"] == 1 + assert payload["verified_users"][0]["email"] == "a@example.com" + + +def test_discord_verified_users_exception(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "get_all_users", lambda: (_ for _ in ()).throw(RuntimeError("boom"))) + monkeypatch.setattr( + api_routes, + "handle_api_error", + lambda _exc, ctx: (api_routes.jsonify({"success": False, "error": ctx}), 500), + ) + + response = api_client.get("/api/discord/verified-users", headers=auth_header()) + + assert response.status_code == 500 + assert response.get_json()["error"] == "api_discord_verified_users" + + +def test_discord_complete_verification_requires_json(api_client): + response = api_client.post( + "/api/discord/complete-verification", + data="null", + content_type="application/json", + headers=auth_header(), + ) + + assert response.status_code == 400 + assert response.get_json()["error"] == "JSON data required" + + +def test_discord_complete_verification_requires_fields(api_client): + response = api_client.post( + "/api/discord/complete-verification", + json={"discord_id": "123"}, + headers=auth_header(), + ) + + assert response.status_code == 400 + assert "Missing field: user_email" in response.get_json()["error"] + + +def test_discord_complete_verification_user_not_found(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "get_user_by_email", lambda _email: None) + + response = api_client.post( + "/api/discord/complete-verification", + json={"discord_id": "123", "user_email": "test@example.com"}, + headers=auth_header(), + ) + + assert response.status_code == 404 + + +def test_discord_complete_verification_rejects_id_linked_to_other_user(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "get_user_by_email", lambda _email: {"id": "user-1", "email": "test@example.com"}) + monkeypatch.setattr(api_routes, "get_user_by_discord_id", lambda _id: {"email": "other@example.com"}) + + response = api_client.post( + "/api/discord/complete-verification", + json={"discord_id": "123", "user_email": "test@example.com"}, + headers=auth_header(), + ) + + assert response.status_code == 400 + assert "already linked" in response.get_json()["error"] + + +def test_discord_complete_verification_update_value_error(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "get_user_by_email", lambda _email: {"id": "user-1", "email": "test@example.com"}) + monkeypatch.setattr(api_routes, "get_user_by_discord_id", lambda _id: None) + + def raise_value_error(*_args, **_kwargs): + raise ValueError("invalid") + + monkeypatch.setattr("models.user.update_user", raise_value_error) + + response = api_client.post( + "/api/discord/complete-verification", + json={"discord_id": "123", "user_email": "test@example.com"}, + headers=auth_header(), + ) + + assert response.status_code == 400 + assert response.get_json()["error"] == "invalid" + + +def test_discord_complete_verification_success(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "get_user_by_email", lambda _email: {"id": "user-1", "email": "test@example.com"}) + monkeypatch.setattr(api_routes, "get_user_by_discord_id", lambda _id: None) + monkeypatch.setattr("models.user.update_user", lambda *_args, **_kwargs: None) + + response = api_client.post( + "/api/discord/complete-verification", + json={"discord_id": "123", "user_email": "test@example.com"}, + headers=auth_header(), + ) + + assert response.status_code == 200 + assert response.get_json()["success"] is True + + +def test_discord_complete_verification_exception(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, "get_user_by_email", lambda _email: (_ for _ in ()).throw(RuntimeError("boom")) + ) + monkeypatch.setattr( + api_routes, + "handle_api_error", + lambda _exc, ctx: (api_routes.jsonify({"success": False, "error": ctx}), 500), + ) + + response = api_client.post( + "/api/discord/complete-verification", + json={"discord_id": "123", "user_email": "test@example.com"}, + headers=auth_header(), + ) + + assert response.status_code == 500 + assert response.get_json()["error"] == "api_discord_complete_verification" + + +def test_discord_remove_roles_requires_json(api_client): + response = api_client.post( + "/api/discord/remove-roles", + data="null", + content_type="application/json", + headers=auth_header(), + ) + + assert response.status_code == 400 + + +def test_discord_remove_roles_requires_identifier(api_client): + response = api_client.post("/api/discord/remove-roles", json={"foo": "bar"}, headers=auth_header()) + + assert response.status_code == 400 + assert "Either discord_id or user_email is required" in response.get_json()["error"] + + +def test_discord_remove_roles_user_email_not_found(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "get_user_by_email", lambda _email: None) + + response = api_client.post( + "/api/discord/remove-roles", + json={"user_email": "test@example.com"}, + headers=auth_header(), + ) + + assert response.status_code == 404 + + +def test_discord_remove_roles_user_without_discord(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "get_user_by_email", lambda _email: {"email": "test@example.com", "discord_id": None}) + + response = api_client.post( + "/api/discord/remove-roles", + json={"user_email": "test@example.com"}, + headers=auth_header(), + ) + + assert response.status_code == 404 + assert "no Discord account linked" in response.get_json()["error"] + + +def test_discord_remove_roles_success(api_client, monkeypatch): + monkeypatch.setattr( + "utils.discord.remove_all_event_roles", + lambda _discord_id: {"success": True, "roles_removed": ["a"], "total_removed": 1}, + ) + + response = api_client.post( + "/api/discord/remove-roles", + json={"discord_id": "123"}, + headers=auth_header(), + ) + + assert response.status_code == 200 + payload = response.get_json() + assert payload["success"] is True + assert payload["total_removed"] == 1 + + +def test_discord_remove_roles_user_email_with_discord_success(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "discord_id": "123"}, + ) + monkeypatch.setattr( + "utils.discord.remove_all_event_roles", + lambda _discord_id: {"success": True, "roles_removed": [], "total_removed": 0}, + ) + + response = api_client.post( + "/api/discord/remove-roles", + json={"user_email": "test@example.com"}, + headers=auth_header(), + ) + + assert response.status_code == 200 + assert response.get_json()["success"] is True + + +def test_discord_remove_roles_failure_returns_500(api_client, monkeypatch): + monkeypatch.setattr( + "utils.discord.remove_all_event_roles", + lambda _discord_id: { + "success": False, + "error": "remove failed", + "roles_removed": [], + "roles_failed": ["a"], + }, + ) + + response = api_client.post( + "/api/discord/remove-roles", + json={"discord_id": "123"}, + headers=auth_header(), + ) + + assert response.status_code == 500 + assert response.get_json()["error"] == "remove failed" + + +def test_discord_remove_roles_exception(api_client, monkeypatch): + monkeypatch.setattr( + "utils.discord.remove_all_event_roles", + lambda _discord_id: (_ for _ in ()).throw(RuntimeError("boom")), + ) + monkeypatch.setattr( + api_routes, + "handle_api_error", + lambda _exc, ctx: (api_routes.jsonify({"success": False, "error": ctx}), 500), + ) + + response = api_client.post( + "/api/discord/remove-roles", + json={"discord_id": "123"}, + headers=auth_header(), + ) + + assert response.status_code == 500 + assert response.get_json()["error"] == "api_discord_remove_roles" + + +def test_discord_unlink_requires_json(api_client): + response = api_client.post( + "/api/discord/unlink", + data="null", + content_type="application/json", + headers=auth_header(), + ) + + assert response.status_code == 400 + + +def test_discord_unlink_requires_identifier(api_client): + response = api_client.post("/api/discord/unlink", json={"foo": "bar"}, headers=auth_header()) + + assert response.status_code == 400 + + +def test_discord_unlink_discord_id_not_found(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "get_user_by_discord_id", lambda _id: None) + + response = api_client.post( + "/api/discord/unlink", + json={"discord_id": "123"}, + headers=auth_header(), + ) + + assert response.status_code == 404 + assert "No user found" in response.get_json()["error"] + + +def test_discord_unlink_user_email_not_found(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "get_user_by_email", lambda _email: None) + + response = api_client.post( + "/api/discord/unlink", + json={"user_email": "test@example.com"}, + headers=auth_header(), + ) + + assert response.status_code == 404 + + +def test_discord_unlink_user_without_discord(api_client, monkeypatch): + monkeypatch.setattr(api_routes, "get_user_by_email", lambda _email: {"email": "test@example.com", "discord_id": None}) + + response = api_client.post( + "/api/discord/unlink", + json={"user_email": "test@example.com"}, + headers=auth_header(), + ) + + assert response.status_code == 400 + assert "no Discord account linked" in response.get_json()["error"] + + +def test_discord_unlink_success(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, "get_user_by_discord_id", lambda _id: {"email": "test@example.com"} + ) + monkeypatch.setattr( + "services.auth_service.unlink_discord_account", + lambda _email: { + "success": True, + "user_email": "test@example.com", + "previous_discord_id": "123", + "roles_removed": [], + "roles_failed": [], + "total_roles_removed": 0, + "total_roles_failed": 0, + "role_removal_success": True, + }, + ) + + response = api_client.post( + "/api/discord/unlink", + json={"discord_id": "123"}, + headers=auth_header(), + ) + + assert response.status_code == 200 + assert response.get_json()["success"] is True + + +def test_discord_unlink_success_by_user_email(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "discord_id": "123"}, + ) + monkeypatch.setattr( + "services.auth_service.unlink_discord_account", + lambda _email: { + "success": True, + "user_email": "test@example.com", + "previous_discord_id": "123", + "roles_removed": [], + "roles_failed": [], + "total_roles_removed": 0, + "total_roles_failed": 0, + "role_removal_success": True, + }, + ) + + response = api_client.post( + "/api/discord/unlink", + json={"user_email": "test@example.com"}, + headers=auth_header(), + ) + + assert response.status_code == 200 + assert response.get_json()["success"] is True + + +def test_discord_unlink_service_failure(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, "get_user_by_discord_id", lambda _id: {"email": "test@example.com"} + ) + monkeypatch.setattr( + "services.auth_service.unlink_discord_account", + lambda _email: {"success": False, "error": "unlink failed"}, + ) + + response = api_client.post( + "/api/discord/unlink", + json={"discord_id": "123"}, + headers=auth_header(), + ) + + assert response.status_code == 400 + assert response.get_json()["error"] == "unlink failed" + + +def test_discord_unlink_exception(api_client, monkeypatch): + monkeypatch.setattr( + api_routes, "get_user_by_discord_id", lambda _id: (_ for _ in ()).throw(RuntimeError("boom")) + ) + monkeypatch.setattr( + api_routes, + "handle_api_error", + lambda _exc, ctx: (api_routes.jsonify({"success": False, "error": ctx}), 500), + ) + + response = api_client.post( + "/api/discord/unlink", + json={"discord_id": "123"}, + headers=auth_header(), + ) + + assert response.status_code == 500 + assert response.get_json()["error"] == "api_discord_unlink" diff --git a/tests/test_auth_routes.py b/tests/test_auth_routes.py new file mode 100644 index 0000000..50731dd --- /dev/null +++ b/tests/test_auth_routes.py @@ -0,0 +1,205 @@ +import os +from pathlib import Path + +os.environ.setdefault("SECRET_KEY", "test-secret") + +from flask import Flask +import routes.auth as auth_routes + + +def create_auth_app(): + template_dir = Path(__file__).resolve().parents[1] / "templates" + app = Flask(__name__, template_folder=str(template_dir)) + app.secret_key = "test-secret" + app.jinja_env.globals["csrf_token"] = lambda: "test-csrf" + app.register_blueprint(auth_routes.auth_bp) + return app + + +def test_index_unauthenticated_renders_login_page(): + app = create_auth_app() + client = app.test_client() + + response = client.get("/") + + assert response.status_code == 200 + assert b"Sign in" in response.data + + +def test_auth_google_redirects_to_google_provider(monkeypatch): + app = create_auth_app() + client = app.test_client() + monkeypatch.setattr( + auth_routes, + "get_google_auth_url", + lambda: "https://accounts.google.com/o/oauth2/auth?state=test", + ) + + response = client.get("/auth/google") + + assert response.status_code == 302 + assert response.location.startswith("https://accounts.google.com/o/oauth2/auth") + + +def test_auth_google_callback_rejects_invalid_state(): + app = create_auth_app() + client = app.test_client() + + with client.session_transaction() as sess: + sess["oauth_state"] = "expected-state" + + response = client.get("/auth/google/callback?state=wrong-state&code=abc") + + assert response.status_code == 200 + assert b"Invalid OAuth state" in response.data + + +def test_send_code_json_requires_email(): + app = create_auth_app() + client = app.test_client() + + response = client.post("/send-code", json={}) + + assert response.status_code == 200 + assert response.get_json()["success"] is False + assert response.get_json()["error"] == "Email is required" + + +def test_send_code_json_success(monkeypatch): + app = create_auth_app() + client = app.test_client() + + monkeypatch.setattr(auth_routes, "send_email_verification", lambda _email: True) + + response = client.post("/send-code", json={"email": "test@example.com"}) + + assert response.status_code == 200 + payload = response.get_json() + assert payload["success"] is True + assert "Magic link sent" in payload["message"] + + +def test_email_callback_without_code_returns_error(): + app = create_auth_app() + client = app.test_client() + + response = client.get("/auth/email/callback") + + assert response.status_code == 200 + assert b"No code provided" in response.data + + +def test_email_callback_logs_in_existing_user(monkeypatch): + app = create_auth_app() + client = app.test_client() + + monkeypatch.setattr( + auth_routes, + "verify_email_code", + lambda _code: {"success": True, "email": "test@example.com", "name": "Test User"}, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Test User", "preferred_name": ""}, + ) + + response = client.get("/auth/email/callback?code=valid-code") + + assert response.status_code == 302 + assert response.location.endswith("/") + with client.session_transaction() as sess: + assert sess["user_email"] == "test@example.com" + + +def test_oauth_authorize_missing_required_params_shows_error(): + app = create_auth_app() + client = app.test_client() + + response = client.get("/oauth/authorize") + + assert response.status_code == 200 + assert b"Missing required OAuth parameters" in response.data + + +def test_oauth_authorize_unauthenticated_user_sees_login(monkeypatch): + app = create_auth_app() + client = app.test_client() + + monkeypatch.setattr( + auth_routes, + "get_app_by_client_id", + lambda _client_id: { + "id": "app_1", + "is_active": True, + "redirect_uris": '["https://example.com/callback"]', + "allowed_scopes": '["profile", "email"]', + "allow_anyone": True, + }, + ) + monkeypatch.setattr(auth_routes, "validate_redirect_uri", lambda _uri, _allowed: True) + + response = client.get( + "/oauth/authorize?client_id=client_1&redirect_uri=https://example.com/callback&scope=profile%20email&response_type=code" + ) + + assert response.status_code == 200 + assert b"Sign in" in response.data + + +def test_oauth_authorize_logged_in_with_skip_consent_redirects_with_code(monkeypatch): + app = create_auth_app() + client = app.test_client() + + monkeypatch.setattr( + auth_routes, + "get_app_by_client_id", + lambda _client_id: { + "id": "app_1", + "is_active": True, + "redirect_uris": '["https://example.com/callback"]', + "allowed_scopes": '["profile", "email"]', + "allow_anyone": True, + "skip_consent_screen": True, + }, + ) + monkeypatch.setattr(auth_routes, "validate_redirect_uri", lambda _uri, _allowed: True) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Test User"}, + ) + monkeypatch.setattr(auth_routes, "create_authorization_code", lambda **_kwargs: "auth-code-123") + + with client.session_transaction() as sess: + sess["user_email"] = "test@example.com" + + response = client.get( + "/oauth/authorize?client_id=client_1&redirect_uri=https://example.com/callback&scope=profile%20email&state=abc&response_type=code" + ) + + assert response.status_code == 302 + assert response.location == "https://example.com/callback?code=auth-code-123&state=abc" + + +def test_oauth_authorize_consent_denied_redirects_with_access_denied(monkeypatch): + app = create_auth_app() + client = app.test_client() + + monkeypatch.setattr( + auth_routes, + "get_app_by_client_id", + lambda _client_id: {"id": "app_1", "is_active": True, "allow_anyone": True}, + ) + + with client.session_transaction() as sess: + sess["user_email"] = "test@example.com" + sess["oauth2_client_id"] = "client_1" + sess["oauth2_redirect_uri"] = "https://example.com/callback" + sess["oauth2_scope"] = "profile email" + sess["oauth2_state"] = "xyz" + + response = client.post("/oauth/authorize", data={"action": "deny"}) + + assert response.status_code == 302 + assert response.location == "https://example.com/callback?error=access_denied&state=xyz" diff --git a/tests/test_auth_routes_comprehensive.py b/tests/test_auth_routes_comprehensive.py new file mode 100644 index 0000000..f8407a8 --- /dev/null +++ b/tests/test_auth_routes_comprehensive.py @@ -0,0 +1,1723 @@ +import os +from pathlib import Path + +os.environ.setdefault("SECRET_KEY", "test-secret") + +from flask import Flask + +import routes.auth as auth_routes + + +def create_auth_app(register_oauth=False): + template_dir = Path(__file__).resolve().parents[1] / "templates" + app = Flask(__name__, template_folder=str(template_dir)) + app.secret_key = "test-secret" + app.jinja_env.globals["csrf_token"] = lambda: "test-csrf" + app.register_blueprint(auth_routes.auth_bp) + if register_oauth: + app.register_blueprint(auth_routes.oauth_bp) + return app + + +def login_session(client, email="test@example.com", name="Tester"): + with client.session_transaction() as sess: + sess["user_email"] = email + sess["user_name"] = name + + +def oauth2_session(client): + with client.session_transaction() as sess: + sess["oauth2_client_id"] = "client_1" + sess["oauth2_redirect_uri"] = "https://example.com/callback" + sess["oauth2_scope"] = "profile email" + sess["oauth2_state"] = "abc" + + +def test_index_logged_in_profile_incomplete_redirects_to_register(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + + monkeypatch.setattr("models.admin.is_admin", lambda _email: False) + monkeypatch.setattr( + "services.dashboard_service.get_user_dashboard_data", + lambda _email: {"profile_complete": False}, + ) + + response = client.get("/") + + assert response.status_code == 302 + assert response.location.endswith("/register") + + +def test_index_logged_in_profile_complete_renders_dashboard(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + + monkeypatch.setattr("models.admin.is_admin", lambda _email: True) + monkeypatch.setattr( + "services.dashboard_service.get_user_dashboard_data", + lambda _email: {"profile_complete": True}, + ) + monkeypatch.setattr( + auth_routes, + "render_template", + lambda template, **ctx: f"{template}:{ctx.get('is_admin')}", + ) + + response = client.get("/") + + assert response.status_code == 200 + assert b"dashboard.html:True" in response.data + + +def test_auth_google_callback_missing_code_renders_error(): + app = create_auth_app() + client = app.test_client() + + with client.session_transaction() as sess: + sess["oauth_state"] = "state-1" + + response = client.get("/auth/google/callback?state=state-1") + + assert response.status_code == 200 + assert b"No authentication code received" in response.data + + +def test_auth_google_callback_provider_error_renders_error(monkeypatch): + app = create_auth_app() + client = app.test_client() + + with client.session_transaction() as sess: + sess["oauth_state"] = "state-1" + + monkeypatch.setattr( + auth_routes, + "handle_google_oauth_callback", + lambda _code: {"success": False, "error": "oauth failed"}, + ) + + response = client.get("/auth/google/callback?state=state-1&code=abc") + + assert response.status_code == 200 + assert b"oauth failed" in response.data + + +def test_auth_google_callback_new_user_redirects_register(monkeypatch): + app = create_auth_app() + client = app.test_client() + + with client.session_transaction() as sess: + sess["oauth_state"] = "state-1" + + monkeypatch.setattr( + auth_routes, + "handle_google_oauth_callback", + lambda _code: { + "success": True, + "user": {"email": "new@example.com", "name": "New User"}, + }, + ) + monkeypatch.setattr(auth_routes, "get_user_by_email", lambda _email: None) + + response = client.get("/auth/google/callback?state=state-1&code=abc") + + assert response.status_code == 302 + assert response.location.endswith("/register") + + +def test_auth_google_callback_verification_flow_redirects_verify_complete(monkeypatch): + app = create_auth_app() + client = app.test_client() + + with client.session_transaction() as sess: + sess["oauth_state"] = "state-1" + sess["verification_token"] = "verify-123" + + monkeypatch.setattr( + auth_routes, + "handle_google_oauth_callback", + lambda _code: { + "success": True, + "user": {"email": "test@example.com", "name": "Test User"}, + }, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Test User"}, + ) + + response = client.get("/auth/google/callback?state=state-1&code=abc") + + assert response.status_code == 302 + assert response.location.endswith("/verify/complete") + + +def test_auth_google_callback_oauth2_session_redirects_authorize(monkeypatch): + app = create_auth_app() + client = app.test_client() + + with client.session_transaction() as sess: + sess["oauth_state"] = "state-1" + sess["oauth2_client_id"] = "client_1" + sess["oauth2_redirect_uri"] = "https://example.com/callback" + + monkeypatch.setattr( + auth_routes, + "handle_google_oauth_callback", + lambda _code: { + "success": True, + "user": {"email": "test@example.com", "name": "Test User"}, + }, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Test User"}, + ) + + response = client.get("/auth/google/callback?state=state-1&code=abc") + + assert response.status_code == 302 + assert response.location.endswith("/oauth/authorize") + + +def test_auth_google_callback_legacy_flow_inactive_app_clears_session(monkeypatch): + app = create_auth_app() + client = app.test_client() + + with client.session_transaction() as sess: + sess["oauth_state"] = "state-1" + sess["oauth_redirect"] = "https://example.com/callback" + sess["oauth_app_id"] = "app_1" + + monkeypatch.setattr( + auth_routes, + "handle_google_oauth_callback", + lambda _code: { + "success": True, + "user": {"email": "test@example.com", "name": "Test User"}, + }, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Test User"}, + ) + monkeypatch.setattr("models.app.get_app_by_id", lambda _app_id: {"is_active": False}) + + response = client.get("/auth/google/callback?state=state-1&code=abc") + + assert response.status_code == 200 + assert b"no longer available" in response.data + with client.session_transaction() as sess: + assert "oauth_redirect" not in sess + assert "oauth_app_id" not in sess + + +def test_auth_google_callback_legacy_restricted_non_admin_denied(monkeypatch): + app = create_auth_app() + client = app.test_client() + + with client.session_transaction() as sess: + sess["oauth_state"] = "state-1" + sess["oauth_redirect"] = "https://example.com/callback" + sess["oauth_app_id"] = "app_1" + + monkeypatch.setattr( + auth_routes, + "handle_google_oauth_callback", + lambda _code: { + "success": True, + "user": {"email": "test@example.com", "name": "Test User"}, + }, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Test User"}, + ) + monkeypatch.setattr( + "models.app.get_app_by_id", + lambda _app_id: {"id": "app_1", "is_active": True, "allow_anyone": False}, + ) + monkeypatch.setattr(auth_routes, "is_admin", lambda _email: False) + + response = client.get("/auth/google/callback?state=state-1&code=abc") + + assert response.status_code == 200 + assert b"permission to access this app" in response.data + with client.session_transaction() as sess: + assert "oauth_redirect" not in sess + assert "oauth_app_id" not in sess + + +def test_auth_google_callback_legacy_restricted_no_explicit_permission_denied( + monkeypatch, +): + app = create_auth_app() + client = app.test_client() + + with client.session_transaction() as sess: + sess["oauth_state"] = "state-1" + sess["oauth_redirect"] = "https://example.com/callback" + sess["oauth_app_id"] = "app_1" + + monkeypatch.setattr( + auth_routes, + "handle_google_oauth_callback", + lambda _code: { + "success": True, + "user": {"email": "test@example.com", "name": "Test User"}, + }, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Test User"}, + ) + monkeypatch.setattr( + "models.app.get_app_by_id", + lambda _app_id: {"id": "app_1", "is_active": True, "allow_anyone": False}, + ) + monkeypatch.setattr(auth_routes, "is_admin", lambda _email: True) + monkeypatch.setattr(auth_routes, "has_app_permission", lambda *_args: False) + + response = client.get("/auth/google/callback?state=state-1&code=abc") + + assert response.status_code == 200 + assert b"permission to access this app" in response.data + + +def test_auth_google_callback_legacy_success_redirects_with_token(monkeypatch): + app = create_auth_app() + client = app.test_client() + + with client.session_transaction() as sess: + sess["oauth_state"] = "state-1" + sess["oauth_redirect"] = "https://example.com/callback" + sess["oauth_app_id"] = "app_1" + + monkeypatch.setattr( + auth_routes, + "handle_google_oauth_callback", + lambda _code: { + "success": True, + "user": {"email": "test@example.com", "name": "Test User"}, + }, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Test User"}, + ) + monkeypatch.setattr( + "models.app.get_app_by_id", + lambda _app_id: {"id": "app_1", "is_active": True, "allow_anyone": True}, + ) + monkeypatch.setattr(auth_routes, "create_oauth_token", lambda *_args, **_kwargs: "tok_1") + + response = client.get("/auth/google/callback?state=state-1&code=abc") + + assert response.status_code == 302 + assert response.location == "https://example.com/callback?token=tok_1" + + +def test_auth_google_callback_default_success_redirects_home(monkeypatch): + app = create_auth_app() + client = app.test_client() + + with client.session_transaction() as sess: + sess["oauth_state"] = "state-1" + + monkeypatch.setattr( + auth_routes, + "handle_google_oauth_callback", + lambda _code: { + "success": True, + "user": {"email": "test@example.com", "name": "Test User"}, + }, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Test User"}, + ) + + response = client.get("/auth/google/callback?state=state-1&code=abc") + + assert response.status_code == 302 + assert response.location.endswith("/") + + +def test_auth_google_with_debug_disabled_still_redirects(monkeypatch): + app = create_auth_app() + client = app.test_client() + monkeypatch.setattr(auth_routes, "DEBUG_MODE", False) + monkeypatch.setattr(auth_routes, "get_google_auth_url", lambda: "https://accounts.example/oauth") + + response = client.get("/auth/google") + + assert response.status_code == 302 + assert response.location == "https://accounts.example/oauth" + + +def test_auth_google_callback_legacy_restricted_authorized_user_redirects_with_token(monkeypatch): + app = create_auth_app() + client = app.test_client() + + with client.session_transaction() as sess: + sess["oauth_state"] = "state-1" + sess["oauth_redirect"] = "https://example.com/callback" + sess["oauth_app_id"] = "app_1" + + monkeypatch.setattr( + auth_routes, + "handle_google_oauth_callback", + lambda _code: { + "success": True, + "user": {"email": "test@example.com", "name": "Test User"}, + }, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Test User"}, + ) + monkeypatch.setattr( + "models.app.get_app_by_id", + lambda _app_id: {"id": "app_1", "is_active": True, "allow_anyone": False}, + ) + monkeypatch.setattr(auth_routes, "is_admin", lambda _email: True) + monkeypatch.setattr(auth_routes, "has_app_permission", lambda *_args: True) + monkeypatch.setattr(auth_routes, "create_oauth_token", lambda *_args, **_kwargs: "tok_1") + + response = client.get("/auth/google/callback?state=state-1&code=abc") + + assert response.status_code == 302 + assert response.location == "https://example.com/callback?token=tok_1" + + +def oauth_app( + *, allow_anyone=True, skip_consent_screen=False, active=True, scopes='["profile", "email"]' +): + return { + "id": "app_1", + "is_active": active, + "redirect_uris": '["https://example.com/callback"]', + "allowed_scopes": scopes, + "allow_anyone": allow_anyone, + "skip_consent_screen": skip_consent_screen, + } + + +def test_oauth_authorize_unsupported_response_type(monkeypatch): + app = create_auth_app() + client = app.test_client() + monkeypatch.setattr(auth_routes, "get_app_by_client_id", lambda _id: oauth_app()) + + response = client.get( + "/oauth/authorize?client_id=client_1&redirect_uri=https://example.com/callback&response_type=token" + ) + + assert response.status_code == 200 + assert b"Unsupported response_type" in response.data + + +def test_oauth_authorize_invalid_client(monkeypatch): + app = create_auth_app() + client = app.test_client() + monkeypatch.setattr(auth_routes, "get_app_by_client_id", lambda _id: None) + + response = client.get( + "/oauth/authorize?client_id=client_1&redirect_uri=https://example.com/callback" + ) + + assert response.status_code == 200 + assert b"Invalid client_id" in response.data + + +def test_oauth_authorize_inactive_client(monkeypatch): + app = create_auth_app() + client = app.test_client() + monkeypatch.setattr( + auth_routes, "get_app_by_client_id", lambda _id: oauth_app(active=False) + ) + + response = client.get( + "/oauth/authorize?client_id=client_1&redirect_uri=https://example.com/callback" + ) + + assert response.status_code == 200 + assert b"currently disabled" in response.data + + +def test_oauth_authorize_invalid_redirect_uri(monkeypatch): + app = create_auth_app() + client = app.test_client() + monkeypatch.setattr(auth_routes, "get_app_by_client_id", lambda _id: oauth_app()) + monkeypatch.setattr(auth_routes, "validate_redirect_uri", lambda *_args: False) + + response = client.get( + "/oauth/authorize?client_id=client_1&redirect_uri=https://wrong.example/callback" + ) + + assert response.status_code == 200 + assert b"Invalid redirect_uri" in response.data + + +def test_oauth_authorize_invalid_scope(monkeypatch): + app = create_auth_app() + client = app.test_client() + monkeypatch.setattr(auth_routes, "get_app_by_client_id", lambda _id: oauth_app()) + monkeypatch.setattr(auth_routes, "validate_redirect_uri", lambda *_args: True) + + response = client.get( + "/oauth/authorize?client_id=client_1&redirect_uri=https://example.com/callback&scope=admin" + ) + + assert response.status_code == 200 + assert b"Invalid scope" in response.data + + +def test_oauth_authorize_restricted_app_permission_denied(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + monkeypatch.setattr( + auth_routes, "get_app_by_client_id", lambda _id: oauth_app(allow_anyone=False) + ) + monkeypatch.setattr(auth_routes, "validate_redirect_uri", lambda *_args: True) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Legal Name"}, + ) + monkeypatch.setattr(auth_routes, "is_admin", lambda _email: False) + monkeypatch.setattr(auth_routes, "has_app_permission", lambda *_args: False) + + response = client.get( + "/oauth/authorize?client_id=client_1&redirect_uri=https://example.com/callback" + ) + + assert response.status_code == 200 + assert b"permission to access this app" in response.data + + +def test_oauth_authorize_restricted_app_authorized_user_can_continue(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + monkeypatch.setattr( + auth_routes, + "get_app_by_client_id", + lambda _id: oauth_app(allow_anyone=False, skip_consent_screen=True), + ) + monkeypatch.setattr(auth_routes, "validate_redirect_uri", lambda *_args: True) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Legal Name"}, + ) + monkeypatch.setattr(auth_routes, "is_admin", lambda _email: True) + monkeypatch.setattr(auth_routes, "has_app_permission", lambda *_args: True) + monkeypatch.setattr(auth_routes, "create_authorization_code", lambda **_kwargs: "code-1") + + response = client.get( + "/oauth/authorize?client_id=client_1&redirect_uri=https://example.com/callback&state=abc" + ) + + assert response.status_code == 302 + assert response.location == "https://example.com/callback?code=code-1&state=abc" + + +def test_oauth_authorize_logged_in_shows_consent_screen(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + monkeypatch.setattr(auth_routes, "get_app_by_client_id", lambda _id: oauth_app()) + monkeypatch.setattr(auth_routes, "validate_redirect_uri", lambda *_args: True) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Legal Name"}, + ) + + response = client.get( + "/oauth/authorize?client_id=client_1&redirect_uri=https://example.com/callback" + ) + + assert response.status_code == 200 + assert b"Authorize" in response.data + + +def test_oauth_authorize_logged_in_without_legal_name_shows_login(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + monkeypatch.setattr(auth_routes, "get_app_by_client_id", lambda _id: oauth_app()) + monkeypatch.setattr(auth_routes, "validate_redirect_uri", lambda *_args: True) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": ""}, + ) + + response = client.get( + "/oauth/authorize?client_id=client_1&redirect_uri=https://example.com/callback" + ) + + assert response.status_code == 200 + assert b"Sign in" in response.data + + +def test_oauth_authorize_uses_session_cached_params(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + oauth2_session(client) + + monkeypatch.setattr( + auth_routes, + "get_app_by_client_id", + lambda client_id: oauth_app() if client_id == "client_1" else None, + ) + monkeypatch.setattr(auth_routes, "validate_redirect_uri", lambda *_args: True) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Legal Name"}, + ) + + response = client.get("/oauth/authorize") + + assert response.status_code == 200 + assert b"Authorize" in response.data + + +def test_oauth_authorize_consent_requires_user_session(): + app = create_auth_app() + client = app.test_client() + + response = client.post("/oauth/authorize", data={"action": "approve"}) + + assert response.status_code == 200 + assert b"Session expired" in response.data + + +def test_oauth_authorize_consent_requires_oauth_session(login_email="test@example.com"): + app = create_auth_app() + client = app.test_client() + login_session(client, email=login_email) + + response = client.post("/oauth/authorize", data={"action": "approve"}) + + assert response.status_code == 200 + assert b"Invalid OAuth session" in response.data + + +def test_oauth_authorize_consent_inactive_app_redirects_invalid_client(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + oauth2_session(client) + monkeypatch.setattr(auth_routes, "get_app_by_client_id", lambda _id: oauth_app(active=False)) + + response = client.post("/oauth/authorize", data={"action": "approve"}) + + assert response.status_code == 302 + assert response.location == "https://example.com/callback?error=invalid_client&state=abc" + with client.session_transaction() as sess: + assert "oauth2_client_id" not in sess + assert "oauth2_redirect_uri" not in sess + assert "oauth2_scope" not in sess + assert "oauth2_state" not in sess + + +def test_oauth_authorize_consent_restricted_permission_denied(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + oauth2_session(client) + monkeypatch.setattr( + auth_routes, "get_app_by_client_id", lambda _id: oauth_app(allow_anyone=False) + ) + monkeypatch.setattr(auth_routes, "is_admin", lambda _email: False) + + response = client.post("/oauth/authorize", data={"action": "approve"}) + + assert response.status_code == 302 + assert ( + response.location + == "https://example.com/callback?error=access_denied&error_description=insufficient_permissions&state=abc" + ) + + +def test_oauth_authorize_consent_restricted_authorized_user_can_deny(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + oauth2_session(client) + monkeypatch.setattr( + auth_routes, "get_app_by_client_id", lambda _id: oauth_app(allow_anyone=False) + ) + monkeypatch.setattr(auth_routes, "is_admin", lambda _email: True) + monkeypatch.setattr(auth_routes, "has_app_permission", lambda *_args: True) + + response = client.post("/oauth/authorize", data={"action": "deny"}) + + assert response.status_code == 302 + assert response.location == "https://example.com/callback?error=access_denied&state=abc" + + +def test_oauth_authorize_consent_approve_success_generates_code_and_clears_session( + monkeypatch, +): + app = create_auth_app() + client = app.test_client() + login_session(client) + oauth2_session(client) + + monkeypatch.setattr(auth_routes, "get_app_by_client_id", lambda _id: oauth_app()) + monkeypatch.setattr( + auth_routes, "create_authorization_code", lambda **_kwargs: "issued-code-1" + ) + + response = client.post("/oauth/authorize", data={"action": "approve"}) + + assert response.status_code == 302 + assert response.location == "https://example.com/callback?code=issued-code-1&state=abc" + with client.session_transaction() as sess: + assert "oauth2_client_id" not in sess + + +def test_oauth_token_maps_invalid_client_error(monkeypatch): + app = create_auth_app(register_oauth=True) + client = app.test_client() + monkeypatch.setattr( + auth_routes, + "exchange_code_for_token", + lambda *_args: {"success": False, "error": "invalid_client"}, + ) + + response = client.post( + "/oauth/token", + data={ + "grant_type": "authorization_code", + "code": "code_1", + "redirect_uri": "https://example.com/callback", + "client_id": "client_1", + "client_secret": "secret_1", + }, + ) + + assert response.status_code == 400 + payload = response.get_json() + assert payload["error"] == "invalid_client" + assert "client_secret" in payload["error_description"] + + +def test_oauth_token_maps_unknown_exchange_error(monkeypatch): + app = create_auth_app(register_oauth=True) + client = app.test_client() + monkeypatch.setattr( + auth_routes, + "exchange_code_for_token", + lambda *_args: {"success": False, "error": "server_boom"}, + ) + + response = client.post( + "/oauth/token", + data={ + "grant_type": "authorization_code", + "code": "code_1", + "redirect_uri": "https://example.com/callback", + "client_id": "client_1", + "client_secret": "secret_1", + }, + ) + + assert response.status_code == 400 + payload = response.get_json() + assert payload["error"] == "server_boom" + assert "Invalid authorization code or client credentials" in payload["error_description"] + + +def test_oauth_token_with_debug_disabled_success(monkeypatch): + app = create_auth_app(register_oauth=True) + client = app.test_client() + monkeypatch.setattr(auth_routes, "DEBUG_MODE", False) + monkeypatch.setattr( + auth_routes, + "exchange_code_for_token", + lambda *_args: { + "success": True, + "access_token": "access_1", + "token_type": "Bearer", + "expires_in": 3600, + "scope": "profile", + }, + ) + + response = client.post( + "/oauth/token", + data={ + "grant_type": "authorization_code", + "code": "code_1", + "redirect_uri": "https://example.com/callback", + "client_id": "client_1", + "client_secret": "secret_1", + }, + ) + + assert response.status_code == 200 + assert response.get_json()["access_token"] == "access_1" + + +def test_oauth_revoke_invokes_revoke_function(monkeypatch): + app = create_auth_app(register_oauth=True) + client = app.test_client() + revoked = {} + monkeypatch.setattr( + auth_routes, "revoke_access_token", lambda token: revoked.setdefault("token", token) + ) + + response = client.post("/oauth/revoke", data={"token": "tok_123"}) + + assert response.status_code == 200 + assert response.get_json()["success"] is True + assert revoked["token"] == "tok_123" + + +def test_oauth_legacy_inactive_app_returns_error(monkeypatch): + app = create_auth_app() + client = app.test_client() + monkeypatch.setattr( + auth_routes, + "validate_app_redirect", + lambda _redirect: {"id": "app_1", "is_active": False}, + ) + + response = client.get("/oauth?redirect=https://example.com/callback") + + assert response.status_code == 200 + assert b"currently disabled" in response.data + + +def test_oauth_legacy_restricted_non_admin_denied(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + monkeypatch.setattr( + auth_routes, + "validate_app_redirect", + lambda _redirect: {"id": "app_1", "is_active": True, "allow_anyone": False}, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Legal Name"}, + ) + monkeypatch.setattr(auth_routes, "is_admin", lambda _email: False) + + response = client.get("/oauth?redirect=https://example.com/callback") + + assert response.status_code == 200 + assert b"Please contact an administrator" in response.data + + +def test_oauth_legacy_restricted_no_permission_denied(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + monkeypatch.setattr( + auth_routes, + "validate_app_redirect", + lambda _redirect: {"id": "app_1", "is_active": True, "allow_anyone": False}, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Legal Name"}, + ) + monkeypatch.setattr(auth_routes, "is_admin", lambda _email: True) + monkeypatch.setattr(auth_routes, "has_app_permission", lambda *_args: False) + + response = client.get("/oauth?redirect=https://example.com/callback") + + assert response.status_code == 200 + assert b"Please contact an administrator" in response.data + + +def test_oauth_legacy_restricted_authorized_user_gets_token(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + monkeypatch.setattr( + auth_routes, + "validate_app_redirect", + lambda _redirect: {"id": "app_1", "is_active": True, "allow_anyone": False}, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Legal Name"}, + ) + monkeypatch.setattr(auth_routes, "is_admin", lambda _email: True) + monkeypatch.setattr(auth_routes, "has_app_permission", lambda *_args: True) + monkeypatch.setattr(auth_routes, "create_oauth_token", lambda *_args, **_kwargs: "tok_1") + + response = client.get("/oauth?redirect=https://example.com/callback") + + assert response.status_code == 302 + assert response.location == "https://example.com/callback?token=tok_1" + + +def test_oauth_legacy_logged_in_without_complete_profile_shows_login(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + monkeypatch.setattr( + auth_routes, + "validate_app_redirect", + lambda _redirect: {"id": "app_1", "is_active": True, "allow_anyone": True}, + ) + monkeypatch.setattr( + auth_routes, "get_user_by_email", lambda _email: {"email": "test@example.com", "legal_name": ""} + ) + + response = client.get("/oauth?redirect=https://example.com/callback") + + assert response.status_code == 200 + assert b"Sign in" in response.data + + +def test_oauth_legacy_success_with_existing_query_uses_ampersand(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + monkeypatch.setattr( + auth_routes, + "validate_app_redirect", + lambda _redirect: {"id": "app_1", "is_active": True, "allow_anyone": True}, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Legal Name"}, + ) + monkeypatch.setattr(auth_routes, "create_oauth_token", lambda *_args, **_kwargs: "tok_1") + + response = client.get("/oauth?redirect=https%3A%2F%2Fexample.com%2Fcallback%3Fnext%3D1") + + assert response.status_code == 302 + assert response.location == "https://example.com/callback?next=1&token=tok_1" + + +def test_send_code_json_failure(monkeypatch): + app = create_auth_app() + client = app.test_client() + monkeypatch.setattr(auth_routes, "send_email_verification", lambda _email: False) + + response = client.post("/send-code", json={"email": "test@example.com"}) + + assert response.status_code == 200 + payload = response.get_json() + assert payload["success"] is False + assert "Failed to send magic link" in payload["error"] + + +def test_send_code_form_requires_email(): + app = create_auth_app() + client = app.test_client() + + response = client.post("/send-code", data={}) + + assert response.status_code == 200 + assert b"Email is required" in response.data + + +def test_send_code_form_success(monkeypatch): + app = create_auth_app() + client = app.test_client() + monkeypatch.setattr(auth_routes, "send_email_verification", lambda _email: True) + + response = client.post("/send-code", data={"email": "test@example.com"}) + + assert response.status_code == 200 + assert b"Check Your Email" in response.data + + +def test_send_code_form_failure(monkeypatch): + app = create_auth_app() + client = app.test_client() + monkeypatch.setattr(auth_routes, "send_email_verification", lambda _email: False) + + response = client.post("/send-code", data={"email": "test@example.com"}) + + assert response.status_code == 200 + assert b"Failed to send magic link" in response.data + + +def test_email_callback_failed_verification_renders_error(monkeypatch): + app = create_auth_app() + client = app.test_client() + monkeypatch.setattr( + auth_routes, "verify_email_code", lambda _code: {"success": False, "error": "expired"} + ) + + response = client.get("/auth/email/callback?code=bad") + + assert response.status_code == 200 + assert b"expired" in response.data + + +def test_email_callback_existing_user_oauth2_redirect(monkeypatch): + app = create_auth_app() + client = app.test_client() + oauth2_session(client) + monkeypatch.setattr( + auth_routes, + "verify_email_code", + lambda _code: {"success": True, "email": "test@example.com", "name": "Tester"}, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Legal Name"}, + ) + + response = client.get("/auth/email/callback?code=good") + + assert response.status_code == 302 + assert response.location.endswith("/oauth/authorize") + + +def test_email_callback_existing_user_legacy_inactive_app(monkeypatch): + app = create_auth_app() + client = app.test_client() + with client.session_transaction() as sess: + sess["oauth_redirect"] = "https://example.com/callback" + sess["oauth_app_id"] = "app_1" + monkeypatch.setattr( + auth_routes, + "verify_email_code", + lambda _code: {"success": True, "email": "test@example.com", "name": "Tester"}, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Legal Name"}, + ) + monkeypatch.setattr("models.app.get_app_by_id", lambda _id: {"is_active": False}) + + response = client.get("/auth/email/callback?code=good") + + assert response.status_code == 200 + assert b"no longer available" in response.data + + +def test_email_callback_existing_user_legacy_restricted_non_admin(monkeypatch): + app = create_auth_app() + client = app.test_client() + with client.session_transaction() as sess: + sess["oauth_redirect"] = "https://example.com/callback" + sess["oauth_app_id"] = "app_1" + monkeypatch.setattr( + auth_routes, + "verify_email_code", + lambda _code: {"success": True, "email": "test@example.com", "name": "Tester"}, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Legal Name"}, + ) + monkeypatch.setattr( + "models.app.get_app_by_id", + lambda _id: {"id": "app_1", "is_active": True, "allow_anyone": False}, + ) + monkeypatch.setattr(auth_routes, "is_admin", lambda _email: False) + + response = client.get("/auth/email/callback?code=good") + + assert response.status_code == 200 + assert b"permission to access this app" in response.data + + +def test_email_callback_existing_user_legacy_restricted_missing_permission(monkeypatch): + app = create_auth_app() + client = app.test_client() + with client.session_transaction() as sess: + sess["oauth_redirect"] = "https://example.com/callback" + sess["oauth_app_id"] = "app_1" + monkeypatch.setattr( + auth_routes, + "verify_email_code", + lambda _code: {"success": True, "email": "test@example.com", "name": "Tester"}, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Legal Name"}, + ) + monkeypatch.setattr( + "models.app.get_app_by_id", + lambda _id: {"id": "app_1", "is_active": True, "allow_anyone": False}, + ) + monkeypatch.setattr(auth_routes, "is_admin", lambda _email: True) + monkeypatch.setattr(auth_routes, "has_app_permission", lambda *_args: False) + + response = client.get("/auth/email/callback?code=good") + + assert response.status_code == 200 + assert b"permission to access this app" in response.data + + +def test_email_callback_existing_user_legacy_success_redirects_with_token(monkeypatch): + app = create_auth_app() + client = app.test_client() + with client.session_transaction() as sess: + sess["oauth_redirect"] = "https://example.com/callback" + sess["oauth_app_id"] = "app_1" + monkeypatch.setattr( + auth_routes, + "verify_email_code", + lambda _code: {"success": True, "email": "test@example.com", "name": "Tester"}, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Legal Name"}, + ) + monkeypatch.setattr( + "models.app.get_app_by_id", + lambda _id: {"id": "app_1", "is_active": True, "allow_anyone": True}, + ) + monkeypatch.setattr(auth_routes, "create_oauth_token", lambda *_args, **_kwargs: "tok_1") + + response = client.get("/auth/email/callback?code=good") + + assert response.status_code == 302 + assert response.location == "https://example.com/callback?token=tok_1" + + +def test_email_callback_existing_user_legacy_restricted_authorized_user(monkeypatch): + app = create_auth_app() + client = app.test_client() + with client.session_transaction() as sess: + sess["oauth_redirect"] = "https://example.com/callback" + sess["oauth_app_id"] = "app_1" + monkeypatch.setattr( + auth_routes, + "verify_email_code", + lambda _code: {"success": True, "email": "test@example.com", "name": "Tester"}, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Legal Name"}, + ) + monkeypatch.setattr( + "models.app.get_app_by_id", + lambda _id: {"id": "app_1", "is_active": True, "allow_anyone": False}, + ) + monkeypatch.setattr(auth_routes, "is_admin", lambda _email: True) + monkeypatch.setattr(auth_routes, "has_app_permission", lambda *_args: True) + monkeypatch.setattr(auth_routes, "create_oauth_token", lambda *_args, **_kwargs: "tok_1") + + response = client.get("/auth/email/callback?code=good") + + assert response.status_code == 302 + assert response.location == "https://example.com/callback?token=tok_1" + + +def test_email_callback_new_user_sets_pending_registration(monkeypatch): + app = create_auth_app() + client = app.test_client() + monkeypatch.setattr( + auth_routes, + "verify_email_code", + lambda _code: {"success": True, "email": "new@example.com", "name": "New User"}, + ) + monkeypatch.setattr(auth_routes, "get_user_by_email", lambda _email: None) + + response = client.get("/auth/email/callback?code=good") + + assert response.status_code == 302 + assert response.location.endswith("/register") + with client.session_transaction() as sess: + assert sess["pending_registration"] is True + assert sess["user_email"] == "new@example.com" + + +def test_verify_code_route_returns_magic_link_guidance(): + app = create_auth_app() + client = app.test_client() + + response = client.post("/verify-code") + + assert response.status_code == 200 + payload = response.get_json() + assert payload["success"] is False + assert "magic link" in payload["error"] + + +def test_verify_discord_missing_token(): + app = create_auth_app() + client = app.test_client() + + response = client.get("/verify") + + assert response.status_code == 200 + assert b"Missing verification token" in response.data + + +def test_verify_discord_invalid_token(monkeypatch): + app = create_auth_app() + client = app.test_client() + monkeypatch.setattr(auth_routes, "verify_discord_token", lambda _token: None) + + response = client.get("/verify?token=bad") + + assert response.status_code == 200 + assert b"Invalid or expired verification link" in response.data + + +def test_verify_discord_valid_token_sets_session(monkeypatch): + app = create_auth_app() + client = app.test_client() + monkeypatch.setattr( + auth_routes, + "verify_discord_token", + lambda _token: {"discord_id": "123", "discord_username": "tester"}, + ) + + response = client.get("/verify?token=good") + + assert response.status_code == 200 + assert b"Sign in" in response.data + with client.session_transaction() as sess: + assert sess["verification_token"] == "good" + assert sess["discord_id"] == "123" + + +def test_verify_discord_valid_token_logged_in_redirects_complete(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + monkeypatch.setattr( + auth_routes, + "verify_discord_token", + lambda _token: {"discord_id": "123", "discord_username": "tester"}, + ) + + response = client.get("/verify?token=good") + + assert response.status_code == 302 + assert response.location.endswith("/verify/complete") + + +def test_verify_complete_requires_verification_token(): + app = create_auth_app() + client = app.test_client() + + response = client.get("/verify/complete") + + assert response.status_code == 200 + assert b"Invalid verification state" in response.data + + +def test_verify_complete_requires_logged_in_user(): + app = create_auth_app() + client = app.test_client() + with client.session_transaction() as sess: + sess["verification_token"] = "tok_1" + + response = client.get("/verify/complete") + + assert response.status_code == 200 + assert b"Please log in first" in response.data + + +def test_verify_complete_success_clears_session(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + with client.session_transaction() as sess: + sess["verification_token"] = "tok_1" + sess["discord_id"] = "123" + sess["discord_username"] = "tester" + + monkeypatch.setattr( + auth_routes, + "complete_discord_verification", + lambda _token, _email: { + "success": True, + "discord_username": "tester", + "roles_assigned": [{"event_id": "hackathon", "role_name": "Hacker"}], + "roles_failed": [], + "total_roles_assigned": 1, + "total_roles_failed": 0, + }, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: { + "email": "test@example.com", + "preferred_name": "Tester", + "events": ["event-1"], + }, + ) + + response = client.get("/verify/complete") + + assert response.status_code == 200 + assert b"Discord Verification Successful" in response.data + with client.session_transaction() as sess: + assert "verification_token" not in sess + assert "discord_id" not in sess + + +def test_verify_complete_failure_renders_error(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + with client.session_transaction() as sess: + sess["verification_token"] = "tok_1" + + monkeypatch.setattr( + auth_routes, + "complete_discord_verification", + lambda _token, _email: {"success": False, "error": "verification failed"}, + ) + + response = client.get("/verify/complete") + + assert response.status_code == 200 + assert b"verification failed" in response.data + + +def test_register_requires_logged_in_user(): + app = create_auth_app() + client = app.test_client() + + response = client.get("/register") + + assert response.status_code == 302 + assert response.location.endswith("/") + + +def test_register_existing_complete_user_redirects_home(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Legal Name"}, + ) + + response = client.get("/register") + + assert response.status_code == 302 + assert response.location.endswith("/") + + +def test_register_get_renders_form_when_pending(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + with client.session_transaction() as sess: + sess["pending_registration"] = True + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Legal Name"}, + ) + + response = client.get("/register") + + assert response.status_code == 200 + assert b"Complete Your Registration" in response.data + + +def test_register_post_validation_errors(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + monkeypatch.setattr(auth_routes, "get_user_by_email", lambda _email: None) + + response = client.post("/register", data={"legal_name": "", "dob": "", "pronouns": ""}) + + assert response.status_code == 200 + assert b"Legal name is required" in response.data + assert b"Date of birth is required" in response.data + assert b"Pronouns are required" in response.data + + +def test_register_post_invalid_dob(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + monkeypatch.setattr(auth_routes, "get_user_by_email", lambda _email: None) + + response = client.post( + "/register", + data={"legal_name": "Legal Name", "dob": "not-a-date", "pronouns": "they/them"}, + ) + + assert response.status_code == 200 + assert b"Invalid date format" in response.data + + +def test_register_post_update_user_value_error(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"id": "user_1", "email": "test@example.com", "legal_name": ""}, + ) + + def raise_value_error(*_args, **_kwargs): + raise ValueError("update failed") + + monkeypatch.setattr(auth_routes, "update_user", raise_value_error) + + response = client.post( + "/register", + data={ + "legal_name": "Legal Name", + "preferred_name": "Nick", + "dob": "2000-01-01", + "pronouns": "they/them", + }, + ) + + assert response.status_code == 200 + assert b"update failed" in response.data + + +def test_register_post_update_redirects_verify_complete(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + with client.session_transaction() as sess: + sess["verification_token"] = "tok_1" + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"id": "user_1", "email": "test@example.com", "legal_name": ""}, + ) + monkeypatch.setattr(auth_routes, "update_user", lambda *_args, **_kwargs: None) + + response = client.post( + "/register", + data={"legal_name": "Legal Name", "dob": "2000-01-01", "pronouns": "they/them"}, + ) + + assert response.status_code == 302 + assert response.location.endswith("/verify/complete") + + +def test_register_post_update_redirects_oauth_authorize(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + oauth2_session(client) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"id": "user_1", "email": "test@example.com", "legal_name": ""}, + ) + monkeypatch.setattr(auth_routes, "update_user", lambda *_args, **_kwargs: None) + + response = client.post( + "/register", + data={"legal_name": "Legal Name", "dob": "2000-01-01", "pronouns": "they/them"}, + ) + + assert response.status_code == 302 + assert response.location.endswith("/oauth/authorize") + + +def test_register_post_update_legacy_inactive_app(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + with client.session_transaction() as sess: + sess["oauth_redirect"] = "https://example.com/callback" + sess["oauth_app_id"] = "app_1" + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"id": "user_1", "email": "test@example.com", "legal_name": ""}, + ) + monkeypatch.setattr(auth_routes, "update_user", lambda *_args, **_kwargs: None) + monkeypatch.setattr("models.app.get_app_by_id", lambda _id: {"is_active": False}) + + response = client.post( + "/register", + data={"legal_name": "Legal Name", "dob": "2000-01-01", "pronouns": "they/them"}, + ) + + assert response.status_code == 200 + assert b"no longer available" in response.data + + +def test_register_post_update_legacy_restricted_non_admin(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + with client.session_transaction() as sess: + sess["oauth_redirect"] = "https://example.com/callback" + sess["oauth_app_id"] = "app_1" + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"id": "user_1", "email": "test@example.com", "legal_name": ""}, + ) + monkeypatch.setattr(auth_routes, "update_user", lambda *_args, **_kwargs: None) + monkeypatch.setattr( + "models.app.get_app_by_id", + lambda _id: {"id": "app_1", "is_active": True, "allow_anyone": False}, + ) + monkeypatch.setattr(auth_routes, "is_admin", lambda _email: False) + + response = client.post( + "/register", + data={"legal_name": "Legal Name", "dob": "2000-01-01", "pronouns": "they/them"}, + ) + + assert response.status_code == 200 + assert b"permission to access this app" in response.data + + +def test_register_post_update_legacy_restricted_missing_permission(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + with client.session_transaction() as sess: + sess["oauth_redirect"] = "https://example.com/callback" + sess["oauth_app_id"] = "app_1" + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"id": "user_1", "email": "test@example.com", "legal_name": ""}, + ) + monkeypatch.setattr(auth_routes, "update_user", lambda *_args, **_kwargs: None) + monkeypatch.setattr( + "models.app.get_app_by_id", + lambda _id: {"id": "app_1", "is_active": True, "allow_anyone": False}, + ) + monkeypatch.setattr(auth_routes, "is_admin", lambda _email: True) + monkeypatch.setattr(auth_routes, "has_app_permission", lambda *_args: False) + + response = client.post( + "/register", + data={"legal_name": "Legal Name", "dob": "2000-01-01", "pronouns": "they/them"}, + ) + + assert response.status_code == 200 + assert b"permission to access this app" in response.data + + +def test_register_post_update_legacy_success_redirects_with_token(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + with client.session_transaction() as sess: + sess["oauth_redirect"] = "https://example.com/callback" + sess["oauth_app_id"] = "app_1" + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"id": "user_1", "email": "test@example.com", "legal_name": ""}, + ) + monkeypatch.setattr(auth_routes, "update_user", lambda *_args, **_kwargs: None) + monkeypatch.setattr( + "models.app.get_app_by_id", + lambda _id: {"id": "app_1", "is_active": True, "allow_anyone": True}, + ) + monkeypatch.setattr(auth_routes, "create_oauth_token", lambda *_args, **_kwargs: "tok_1") + + response = client.post( + "/register", + data={"legal_name": "Legal Name", "dob": "2000-01-01", "pronouns": "they/them"}, + ) + + assert response.status_code == 302 + assert response.location == "https://example.com/callback?token=tok_1" + + +def test_register_post_update_legacy_restricted_authorized_user_redirects_with_token( + monkeypatch, +): + app = create_auth_app() + client = app.test_client() + login_session(client) + with client.session_transaction() as sess: + sess["oauth_redirect"] = "https://example.com/callback" + sess["oauth_app_id"] = "app_1" + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"id": "user_1", "email": "test@example.com", "legal_name": ""}, + ) + monkeypatch.setattr(auth_routes, "update_user", lambda *_args, **_kwargs: None) + monkeypatch.setattr( + "models.app.get_app_by_id", + lambda _id: {"id": "app_1", "is_active": True, "allow_anyone": False}, + ) + monkeypatch.setattr(auth_routes, "is_admin", lambda _email: True) + monkeypatch.setattr(auth_routes, "has_app_permission", lambda *_args: True) + monkeypatch.setattr(auth_routes, "create_oauth_token", lambda *_args, **_kwargs: "tok_1") + + response = client.post( + "/register", + data={"legal_name": "Legal Name", "dob": "2000-01-01", "pronouns": "they/them"}, + ) + + assert response.status_code == 302 + assert response.location == "https://example.com/callback?token=tok_1" + + +def test_register_post_create_user_default_redirect_home_without_oauth_redirect(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client, email="new2@example.com") + monkeypatch.setattr(auth_routes, "get_user_by_email", lambda _email: None) + monkeypatch.setattr(auth_routes, "create_user", lambda **_kwargs: None) + + response = client.post( + "/register", + data={ + "legal_name": "Legal Name", + "preferred_name": "Nick", + "dob": "2000-01-01", + "pronouns": "they/them", + }, + ) + + assert response.status_code == 302 + assert response.location.endswith("/") + + +def test_register_post_create_user_and_clear_incomplete_oauth_session(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client, email="new@example.com") + with client.session_transaction() as sess: + sess["oauth_redirect"] = "https://example.com/callback" + + created = {} + monkeypatch.setattr(auth_routes, "get_user_by_email", lambda _email: None) + monkeypatch.setattr( + auth_routes, + "create_user", + lambda **kwargs: created.update(kwargs), + ) + + response = client.post( + "/register", + data={ + "legal_name": "Legal Name", + "preferred_name": "Nick", + "dob": "2000-01-01", + "pronouns": "they/them", + }, + ) + + assert response.status_code == 302 + assert response.location.endswith("/") + assert created["email"] == "new@example.com" + assert created["dob"] == "01/01/2000" + with client.session_transaction() as sess: + assert "oauth_redirect" not in sess + + +def test_unlink_discord_dashboard_requires_login(): + app = create_auth_app() + client = app.test_client() + + response = client.post("/dashboard/discord/unlink") + + assert response.status_code == 401 + assert response.get_json()["success"] is False + + +def test_unlink_discord_dashboard_success(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + monkeypatch.setattr( + auth_routes, + "unlink_discord_account", + lambda _email: { + "success": True, + "total_roles_removed": 2, + "total_roles_failed": 0, + "role_removal_success": True, + }, + ) + + response = client.post("/dashboard/discord/unlink") + + assert response.status_code == 200 + payload = response.get_json() + assert payload["success"] is True + assert payload["roles_removed"] == 2 + + +def test_unlink_discord_dashboard_service_failure(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + monkeypatch.setattr( + auth_routes, + "unlink_discord_account", + lambda _email: {"success": False, "error": "unlink failed"}, + ) + + response = client.post("/dashboard/discord/unlink") + + assert response.status_code == 400 + assert response.get_json()["error"] == "unlink failed" + + +def test_unlink_discord_dashboard_exception_returns_500(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + + def raise_exc(_email): + raise RuntimeError("boom") + + monkeypatch.setattr(auth_routes, "unlink_discord_account", raise_exc) + + response = client.post("/dashboard/discord/unlink") + + assert response.status_code == 500 + assert response.get_json()["error"] == "Internal server error" + + +def test_unlink_discord_dashboard_exception_with_debug_disabled(monkeypatch): + app = create_auth_app() + client = app.test_client() + login_session(client) + monkeypatch.setattr(auth_routes, "DEBUG_MODE", False) + + def raise_exc(_email): + raise RuntimeError("boom") + + monkeypatch.setattr(auth_routes, "unlink_discord_account", raise_exc) + + response = client.post("/dashboard/discord/unlink") + + assert response.status_code == 500 + assert response.get_json()["error"] == "Internal server error" diff --git a/tests/test_data_deletion.py b/tests/test_data_deletion.py new file mode 100644 index 0000000..5d030ff --- /dev/null +++ b/tests/test_data_deletion.py @@ -0,0 +1,79 @@ +import os + +os.environ.setdefault("SECRET_KEY", "test-secret") + +import services.data_deletion as data_deletion + + +class FakeCursor: + def __init__(self, rowcount): + self.rowcount = rowcount + + +class FakeConnection: + def __init__(self, rowcount=0): + self.rowcount = rowcount + self.committed = False + self.closed = False + + def execute(self, _query, _params): + return FakeCursor(self.rowcount) + + def commit(self): + self.committed = True + + def close(self): + self.closed = True + + +def test_delete_user_data_returns_error_when_user_not_found(monkeypatch): + monkeypatch.setattr( + data_deletion, + "get_user_data_summary", + lambda _email: {"user_found": False, "discord_linked": False}, + ) + + result = data_deletion.delete_user_data("missing@example.com") + + assert result["success"] is False + assert "User not found" in result["errors"] + + +def test_delete_user_data_deletes_opt_out_and_user_record(monkeypatch): + fake_conn = FakeConnection(rowcount=2) + deleted_user_ids = [] + + monkeypatch.setattr( + data_deletion, + "get_user_data_summary", + lambda _email: {"user_found": True, "discord_linked": False}, + ) + monkeypatch.setattr(data_deletion, "get_db_connection", lambda: fake_conn) + monkeypatch.setattr( + data_deletion, + "delete_subscriber_by_email", + lambda _email: {"success": True}, + ) + monkeypatch.setattr( + data_deletion, + "get_user_by_email", + lambda _email: {"id": "user_123", "email": "test@example.com"}, + ) + + import models.user as user_model + + monkeypatch.setattr( + user_model, "delete_user", lambda user_id: deleted_user_ids.append(user_id) + ) + + result = data_deletion.delete_user_data( + "test@example.com", include_discord=False, include_listmonk=True + ) + + assert result["success"] is True + assert result["deletion_counts"]["opt_out_tokens"] == 2 + assert result["deletion_counts"]["users"] == 1 + assert result["total_records_deleted"] == 3 + assert deleted_user_ids == ["user_123"] + assert fake_conn.committed is True + assert fake_conn.closed is True diff --git a/tests/test_data_deletion_comprehensive.py b/tests/test_data_deletion_comprehensive.py new file mode 100644 index 0000000..cead16e --- /dev/null +++ b/tests/test_data_deletion_comprehensive.py @@ -0,0 +1,431 @@ +import os + +os.environ.setdefault("SECRET_KEY", "test-secret") + +import builtins + +import services.data_deletion as data_deletion + + +class FakeCursor: + def __init__(self, rowcount=0, row=None): + self.rowcount = rowcount + self._row = row + + def fetchone(self): + return self._row + + +class FakeConnection: + def __init__(self, execute_impl): + self._execute_impl = execute_impl + self.committed = False + self.closed = False + + def execute(self, query, params): + return self._execute_impl(query, params) + + def commit(self): + self.committed = True + + def close(self): + self.closed = True + + +def test_get_user_data_summary_user_not_found(monkeypatch): + conn = FakeConnection(lambda _query, _params: FakeCursor(row=({"count": 0}))) + monkeypatch.setattr(data_deletion, "get_db_connection", lambda: conn) + monkeypatch.setattr(data_deletion, "get_user_by_email", lambda _email: None) + + summary = data_deletion.get_user_data_summary("missing@example.com") + + assert summary["user_found"] is False + assert summary["tables_with_data"] == [] + assert conn.closed is True + + +def test_get_user_data_summary_with_opt_out_tokens(monkeypatch): + conn = FakeConnection( + lambda _query, _params: FakeCursor(row={"count": 3}) + ) + monkeypatch.setattr(data_deletion, "get_db_connection", lambda: conn) + monkeypatch.setattr( + data_deletion, + "get_user_by_email", + lambda _email: {"id": "user_1", "discord_id": "123"}, + ) + + summary = data_deletion.get_user_data_summary("test@example.com") + + assert summary["user_found"] is True + assert summary["discord_linked"] is True + assert summary["opt_out_tokens"] == 3 + assert "users" in summary["tables_with_data"] + assert "opt_out_tokens" in summary["tables_with_data"] + assert conn.closed is True + + +def test_get_user_data_summary_user_with_no_opt_out_tokens(monkeypatch): + conn = FakeConnection(lambda _query, _params: FakeCursor(row={"count": 0})) + monkeypatch.setattr(data_deletion, "get_db_connection", lambda: conn) + monkeypatch.setattr( + data_deletion, + "get_user_by_email", + lambda _email: {"id": "user_1", "discord_id": None}, + ) + + summary = data_deletion.get_user_data_summary("test@example.com") + + assert summary["user_found"] is True + assert summary["opt_out_tokens"] == 0 + assert summary["tables_with_data"] == ["users"] + + +def test_remove_discord_roles_when_no_discord_account(monkeypatch): + monkeypatch.setattr( + data_deletion, "get_user_by_email", lambda _email: {"email": "test@example.com", "discord_id": None} + ) + + result = data_deletion.remove_discord_roles("test@example.com") + + assert result["success"] is False + assert result["error"] == "No Discord account linked" + + +def test_remove_discord_roles_success(monkeypatch): + monkeypatch.setattr(data_deletion, "DEBUG_MODE", False) + monkeypatch.setattr( + data_deletion, "get_user_by_email", lambda _email: {"email": "test@example.com", "discord_id": "123"} + ) + monkeypatch.setattr( + "utils.discord.remove_all_event_roles", + lambda _discord_id: {"success": True, "roles_removed": ["a"], "total_removed": 1}, + ) + + result = data_deletion.remove_discord_roles("test@example.com") + + assert result["success"] is True + assert result["total_removed"] == 1 + assert result["roles_removed"] == ["a"] + + +def test_remove_discord_roles_partial_failure(monkeypatch): + monkeypatch.setattr(data_deletion, "DEBUG_MODE", False) + monkeypatch.setattr( + data_deletion, "get_user_by_email", lambda _email: {"email": "test@example.com", "discord_id": "123"} + ) + monkeypatch.setattr( + "utils.discord.remove_all_event_roles", + lambda _discord_id: { + "success": False, + "error": "remove failed", + "roles_removed": ["a"], + "roles_failed": ["b"], + }, + ) + + result = data_deletion.remove_discord_roles("test@example.com") + + assert result["success"] is False + assert result["error"] == "remove failed" + assert result["roles_failed"] == ["b"] + + +def test_remove_discord_roles_handles_unexpected_exception(monkeypatch): + monkeypatch.setattr( + data_deletion, "get_user_by_email", lambda _email: {"email": "test@example.com", "discord_id": "123"} + ) + monkeypatch.setattr( + "utils.discord.remove_all_event_roles", + lambda _discord_id: (_ for _ in ()).throw(RuntimeError("boom")), + ) + + result = data_deletion.remove_discord_roles("test@example.com") + + assert result["success"] is False + assert "Discord role removal failed" in result["error"] + + +def test_remove_discord_roles_handles_import_error(monkeypatch): + monkeypatch.setattr( + data_deletion, "get_user_by_email", lambda _email: {"email": "test@example.com", "discord_id": "123"} + ) + + original_import = builtins.__import__ + + def fake_import(name, globals=None, locals=None, fromlist=(), level=0): + if name == "utils.discord": + raise ImportError("missing discord utilities") + return original_import(name, globals, locals, fromlist, level) + + monkeypatch.setattr(builtins, "__import__", fake_import) + + result = data_deletion.remove_discord_roles("test@example.com") + + assert result["success"] is False + assert result["error"] == "Discord utilities not available" + + +def test_remove_discord_roles_handles_user_lookup_exception(monkeypatch): + monkeypatch.setattr( + data_deletion, + "get_user_by_email", + lambda _email: (_ for _ in ()).throw(RuntimeError("lookup boom")), + ) + + result = data_deletion.remove_discord_roles("test@example.com") + + assert result["success"] is False + assert "Error accessing user data" in result["error"] + + +def test_delete_user_data_adds_discord_and_listmonk_errors(monkeypatch): + conn = FakeConnection(lambda _query, _params: FakeCursor(rowcount=0)) + + monkeypatch.setattr( + data_deletion, + "get_user_data_summary", + lambda _email: {"user_found": True, "discord_linked": True}, + ) + monkeypatch.setattr( + data_deletion, + "remove_discord_roles", + lambda _email: {"success": False, "error": "discord failed"}, + ) + monkeypatch.setattr( + data_deletion, + "delete_subscriber_by_email", + lambda _email: {"success": False, "error": "listmonk failed", "skipped": False}, + ) + monkeypatch.setattr(data_deletion, "get_db_connection", lambda: conn) + monkeypatch.setattr(data_deletion, "get_user_by_email", lambda _email: None) + + result = data_deletion.delete_user_data("test@example.com") + + assert result["success"] is False + assert "Discord: discord failed" in result["errors"] + assert "Listmonk: listmonk failed" in result["errors"] + assert conn.committed is True + assert conn.closed is True + + +def test_delete_user_data_discord_success_does_not_add_discord_error(monkeypatch): + conn = FakeConnection(lambda _query, _params: FakeCursor(rowcount=0)) + monkeypatch.setattr( + data_deletion, + "get_user_data_summary", + lambda _email: {"user_found": True, "discord_linked": True}, + ) + monkeypatch.setattr( + data_deletion, + "remove_discord_roles", + lambda _email: {"success": True, "error": None}, + ) + monkeypatch.setattr( + data_deletion, + "delete_subscriber_by_email", + lambda _email: {"success": True}, + ) + monkeypatch.setattr(data_deletion, "get_db_connection", lambda: conn) + monkeypatch.setattr(data_deletion, "get_user_by_email", lambda _email: None) + + result = data_deletion.delete_user_data("test@example.com") + + assert result["success"] is True + assert all(not err.startswith("Discord:") for err in result["errors"]) + + +def test_delete_user_data_skipped_listmonk_does_not_add_error(monkeypatch): + conn = FakeConnection(lambda _query, _params: FakeCursor(rowcount=0)) + monkeypatch.setattr( + data_deletion, + "get_user_data_summary", + lambda _email: {"user_found": True, "discord_linked": False}, + ) + monkeypatch.setattr( + data_deletion, + "delete_subscriber_by_email", + lambda _email: {"success": False, "skipped": True}, + ) + monkeypatch.setattr(data_deletion, "get_db_connection", lambda: conn) + monkeypatch.setattr(data_deletion, "get_user_by_email", lambda _email: None) + + result = data_deletion.delete_user_data("test@example.com") + + assert result["success"] is True + assert result["errors"] == [] + + +def test_delete_user_data_with_listmonk_disabled_skips_external_delete(monkeypatch): + conn = FakeConnection(lambda _query, _params: FakeCursor(rowcount=0)) + monkeypatch.setattr( + data_deletion, + "get_user_data_summary", + lambda _email: {"user_found": True, "discord_linked": False}, + ) + monkeypatch.setattr(data_deletion, "get_db_connection", lambda: conn) + monkeypatch.setattr(data_deletion, "get_user_by_email", lambda _email: None) + + result = data_deletion.delete_user_data( + "test@example.com", include_discord=False, include_listmonk=False + ) + + assert result["success"] is True + assert result["listmonk_result"] is None + + +def test_delete_user_data_handles_sqlite_delete_error(monkeypatch): + def execute_impl(_query, _params): + raise RuntimeError("sqlite failed") + + conn = FakeConnection(execute_impl) + monkeypatch.setattr( + data_deletion, + "get_user_data_summary", + lambda _email: {"user_found": True, "discord_linked": False}, + ) + monkeypatch.setattr( + data_deletion, + "delete_subscriber_by_email", + lambda _email: {"success": True}, + ) + monkeypatch.setattr(data_deletion, "get_db_connection", lambda: conn) + monkeypatch.setattr(data_deletion, "get_user_by_email", lambda _email: None) + + result = data_deletion.delete_user_data("test@example.com") + + assert result["success"] is False + assert any("Error deleting from opt_out_tokens" in err for err in result["errors"]) + + +def test_delete_user_data_handles_teable_delete_error(monkeypatch): + conn = FakeConnection(lambda _query, _params: FakeCursor(rowcount=0)) + monkeypatch.setattr( + data_deletion, + "get_user_data_summary", + lambda _email: {"user_found": True, "discord_linked": False}, + ) + monkeypatch.setattr( + data_deletion, + "delete_subscriber_by_email", + lambda _email: {"success": True}, + ) + monkeypatch.setattr(data_deletion, "get_db_connection", lambda: conn) + monkeypatch.setattr( + data_deletion, "get_user_by_email", lambda _email: {"id": "user_1", "email": "test@example.com"} + ) + + def raise_delete(_user_id): + raise RuntimeError("teable failed") + + monkeypatch.setattr("models.user.delete_user", raise_delete) + + result = data_deletion.delete_user_data("test@example.com") + + assert result["success"] is False + assert any("Error deleting from Teable users" in err for err in result["errors"]) + + +def test_delete_user_data_outer_exception_is_captured(monkeypatch): + monkeypatch.setattr( + data_deletion, + "get_user_data_summary", + lambda _email: (_ for _ in ()).throw(RuntimeError("critical")), + ) + + result = data_deletion.delete_user_data("test@example.com") + + assert result["success"] is False + assert any("Critical error during data deletion" in err for err in result["errors"]) + + +def test_verify_user_deletion_when_fully_deleted(monkeypatch): + conn = FakeConnection(lambda _query, _params: FakeCursor(row={"count": 0})) + monkeypatch.setattr(data_deletion, "get_db_connection", lambda: conn) + monkeypatch.setattr(data_deletion, "get_user_by_email", lambda _email: None) + + verification = data_deletion.verify_user_deletion("test@example.com") + + assert verification["completely_deleted"] is True + assert verification["remaining_data"] == {} + assert "opt_out_tokens" in verification["tables_checked"] + assert "users" in verification["tables_checked"] + assert conn.closed is True + + +def test_verify_user_deletion_detects_remaining_data(monkeypatch): + conn = FakeConnection(lambda _query, _params: FakeCursor(row={"count": 2})) + monkeypatch.setattr(data_deletion, "get_db_connection", lambda: conn) + monkeypatch.setattr(data_deletion, "get_user_by_email", lambda _email: {"id": "user_1"}) + + verification = data_deletion.verify_user_deletion("test@example.com") + + assert verification["completely_deleted"] is False + assert verification["remaining_data"]["opt_out_tokens"] == 2 + assert verification["remaining_data"]["users"] == 1 + + +def test_verify_user_deletion_handles_user_lookup_exception(monkeypatch): + conn = FakeConnection(lambda _query, _params: FakeCursor(row={"count": 0})) + monkeypatch.setattr(data_deletion, "get_db_connection", lambda: conn) + monkeypatch.setattr( + data_deletion, + "get_user_by_email", + lambda _email: (_ for _ in ()).throw(RuntimeError("teable lookup failed")), + ) + + verification = data_deletion.verify_user_deletion("test@example.com") + + assert verification["completely_deleted"] is True + assert "users" not in verification["remaining_data"] + + +def test_verify_user_deletion_continues_when_sqlite_check_fails(monkeypatch): + def execute_impl(_query, _params): + raise RuntimeError("query failed") + + conn = FakeConnection(execute_impl) + monkeypatch.setattr(data_deletion, "get_db_connection", lambda: conn) + monkeypatch.setattr(data_deletion, "get_user_by_email", lambda _email: None) + + verification = data_deletion.verify_user_deletion("test@example.com") + + assert verification["completely_deleted"] is True + assert verification["remaining_data"] == {} + + +def test_get_deletion_preview_user_not_found(monkeypatch): + monkeypatch.setattr(data_deletion, "get_user_data_summary", lambda _email: {"user_found": False}) + + preview = data_deletion.get_deletion_preview("missing@example.com") + + assert preview["user_found"] is False + assert "No account found" in preview["message"] + + +def test_get_deletion_preview_for_discord_linked_user(monkeypatch): + monkeypatch.setattr( + data_deletion, + "get_user_data_summary", + lambda _email: {"user_found": True, "discord_linked": True}, + ) + + preview = data_deletion.get_deletion_preview("test@example.com") + + assert preview["user_found"] is True + assert preview["discord_warning"] is True + assert any("Discord verification status and roles" in item for item in preview["items_to_delete"]) + + +def test_get_deletion_preview_for_non_discord_user(monkeypatch): + monkeypatch.setattr( + data_deletion, + "get_user_data_summary", + lambda _email: {"user_found": True, "discord_linked": False}, + ) + + preview = data_deletion.get_deletion_preview("test@example.com") + + assert preview["user_found"] is True + assert preview["discord_warning"] is False + assert all("Discord verification status and roles" not in item for item in preview["items_to_delete"]) diff --git a/tests/test_oauth_routes.py b/tests/test_oauth_routes.py new file mode 100644 index 0000000..28ffb9e --- /dev/null +++ b/tests/test_oauth_routes.py @@ -0,0 +1,198 @@ +import os +from pathlib import Path + +os.environ.setdefault("SECRET_KEY", "test-secret") + +from flask import Flask +import routes.auth as auth_routes + + +def create_oauth_app(): + template_dir = Path(__file__).resolve().parents[1] / "templates" + app = Flask(__name__, template_folder=str(template_dir)) + app.secret_key = "test-secret" + app.jinja_env.globals["csrf_token"] = lambda: "test-csrf" + app.register_blueprint(auth_routes.auth_bp) + app.register_blueprint(auth_routes.oauth_bp) + return app + + +def test_oauth_token_rejects_unsupported_grant_type(): + app = create_oauth_app() + client = app.test_client() + + response = client.post( + "/oauth/token", + data={ + "grant_type": "client_credentials", + "code": "abc", + "redirect_uri": "https://example.com/callback", + "client_id": "client_1", + "client_secret": "secret_1", + }, + ) + + assert response.status_code == 400 + payload = response.get_json() + assert payload["error"] == "unsupported_grant_type" + + +def test_oauth_token_requires_all_parameters(): + app = create_oauth_app() + client = app.test_client() + + response = client.post("/oauth/token", data={"grant_type": "authorization_code"}) + + assert response.status_code == 400 + payload = response.get_json() + assert payload["error"] == "invalid_request" + + +def test_oauth_token_returns_access_token_payload(monkeypatch): + app = create_oauth_app() + client = app.test_client() + + monkeypatch.setattr( + auth_routes, + "exchange_code_for_token", + lambda code, client_id, client_secret, redirect_uri: { + "success": True, + "access_token": "access_123", + "token_type": "Bearer", + "expires_in": 3600, + "scope": "profile email", + }, + ) + + response = client.post( + "/oauth/token", + data={ + "grant_type": "authorization_code", + "code": "code_123", + "redirect_uri": "https://example.com/callback", + "client_id": "client_1", + "client_secret": "secret_1", + }, + ) + + assert response.status_code == 200 + payload = response.get_json() + assert payload["access_token"] == "access_123" + assert payload["token_type"] == "Bearer" + + +def test_oauth_token_maps_invalid_grant_error(monkeypatch): + app = create_oauth_app() + client = app.test_client() + + monkeypatch.setattr( + auth_routes, + "exchange_code_for_token", + lambda code, client_id, client_secret, redirect_uri: { + "success": False, + "error": "invalid_grant", + }, + ) + + response = client.post( + "/oauth/token", + data={ + "grant_type": "authorization_code", + "code": "expired_code", + "redirect_uri": "https://example.com/callback", + "client_id": "client_1", + "client_secret": "secret_1", + }, + ) + + assert response.status_code == 400 + payload = response.get_json() + assert payload["error"] == "invalid_grant" + assert "expired" in payload["error_description"] + + +def test_oauth_revoke_requires_token(): + app = create_oauth_app() + client = app.test_client() + + response = client.post("/oauth/revoke", data={}) + + assert response.status_code == 400 + assert response.get_json()["error"] == "invalid_request" + + +def test_oauth_revoke_returns_success_even_for_invalid_tokens(monkeypatch): + app = create_oauth_app() + client = app.test_client() + + monkeypatch.setattr(auth_routes, "revoke_access_token", lambda _token: False) + + response = client.post("/oauth/revoke", data={"token": "already-invalid"}) + + assert response.status_code == 200 + assert response.get_json()["success"] is True + + +def test_oauth_legacy_requires_redirect_parameter(): + app = create_oauth_app() + client = app.test_client() + + response = client.get("/oauth") + + assert response.status_code == 200 + assert b"Missing redirect parameter" in response.data + + +def test_oauth_legacy_rejects_unregistered_redirect(monkeypatch): + app = create_oauth_app() + client = app.test_client() + + monkeypatch.setattr(auth_routes, "validate_app_redirect", lambda _redirect: None) + + response = client.get("/oauth?redirect=https://example.com/callback") + + assert response.status_code == 200 + assert b"Invalid redirect URL" in response.data + + +def test_oauth_legacy_logged_in_user_redirects_with_token(monkeypatch): + app = create_oauth_app() + client = app.test_client() + + monkeypatch.setattr( + auth_routes, + "validate_app_redirect", + lambda _redirect: {"id": "app_1", "is_active": True, "allow_anyone": True}, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "test@example.com", "legal_name": "Test User"}, + ) + monkeypatch.setattr(auth_routes, "create_oauth_token", lambda _email, expires_in_seconds: "oauth_token_123") + + with client.session_transaction() as sess: + sess["user_email"] = "test@example.com" + + response = client.get("/oauth?redirect=https://example.com/callback") + + assert response.status_code == 302 + assert response.location == "https://example.com/callback?token=oauth_token_123" + + +def test_logout_clears_session_and_redirects_home(): + app = create_oauth_app() + client = app.test_client() + + with client.session_transaction() as sess: + sess["user_email"] = "test@example.com" + sess["user_name"] = "Tester" + + response = client.get("/logout") + + assert response.status_code == 302 + assert response.location.endswith("/") + + with client.session_transaction() as sess: + assert "user_email" not in sess + assert "user_name" not in sess diff --git a/tests/test_playwright_auth.py b/tests/test_playwright_auth.py new file mode 100644 index 0000000..73f30db --- /dev/null +++ b/tests/test_playwright_auth.py @@ -0,0 +1,103 @@ +import os +from pathlib import Path +from threading import Thread + +os.environ.setdefault("SECRET_KEY", "test-secret") + +import pytest +from flask import Flask, session +from playwright.sync_api import sync_playwright +from werkzeug.serving import make_server + +import routes.auth as auth_routes + + +@pytest.fixture +def auth_server(monkeypatch): + """Run a minimal auth app with deterministic mocked integrations.""" + template_dir = Path(__file__).resolve().parents[1] / "templates" + app = Flask(__name__, template_folder=str(template_dir)) + app.secret_key = "test-secret" + app.jinja_env.globals["csrf_token"] = lambda: "test-csrf" + + monkeypatch.setattr( + auth_routes, + "validate_app_redirect", + lambda _redirect: {"id": "app_1", "is_active": True, "allow_anyone": True}, + ) + monkeypatch.setattr( + auth_routes, + "get_user_by_email", + lambda _email: {"email": "playwright@example.com", "legal_name": "Playwright User"}, + ) + monkeypatch.setattr( + auth_routes, + "create_oauth_token", + lambda _email, expires_in_seconds: "pw_oauth_token", + ) + monkeypatch.setattr(auth_routes, "send_email_verification", lambda _email: True) + + app.register_blueprint(auth_routes.auth_bp) + + @app.get("/_test/login") + def _test_login(): + session["user_email"] = "playwright@example.com" + return "ok", 200 + + server = make_server("127.0.0.1", 0, app) + port = server.server_port + thread = Thread(target=server.serve_forever, daemon=True) + thread.start() + + try: + yield f"http://127.0.0.1:{port}" + finally: + server.shutdown() + thread.join(timeout=2) + + +@pytest.fixture +def pw_request(): + """Playwright API request context (no browser binary required).""" + with sync_playwright() as playwright: + request_context = playwright.request.new_context(ignore_https_errors=True) + try: + yield request_context + finally: + request_context.dispose() + + +def test_playwright_login_page_renders(auth_server, pw_request): + response = pw_request.get(f"{auth_server}/") + + assert response.status == 200 + assert "Sign in" in response.text() + + +def test_playwright_send_code_json_flow(auth_server, pw_request): + response = pw_request.post( + f"{auth_server}/send-code", + data={"email": "playwright@example.com"}, + ) + + assert response.status == 200 + payload = response.json() + assert payload["success"] is True + assert "Magic link sent" in payload["message"] + + +def test_playwright_oauth_login_and_redirect_flow(auth_server, pw_request): + oauth_url = ( + f"{auth_server}/oauth?redirect=https://example.com/callback" + ) + + unauthenticated = pw_request.get(oauth_url) + assert unauthenticated.status == 200 + assert "Sign in" in unauthenticated.text() + + login_response = pw_request.get(f"{auth_server}/_test/login") + assert login_response.status == 200 + + authenticated = pw_request.get(oauth_url, max_redirects=0) + assert authenticated.status == 302 + assert authenticated.headers["location"] == "https://example.com/callback?token=pw_oauth_token"