From 37a171dfb385810deb87c34caf05791a9e8fc5bb Mon Sep 17 00:00:00 2001 From: Cristian O <84862634+Criser2013@users.noreply.github.com> Date: Sun, 15 Feb 2026 17:07:47 +0000 Subject: [PATCH 1/3] =?UTF-8?q?Inicializaci=C3=B3n=20de=20modelos=20y=20fi?= =?UTF-8?q?rebase=20implementado=20usando=20el=20objeto=20"lifespan".?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/apis/FirebaseAuth.py | 2 +- app/constants.py | 9 ---- app/firebase_admin_config.py | 23 ++++++---- app/main.py | 81 ++++++++++++++++++++++++++++++------ app/models/Diagnostico.py | 33 +++++++-------- 5 files changed, 98 insertions(+), 50 deletions(-) diff --git a/app/apis/FirebaseAuth.py b/app/apis/FirebaseAuth.py index 8daa5b8..dc61dd7 100644 --- a/app/apis/FirebaseAuth.py +++ b/app/apis/FirebaseAuth.py @@ -1,6 +1,6 @@ import firebase_admin.auth from firebase_admin.auth import * -from fastapi import Request, Header +from fastapi import Request from fastapi.responses import JSONResponse from utils.Validadores import validar_txt_token from utils.Fechas import convertir_datetime_str diff --git a/app/constants.py b/app/constants.py index f3340d9..505df26 100644 --- a/app/constants.py +++ b/app/constants.py @@ -1,15 +1,6 @@ from os import getenv -from dill import load as dload -from json import load as jload -from pathlib import Path from utils.Dominios import obtener_lista_dominios -with open(f"{Path(__file__).resolve().parent}/bin/explicador.pkl", "rb") as archivo: - EXPLAINER = dload(archivo) - -with open(f"{Path(__file__).resolve().parent}/bin/textos.json") as archivo: - TEXTOS = jload(archivo) - ROL_ADMIN = 1001 CORS_ORIGINS = obtener_lista_dominios(getenv("CORS_ORIGINS", "http://localhost:5173,")) ORIGENES_AUTORIZADOS = obtener_lista_dominios(getenv("ORIGENES_AUTORIZADOS", "http://localhost:5173,")) diff --git a/app/firebase_admin_config.py b/app/firebase_admin_config.py index f95e7e4..68ef729 100644 --- a/app/firebase_admin_config.py +++ b/app/firebase_admin_config.py @@ -1,16 +1,21 @@ -import firebase_admin +from firebase_admin import initialize_app, App +from firebase_admin.credentials import Certificate from pathlib import Path from os.path import join, exists -from dotenv import load_dotenv from os import getenv -load_dotenv() +def inicializar_firebase() -> App: + """ + Inicializa la aplicación de Firebase utilizando las credenciales proporcionadas en un archivo JSON. -ALT_PATH = Path(getenv("FIREBASE_ADMIN_CREDS_PATH", "")) + Returns: + firebase_admin.App: Una instancia de la aplicación de Firebase inicializada. + """ + ALT_PATH = Path(getenv("FIREBASE_ADMIN_CREDS_PATH", "")) -BASE_DIR = Path(__file__).resolve().parent.parent -PATH = join(BASE_DIR, "firebase_token.json") if (ALT_PATH != "" and (not exists(ALT_PATH))) else ALT_PATH + BASE_DIR = Path(__file__).resolve().parent.parent + PATH = join(BASE_DIR, "firebase_token.json") if (ALT_PATH != "" and (not exists(ALT_PATH))) else ALT_PATH -# Inicialización del servicio de Firebase -cred = firebase_admin.credentials.Certificate(PATH) -firebase_app = firebase_admin.initialize_app(cred) \ No newline at end of file + # Inicialización del servicio de Firebase + cred = Certificate(PATH) + return initialize_app(cred) \ No newline at end of file diff --git a/app/main.py b/app/main.py index 9309328..4dc73d6 100644 --- a/app/main.py +++ b/app/main.py @@ -1,16 +1,63 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware -from fastapi import Request, Response, Header +from fastapi import Request, Response +from dotenv import load_dotenv from routers.main_router import router as main_router from routers.usuarios_router import router as usuarios_router from apis.FirebaseAuth import verificar_token -from constants import CORS_ORIGINS, ALLOWED_HOSTS, ACTIVAR_DOCS, ORIGENES_AUTORIZADOS +from constants import ( + CORS_ORIGINS, + ALLOWED_HOSTS, + ACTIVAR_DOCS, + ORIGENES_AUTORIZADOS, + CREDS_FIREBASE_CLIENTE, +) from utils.Validadores import validar_origen from utils.Diccionario import ver_si_existe_clave -from firebase_admin_config import firebase_app +from contextlib import asynccontextmanager +from pathlib import Path +from dill import load as dload +from json import load as jload +from onnxruntime import InferenceSession +from firebase_admin_config import inicializar_firebase + +load_dotenv() + + +@asynccontextmanager +async def inicializar_modelos(app: FastAPI): + PATH_BASE = Path(__file__).resolve().parent + FIREBASE_APP = inicializar_firebase() + + with open(f"{PATH_BASE}/bin/explicador.pkl", "rb") as archivo: + EXPLAINER = dload(archivo) + + with open(f"{PATH_BASE}/bin/textos.json") as archivo: + TEXTOS = jload(archivo) + + MODELO = InferenceSession( + f"{PATH_BASE}/bin/modelo_red_neuronal.onnx", + providers=["CPUExecutionProvider"], + ) + + yield { + "explicador": EXPLAINER, + "textos": TEXTOS, + "modelo": MODELO, + "firebase_app": FIREBASE_APP, + "credenciales": CREDS_FIREBASE_CLIENTE, + } -app = FastAPI(docs_url=None if not ACTIVAR_DOCS else "/docs", redoc_url=None if not ACTIVAR_DOCS else "/redoc") + FIREBASE_APP._cleanup() + del EXPLAINER, TEXTOS, MODELO, FIREBASE_APP, CREDS_FIREBASE_CLIENTE + + +app = FastAPI( + lifespan=inicializar_modelos, + docs_url=None if not ACTIVAR_DOCS else "/docs", + redoc_url=None if not ACTIVAR_DOCS else "/redoc", +) app.include_router(main_router) app.include_router(usuarios_router, prefix="/admin") @@ -25,10 +72,8 @@ ) # Middleware que no permite peticiones de hosts no autorizados -app.add_middleware( - TrustedHostMiddleware, - allowed_hosts=ALLOWED_HOSTS -) +app.add_middleware(TrustedHostMiddleware, allowed_hosts=ALLOWED_HOSTS) + @app.middleware("http") async def verificar_origen_autorizado(peticion: Request, call_next) -> Response: @@ -49,6 +94,7 @@ async def verificar_origen_autorizado(peticion: Request, call_next) -> Response: else: return await call_next(peticion) + @app.middleware("http") async def verificar_credenciales(peticion: Request, call_next) -> Response: """ @@ -59,11 +105,22 @@ async def verificar_credenciales(peticion: Request, call_next) -> Response: """ RUTAS_NO_PROTEGIDAS = ("/recaptcha",) METODOS_RESTRINGIDOS = ("POST",) + firebase_app = peticion.app.state.firebase_app - token = peticion.headers["authorization"] if ver_si_existe_clave(peticion.headers, "authorization") else "" - idioma = peticion.headers["language"] if ver_si_existe_clave(peticion.headers, "language") else "es" + token = ( + peticion.headers["authorization"] + if ver_si_existe_clave(peticion.headers, "authorization") + else "" + ) + idioma = ( + peticion.headers["language"] + if ver_si_existe_clave(peticion.headers, "language") + else "es" + ) - if peticion.method in METODOS_RESTRINGIDOS and (peticion.url.path not in RUTAS_NO_PROTEGIDAS): + if peticion.method in METODOS_RESTRINGIDOS and ( + peticion.url.path not in RUTAS_NO_PROTEGIDAS + ): return await verificar_token(peticion, firebase_app, call_next, token, idioma) else: - return await call_next(peticion) \ No newline at end of file + return await call_next(peticion) diff --git a/app/models/Diagnostico.py b/app/models/Diagnostico.py index 6902afb..c5b7e63 100644 --- a/app/models/Diagnostico.py +++ b/app/models/Diagnostico.py @@ -1,8 +1,9 @@ from onnxruntime import InferenceSession +from lime.lime_tabular import LimeTabularExplainer from pathlib import Path from numpy import ndarray, zeros, float32, array from utils.Preprocesamiento import preprocesar_instancia -from constants import EXPLAINER +#from constants import EXPLAINER class Diagnostico: @@ -10,8 +11,10 @@ class Diagnostico: Clase que representa una instancia de diagnóstico usando el modelo de red neuronal en ONNX. """ - def __init__(self, datos: dict): + def __init__(self, datos: dict, modelo: InferenceSession, explicador: LimeTabularExplainer): self.datos = datos + self.modelo = modelo + self.explicador = explicador self.BASE_PATH = Path(__file__).resolve().parent.parent def obtener_array_datos(self) -> ndarray: @@ -74,22 +77,21 @@ def convertir_a_diccionario(self, array_datos: ndarray) -> dict: return claves def obtener_probabilidades_predicciones( - self, instancias: ndarray, sesion: InferenceSession + self, instancias: ndarray ) -> ndarray: """ Hace la clasificación de varias instancias empleando el modelo Args: instancias (ndarray): Las instancias a clasificar. - sesion (InferenceSession): La sesión de inferencia de ONNX. Returns: ndarray: Las probabilidades de pertenecer a una clase u otra según el modelo. """ - input_name = [i.name for i in sesion.get_inputs()] + input_name = [i.name for i in self.modelo.get_inputs()] instancias = self.convertir_a_diccionario(instancias) instancias = preprocesar_instancia(instancias) - RES = sesion.run(None, {i: array(instancias[i], dtype=float32).reshape(-1, 1) for i in input_name}) + RES = self.modelo.run(None, {i: array(instancias[i], dtype=float32).reshape(-1, 1) for i in input_name}) ARRAY = zeros((len(instancias["Edad"]), 2), dtype=float32) for i in range(len(instancias["Edad"])): @@ -97,16 +99,13 @@ def obtener_probabilidades_predicciones( return ARRAY - def generar_explicacion(self, sesion): + def generar_explicacion(self): """ Genera una explicación para la predicción de la instancia usando LIME (5000 muestras y máximo 10 atributos). - - Args: - sesion (InferenceSession): La sesión de inferencia de ONNX. """ - explicacion = EXPLAINER.explain_instance( + explicacion = self.explicador.explain_instance( self.obtener_array_datos(), - lambda x: self.obtener_probabilidades_predicciones(x, sesion), + lambda x: self.obtener_probabilidades_predicciones(x), num_features=10, num_samples=2000 ) SALIDA = [] @@ -127,14 +126,10 @@ async def generar_diagnostico(self): Genera el diagnóstico de los datos usando el modelo ONNX para normalizarlos y luego clasificarlos """ - sesion = InferenceSession( - f"{self.BASE_PATH}/bin/modelo_red_neuronal.onnx", - providers=["CPUExecutionProvider"], - ) - input_name = [i.name for i in sesion.get_inputs()] + input_name = [i.name for i in self.modelo.get_inputs()] preprocesados = preprocesar_instancia(self.datos) - pred = sesion.run(None, {i: array(preprocesados[i], dtype=float32).reshape(-1, 1) for i in input_name}) - self.generar_explicacion(sesion) + pred = await self.modelo.run_async(None, {i: array(preprocesados[i], dtype=float32).reshape(-1, 1) for i in input_name}) + self.generar_explicacion() RES = pred[0][0] return { From 5feda3d7d5ca03f402f8e3f13e152e595aee1c65 Mon Sep 17 00:00:00 2001 From: Cristian O <84862634+Criser2013@users.noreply.github.com> Date: Sun, 15 Feb 2026 19:58:47 +0000 Subject: [PATCH 2/3] =?UTF-8?q?Inicializaci=C3=B3n=20de=20credenciales,=20?= =?UTF-8?q?Firebase=20y=20los=20modelos=20implementada.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/apis/FirebaseAuth.py | 43 ++++++++++++----------- app/apis/Recaptcha.py | 13 +++---- app/constants.py | 13 +------ app/dependencies/usuarios_dependencies.py | 11 +++--- app/main.py | 30 +++++++++++++--- app/models/Diagnostico.py | 3 +- app/routers/main_router.py | 19 ++++++---- app/routers/usuarios_router.py | 20 +++++++---- 8 files changed, 89 insertions(+), 63 deletions(-) diff --git a/app/apis/FirebaseAuth.py b/app/apis/FirebaseAuth.py index dc61dd7..2ec3f91 100644 --- a/app/apis/FirebaseAuth.py +++ b/app/apis/FirebaseAuth.py @@ -6,7 +6,6 @@ from utils.Fechas import convertir_datetime_str from utils.Diccionario import ver_si_existe_clave from apis.Firestore import obtener_roles_usuarios, obtener_rol_usuario -from constants import TEXTOS import asyncio @@ -34,7 +33,7 @@ def validar_token( async def verificar_token( - peticion: Request, firebase_app, call_next, authorization: str, idioma="es" + peticion: Request, firebase_app, call_next, authorization: str, textos: dict[str, str], idioma: str ="es" ) -> JSONResponse: """ Verifica el token de Firebase en la solicitud. @@ -58,31 +57,32 @@ async def verificar_token( return await call_next(peticion) case 0: return JSONResponse( - {"error": TEXTOS[idioma]["errTokenInvalido"]}, + {"error": textos[idioma]["errTokenInvalido"]}, status_code=403, media_type="application/json", ) case _: return JSONResponse( - {"error": TEXTOS[idioma]["errValidarToken"]}, + {"error": textos[idioma]["errValidarToken"]}, status_code=400, media_type="application/json", ) except Exception as e: return JSONResponse( - {"error": f"{TEXTOS[idioma]['errTry']} {e}"}, + {"error": f"{textos[idioma]['errTry']} {e}"}, status_code=500, media_type="application/json", ) -def ver_datos_token(token: str, firebase_app, idioma: str) -> tuple[int, dict]: +def ver_datos_token(token: str, firebase_app, idioma: str, textos: dict[str, str]) -> tuple[int, dict]: """ Obtiene los datos del token de Firebase. Args: token (str): El token de autorización de la solicitud. firebase_app: La instancia de la aplicación Firebase. idioma (str): El idioma para los mensajes de error. + textos (dict[str, str]): El diccionario de textos para los mensajes de error. Returns: tuple: (True, datos) si el token es válido, (False, error) si hay un error. """ @@ -91,7 +91,7 @@ def ver_datos_token(token: str, firebase_app, idioma: str) -> tuple[int, dict]: reg_validacion = validar_txt_token(token) if not reg_validacion: - return (0, {"error": f"{TEXTOS[idioma]['errTokenInvalido']}"}) + return (0, {"error": f"{textos[idioma]['errTokenInvalido']}"}) res_validacion = validar_token(token, firebase_app, True) @@ -99,20 +99,21 @@ def ver_datos_token(token: str, firebase_app, idioma: str) -> tuple[int, dict]: case 1: return res_validacion case 0: - return (0, {"error": f"{TEXTOS[idioma]['errTokenInvalido']}"}) + return (0, {"error": f"{textos[idioma]['errTokenInvalido']}"}) case _: - return (-1, {"error": f"{TEXTOS[idioma]['errValidarToken']}"}) + return (-1, {"error": f"{textos[idioma]['errValidarToken']}"}) except Exception as e: - return (-1, {"error": f"{TEXTOS[idioma]['errProcesarToken']}: {e}."}) + return (-1, {"error": f"{textos[idioma]['errProcesarToken']}: {e}."}) -async def ver_datos_usuarios(firebase_app, idioma: str) -> JSONResponse: +async def ver_datos_usuarios(firebase_app, idioma: str, textos: dict[str, str]) -> JSONResponse: """ Obtiene los datos de los usuarios registrados en Firebase. Args: firebase_app: La instancia de la aplicación Firebase. idioma (str): El idioma para los mensajes de error. + textos (dict[str, str]): El diccionario de textos para los mensajes de error. Returns: JSONResponse: Los datos de los usuarios, o un error si ocurre un problema. """ @@ -155,19 +156,20 @@ async def ver_datos_usuarios(firebase_app, idioma: str) -> JSONResponse: ) except Exception as e: return JSONResponse( - {"error": f"{TEXTOS[idioma]['errObtenerDatosUsuarios']}: {e}"}, + {"error": f"{textos[idioma]['errObtenerDatosUsuarios']}: {e}"}, status_code=400, media_type="application/json", ) -async def ver_datos_usuario(firebase_app, uid: str, idioma: str) -> JSONResponse: +async def ver_datos_usuario(firebase_app, uid: str, idioma: str, textos: dict[str, str]) -> JSONResponse: """ Obtiene los datos de un usuario específico usando el UID. Args: firebase_app: La instancia de la aplicación Firebase. uid (str): El UID del usuario a buscar. idioma (str): El idioma para los mensajes de error. + textos (dict[str, str]): El diccionario de textos para los mensajes de error. Returns: JSONResponse: Los datos del usuario, o un error si ocurre un problema. """ @@ -177,7 +179,7 @@ async def ver_datos_usuario(firebase_app, uid: str, idioma: str) -> JSONResponse ROL = await roles_task if ROL == -1: - raise UserNotFoundError(f"{TEXTOS[idioma]['errUsuarioNoEncontrado']}") + raise UserNotFoundError(f"{textos[idioma]['errUsuarioNoEncontrado']}") RES = { "correo": usuario.email, @@ -200,13 +202,13 @@ async def ver_datos_usuario(firebase_app, uid: str, idioma: str) -> JSONResponse ) except UserNotFoundError: return JSONResponse( - {"error": f"{TEXTOS[idioma]['errUsuarioNoEncontrado']}"}, + {"error": f"{textos[idioma]['errUsuarioNoEncontrado']}"}, status_code=404, media_type="application/json", ) except Exception as e: return JSONResponse( - {"error": f"{TEXTOS[idioma]['errObtenerDatosUsuarios']}: {e}"}, + {"error": f"{textos[idioma]['errObtenerDatosUsuarios']}: {e}"}, status_code=400, media_type="application/json", ) @@ -230,7 +232,7 @@ def ver_usuario_firebase(firebase_app, uid: str) -> tuple[int, UserRecord | None def actualizar_estado_usuario( - firebase_app, uid: str, estado: bool, idioma: str + firebase_app, uid: str, estado: bool, idioma: str, textos: dict[str, str] ) -> JSONResponse: """ Actualiza el estado (activado/desactivado) de un usuario específico. @@ -239,6 +241,7 @@ def actualizar_estado_usuario( uid (str): El UID del usuario a actualizar. estado (bool): El nuevo estado del usuario (True para desactivado, False para activado). idioma (str): El idioma para los mensajes de error. + textos (dict[str, str]): El diccionario de textos para los mensajes de error. Returns: JSONResponse | Response: Mensaje de éxito o error. """ @@ -246,19 +249,19 @@ def actualizar_estado_usuario( firebase_admin.auth.update_user(uid=uid, disabled=estado, app=firebase_app) return JSONResponse( - {"mensaje": f"{TEXTOS[idioma]['msgUsuarioActualizado']}"}, + {"mensaje": f"{textos[idioma]['msgUsuarioActualizado']}"}, status_code=200, media_type="application/json", ) except ValueError: return JSONResponse( - {"error": f"{TEXTOS[idioma]['errEstadoInvalido']}"}, + {"error": f"{textos[idioma]['errEstadoInvalido']}"}, status_code=401, media_type="application/json", ) except Exception as e: return JSONResponse( - {"error": f"{TEXTOS[idioma]['errTry']} {str(e)}"}, + {"error": f"{textos[idioma]['errTry']} {str(e)}"}, status_code=500, media_type="application/json", ) diff --git a/app/apis/Recaptcha.py b/app/apis/Recaptcha.py index 24f028e..585619d 100644 --- a/app/apis/Recaptcha.py +++ b/app/apis/Recaptcha.py @@ -1,35 +1,36 @@ from constants import RECAPTCHA_SECRET, RECAPTCHA_API_URL from requests import post -from constants import TEXTOS -def manejador_errores(error: str, idioma: str) -> str: +def manejador_errores(error: str, idioma: str, textos: dict[str, str]) -> str: """ Maneja los errores devueltos por la API de reCAPTCHA. Args: error (str): El mensaje de error devuelto por la API de reCAPTCHA. idioma (str): El idioma para los mensajes de error. + textos (dict[str, str]): El diccionario de textos para los mensajes de error. Returns: str: Un mensaje de error amigable para el usuario. """ match error: case "invalid-input-response": - return f"{TEXTOS[idioma]['errCaptchaTokenErroneo']}" + return f"{textos[idioma]['errCaptchaTokenErroneo']}" case "timeout-or-duplicate": - return f"{TEXTOS[idioma]['errCaptchaTokenInvalido']}" + return f"{textos[idioma]['errCaptchaTokenInvalido']}" case _: return error -def verificar_peticion_recaptcha(token: str, idioma: str) -> dict: +def verificar_peticion_recaptcha(token: str, idioma: str, textos: dict[str, str]) -> dict: """ Envía una petición al API de ReCAPTCHA para verificar que el token es válido. Args: token (str): El token de ReCAPTCHA a verificar. idioma (str): El idioma para los mensajes de error. + textos (dict[str, str]): El diccionario de textos para los mensajes de error. Returns: dict: La respuesta del API de ReCAPTCHA. @@ -44,6 +45,6 @@ def verificar_peticion_recaptcha(token: str, idioma: str) -> dict: for key, value in res.items(): if key == "error-codes": - res[key] = [manejador_errores(i, idioma) for i in value] + res[key] = [manejador_errores(i, idioma, textos) for i in value] return res diff --git a/app/constants.py b/app/constants.py index 505df26..b7744af 100644 --- a/app/constants.py +++ b/app/constants.py @@ -7,15 +7,4 @@ ACTIVAR_DOCS = getenv("ACTIVAR_DOCS", "false").lower() == "true" ALLOWED_HOSTS = obtener_lista_dominios(getenv("ALLOWED_HOSTS", "localhost,")) RECAPTCHA_SECRET = getenv("CAPTCHA_SECRET") -RECAPTCHA_API_URL = getenv("API_RECAPTCHA_URL", "https://www.google.com/recaptcha/api/siteverify") -CREDS_FIREBASE_CLIENTE = { - "apiKey": getenv("CLIENTE_FIREBASE_API_KEY"), - "authDomain": getenv("CLIENTE_FIREBASE_AUTH_DOMAIN"), - "projectId": getenv("CLIENTE_FIREBASE_PROJECT_ID"), - "storageBucket": getenv("CLIENTE_FIREBASE_STORAGE_BUCKET"), - "messagingSenderId": getenv("CLIENTE_FIREBASE_MESSAGING_SENDER_ID"), - "appId": getenv("CLIENTE_FIREBASE_APP_ID"), - "measurementId": getenv("CLIENTE_FIREBASE_MEASUREMENT_ID"), - "driveScopes": obtener_lista_dominios(getenv("CLIENTE_DRIVE_SCOPES","")), - "reCAPTCHA": getenv("CLIENTE_CAPTCHA"), -} \ No newline at end of file +RECAPTCHA_API_URL = getenv("API_RECAPTCHA_URL", "https://www.google.com/recaptcha/api/siteverify") \ No newline at end of file diff --git a/app/dependencies/usuarios_dependencies.py b/app/dependencies/usuarios_dependencies.py index 25b71e9..3246590 100644 --- a/app/dependencies/usuarios_dependencies.py +++ b/app/dependencies/usuarios_dependencies.py @@ -1,13 +1,11 @@ -from firebase_admin_config import firebase_app from apis.FirebaseAuth import ver_datos_token from apis.Firestore import verificar_rol_usuario -from fastapi import Header +from fastapi import Header, Request from fastapi.responses import JSONResponse -#from utils.Diccionario import ver_si_existe_clave -from constants import TEXTOS async def verificar_usuario_administrador( + peticion: Request, language: str | None = Header(default="es"), authorization: str | None = Header(default=""), ) -> tuple[bool, JSONResponse | None]: @@ -15,15 +13,18 @@ async def verificar_usuario_administrador( Verifica si el usuario está autenticado y es administrador antes de permitir el acceso a las rutas protegidas. Args: + peticion (Request): La solicitud HTTP entrante. language (str | None): El idioma de la solicitud HTTP. authorization (str | None): El token de autorización de Firebase. Returns: tuple: Un tuple que contiene un booleano indicando si el usuario está autenticado y es administrador, y en caso de error, un JSONResponse con el mensaje de error y el código de estado correspondiente. """ + firebase_app = peticion.state.firebase_app + TEXTOS = peticion.state.textos idioma = language if language in ("es", "en") else "es" try: - RES, DATOS = ver_datos_token(authorization, firebase_app, idioma) + RES, DATOS = ver_datos_token(authorization, firebase_app, idioma, TEXTOS) if RES in (-1, 0): return False, JSONResponse( diff --git a/app/main.py b/app/main.py index 4dc73d6..40d25d2 100644 --- a/app/main.py +++ b/app/main.py @@ -3,15 +3,16 @@ from fastapi.middleware.trustedhost import TrustedHostMiddleware from fastapi import Request, Response from dotenv import load_dotenv +from os import getenv from routers.main_router import router as main_router +from utils.Dominios import obtener_lista_dominios from routers.usuarios_router import router as usuarios_router from apis.FirebaseAuth import verificar_token from constants import ( CORS_ORIGINS, ALLOWED_HOSTS, ACTIVAR_DOCS, - ORIGENES_AUTORIZADOS, - CREDS_FIREBASE_CLIENTE, + ORIGENES_AUTORIZADOS ) from utils.Validadores import validar_origen from utils.Diccionario import ver_si_existe_clave @@ -25,11 +26,25 @@ load_dotenv() +# Inicialización de los modelos y recursos necesarios para la aplicación @asynccontextmanager async def inicializar_modelos(app: FastAPI): + # Esto se ejecuta al iniciar el backend PATH_BASE = Path(__file__).resolve().parent FIREBASE_APP = inicializar_firebase() + CREDS_FIREBASE_CLIENTE = { + "apiKey": getenv("CLIENTE_FIREBASE_API_KEY"), + "authDomain": getenv("CLIENTE_FIREBASE_AUTH_DOMAIN"), + "projectId": getenv("CLIENTE_FIREBASE_PROJECT_ID"), + "storageBucket": getenv("CLIENTE_FIREBASE_STORAGE_BUCKET"), + "messagingSenderId": getenv("CLIENTE_FIREBASE_MESSAGING_SENDER_ID"), + "appId": getenv("CLIENTE_FIREBASE_APP_ID"), + "measurementId": getenv("CLIENTE_FIREBASE_MEASUREMENT_ID"), + "driveScopes": obtener_lista_dominios(getenv("CLIENTE_DRIVE_SCOPES","")), + "reCAPTCHA": getenv("CLIENTE_CAPTCHA"), +} + with open(f"{PATH_BASE}/bin/explicador.pkl", "rb") as archivo: EXPLAINER = dload(archivo) @@ -49,16 +64,19 @@ async def inicializar_modelos(app: FastAPI): "credenciales": CREDS_FIREBASE_CLIENTE, } + # Esto se ejecuta después de cerrar el backend + FIREBASE_APP._cleanup() del EXPLAINER, TEXTOS, MODELO, FIREBASE_APP, CREDS_FIREBASE_CLIENTE - +# Inicialización de la aplicación FastAPI app = FastAPI( lifespan=inicializar_modelos, docs_url=None if not ACTIVAR_DOCS else "/docs", redoc_url=None if not ACTIVAR_DOCS else "/redoc", ) +# Añadiendo los enrutadores app.include_router(main_router) app.include_router(usuarios_router, prefix="/admin") @@ -75,6 +93,7 @@ async def inicializar_modelos(app: FastAPI): app.add_middleware(TrustedHostMiddleware, allowed_hosts=ALLOWED_HOSTS) +# Middlewares personalizados @app.middleware("http") async def verificar_origen_autorizado(peticion: Request, call_next) -> Response: """ @@ -105,7 +124,8 @@ async def verificar_credenciales(peticion: Request, call_next) -> Response: """ RUTAS_NO_PROTEGIDAS = ("/recaptcha",) METODOS_RESTRINGIDOS = ("POST",) - firebase_app = peticion.app.state.firebase_app + firebase_app = peticion.state.firebase_app + TEXTOS = peticion.state.textos token = ( peticion.headers["authorization"] @@ -121,6 +141,6 @@ async def verificar_credenciales(peticion: Request, call_next) -> Response: if peticion.method in METODOS_RESTRINGIDOS and ( peticion.url.path not in RUTAS_NO_PROTEGIDAS ): - return await verificar_token(peticion, firebase_app, call_next, token, idioma) + return await verificar_token(peticion, firebase_app, call_next, token, TEXTOS, idioma) else: return await call_next(peticion) diff --git a/app/models/Diagnostico.py b/app/models/Diagnostico.py index c5b7e63..7daef58 100644 --- a/app/models/Diagnostico.py +++ b/app/models/Diagnostico.py @@ -3,7 +3,6 @@ from pathlib import Path from numpy import ndarray, zeros, float32, array from utils.Preprocesamiento import preprocesar_instancia -#from constants import EXPLAINER class Diagnostico: @@ -128,7 +127,7 @@ async def generar_diagnostico(self): """ input_name = [i.name for i in self.modelo.get_inputs()] preprocesados = preprocesar_instancia(self.datos) - pred = await self.modelo.run_async(None, {i: array(preprocesados[i], dtype=float32).reshape(-1, 1) for i in input_name}) + pred = self.modelo.run(None, {i: array(preprocesados[i], dtype=float32).reshape(-1, 1) for i in input_name}) self.generar_explicacion() RES = pred[0][0] diff --git a/app/routers/main_router.py b/app/routers/main_router.py index 8f04bb5..1d7d67a 100644 --- a/app/routers/main_router.py +++ b/app/routers/main_router.py @@ -1,9 +1,8 @@ from models.Diagnostico import Diagnostico from models.PeticionDiagnostico import PeticionDiagnostico from models.PeticionRecaptcha import PeticionRecaptcha -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, Request from fastapi.responses import JSONResponse -from constants import CREDS_FIREBASE_CLIENTE, TEXTOS from apis.Recaptcha import verificar_peticion_recaptcha from dependencies.general_dependencies import verificar_idioma @@ -14,8 +13,10 @@ async def healthcheck(): return JSONResponse({"status": "ok"}, status_code=200, media_type="application/json") @router.get("/credenciales") -async def obtener_credenciales(idioma: str = Depends(verificar_idioma)): +async def obtener_credenciales(peticion: Request, idioma: str = Depends(verificar_idioma)): try: + TEXTOS = peticion.state.textos + CREDS_FIREBASE_CLIENTE = peticion.state.credenciales return JSONResponse( CREDS_FIREBASE_CLIENTE, status_code=200, media_type="application/json" ) @@ -28,10 +29,13 @@ async def obtener_credenciales(idioma: str = Depends(verificar_idioma)): @router.post("/diagnosticar") -async def diagnosticar(req: PeticionDiagnostico, idioma: str = Depends(verificar_idioma)) -> JSONResponse: +async def diagnosticar(peticion: Request, req: PeticionDiagnostico, idioma: str = Depends(verificar_idioma)) -> JSONResponse: try: + TEXTOS = peticion.state.textos + MODELO = peticion.state.modelo + EXPLICADOR = peticion.state.explicador DATOS = req.obtener_diccionario_instancia() - DIAGNOSTICO = Diagnostico(DATOS) + DIAGNOSTICO = Diagnostico(DATOS, MODELO, EXPLICADOR) RES = await DIAGNOSTICO.generar_diagnostico() return JSONResponse(RES, status_code=200, media_type="application/json") @@ -43,9 +47,10 @@ async def diagnosticar(req: PeticionDiagnostico, idioma: str = Depends(verificar ) @router.post("/recaptcha") -async def verificar_recaptcha(req: PeticionRecaptcha, idioma: str = Depends(verificar_idioma)) -> JSONResponse: +async def verificar_recaptcha(peticion: Request, req: PeticionRecaptcha, idioma: str = Depends(verificar_idioma)) -> JSONResponse: try: - resultado = verificar_peticion_recaptcha(req.token, idioma) + TEXTOS = peticion.state.textos + resultado = verificar_peticion_recaptcha(req.token, idioma, TEXTOS) return JSONResponse(resultado, status_code=200, media_type="application/json") except Exception as e: return JSONResponse( diff --git a/app/routers/usuarios_router.py b/app/routers/usuarios_router.py index 6c520cb..ee245e3 100644 --- a/app/routers/usuarios_router.py +++ b/app/routers/usuarios_router.py @@ -1,12 +1,10 @@ -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, Request from fastapi.responses import JSONResponse from apis.FirebaseAuth import * -from firebase_admin_config import firebase_app from utils.Validadores import validar_uid from urllib.parse import unquote from dependencies.usuarios_dependencies import verificar_usuario_administrador from dependencies.general_dependencies import verificar_idioma -from constants import TEXTOS router = APIRouter( prefix="/usuarios", dependencies=[Depends(verificar_usuario_administrador), Depends(verificar_idioma)] @@ -15,14 +13,17 @@ @router.get("") async def ver_usuarios( + peticion: Request, res_validacion_auth: tuple[bool, JSONResponse | None] = Depends(verificar_usuario_administrador), idioma: str = Depends(verificar_idioma) ) -> JSONResponse: try: + TEXTOS = peticion.state.textos + firebase_app = peticion.state.firebase_app if not res_validacion_auth[0]: return res_validacion_auth[1] - return await ver_datos_usuarios(firebase_app, idioma) + return await ver_datos_usuarios(firebase_app, idioma, TEXTOS) except Exception as e: return JSONResponse( {"error": f"{TEXTOS[idioma]['errTry']} Error al procesar la solicitud: {str(e)}"}, @@ -33,10 +34,13 @@ async def ver_usuarios( @router.get("/{uid}") async def ver_usuario( + peticion: Request, uid: str, res_validacion_auth: tuple[bool, JSONResponse | None] = Depends(verificar_usuario_administrador), idioma: str = Depends(verificar_idioma) ) -> JSONResponse: try: + TEXTOS = peticion.state.textos + firebase_app = peticion.state.firebase_app if not res_validacion_auth[0]: return res_validacion_auth[1] @@ -46,7 +50,7 @@ async def ver_usuario( if not VALIDACION: raise ValueError(f"{TEXTOS[idioma]['errUIDInvalido']}") - return await ver_datos_usuario(firebase_app, uid, idioma) + return await ver_datos_usuario(firebase_app, uid, idioma, TEXTOS) except ValueError: return JSONResponse( {"error": TEXTOS[idioma]['errUIDInvalido']}, @@ -62,10 +66,14 @@ async def ver_usuario( @router.patch("/{uid}") async def actualizar_usuario( + peticion: Request, uid: str, desactivar: bool, res_validacion_auth: tuple[bool, JSONResponse | None] = Depends(verificar_usuario_administrador), idioma: str = Depends(verificar_idioma) ) -> JSONResponse: try: + TEXTOS = peticion.state.textos + firebase_app = peticion.state.firebase_app + if not res_validacion_auth[0]: return res_validacion_auth[1] @@ -85,7 +93,7 @@ async def actualizar_usuario( elif DATOS[0] == -1: raise Exception(f"{TEXTOS[idioma]['errObtenerUsuario']}") - return actualizar_estado_usuario(firebase_app, uid, desactivar, idioma) + return actualizar_estado_usuario(firebase_app, uid, desactivar, idioma, TEXTOS) except ValueError: return JSONResponse( {"error": f"{TEXTOS[idioma]['errUIDInvalido']}"}, From 9b595d145d27e38120035b0d3a777b357ed68524 Mon Sep 17 00:00:00 2001 From: Cristian O <84862634+Criser2013@users.noreply.github.com> Date: Mon, 16 Feb 2026 21:44:09 +0000 Subject: [PATCH 3/3] Scripts de pruebas actualizados. --- app/main.py | 32 +- tests/scripts/api/test_firebaseauth.py | 62 +-- tests/scripts/api/test_main.py | 147 +++--- tests/scripts/api/test_recaptcha.py | 15 +- .../dependencies/test_usuarios_deps.py | 2 + tests/scripts/models/test_diagnostico.py | 37 +- tests/scripts/routers/test_main_router.py | 140 +++-- tests/scripts/routers/test_usuarios_router.py | 477 ++++++++++++------ 8 files changed, 560 insertions(+), 352 deletions(-) diff --git a/app/main.py b/app/main.py index 40d25d2..0e2d0d3 100644 --- a/app/main.py +++ b/app/main.py @@ -8,12 +8,7 @@ from utils.Dominios import obtener_lista_dominios from routers.usuarios_router import router as usuarios_router from apis.FirebaseAuth import verificar_token -from constants import ( - CORS_ORIGINS, - ALLOWED_HOSTS, - ACTIVAR_DOCS, - ORIGENES_AUTORIZADOS -) +from constants import CORS_ORIGINS, ALLOWED_HOSTS, ACTIVAR_DOCS, ORIGENES_AUTORIZADOS from utils.Validadores import validar_origen from utils.Diccionario import ver_si_existe_clave from contextlib import asynccontextmanager @@ -34,16 +29,16 @@ async def inicializar_modelos(app: FastAPI): FIREBASE_APP = inicializar_firebase() CREDS_FIREBASE_CLIENTE = { - "apiKey": getenv("CLIENTE_FIREBASE_API_KEY"), - "authDomain": getenv("CLIENTE_FIREBASE_AUTH_DOMAIN"), - "projectId": getenv("CLIENTE_FIREBASE_PROJECT_ID"), - "storageBucket": getenv("CLIENTE_FIREBASE_STORAGE_BUCKET"), - "messagingSenderId": getenv("CLIENTE_FIREBASE_MESSAGING_SENDER_ID"), - "appId": getenv("CLIENTE_FIREBASE_APP_ID"), - "measurementId": getenv("CLIENTE_FIREBASE_MEASUREMENT_ID"), - "driveScopes": obtener_lista_dominios(getenv("CLIENTE_DRIVE_SCOPES","")), - "reCAPTCHA": getenv("CLIENTE_CAPTCHA"), -} + "apiKey": getenv("CLIENTE_FIREBASE_API_KEY"), + "authDomain": getenv("CLIENTE_FIREBASE_AUTH_DOMAIN"), + "projectId": getenv("CLIENTE_FIREBASE_PROJECT_ID"), + "storageBucket": getenv("CLIENTE_FIREBASE_STORAGE_BUCKET"), + "messagingSenderId": getenv("CLIENTE_FIREBASE_MESSAGING_SENDER_ID"), + "appId": getenv("CLIENTE_FIREBASE_APP_ID"), + "measurementId": getenv("CLIENTE_FIREBASE_MEASUREMENT_ID"), + "driveScopes": obtener_lista_dominios(getenv("CLIENTE_DRIVE_SCOPES", "")), + "reCAPTCHA": getenv("CLIENTE_CAPTCHA"), + } with open(f"{PATH_BASE}/bin/explicador.pkl", "rb") as archivo: EXPLAINER = dload(archivo) @@ -69,6 +64,7 @@ async def inicializar_modelos(app: FastAPI): FIREBASE_APP._cleanup() del EXPLAINER, TEXTOS, MODELO, FIREBASE_APP, CREDS_FIREBASE_CLIENTE + # Inicialización de la aplicación FastAPI app = FastAPI( lifespan=inicializar_modelos, @@ -141,6 +137,8 @@ async def verificar_credenciales(peticion: Request, call_next) -> Response: if peticion.method in METODOS_RESTRINGIDOS and ( peticion.url.path not in RUTAS_NO_PROTEGIDAS ): - return await verificar_token(peticion, firebase_app, call_next, token, TEXTOS, idioma) + return await verificar_token( + peticion, firebase_app, call_next, token, TEXTOS, idioma + ) else: return await call_next(peticion) diff --git a/tests/scripts/api/test_firebaseauth.py b/tests/scripts/api/test_firebaseauth.py index 6321f0b..681ed66 100644 --- a/tests/scripts/api/test_firebaseauth.py +++ b/tests/scripts/api/test_firebaseauth.py @@ -4,30 +4,10 @@ from app.apis.FirebaseAuth import * from firebase_admin.auth import ExpiredIdTokenError, CertificateFetchError, ListUsersPage, ExportedUserRecord, UserMetadata -TEST_CREDS = { - "apiKey": "test_api_key", - "authDomain": "test_auth_domain", - "projectId": "test_project_id", - "storageBucket": "test_storage_bucket", - "messagingSenderId": "test_messaging_sender_id", - "appId": "test_app_id", - "measurementId": "test_measurement_id", - "driveScopes": [ - "https://www.googleapis.com/auth/drive", - ], - } - @pytest.fixture(autouse=True) def setup_module(mocker: MockerFixture): - MOCK_APP = { - "appId": "test_app_id", - "cred": {"projectId": "test_project_id", "certificated": True}, - } - - mocker.patch("app.main.firebase_app", MOCK_APP) mocker.patch("app.main.CORS_ORIGINS", ["http://localhost:5178",]) mocker.patch("app.main.ALLOWED_HOSTS", ["localhost",], ) - mocker.patch("app.routers.main_router.CREDS_FIREBASE_CLIENTE", TEST_CREDS) yield mocker.resetall() @@ -38,10 +18,11 @@ async def test_11(mocker: MockerFixture): es inválido """ REQ = mocker.MagicMock(spec=Request) + TEXTOS = {"es": {"errTokenInvalido": "Token inválido"}} VALIDADOR = mocker.patch("app.apis.FirebaseAuth.validar_txt_token", return_value=False) - RES = await verificar_token(REQ, "firebase_app", None, "Bearer token_invalido", "es") + RES = await verificar_token(REQ, "firebase_app", None, "Bearer token_invalido", TEXTOS, "es") assert RES.status_code == 403 assert RES.body.decode("utf-8") == '{"error":"Token inválido"}' @@ -55,11 +36,12 @@ async def test_12(mocker: MockerFixture): una excepción al procesar la solicitud """ REQ = mocker.MagicMock(spec=Request) + TEXTOS = {"es": {"errValidarToken": "Error al validar el token"}} VALIDADOR = mocker.patch("app.apis.FirebaseAuth.validar_txt_token", return_value=True) FIREBASE_VAL = mocker.patch("app.apis.FirebaseAuth.validar_token", return_value=-1) - RES = await verificar_token(REQ, "firebase_app", None, "Bearer token_invalido", "es") + RES = await verificar_token(REQ, "firebase_app", None, "Bearer token_invalido", TEXTOS, "es") assert RES.status_code == 400 assert RES.body.decode("utf-8") == '{"error":"Error al validar el token"}' @@ -74,12 +56,13 @@ async def test_13(mocker: MockerFixture): errores inesperados al validar el token de Firebase. """ REQ = mocker.MagicMock(spec=Request) + TEXTOS = {"es": {"errTry": "Error al procesar la solicitud:"}} VALIDADOR = mocker.patch("app.apis.FirebaseAuth.validar_txt_token", return_value=True) FIREBASE_VAL = mocker.patch("app.apis.FirebaseAuth.validar_token") FIREBASE_VAL.side_effect = Exception("Excepción imprevista") - RES = await verificar_token(REQ, "firebase_app", None, "Bearer token_invalido", "es") + RES = await verificar_token(REQ, "firebase_app", None, "Bearer token_invalido", TEXTOS, "es") assert RES.status_code == 500 assert RES.body.decode("utf-8") == '{"error":"Error al procesar la solicitud: Excepción imprevista"}' @@ -136,10 +119,11 @@ def test_23(mocker: MockerFixture): Test para validar que la función "ver_datos_token" retorne los datos del token cuando este es válido """ + TEXTOS = {"es": {"errTry": "Error al procesar la solicitud:"}} VALIDADOR = mocker.patch("app.apis.FirebaseAuth.validar_txt_token", return_value=True) TOKEN = mocker.patch("app.apis.FirebaseAuth.validar_token", return_value=(1, {"uid": "a1234H"})) - RES = ver_datos_token("Bearer token_valido", "firebase_app", "es") + RES = ver_datos_token("Bearer token_valido", "firebase_app", "es", TEXTOS) assert RES == (1, {"uid": "a1234H"}) VALIDADOR.assert_called_once_with("token_valido") @@ -149,10 +133,11 @@ def test_24(mocker: MockerFixture): """ Test para validar que la función "ver_datos_token" cuando se provee un token inválido. """ + TEXTOS = {"es": {"errTokenInvalido": "Token inválido"}} VALIDADOR = mocker.patch("app.apis.FirebaseAuth.validar_txt_token", return_value=False) TOKEN = mocker.patch("apis.FirebaseAuth.validar_token") - RES = ver_datos_token("Bearer token_invalido", "firebase_app", "es") + RES = ver_datos_token("Bearer token_invalido", "firebase_app", "es", TEXTOS) assert RES == (0, {"error": "Token inválido"}) VALIDADOR.assert_called_once_with("token_invalido") @@ -163,10 +148,11 @@ def test_25(mocker: MockerFixture): Test para validar que la función "ver_datos_token" maneje correctamente las excepciones. """ + TEXTOS = {"es": {"errProcesarToken": "Error al procesar el token"}} VALIDADOR = mocker.patch("app.apis.FirebaseAuth.validar_txt_token") VALIDADOR.side_effect = Exception("Error inesperado") - RES = ver_datos_token("Bearer token_invalido", "firebase_app", "es") + RES = ver_datos_token("Bearer token_invalido", "firebase_app", "es", TEXTOS) assert RES == (-1, {"error": "Error al procesar el token: Error inesperado."}) VALIDADOR.assert_called_once_with("token_invalido") @@ -196,7 +182,7 @@ async def test_26(mocker: MockerFixture): FIREBASE = mocker.patch("firebase_admin.auth.list_users", return_value=LISTA) - RES = await ver_datos_usuarios("firebase_app", "es") + RES = await ver_datos_usuarios("firebase_app", "es", {}) assert RES.status_code == 200 assert RES.body.decode("utf-8") == '{"usuarios":[{"correo":"usuario@correo.com","uid":"12345","nombre":"usuario","rol":0,"estado":true,"fecha_registro":"26/07/2025 11:56 AM","ultima_conexion":"26/07/2025 11:56 AM"}]}' @@ -235,7 +221,7 @@ async def test_27(mocker: MockerFixture): FIREBASE = mocker.patch("firebase_admin.auth.list_users", return_value=LISTA) - RES = await ver_datos_usuarios("firebase_app", "es") + RES = await ver_datos_usuarios("firebase_app", "es", {}) assert RES.status_code == 200 assert RES.body.decode("utf-8") == '{"usuarios":[{"correo":"usuario@correo.com","uid":"12345","nombre":"usuario","rol":0,"estado":true,"fecha_registro":"26/07/2025 11:56 AM","ultima_conexion":"26/07/2025 11:56 AM"},{"correo":"usuario@correo.com","uid":"12345","nombre":"usuario","rol":0,"estado":true,"fecha_registro":"26/07/2025 11:56 AM","ultima_conexion":"26/07/2025 11:56 AM"}]}' @@ -247,10 +233,11 @@ async def test_28(mocker: MockerFixture): """ Test para validar que la función "ver_datos_usuarios" maneje correctamente las excepciones. """ + TEXTOS = {"es": {"errObtenerDatosUsuarios": "Error al obtener los datos de los usuarios"}} FIREBASE = mocker.patch("firebase_admin.auth.list_users") FIREBASE.side_effect = Exception("Error al obtener los usuarios") - RES = await ver_datos_usuarios("firebase_app", "es") + RES = await ver_datos_usuarios("firebase_app", "es", TEXTOS) assert RES.status_code == 400 assert RES.body.decode("utf-8") == '{"error":"Error al obtener los datos de los usuarios: Error al obtener los usuarios"}' @@ -279,7 +266,7 @@ async def test_39(mocker: MockerFixture): FIREBASE = mocker.patch("firebase_admin.auth.get_user", return_value=USUARIO) - RES = await ver_datos_usuario("firebase_app", "12345", "es") + RES = await ver_datos_usuario("firebase_app", "12345", "es", {}) assert RES.status_code == 200 assert RES.body.decode("utf-8") == '{"correo":"usuario@correo.com","uid":"12345","nombre":"usuario","rol":0,"estado":true,"fecha_registro":"26/07/2025 11:56 AM","ultima_conexion":"26/07/2025 11:56 AM"}' @@ -293,12 +280,13 @@ async def test_40(mocker: MockerFixture): Test para validar que la función "ver_datos_usuario" arroje una excepción al no encontrar el usuario. """ + TEXTOS = {"es": {"errUsuarioNoEncontrado": "Usuario no encontrado"}} FIRESTORE = mocker.patch("app.apis.FirebaseAuth.obtener_rol_usuario") FIRESTORE.return_value = -1 FIREBASE = mocker.patch("firebase_admin.auth.get_user", return_value=None) - RES = await ver_datos_usuario("firebase_app", "a1234H", "es") + RES = await ver_datos_usuario("firebase_app", "a1234H", "es", TEXTOS) assert RES.status_code == 404 assert RES.body.decode("utf-8") == '{"error":"Usuario no encontrado"}' @@ -311,12 +299,13 @@ async def test_41(mocker: MockerFixture): """ Test para validar que la función "ver_datos_usuario" maneje correctamente las excepciones. """ + TEXTOS = {"es": {"errObtenerDatosUsuarios": "Error al obtener los datos de los usuarios"}} FIRESTORE = mocker.patch("app.apis.FirebaseAuth.obtener_rol_usuario") FIRESTORE.side_effect = Exception("a1234H") mocker.patch("firebase_admin.auth.get_user", return_value=None) - RES = await ver_datos_usuario("firebase_app", "a1234H", "es") + RES = await ver_datos_usuario("firebase_app", "a1234H", "es", TEXTOS) assert RES.status_code == 400 assert RES.body.decode("utf-8") == '{"error":"Error al obtener los datos de los usuarios: a1234H"}' @@ -371,9 +360,10 @@ def test_50(mocker: MockerFixture): Test para validar que la función "actualizar_estado_usuario" retorne una JSONResponse indicando que el usuario fue actualizado correctamente. """ + TEXTOS = {"es": {"msgUsuarioActualizado": "Estado del usuario actualizado correctamente"}} FIREBASE = mocker.patch("firebase_admin.auth.update_user") - RES = actualizar_estado_usuario("firebase_app", "1234", False, "es") + RES = actualizar_estado_usuario("firebase_app", "1234", False, "es", TEXTOS) assert RES.status_code == 200 assert RES.body.decode("utf-8") == '{"mensaje":"Estado del usuario actualizado correctamente"}' @@ -385,10 +375,11 @@ def test_51(mocker: MockerFixture): Test para validar que la función "actualizar_estado_usuario" retorne un error cuando los valores de actualización son inválidos """ + TEXTOS = {"es": {"errEstadoInvalido": "Estado inválido"}} FIREBASE = mocker.patch("firebase_admin.auth.update_user") FIREBASE.side_effect = ValueError("Estado inválido") - RES = actualizar_estado_usuario("firebase_app", "1234", False, "es") + RES = actualizar_estado_usuario("firebase_app", "1234", False, "es", TEXTOS) assert RES.status_code == 401 assert RES.body.decode("utf-8") == '{"error":"Estado inválido"}' @@ -399,10 +390,11 @@ def test_52(mocker: MockerFixture): """ Test para validar que la función "actualizar_estado_usuario" maneje correctamente las excepciones """ + TEXTOS = {"es": {"errTry": "Error al procesar la solicitud:"}} FIREBASE = mocker.patch("firebase_admin.auth.update_user") FIREBASE.side_effect = Exception("Error inesperado") - RES = actualizar_estado_usuario("firebase_app", "1234", False, "es") + RES = actualizar_estado_usuario("firebase_app", "1234", False, "es", TEXTOS) assert RES.status_code == 500 assert RES.body.decode("utf-8") == '{"error":"Error al procesar la solicitud: Error inesperado"}' diff --git a/tests/scripts/api/test_main.py b/tests/scripts/api/test_main.py index 788347c..ec0cc90 100644 --- a/tests/scripts/api/test_main.py +++ b/tests/scripts/api/test_main.py @@ -1,33 +1,49 @@ import pytest from pytest_mock import MockerFixture from fastapi.testclient import TestClient -import app.main - +from app.main import app +from contextlib import asynccontextmanager +# Constantes de prueba TEST_CREDS = { - "apiKey": "test_api_key", - "authDomain": "test_auth_domain", - "projectId": "test_project_id", - "storageBucket": "test_storage_bucket", - "messagingSenderId": "test_messaging_sender_id", - "appId": "test_app_id", - "measurementId": "test_measurement_id", - "driveScopes": [ - "https://www.googleapis.com/auth/drive", - ], + "apiKey": "test_api_key", + "authDomain": "test_auth_domain", + "projectId": "test_project_id", + "storageBucket": "test_storage_bucket", + "messagingSenderId": "test_messaging_sender_id", + "appId": "test_app_id", + "measurementId": "test_measurement_id", + "driveScopes": [ + "https://www.googleapis.com/auth/drive", + ], +} + +MOCK_FIREBASE_APP = { + "appId": "test_app_id", + "cred": {"projectId": "test_project_id", "certificated": True}, +} + +MOCK_TEXTOS = { + "es": {"errTry": "Error de prueba:"}, + "en": {"errTry": "Test error:"}, +} + + +@asynccontextmanager +async def mock_inicializar_modelos(app): + yield { + "explicador": None, # Mock del explicador + "textos": MOCK_TEXTOS, + "modelo": None, # Mock del modelo + "firebase_app": MOCK_FIREBASE_APP, + "credenciales": TEST_CREDS, } + @pytest.fixture(autouse=True) def setup_module(mocker: MockerFixture): - MOCK_APP = { - "appId": "test_app_id", - "cred": {"projectId": "test_project_id", "certificated": True}, - } - - mocker.patch("app.main.firebase_app", MOCK_APP) mocker.patch("app.main.CORS_ORIGINS", ["http://localhost:5178",]) mocker.patch("app.main.ALLOWED_HOSTS", ["localhost",]) - mocker.patch("routers.main_router.CREDS_FIREBASE_CLIENTE", TEST_CREDS) mocker.patch("app.main.ORIGENES_AUTORIZADOS", ["*"]) yield mocker.resetall() @@ -36,13 +52,12 @@ def test_8(): """ Test para validar que el API rechace las peticiones de hosts no autorizados """ - - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.get( - "/credenciales", - headers={"Origin": "http://localhost:5178", "Host": "google"}, - ) + app.router.lifespan_context = mock_inicializar_modelos + with TestClient(app) as CLIENTE: + RES = CLIENTE.get( + "/credenciales", + headers={"Origin": "http://localhost:5178", "Host": "google"}, + ) assert RES.status_code == 400 assert RES.content.decode() == "Invalid host header" @@ -52,45 +67,41 @@ def test_9(mocker: MockerFixture): Test para validar que el middleware que revisa las credenciales de Firebase deje pasar un token válido con una petición POST proveniente de un host autorizado """ - - CLIENTE = TestClient(app.main.app) - + app.router.lifespan_context = mock_inicializar_modelos VALIDADOR = mocker.patch("apis.FirebaseAuth.validar_txt_token", return_value=True) FIREBASE = mocker.patch("firebase_admin.auth.verify_id_token", return_value=1) - RES = CLIENTE.post( - "/credenciales", - headers={ - "Authorization": "Bearer token_valido", - "Origin": "http://localhost:5178", - "Host": "localhost" - }, - ) + with TestClient(app) as CLIENTE: + RES = CLIENTE.post( + "/credenciales", + headers={ + "Authorization": "Bearer token_valido", + "Origin": "http://localhost:5178", + "Host": "localhost" + }, + ) assert RES.status_code == 405 assert RES.json() == { "detail": "Method Not Allowed" } - + VALIDADOR.assert_called_once_with("token_valido") - FIREBASE.assert_called_once_with("token_valido", { - "appId": "test_app_id", - "cred": {"projectId": "test_project_id", "certificated": True}, - }, check_revoked=True) + FIREBASE.assert_called_once_with("token_valido", MOCK_FIREBASE_APP, check_revoked=True) -def test_10(mocker: MockerFixture): +@pytest.mark.asyncio +async def test_10(mocker: MockerFixture): """ Test para validar que el API retorne correctamente las credenciales de Firebase y no aplique el middleware de verificación de credenciales cuando se hace una petición GET a la ruta /credenciales """ + app.router.lifespan_context = mock_inicializar_modelos + with TestClient(app) as CLIENTE: + VALIDADOR = mocker.patch("apis.FirebaseAuth.verificar_token") - VALIDADOR = mocker.patch("apis.FirebaseAuth.verificar_token") - - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.get( - "/credenciales", - headers={"Origin": "http://localhost:5178", "Host": "localhost"}, - ) + RES = CLIENTE.get( + "/credenciales", + headers={"Origin": "http://localhost:5178", "Host": "localhost"}, + ) assert RES.status_code == 200 assert RES.json() == TEST_CREDS @@ -102,11 +113,11 @@ def test_80(): Test para validar que el middleware retorne el mensaje adecuado cuando no se ha colocado el header 'origin' """ - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.get( - "/credenciales" - ) + app.router.lifespan_context = mock_inicializar_modelos + with TestClient(app) as CLIENTE: + RES = CLIENTE.get( + "/credenciales" + ) assert RES.status_code == 400 assert RES.content.decode() == "Encabezado 'origin' inválido" @@ -116,14 +127,14 @@ def test_81(mocker: MockerFixture): Test para validar que el middleware rechace la petición cuando viene de un origen no autorizado """ + app.router.lifespan_context = mock_inicializar_modelos mocker.patch("app.main.ORIGENES_AUTORIZADOS", ["https://dominio.subdominio1.*.com"]) - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.get( - "/credenciales", - headers={ "origin": "https://dominio.subdominio2.hola.com"} - ) + with TestClient(app) as CLIENTE: + RES = CLIENTE.get( + "/credenciales", + headers={ "origin": "https://dominio.subdominio2.hola.com"} + ) assert RES.status_code == 403 assert RES.content.decode() == "Origen no autorizado" @@ -132,14 +143,14 @@ def test_82(mocker: MockerFixture): """ Test para validar que el middleware acepte una petición de un origen autorizado """ + app.router.lifespan_context = mock_inicializar_modelos mocker.patch("app.main.ORIGENES_AUTORIZADOS", ["https://dominio.subdominio1.*.com"]) - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.get( - "/credenciales", - headers={ "origin": "https://dominio.subdominio1.hola.com", "Host": "localhost"} - ) + with TestClient(app) as CLIENTE: + RES = CLIENTE.get( + "/credenciales", + headers={ "origin": "https://dominio.subdominio1.hola.com", "Host": "localhost"} + ) assert RES.status_code == 200 assert RES.json() == TEST_CREDS \ No newline at end of file diff --git a/tests/scripts/api/test_recaptcha.py b/tests/scripts/api/test_recaptcha.py index 61d4ccc..df5c5cf 100644 --- a/tests/scripts/api/test_recaptcha.py +++ b/tests/scripts/api/test_recaptcha.py @@ -7,7 +7,8 @@ def test_68(): Test para validar que la función "manejador_errores" devuelve el mensaje de error correcto cuando el token es erroneo. """ - RES = manejador_errores("invalid-input-response", "es") + TEXTOS = { "es": { "errCaptchaTokenErroneo": "El token proveído tiene errores." }} + RES = manejador_errores("invalid-input-response", "es", TEXTOS) assert RES == "El token proveído tiene errores." def test_69(): @@ -15,7 +16,8 @@ def test_69(): Test para validar que la función "manejador_errores" devuelve el mensaje de error correcto cuando el token ya expiró o no es válido. """ - RES = manejador_errores("timeout-or-duplicate", "es") + TEXTOS = { "es": { "errCaptchaTokenInvalido": "El token ha expirado o ya fue utilizado." }} + RES = manejador_errores("timeout-or-duplicate", "es", TEXTOS) assert RES == "El token ha expirado o ya fue utilizado." def test_70(): @@ -23,7 +25,7 @@ def test_70(): Test para validar que la función "manejador_errores" no devuelve ningún mensaje cuando el error no es conocido. """ - RES = manejador_errores("invalid-input-secret", "es") + RES = manejador_errores("invalid-input-secret", "es", {}) assert RES == "invalid-input-secret" def test_71(mocker: MockerFixture): @@ -39,7 +41,7 @@ def test_71(mocker: MockerFixture): MOCK.return_value = RESPUESTA FUNC = mocker.patch("app.apis.Recaptcha.manejador_errores") - RES = verificar_peticion_recaptcha("token_valido", "es") + RES = verificar_peticion_recaptcha("token_valido", "es", {}) assert RES == {"success": True, "hostname": "host.com" } FUNC.assert_not_called() @@ -48,13 +50,14 @@ def test_72(mocker: MockerFixture): """ Test para validar que la función "verificar_peticion_recaptcha" no procese los errores. """ - + TEXTOS = { "es": { "errCaptchaTokenInvalido": "El token ha expirado o ya fue utilizado.", + "errCaptchaTokenErroneo": "El token proveído tiene errores."}} RESPUESTA = mocker.MagicMock(spec=Response) RESPUESTA.json.return_value = {"success": False, "error-codes": ["invalid-input-response", "timeout-or-duplicate"]} MOCK = mocker.patch("app.apis.Recaptcha.post") MOCK.return_value = RESPUESTA - RES = verificar_peticion_recaptcha("token_invalido", "es") + RES = verificar_peticion_recaptcha("token_invalido", "es", TEXTOS) assert RES == {"success": False, "error-codes": ["El token proveído tiene errores.", "El token ha expirado o ya fue utilizado."] } \ No newline at end of file diff --git a/tests/scripts/dependencies/test_usuarios_deps.py b/tests/scripts/dependencies/test_usuarios_deps.py index 68b934b..48499c1 100644 --- a/tests/scripts/dependencies/test_usuarios_deps.py +++ b/tests/scripts/dependencies/test_usuarios_deps.py @@ -68,6 +68,7 @@ async def test_56(mocker: MockerFixture): """ PETICION = mocker.MagicMock(spec=Request) PETICION.headers = {"authorization": "Bearer token_valido"} + PETICION.state.textos = { "es": { "errAccesoDenegado": "Acceso denegado." } } DATOS_TOKEN = mocker.patch("app.dependencies.usuarios_dependencies.ver_datos_token", return_value=(1, {"uid": "a1234H"})) ROL = mocker.patch("app.dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=False) @@ -89,6 +90,7 @@ async def test_57(mocker: MockerFixture): """ PETICION = mocker.MagicMock(spec=Request) PETICION.headers = {"authorization": "Bearer token_valido"} + PETICION.state.textos = { "es": { "errTry": "Error al procesar la solicitud:" } } DATOS_TOKEN = mocker.patch("app.dependencies.usuarios_dependencies.ver_datos_token") DATOS_TOKEN.side_effect = Exception("Error inesperado") diff --git a/tests/scripts/models/test_diagnostico.py b/tests/scripts/models/test_diagnostico.py index c985226..4551107 100644 --- a/tests/scripts/models/test_diagnostico.py +++ b/tests/scripts/models/test_diagnostico.py @@ -2,8 +2,27 @@ from onnxruntime import InferenceSession from numpy import array, float32 from pathlib import Path +from dill import load as dload +from pytest_mock import MockerFixture +from onnxruntime import InferenceSession +from pathlib import Path import pytest +@pytest.fixture(autouse=True) +def setup_module(mocker: MockerFixture): + global EXPLAINER, TEXTOS, MODELO + + PATH_BASE = f"{Path(__file__).resolve().parent.parent.parent.parent}/app" + with open(f"{PATH_BASE}/bin/explicador.pkl", "rb") as archivo: + EXPLAINER = dload(archivo) + + MODELO = InferenceSession( + f"{PATH_BASE}/bin/modelo_red_neuronal.onnx", + providers=["CPUExecutionProvider"], + ) + yield + mocker.resetall() + @pytest.mark.asyncio async def test_7(): """ @@ -22,7 +41,7 @@ async def test_7(): "Hepatopatía_crónica": [0], "Renal": [0], "Cardíaca": [0], "Neurológica": [0], "Pulmonar": [0], "Endocrina": [0], "Gastrointestinal": [0], "Hematologica": [0], "Urológica": [0], "Vascular": [0], } - OBJ = Diagnostico(INSTANCIA) + OBJ = Diagnostico(INSTANCIA, MODELO, EXPLAINER) RES = await OBJ.generar_diagnostico() assert RES["prediccion"] == True @@ -44,12 +63,8 @@ def test_87(): 13.8, 211100, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], dtype=float32) - OBJ = Diagnostico(INSTANCIA) - sesion = InferenceSession( - f"{Path(__file__).resolve().parent.parent.parent.parent}/app/bin/modelo_red_neuronal.onnx", - providers=["CPUExecutionProvider"], - ) - RES = OBJ.obtener_probabilidades_predicciones(array([INSTANCIA, INSTANCIA1]).reshape(2, -1), sesion) + OBJ = Diagnostico(INSTANCIA, MODELO, EXPLAINER) + RES = OBJ.obtener_probabilidades_predicciones(array([INSTANCIA, INSTANCIA1]).reshape(2, -1)) assert round(RES[0][1], 0) == 0 assert round(RES[1][1], 0) == 1 @@ -71,12 +86,8 @@ def test_88(): "Hepatopatía_crónica": [0], "Renal": [0], "Cardíaca": [0], "Neurológica": [0], "Pulmonar": [0], "Endocrina": [0], "Gastrointestinal": [0], "Hematologica": [0], "Urológica": [0], "Vascular": [0], } - OBJ = Diagnostico(INSTANCIA) - sesion = InferenceSession( - f"{Path(__file__).resolve().parent.parent.parent.parent}/app/bin/modelo_red_neuronal.onnx", - providers=["CPUExecutionProvider"], - ) - OBJ.generar_explicacion(sesion) + OBJ = Diagnostico(INSTANCIA, MODELO, EXPLAINER) + OBJ.generar_explicacion() assert OBJ.explicacion is not None assert len(OBJ.explicacion) == 10 diff --git a/tests/scripts/routers/test_main_router.py b/tests/scripts/routers/test_main_router.py index e83bc95..e014aff 100644 --- a/tests/scripts/routers/test_main_router.py +++ b/tests/scripts/routers/test_main_router.py @@ -1,33 +1,61 @@ from pytest_mock import MockerFixture from fastapi.testclient import TestClient -import app.main +from app.main import app import pytest +from contextlib import asynccontextmanager +from dill import load as dload +from json import load as jload +from onnxruntime import InferenceSession +from pathlib import Path +# Constantes de prueba TEST_CREDS = { - "apiKey": "test_api_key", - "authDomain": "test_auth_domain", - "projectId": "test_project_id", - "storageBucket": "test_storage_bucket", - "messagingSenderId": "test_messaging_sender_id", - "appId": "test_app_id", - "measurementId": "test_measurement_id", - "driveScopes": [ - "https://www.googleapis.com/auth/drive", - ], + "apiKey": "test_api_key", + "authDomain": "test_auth_domain", + "projectId": "test_project_id", + "storageBucket": "test_storage_bucket", + "messagingSenderId": "test_messaging_sender_id", + "appId": "test_app_id", + "measurementId": "test_measurement_id", + "driveScopes": [ + "https://www.googleapis.com/auth/drive", + ], + "reCAPTCHA": "test_recaptcha", +} + +MOCK_FIREBASE_APP = { + "appId": "test_app_id", + "cred": {"projectId": "test_project_id", "certificated": True}, +} + +TEXTOS = { + "es": { "errTry": "Error al procesar la solicitud:"}, +} + + +@asynccontextmanager +async def mock_inicializar_modelos(app): + PATH_BASE = Path(__file__).resolve().parent.parent.parent.parent + with open(f"{PATH_BASE}/app/bin/explicador.pkl", "rb") as archivo: + EXPLAINER = dload(archivo) + + MODELO = InferenceSession( + f"{PATH_BASE}/app/bin/modelo_red_neuronal.onnx", + providers=["CPUExecutionProvider"], + ) + yield { + "explicador": EXPLAINER, + "textos": TEXTOS, + "modelo": MODELO, + "firebase_app": MOCK_FIREBASE_APP, + "credenciales": TEST_CREDS, } @pytest.fixture(autouse=True) def setup_module(mocker: MockerFixture): - MOCK_APP = { - "appId": "test_app_id", - "cred": {"projectId": "test_project_id", "certificated": True}, - } - - mocker.patch("app.main.firebase_app", MOCK_APP) mocker.patch("app.main.CORS_ORIGINS", ["http://localhost:5178",]) mocker.patch("app.main.ALLOWED_HOSTS", ["localhost",], ) mocker.patch("app.main.ORIGENES_AUTORIZADOS", ["*"]) - mocker.patch("app.routers.main_router.CREDS_FIREBASE_CLIENTE", TEST_CREDS) yield mocker.resetall() @@ -83,17 +111,18 @@ def test_16(mocker: MockerFixture): "vih": 0, } + app.router.lifespan_context = mock_inicializar_modelos + VALIDADOR = mocker.patch("apis.FirebaseAuth.validar_txt_token", return_value=True) FIREBASE = mocker.patch("firebase_admin.auth.verify_id_token", return_value=1) - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.post( - "/diagnosticar", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_valido"}, - json=INSTANCIA - ) + with TestClient(app) as CLIENTE: + RES = CLIENTE.post( + "/diagnosticar", + headers={"Origin": "http://localhost:5178", "Host": "localhost", + "Authorization": "Bearer token_valido"}, + json=INSTANCIA + ) JSON = RES.json() assert RES.status_code == 200 @@ -160,20 +189,20 @@ def test_17(mocker: MockerFixture): "vih": 0, } + app.router.lifespan_context = mock_inicializar_modelos VALIDADOR = mocker.patch("apis.FirebaseAuth.validar_txt_token", return_value=True) FIREBASE = mocker.patch("firebase_admin.auth.verify_id_token", return_value=1) DIAGNOSTICO = mocker.patch("models.Diagnostico.Diagnostico.generar_diagnostico") DIAGNOSTICO.side_effect = Exception("Error al generar el diagnóstico") - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.post( - "/diagnosticar", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_valido"}, - json=INSTANCIA - ) + with TestClient(app) as CLIENTE: + RES = CLIENTE.post( + "/diagnosticar", + headers={"Origin": "http://localhost:5178", "Host": "localhost", + "Authorization": "Bearer token_valido"}, + json=INSTANCIA + ) assert RES.status_code == 500 assert RES.json() == {"error": "Error al procesar la solicitud: Error al generar el diagnóstico"} @@ -190,51 +219,50 @@ def test_73(mocker: MockerFixture): Test para validar el endpoint de recaptcha retorne la respuesta correspondiente a la verificación de un token. """ + app.router.lifespan_context = mock_inicializar_modelos FUNC = mocker.patch("routers.main_router.verificar_peticion_recaptcha", return_value={"success": True, "hostname": "0.0.0.0"}) - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.post( - "/recaptcha", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_valido"}, - json={"token": "token_valido"} - ) + with TestClient(app) as CLIENTE: + RES = CLIENTE.post( + "/recaptcha", + headers={"Origin": "http://localhost:5178", "Host": "localhost", + "Authorization": "Bearer token_valido"}, + json={"token": "token_valido"} + ) assert RES.status_code == 200 assert RES.json() == {"success": True, "hostname": "0.0.0.0"} - FUNC.assert_called_once_with("token_valido", "es") + FUNC.assert_called_once_with("token_valido", "es", TEXTOS) def test_74(mocker: MockerFixture): """ Test para validar que el endpoint para verificar el captcha maneje correctamente las excepciones. """ + app.router.lifespan_context = mock_inicializar_modelos FUNC = mocker.patch("routers.main_router.verificar_peticion_recaptcha", side_effect=Exception("Error de verificación")) - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.post( - "/recaptcha", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_valido"}, - json={"token": "token_valido"} - ) + with TestClient(app) as CLIENTE: + RES = CLIENTE.post( + "/recaptcha", + headers={"Origin": "http://localhost:5178", "Host": "localhost", + "Authorization": "Bearer token_valido"}, + json={"token": "token_valido"} + ) assert RES.status_code == 500 assert RES.json() == {"error": "Error al procesar la solicitud: Error de verificación"} - FUNC.assert_called_once_with("token_valido", "es") + FUNC.assert_called_once_with("token_valido", "es", TEXTOS) def test_91(): """ Test para validar que el endpoint de healthcheck retorne la respuesta correcta. """ - - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.get("/healthcheck", headers={"Origin": "http://localhost:5178", "Host": "localhost"}) + app.router.lifespan_context = mock_inicializar_modelos + with TestClient(app) as CLIENTE: + RES = CLIENTE.get("/healthcheck", headers={"Origin": "http://localhost:5178", "Host": "localhost"}) assert RES.status_code == 200 assert RES.json() == {"status": "ok"} \ No newline at end of file diff --git a/tests/scripts/routers/test_usuarios_router.py b/tests/scripts/routers/test_usuarios_router.py index e416c32..f4053ba 100644 --- a/tests/scripts/routers/test_usuarios_router.py +++ b/tests/scripts/routers/test_usuarios_router.py @@ -2,47 +2,110 @@ from fastapi.testclient import TestClient from fastapi.responses import JSONResponse from firebase_admin.auth import UserRecord -import app.main +from app.main import app import pytest +from contextlib import asynccontextmanager + +# Constantes de prueba +TEST_CREDS = { + "apiKey": "test_api_key", + "authDomain": "test_auth_domain", + "projectId": "test_project_id", + "storageBucket": "test_storage_bucket", + "messagingSenderId": "test_messaging_sender_id", + "appId": "test_app_id", + "measurementId": "test_measurement_id", + "driveScopes": [ + "https://www.googleapis.com/auth/drive", + ], +} + +MOCK_FIREBASE_APP = { + "appId": "test_app_id", + "cred": {"projectId": "test_project_id", "certificated": True}, +} + +MOCK_TEXTOS = { + "es": { + "errTry": "Error al procesar la solicitud:", + "errAccesoDenegado": "Acceso denegado.", + "errTokenInvalido": "Token inválido", + "errUIDInvalido": "UID inválido", + "errObtenerUsuario": "Error al obtener el usuario", + "errUsuarioNoEncontrado": "Usuario no encontrado", + } +} -@pytest.fixture(autouse=True) -def setup_module(mocker: MockerFixture): - MOCK_APP = { - "appId": "test_app_id", - "cred": {"projectId": "test_project_id", "certificated": True}, + +@asynccontextmanager +async def mock_inicializar_modelos(app): + yield { + "explicador": None, # Mock del explicador + "textos": MOCK_TEXTOS, + "modelo": None, # Mock del modelo + "firebase_app": MOCK_FIREBASE_APP, + "credenciales": TEST_CREDS, } - mocker.patch("routers.usuarios_router.firebase_app", MOCK_APP) - mocker.patch("app.main.CORS_ORIGINS", ["http://localhost:5178",]) - mocker.patch("app.main.ALLOWED_HOSTS", ["localhost",] ) + +@pytest.fixture(autouse=True) +def setup_module(mocker: MockerFixture): + mocker.patch( + "app.main.CORS_ORIGINS", + [ + "http://localhost:5178", + ], + ) + mocker.patch( + "app.main.ALLOWED_HOSTS", + [ + "localhost", + ], + ) mocker.patch("app.main.ORIGENES_AUTORIZADOS", ["*"]) yield mocker.resetall() + def test_31(mocker: MockerFixture): """ Test para validar que el API retorne los datos de los usuarios con una petición autenticada. """ - - DATOS = [{"correo": "usuario@correo.com", "uid": "a1234H", "nombre": "usuario", "ultima_conexion": 1000, "rol": 0, "estado": True}] - DATOS_TOKEN = mocker.patch("dependencies.usuarios_dependencies.ver_datos_token", return_value=(1, {"uid": "a1234H"})) - ROL = mocker.patch("dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True) + DATOS = [ + { + "correo": "usuario@correo.com", + "uid": "a1234H", + "nombre": "usuario", + "ultima_conexion": 1000, + "rol": 0, + "estado": True, + } + ] + DATOS_TOKEN = mocker.patch( + "dependencies.usuarios_dependencies.ver_datos_token", + return_value=(1, {"uid": "a1234H"}), + ) + ROL = mocker.patch( + "dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True + ) USUARIO = mocker.patch("routers.usuarios_router.ver_datos_usuarios") USUARIO.return_value = JSONResponse( - status_code=200, - media_type="application/json", - content={"usuarios": DATOS} + status_code=200, media_type="application/json", content={"usuarios": DATOS} ) - CLIENTE = TestClient(app.main.app) + app.router.lifespan_context = mock_inicializar_modelos - RES = CLIENTE.get( - "/admin/usuarios", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_valido"} - ) + with TestClient(app) as CLIENTE: + RES = CLIENTE.get( + "/admin/usuarios", + headers={ + "Origin": "http://localhost:5178", + "Host": "localhost", + "Authorization": "Bearer token_valido", + }, + ) assert RES.status_code == 200 assert RES.json() == {"usuarios": DATOS} @@ -51,22 +114,31 @@ def test_31(mocker: MockerFixture): ROL.assert_called_once_with("a1234H") USUARIO.assert_called_once() + def test_32(mocker: MockerFixture): """ Test para validar que el API no retorne los datos de los usuarios si el usuario no es administrador. """ + DATOS_TOKEN = mocker.patch( + "dependencies.usuarios_dependencies.ver_datos_token", + return_value=(1, {"uid": "a1234H"}), + ) + ROL = mocker.patch( + "dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=False + ) - DATOS_TOKEN = mocker.patch("dependencies.usuarios_dependencies.ver_datos_token", return_value=(1, {"uid": "a1234H"})) - ROL = mocker.patch("dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=False) - - CLIENTE = TestClient(app.main.app) + app.router.lifespan_context = mock_inicializar_modelos - RES = CLIENTE.get( - "/admin/usuarios", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_valido"} - ) + with TestClient(app) as CLIENTE: + RES = CLIENTE.get( + "/admin/usuarios", + headers={ + "Origin": "http://localhost:5178", + "Host": "localhost", + "Authorization": "Bearer token_valido", + }, + ) assert RES.status_code == 403 assert RES.json() == {"error": "Acceso denegado."} @@ -74,84 +146,115 @@ def test_32(mocker: MockerFixture): DATOS_TOKEN.assert_called_once() ROL.assert_called_once_with("a1234H") + def test_33(mocker: MockerFixture): """ Test para validar que el API no retorne los datos de los usuarios si el usuario no es administrador. """ - DATOS_TOKEN = mocker.patch("dependencies.usuarios_dependencies.ver_datos_token") DATOS_TOKEN.side_effect = Exception("Error inesperado") - CLIENTE = TestClient(app.main.app) + app.router.lifespan_context = mock_inicializar_modelos - RES = CLIENTE.get( - "/admin/usuarios", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_valido"} - ) + with TestClient(app) as CLIENTE: + RES = CLIENTE.get( + "/admin/usuarios", + headers={ + "Origin": "http://localhost:5178", + "Host": "localhost", + "Authorization": "Bearer token_valido", + }, + ) assert RES.status_code == 500 assert RES.json() == {"error": "Error al procesar la solicitud: Error inesperado"} DATOS_TOKEN.assert_called_once() + def test_42(mocker: MockerFixture): """ Test para validar que el API retorne los datos de un usuario con una petición autenticada. """ - - DATOS = {"correo": "usuario@correo.com", "uid": "a1234H", "nombre": "usuario", "ultima_conexion": 1000, "rol": 0, "estado": True} - DATOS_TOKEN = mocker.patch("dependencies.usuarios_dependencies.ver_datos_token", return_value=(1, {"uid": "a1234H"})) - ROL = mocker.patch("dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True) + DATOS = { + "correo": "usuario@correo.com", + "uid": "a1234H", + "nombre": "usuario", + "ultima_conexion": 1000, + "rol": 0, + "estado": True, + } + DATOS_TOKEN = mocker.patch( + "dependencies.usuarios_dependencies.ver_datos_token", + return_value=(1, {"uid": "a1234H"}), + ) + ROL = mocker.patch( + "dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True + ) mocker.patch("routers.usuarios_router.validar_uid", return_value=True) + app.router.lifespan_context = mock_inicializar_modelos + USUARIO = mocker.patch("routers.usuarios_router.ver_datos_usuario") USUARIO.return_value = JSONResponse( - status_code=200, - media_type="application/json", - content=DATOS + status_code=200, media_type="application/json", content=DATOS ) - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.get( - "/admin/usuarios/a1234H", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_valido", "Language": "es"} - ) + with TestClient(app) as CLIENTE: + RES = CLIENTE.get( + "/admin/usuarios/a1234H", + headers={ + "Origin": "http://localhost:5178", + "Host": "localhost", + "Authorization": "Bearer token_valido", + "Language": "es", + }, + ) assert RES.status_code == 200 assert RES.json() == DATOS DATOS_TOKEN.assert_called_once() ROL.assert_called_once_with("a1234H") - USUARIO.assert_called_once_with({ - "appId": "test_app_id", - "cred": {"projectId": "test_project_id", "certificated": True}, - }, "a1234H", "es") + USUARIO.assert_called_once_with( + { + "appId": "test_app_id", + "cred": {"projectId": "test_project_id", "certificated": True}, + }, + "a1234H", + "es", + MOCK_TEXTOS, + ) + def test_43(mocker: MockerFixture): """ Test para validar que el API no retorne los datos del usuario si el token es inválido. """ - - DATOS_TOKEN = mocker.patch("dependencies.usuarios_dependencies.ver_datos_token", return_value=(0, {"error": "Token inválido"})) - - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.get( - "/admin/usuarios/a1234H", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_invalido"} + DATOS_TOKEN = mocker.patch( + "dependencies.usuarios_dependencies.ver_datos_token", + return_value=(0, {"error": "Token inválido"}), ) + app.router.lifespan_context = mock_inicializar_modelos + + with TestClient(app) as CLIENTE: + RES = CLIENTE.get( + "/admin/usuarios/a1234H", + headers={ + "Origin": "http://localhost:5178", + "Host": "localhost", + "Authorization": "Bearer token_invalido", + }, + ) assert RES.status_code == 403 assert RES.json() == {"error": "Token inválido"} DATOS_TOKEN.assert_called_once() + def test_44(mocker: MockerFixture): """ Test para validar que el API no retorne los datos de los usuarios si ocurre una excepción. @@ -159,37 +262,48 @@ def test_44(mocker: MockerFixture): DATOS_TOKEN = mocker.patch("dependencies.usuarios_dependencies.ver_datos_token") DATOS_TOKEN.side_effect = Exception("Error inesperado") - - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.get( - "/admin/usuarios/a1234H", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_valido"} - ) + app.router.lifespan_context = mock_inicializar_modelos + + with TestClient(app) as CLIENTE: + RES = CLIENTE.get( + "/admin/usuarios/a1234H", + headers={ + "Origin": "http://localhost:5178", + "Host": "localhost", + "Authorization": "Bearer token_valido", + }, + ) assert RES.status_code == 500 assert RES.json() == {"error": "Error al procesar la solicitud: Error inesperado"} DATOS_TOKEN.assert_called_once() + def test_45(mocker: MockerFixture): """ Test para validar que el API no retorne los datos de un usuario con un UID inválido """ - - DATOS_TOKEN = mocker.patch("dependencies.usuarios_dependencies.ver_datos_token", return_value=(1, {"uid": "a1234H"})) - ROL = mocker.patch("dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True) - mocker.patch("routers.usuarios_router.validar_uid", return_value=False) - - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.get( - "/admin/usuarios/a1234H", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_valido"} + DATOS_TOKEN = mocker.patch( + "dependencies.usuarios_dependencies.ver_datos_token", + return_value=(1, {"uid": "a1234H"}), + ) + ROL = mocker.patch( + "dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True ) + mocker.patch("routers.usuarios_router.validar_uid", return_value=False) + app.router.lifespan_context = mock_inicializar_modelos + + with TestClient(app) as CLIENTE: + RES = CLIENTE.get( + "/admin/usuarios/a1234H", + headers={ + "Origin": "http://localhost:5178", + "Host": "localhost", + "Authorization": "Bearer token_valido", + }, + ) assert RES.status_code == 400 assert RES.json() == {"error": "UID inválido"} @@ -197,23 +311,31 @@ def test_45(mocker: MockerFixture): DATOS_TOKEN.assert_called_once() ROL.assert_called_once_with("a1234H") + def test_46(mocker: MockerFixture): """ Test para validar que el API no retorne los datos de un usuario si se lanza un ValueError. """ - - DATOS_TOKEN = mocker.patch("dependencies.usuarios_dependencies.ver_datos_token", return_value=(1, {"uid": "a1234H"})) - ROL = mocker.patch("dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True) + DATOS_TOKEN = mocker.patch( + "dependencies.usuarios_dependencies.ver_datos_token", + return_value=(1, {"uid": "a1234H"}), + ) + ROL = mocker.patch( + "dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True + ) MOCK = mocker.patch("routers.usuarios_router.validar_uid") MOCK.side_effect = ValueError("UID inválido") - - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.get( - "/admin/usuarios/a1234H", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_valido"} - ) + app.router.lifespan_context = mock_inicializar_modelos + + with TestClient(app) as CLIENTE: + RES = CLIENTE.get( + "/admin/usuarios/a1234H", + headers={ + "Origin": "http://localhost:5178", + "Host": "localhost", + "Authorization": "Bearer token_valido", + }, + ) assert RES.status_code == 400 assert RES.json() == {"error": "UID inválido"} @@ -221,12 +343,18 @@ def test_46(mocker: MockerFixture): DATOS_TOKEN.assert_called_once() ROL.assert_called_once_with("a1234H") + def test_60(mocker: MockerFixture): """ Test para validar que el API actualice el estado de un usuario correctamente. """ - DATOS_TOKEN = mocker.patch("dependencies.usuarios_dependencies.ver_datos_token", return_value=(1, {"uid": "a1234H"})) - ROL = mocker.patch("dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True) + DATOS_TOKEN = mocker.patch( + "dependencies.usuarios_dependencies.ver_datos_token", + return_value=(1, {"uid": "a1234H"}), + ) + ROL = mocker.patch( + "dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True + ) mocker.patch("routers.usuarios_router.validar_uid", return_value=True) USUARIO = mocker.MagicMock(spec=UserRecord) @@ -235,30 +363,40 @@ def test_60(mocker: MockerFixture): FIRESTORE = mocker.patch("routers.usuarios_router.ver_usuario_firebase") FIRESTORE.return_value = (1, USUARIO) - ACT = mocker.patch("routers.usuarios_router.actualizar_estado_usuario",) + ACT = mocker.patch( + "routers.usuarios_router.actualizar_estado_usuario", + ) ACT.return_value = JSONResponse( - {"mensaje": "Estado del usuario actualizado correctamente"}, - status_code=200, - media_type="application/json", - ) + {"mensaje": "Estado del usuario actualizado correctamente"}, + status_code=200, + media_type="application/json", + ) - CLIENTE = TestClient(app.main.app) + app.router.lifespan_context = mock_inicializar_modelos - RES = CLIENTE.patch( - "/admin/usuarios/a1234H?desactivar=false", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_valido"} - ) + with TestClient(app) as CLIENTE: + RES = CLIENTE.patch( + "/admin/usuarios/a1234H?desactivar=false", + headers={ + "Origin": "http://localhost:5178", + "Host": "localhost", + "Authorization": "Bearer token_valido", + }, + ) assert RES.status_code == 200 assert RES.json() == {"mensaje": "Estado del usuario actualizado correctamente"} DATOS_TOKEN.assert_called_once() ROL.assert_called_once_with("a1234H") - FIRESTORE.assert_called_once_with({ - "appId": "test_app_id", - "cred": {"projectId": "test_project_id", "certificated": True}, - }, "a1234H") + FIRESTORE.assert_called_once_with( + { + "appId": "test_app_id", + "cred": {"projectId": "test_project_id", "certificated": True}, + }, + "a1234H", + ) + def test_61(mocker: MockerFixture): """ @@ -268,14 +406,17 @@ def test_61(mocker: MockerFixture): DATOS_TOKEN.return_value = (0, {"error": "Token inválido"}) FUNC = mocker.patch("routers.usuarios_router.actualizar_estado_usuario") - - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.patch( - "/admin/usuarios/a1234H?desactivar=true", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_invalido"} - ) + app.router.lifespan_context = mock_inicializar_modelos + + with TestClient(app) as CLIENTE: + RES = CLIENTE.patch( + "/admin/usuarios/a1234H?desactivar=true", + headers={ + "Origin": "http://localhost:5178", + "Host": "localhost", + "Authorization": "Bearer token_invalido", + }, + ) assert RES.status_code == 403 assert RES.json() == {"error": "Token inválido"} @@ -283,6 +424,7 @@ def test_61(mocker: MockerFixture): DATOS_TOKEN.assert_called_once() FUNC.assert_not_called() + def test_62(mocker: MockerFixture): """ Test para validar que el API retorne un error al intentar actualizar el estado de un @@ -294,19 +436,23 @@ def test_62(mocker: MockerFixture): FIRESTORE = mocker.patch("routers.usuarios_router.ver_usuario_firebase") FIRESTORE.return_value = (0, None) - ROL = mocker.patch("dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True) - + ROL = mocker.patch( + "dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True + ) FUNC = mocker.patch("routers.usuarios_router.actualizar_estado_usuario") mocker.patch("routers.usuarios_router.validar_uid", return_value=True) - - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.patch( - "/admin/usuarios/a1234H?desactivar=true", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_valido"} - ) + app.router.lifespan_context = mock_inicializar_modelos + + with TestClient(app) as CLIENTE: + RES = CLIENTE.patch( + "/admin/usuarios/a1234H?desactivar=true", + headers={ + "Origin": "http://localhost:5178", + "Host": "localhost", + "Authorization": "Bearer token_valido", + }, + ) assert RES.status_code == 404 assert RES.json() == {"error": "Usuario no encontrado"} @@ -315,6 +461,7 @@ def test_62(mocker: MockerFixture): DATOS_TOKEN.assert_called_once() FUNC.assert_not_called() + def test_63(mocker: MockerFixture): """ Test para validar que el API no actualice los datos de un usuario si ocurre una excepción. @@ -325,25 +472,33 @@ def test_63(mocker: MockerFixture): FIRESTORE = mocker.patch("routers.usuarios_router.ver_usuario_firebase") FIRESTORE.return_value = (-1, None) - ROL = mocker.patch("dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True) + ROL = mocker.patch( + "dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True + ) FUNC = mocker.patch("routers.usuarios_router.actualizar_estado_usuario") mocker.patch("routers.usuarios_router.validar_uid", return_value=True) - - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.patch( - "/admin/usuarios/a1234H?desactivar=true", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_valido"} - ) + app.router.lifespan_context = mock_inicializar_modelos + + with TestClient(app) as CLIENTE: + RES = CLIENTE.patch( + "/admin/usuarios/a1234H?desactivar=true", + headers={ + "Origin": "http://localhost:5178", + "Host": "localhost", + "Authorization": "Bearer token_valido", + }, + ) assert RES.status_code == 500 - assert RES.json() == {"error": "Error al procesar la solicitud: Error al obtener el usuario"} + assert RES.json() == { + "error": "Error al procesar la solicitud: Error al obtener el usuario" + } FUNC.assert_not_called() ROL.assert_called_once() + def test_64(mocker: MockerFixture): """ Test para validar que actualice los datos de un usuario si se lanza un ValueError. @@ -351,20 +506,23 @@ def test_64(mocker: MockerFixture): DATOS_TOKEN = mocker.patch("dependencies.usuarios_dependencies.ver_datos_token") DATOS_TOKEN.return_value = (1, {"uid": "a1234H"}) - ROL = mocker.patch("dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True) - + ROL = mocker.patch( + "dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True + ) mocker.patch("routers.usuarios_router.validar_uid", return_value=False) - FUNC = mocker.patch("app.routers.usuarios_router.actualizar_estado_usuario") + app.router.lifespan_context = mock_inicializar_modelos - CLIENTE = TestClient(app.main.app) - - RES = CLIENTE.patch( - "/admin/usuarios/a1234H?desactivar=true", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_valido"} - ) + with TestClient(app) as CLIENTE: + RES = CLIENTE.patch( + "/admin/usuarios/a1234H?desactivar=true", + headers={ + "Origin": "http://localhost:5178", + "Host": "localhost", + "Authorization": "Bearer token_valido", + }, + ) assert RES.status_code == 400 assert RES.json() == {"error": "UID inválido"} @@ -372,6 +530,7 @@ def test_64(mocker: MockerFixture): FUNC.assert_not_called() ROL.assert_called_once_with("a1234H") + def test_65(mocker: MockerFixture): """ Test para validar que actualice los datos de un usuario si se lanza un ValueError. @@ -381,15 +540,19 @@ def test_65(mocker: MockerFixture): FUNC = mocker.patch("routers.usuarios_router.actualizar_estado_usuario") - CLIENTE = TestClient(app.main.app) + app.router.lifespan_context = mock_inicializar_modelos - RES = CLIENTE.patch( - "/admin/usuarios/a1234H?desactivar=true", - headers={"Origin": "http://localhost:5178", "Host": "localhost", - "Authorization": "Bearer token_valido"} - ) + with TestClient(app) as CLIENTE: + RES = CLIENTE.patch( + "/admin/usuarios/a1234H?desactivar=true", + headers={ + "Origin": "http://localhost:5178", + "Host": "localhost", + "Authorization": "Bearer token_valido", + }, + ) assert RES.status_code == 500 assert RES.json() == {"error": "Error al procesar la solicitud: Error inesperado"} - FUNC.assert_not_called() \ No newline at end of file + FUNC.assert_not_called()