-
Notifications
You must be signed in to change notification settings - Fork 0
Implement Google OAuth2 with Swagger UI integration #43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
af95e0a
4dfc113
1bf8034
32ab1f7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| """add oauth related fields | ||
|
|
||
| Revision ID: 02d907ef8e5b | ||
| Revises: 60c8ccec25ac | ||
| Create Date: 2025-07-27 19:45:21.800069 | ||
|
|
||
| """ | ||
| from collections.abc import Sequence | ||
| from typing import Union | ||
|
|
||
| import sqlalchemy as sa | ||
| from alembic import op | ||
|
|
||
| # revision identifiers, used by Alembic. | ||
| revision: str = '02d907ef8e5b' | ||
| down_revision: Union[str, None] = '60c8ccec25ac' | ||
| branch_labels: Union[str, Sequence[str], None] = None | ||
| depends_on: Union[str, Sequence[str], None] = None | ||
|
|
||
|
|
||
| def upgrade() -> None: | ||
| # ### commands auto generated by Alembic - please adjust! ### | ||
| op.add_column('businesses', sa.Column('oauth_provider', sa.String(length=50), nullable=True)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (code-quality): Extract duplicate code into function ( |
||
| op.add_column('businesses', sa.Column('oauth_id', sa.String(length=100), nullable=True)) | ||
| op.add_column('businesses', sa.Column('oauth_email', sa.String(length=100), nullable=True)) | ||
| op.add_column('businesses', sa.Column('created_via_oauth', sa.Boolean(), nullable=True)) | ||
| op.alter_column('businesses', 'password', | ||
| existing_type=sa.TEXT(), | ||
| nullable=True) | ||
| op.alter_column('businesses', 'address_line_1', | ||
| existing_type=sa.TEXT(), | ||
| nullable=True) | ||
| op.alter_column('businesses', 'city', | ||
| existing_type=sa.TEXT(), | ||
| nullable=True) | ||
| op.alter_column('businesses', 'state', | ||
| existing_type=sa.TEXT(), | ||
| nullable=True) | ||
| op.add_column('customers', sa.Column('oauth_provider', sa.String(length=50), nullable=True)) | ||
| op.add_column('customers', sa.Column('oauth_id', sa.String(length=100), nullable=True)) | ||
| op.add_column('customers', sa.Column('oauth_email', sa.String(length=100), nullable=True)) | ||
| op.add_column('customers', sa.Column('created_via_oauth', sa.Boolean(), nullable=True)) | ||
| op.alter_column('customers', 'password', | ||
| existing_type=sa.TEXT(), | ||
| nullable=True) | ||
| op.alter_column('customers', 'address_line_1', | ||
| existing_type=sa.TEXT(), | ||
| nullable=True) | ||
| op.alter_column('customers', 'city', | ||
| existing_type=sa.TEXT(), | ||
| nullable=True) | ||
| op.alter_column('customers', 'state', | ||
| existing_type=sa.TEXT(), | ||
| nullable=True) | ||
| # ### end Alembic commands ### | ||
|
|
||
|
|
||
| def downgrade() -> None: | ||
| # ### commands auto generated by Alembic - please adjust! ### | ||
| op.alter_column('customers', 'state', | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (code-quality): Extract duplicate code into function ( |
||
| existing_type=sa.TEXT(), | ||
| nullable=False) | ||
| op.alter_column('customers', 'city', | ||
| existing_type=sa.TEXT(), | ||
| nullable=False) | ||
| op.alter_column('customers', 'address_line_1', | ||
| existing_type=sa.TEXT(), | ||
| nullable=False) | ||
| op.alter_column('customers', 'password', | ||
| existing_type=sa.TEXT(), | ||
| nullable=False) | ||
| op.drop_column('customers', 'created_via_oauth') | ||
| op.drop_column('customers', 'oauth_email') | ||
| op.drop_column('customers', 'oauth_id') | ||
| op.drop_column('customers', 'oauth_provider') | ||
| op.alter_column('businesses', 'state', | ||
| existing_type=sa.TEXT(), | ||
| nullable=False) | ||
| op.alter_column('businesses', 'city', | ||
| existing_type=sa.TEXT(), | ||
| nullable=False) | ||
| op.alter_column('businesses', 'address_line_1', | ||
| existing_type=sa.TEXT(), | ||
| nullable=False) | ||
| op.alter_column('businesses', 'password', | ||
| existing_type=sa.TEXT(), | ||
| nullable=False) | ||
| op.drop_column('businesses', 'created_via_oauth') | ||
| op.drop_column('businesses', 'oauth_email') | ||
| op.drop_column('businesses', 'oauth_id') | ||
| op.drop_column('businesses', 'oauth_provider') | ||
| # ### end Alembic commands ### | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,64 +1,67 @@ | ||
| import bcrypt | ||
| from typing import Optional | ||
|
|
||
| from fastapi import Depends, HTTPException, status | ||
| from fastapi.security import HTTPBasic, HTTPBasicCredentials | ||
| from sqlalchemy.ext.asyncio import AsyncSession | ||
| from sqlalchemy.future import select | ||
| from sqlalchemy.orm import selectinload | ||
|
|
||
| from fastapi_ecom.database.db_setup import get_db | ||
| from fastapi_ecom.database.models.business import Business | ||
| from fastapi_ecom.database.models.customer import Customer | ||
| from fastapi_ecom.utils.basic_auth import verify_basic_business_cred, verify_basic_customer_cred | ||
| from fastapi_ecom.utils.oauth import verify_oauth_business_cred, verify_oauth_customer_cred | ||
|
|
||
| # Initialize HTTP Basic Authentication. | ||
| # This will prompt users for a username and password when accessing secured endpoints. | ||
| security = HTTPBasic() | ||
|
|
||
| async def verify_cust_cred(credentials: HTTPBasicCredentials = Depends(security), db: AsyncSession = Depends(get_db)) -> Customer: | ||
| async def verify_cust_cred( | ||
| basic_customer: Optional[Customer] = Depends(verify_basic_customer_cred), | ||
| oauth_customer: Optional[Customer] = Depends(verify_oauth_customer_cred) | ||
| ) -> Customer: | ||
| """ | ||
| Verify customer credentials using HTTP Basic Authentication. | ||
| This function retrieves the customer record from the database using the provided username | ||
| (email address) and verifies the provided password against the stored hashed password. | ||
| Verify customer credentials using either HTTP Basic Authentication or OAuth. | ||
|
|
||
| This function attempts to authenticate a customer using multiple authentication | ||
| methods. It first tries HTTP Basic Authentication, then falls back to OAuth | ||
| authentication if basic auth is not provided or fails. | ||
|
|
||
| :param credentials: HTTPBasicCredentials containing the username and password. | ||
| :param db: Database session to query customer data. | ||
| :param basic_customer: Customer object from HTTP Basic Authentication or None. | ||
| :param oauth_customer: Customer object from OAuth authentication or None. | ||
|
|
||
| :return: The customer object if authentication is successful. | ||
| :return: The authenticated customer object. | ||
|
|
||
| :raises HTTPException: If the username or password is incorrect. | ||
| :raises HTTPException: If neither authentication method succeeds. | ||
| """ | ||
| query = select(Customer).where(Customer.email == credentials.username).options(selectinload("*")) | ||
| result = await db.execute(query) | ||
| customer_by_email = result.scalar_one_or_none() | ||
| if not customer_by_email or not bcrypt.checkpw(credentials.password.encode('utf-8'), customer_by_email.password.encode('utf-8')): | ||
| raise HTTPException( | ||
| status_code = status.HTTP_401_UNAUTHORIZED, | ||
| detail = "Invalid authentication credentials", | ||
| headers = {"WWW-Authenticate": "Basic"}, | ||
| ) | ||
| else: | ||
| return customer_by_email | ||
|
|
||
| async def verify_business_cred(credentials: HTTPBasicCredentials = Depends(security), db: AsyncSession = Depends(get_db)) -> Business: | ||
| if basic_customer: | ||
| return basic_customer | ||
|
|
||
| if oauth_customer: | ||
| return oauth_customer | ||
|
|
||
| raise HTTPException( | ||
| status_code = status.HTTP_401_UNAUTHORIZED, | ||
| detail = "Not Authenticated", | ||
| ) | ||
|
|
||
| async def verify_business_cred( | ||
| basic_business: Optional[Business] = Depends(verify_basic_business_cred), | ||
| oauth_business: Optional[Business] = Depends(verify_oauth_business_cred) | ||
| ) -> Business: | ||
| """ | ||
| Verify business credentials using HTTP Basic Authentication. | ||
| This function retrieves the business record from the database using the provided username | ||
| (email address) and verifies the provided password against the stored hashed password. | ||
| Verify business credentials using either HTTP Basic Authentication or OAuth. | ||
|
|
||
| This function attempts to authenticate a business using multiple authentication | ||
| methods. It first tries HTTP Basic Authentication, then falls back to OAuth | ||
| authentication if basic auth is not provided or fails. | ||
|
|
||
| :param credentials: HTTPBasicCredentials containing the username and password. | ||
| :param db: Database session to query business data. | ||
| :param basic_business: Business object from HTTP Basic Authentication or None. | ||
| :param oauth_business: Business object from OAuth authentication or None. | ||
|
|
||
| :return: The business object if authentication is successful. | ||
| :return: The authenticated business object. | ||
|
|
||
| :raises HTTPException: If the username or password is incorrect. | ||
| :raises HTTPException: If neither authentication method succeeds. | ||
| """ | ||
| query = select(Business).where(Business.email == credentials.username).options(selectinload("*")) | ||
| result = await db.execute(query) | ||
| business_by_email = result.scalar_one_or_none() | ||
| if not business_by_email or not bcrypt.checkpw(credentials.password.encode('utf-8'), business_by_email.password.encode('utf-8')): | ||
| raise HTTPException( | ||
| status_code = status.HTTP_401_UNAUTHORIZED, | ||
| detail = "Invalid authentication credentials", | ||
| headers = {"WWW-Authenticate": "Basic"}, | ||
| ) | ||
| else: | ||
| return business_by_email | ||
| if basic_business: | ||
| return basic_business | ||
|
|
||
| if oauth_business: | ||
| return oauth_business | ||
|
|
||
| raise HTTPException( | ||
| status_code = status.HTTP_401_UNAUTHORIZED, | ||
| detail = "Not Authenticated", | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| import bcrypt | ||
| from fastapi import Depends | ||
| from fastapi.security import HTTPBasic, HTTPBasicCredentials | ||
| from sqlalchemy.ext.asyncio import AsyncSession | ||
| from sqlalchemy.future import select | ||
| from sqlalchemy.orm import selectinload | ||
|
|
||
| from fastapi_ecom.database.db_setup import get_db | ||
| from fastapi_ecom.database.models.business import Business | ||
| from fastapi_ecom.database.models.customer import Customer | ||
|
|
||
| # Initialize HTTP Basic Authentication. | ||
| # This will prompt users for a username and password when accessing secured endpoints. | ||
| security = HTTPBasic(auto_error=False) | ||
|
|
||
| async def verify_basic_customer_cred(credentials: HTTPBasicCredentials | None = Depends(security), db: AsyncSession = Depends(get_db)) -> Customer: | ||
| """ | ||
| Verify customer credentials using HTTP Basic Authentication. | ||
|
|
||
| This function retrieves the customer record from the database using the provided username | ||
| (email address) and verifies the provided password against the stored hashed password. | ||
|
|
||
| :param credentials: HTTPBasicCredentials containing the username and password. | ||
| :param db: Database session to query customer data. | ||
|
|
||
| :return: The customer object if authentication is successful else returns None. | ||
| """ | ||
| if not credentials: #pragma: no cover | ||
| return None | ||
|
|
||
| query = select(Customer).where(Customer.email == credentials.username).options(selectinload("*")) | ||
| result = await db.execute(query) | ||
| customer_by_email = result.scalar_one_or_none() | ||
|
|
||
| if not customer_by_email: | ||
| return None | ||
| elif not bcrypt.checkpw(credentials.password.encode('utf-8'), customer_by_email.password.encode('utf-8')): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (bug_risk): No check for None password before encoding in basic auth verification. Add a check to ensure customer_by_email.password is not None before encoding, to prevent exceptions for users created via OAuth. |
||
| return None | ||
| else: | ||
| return customer_by_email | ||
|
|
||
| async def verify_basic_business_cred(credentials: HTTPBasicCredentials | None = Depends(security), db: AsyncSession = Depends(get_db)) -> Business: | ||
| """ | ||
| Verify business credentials using HTTP Basic Authentication. | ||
|
|
||
| This function retrieves the business record from the database using the provided username | ||
| (email address) and verifies the provided password against the stored hashed password. | ||
|
|
||
| :param credentials: HTTPBasicCredentials containing the username and password. | ||
| :param db: Database session to query business data. | ||
|
|
||
| :return: The business object if authentication is successful else returns None. | ||
| """ | ||
| if not credentials: #pragma: no cover | ||
| return None | ||
|
|
||
| query = select(Business).where(Business.email == credentials.username).options(selectinload("*")) | ||
| result = await db.execute(query) | ||
| business_by_email = result.scalar_one_or_none() | ||
|
|
||
| if not business_by_email: | ||
| return None | ||
| elif not bcrypt.checkpw(credentials.password.encode('utf-8'), business_by_email.password.encode('utf-8')): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (bug_risk): No check for None password before encoding in business basic auth verification. If business_by_email.password is None, encoding will fail. Please add a check to ensure it is not None before encoding. |
||
| return None | ||
| else: | ||
| return business_by_email | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚨 issue (security): Using the OAuth client secret as the session middleware secret key may be risky.
Use a unique, randomly generated secret for the session middleware to avoid increasing security risks from secret reuse.