Skip to content
184 changes: 75 additions & 109 deletions app/apis/FirebaseAuth.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import firebase_admin.auth
from firebase_admin.auth import *
from fastapi import Request
from fastapi.responses import JSONResponse
from constants import COD_ERROR_ESPERADO, COD_ERROR_INESPERADO, COD_EXITO
from utils.Validadores import validar_txt_token
from utils.Fechas import convertir_datetime_str
from utils.Diccionario import ver_si_existe_clave
Expand All @@ -22,60 +20,42 @@ def validar_token(
int: 1 si el token es válido, 0 en caso contrario y -1 si hay un error de validación.
"""
try:
datos = firebase_admin.auth.verify_id_token(
datos = verify_id_token(
token, firebase_app, check_revoked=True
)
return (1, datos) if obtener_datos else 1
return (COD_EXITO, datos) if obtener_datos else COD_EXITO
except (ExpiredIdTokenError, RevokedIdTokenError, UserDisabledError):
return (0, None) if obtener_datos else 0
return (COD_ERROR_ESPERADO, None) if obtener_datos else COD_ERROR_ESPERADO
except (ValueError, CertificateFetchError, InvalidIdTokenError):
return (-1, None) if obtener_datos else -1
return (COD_ERROR_INESPERADO, None) if obtener_datos else COD_ERROR_INESPERADO


async def verificar_token(
peticion: Request, firebase_app, call_next, authorization: str, textos: dict[str, str], idioma: str ="es"
) -> JSONResponse:
async def verificar_token(firebase_app, token: str) -> int:
"""
Verifica el token de Firebase en la solicitud.
Args:
peticion (Request): La solicitud que contiene el token.
authorization (str | None): El token de autorización de la solicitud.
token (str | None): El token de autorización de la solicitud.
firebase_app (object): La instancia de la aplicación Firebase.
call_next (function): La función para pasar al siguiente middleware o ruta.
idioma (str): El idioma para los mensajes de error.
Returns:
JSONResponse: La respuesta de la solicitud, o un error si el token es inválido.
int: Código de estado: 1 si el token es válido, 0 si es inválido, -1 si hay un error.
"""
try:
token = authorization.split("Bearer ")[1]
token = token.split("Bearer ")[1]
reg_validacion = validar_txt_token(token)
res_validacion = (
0 if (not reg_validacion) else validar_token(token, firebase_app, False)
)
match res_validacion:
case 1:
return await call_next(peticion)
case 0:
return JSONResponse(
{"error": textos[idioma]["errTokenInvalido"]},
status_code=403,
media_type="application/json",
)
case _:
return JSONResponse(
{"error": textos[idioma]["errValidarToken"]},
status_code=400,
media_type="application/json",
)
except Exception as e:
return JSONResponse(
{"error": f"{textos[idioma]['errTry']} {e}"},
status_code=500,
media_type="application/json",
COD_ERROR_ESPERADO
if (not reg_validacion)
else validar_token(token, firebase_app, False)
)

return res_validacion
except Exception:
return COD_ERROR_INESPERADO

def ver_datos_token(token: str, firebase_app, idioma: str, textos: dict[str, str]) -> tuple[int, dict]:

def ver_datos_token(
token: str, firebase_app, idioma: str, textos: dict[str, str]
) -> tuple[int, dict]:
"""
Obtiene los datos del token de Firebase.
Args:
Expand All @@ -91,36 +71,40 @@ def ver_datos_token(token: str, firebase_app, idioma: str, textos: dict[str, str
reg_validacion = validar_txt_token(token)

if not reg_validacion:
return (0, {"error": f"{textos[idioma]['errTokenInvalido']}"})
return (
COD_ERROR_ESPERADO,
{"error": f"{textos[idioma]['errTokenInvalido']}"},
)

res_validacion = validar_token(token, firebase_app, True)
CODIGO, RES = validar_token(token, firebase_app, True)
if CODIGO != COD_EXITO:
error = (
{"error": f"{textos[idioma]['errTokenInvalido']}"}
if CODIGO == COD_ERROR_ESPERADO
else {"error": f"{textos[idioma]['errValidarToken']}"}
)

match res_validacion[0]:
case 1:
return res_validacion
case 0:
return (0, {"error": f"{textos[idioma]['errTokenInvalido']}"})
case _:
return (-1, {"error": f"{textos[idioma]['errValidarToken']}"})
return (CODIGO, RES if CODIGO == COD_EXITO else error)

except Exception as e:
return (-1, {"error": f"{textos[idioma]['errProcesarToken']}: {e}."})
return (
COD_ERROR_INESPERADO,
{"error": f"{textos[idioma]['errProcesarToken']}: {str(e)}."},
)


async def ver_datos_usuarios(firebase_app, idioma: str, textos: dict[str, str]) -> JSONResponse:
async def ver_datos_usuarios(firebase_app) -> tuple[int, list[dict] | None]:
"""
Obtiene los datos de los usuarios registrados en Firebase.
Args:
firebase_app: La instancia de la aplicación Firebase.
idioma (str): El idioma para los mensajes de error.
textos (dict[str, str]): El diccionario de textos para los mensajes de error.
Returns:
JSONResponse: Los datos de los usuarios, o un error si ocurre un problema.
tuple[int, list[dict] | None]: Un código de estado y los datos de los usuarios si se obtuvieron correctamente, o None si hubo un error.
"""
try:
AUX = []
roles_task = asyncio.create_task(obtener_roles_usuarios())
usuarios = firebase_admin.auth.list_users(app=firebase_app)
usuarios = list_users(app=firebase_app)
ROLES = await roles_task

while True:
Expand Down Expand Up @@ -149,20 +133,12 @@ async def ver_datos_usuarios(firebase_app, idioma: str, textos: dict[str, str])
else:
usuarios = usuarios.get_next_page()

return JSONResponse(
{"usuarios": AUX},
status_code=200,
media_type="application/json",
)
except Exception as e:
return JSONResponse(
{"error": f"{textos[idioma]['errObtenerDatosUsuarios']}: {e}"},
status_code=400,
media_type="application/json",
)
return (COD_EXITO, AUX)
except Exception:
return (COD_ERROR_INESPERADO, None)


async def ver_datos_usuario(firebase_app, uid: str, idioma: str, textos: dict[str, str]) -> JSONResponse:
async def ver_datos_usuario(firebase_app, uid: str) -> tuple[int, dict | None]:
"""
Obtiene los datos de un usuario específico usando el UID.
Args:
Expand All @@ -171,15 +147,15 @@ async def ver_datos_usuario(firebase_app, uid: str, idioma: str, textos: dict[st
idioma (str): El idioma para los mensajes de error.
textos (dict[str, str]): El diccionario de textos para los mensajes de error.
Returns:
JSONResponse: Los datos del usuario, o un error si ocurre un problema.
tuple[int, dict | str | None]: Un código de estado y los datos del usuario si se encuentra.
"""
try:
roles_task = asyncio.create_task(obtener_rol_usuario(uid))
usuario = firebase_admin.auth.get_user(uid, firebase_app)
usuario = get_user(uid, firebase_app)
ROL = await roles_task

if ROL == -1:
raise UserNotFoundError(f"{textos[idioma]['errUsuarioNoEncontrado']}")
raise UserNotFoundError("")

RES = {
"correo": usuario.email,
Expand All @@ -195,23 +171,11 @@ async def ver_datos_usuario(firebase_app, uid: str, idioma: str, textos: dict[st
),
}

return JSONResponse(
RES,
status_code=200,
media_type="application/json",
)
return (COD_EXITO, RES)
except UserNotFoundError:
return JSONResponse(
{"error": f"{textos[idioma]['errUsuarioNoEncontrado']}"},
status_code=404,
media_type="application/json",
)
return (COD_ERROR_ESPERADO, None)
except Exception as e:
return JSONResponse(
{"error": f"{textos[idioma]['errObtenerDatosUsuarios']}: {e}"},
status_code=400,
media_type="application/json",
)
return (COD_ERROR_INESPERADO, str(e))


def ver_usuario_firebase(firebase_app, uid: str) -> tuple[int, UserRecord | None]:
Expand All @@ -224,44 +188,46 @@ def ver_usuario_firebase(firebase_app, uid: str) -> tuple[int, UserRecord | None
tuple[int, UserRecord | None]: Un código de estado y el registro del usuario si se encuentra.
"""
try:
return (1, firebase_admin.auth.get_user(uid, firebase_app))
RES = get_user(uid, firebase_app)
return (COD_EXITO, RES)
except UserNotFoundError:
return (0, None)
except:
return (-1, None)
return (COD_ERROR_ESPERADO, None)
except Exception:
return (COD_ERROR_INESPERADO, None)


def actualizar_estado_usuario(
firebase_app, uid: str, estado: bool, idioma: str, textos: dict[str, str]
) -> JSONResponse:
firebase_app, uid: str, estado: bool
) -> tuple[int, dict | None]:
"""
Actualiza el estado (activado/desactivado) de un usuario específico.
Args:
firebase_app: La instancia de la aplicación Firebase.
uid (str): El UID del usuario a actualizar.
estado (bool): El nuevo estado del usuario (True para desactivado, False para activado).
idioma (str): El idioma para los mensajes de error.
textos (dict[str, str]): El diccionario de textos para los mensajes de error.
Returns:
JSONResponse | Response: Mensaje de éxito o error.
tuple[int, dict | None]: Un código de estado y los datos del usuario actualizado si se actualiza correctamente.
"""
try:
firebase_admin.auth.update_user(uid=uid, disabled=estado, app=firebase_app)

return JSONResponse(
{"mensaje": f"{textos[idioma]['msgUsuarioActualizado']}"},
status_code=200,
media_type="application/json",
USUARIO = update_user(
uid=uid, disabled=estado, app=firebase_app
)

RES = {
"correo": USUARIO.email,
"uid": USUARIO.uid,
"nombre": USUARIO.display_name,
"estado": not USUARIO.disabled,
"fecha_registro": convertir_datetime_str(
USUARIO.user_metadata.creation_timestamp
),
"ultima_conexion": convertir_datetime_str(
USUARIO.user_metadata.last_refresh_timestamp
),
}

return (COD_EXITO, RES)
except ValueError:
return JSONResponse(
{"error": f"{textos[idioma]['errEstadoInvalido']}"},
status_code=401,
media_type="application/json",
)
return (COD_ERROR_ESPERADO, None)
except Exception as e:
return JSONResponse(
{"error": f"{textos[idioma]['errTry']} {str(e)}"},
status_code=500,
media_type="application/json",
)
return (COD_ERROR_INESPERADO, str(e))
3 changes: 3 additions & 0 deletions app/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
from utils.Dominios import obtener_lista_dominios

ROL_ADMIN = 1001
COD_EXITO = 1
COD_ERROR_ESPERADO = 0
COD_ERROR_INESPERADO = -1
CORS_ORIGINS = obtener_lista_dominios(getenv("CORS_ORIGINS", "http://localhost:5173,"))
ORIGENES_AUTORIZADOS = obtener_lista_dominios(getenv("ORIGENES_AUTORIZADOS", "http://localhost:5173,"))
ACTIVAR_DOCS = getenv("ACTIVAR_DOCS", "false").lower() == "true"
Expand Down
74 changes: 41 additions & 33 deletions app/dependencies/usuarios_dependencies.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,59 @@
from apis.FirebaseAuth import ver_datos_token
from apis.Firestore import verificar_rol_usuario
from fastapi import Header, Request
from fastapi import Header, Request, Depends
from fastapi.responses import JSONResponse
from constants import COD_ERROR_ESPERADO, COD_ERROR_INESPERADO
from models.Excepciones import AccesoNoAutorizado, UIDInvalido
from utils.Validadores import validar_uid
from urllib.parse import unquote
from dependencies.general_dependencies import verificar_idioma


async def verificar_usuario_administrador(
peticion: Request,
language: str | None = Header(default="es"),
authorization: str | None = Header(default=""),
idioma: str = Depends(verificar_idioma),
) -> tuple[bool, JSONResponse | None]:
"""
Verifica si el usuario está autenticado y es administrador antes de permitir el acceso a las rutas protegidas.

Args:
peticion (Request): La solicitud HTTP entrante.
language (str | None): El idioma de la solicitud HTTP.
authorization (str | None): El token de autorización de Firebase.
Returns:
tuple: Un tuple que contiene un booleano indicando si el usuario está autenticado y es administrador, y
en caso de error, un JSONResponse con el mensaje de error y el código de estado correspondiente.
idioma (str): El idioma preferido del usuario, obtenido a través de la dependencia `verificar_idioma`.
"""
firebase_app = peticion.state.firebase_app
TEXTOS = peticion.state.textos
idioma = language if language in ("es", "en") else "es"
try:
RES, DATOS = ver_datos_token(authorization, firebase_app, idioma, TEXTOS)

if RES in (-1, 0):
return False, JSONResponse(
DATOS,
status_code=403 if RES == 0 else 400,
media_type="application/json",
)

VALIDAR_ROL = await verificar_rol_usuario(DATOS["uid"])

if not VALIDAR_ROL:
return False, JSONResponse(
{"error": f"{TEXTOS[idioma]['errAccesoDenegado']}"},
status_code=403,
media_type="application/json",
)

return True, None
except Exception as e:
return False, JSONResponse(
{"error": f"{TEXTOS[idioma]['errTry']} {str(e)}"},
status_code=500,
media_type="application/json",
)
RES, DATOS = ver_datos_token(authorization, firebase_app, idioma, TEXTOS)

if RES in (COD_ERROR_INESPERADO, COD_ERROR_ESPERADO):
raise AccesoNoAutorizado(DATOS, 403)

VALIDAR_ROL = await verificar_rol_usuario(DATOS["uid"])

if not VALIDAR_ROL:
raise AccesoNoAutorizado({ "error": TEXTOS[idioma]["errAccesoDenegado"] }, 403)


async def validador_uid(
peticion: Request, uid: str, idioma: str = Depends(verificar_idioma)
) -> str:
"""
Valida el UID proporcionado en la solicitud. Si es inválido lanza una excepción.

Args:
peticion (Request): La solicitud HTTP entrante.
uid (str): El UID a validar.
idioma (str): El idioma preferido del usuario, obtenido a través de la dependencia `

Returns:
str: El UID validado.
"""
uid = unquote(uid)
TEXTOS = peticion.state.textos
VALIDACION = validar_uid(uid)

if not VALIDACION:
raise UIDInvalido({"error": f"{TEXTOS[idioma]['errUIDInvalido']}"})

return uid
Loading