diff --git a/app/apis/FirebaseAuth.py b/app/apis/FirebaseAuth.py index 2ec3f91..ce272c7 100644 --- a/app/apis/FirebaseAuth.py +++ b/app/apis/FirebaseAuth.py @@ -1,7 +1,5 @@ -import firebase_admin.auth from firebase_admin.auth import * -from fastapi import Request -from fastapi.responses import JSONResponse +from constants import COD_ERROR_ESPERADO, COD_ERROR_INESPERADO, COD_EXITO from utils.Validadores import validar_txt_token from utils.Fechas import convertir_datetime_str from utils.Diccionario import ver_si_existe_clave @@ -22,60 +20,42 @@ def validar_token( int: 1 si el token es válido, 0 en caso contrario y -1 si hay un error de validación. """ try: - datos = firebase_admin.auth.verify_id_token( + datos = verify_id_token( token, firebase_app, check_revoked=True ) - return (1, datos) if obtener_datos else 1 + return (COD_EXITO, datos) if obtener_datos else COD_EXITO except (ExpiredIdTokenError, RevokedIdTokenError, UserDisabledError): - return (0, None) if obtener_datos else 0 + return (COD_ERROR_ESPERADO, None) if obtener_datos else COD_ERROR_ESPERADO except (ValueError, CertificateFetchError, InvalidIdTokenError): - return (-1, None) if obtener_datos else -1 + return (COD_ERROR_INESPERADO, None) if obtener_datos else COD_ERROR_INESPERADO -async def verificar_token( - peticion: Request, firebase_app, call_next, authorization: str, textos: dict[str, str], idioma: str ="es" -) -> JSONResponse: +async def verificar_token(firebase_app, token: str) -> int: """ Verifica el token de Firebase en la solicitud. Args: - peticion (Request): La solicitud que contiene el token. - authorization (str | None): El token de autorización de la solicitud. + token (str | None): El token de autorización de la solicitud. firebase_app (object): La instancia de la aplicación Firebase. - call_next (function): La función para pasar al siguiente middleware o ruta. - idioma (str): El idioma para los mensajes de error. Returns: - JSONResponse: La respuesta de la solicitud, o un error si el token es inválido. + int: Código de estado: 1 si el token es válido, 0 si es inválido, -1 si hay un error. """ try: - token = authorization.split("Bearer ")[1] + token = token.split("Bearer ")[1] reg_validacion = validar_txt_token(token) res_validacion = ( - 0 if (not reg_validacion) else validar_token(token, firebase_app, False) - ) - match res_validacion: - case 1: - return await call_next(peticion) - case 0: - return JSONResponse( - {"error": textos[idioma]["errTokenInvalido"]}, - status_code=403, - media_type="application/json", - ) - case _: - return JSONResponse( - {"error": textos[idioma]["errValidarToken"]}, - status_code=400, - media_type="application/json", - ) - except Exception as e: - return JSONResponse( - {"error": f"{textos[idioma]['errTry']} {e}"}, - status_code=500, - media_type="application/json", + COD_ERROR_ESPERADO + if (not reg_validacion) + else validar_token(token, firebase_app, False) ) + return res_validacion + except Exception: + return COD_ERROR_INESPERADO -def ver_datos_token(token: str, firebase_app, idioma: str, textos: dict[str, str]) -> tuple[int, dict]: + +def ver_datos_token( + token: str, firebase_app, idioma: str, textos: dict[str, str] +) -> tuple[int, dict]: """ Obtiene los datos del token de Firebase. Args: @@ -91,36 +71,40 @@ def ver_datos_token(token: str, firebase_app, idioma: str, textos: dict[str, str reg_validacion = validar_txt_token(token) if not reg_validacion: - return (0, {"error": f"{textos[idioma]['errTokenInvalido']}"}) + return ( + COD_ERROR_ESPERADO, + {"error": f"{textos[idioma]['errTokenInvalido']}"}, + ) - res_validacion = validar_token(token, firebase_app, True) + CODIGO, RES = validar_token(token, firebase_app, True) + if CODIGO != COD_EXITO: + error = ( + {"error": f"{textos[idioma]['errTokenInvalido']}"} + if CODIGO == COD_ERROR_ESPERADO + else {"error": f"{textos[idioma]['errValidarToken']}"} + ) - match res_validacion[0]: - case 1: - return res_validacion - case 0: - return (0, {"error": f"{textos[idioma]['errTokenInvalido']}"}) - case _: - return (-1, {"error": f"{textos[idioma]['errValidarToken']}"}) + return (CODIGO, RES if CODIGO == COD_EXITO else error) except Exception as e: - return (-1, {"error": f"{textos[idioma]['errProcesarToken']}: {e}."}) + return ( + COD_ERROR_INESPERADO, + {"error": f"{textos[idioma]['errProcesarToken']}: {str(e)}."}, + ) -async def ver_datos_usuarios(firebase_app, idioma: str, textos: dict[str, str]) -> JSONResponse: +async def ver_datos_usuarios(firebase_app) -> tuple[int, list[dict] | None]: """ Obtiene los datos de los usuarios registrados en Firebase. Args: firebase_app: La instancia de la aplicación Firebase. - idioma (str): El idioma para los mensajes de error. - textos (dict[str, str]): El diccionario de textos para los mensajes de error. Returns: - JSONResponse: Los datos de los usuarios, o un error si ocurre un problema. + tuple[int, list[dict] | None]: Un código de estado y los datos de los usuarios si se obtuvieron correctamente, o None si hubo un error. """ try: AUX = [] roles_task = asyncio.create_task(obtener_roles_usuarios()) - usuarios = firebase_admin.auth.list_users(app=firebase_app) + usuarios = list_users(app=firebase_app) ROLES = await roles_task while True: @@ -149,20 +133,12 @@ async def ver_datos_usuarios(firebase_app, idioma: str, textos: dict[str, str]) else: usuarios = usuarios.get_next_page() - return JSONResponse( - {"usuarios": AUX}, - status_code=200, - media_type="application/json", - ) - except Exception as e: - return JSONResponse( - {"error": f"{textos[idioma]['errObtenerDatosUsuarios']}: {e}"}, - status_code=400, - media_type="application/json", - ) + return (COD_EXITO, AUX) + except Exception: + return (COD_ERROR_INESPERADO, None) -async def ver_datos_usuario(firebase_app, uid: str, idioma: str, textos: dict[str, str]) -> JSONResponse: +async def ver_datos_usuario(firebase_app, uid: str) -> tuple[int, dict | None]: """ Obtiene los datos de un usuario específico usando el UID. Args: @@ -171,15 +147,15 @@ async def ver_datos_usuario(firebase_app, uid: str, idioma: str, textos: dict[st idioma (str): El idioma para los mensajes de error. textos (dict[str, str]): El diccionario de textos para los mensajes de error. Returns: - JSONResponse: Los datos del usuario, o un error si ocurre un problema. + tuple[int, dict | str | None]: Un código de estado y los datos del usuario si se encuentra. """ try: roles_task = asyncio.create_task(obtener_rol_usuario(uid)) - usuario = firebase_admin.auth.get_user(uid, firebase_app) + usuario = get_user(uid, firebase_app) ROL = await roles_task if ROL == -1: - raise UserNotFoundError(f"{textos[idioma]['errUsuarioNoEncontrado']}") + raise UserNotFoundError("") RES = { "correo": usuario.email, @@ -195,23 +171,11 @@ async def ver_datos_usuario(firebase_app, uid: str, idioma: str, textos: dict[st ), } - return JSONResponse( - RES, - status_code=200, - media_type="application/json", - ) + return (COD_EXITO, RES) except UserNotFoundError: - return JSONResponse( - {"error": f"{textos[idioma]['errUsuarioNoEncontrado']}"}, - status_code=404, - media_type="application/json", - ) + return (COD_ERROR_ESPERADO, None) except Exception as e: - return JSONResponse( - {"error": f"{textos[idioma]['errObtenerDatosUsuarios']}: {e}"}, - status_code=400, - media_type="application/json", - ) + return (COD_ERROR_INESPERADO, str(e)) def ver_usuario_firebase(firebase_app, uid: str) -> tuple[int, UserRecord | None]: @@ -224,44 +188,46 @@ def ver_usuario_firebase(firebase_app, uid: str) -> tuple[int, UserRecord | None tuple[int, UserRecord | None]: Un código de estado y el registro del usuario si se encuentra. """ try: - return (1, firebase_admin.auth.get_user(uid, firebase_app)) + RES = get_user(uid, firebase_app) + return (COD_EXITO, RES) except UserNotFoundError: - return (0, None) - except: - return (-1, None) + return (COD_ERROR_ESPERADO, None) + except Exception: + return (COD_ERROR_INESPERADO, None) def actualizar_estado_usuario( - firebase_app, uid: str, estado: bool, idioma: str, textos: dict[str, str] -) -> JSONResponse: + firebase_app, uid: str, estado: bool +) -> tuple[int, dict | None]: """ Actualiza el estado (activado/desactivado) de un usuario específico. Args: firebase_app: La instancia de la aplicación Firebase. uid (str): El UID del usuario a actualizar. estado (bool): El nuevo estado del usuario (True para desactivado, False para activado). - idioma (str): El idioma para los mensajes de error. - textos (dict[str, str]): El diccionario de textos para los mensajes de error. Returns: - JSONResponse | Response: Mensaje de éxito o error. + tuple[int, dict | None]: Un código de estado y los datos del usuario actualizado si se actualiza correctamente. """ try: - firebase_admin.auth.update_user(uid=uid, disabled=estado, app=firebase_app) - - return JSONResponse( - {"mensaje": f"{textos[idioma]['msgUsuarioActualizado']}"}, - status_code=200, - media_type="application/json", + USUARIO = update_user( + uid=uid, disabled=estado, app=firebase_app ) + + RES = { + "correo": USUARIO.email, + "uid": USUARIO.uid, + "nombre": USUARIO.display_name, + "estado": not USUARIO.disabled, + "fecha_registro": convertir_datetime_str( + USUARIO.user_metadata.creation_timestamp + ), + "ultima_conexion": convertir_datetime_str( + USUARIO.user_metadata.last_refresh_timestamp + ), + } + + return (COD_EXITO, RES) except ValueError: - return JSONResponse( - {"error": f"{textos[idioma]['errEstadoInvalido']}"}, - status_code=401, - media_type="application/json", - ) + return (COD_ERROR_ESPERADO, None) except Exception as e: - return JSONResponse( - {"error": f"{textos[idioma]['errTry']} {str(e)}"}, - status_code=500, - media_type="application/json", - ) + return (COD_ERROR_INESPERADO, str(e)) diff --git a/app/constants.py b/app/constants.py index b7744af..a05fdea 100644 --- a/app/constants.py +++ b/app/constants.py @@ -2,6 +2,9 @@ from utils.Dominios import obtener_lista_dominios ROL_ADMIN = 1001 +COD_EXITO = 1 +COD_ERROR_ESPERADO = 0 +COD_ERROR_INESPERADO = -1 CORS_ORIGINS = obtener_lista_dominios(getenv("CORS_ORIGINS", "http://localhost:5173,")) ORIGENES_AUTORIZADOS = obtener_lista_dominios(getenv("ORIGENES_AUTORIZADOS", "http://localhost:5173,")) ACTIVAR_DOCS = getenv("ACTIVAR_DOCS", "false").lower() == "true" diff --git a/app/dependencies/usuarios_dependencies.py b/app/dependencies/usuarios_dependencies.py index 3246590..24dbddd 100644 --- a/app/dependencies/usuarios_dependencies.py +++ b/app/dependencies/usuarios_dependencies.py @@ -1,51 +1,59 @@ from apis.FirebaseAuth import ver_datos_token from apis.Firestore import verificar_rol_usuario -from fastapi import Header, Request +from fastapi import Header, Request, Depends from fastapi.responses import JSONResponse +from constants import COD_ERROR_ESPERADO, COD_ERROR_INESPERADO +from models.Excepciones import AccesoNoAutorizado, UIDInvalido +from utils.Validadores import validar_uid +from urllib.parse import unquote +from dependencies.general_dependencies import verificar_idioma async def verificar_usuario_administrador( peticion: Request, - language: str | None = Header(default="es"), authorization: str | None = Header(default=""), + idioma: str = Depends(verificar_idioma), ) -> tuple[bool, JSONResponse | None]: """ Verifica si el usuario está autenticado y es administrador antes de permitir el acceso a las rutas protegidas. Args: peticion (Request): La solicitud HTTP entrante. - language (str | None): El idioma de la solicitud HTTP. authorization (str | None): El token de autorización de Firebase. - Returns: - tuple: Un tuple que contiene un booleano indicando si el usuario está autenticado y es administrador, y - en caso de error, un JSONResponse con el mensaje de error y el código de estado correspondiente. + idioma (str): El idioma preferido del usuario, obtenido a través de la dependencia `verificar_idioma`. """ firebase_app = peticion.state.firebase_app TEXTOS = peticion.state.textos - idioma = language if language in ("es", "en") else "es" - try: - RES, DATOS = ver_datos_token(authorization, firebase_app, idioma, TEXTOS) - - if RES in (-1, 0): - return False, JSONResponse( - DATOS, - status_code=403 if RES == 0 else 400, - media_type="application/json", - ) - - VALIDAR_ROL = await verificar_rol_usuario(DATOS["uid"]) - - if not VALIDAR_ROL: - return False, JSONResponse( - {"error": f"{TEXTOS[idioma]['errAccesoDenegado']}"}, - status_code=403, - media_type="application/json", - ) - - return True, None - except Exception as e: - return False, JSONResponse( - {"error": f"{TEXTOS[idioma]['errTry']} {str(e)}"}, - status_code=500, - media_type="application/json", - ) + RES, DATOS = ver_datos_token(authorization, firebase_app, idioma, TEXTOS) + + if RES in (COD_ERROR_INESPERADO, COD_ERROR_ESPERADO): + raise AccesoNoAutorizado(DATOS, 403) + + VALIDAR_ROL = await verificar_rol_usuario(DATOS["uid"]) + + if not VALIDAR_ROL: + raise AccesoNoAutorizado({ "error": TEXTOS[idioma]["errAccesoDenegado"] }, 403) + + +async def validador_uid( + peticion: Request, uid: str, idioma: str = Depends(verificar_idioma) +) -> str: + """ + Valida el UID proporcionado en la solicitud. Si es inválido lanza una excepción. + + Args: + peticion (Request): La solicitud HTTP entrante. + uid (str): El UID a validar. + idioma (str): El idioma preferido del usuario, obtenido a través de la dependencia ` + + Returns: + str: El UID validado. + """ + uid = unquote(uid) + TEXTOS = peticion.state.textos + VALIDACION = validar_uid(uid) + + if not VALIDACION: + raise UIDInvalido({"error": f"{TEXTOS[idioma]['errUIDInvalido']}"}) + + return uid diff --git a/app/main.py b/app/main.py index 0e2d0d3..414be92 100644 --- a/app/main.py +++ b/app/main.py @@ -2,13 +2,14 @@ from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware from fastapi import Request, Response +from fastapi.responses import JSONResponse from dotenv import load_dotenv from os import getenv from routers.main_router import router as main_router from utils.Dominios import obtener_lista_dominios from routers.usuarios_router import router as usuarios_router from apis.FirebaseAuth import verificar_token -from constants import CORS_ORIGINS, ALLOWED_HOSTS, ACTIVAR_DOCS, ORIGENES_AUTORIZADOS +from constants import * from utils.Validadores import validar_origen from utils.Diccionario import ver_si_existe_clave from contextlib import asynccontextmanager @@ -17,6 +18,7 @@ from json import load as jload from onnxruntime import InferenceSession from firebase_admin_config import inicializar_firebase +from models.Excepciones import * load_dotenv() @@ -137,8 +139,51 @@ async def verificar_credenciales(peticion: Request, call_next) -> Response: if peticion.method in METODOS_RESTRINGIDOS and ( peticion.url.path not in RUTAS_NO_PROTEGIDAS ): - return await verificar_token( - peticion, firebase_app, call_next, token, TEXTOS, idioma + RES = await verificar_token( + firebase_app, token ) - else: - return await call_next(peticion) + + if RES != COD_EXITO: + TEXTO = "errValidarToken" if RES == COD_ERROR_ESPERADO else "errTokenInvalido" + CODIGO = 403 if RES == COD_ERROR_ESPERADO else 400 + return JSONResponse( + {"error": TEXTOS[idioma][TEXTO]}, + status_code=CODIGO, + media_type="application/json", + ) + + return await call_next(peticion) + +# Manejadores globales de excepciones personalizadas +@app.exception_handler(AccesoNoAutorizado) +async def manejar_acceso_no_autorizado(peticion: Request, excepcion: AccesoNoAutorizado): + return JSONResponse( + excepcion.mensaje, + status_code=excepcion.codigo, + media_type="application/json", + ) + + +@app.exception_handler(UIDInvalido) +async def manejar_uid_invalido(peticion: Request, excepcion: UIDInvalido): + return JSONResponse( + excepcion.mensaje, + status_code=400, + media_type="application/json", + ) + +@app.exception_handler(UsuarioInexistente) +async def manejar_usuario_inexistente(peticion: Request, excepcion: UsuarioInexistente): + return JSONResponse( + excepcion.mensaje, + status_code=404, + media_type="application/json", + ) + +@app.exception_handler(ErrorInterno) +async def manejar_error_interno(peticion: Request, excepcion: ErrorInterno): + return JSONResponse( + excepcion.mensaje, + status_code=400, + media_type="application/json", + ) \ No newline at end of file diff --git a/app/models/Diagnostico.py b/app/models/Diagnostico.py index 7daef58..b3c9f86 100644 --- a/app/models/Diagnostico.py +++ b/app/models/Diagnostico.py @@ -23,22 +23,10 @@ def obtener_array_datos(self) -> ndarray: Returns: ndarray: Los datos convertidos en un array de numpy. """ - return array([self.datos["Edad"][0], self.datos["Género"][0], self.datos["Bebedor"][0], self.datos["Fumador"][0], - self.datos["Procedimiento_Quirurgicos___Traumatismo_Grave_en_los_últimos_15_dias"][0], self.datos["Inmovilidad_de_M_inferiores"][0], - self.datos["Viaje_prolongado"][0], self.datos["TEP___TVP_Previo"][0], self.datos["Malignidad"][0], - self.datos["Disnea"][0], self.datos["Dolor_toracico"][0], self.datos["Tos"][0], - self.datos["Hemoptisis"][0], self.datos["Síntomas_disautonomicos"][0], - self.datos["Edema_de_M_inferiores"][0], self.datos["Frecuencia_respiratoria"][0], - self.datos["Saturación_de_la_sangre"][0], self.datos["Frecuencia_cardíaca"][0], - self.datos["Presión_sistólica"][0], self.datos["Presión_diastólica"][0], - self.datos["Fiebre"][0], self.datos["Crepitaciones"][0], self.datos["Sibilancias"][0], - self.datos["Soplos"][0], self.datos["WBC"][0], self.datos["HB"][0], self.datos["PLT"][0], - self.datos["Derrame"][0], self.datos["Otra_Enfermedad"][0], self.datos["Hematologica"][0], - self.datos["Cardíaca"][0], self.datos["Enfermedad_coronaria"][0], self.datos["Diabetes_Mellitus"][0], - self.datos["Endocrina"][0], self.datos["Gastrointestinal"][0], self.datos["Hepatopatía_crónica"][0], - self.datos["Hipertensión_arterial"][0], self.datos["Neurológica"][0], self.datos["Pulmonar"][0], - self.datos["Renal"][0], self.datos["Trombofilia"][0], self.datos["Urológica"][0], self.datos["Vascular"][0], - self.datos["VIH"][0]], dtype=float32) + AUX = [] + for i in self.datos.keys(): + AUX.append(self.datos[i][0]) + return array(AUX, dtype=float32) def convertir_a_diccionario(self, array_datos: ndarray) -> dict: """ @@ -50,29 +38,20 @@ def convertir_a_diccionario(self, array_datos: ndarray) -> dict: Returns: dict: El diccionario con los datos del diagnóstico. """ - claves = { - "Edad": [], "Género": [], "Bebedor": [], "Fumador": [], - "Procedimiento_Quirurgicos___Traumatismo_Grave_en_los_últimos_15_dias": [], "Inmovilidad_de_M_inferiores": [], - "Viaje_prolongado": [], "TEP___TVP_Previo": [], "Malignidad": [], - "Disnea": [], "Dolor_toracico": [], "Tos": [], - "Hemoptisis": [], "Síntomas_disautonomicos": [], - "Edema_de_M_inferiores": [], "Frecuencia_respiratoria": [], - "Saturación_de_la_sangre": [], "Frecuencia_cardíaca": [], - "Presión_sistólica": [], "Presión_diastólica": [], - "Fiebre": [], "Crepitaciones": [], "Sibilancias": [], - "Soplos": [], "WBC": [], "HB": [], "PLT": [], - "Derrame": [], "Otra_Enfermedad": [], "Hematologica": [], - "Cardíaca": [], "Enfermedad_coronaria": [], "Diabetes_Mellitus": [], - "Endocrina": [], "Gastrointestinal": [], "Hepatopatía_crónica": [], - "Hipertensión_arterial": [], "Neurológica": [], "Pulmonar": [], - "Renal": [], "Trombofilia": [], "Urológica": [], "Vascular": [], - "VIH": [] - } + campos = ( + "Edad", "Género", "Bebedor", "Fumador", "Procedimiento_Quirurgicos___Traumatismo_Grave_en_los_últimos_15_dias", + "Inmovilidad_de_M_inferiores", "Viaje_prolongado", "TEP___TVP_Previo", "Malignidad", "Disnea", "Dolor_toracico", + "Tos", "Hemoptisis", "Síntomas_disautonomicos", "Edema_de_M_inferiores", "Frecuencia_respiratoria", + "Saturación_de_la_sangre", "Frecuencia_cardíaca", "Presión_sistólica","Presión_diastólica", "Fiebre", + "Crepitaciones", "Sibilancias", "Soplos", "WBC", "HB", "PLT", "Derrame", "Otra_Enfermedad", "Hematologica", + "Cardíaca", "Enfermedad_coronaria", "Diabetes_Mellitus", "Endocrina", "Gastrointestinal", "Hepatopatía_crónica", + "Hipertensión_arterial", "Neurológica", "Pulmonar", "Renal", "Trombofilia", "Urológica", "Vascular","VIH" + ) + claves = { i: [] for i in campos } for i in array_datos: for j, clave in enumerate(claves.keys()): claves[clave].append(i[j]) - return claves def obtener_probabilidades_predicciones( @@ -120,7 +99,7 @@ def generar_explicacion(self): self.explicacion = SALIDA - async def generar_diagnostico(self): + def generar_diagnostico(self): """ Genera el diagnóstico de los datos usando el modelo ONNX para normalizarlos y luego clasificarlos diff --git a/app/models/Excepciones.py b/app/models/Excepciones.py new file mode 100644 index 0000000..dbfc893 --- /dev/null +++ b/app/models/Excepciones.py @@ -0,0 +1,28 @@ +class AccesoNoAutorizado(Exception): + """Excepción personalizada para indicar que el acceso no está autorizado.""" + + def __init__(self, mensaje: str | dict, codigo: int): + self.mensaje = mensaje + self.codigo = codigo + super().__init__(self.mensaje) + +class UIDInvalido(Exception): + """Excepción personalizada para indicar que el UID es inválido.""" + + def __init__(self, mensaje: str | dict): + self.mensaje = mensaje + super().__init__(self.mensaje) + +class UsuarioInexistente(Exception): + """Excepción personalizada para indicar que el usuario no existe.""" + + def __init__(self, mensaje: str | dict): + self.mensaje = mensaje + super().__init__(self.mensaje) + +class ErrorInterno(Exception): + """Excepción personalizada para indicar que ocurrió un error interno.""" + + def __init__(self, mensaje: str | dict): + self.mensaje = mensaje + super().__init__(self.mensaje) \ No newline at end of file diff --git a/app/models/PeticionDiagnostico.py b/app/models/PeticionDiagnostico.py deleted file mode 100644 index f65248e..0000000 --- a/app/models/PeticionDiagnostico.py +++ /dev/null @@ -1,72 +0,0 @@ -from pydantic import BaseModel - -class PeticionDiagnostico(BaseModel): - """ - Clase que representa una petición de diagnóstico con los datos necesarios. - Esta clase solo valida que los datos sean del tipo correcto y los valores - requeridos para el diagnostico estén presentes. - """ - edad: int - sexo: int - bebedor: int - fumador: int - proc_quirurgico_traumatismo: int - inmovilidad_de_m_inferiores: int - viaje_prolongado: int - TEP_TVP_previo: int - malignidad: int - disnea: int - dolor_toracico: int - tos: int - hemoptisis: int - sintomas_disautonomicos: int - edema_de_m_inferiores: int - frecuencia_respiratoria: int - saturacion_de_la_sangre: float - frecuencia_cardiaca: int - presion_sistolica: int - presion_diastolica: int - fiebre: int - crepitaciones: int - sibilancias: int - soplos: int - wbc: float - hb: float - plt: float - derrame: int - otra_enfermedad: int - hematologica: int - cardiaca: int - enfermedad_coronaria: int - diabetes_mellitus: int - endocrina: int - gastrointestinal: int - hepatopatia_cronica: int - hipertension_arterial: int - neurologica: int - pulmonar: int - renal: int - trombofilia: int - urologica: int - vascular: int - vih: int - - def obtener_diccionario_instancia(self): - """ - Convierte la instancia de la petición de diagnóstico en diccionario - - Returns: - dict: Diccionario con los valores de la instancia - """ - return { - "Inmovilidad_de_M_inferiores": [self.inmovilidad_de_m_inferiores], "Procedimiento_Quirurgicos___Traumatismo_Grave_en_los_últimos_15_dias": [self.proc_quirurgico_traumatismo], - "Viaje_prolongado": [self.viaje_prolongado], "Síntomas_disautonomicos": [self.sintomas_disautonomicos], "Edema_de_M_inferiores": [self.edema_de_m_inferiores], - "Dolor_toracico": [self.dolor_toracico], "TEP___TVP_Previo": [self.TEP_TVP_previo], "Frecuencia_respiratoria": [self.frecuencia_respiratoria], - "Presión_sistólica": [self.presion_sistolica], "Presión_diastólica": [self.presion_diastolica], "Saturación_de_la_sangre": [self.saturacion_de_la_sangre], "Frecuencia_cardíaca": [self.frecuencia_cardiaca], - "WBC": [self.wbc], "HB": [self.hb], "PLT": [self.plt], "Género": [self.sexo], "Edad": [self.edad], "Fumador": [self.fumador], "Bebedor": [self.bebedor], "Malignidad": [self.malignidad], - "Disnea": [self.disnea], "Tos": [self.tos], "Hemoptisis": [self.hemoptisis], "Fiebre": [self.fiebre], "Crepitaciones": [self.crepitaciones], "Sibilancias": [self.sibilancias], - "Soplos": [self.soplos], "Derrame": [self.derrame], "Otra_Enfermedad": [self.otra_enfermedad], "Diabetes_Mellitus": [self.diabetes_mellitus], - "Hipertensión_arterial": [self.hipertension_arterial], "Enfermedad_coronaria": [self.enfermedad_coronaria], "Trombofilia": [self.trombofilia], "VIH": [self.vih], - "Hepatopatía_crónica": [self.hepatopatia_cronica], "Renal": [self.renal], "Cardíaca": [self.cardiaca], "Neurológica": [self.neurologica], "Pulmonar": [self.pulmonar], "Endocrina": [self.endocrina], - "Gastrointestinal": [self.gastrointestinal], "Hematologica": [self.hematologica], "Urológica": [self.urologica], "Vascular": [self.vascular], - } \ No newline at end of file diff --git a/app/models/PeticionRecaptcha.py b/app/models/PeticionRecaptcha.py deleted file mode 100644 index d8bf2b2..0000000 --- a/app/models/PeticionRecaptcha.py +++ /dev/null @@ -1,7 +0,0 @@ -from pydantic import BaseModel - -class PeticionRecaptcha(BaseModel): - """ - Clase que representa una petición para validar un token de reCAPTCHA. - """ - token: str \ No newline at end of file diff --git a/app/models/Peticiones.py b/app/models/Peticiones.py new file mode 100644 index 0000000..92d0609 --- /dev/null +++ b/app/models/Peticiones.py @@ -0,0 +1,120 @@ +from pydantic import BaseModel, model_validator +from utils.Validadores import validar_txt_token + +class TokenRecaptcha(BaseModel): + """ + Clase que representa una petición para validar un token de reCAPTCHA. + """ + token: str + + @model_validator(mode="after") + def validar_token(self): + if not validar_txt_token(self.token): + raise ValueError("El token de reCAPTCHA no es válido") + return self + + +class InstanciaDiagnostico(BaseModel): + """ + Clase que representa una petición de diagnóstico con los datos necesarios. + Esta clase solo valida que los datos sean del tipo correcto y los valores + requeridos para el diagnostico estén presentes. + """ + edad: int + sexo: int + bebedor: int + fumador: int + proc_quirurgico_traumatismo: int + inmovilidad_de_m_inferiores: int + viaje_prolongado: int + TEP_TVP_previo: int + malignidad: int + disnea: int + dolor_toracico: int + tos: int + hemoptisis: int + sintomas_disautonomicos: int + edema_de_m_inferiores: int + frecuencia_respiratoria: int + saturacion_de_la_sangre: float + frecuencia_cardiaca: int + presion_sistolica: int + presion_diastolica: int + fiebre: int + crepitaciones: int + sibilancias: int + soplos: int + wbc: float + hb: float + plt: float + derrame: int + otra_enfermedad: int + hematologica: int + cardiaca: int + enfermedad_coronaria: int + diabetes_mellitus: int + endocrina: int + gastrointestinal: int + hepatopatia_cronica: int + hipertension_arterial: int + neurologica: int + pulmonar: int + renal: int + trombofilia: int + urologica: int + vascular: int + vih: int + + @model_validator(mode="after") + def validar_datos(self): + numericas = { + "edad": self.edad, "wbc": self.wbc, "hb": self.hb, "plt": self.plt, "frecuencia_respiratoria": self.frecuencia_respiratoria, + "saturacion_de_la_sangre": self.saturacion_de_la_sangre, "frecuencia_cardiaca": self.frecuencia_cardiaca, + "presion_sistolica": self.presion_sistolica, "presion_diastolica": self.presion_diastolica + } + booleanas = { + "sexo": self.sexo, "bebedor": self.bebedor, "fumador": self.fumador, "proc_quirurgico_traumatismo": self.proc_quirurgico_traumatismo, + "inmovilidad_de_m_inferiores": self.inmovilidad_de_m_inferiores, "viaje_prolongado": self.viaje_prolongado, "TEP_TVP_previo": self.TEP_TVP_previo, + "malignidad": self.malignidad, "disnea": self.disnea, "dolor_toracico": self.dolor_toracico, "tos": self.tos, "hemoptisis": self.hemoptisis, + "sintomas_disautonomicos": self.sintomas_disautonomicos, "edema_de_m_inferiores": self.edema_de_m_inferiores, "fiebre": self.fiebre, + "crepitaciones": self.crepitaciones, "sibilancias": self.sibilancias, "soplos": self.soplos, "derrame": self.derrame, + "otra_enfermedad": self.otra_enfermedad, "hematologica": self.hematologica, "cardiaca": self.cardiaca, + "enfermedad_coronaria": self.enfermedad_coronaria, "diabetes_mellitus": self.diabetes_mellitus, "endocrina": self.endocrina, + "gastrointestinal": self.gastrointestinal, "hepatopatia_cronica":self.hepatopatia_cronica,"hipertension_arterial": self.hipertension_arterial, + "neurologica" :self.neurologica,"pulmonar" :self.pulmonar,"renal" :self.renal,"trombofilia" :self.trombofilia,"urologica" :self.urologica, + "vascular": self.vascular, "vih": self.vih + } + + for i in booleanas.keys(): + if booleanas[i] not in (0, 1): + raise ValueError(f"El valor {i} no es válido para un campo booleano") + for i in numericas.keys(): + if numericas[i] < 0: + raise ValueError(f"El valor {i} no puede ser negativo") + return self + + def obtener_diccionario_instancia(self): + """ + Convierte la instancia de la petición de diagnóstico en diccionario + + Returns: + dict: Diccionario con los valores de la instancia + """ + return { + "Edad": [self.edad], "Género": [self.sexo], "Bebedor": [self.bebedor], "Fumador": [self.fumador], + "Procedimiento_Quirurgicos___Traumatismo_Grave_en_los_últimos_15_dias": [self.proc_quirurgico_traumatismo], + "Inmovilidad_de_M_inferiores": [self.inmovilidad_de_m_inferiores], "Viaje_prolongado": [self.viaje_prolongado], + "TEP___TVP_Previo": [self.TEP_TVP_previo], "Malignidad": [self.malignidad], "Disnea": [self.disnea], + "Dolor_toracico": [self.dolor_toracico], "Tos": [self.tos], "Hemoptisis": [self.hemoptisis], + "Síntomas_disautonomicos": [self.sintomas_disautonomicos], "Edema_de_M_inferiores": [self.edema_de_m_inferiores], + "Frecuencia_respiratoria": [self.frecuencia_respiratoria], "Saturación_de_la_sangre": [self.saturacion_de_la_sangre], + "Frecuencia_cardíaca": [self.frecuencia_cardiaca], "Presión_sistólica": [self.presion_sistolica], + "Presión_diastólica": [self.presion_diastolica], "Fiebre": [self.fiebre], "Crepitaciones": [self.crepitaciones], + "Sibilancias": [self.sibilancias], "Soplos": [self.soplos], "WBC": [self.wbc], "HB": [self.hb], "PLT": [self.plt], + "Derrame": [self.derrame], "Otra_Enfermedad": [self.otra_enfermedad], "Hematologica": [self.hematologica], + "Cardíaca": [self.cardiaca], "Enfermedad_coronaria": [self.enfermedad_coronaria], "Diabetes_Mellitus": [self.diabetes_mellitus], + "Endocrina": [self.endocrina], "Gastrointestinal": [self.gastrointestinal], "Hepatopatía_crónica": [self.hepatopatia_cronica], + "Hipertensión_arterial": [self.hipertension_arterial], "Neurológica": [self.neurologica], "Pulmonar": [self.pulmonar], + "Renal": [self.renal], "Trombofilia": [self.trombofilia], "Urológica": [self.urologica], "Vascular": [self.vascular], + "VIH": [self.vih], + } \ No newline at end of file diff --git a/app/routers/main_router.py b/app/routers/main_router.py index 1d7d67a..85083ef 100644 --- a/app/routers/main_router.py +++ b/app/routers/main_router.py @@ -1,44 +1,34 @@ from models.Diagnostico import Diagnostico -from models.PeticionDiagnostico import PeticionDiagnostico -from models.PeticionRecaptcha import PeticionRecaptcha +from models.Peticiones import * from fastapi import APIRouter, Depends, Request from fastapi.responses import JSONResponse from apis.Recaptcha import verificar_peticion_recaptcha from dependencies.general_dependencies import verificar_idioma -router = APIRouter(dependencies=[Depends(verificar_idioma)]) +router = APIRouter() @router.get("/healthcheck") async def healthcheck(): - return JSONResponse({"status": "ok"}, status_code=200, media_type="application/json") + return {"status": "ok"} @router.get("/credenciales") -async def obtener_credenciales(peticion: Request, idioma: str = Depends(verificar_idioma)): - try: - TEXTOS = peticion.state.textos - CREDS_FIREBASE_CLIENTE = peticion.state.credenciales - return JSONResponse( - CREDS_FIREBASE_CLIENTE, status_code=200, media_type="application/json" - ) - except Exception as e: - return JSONResponse( - {"error": f"{TEXTOS[idioma]['errTry']} {str(e)}"}, - status_code=500, - media_type="application/json", - ) +async def obtener_credenciales(peticion: Request) -> JSONResponse: + CREDS_FIREBASE_CLIENTE = peticion.state.credenciales + return CREDS_FIREBASE_CLIENTE @router.post("/diagnosticar") -async def diagnosticar(peticion: Request, req: PeticionDiagnostico, idioma: str = Depends(verificar_idioma)) -> JSONResponse: +async def diagnosticar(peticion: Request, req: InstanciaDiagnostico, idioma: str = Depends(verificar_idioma)) -> JSONResponse: + TEXTOS = peticion.state.textos + MODELO = peticion.state.modelo + EXPLICADOR = peticion.state.explicador + try: - TEXTOS = peticion.state.textos - MODELO = peticion.state.modelo - EXPLICADOR = peticion.state.explicador DATOS = req.obtener_diccionario_instancia() DIAGNOSTICO = Diagnostico(DATOS, MODELO, EXPLICADOR) - RES = await DIAGNOSTICO.generar_diagnostico() + RES = DIAGNOSTICO.generar_diagnostico() - return JSONResponse(RES, status_code=200, media_type="application/json") + return RES except Exception as e: return JSONResponse( {"error": f"{TEXTOS[idioma]['errTry']} {str(e)}"}, @@ -47,11 +37,11 @@ async def diagnosticar(peticion: Request, req: PeticionDiagnostico, idioma: str ) @router.post("/recaptcha") -async def verificar_recaptcha(peticion: Request, req: PeticionRecaptcha, idioma: str = Depends(verificar_idioma)) -> JSONResponse: +async def verificar_recaptcha(peticion: Request, req: TokenRecaptcha, idioma: str = Depends(verificar_idioma)) -> JSONResponse: + TEXTOS = peticion.state.textos try: - TEXTOS = peticion.state.textos - resultado = verificar_peticion_recaptcha(req.token, idioma, TEXTOS) - return JSONResponse(resultado, status_code=200, media_type="application/json") + RES = verificar_peticion_recaptcha(req.token, idioma, TEXTOS) + return RES except Exception as e: return JSONResponse( {"error": f"{TEXTOS[idioma]['errTry']} {str(e)}"}, diff --git a/app/routers/usuarios_router.py b/app/routers/usuarios_router.py index ee245e3..13470de 100644 --- a/app/routers/usuarios_router.py +++ b/app/routers/usuarios_router.py @@ -1,108 +1,78 @@ from fastapi import APIRouter, Depends, Request from fastapi.responses import JSONResponse from apis.FirebaseAuth import * -from utils.Validadores import validar_uid -from urllib.parse import unquote -from dependencies.usuarios_dependencies import verificar_usuario_administrador +from dependencies.usuarios_dependencies import * from dependencies.general_dependencies import verificar_idioma +from constants import COD_ERROR_ESPERADO, COD_ERROR_INESPERADO, COD_EXITO +from models.Excepciones import UsuarioInexistente, ErrorInterno router = APIRouter( - prefix="/usuarios", dependencies=[Depends(verificar_usuario_administrador), Depends(verificar_idioma)] + prefix="/usuarios", dependencies=[Depends(verificar_usuario_administrador)] ) @router.get("") async def ver_usuarios( - peticion: Request, - res_validacion_auth: tuple[bool, JSONResponse | None] = Depends(verificar_usuario_administrador), - idioma: str = Depends(verificar_idioma) + peticion: Request, idioma: str = Depends(verificar_idioma) ) -> JSONResponse: - try: - TEXTOS = peticion.state.textos - firebase_app = peticion.state.firebase_app - if not res_validacion_auth[0]: - return res_validacion_auth[1] - - return await ver_datos_usuarios(firebase_app, idioma, TEXTOS) - except Exception as e: - return JSONResponse( - {"error": f"{TEXTOS[idioma]['errTry']} Error al procesar la solicitud: {str(e)}"}, - status_code=500, - media_type="application/json", - ) + TEXTOS = peticion.state.textos + firebase_app = peticion.state.firebase_app + COD, RES = await ver_datos_usuarios(firebase_app) + + if COD != COD_EXITO: + raise ErrorInterno({"error": TEXTOS[idioma]["errObtenerDatosUsuarios"]}) + + return { "usuarios": RES } @router.get("/{uid}") async def ver_usuario( peticion: Request, - uid: str, res_validacion_auth: tuple[bool, JSONResponse | None] = Depends(verificar_usuario_administrador), - idioma: str = Depends(verificar_idioma) + uid: str = Depends(validador_uid), + idioma: str = Depends(verificar_idioma), ) -> JSONResponse: - try: - TEXTOS = peticion.state.textos - firebase_app = peticion.state.firebase_app - if not res_validacion_auth[0]: - return res_validacion_auth[1] + TEXTOS = peticion.state.textos + firebase_app = peticion.state.firebase_app - uid = unquote(uid) - VALIDACION = validar_uid(uid) + CODIGO, RES = await ver_datos_usuario(firebase_app, uid) - if not VALIDACION: - raise ValueError(f"{TEXTOS[idioma]['errUIDInvalido']}") - - return await ver_datos_usuario(firebase_app, uid, idioma, TEXTOS) - except ValueError: - return JSONResponse( - {"error": TEXTOS[idioma]['errUIDInvalido']}, - status_code=400, - media_type="application/json", - ) - except Exception as e: - return JSONResponse( - {"error": f"{TEXTOS[idioma]['errTry']} {str(e)}"}, - status_code=500, - media_type="application/json", + if CODIGO == COD_ERROR_ESPERADO: + raise UsuarioInexistente( + {"error": TEXTOS[idioma]["errUsuarioNoEncontrado"]} ) + elif CODIGO == COD_ERROR_INESPERADO: + raise ErrorInterno({"error": TEXTOS[idioma]["errObtenerDatosUsuario"]}) + + return RES + @router.patch("/{uid}") async def actualizar_usuario( peticion: Request, - uid: str, desactivar: bool, res_validacion_auth: tuple[bool, JSONResponse | None] = Depends(verificar_usuario_administrador), - idioma: str = Depends(verificar_idioma) + desactivar: bool, + uid: str = Depends(validador_uid), + idioma: str = Depends(verificar_idioma), ) -> JSONResponse: - try: - TEXTOS = peticion.state.textos - firebase_app = peticion.state.firebase_app - - if not res_validacion_auth[0]: - return res_validacion_auth[1] - - uid = unquote(uid) - VALIDACION = validar_uid(uid) - - if not VALIDACION: - raise ValueError(f"{TEXTOS[idioma]['errUIDInvalido']}") - - DATOS = ver_usuario_firebase(firebase_app, uid) - if DATOS[0] == 0: - return JSONResponse( - {"error": f"{TEXTOS[idioma]['errUsuarioNoEncontrado']}"}, - status_code=404, - media_type="application/json", - ) - elif DATOS[0] == -1: - raise Exception(f"{TEXTOS[idioma]['errObtenerUsuario']}") - - return actualizar_estado_usuario(firebase_app, uid, desactivar, idioma, TEXTOS) - except ValueError: - return JSONResponse( - {"error": f"{TEXTOS[idioma]['errUIDInvalido']}"}, - status_code=400, - media_type="application/json", + TEXTOS = peticion.state.textos + firebase_app = peticion.state.firebase_app + COD, RES = ver_usuario_firebase(firebase_app, uid) + + if COD == COD_ERROR_ESPERADO: + raise UsuarioInexistente( + {"error": TEXTOS[idioma]["errUsuarioNoEncontrado"]} ) - except Exception as e: + elif COD == COD_ERROR_INESPERADO: + raise ErrorInterno({"error": TEXTOS[idioma]["errObtenerUsuario"]}) + + CODIGO, RES = actualizar_estado_usuario(firebase_app, uid, desactivar) + + if CODIGO != COD_EXITO: + TEXTO = "errEstadoInvalido" if CODIGO == COD_ERROR_ESPERADO else "errTry" + COD = 401 if CODIGO == COD_ERROR_ESPERADO else 500 return JSONResponse( - {"error": f"{TEXTOS[idioma]['errTry']} {str(e)}"}, - status_code=500, + {"error": f"{TEXTOS[idioma][TEXTO]}"}, + status_code=COD, media_type="application/json", - ) \ No newline at end of file + ) + + return RES diff --git a/app/utils/Diccionario.py b/app/utils/Diccionario.py index 2b34787..5848d93 100644 --- a/app/utils/Diccionario.py +++ b/app/utils/Diccionario.py @@ -7,8 +7,4 @@ def ver_si_existe_clave(diccionario: dict, clave: str) -> bool: Returns: bool: True si la clave existe, False en caso contrario. """ - try: - diccionario[clave] - return True - except KeyError: - return False \ No newline at end of file + return clave in diccionario.keys() \ No newline at end of file diff --git a/app/utils/Dominios.py b/app/utils/Dominios.py index d503e16..76e383c 100644 --- a/app/utils/Dominios.py +++ b/app/utils/Dominios.py @@ -8,8 +8,10 @@ def obtener_lista_dominios(texto: str) -> list[str]: list[str]: Lista de dominios sin espacios en blanco. """ RES = [dominio.strip() for dominio in texto.split(",")] + NUM_VACIOS = RES.count("") - if RES.count("") > 0: - RES.remove("") + if NUM_VACIOS > 0: + for _ in range(NUM_VACIOS): + RES.remove("") return RES \ No newline at end of file diff --git a/app/utils/Fechas.py b/app/utils/Fechas.py index 566b704..bfe8912 100644 --- a/app/utils/Fechas.py +++ b/app/utils/Fechas.py @@ -11,7 +11,8 @@ def convertir_hora(hora: int, minuto: int) -> str: Returns: str: La hora en formato HH:MM. """ - AUXHORA = 12 if (hora == 0 or hora == 12) else (hora % 12) + HORA = hora % 12 + AUXHORA = 12 if (HORA == 0) else HORA momento = "PM" if hora >= 12 else "AM" diff --git a/app/utils/Preprocesamiento.py b/app/utils/Preprocesamiento.py index 06ba623..67e1cf6 100644 --- a/app/utils/Preprocesamiento.py +++ b/app/utils/Preprocesamiento.py @@ -1,5 +1,3 @@ -from numpy import array, float32, ndarray - def evaluar_intervalo(val: int | float, intervalos: tuple[tuple]) -> int: """ Evalúa en qué intervalo se encuentra un valor dado. @@ -12,16 +10,18 @@ def evaluar_intervalo(val: int | float, intervalos: tuple[tuple]) -> int: Returns: int: La etiqueta del intervalo en el que se encuentra el valor, o -1 si no se encuentra en ningún intervalo. """ + INF = float("inf") for i in intervalos: - if (i[0] is not None) and (i[1] is not None): - if (val >= i[0]) and (val < i[1]): - return i[2] - elif (i[0] is not None) and (i[1] is None): - if val >= i[0]: - return i[2] - elif (i[0] is None) and (i[1] is not None): - if val < i[1]: - return i[2] + MIN, MAX, TAG = i + if (MIN != -INF) and (MAX != INF): + if (val >= MIN) and (val < MAX): + return TAG + elif (MIN != -INF) and (MAX == INF): + if val >= MIN: + return TAG + elif (MIN == -INF) and (MAX != INF): + if val < MAX: + return TAG return -1 @@ -39,50 +39,50 @@ def preprocesar_instancia( """ CLAVES = ( # Edad - ("Edad", ((0, 20, 0), (20, 41, 1), (41, 61, 2), (61, 81, 3), (81, None, 4))), + ("Edad", ((0, 20, 0), (20, 41, 1), (41, 61, 2), (61, 81, 3), (81, float("inf"), 4))), # Frecuencia respiratoria ("Frecuencia_respiratoria", ((15, 20, 1), (20, 25, 2), (25, 30, 3), (30, 35, 4), (35, 40, 5), - (40, 45, 6), (45, 50, 7), (50, 55, 8), (55, 60, 9), (None, 15, 10), - (60, None, 11)), + (40, 45, 6), (45, 50, 7), (50, 55, 8), (55, 60, 9), (-float("inf"), 15, 10), + (60, float("inf"), 11)), ), # Saturación de la sangre (SO2) ("Saturación_de_la_sangre", ((50, 55, 1), (55, 60, 2), (60, 65, 3), (65, 70, 4), (70, 75, 5), (75, 80, 6), (80, 85, 7), (85, 90, 8), (90, 95, 9), (95, 100, 10), - (None, 50, 11), (100, None, 12)), + (-float("inf"), 50, 11), (100, float("inf"), 12)), ), # Frecuencia cardiaca ("Frecuencia_cardíaca", ((50, 70, 1), (70, 90, 2), (90, 110, 3), (110, 130, 4), (130, 150, 5), - (150, 170, 6), (170, 190, 7), (190, 210, 8), (None, 50, 9), (210, None, 10)), + (150, 170, 6), (170, 190, 7), (190, 210, 8), (-float("inf"), 50, 9), (210, float("inf"), 10)), ), # Presión sistólica ("Presión_sistólica", ((50, 70, 1), (70, 90, 2), (90, 110, 3), (110, 130, 4), (130, 150, 5), - (150, 170, 6), (170, 190, 7), (190, 210, 8), (None, 50, 9), (210, None, 10)), + (150, 170, 6), (170, 190, 7), (190, 210, 8), (-float("inf"), 50, 9), (210, float("inf"), 10)), ), # Presión diastólica ("Presión_diastólica", ((40, 50, 1), (50, 60, 2), (60, 70, 3), (70, 80, 4), (80, 90, 5), - (90, 100, 6), (100, 110, 7), (110, 120, 8), (None, 40, 9), (120, None, 10)), + (90, 100, 6), (100, 110, 7), (110, 120, 8), (-float("inf"), 40, 9), (120, float("inf"), 10)), ), ( # Globulos blancos (WBC) "WBC", ((2000, 4000, 1), (4000, 10000, 2), (10000, 15000, 3), (15000, 20000, 4), - (20000, 30000, 5), (30000, 35000, 6), (None, 2000, 7), (35000, None, 7)), + (20000, 30000, 5), (30000, 35000, 6), (-float("inf"), 2000, 7), (35000, float("inf"), 8)), ), ( # Hemoglobina (HB) "HB", ( (6, 8, 1), (8, 10, 2), (10, 12, 3), (12, 14, 4), (14, 16, 5), (16, 18, 6), - (18, 20, 7), (20, 22, 8), (None, 6, 9), (22, None, 10)), + (18, 20, 7), (20, 22, 8), (-float("inf"), 6, 9), (22, float("inf"), 10)), ), ( # Plaquetas (PLT) "PLT", ((10000, 50000, 1), (50000, 100000, 2), (100000, 150000, 3), (150000, 400000, 4), - (400000, 500000, 5), (500000, 600000, 6), (600000, 700000, 7), (None, 10000, 9), - (700000, None, 10)), + (400000, 500000, 5), (500000, 600000, 6), (600000, 700000, 7), (-float("inf"), 10000, 9), + (700000, float("inf"), 10)), ), ) diff --git a/tests/scripts/api/test_firebaseauth.py b/tests/scripts/api/test_firebaseauth.py index 681ed66..3e3af0b 100644 --- a/tests/scripts/api/test_firebaseauth.py +++ b/tests/scripts/api/test_firebaseauth.py @@ -17,15 +17,10 @@ async def test_11(mocker: MockerFixture): Test para validar que la función "verificar_token" retorne un error cuando el token es inválido """ - REQ = mocker.MagicMock(spec=Request) - TEXTOS = {"es": {"errTokenInvalido": "Token inválido"}} - VALIDADOR = mocker.patch("app.apis.FirebaseAuth.validar_txt_token", return_value=False) + RES = await verificar_token("firebase_app", "Bearer token_invalido") - RES = await verificar_token(REQ, "firebase_app", None, "Bearer token_invalido", TEXTOS, "es") - - assert RES.status_code == 403 - assert RES.body.decode("utf-8") == '{"error":"Token inválido"}' + assert RES == 0 VALIDADOR.assert_called_once_with("token_invalido") @@ -35,16 +30,11 @@ async def test_12(mocker: MockerFixture): Test para validar que la función "verificar_token" retorne un error cuando ocurre una excepción al procesar la solicitud """ - REQ = mocker.MagicMock(spec=Request) - TEXTOS = {"es": {"errValidarToken": "Error al validar el token"}} - VALIDADOR = mocker.patch("app.apis.FirebaseAuth.validar_txt_token", return_value=True) FIREBASE_VAL = mocker.patch("app.apis.FirebaseAuth.validar_token", return_value=-1) - - RES = await verificar_token(REQ, "firebase_app", None, "Bearer token_invalido", TEXTOS, "es") + RES = await verificar_token("firebase_app", "Bearer token_invalido") - assert RES.status_code == 400 - assert RES.body.decode("utf-8") == '{"error":"Error al validar el token"}' + assert RES == -1 VALIDADOR.assert_called_once_with("token_invalido") FIREBASE_VAL.assert_called_once_with("token_invalido", "firebase_app", False) @@ -55,17 +45,13 @@ async def test_13(mocker: MockerFixture): Test para validar que la función "verificar_token" maneje correctamente los errores inesperados al validar el token de Firebase. """ - REQ = mocker.MagicMock(spec=Request) - TEXTOS = {"es": {"errTry": "Error al procesar la solicitud:"}} - VALIDADOR = mocker.patch("app.apis.FirebaseAuth.validar_txt_token", return_value=True) FIREBASE_VAL = mocker.patch("app.apis.FirebaseAuth.validar_token") FIREBASE_VAL.side_effect = Exception("Excepción imprevista") - RES = await verificar_token(REQ, "firebase_app", None, "Bearer token_invalido", TEXTOS, "es") + RES = await verificar_token("firebase_app", "Bearer token_invalido") - assert RES.status_code == 500 - assert RES.body.decode("utf-8") == '{"error":"Error al procesar la solicitud: Excepción imprevista"}' + assert RES == -1 VALIDADOR.assert_called_once_with("token_invalido") FIREBASE_VAL.assert_called_once_with("token_invalido", "firebase_app", False) @@ -78,7 +64,7 @@ def test_14(mocker: MockerFixture): REQ = mocker.MagicMock(spec=Request) REQ.headers = {"authorization": "Bearer token_invalido"} - FIREBASE = mocker.patch("firebase_admin.auth.verify_id_token") + FIREBASE = mocker.patch("app.apis.FirebaseAuth.verify_id_token") FIREBASE.side_effect = ExpiredIdTokenError("Token expirado", "EL token está expirado.") RES = validar_token("token_invalido", "firebase_app", False) @@ -94,7 +80,7 @@ def test_15(mocker: MockerFixture): REQ = mocker.MagicMock(spec=Request) REQ.headers = {"authorization": "Bearer token_invalido"} - FIREBASE = mocker.patch("firebase_admin.auth.verify_id_token") + FIREBASE = mocker.patch("app.apis.FirebaseAuth.verify_id_token") FIREBASE.side_effect = CertificateFetchError("Error al obtener el certificado", "No se pudo obtener el certificado.") RES = validar_token("token_invalido", "firebase_app", False) @@ -107,7 +93,7 @@ def test_22(mocker: MockerFixture): Test para validar que la función "validar_token" retorne los datos del token cuando este es válido """ - FIREBASE = mocker.patch("firebase_admin.auth.verify_id_token", return_value={ "uid": "a1234H" }) + FIREBASE = mocker.patch("app.apis.FirebaseAuth.verify_id_token", return_value={ "uid": "a1234H" }) RES = validar_token("token_valido", "firebase_app", True) @@ -180,12 +166,12 @@ async def test_26(mocker: MockerFixture): FIRESTORE = mocker.patch("app.apis.FirebaseAuth.obtener_roles_usuarios") FIRESTORE.return_value = { "12345": 0 } - FIREBASE = mocker.patch("firebase_admin.auth.list_users", return_value=LISTA) + FIREBASE = mocker.patch("app.apis.FirebaseAuth.list_users", return_value=LISTA) - RES = await ver_datos_usuarios("firebase_app", "es", {}) + RES = await ver_datos_usuarios("firebase_app") + USUARIOS = [{"correo":"usuario@correo.com","uid":"12345","nombre":"usuario","rol":0,"estado":True,"fecha_registro":"26/07/2025 11:56 AM","ultima_conexion":"26/07/2025 11:56 AM"}] - assert RES.status_code == 200 - assert RES.body.decode("utf-8") == '{"usuarios":[{"correo":"usuario@correo.com","uid":"12345","nombre":"usuario","rol":0,"estado":true,"fecha_registro":"26/07/2025 11:56 AM","ultima_conexion":"26/07/2025 11:56 AM"}]}' + assert RES == (1, USUARIOS) FIREBASE.assert_called_once_with(app="firebase_app") FIRESTORE.assert_called_once() @@ -219,12 +205,13 @@ async def test_27(mocker: MockerFixture): FIRESTORE = mocker.patch("app.apis.FirebaseAuth.obtener_roles_usuarios") FIRESTORE.return_value = { "12345": 0 } - FIREBASE = mocker.patch("firebase_admin.auth.list_users", return_value=LISTA) - - RES = await ver_datos_usuarios("firebase_app", "es", {}) + FIREBASE = mocker.patch("app.apis.FirebaseAuth.list_users", return_value=LISTA) + USUARIOS = [ + {"correo":"usuario@correo.com","uid":"12345","nombre":"usuario","rol":0,"estado":True,"fecha_registro":"26/07/2025 11:56 AM","ultima_conexion":"26/07/2025 11:56 AM"},{"correo":"usuario@correo.com","uid":"12345","nombre":"usuario","rol":0,"estado":True,"fecha_registro":"26/07/2025 11:56 AM","ultima_conexion":"26/07/2025 11:56 AM"} + ] + RES = await ver_datos_usuarios("firebase_app") - assert RES.status_code == 200 - assert RES.body.decode("utf-8") == '{"usuarios":[{"correo":"usuario@correo.com","uid":"12345","nombre":"usuario","rol":0,"estado":true,"fecha_registro":"26/07/2025 11:56 AM","ultima_conexion":"26/07/2025 11:56 AM"},{"correo":"usuario@correo.com","uid":"12345","nombre":"usuario","rol":0,"estado":true,"fecha_registro":"26/07/2025 11:56 AM","ultima_conexion":"26/07/2025 11:56 AM"}]}' + assert RES == (1, USUARIOS) FIREBASE.assert_called_once_with(app="firebase_app") @@ -233,14 +220,11 @@ async def test_28(mocker: MockerFixture): """ Test para validar que la función "ver_datos_usuarios" maneje correctamente las excepciones. """ - TEXTOS = {"es": {"errObtenerDatosUsuarios": "Error al obtener los datos de los usuarios"}} - FIREBASE = mocker.patch("firebase_admin.auth.list_users") + FIREBASE = mocker.patch("app.apis.FirebaseAuth.list_users") FIREBASE.side_effect = Exception("Error al obtener los usuarios") + RES = await ver_datos_usuarios("firebase_app") - RES = await ver_datos_usuarios("firebase_app", "es", TEXTOS) - - assert RES.status_code == 400 - assert RES.body.decode("utf-8") == '{"error":"Error al obtener los datos de los usuarios: Error al obtener los usuarios"}' + assert RES == (-1, None) FIREBASE.assert_called_once_with(app="firebase_app") @@ -264,12 +248,11 @@ async def test_39(mocker: MockerFixture): FIRESTORE = mocker.patch("app.apis.FirebaseAuth.obtener_rol_usuario") FIRESTORE.return_value = 0 - FIREBASE = mocker.patch("firebase_admin.auth.get_user", return_value=USUARIO) + FIREBASE = mocker.patch("app.apis.FirebaseAuth.get_user", return_value=USUARIO) - RES = await ver_datos_usuario("firebase_app", "12345", "es", {}) + RES = await ver_datos_usuario("firebase_app", "12345") - assert RES.status_code == 200 - assert RES.body.decode("utf-8") == '{"correo":"usuario@correo.com","uid":"12345","nombre":"usuario","rol":0,"estado":true,"fecha_registro":"26/07/2025 11:56 AM","ultima_conexion":"26/07/2025 11:56 AM"}' + assert RES == (1, {"correo": "usuario@correo.com", "uid": "12345", "nombre": "usuario", "rol": 0, "estado": True, "fecha_registro": "26/07/2025 11:56 AM", "ultima_conexion": "26/07/2025 11:56 AM"}) FIREBASE.assert_called_once_with("12345", "firebase_app") FIRESTORE.assert_called_once() @@ -280,18 +263,14 @@ async def test_40(mocker: MockerFixture): Test para validar que la función "ver_datos_usuario" arroje una excepción al no encontrar el usuario. """ - TEXTOS = {"es": {"errUsuarioNoEncontrado": "Usuario no encontrado"}} FIRESTORE = mocker.patch("app.apis.FirebaseAuth.obtener_rol_usuario") FIRESTORE.return_value = -1 - FIREBASE = mocker.patch("firebase_admin.auth.get_user", return_value=None) - - RES = await ver_datos_usuario("firebase_app", "a1234H", "es", TEXTOS) + FIREBASE = mocker.patch("app.apis.FirebaseAuth.get_user", return_value=None) + RES = await ver_datos_usuario("firebase_app", "a1234H") - assert RES.status_code == 404 - assert RES.body.decode("utf-8") == '{"error":"Usuario no encontrado"}' + assert RES == (0, None) - FIRESTORE.assert_called_once_with("a1234H") FIREBASE.assert_called_once_with("a1234H", "firebase_app") @pytest.mark.asyncio @@ -299,28 +278,25 @@ async def test_41(mocker: MockerFixture): """ Test para validar que la función "ver_datos_usuario" maneje correctamente las excepciones. """ - TEXTOS = {"es": {"errObtenerDatosUsuarios": "Error al obtener los datos de los usuarios"}} FIRESTORE = mocker.patch("app.apis.FirebaseAuth.obtener_rol_usuario") - FIRESTORE.side_effect = Exception("a1234H") + FIRESTORE.side_effect = Exception("Error inesperado") - mocker.patch("firebase_admin.auth.get_user", return_value=None) + mocker.patch("app.apis.FirebaseAuth.get_user", return_value=None) - RES = await ver_datos_usuario("firebase_app", "a1234H", "es", TEXTOS) + RES = await ver_datos_usuario("firebase_app", "a1234H") - assert RES.status_code == 400 - assert RES.body.decode("utf-8") == '{"error":"Error al obtener los datos de los usuarios: a1234H"}' + assert RES == (-1, "Error inesperado") def test_47(mocker: MockerFixture): """ Test para validar que la función "ver_usuario_firebase" retorne los datos de un usuario existente. """ - USUARIO = mocker.MagicMock(spec=UserRecord) USUARIO.uid = "12345" USUARIO.disabled = False USUARIO.email = "correo@correo.com" - FIREBASE = mocker.patch("firebase_admin.auth.get_user", return_value=USUARIO) + FIREBASE = mocker.patch("app.apis.FirebaseAuth.get_user", return_value=USUARIO) RES = ver_usuario_firebase("firebase_app", "12345") @@ -333,7 +309,7 @@ def test_48(mocker: MockerFixture): Test para validar que la función "ver_usuario_firebase" no retorne los datos de un usuario que no existe. """ - FIREBASE = mocker.patch("firebase_admin.auth.get_user") + FIREBASE = mocker.patch("app.apis.FirebaseAuth.get_user") FIREBASE.side_effect = UserNotFoundError("Usuario no encontrado") RES = ver_usuario_firebase("firebase_app", "a1234H") @@ -346,7 +322,7 @@ def test_49(mocker: MockerFixture): """ Test para validar que la función "ver_usuario_firebase" maneje correctamente las excepciones """ - FIREBASE = mocker.patch("firebase_admin.auth.get_user") + FIREBASE = mocker.patch("app.apis.FirebaseAuth.get_user") FIREBASE.side_effect = Exception("Error inesperado") RES = ver_usuario_firebase("firebase_app", "a1234H") @@ -360,13 +336,22 @@ def test_50(mocker: MockerFixture): Test para validar que la función "actualizar_estado_usuario" retorne una JSONResponse indicando que el usuario fue actualizado correctamente. """ - TEXTOS = {"es": {"msgUsuarioActualizado": "Estado del usuario actualizado correctamente"}} - FIREBASE = mocker.patch("firebase_admin.auth.update_user") + USUARIO = mocker.MagicMock(spec=UserRecord) + USUARIO.uid = "1234" + USUARIO.disabled = False + USUARIO.email = "correo@correo.com" + USUARIO.display_name = "usuario" + + METADATOS = mocker.MagicMock(spec=UserMetadata) + METADATOS.creation_timestamp = 175354900609 + METADATOS.last_refresh_timestamp = 175354900809 + USUARIO.user_metadata = METADATOS - RES = actualizar_estado_usuario("firebase_app", "1234", False, "es", TEXTOS) + INSTANCIA = {"correo": "correo@correo.com", "uid": "1234", "nombre": "usuario", "estado": True, "fecha_registro": "23/07/1975 08:41 AM", "ultima_conexion": "23/07/1975 08:41 AM"} + FIREBASE = mocker.patch("app.apis.FirebaseAuth.update_user", return_value=USUARIO) + RES = actualizar_estado_usuario("firebase_app", "1234", False) - assert RES.status_code == 200 - assert RES.body.decode("utf-8") == '{"mensaje":"Estado del usuario actualizado correctamente"}' + assert RES == (1, INSTANCIA) FIREBASE.assert_called_once_with(uid="1234", disabled=False, app="firebase_app") @@ -375,14 +360,11 @@ def test_51(mocker: MockerFixture): Test para validar que la función "actualizar_estado_usuario" retorne un error cuando los valores de actualización son inválidos """ - TEXTOS = {"es": {"errEstadoInvalido": "Estado inválido"}} - FIREBASE = mocker.patch("firebase_admin.auth.update_user") + FIREBASE = mocker.patch("app.apis.FirebaseAuth.update_user") FIREBASE.side_effect = ValueError("Estado inválido") + RES = actualizar_estado_usuario("firebase_app", "1234", False) - RES = actualizar_estado_usuario("firebase_app", "1234", False, "es", TEXTOS) - - assert RES.status_code == 401 - assert RES.body.decode("utf-8") == '{"error":"Estado inválido"}' + assert RES == (0, None) FIREBASE.assert_called_once_with(uid="1234", disabled=False, app="firebase_app") @@ -390,13 +372,10 @@ def test_52(mocker: MockerFixture): """ Test para validar que la función "actualizar_estado_usuario" maneje correctamente las excepciones """ - TEXTOS = {"es": {"errTry": "Error al procesar la solicitud:"}} - FIREBASE = mocker.patch("firebase_admin.auth.update_user") + FIREBASE = mocker.patch("app.apis.FirebaseAuth.update_user") FIREBASE.side_effect = Exception("Error inesperado") + RES = actualizar_estado_usuario("firebase_app", "1234", False) - RES = actualizar_estado_usuario("firebase_app", "1234", False, "es", TEXTOS) - - assert RES.status_code == 500 - assert RES.body.decode("utf-8") == '{"error":"Error al procesar la solicitud: Error inesperado"}' + assert RES == (-1, "Error inesperado") FIREBASE.assert_called_once_with(uid="1234", disabled=False, app="firebase_app") \ No newline at end of file diff --git a/tests/scripts/api/test_main.py b/tests/scripts/api/test_main.py index ec0cc90..f7f5ccd 100644 --- a/tests/scripts/api/test_main.py +++ b/tests/scripts/api/test_main.py @@ -1,6 +1,9 @@ import pytest from pytest_mock import MockerFixture from fastapi.testclient import TestClient +from app.firebase_admin_config import inicializar_firebase +from firebase_admin import App +from firebase_admin.credentials import Certificate from app.main import app from contextlib import asynccontextmanager @@ -68,8 +71,7 @@ def test_9(mocker: MockerFixture): deje pasar un token válido con una petición POST proveniente de un host autorizado """ app.router.lifespan_context = mock_inicializar_modelos - VALIDADOR = mocker.patch("apis.FirebaseAuth.validar_txt_token", return_value=True) - FIREBASE = mocker.patch("firebase_admin.auth.verify_id_token", return_value=1) + mocker.patch("app.main.verificar_token", return_value=1) with TestClient(app) as CLIENTE: RES = CLIENTE.post( @@ -83,9 +85,6 @@ def test_9(mocker: MockerFixture): assert RES.status_code == 405 assert RES.json() == { "detail": "Method Not Allowed" } - - VALIDADOR.assert_called_once_with("token_valido") - FIREBASE.assert_called_once_with("token_valido", MOCK_FIREBASE_APP, check_revoked=True) @pytest.mark.asyncio async def test_10(mocker: MockerFixture): @@ -153,4 +152,19 @@ def test_82(mocker: MockerFixture): ) assert RES.status_code == 200 - assert RES.json() == TEST_CREDS \ No newline at end of file + assert RES.json() == TEST_CREDS + +def test_65(mocker: MockerFixture): + """ + Test para validar que la función que inicializa Firebase sea llamada + """ + APP = mocker.MagicMock(spec=App) + CERT = mocker.MagicMock(spec=Certificate) + + FUNC = mocker.patch("app.firebase_admin_config.initialize_app", return_value=APP) + mocker.patch("app.firebase_admin_config.Certificate", return_value=CERT) + RES = inicializar_firebase() + + assert RES == APP + + FUNC.assert_called_once_with(CERT) \ No newline at end of file diff --git a/tests/scripts/dependencies/test_general_deps.py b/tests/scripts/dependencies/test_general_deps.py index 9b0d498..dc6deec 100644 --- a/tests/scripts/dependencies/test_general_deps.py +++ b/tests/scripts/dependencies/test_general_deps.py @@ -1,5 +1,4 @@ import pytest -from pytest_mock import MockerFixture from app.dependencies.general_dependencies import verificar_idioma @pytest.mark.asyncio diff --git a/tests/scripts/dependencies/test_usuarios_deps.py b/tests/scripts/dependencies/test_usuarios_deps.py index 48499c1..328f031 100644 --- a/tests/scripts/dependencies/test_usuarios_deps.py +++ b/tests/scripts/dependencies/test_usuarios_deps.py @@ -15,10 +15,7 @@ async def test_53(mocker: MockerFixture): DATOS_TOKEN = mocker.patch("app.dependencies.usuarios_dependencies.ver_datos_token", return_value=(1, {"uid": "a1234H"})) ROL = mocker.patch("app.dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True) - RES = await verificar_usuario_administrador(PETICION) - - assert RES[0] == True - assert RES[1] is None + await verificar_usuario_administrador(PETICION, "", "es") DATOS_TOKEN.assert_called_once() ROL.assert_called_once_with("a1234H") @@ -33,11 +30,10 @@ async def test_54(mocker: MockerFixture): DATOS_TOKEN = mocker.patch("app.dependencies.usuarios_dependencies.ver_datos_token", return_value=(0, {"error": "Token inválido"})) - RES = await verificar_usuario_administrador(PETICION) - - assert RES[0] == False - assert RES[1].status_code == 403 - assert RES[1].body.decode("utf-8") == '{"error":"Token inválido"}' + with pytest.raises(AccesoNoAutorizado) as exc_info: + await verificar_usuario_administrador(PETICION, "", "es") + assert exc_info.value.detail == {"error": "Token inválido."} + assert exc_info.value.status_code == 403 DATOS_TOKEN.assert_called_once() @@ -52,11 +48,10 @@ async def test_55(mocker: MockerFixture): DATOS_TOKEN = mocker.patch("app.dependencies.usuarios_dependencies.ver_datos_token", return_value=(-1, {"error": "Error al validar el token"})) - RES = await verificar_usuario_administrador(PETICION) - - assert RES[0] == False - assert RES[1].status_code == 400 - assert RES[1].body.decode("utf-8") == '{"error":"Error al validar el token"}' + with pytest.raises(AccesoNoAutorizado) as exc_info: + await verificar_usuario_administrador(PETICION, "", "es") + assert exc_info.value.detail == {"error": "Error al validar el token"} + assert exc_info.value.status_code == 403 DATOS_TOKEN.assert_called_once() @@ -73,11 +68,10 @@ async def test_56(mocker: MockerFixture): DATOS_TOKEN = mocker.patch("app.dependencies.usuarios_dependencies.ver_datos_token", return_value=(1, {"uid": "a1234H"})) ROL = mocker.patch("app.dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=False) - RES = await verificar_usuario_administrador(PETICION) - - assert RES[0] == False - assert RES[1].status_code == 403 - assert RES[1].body.decode("utf-8") == '{"error":"Acceso denegado."}' + with pytest.raises(AccesoNoAutorizado) as exc_info: + await verificar_usuario_administrador(PETICION, "", "es") + assert exc_info.value.detail == {"error": "Acceso denegado."} + assert exc_info.value.status_code == 403 DATOS_TOKEN.assert_called_once() ROL.assert_called_once_with("a1234H") @@ -85,20 +79,29 @@ async def test_56(mocker: MockerFixture): @pytest.mark.asyncio async def test_57(mocker: MockerFixture): """ - Test para validar que el API no retorne los datos de los usuarios si el usuario no - es administrador. + Test para validar que la dependencia "validador_uid" retorne el uid si es válido. """ PETICION = mocker.MagicMock(spec=Request) - PETICION.headers = {"authorization": "Bearer token_valido"} - PETICION.state.textos = { "es": { "errTry": "Error al procesar la solicitud:" } } + PETICION.state.textos = {} + VALIDAR_UID = mocker.patch("app.dependencies.usuarios_dependencies.validar_uid", return_value=True) - DATOS_TOKEN = mocker.patch("app.dependencies.usuarios_dependencies.ver_datos_token") - DATOS_TOKEN.side_effect = Exception("Error inesperado") + RES = await validador_uid(PETICION, "a1234H", "es") + + assert RES == "a1234H" + VALIDAR_UID.assert_called_once_with("a1234H") + +@pytest.mark.asyncio +async def test_44(mocker: MockerFixture): + """ + Test para validar que la dependencia "validador_uid" arroje una excepción si el uid es inválido. + """ + PETICION = mocker.MagicMock(spec=Request) + PETICION.state.textos = { "es": { "errUIDInvalido": "UID inválido." } } - RES = await verificar_usuario_administrador(PETICION) + VALIDAR_UID = mocker.patch("app.dependencies.usuarios_dependencies.validar_uid", return_value=False) - assert RES[0] == False - assert RES[1].status_code == 500 - assert RES[1].body.decode("utf-8") == '{"error":"Error al procesar la solicitud: Error inesperado"}' + with pytest.raises(UIDInvalido) as exc_info: + await validador_uid(PETICION, "a1234H", "es") + assert exc_info.value.detail == {"error": "UID inválido."} - DATOS_TOKEN.assert_called_once() \ No newline at end of file + VALIDAR_UID.assert_called_once_with("a1234H") \ No newline at end of file diff --git a/tests/scripts/models/test_diagnostico.py b/tests/scripts/models/test_diagnostico.py index 4551107..818ba93 100644 --- a/tests/scripts/models/test_diagnostico.py +++ b/tests/scripts/models/test_diagnostico.py @@ -42,7 +42,7 @@ async def test_7(): "Gastrointestinal": [0], "Hematologica": [0], "Urológica": [0], "Vascular": [0], } OBJ = Diagnostico(INSTANCIA, MODELO, EXPLAINER) - RES = await OBJ.generar_diagnostico() + RES = OBJ.generar_diagnostico() assert RES["prediccion"] == True assert round(RES["probabilidad"],0) == 1.0 diff --git a/tests/scripts/models/test_peticionRecaptcha.py b/tests/scripts/models/test_peticionRecaptcha.py deleted file mode 100644 index 02db9dd..0000000 --- a/tests/scripts/models/test_peticionRecaptcha.py +++ /dev/null @@ -1,17 +0,0 @@ -from app.models.PeticionRecaptcha import PeticionRecaptcha -from pydantic import ValidationError -import pytest - -def test_66(): - """ - Test para validar que la clase reconoce correctamente una instancia. - """ - instancia = PeticionRecaptcha(**{"token": "test_token"}) - assert instancia.token == "test_token" - -def test_67(): - """ - Test para validar que la clase lanza un error con datos inválidos. - """ - with pytest.raises(ValidationError): - PeticionRecaptcha(**{"token": 123}) \ No newline at end of file diff --git a/tests/scripts/models/test_peticionDiagnostico.py b/tests/scripts/models/test_peticiones.py similarity index 89% rename from tests/scripts/models/test_peticionDiagnostico.py rename to tests/scripts/models/test_peticiones.py index 52c6627..f0400c9 100644 --- a/tests/scripts/models/test_peticionDiagnostico.py +++ b/tests/scripts/models/test_peticiones.py @@ -1,5 +1,4 @@ -from app.models.PeticionDiagnostico import PeticionDiagnostico -from numpy import array +from app.models.Peticiones import * from pydantic import ValidationError import pytest @@ -54,7 +53,7 @@ def test_5(): "vascular": 0, "vih": 0, } - OBJ = PeticionDiagnostico(**DATOS) + OBJ = InstanciaDiagnostico(**DATOS) RES = OBJ.obtener_diccionario_instancia() assert RES == { "Inmovilidad_de_M_inferiores": [0], "Procedimiento_Quirurgicos___Traumatismo_Grave_en_los_últimos_15_dias": [0], @@ -77,7 +76,7 @@ def test_6(): DATOS = { "edad": "4", "sexo": True, - "bebedor": "0", + "bebedor": -1, "fumador": "0", "proc_quirurgico_traumatismo": "0", "inmovilidad_de_m_inferiores": [], @@ -120,4 +119,18 @@ def test_6(): "vascular": "0", "vih": "0", } - PeticionDiagnostico(**DATOS) \ No newline at end of file + InstanciaDiagnostico(**DATOS) + +def test_66(): + """ + Test para validar que la clase reconoce correctamente una instancia. + """ + instancia = TokenRecaptcha(**{"token": "a"*829}) + assert instancia.token == "a"*829 + +def test_67(): + """ + Test para validar que la clase lanza un error con datos inválidos. + """ + with pytest.raises(ValidationError): + TokenRecaptcha(**{"token": 123}) \ No newline at end of file diff --git a/tests/scripts/routers/test_main_router.py b/tests/scripts/routers/test_main_router.py index e014aff..2a75c32 100644 --- a/tests/scripts/routers/test_main_router.py +++ b/tests/scripts/routers/test_main_router.py @@ -29,7 +29,8 @@ } TEXTOS = { - "es": { "errTry": "Error al procesar la solicitud:"}, + "es": { "errTry": "Error al procesar la solicitud:", + "errTokenInvalido": "Token inválido o expirado." }, } @@ -114,7 +115,7 @@ def test_16(mocker: MockerFixture): app.router.lifespan_context = mock_inicializar_modelos VALIDADOR = mocker.patch("apis.FirebaseAuth.validar_txt_token", return_value=True) - FIREBASE = mocker.patch("firebase_admin.auth.verify_id_token", return_value=1) + FIREBASE = mocker.patch("apis.FirebaseAuth.verify_id_token", return_value=1) with TestClient(app) as CLIENTE: RES = CLIENTE.post( @@ -131,10 +132,7 @@ def test_16(mocker: MockerFixture): assert len(JSON["lime"]) == 10 VALIDADOR.assert_called_once_with("token_valido") - FIREBASE.assert_called_once_with("token_valido", { - "appId": "test_app_id", - "cred": {"projectId": "test_project_id", "certificated": True}, - }, check_revoked=True) + FIREBASE.assert_called_once_with("token_valido", MOCK_FIREBASE_APP, check_revoked=True) def test_17(mocker: MockerFixture): """ @@ -191,7 +189,7 @@ def test_17(mocker: MockerFixture): app.router.lifespan_context = mock_inicializar_modelos VALIDADOR = mocker.patch("apis.FirebaseAuth.validar_txt_token", return_value=True) - FIREBASE = mocker.patch("firebase_admin.auth.verify_id_token", return_value=1) + FIREBASE = mocker.patch("apis.FirebaseAuth.verify_id_token", return_value=1) DIAGNOSTICO = mocker.patch("models.Diagnostico.Diagnostico.generar_diagnostico") DIAGNOSTICO.side_effect = Exception("Error al generar el diagnóstico") @@ -208,10 +206,7 @@ def test_17(mocker: MockerFixture): assert RES.json() == {"error": "Error al procesar la solicitud: Error al generar el diagnóstico"} VALIDADOR.assert_called_once_with("token_valido") - FIREBASE.assert_called_once_with("token_valido", { - "appId": "test_app_id", - "cred": {"projectId": "test_project_id", "certificated": True}, - }, check_revoked=True) + FIREBASE.assert_called_once_with("token_valido", MOCK_FIREBASE_APP, check_revoked=True) DIAGNOSTICO.assert_called_once() def test_73(mocker: MockerFixture): @@ -227,13 +222,13 @@ def test_73(mocker: MockerFixture): "/recaptcha", headers={"Origin": "http://localhost:5178", "Host": "localhost", "Authorization": "Bearer token_valido"}, - json={"token": "token_valido"} + json={"token": "token_valido"*80} ) assert RES.status_code == 200 assert RES.json() == {"success": True, "hostname": "0.0.0.0"} - FUNC.assert_called_once_with("token_valido", "es", TEXTOS) + FUNC.assert_called_once_with("token_valido"*80, "es", TEXTOS) def test_74(mocker: MockerFixture): """ @@ -248,15 +243,15 @@ def test_74(mocker: MockerFixture): "/recaptcha", headers={"Origin": "http://localhost:5178", "Host": "localhost", "Authorization": "Bearer token_valido"}, - json={"token": "token_valido"} + json={"token": "token_valido"*80} ) assert RES.status_code == 500 assert RES.json() == {"error": "Error al procesar la solicitud: Error de verificación"} - FUNC.assert_called_once_with("token_valido", "es", TEXTOS) + FUNC.assert_called_once_with("token_valido"*80, "es", TEXTOS) -def test_91(): +def test_33(): """ Test para validar que el endpoint de healthcheck retorne la respuesta correcta. """ diff --git a/tests/scripts/routers/test_usuarios_router.py b/tests/scripts/routers/test_usuarios_router.py index f4053ba..cffe766 100644 --- a/tests/scripts/routers/test_usuarios_router.py +++ b/tests/scripts/routers/test_usuarios_router.py @@ -91,10 +91,7 @@ def test_31(mocker: MockerFixture): ) USUARIO = mocker.patch("routers.usuarios_router.ver_datos_usuarios") - USUARIO.return_value = JSONResponse( - status_code=200, media_type="application/json", content={"usuarios": DATOS} - ) - + USUARIO.return_value = (1, DATOS) app.router.lifespan_context = mock_inicializar_modelos with TestClient(app) as CLIENTE: @@ -147,37 +144,12 @@ def test_32(mocker: MockerFixture): ROL.assert_called_once_with("a1234H") -def test_33(mocker: MockerFixture): - """ - Test para validar que el API no retorne los datos de los usuarios si el usuario no - es administrador. - """ - DATOS_TOKEN = mocker.patch("dependencies.usuarios_dependencies.ver_datos_token") - DATOS_TOKEN.side_effect = Exception("Error inesperado") - - app.router.lifespan_context = mock_inicializar_modelos - - with TestClient(app) as CLIENTE: - RES = CLIENTE.get( - "/admin/usuarios", - headers={ - "Origin": "http://localhost:5178", - "Host": "localhost", - "Authorization": "Bearer token_valido", - }, - ) - - assert RES.status_code == 500 - assert RES.json() == {"error": "Error al procesar la solicitud: Error inesperado"} - - DATOS_TOKEN.assert_called_once() - - def test_42(mocker: MockerFixture): """ Test para validar que el API retorne los datos de un usuario con una petición autenticada. """ + UID = mocker.patch("dependencies.usuarios_dependencies.validar_uid", return_value="a1234H") DATOS = { "correo": "usuario@correo.com", "uid": "a1234H", @@ -198,9 +170,7 @@ def test_42(mocker: MockerFixture): app.router.lifespan_context = mock_inicializar_modelos USUARIO = mocker.patch("routers.usuarios_router.ver_datos_usuario") - USUARIO.return_value = JSONResponse( - status_code=200, media_type="application/json", content=DATOS - ) + USUARIO.return_value = (1, DATOS) with TestClient(app) as CLIENTE: RES = CLIENTE.get( @@ -216,17 +186,10 @@ def test_42(mocker: MockerFixture): assert RES.status_code == 200 assert RES.json() == DATOS + UID.assert_called_once_with("a1234H") DATOS_TOKEN.assert_called_once() ROL.assert_called_once_with("a1234H") - USUARIO.assert_called_once_with( - { - "appId": "test_app_id", - "cred": {"projectId": "test_project_id", "certificated": True}, - }, - "a1234H", - "es", - MOCK_TEXTOS, - ) + USUARIO.assert_called_once_with(MOCK_FIREBASE_APP, "a1234H") def test_43(mocker: MockerFixture): @@ -254,32 +217,6 @@ def test_43(mocker: MockerFixture): DATOS_TOKEN.assert_called_once() - -def test_44(mocker: MockerFixture): - """ - Test para validar que el API no retorne los datos de los usuarios si ocurre una excepción. - """ - - DATOS_TOKEN = mocker.patch("dependencies.usuarios_dependencies.ver_datos_token") - DATOS_TOKEN.side_effect = Exception("Error inesperado") - app.router.lifespan_context = mock_inicializar_modelos - - with TestClient(app) as CLIENTE: - RES = CLIENTE.get( - "/admin/usuarios/a1234H", - headers={ - "Origin": "http://localhost:5178", - "Host": "localhost", - "Authorization": "Bearer token_valido", - }, - ) - - assert RES.status_code == 500 - assert RES.json() == {"error": "Error al procesar la solicitud: Error inesperado"} - - DATOS_TOKEN.assert_called_once() - - def test_45(mocker: MockerFixture): """ Test para validar que el API no retorne los datos de un usuario con un @@ -348,6 +285,13 @@ def test_60(mocker: MockerFixture): """ Test para validar que el API actualice el estado de un usuario correctamente. """ + MOCK_USUARIO = { + "correo" : "correo@correo.com", "uid" : "a1234H", + "nombre": "correo", "estado": False, + "fecha_registro": "12/12/2025 4:00 PM", + "ultima_conexion": "12/12/2025 4:00 PM", + } + UID = mocker.patch("dependencies.usuarios_dependencies.validar_uid", return_value="a1234H") DATOS_TOKEN = mocker.patch( "dependencies.usuarios_dependencies.ver_datos_token", return_value=(1, {"uid": "a1234H"}), @@ -355,7 +299,6 @@ def test_60(mocker: MockerFixture): ROL = mocker.patch( "dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True ) - mocker.patch("routers.usuarios_router.validar_uid", return_value=True) USUARIO = mocker.MagicMock(spec=UserRecord) USUARIO.uid = "a1234H" @@ -366,11 +309,7 @@ def test_60(mocker: MockerFixture): ACT = mocker.patch( "routers.usuarios_router.actualizar_estado_usuario", ) - ACT.return_value = JSONResponse( - {"mensaje": "Estado del usuario actualizado correctamente"}, - status_code=200, - media_type="application/json", - ) + ACT.return_value = (1, MOCK_USUARIO) app.router.lifespan_context = mock_inicializar_modelos @@ -385,17 +324,12 @@ def test_60(mocker: MockerFixture): ) assert RES.status_code == 200 - assert RES.json() == {"mensaje": "Estado del usuario actualizado correctamente"} + assert RES.json() == MOCK_USUARIO DATOS_TOKEN.assert_called_once() + UID.assert_called_once_with("a1234H") ROL.assert_called_once_with("a1234H") - FIRESTORE.assert_called_once_with( - { - "appId": "test_app_id", - "cred": {"projectId": "test_project_id", "certificated": True}, - }, - "a1234H", - ) + FIRESTORE.assert_called_once_with(MOCK_FIREBASE_APP, "a1234H") def test_61(mocker: MockerFixture): @@ -430,6 +364,7 @@ def test_62(mocker: MockerFixture): Test para validar que el API retorne un error al intentar actualizar el estado de un usuario inexistente. """ + UID = mocker.patch("dependencies.usuarios_dependencies.validar_uid", return_value="a1234H") DATOS_TOKEN = mocker.patch("dependencies.usuarios_dependencies.ver_datos_token") DATOS_TOKEN.return_value = (1, {"uid": "a1234H"}) @@ -440,8 +375,6 @@ def test_62(mocker: MockerFixture): "dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True ) FUNC = mocker.patch("routers.usuarios_router.actualizar_estado_usuario") - - mocker.patch("routers.usuarios_router.validar_uid", return_value=True) app.router.lifespan_context = mock_inicializar_modelos with TestClient(app) as CLIENTE: @@ -458,6 +391,7 @@ def test_62(mocker: MockerFixture): assert RES.json() == {"error": "Usuario no encontrado"} ROL.assert_called_once() + UID.assert_called_once_with("a1234H") DATOS_TOKEN.assert_called_once() FUNC.assert_not_called() @@ -466,18 +400,14 @@ def test_63(mocker: MockerFixture): """ Test para validar que el API no actualice los datos de un usuario si ocurre una excepción. """ - DATOS_TOKEN = mocker.patch("dependencies.usuarios_dependencies.ver_datos_token") - DATOS_TOKEN.return_value = (1, {"uid": "a1234H"}) + UID = mocker.patch("dependencies.usuarios_dependencies.validar_uid", return_value="a1234H") + TOKEN = mocker.patch("dependencies.usuarios_dependencies.ver_datos_token", return_value=(1, {"uid": "a1234H"})) + ROL = mocker.patch("dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True) - FIRESTORE = mocker.patch("routers.usuarios_router.ver_usuario_firebase") + FIRESTORE = mocker.patch("app.routers.usuarios_router.ver_usuario_firebase") FIRESTORE.return_value = (-1, None) - ROL = mocker.patch( - "dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True - ) - - FUNC = mocker.patch("routers.usuarios_router.actualizar_estado_usuario") - mocker.patch("routers.usuarios_router.validar_uid", return_value=True) + FUNC = mocker.patch("app.routers.usuarios_router.actualizar_estado_usuario") app.router.lifespan_context = mock_inicializar_modelos with TestClient(app) as CLIENTE: @@ -490,18 +420,20 @@ def test_63(mocker: MockerFixture): }, ) - assert RES.status_code == 500 + assert RES.status_code == 400 assert RES.json() == { - "error": "Error al procesar la solicitud: Error al obtener el usuario" + "error": "Error al obtener el usuario" } - FUNC.assert_not_called() ROL.assert_called_once() + FUNC.assert_not_called() + UID.assert_called_once() + TOKEN.assert_called_once() def test_64(mocker: MockerFixture): """ - Test para validar que actualice los datos de un usuario si se lanza un ValueError. + Test para validar que no actualice los datos de un usuario si se lanza un ValueError. """ DATOS_TOKEN = mocker.patch("dependencies.usuarios_dependencies.ver_datos_token") DATOS_TOKEN.return_value = (1, {"uid": "a1234H"}) @@ -509,7 +441,7 @@ def test_64(mocker: MockerFixture): ROL = mocker.patch( "dependencies.usuarios_dependencies.verificar_rol_usuario", return_value=True ) - mocker.patch("routers.usuarios_router.validar_uid", return_value=False) + mocker.patch("dependencies.usuarios_dependencies.validar_uid", return_value=False) FUNC = mocker.patch("app.routers.usuarios_router.actualizar_estado_usuario") app.router.lifespan_context = mock_inicializar_modelos @@ -528,31 +460,4 @@ def test_64(mocker: MockerFixture): assert RES.json() == {"error": "UID inválido"} FUNC.assert_not_called() - ROL.assert_called_once_with("a1234H") - - -def test_65(mocker: MockerFixture): - """ - Test para validar que actualice los datos de un usuario si se lanza un ValueError. - """ - DATOS_TOKEN = mocker.patch("dependencies.usuarios_dependencies.ver_datos_token") - DATOS_TOKEN.side_effect = Exception("Error inesperado") - - FUNC = mocker.patch("routers.usuarios_router.actualizar_estado_usuario") - - app.router.lifespan_context = mock_inicializar_modelos - - with TestClient(app) as CLIENTE: - RES = CLIENTE.patch( - "/admin/usuarios/a1234H?desactivar=true", - headers={ - "Origin": "http://localhost:5178", - "Host": "localhost", - "Authorization": "Bearer token_valido", - }, - ) - - assert RES.status_code == 500 - assert RES.json() == {"error": "Error al procesar la solicitud: Error inesperado"} - - FUNC.assert_not_called() + ROL.assert_called_once_with("a1234H") \ No newline at end of file diff --git a/tests/scripts/utils/test_preprocesamiento.py b/tests/scripts/utils/test_preprocesamiento.py index 8de600a..63425ba 100644 --- a/tests/scripts/utils/test_preprocesamiento.py +++ b/tests/scripts/utils/test_preprocesamiento.py @@ -12,14 +12,14 @@ def test_84(): Validar que la función retorne correctamente el número de intervalo al que pertenece un valor cuando este solo está acotado a su derecha """ - assert evaluar_intervalo(12, [[None, 15, 3], [15, 20, 2], [21, None, 1]]) == 3 + assert evaluar_intervalo(12, [[-float("inf"), 15, 3], [15, 20, 2], [21, float("inf"), 1]]) == 3 def test_85(): """ Validar que la función retorne correctamente el número de intervalo al que pertenece un valor cuando este solo está acotado a su izquierda """ - assert evaluar_intervalo(20, [[19, None, 4], [10, 19, 1]]) == 4 + assert evaluar_intervalo(20, [[19, float("inf"), 4], [10, 19, 1]]) == 4 def test_86(): """