Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app/db/crud/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
NodeStatus,
NodeUsage,
NodeUsageResetLogs,
NodeUserLimit,
NodeUserUsage,
)
from app.db.compiles_types import DateDiff
Expand Down Expand Up @@ -274,6 +275,7 @@ async def remove_node(db: AsyncSession, db_node: Node) -> None:

# Remove dependent rows explicitly to avoid ORM cascading overhead on large tables.
await db.execute(delete(NodeUserUsage).where(NodeUserUsage.node_id == node_id))
await db.execute(delete(NodeUserLimit).where(NodeUserLimit.node_id == node_id))
await db.execute(delete(NodeUsage).where(NodeUsage.node_id == node_id))
await db.execute(delete(NodeUsageResetLogs).where(NodeUsageResetLogs.node_id == node_id))
await db.execute(delete(NodeStat).where(NodeStat.node_id == node_id))
Expand Down
236 changes: 236 additions & 0 deletions app/db/crud/node_user_limit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
from typing import Optional

from sqlalchemy import and_, delete, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.models import NodeUserLimit, NodeUserUsage
from app.models.node_user_limit import NodeUserLimitCreate


async def get_node_user_limit(
db: AsyncSession, user_id: int, node_id: int
) -> Optional[NodeUserLimit]:
"""
Retrieves a specific node user limit by user_id and node_id.

Args:
db (AsyncSession): The database session.
user_id (int): The user ID.
node_id (int): The node ID.

Returns:
Optional[NodeUserLimit]: The NodeUserLimit object if found, None otherwise.
"""
stmt = select(NodeUserLimit).where(
and_(NodeUserLimit.user_id == user_id, NodeUserLimit.node_id == node_id)
)
return (await db.execute(stmt)).scalar_one_or_none()


async def get_node_user_limit_by_id(db: AsyncSession, limit_id: int) -> Optional[NodeUserLimit]:
"""
Retrieves a node user limit by its ID.

Args:
db (AsyncSession): The database session.
limit_id (int): The limit ID.

Returns:
Optional[NodeUserLimit]: The NodeUserLimit object if found, None otherwise.
"""
stmt = select(NodeUserLimit).where(NodeUserLimit.id == limit_id)
return (await db.execute(stmt)).scalar_one_or_none()


async def get_user_limits_for_node(db: AsyncSession, node_id: int) -> list[NodeUserLimit]:
"""
Retrieves all user limits for a specific node.

Args:
db (AsyncSession): The database session.
node_id (int): The node ID.

Returns:
list[NodeUserLimit]: List of NodeUserLimit objects for the node.
"""
stmt = select(NodeUserLimit).where(NodeUserLimit.node_id == node_id)
return list((await db.execute(stmt)).scalars().all())


async def get_node_limits_for_user(db: AsyncSession, user_id: int) -> list[NodeUserLimit]:
"""
Retrieves all node limits for a specific user.

Args:
db (AsyncSession): The database session.
user_id (int): The user ID.

Returns:
list[NodeUserLimit]: List of NodeUserLimit objects for the user.
"""
stmt = select(NodeUserLimit).where(NodeUserLimit.user_id == user_id)
return list((await db.execute(stmt)).scalars().all())


async def create_node_user_limit(
db: AsyncSession, limit: NodeUserLimitCreate
) -> NodeUserLimit:
"""
Creates a new node user limit.

Args:
db (AsyncSession): The database session.
limit (NodeUserLimitCreate): The limit creation model.

Returns:
NodeUserLimit: The newly created NodeUserLimit object.
"""
db_limit = NodeUserLimit(**limit.model_dump())
db.add(db_limit)
await db.commit()
await db.refresh(db_limit)
return db_limit


async def upsert_node_user_limit(
db: AsyncSession, limit: NodeUserLimitCreate
) -> NodeUserLimit:
"""
Creates or updates a node user limit (upsert).
If limit exists for user_id/node_id, updates it; otherwise creates new.

Args:
db (AsyncSession): The database session.
limit (NodeUserLimitCreate): The limit data.

Returns:
NodeUserLimit: The created or updated NodeUserLimit object.
"""
existing = await get_node_user_limit(db, limit.user_id, limit.node_id)

if existing:
existing.data_limit = limit.data_limit
existing.data_limit_reset_strategy = limit.data_limit_reset_strategy
existing.reset_time = limit.reset_time
await db.commit()
await db.refresh(existing)
return existing
else:
return await create_node_user_limit(db, limit)


async def modify_node_user_limit(
db: AsyncSession, db_limit: NodeUserLimit, data_limit: int,
data_limit_reset_strategy: str | None = None, reset_time: int | None = None
) -> NodeUserLimit:
"""
Modifies an existing node user limit.

Args:
db (AsyncSession): The database session.
db_limit (NodeUserLimit): The NodeUserLimit object to modify.
data_limit (int): The new data limit value.
data_limit_reset_strategy (str | None): The reset strategy.
reset_time (int | None): The reset time.

Returns:
NodeUserLimit: The modified NodeUserLimit object.
"""
db_limit.data_limit = data_limit
if data_limit_reset_strategy is not None:
db_limit.data_limit_reset_strategy = data_limit_reset_strategy
if reset_time is not None:
db_limit.reset_time = reset_time
await db.commit()
await db.refresh(db_limit)
return db_limit


async def remove_node_user_limit(db: AsyncSession, db_limit: NodeUserLimit) -> None:
"""
Removes a node user limit.

Args:
db (AsyncSession): The database session.
db_limit (NodeUserLimit): The NodeUserLimit object to remove.
"""
await db.execute(delete(NodeUserLimit).where(NodeUserLimit.id == db_limit.id))
await db.commit()


async def bulk_set_user_limits_for_node(
db: AsyncSession, node_id: int, user_limits: dict[int, int],
data_limit_reset_strategy: str = "no_reset", reset_time: int = -1
) -> list[NodeUserLimit]:
"""
Bulk sets or updates user limits for a specific node.

Args:
db (AsyncSession): The database session.
node_id (int): The node ID.
user_limits (dict[int, int]): Dictionary mapping user_id to data_limit.
data_limit_reset_strategy (str): Reset strategy to apply to all limits.
reset_time (int): Reset time to apply to all limits.

Returns:
list[NodeUserLimit]: List of created/updated NodeUserLimit objects.
"""
result_limits = []

for user_id, data_limit in user_limits.items():
# Check if limit already exists
existing_limit = await get_node_user_limit(db, user_id, node_id)

if existing_limit:
# Update existing limit
existing_limit.data_limit = data_limit
existing_limit.data_limit_reset_strategy = data_limit_reset_strategy
existing_limit.reset_time = reset_time
result_limits.append(existing_limit)
else:
# Create new limit
new_limit = NodeUserLimit(
user_id=user_id,
node_id=node_id,
data_limit=data_limit,
data_limit_reset_strategy=data_limit_reset_strategy,
reset_time=reset_time
)
db.add(new_limit)
result_limits.append(new_limit)

await db.commit()

# Refresh all limits
for limit in result_limits:
await db.refresh(limit)

return result_limits
async def get_nodes_with_over_limit_users(db: AsyncSession) -> list[int]:
"""
Finds IDs of nodes that have at least one user who exceeded their per-node limit.

Args:
db (AsyncSession): The database session.

Returns:
list[int]: List of node IDs.
"""
from sqlalchemy import func

stmt = (
select(NodeUserLimit.node_id)
.join(
NodeUserUsage,
and_(
NodeUserLimit.user_id == NodeUserUsage.user_id,
NodeUserLimit.node_id == NodeUserUsage.node_id,
),
)
.where(NodeUserLimit.data_limit > 0)
.group_by(NodeUserLimit.node_id, NodeUserLimit.user_id, NodeUserLimit.data_limit)
.having(func.sum(NodeUserUsage.used_traffic) >= NodeUserLimit.data_limit)
.distinct()
)
result = await db.execute(stmt)
return list(result.scalars().all())
78 changes: 78 additions & 0 deletions app/db/crud/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
DataLimitResetStrategy,
Group,
NextPlan,
NodeUserLimit,
NodeUserUsage,
NotificationReminder,
ReminderType,
Expand Down Expand Up @@ -406,6 +407,10 @@ async def get_user_usages(
node_id_val = row_dict.pop("node_id", node_id)
if node_id_val not in stats:
stats[node_id_val] = []
# Convert period_start from string to datetime if needed
if "period_start" in row_dict and isinstance(row_dict["period_start"], str):
row_dict["period_start"] = datetime.fromisoformat(row_dict["period_start"])

stats[node_id_val].append(UserUsageStat(**row_dict))

return UserUsageStatsList(period=period, start=start, end=end, stats=stats)
Expand Down Expand Up @@ -502,6 +507,29 @@ async def create_user(db: AsyncSession, new_user: UserCreate, groups: list[Group
await db.commit()
await db.refresh(db_user)

# Apply per-node default limits from nodes that have user_data_limit configured
# This provides the lowest-priority default; template limits will override this later
from app.db.models import Node, NodeUserLimit

# Get all nodes with user_data_limit configured
nodes_stmt = select(Node).where(Node.user_data_limit > 0)
nodes_result = await db.execute(nodes_stmt)
nodes = nodes_result.scalars().all()

for node in nodes:
# Create default limit for new user based on Node.user_data_limit
new_limit = NodeUserLimit(
user_id=db_user.id,
node_id=node.id,
data_limit=node.user_data_limit,
data_limit_reset_strategy=node.user_data_limit_reset_strategy,
reset_time=node.user_reset_time
)
db.add(new_limit)

await db.commit()
await db.refresh(db_user)

await load_user_attrs(db_user)
return db_user

Expand Down Expand Up @@ -553,13 +581,15 @@ async def _delete_user_dependencies(db: AsyncSession, user_ids: list[int]):
return

await db.execute(delete(NodeUserUsage).where(NodeUserUsage.user_id.in_(user_ids)))
await db.execute(delete(NodeUserLimit).where(NodeUserLimit.user_id.in_(user_ids)))
await db.execute(delete(NotificationReminder).where(NotificationReminder.user_id.in_(user_ids)))
await db.execute(delete(UserSubscriptionUpdate).where(UserSubscriptionUpdate.user_id.in_(user_ids)))
await db.execute(delete(UserUsageResetLogs).where(UserUsageResetLogs.user_id.in_(user_ids)))
await db.execute(delete(NextPlan).where(NextPlan.user_id.in_(user_ids)))
await db.execute(users_groups_association.delete().where(users_groups_association.c.user_id.in_(user_ids)))



async def remove_user(db: AsyncSession, db_user: User) -> User:
"""
Removes a user from the database.
Expand Down Expand Up @@ -753,6 +783,49 @@ async def bulk_reset_user_data_usage(db: AsyncSession, users: list[User]) -> lis
return users


async def reset_user_node_usage(db: AsyncSession, db_user: User, node_ids: list[int]) -> User:
"""
Resets the data usage of a user for specific nodes.

Args:
db (AsyncSession): Database session.
db_user (User): The user object.
node_ids (list[int]): List of node IDs to reset usage for.

Returns:
User: The updated user object.
"""
if not node_ids:
return db_user

# Delete usage records for these nodes
await db.execute(
delete(NodeUserUsage)
.where(NodeUserUsage.user_id == db_user.id)
.where(NodeUserUsage.node_id.in_(node_ids))
)

# Recalculate total used traffic from remaining records
# If no records left, sum will be None, so we coalesce to 0
result = await db.execute(
select(func.coalesce(func.sum(NodeUserUsage.used_traffic), 0))
.where(NodeUserUsage.user_id == db_user.id)
)
total_usage = result.scalar()

db_user.used_traffic = total_usage

# If user was limited due to usage, check if we can reactivate
if db_user.status not in [UserStatus.expired, UserStatus.disabled, UserStatus.on_hold]:
if not db_user.data_limit or db_user.used_traffic < db_user.data_limit:
db_user.status = UserStatus.active

await db.commit()
await db.refresh(db_user)
await load_user_attrs(db_user)
return db_user


async def reset_user_by_next(db: AsyncSession, db_user: User) -> User:
"""
Resets the data usage of a user based on next user.
Expand Down Expand Up @@ -1013,6 +1086,11 @@ async def get_all_users_usages(
for row in result.mappings():
row_dict = dict(row)
node_id_val = row_dict.pop("node_id", node_id)

# Convert period_start from string to datetime if needed
if "period_start" in row_dict and isinstance(row_dict["period_start"], str):
row_dict["period_start"] = datetime.fromisoformat(row_dict["period_start"])

if node_id_val not in stats:
stats[node_id_val] = []
stats[node_id_val].append(UserUsageStat(**row_dict))
Expand Down
Loading
Loading