Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 35 additions & 24 deletions app/apis/FirebaseAuth.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from firebase_admin.auth import *
from firebase_admin.exceptions import NotFoundError
from firebase_admin import App
from constants import COD_ERROR_ESPERADO, COD_ERROR_INESPERADO, COD_EXITO
from models.Peticiones import UsuarioActualizar
from utils.Validadores import validar_txt_token
from utils.Fechas import convertir_datetime_str
from utils.Diccionario import ver_si_existe_clave
from apis.Firestore import obtener_roles_usuarios, obtener_rol_usuario
import asyncio



def validar_token(
Expand All @@ -21,9 +21,7 @@ def validar_token(
int: 1 si el token es válido, 0 en caso contrario y -1 si hay un error de validación.
"""
try:
datos = verify_id_token(
token, firebase_app, check_revoked=True
)
datos = verify_id_token(token, firebase_app, check_revoked=True)
return (COD_EXITO, datos) if obtener_datos else COD_EXITO
except (ExpiredIdTokenError, RevokedIdTokenError, UserDisabledError):
return (COD_ERROR_ESPERADO, None) if obtener_datos else COD_ERROR_ESPERADO
Expand Down Expand Up @@ -104,9 +102,7 @@ async def ver_datos_usuarios(firebase_app: App) -> tuple[int, list[dict] | None]
"""
try:
AUX = []
roles_task = asyncio.create_task(obtener_roles_usuarios())
usuarios = list_users(app=firebase_app)
ROLES = await roles_task

while True:
AUX.extend(
Expand All @@ -115,9 +111,7 @@ async def ver_datos_usuarios(firebase_app: App) -> tuple[int, list[dict] | None]
"correo": x.email,
"uid": x.uid,
"nombre": x.display_name,
"rol": (
ROLES[x.uid] if ver_si_existe_clave(ROLES, x.uid) else "N/A"
),
"administrador": x.custom_claims["admin"],
"estado": not x.disabled,
"fecha_registro": convertir_datetime_str(
x.user_metadata.creation_timestamp
Expand All @@ -135,7 +129,7 @@ async def ver_datos_usuarios(firebase_app: App) -> tuple[int, list[dict] | None]
usuarios = usuarios.get_next_page()

return (COD_EXITO, AUX)
except Exception:
except:
return (COD_ERROR_INESPERADO, None)


Expand All @@ -151,18 +145,13 @@ async def ver_datos_usuario(firebase_app: App, uid: str) -> tuple[int, dict | No
tuple[int, dict | str | None]: Un código de estado y los datos del usuario si se encuentra.
"""
try:
roles_task = asyncio.create_task(obtener_rol_usuario(uid))
usuario = get_user(uid, firebase_app)
ROL = await roles_task

if ROL == -1:
raise UserNotFoundError("")

RES = {
"correo": usuario.email,
"uid": usuario.uid,
"nombre": usuario.display_name,
"rol": ROL,
"administrador": usuario.custom_claims["admin"],
"estado": not usuario.disabled,
"fecha_registro": convertir_datetime_str(
usuario.user_metadata.creation_timestamp
Expand All @@ -175,8 +164,8 @@ async def ver_datos_usuario(firebase_app: App, uid: str) -> tuple[int, dict | No
return (COD_EXITO, RES)
except UserNotFoundError:
return (COD_ERROR_ESPERADO, None)
except Exception as e:
return (COD_ERROR_INESPERADO, str(e))
except Exception:
return (COD_ERROR_INESPERADO, None)


def ver_usuario_firebase(firebase_app: App, uid: str) -> tuple[int, UserRecord | None]:
Expand All @@ -198,27 +187,31 @@ def ver_usuario_firebase(firebase_app: App, uid: str) -> tuple[int, UserRecord |


def actualizar_estado_usuario(
firebase_app: App, uid: str, estado: bool
firebase_app: App, uid: str, usuario: UsuarioActualizar
) -> tuple[int, dict | None]:
"""
Actualiza el estado (activado/desactivado) de un usuario específico.
Args:
firebase_app (App): La instancia de la aplicación Firebase.
uid (str): El UID del usuario a actualizar.
estado (bool): El nuevo estado del usuario (True para desactivado, False para activado).
usuario (UsuarioActualizar): La instancia de usuario a actualizar con los nuevos valores de estado y administrador.
Returns:
tuple[int, dict | None]: Un código de estado y los datos del usuario actualizado si se actualiza correctamente.
"""
try:
USUARIO = update_user(
uid=uid, disabled=estado, app=firebase_app
uid=uid,
disabled=usuario.desactivar,
app=firebase_app,
custom_claims={"admin": usuario.administrador},
)

RES = {
"correo": USUARIO.email,
"uid": USUARIO.uid,
"nombre": USUARIO.display_name,
"estado": not USUARIO.disabled,
"administrador": USUARIO.custom_claims["admin"],
"fecha_registro": convertir_datetime_str(
USUARIO.user_metadata.creation_timestamp
),
Expand All @@ -228,7 +221,25 @@ def actualizar_estado_usuario(
}

return (COD_EXITO, RES)
except ValueError:
except NotFoundError:
return (COD_ERROR_ESPERADO, None)
except Exception as e:
return (COD_ERROR_INESPERADO, str(e))


def establecer_rol_usuario(firebase_app: App, uid: str) -> tuple[int, str | None]:
"""
Establece el rol de un usuario específico cuando este se registra.
Args:
firebase_app (App): La instancia de la aplicación Firebase.
uid (str): El UID del usuario al que se le asignará el rol.
Returns:
tuple[int, str | None]: Un código de estado indicando el resultado de la operación.
"""
try:
set_custom_user_claims(uid, {"admin": False}, app=firebase_app)
return (COD_EXITO, None)
except NotFoundError:
return (COD_ERROR_ESPERADO, None)
except Exception as e:
return (COD_ERROR_INESPERADO, str(e))
57 changes: 0 additions & 57 deletions app/apis/Firestore.py

This file was deleted.

2 changes: 0 additions & 2 deletions app/bin/textos.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
"errUIDInvalido": "Invalid UID",
"errUsuarioNoEncontrado": "User not found",
"errObtenerUsuario": "Error retrieving user",
"errEstadoInvalido": "Invalid status",
"msgUsuarioActualizado": "User status updated successfully",
"errProcesarToken": "Error processing the token",
"errCaptchaTokenErroneo": "The provided token contains errors.",
Expand All @@ -22,7 +21,6 @@
"errUIDInvalido": "UID inválido",
"errUsuarioNoEncontrado": "Usuario no encontrado",
"errObtenerUsuario": "Error al obtener el usuario",
"errEstadoInvalido": "Estado inválido",
"msgUsuarioActualizado": "Estado del usuario actualizado correctamente",
"errProcesarToken": "Error al procesar el token",
"errCaptchaTokenErroneo": "El token proveído tiene errores.",
Expand Down
1 change: 0 additions & 1 deletion app/constants.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from os import getenv
from utils.Dominios import obtener_lista_dominios

ROL_ADMIN = 1001
COD_EXITO = 1
COD_ERROR_ESPERADO = 0
COD_ERROR_INESPERADO = -1
Expand Down
32 changes: 28 additions & 4 deletions app/dependencies/general_dependencies.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,36 @@
from fastapi import Header
from apis.FirebaseAuth import verificar_token
from fastapi import Header, Request
from models.Excepciones import AccesoNoAutorizado
from constants import COD_ERROR_ESPERADO, COD_EXITO

async def verificar_idioma (language: str | None = Header(default="es")) -> str:

async def verificar_idioma(language: str | None = Header(default="es")) -> str:
"""
Verifica si el idioma de la solicitud es válido.

Args:
language (str | None): El idioma de la solicitud HTTP.
Returns:
str: El idioma de la solicitud, por defecto "es" (español).
"""
return "es" if language not in ("es", "en") else language
return "es" if language not in ("es", "en") else language


async def verificar_autenticado(
peticion: Request,
authorization: str | None = Header(default=""),
language: str = Header(default="es"),
) -> bool:
"""
Verifica si el usuario está autenticado.
Args:
pet (Request): La solicitud HTTP.
authorization (str | None): El token de autorización.
language (str): El idioma de la solicitud.
"""
firebase_app = peticion.state.firebase_app
TEXTOS = peticion.state.textos
RES = await verificar_token(firebase_app, authorization)

if RES != COD_EXITO:
texto = "errAccesoDenegado" if RES == COD_ERROR_ESPERADO else "errTokenInvalido"
raise AccesoNoAutorizado({"error": f"{TEXTOS[language][texto]}"}, 403)
7 changes: 2 additions & 5 deletions app/dependencies/usuarios_dependencies.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from apis.FirebaseAuth import ver_datos_token
from apis.Firestore import verificar_rol_usuario
from fastapi import Header, Request, Depends
from fastapi.responses import JSONResponse
from constants import COD_ERROR_ESPERADO, COD_ERROR_INESPERADO
Expand All @@ -25,13 +24,11 @@ async def verificar_usuario_administrador(
firebase_app = peticion.state.firebase_app
TEXTOS = peticion.state.textos
RES, DATOS = ver_datos_token(authorization, firebase_app, idioma, TEXTOS)

if RES in (COD_ERROR_INESPERADO, COD_ERROR_ESPERADO):
raise AccesoNoAutorizado(DATOS, 403)

VALIDAR_ROL = await verificar_rol_usuario(DATOS["uid"])

if not VALIDAR_ROL:
if DATOS["admin"] != True:
raise AccesoNoAutorizado({ "error": TEXTOS[idioma]["errAccesoDenegado"] }, 403)


Expand Down
44 changes: 1 addition & 43 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from routers.main_router import router as main_router
from utils.Dominios import obtener_lista_dominios
from routers.usuarios_router import router as usuarios_router
from apis.FirebaseAuth import verificar_token
from constants import *
from utils.Validadores import validar_origen
from utils.Diccionario import ver_si_existe_clave
Expand All @@ -19,6 +18,7 @@
from onnxruntime import InferenceSession
from firebase_admin_config import inicializar_firebase
from models.Excepciones import *
import uvicorn

load_dotenv()

Expand Down Expand Up @@ -112,48 +112,6 @@ async def verificar_origen_autorizado(peticion: Request, call_next) -> Response:
return await call_next(peticion)


@app.middleware("http")
async def verificar_credenciales(peticion: Request, call_next) -> Response:
"""
Middleware para verificar las credenciales de Firebase en cada solicitud de diagnóstico.
Args:
peticion (Diagnostico): La solicitud que contiene el token.
call_next: La función para pasar al siguiente middleware o ruta.
"""
RUTAS_NO_PROTEGIDAS = ("/recaptcha",)
METODOS_RESTRINGIDOS = ("POST",)
firebase_app = peticion.state.firebase_app
TEXTOS = peticion.state.textos

token = (
peticion.headers["authorization"]
if ver_si_existe_clave(peticion.headers, "authorization")
else ""
)
idioma = (
peticion.headers["language"]
if ver_si_existe_clave(peticion.headers, "language")
else "es"
)

if peticion.method in METODOS_RESTRINGIDOS and (
peticion.url.path not in RUTAS_NO_PROTEGIDAS
):
RES = await verificar_token(
firebase_app, token
)

if RES != COD_EXITO:
TEXTO = "errValidarToken" if RES == COD_ERROR_ESPERADO else "errTokenInvalido"
CODIGO = 403 if RES == COD_ERROR_ESPERADO else 400
return JSONResponse(
{"error": TEXTOS[idioma][TEXTO]},
status_code=CODIGO,
media_type="application/json",
)

return await call_next(peticion)

# Manejadores globales de excepciones personalizadas
@app.exception_handler(AccesoNoAutorizado)
async def manejar_acceso_no_autorizado(peticion: Request, excepcion: AccesoNoAutorizado):
Expand Down
9 changes: 8 additions & 1 deletion app/models/Peticiones.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,11 @@ def obtener_diccionario_instancia(self):
"Hipertensión_arterial": [self.hipertension_arterial], "Neurológica": [self.neurologica], "Pulmonar": [self.pulmonar],
"Renal": [self.renal], "Trombofilia": [self.trombofilia], "Urológica": [self.urologica], "Vascular": [self.vascular],
"VIH": [self.vih],
}
}

class UsuarioActualizar(BaseModel):
"""
Clase que representa una petición para actualizar el estado de un usuario.
"""
desactivar: bool
administrador: bool
Loading