Implement Google OAuth2 with Swagger UI integration#43
Conversation
Signed-off-by: Shounak Dey <shounakdey@ymail.com>
Signed-off-by: Shounak Dey <shounakdey@ymail.com>
Add OAuth2 authentication alongside existing basic auth using Google OIDC provider. Key changes: - Add OAuth utilities with OIDC user model and token validation - Split basic auth into separate module - Update auth.py to support both basic and OAuth authentication - Add required dependencies Signed-off-by: Shounak Dey <shounakdey@ymail.com>
Signed-off-by: Shounak Dey <shounakdey@ymail.com>
|
@sourcery-ai review |
Reviewer's GuideThis PR integrates Google OAuth2 (OIDC) alongside the existing HTTP Basic authentication by refactoring and splitting the auth utilities, extending the database schema and models for OAuth support, configuring Swagger UI and session middleware for OAuth flows, and updating tests and dependencies to cover the new authentication scenarios. Sequence diagram for dual authentication (Basic and OAuth) in credential verificationsequenceDiagram
participant API
participant BasicAuth
participant OAuth
participant DB
API->>BasicAuth: Try basic auth (verify_basic_customer_cred)
alt Basic auth success
BasicAuth->>DB: Query customer by email
DB-->>BasicAuth: Customer record
BasicAuth-->>API: Return customer
else Basic auth fails
API->>OAuth: Try OAuth (verify_oauth_customer_cred)
OAuth->>DB: Query customer by email
alt Customer exists
DB-->>OAuth: Customer record
OAuth->>DB: Update with OAuth info
DB-->>OAuth: Updated record
OAuth-->>API: Return customer
else Customer does not exist
OAuth->>DB: Create new customer with OAuth info
DB-->>OAuth: New customer record
OAuth-->>API: Return customer
end
end
API-->>Client: Authenticated customer or error
ER diagram for updated Business and Customer tables with OAuth fieldserDiagram
BUSINESS {
INTEGER id PK
STRING email_address
TEXT password
STRING business_name
TEXT address_line_1
TEXT address_line_2
TEXT city
TEXT state
BOOLEAN is_verified
STRING oauth_provider
STRING oauth_id
STRING oauth_email
BOOLEAN created_via_oauth
}
CUSTOMER {
INTEGER id PK
STRING email_address
TEXT password
STRING full_name
TEXT address_line_1
TEXT address_line_2
TEXT city
TEXT state
BOOLEAN is_verified
STRING oauth_provider
STRING oauth_id
STRING oauth_email
BOOLEAN created_via_oauth
}
BUSINESS ||--o{ PRODUCT : offers
CUSTOMER ||--o{ ORDER : places
Class diagram for new OIDCUser and updated Business/Customer modelsclassDiagram
class OIDCUser {
+str email
+str|None name
+str sub
+from_userinfo(userinfo: UserInfo) OIDCUser
}
class Business {
+int id
+str email
+str|None password
+str name
+str|None addr_line_1
+str|None addr_line_2
+str|None city
+str|None state
+bool is_verified
+str|None oauth_provider
+str|None oauth_id
+str|None oauth_email
+bool created_via_oauth
+products
}
class Customer {
+int id
+str email
+str|None password
+str name
+str|None addr_line_1
+str|None addr_line_2
+str|None city
+str|None state
+bool is_verified
+str|None oauth_provider
+str|None oauth_id
+str|None oauth_email
+bool created_via_oauth
+orders
}
OIDCUser <.. Business : uses
OIDCUser <.. Customer : uses
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey @sdglitched - I've reviewed your changes - here's some feedback:
- verify_oauth_customer_cred and verify_oauth_business_cred share nearly identical logic; consider extracting the common flow into a helper to reduce duplication and simplify maintenance.
- Avoid exposing your Google client secret in swagger_ui_init_oauth and using it as the session middleware secret—use a distinct server-side secret for sessions and only expose the client ID in the Swagger config.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- verify_oauth_customer_cred and verify_oauth_business_cred share nearly identical logic; consider extracting the common flow into a helper to reduce duplication and simplify maintenance.
- Avoid exposing your Google client secret in swagger_ui_init_oauth and using it as the session middleware secret—use a distinct server-side secret for sessions and only expose the client ID in the Swagger config.
## Individual Comments
### Comment 1
<location> `fastapi_ecom/utils/basic_auth.py:37` </location>
<code_context>
+
+ if not customer_by_email:
+ return None
+ elif not bcrypt.checkpw(credentials.password.encode('utf-8'), customer_by_email.password.encode('utf-8')):
+ return None
+ else:
</code_context>
<issue_to_address>
No check for None password before encoding in basic auth verification.
Add a check to ensure customer_by_email.password is not None before encoding, to prevent exceptions for users created via OAuth.
</issue_to_address>
### Comment 2
<location> `fastapi_ecom/utils/basic_auth.py:63` </location>
<code_context>
+
+ if not business_by_email:
+ return None
+ elif not bcrypt.checkpw(credentials.password.encode('utf-8'), business_by_email.password.encode('utf-8')):
+ return None
+ else:
</code_context>
<issue_to_address>
No check for None password before encoding in business basic auth verification.
If business_by_email.password is None, encoding will fail. Please add a check to ensure it is not None before encoding.
</issue_to_address>
### Comment 3
<location> `fastapi_ecom/app.py:28` </location>
<code_context>
+ }
)
+app.add_middleware(SessionMiddleware, secret_key=config.GOOGLE_CLIENT_SECRET)
+
PREFIX = "/api/v1"
</code_context>
<issue_to_address>
Using the OAuth client secret as the session middleware secret key may be risky.
Use a unique, randomly generated secret for the session middleware to avoid increasing security risks from secret reuse.
</issue_to_address>
### Comment 4
<location> `tests/business/test_business_get.py:146` </location>
<code_context>
+async def test_get_business_me_oauth(
</code_context>
<issue_to_address>
Consider adding a test case for an OIDC user with missing or malformed fields.
Add a test where OIDC user info is missing required fields or contains malformed data to verify proper handling by the authentication logic.
Suggested implementation:
```python
pytest.param(
{
"email": "delete@example.com",
"name": "delete user",
"sub": "delete_sub"
},
id="BUSINESS GET Endpoint - Fetch email of the `delete` authenticated business with oidc"
),
pytest.param(
{
# Missing 'email' and 'sub', malformed 'name'
"name": 12345
},
id="BUSINESS GET Endpoint - OIDC user with missing/malformed fields"
)
]
)
```
```python
async def test_get_business_me_oauth(
client: AsyncClient,
db_test_create: None,
db_test_data: None,
mock_oidc_user: dict,
mocker: MockerFixture,
) -> None:
"""
Test the `get` endpoint for the currently authenticated business of the Business API.
:param client: The test client to send HTTP requests.
"""
# If the OIDC user info is missing required fields or contains malformed data,
# verify that the authentication logic handles it properly.
if "email" not in mock_oidc_user or "sub" not in mock_oidc_user or not isinstance(mock_oidc_user.get("name", ""), str):
response = await client.get("/business/me")
assert response.status_code in (400, 401, 422)
# Optionally, check for error message in response
assert "error" in response.json() or "detail" in response.json()
return
# Existing test logic for valid OIDC user info goes here...
```
</issue_to_address>
### Comment 5
<location> `tests/business/test_business_get.py:195` </location>
<code_context>
+async def test_get_business_me_oauth_token_issue(
</code_context>
<issue_to_address>
Add a test for expired or revoked OAuth tokens.
Please add a test that mocks the OAuth provider to simulate an expired or revoked token and verifies the correct error response.
Suggested implementation:
```python
@pytest.mark.parametrize(
"token",
[
pytest.param("Bearer", id="BUSINESS GET Endpoint - Invalid Bearer token length"),
pytest.param("Dummy token", id="BUSINESS GET Endpoint - Dummy Bearer token")
]
)
async def test_get_business_me_oauth_token_issue(
client: AsyncClient,
db_test_create: None,
token: str,
mocker: MockerFixture,
) -> None:
"""
Test the `get` endpoint for the currently authenticated business of the Business API.
:param client: The test client to send HTTP requests.
:param db_test_create: Fixture which creates a test database.
@pytest.mark.asyncio
async def test_get_business_me_oauth_token_expired_or_revoked(
client: AsyncClient,
db_test_create: None,
mocker: MockerFixture,
) -> None:
"""
Test the `get` endpoint for the currently authenticated business with an expired or revoked OAuth token.
This test mocks the OAuth provider to simulate an expired or revoked token and verifies the correct error response.
"""
# Simulate an expired/revoked token
expired_token = "Bearer expired_or_revoked_token"
# Mock the OAuth provider's introspection/validation endpoint to return invalid token
mocker.patch(
"path.to.oauth_validation_function", # Replace with actual import path
return_value=False # Simulate token is invalid
)
response = await client.get(
"/api/business/me",
headers={"Authorization": expired_token}
)
assert response.status_code == 401
assert "invalid" in response.text.lower() or "expired" in response.text.lower() or "revoked" in response.text.lower()
```
- Replace `"path.to.oauth_validation_function"` with the actual import path or function used in your codebase to validate OAuth tokens.
- If your codebase uses a different error message or status code for expired/revoked tokens, adjust the assertions accordingly.
- Ensure that the test client and endpoint (`/api/business/me`) match your actual API.
</issue_to_address>
### Comment 6
<location> `tests/customer/test_customer_get.py:146` </location>
<code_context>
+async def test_get_customer_me_oauth(
</code_context>
<issue_to_address>
Consider adding a test for OIDC users with duplicate emails in both basic and OAuth records.
Please add a test to ensure that when a user exists in both basic and OAuth records with the same email, the correct record is updated and returned, and duplicates are not created.
Suggested implementation:
```python
async def test_get_customer_me_oauth(
client: AsyncClient,
db_test_create: None,
db_test_data: None,
mock_oidc_user,
mocker: MockerFixture,
) -> None:
"""
Test the `get` endpoint for the currently authenticated customer of the Business API.
:param client: The test client to send HTTP requests.
"""
@pytest.mark.asyncio
async def test_get_customer_me_oidc_duplicate_email(
client: AsyncClient,
db_test_create: None,
db_test_data: None,
mock_oidc_user,
mocker: MockerFixture,
) -> None:
"""
Test that when a user exists in both basic and OAuth records with the same email,
the correct record is updated and returned, and duplicates are not created.
"""
# Setup: create a basic user and an OAuth user with the same email
email = "duplicate@example.com"
basic_user_data = {
"email": email,
"name": "Basic User",
"sub": None
}
oauth_user_data = {
"email": email,
"name": "OAuth User",
"sub": "oauth_sub_123"
}
# Insert basic user
await db_test_create("customer", basic_user_data)
# Insert OAuth user
await db_test_create("customer", oauth_user_data)
# Mock OIDC user to authenticate as the OAuth user
mock_oidc_user.return_value = oauth_user_data
# Make request to /customer/me endpoint
response = await client.get("/customer/me")
assert response.status_code == 200
data = response.json()
# Ensure the returned record is the OAuth user
assert data["email"] == email
assert data["sub"] == "oauth_sub_123"
assert data["name"] == "OAuth User"
# Ensure no duplicate customer records exist for the same email
# (Assuming db_test_data returns all customers)
all_customers = await db_test_data("customer")
emails = [c["email"] for c in all_customers]
assert emails.count(email) == 1
```
- If your test database setup or helper functions (`db_test_create`, `db_test_data`) differ, you may need to adjust the test setup and assertions accordingly.
- Ensure that the `/customer/me` endpoint logic merges or updates records as expected for OIDC users with duplicate emails.
</issue_to_address>
### Comment 7
<location> `tests/customer/test_customer_get.py:195` </location>
<code_context>
+async def test_get_customer_me_oauth_token_issue(
</code_context>
<issue_to_address>
Add a test for OAuth tokens with incorrect type (e.g., not 'Bearer').
Please include a test case for tokens with an incorrect type to verify they are properly rejected.
Suggested implementation:
```python
@pytest.mark.parametrize(
"token",
[
pytest.param("Bearer", id="CUSTOMER GET Endpoint - Invalid Bearer token length"),
pytest.param("Dummy token", id="CUSTOMER GET Endpoint - Dummy Bearer token"),
pytest.param("Basic abcdef12345", id="CUSTOMER GET Endpoint - Incorrect token type"),
]
)
```
Make sure that the test implementation for `test_get_customer_me_oauth_token_issue` asserts the correct rejection behavior (e.g., status code 401 or 403) for tokens with incorrect types. If the test currently only checks for "Bearer" tokens, you may need to update the assertion logic to handle this new case.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
|
|
||
| if not customer_by_email: | ||
| return None | ||
| elif not bcrypt.checkpw(credentials.password.encode('utf-8'), customer_by_email.password.encode('utf-8')): |
There was a problem hiding this comment.
issue (bug_risk): No check for None password before encoding in basic auth verification.
Add a check to ensure customer_by_email.password is not None before encoding, to prevent exceptions for users created via OAuth.
|
|
||
| if not business_by_email: | ||
| return None | ||
| elif not bcrypt.checkpw(credentials.password.encode('utf-8'), business_by_email.password.encode('utf-8')): |
There was a problem hiding this comment.
issue (bug_risk): No check for None password before encoding in business basic auth verification.
If business_by_email.password is None, encoding will fail. Please add a check to ensure it is not None before encoding.
| } | ||
| ) | ||
|
|
||
| app.add_middleware(SessionMiddleware, secret_key=config.GOOGLE_CLIENT_SECRET) |
There was a problem hiding this comment.
🚨 issue (security): Using the OAuth client secret as the session middleware secret key may be risky.
Use a unique, randomly generated secret for the session middleware to avoid increasing security risks from secret reuse.
| async def test_get_business_me_oauth( | ||
| client: AsyncClient, | ||
| db_test_create: None, | ||
| db_test_data: None, | ||
| mock_oidc_user: dict, | ||
| mocker: MockerFixture, | ||
| ) -> None: | ||
| """ | ||
| Test the `get` endpoint for the currently authenticated business of the Business API. | ||
|
|
There was a problem hiding this comment.
suggestion (testing): Consider adding a test case for an OIDC user with missing or malformed fields.
Add a test where OIDC user info is missing required fields or contains malformed data to verify proper handling by the authentication logic.
Suggested implementation:
pytest.param(
{
"email": "delete@example.com",
"name": "delete user",
"sub": "delete_sub"
},
id="BUSINESS GET Endpoint - Fetch email of the `delete` authenticated business with oidc"
),
pytest.param(
{
# Missing 'email' and 'sub', malformed 'name'
"name": 12345
},
id="BUSINESS GET Endpoint - OIDC user with missing/malformed fields"
)
]
)async def test_get_business_me_oauth(
client: AsyncClient,
db_test_create: None,
db_test_data: None,
mock_oidc_user: dict,
mocker: MockerFixture,
) -> None:
"""
Test the `get` endpoint for the currently authenticated business of the Business API.
:param client: The test client to send HTTP requests.
"""
# If the OIDC user info is missing required fields or contains malformed data,
# verify that the authentication logic handles it properly.
if "email" not in mock_oidc_user or "sub" not in mock_oidc_user or not isinstance(mock_oidc_user.get("name", ""), str):
response = await client.get("/business/me")
assert response.status_code in (400, 401, 422)
# Optionally, check for error message in response
assert "error" in response.json() or "detail" in response.json()
return
# Existing test logic for valid OIDC user info goes here...| """ | ||
| assert response.status_code == 401 | ||
| assert response.json()["detail"] == "Invalid authentication credentials" | ||
| assert response.json()["detail"] == "Not Authenticated" |
There was a problem hiding this comment.
suggestion (testing): Add a test for expired or revoked OAuth tokens.
Please add a test that mocks the OAuth provider to simulate an expired or revoked token and verifies the correct error response.
Suggested implementation:
@pytest.mark.parametrize(
"token",
[
pytest.param("Bearer", id="BUSINESS GET Endpoint - Invalid Bearer token length"),
pytest.param("Dummy token", id="BUSINESS GET Endpoint - Dummy Bearer token")
]
)
async def test_get_business_me_oauth_token_issue(
client: AsyncClient,
db_test_create: None,
token: str,
mocker: MockerFixture,
) -> None:
"""
Test the `get` endpoint for the currently authenticated business of the Business API.
:param client: The test client to send HTTP requests.
:param db_test_create: Fixture which creates a test database.
@pytest.mark.asyncio
async def test_get_business_me_oauth_token_expired_or_revoked(
client: AsyncClient,
db_test_create: None,
mocker: MockerFixture,
) -> None:
"""
Test the `get` endpoint for the currently authenticated business with an expired or revoked OAuth token.
This test mocks the OAuth provider to simulate an expired or revoked token and verifies the correct error response.
"""
# Simulate an expired/revoked token
expired_token = "Bearer expired_or_revoked_token"
# Mock the OAuth provider's introspection/validation endpoint to return invalid token
mocker.patch(
"path.to.oauth_validation_function", # Replace with actual import path
return_value=False # Simulate token is invalid
)
response = await client.get(
"/api/business/me",
headers={"Authorization": expired_token}
)
assert response.status_code == 401
assert "invalid" in response.text.lower() or "expired" in response.text.lower() or "revoked" in response.text.lower()- Replace
"path.to.oauth_validation_function"with the actual import path or function used in your codebase to validate OAuth tokens. - If your codebase uses a different error message or status code for expired/revoked tokens, adjust the assertions accordingly.
- Ensure that the test client and endpoint (
/api/business/me) match your actual API.
| """ | ||
| assert response.status_code == 401 | ||
| assert response.json()["detail"] == "Invalid authentication credentials" | ||
| assert response.json()["detail"] == "Not Authenticated" |
There was a problem hiding this comment.
suggestion (testing): Add a test for OAuth tokens with incorrect type (e.g., not 'Bearer').
Please include a test case for tokens with an incorrect type to verify they are properly rejected.
Suggested implementation:
@pytest.mark.parametrize(
"token",
[
pytest.param("Bearer", id="CUSTOMER GET Endpoint - Invalid Bearer token length"),
pytest.param("Dummy token", id="CUSTOMER GET Endpoint - Dummy Bearer token"),
pytest.param("Basic abcdef12345", id="CUSTOMER GET Endpoint - Incorrect token type"),
]
)Make sure that the test implementation for test_get_customer_me_oauth_token_issue asserts the correct rejection behavior (e.g., status code 401 or 403) for tokens with incorrect types. If the test currently only checks for "Bearer" tokens, you may need to update the assertion logic to handle this new case.
|
|
||
| def upgrade() -> None: | ||
| # ### commands auto generated by Alembic - please adjust! ### | ||
| op.add_column('businesses', sa.Column('oauth_provider', sa.String(length=50), nullable=True)) |
There was a problem hiding this comment.
issue (code-quality): Extract duplicate code into function (extract-duplicate-method)
|
|
||
| def downgrade() -> None: | ||
| # ### commands auto generated by Alembic - please adjust! ### | ||
| op.alter_column('customers', 'state', |
There was a problem hiding this comment.
issue (code-quality): Extract duplicate code into function (extract-duplicate-method)
| email = oidc.email, | ||
| name = oidc.name, | ||
| uuid = uuid4().hex[0:8], | ||
| is_verified = True, # OAuth emails are pre-verified | ||
| oauth_provider = "google", | ||
| oauth_id = oidc.sub, | ||
| oauth_email = oidc.email, | ||
| created_via_oauth = True |
There was a problem hiding this comment.
suggestion (code-quality): Replace a[0:x] with a[:x] and a[x:len(a)] with a[x:] (remove-redundant-slice-index)
| email = oidc.email, | |
| name = oidc.name, | |
| uuid = uuid4().hex[0:8], | |
| is_verified = True, # OAuth emails are pre-verified | |
| oauth_provider = "google", | |
| oauth_id = oidc.sub, | |
| oauth_email = oidc.email, | |
| created_via_oauth = True | |
| email=oidc.email, | |
| name=oidc.name, | |
| uuid=uuid4().hex[:8], | |
| is_verified=True, | |
| oauth_provider="google", | |
| oauth_id=oidc.sub, | |
| oauth_email=oidc.email, | |
| created_via_oauth=True, |
| email = oidc.email, | ||
| name = oidc.name, | ||
| uuid = uuid4().hex[0:8], | ||
| is_verified = True, # OAuth emails are pre-verified | ||
| oauth_provider = "google", | ||
| oauth_id = oidc.sub, | ||
| oauth_email = oidc.email, | ||
| created_via_oauth = True |
There was a problem hiding this comment.
suggestion (code-quality): Replace a[0:x] with a[:x] and a[x:len(a)] with a[x:] (remove-redundant-slice-index)
| email = oidc.email, | |
| name = oidc.name, | |
| uuid = uuid4().hex[0:8], | |
| is_verified = True, # OAuth emails are pre-verified | |
| oauth_provider = "google", | |
| oauth_id = oidc.sub, | |
| oauth_email = oidc.email, | |
| created_via_oauth = True | |
| email=oidc.email, | |
| name=oidc.name, | |
| uuid=uuid4().hex[:8], | |
| is_verified=True, | |
| oauth_provider="google", | |
| oauth_id=oidc.sub, | |
| oauth_email=oidc.email, | |
| created_via_oauth=True, |
This PR adds OAuth2 authentication alongside existing basic auth using Google OIDC provider.
Key changes:
Fixes: #42
Summary by Sourcery
Implement Google OAuth2 integration alongside basic authentication, including schema updates, utility modules, and tests for OAuth flows
New Features:
Enhancements:
Build:
Tests: