diff --git a/.env.example b/.env.example index c00720ee..2cd1e25e 100755 --- a/.env.example +++ b/.env.example @@ -33,6 +33,7 @@ DB_PASSWORD= DB_PORT= # MONGO +MONGO_ENABLED=true MONGO_USER= MONGO_PASSWORD= MONGO_HOST= @@ -41,6 +42,8 @@ MONGO_NAME= MONGO_AUTH_SOURCE= # RABBIT +RABBIT_ENABLED= +RABBIT_OPTIONS= RABBIT_USER= RABBIT_PASSWORD= RABBIT_HOST= diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml deleted file mode 100644 index e19c1721..00000000 --- a/.gitlab-ci.yml +++ /dev/null @@ -1,123 +0,0 @@ -stages: - - analysis - - build - - deploy - -variables: - HELM_CHART_NAME: default/default-chart - HELM_NAME: microservice-template - PROJECT_GROUP: pagtel - - VERSION_PATTERN: '(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(-(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(\.(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*)?(\+[0-9a-zA-Z-]+(\.[0-9a-zA-Z-]+)*)?$' - CONFIG_REPOSITORY_BASE_URL: https://gitlab.com/api/v4/projects/28522576/repository/files - -sonar: - stage: analysis - image: - name: sonarsource/sonar-scanner-cli:latest - entrypoint: [""] - variables: - SONAR_USER_HOME: "${CI_PROJECT_DIR}/.sonar" - GIT_DEPTH: "0" - cache: - key: "${CI_JOB_NAME}" - paths: - - .sonar/cache - script: - - sonar-scanner - allow_failure: true - when: manual - only: - - /^v.*$/ - - /^v.*-.*$/ - - -Build (Homologation): - tags: - - arm64 - stage: build - image: docker:latest - services: - - docker:dind - before_script: - - export DOCKER_BUILDKIT=1 - - export BUILD_ARCHITECTURE=linux/arm64 - - export CACHE_IMAGE_TAG=cache-arm64 - - - apk add --no-cache --upgrade grep - - docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY - - docker run --privileged --rm tonistiigi/binfmt --install all - script: - - export TAG=$( echo $CI_COMMIT_REF_NAME | grep -Po $VERSION_PATTERN ) - - echo 'Building version:' $TAG - - docker buildx create --name custom-builder --use --bootstrap - - docker buildx build --cache-from type=registry,ref=$CI_REGISTRY_IMAGE:${CACHE_IMAGE_TAG} -t $CI_REGISTRY_IMAGE:${CACHE_IMAGE_TAG} --push --platform ${BUILD_ARCHITECTURE} -t $CI_REGISTRY_IMAGE:$TAG . - when: manual - only: - - /^v.*$/ - - /^v.*-.*$/ - -Deploy (Homologation): - image: registry.gitlab.com/pagtel-devops/docker-imagens:helm-1.0.0 - stage: deploy - before_script: - - echo $K8S_AWS_CLUSTER_IP kube.pagtel.com.br >> /etc/hosts - - mkdir -p $HOME/.kube - - echo $K8S_AWS_CLUSTER_CONFIG | base64 --decode > $HOME/.kube/config - - chown $(id -u):$(id -g) $HOME/.kube/config - - helm repo add default https://pagtel-devops.gitlab.io/charts-repository/default - - helm repo update - script: - - export TAG=$( echo $CI_COMMIT_REF_NAME | grep -Po $VERSION_PATTERN ) - - echo 'Deploy of TAG:' $TAG - - kubectl apply -n homologation -f "${CONFIG_REPOSITORY_BASE_URL}/${PROJECT_GROUP}%2F${HELM_NAME}%2Fconfig-map.yaml/raw?private_token=$ACCESS_TOKEN_READ_REPOSITORY&ref=homologation" - - kubectl apply -n homologation -f "${CONFIG_REPOSITORY_BASE_URL}/${PROJECT_GROUP}%2F${HELM_NAME}%2Fsecret.yaml/raw?private_token=$ACCESS_TOKEN_READ_REPOSITORY&ref=homologation" - - helm upgrade -n homologation ${PROJECT_GROUP}-${HELM_NAME} $HELM_CHART_NAME --set image.tag=$TAG --values ./helm/homologation-values.yaml --install - when: manual - only: - - /^v.*$/ - - /^v.*-.*$/ - -Build (Production): - stage: build - image: docker:latest - services: - - docker:dind - before_script: - - export DOCKER_BUILDKIT=1 - - export BUILD_ARCHITECTURE=linux/amd64 - - export CACHE_IMAGE_TAG=cache-amd64 - - export NOT_ALLOWED_VERSION="^v.*-.*$" - - - apk add --no-cache --upgrade grep - - docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY - - docker run --privileged --rm tonistiigi/binfmt --install all - script: - - if [[ $CI_COMMIT_TAG =~ $NOT_ALLOWED_VERSION ]]; then echo 'VERSION NOT ALLOWED.' && exit 0; fi - - export TAG=$( echo $CI_COMMIT_TAG | grep -Po $VERSION_PATTERN ) - - echo 'Building version:' $TAG - - docker buildx create --name custom-builder --use --bootstrap - - docker buildx build --cache-from type=registry,ref=$CI_REGISTRY_IMAGE:${CACHE_IMAGE_TAG} -t $CI_REGISTRY_IMAGE:${CACHE_IMAGE_TAG} --push --platform ${BUILD_ARCHITECTURE} -t $CI_REGISTRY_IMAGE:$TAG . - when: manual - only: - - tags - -Deploy (Production): - image: registry.gitlab.com/pagtel-devops/docker-imagens:helm-1.0.0 - stage: deploy - before_script: - - echo $K8S_VIVO_CLUSTER_IP kube.pagtel.com.br >> /etc/hosts - - mkdir -p $HOME/.kube - - echo $K8S_VIVO_CLUSTER_CONFIG | base64 --decode > $HOME/.kube/config - - chown $(id -u):$(id -g) $HOME/.kube/config - - helm repo add default https://pagtel-devops.gitlab.io/charts-repository/default - - helm repo update - script: - - export TAG=$( echo $CI_COMMIT_TAG | grep -Po $VERSION_PATTERN ) - - echo 'Deploy of TAG:' $TAG - - kubectl apply -n production -f "${CONFIG_REPOSITORY_BASE_URL}/${PROJECT_GROUP}%2F${HELM_NAME}%2Fconfig-map.yaml/raw?private_token=$ACCESS_TOKEN_READ_REPOSITORY&ref=production" - - kubectl apply -n production -f "${CONFIG_REPOSITORY_BASE_URL}/${PROJECT_GROUP}%2F${HELM_NAME}%2Fsecret.yaml/raw?private_token=$ACCESS_TOKEN_READ_REPOSITORY&ref=production" - - helm upgrade -n production ${PROJECT_GROUP}-${HELM_NAME} $HELM_CHART_NAME --set image.tag=$TAG --values ./helm/production-values.yaml --install - when: manual - only: - - tags diff --git a/CHANGELOG.md b/CHANGELOG.md index 28af8baf..40efc69b 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,42 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [2.3.0] - 2025-05-06 + +### Added + +- New `normalize` and `normalizeOptions` utilities for payload normalization in the reprocessing module. +- APM decorators for handlers, enabling instrumentation. +- Functionality to extract RabbitMQ options from the `RABBIT_OPTIONS` environment variable. + +### Changed + +- Renamed non-standard folder from `utils` to `util`. +- Improved application startup and shutdown process. +- Enhanced MemCached connection process and added connection timeout. +- Updated the `.env.example` file. +- Adjusted when Elastic is started. +- Added protocol adaptation for new RabbitMQ implementations. +- Improved handler processing logic to prevent subsequent handlers from executing prematurely. +- Optimized APM decorators. + +### Fixed + +- Added normalization before message validation in the reprocessing module. +- Optimized `searchLabels` to prevent maximum call stack overflow. + +### Deprecated + +- _No deprecations._ + +### Removed + +- _No removals._ + +### Security + +- _No security fixes. + ## [2.2.0] - 2025-03-19 ### Added diff --git a/package.json b/package.json index 5c8a09ff..16c1e74b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "template-microservice", - "version": "1.0.0", + "version": "2.3.0", "scripts": { "helm:gen": "NODE_NO_WARNINGS=1 node script/helm/index.mjs", "helm:gen:production": "NODE_NO_WARNINGS=1 node script/helm/index.mjs -e production --scan-routes", diff --git a/src/data/protocols/mq/publish-in-exchange.ts b/src/data/protocols/mq/publish-in-exchange.ts index 40f1b731..2a5a1776 100755 --- a/src/data/protocols/mq/publish-in-exchange.ts +++ b/src/data/protocols/mq/publish-in-exchange.ts @@ -4,5 +4,5 @@ export interface PublishInExchangeService { message: object, routingKey: string, headers?: object - ): Promise; + ): Promise; } diff --git a/src/data/protocols/mq/publish-in-queue.ts b/src/data/protocols/mq/publish-in-queue.ts index a1228b27..d689582e 100755 --- a/src/data/protocols/mq/publish-in-queue.ts +++ b/src/data/protocols/mq/publish-in-queue.ts @@ -1,3 +1,7 @@ export interface PublishInQueueService { - publishInQueue(queue: string, message: object, headers?: object): void; + publishInQueue( + queue: string, + message: object, + headers?: object + ): Promise; } diff --git a/src/data/protocols/utils/date/date-is-after.ts b/src/data/protocols/util/date/date-is-after.ts similarity index 100% rename from src/data/protocols/utils/date/date-is-after.ts rename to src/data/protocols/util/date/date-is-after.ts diff --git a/src/data/protocols/utils/date/formate-date.ts b/src/data/protocols/util/date/formate-date.ts similarity index 100% rename from src/data/protocols/utils/date/formate-date.ts rename to src/data/protocols/util/date/formate-date.ts diff --git a/src/data/protocols/utils/date/index.ts b/src/data/protocols/util/date/index.ts similarity index 100% rename from src/data/protocols/utils/date/index.ts rename to src/data/protocols/util/date/index.ts diff --git a/src/data/protocols/utils/date/sub-days.ts b/src/data/protocols/util/date/sub-days.ts similarity index 100% rename from src/data/protocols/utils/date/sub-days.ts rename to src/data/protocols/util/date/sub-days.ts diff --git a/src/data/protocols/utils/date/sum-business-days.ts b/src/data/protocols/util/date/sum-business-days.ts similarity index 100% rename from src/data/protocols/utils/date/sum-business-days.ts rename to src/data/protocols/util/date/sum-business-days.ts diff --git a/src/data/protocols/utils/date/sum-days.ts b/src/data/protocols/util/date/sum-days.ts similarity index 100% rename from src/data/protocols/utils/date/sum-days.ts rename to src/data/protocols/util/date/sum-days.ts diff --git a/src/data/protocols/utils/index.ts b/src/data/protocols/util/index.ts similarity index 100% rename from src/data/protocols/utils/index.ts rename to src/data/protocols/util/index.ts diff --git a/src/data/protocols/utils/logger.ts b/src/data/protocols/util/logger.ts similarity index 100% rename from src/data/protocols/utils/logger.ts rename to src/data/protocols/util/logger.ts diff --git a/src/data/protocols/utils/object/filter-by.ts b/src/data/protocols/util/object/filter-by.ts similarity index 100% rename from src/data/protocols/utils/object/filter-by.ts rename to src/data/protocols/util/object/filter-by.ts diff --git a/src/data/protocols/utils/object/filter-keys.ts b/src/data/protocols/util/object/filter-keys.ts similarity index 100% rename from src/data/protocols/utils/object/filter-keys.ts rename to src/data/protocols/util/object/filter-keys.ts diff --git a/src/data/protocols/utils/object/index.ts b/src/data/protocols/util/object/index.ts similarity index 100% rename from src/data/protocols/utils/object/index.ts rename to src/data/protocols/util/object/index.ts diff --git a/src/data/protocols/utils/object/merge.ts b/src/data/protocols/util/object/merge.ts similarity index 100% rename from src/data/protocols/utils/object/merge.ts rename to src/data/protocols/util/object/merge.ts diff --git a/src/data/protocols/utils/object/override-attribute-value.ts b/src/data/protocols/util/object/override-attribute-value.ts similarity index 100% rename from src/data/protocols/utils/object/override-attribute-value.ts rename to src/data/protocols/util/object/override-attribute-value.ts diff --git a/src/data/protocols/utils/object/process-value.ts b/src/data/protocols/util/object/process-value.ts similarity index 100% rename from src/data/protocols/utils/object/process-value.ts rename to src/data/protocols/util/object/process-value.ts diff --git a/src/data/protocols/utils/object/rename-keys.ts b/src/data/protocols/util/object/rename-keys.ts similarity index 100% rename from src/data/protocols/utils/object/rename-keys.ts rename to src/data/protocols/util/object/rename-keys.ts diff --git a/src/data/protocols/utils/text/capitalize.ts b/src/data/protocols/util/text/capitalize.ts similarity index 100% rename from src/data/protocols/utils/text/capitalize.ts rename to src/data/protocols/util/text/capitalize.ts diff --git a/src/data/protocols/utils/text/index.ts b/src/data/protocols/util/text/index.ts similarity index 100% rename from src/data/protocols/utils/text/index.ts rename to src/data/protocols/util/text/index.ts diff --git a/src/data/protocols/utils/transaction/commit-all.ts b/src/data/protocols/util/transaction/commit-all.ts similarity index 100% rename from src/data/protocols/utils/transaction/commit-all.ts rename to src/data/protocols/util/transaction/commit-all.ts diff --git a/src/data/protocols/utils/transaction/index.ts b/src/data/protocols/util/transaction/index.ts similarity index 100% rename from src/data/protocols/utils/transaction/index.ts rename to src/data/protocols/util/transaction/index.ts diff --git a/src/data/protocols/utils/transaction/merge-transactions.ts b/src/data/protocols/util/transaction/merge-transactions.ts similarity index 100% rename from src/data/protocols/utils/transaction/merge-transactions.ts rename to src/data/protocols/util/transaction/merge-transactions.ts diff --git a/src/data/protocols/utils/transaction/rollback-all.ts b/src/data/protocols/util/transaction/rollback-all.ts similarity index 100% rename from src/data/protocols/utils/transaction/rollback-all.ts rename to src/data/protocols/util/transaction/rollback-all.ts diff --git a/src/data/usecases/elasticsearch/es-create-event.ts b/src/data/usecases/elasticsearch/es-create-event.ts index da7f4558..1479cca3 100644 --- a/src/data/usecases/elasticsearch/es-create-event.ts +++ b/src/data/usecases/elasticsearch/es-create-event.ts @@ -1,6 +1,6 @@ import { GetAPMTransactionIds } from '@/data/protocols/apm'; import { CreateDocumentService } from '@/data/protocols/elasticsearch'; -import { FormatDate } from '@/data/protocols/utils'; +import { FormatDate } from '@/data/protocols/util'; import { CreateEvent } from '@/domain/usecases/event'; export class EsCreateEvent implements CreateEvent { diff --git a/src/data/usecases/elasticsearch/es-update-event.ts b/src/data/usecases/elasticsearch/es-update-event.ts index 920051f5..457b384c 100644 --- a/src/data/usecases/elasticsearch/es-update-event.ts +++ b/src/data/usecases/elasticsearch/es-update-event.ts @@ -3,7 +3,7 @@ import { GetDocumentByIdService, UpdateDocumentService } from '@/data/protocols/elasticsearch'; -import { FormatDate, Merge } from '@/data/protocols/utils'; +import { FormatDate, Merge } from '@/data/protocols/util'; import { UpdateEvent } from '@/domain/usecases/event'; export class EsUpdateEvent implements UpdateEvent { diff --git a/src/infra/cache/cache-server.ts b/src/infra/cache/cache-server.ts index d567490d..d547512b 100644 --- a/src/infra/cache/cache-server.ts +++ b/src/infra/cache/cache-server.ts @@ -21,6 +21,10 @@ export class CacheServer { private connectionString!: string; private connected: boolean = false; + private CONNECTION_TIMEOUT = 10000; + private CONNECTION_RETIES = 5; + private RETIES_TIMEOUT = 2; + public Error = { CONNECTION_ERROR: 'Server not connected!', CREDENTIALS_NOT_DEFINED: 'Server credentials not defined!', @@ -77,12 +81,15 @@ export class CacheServer { if (!this.connectionString) throw new Error(this.Error.CREDENTIALS_NOT_DEFINED); if (this.connected) return; - this.server = memjs.Client.create(this.connectionString); + this.server = memjs.Client.create(this.connectionString, { + timeout: this.CONNECTION_TIMEOUT, + retries: this.CONNECTION_RETIES, + retry_delay: this.RETIES_TIMEOUT + }); this.connected = true; } public disconnect() { - if (!this.connected) return; this.server.close(); this.connected = false; } diff --git a/src/infra/http/util/web-server/web-server.ts b/src/infra/http/util/web-server/web-server.ts index c45a4bce..d55ced7b 100755 --- a/src/infra/http/util/web-server/web-server.ts +++ b/src/infra/http/util/web-server/web-server.ts @@ -15,6 +15,7 @@ import { SharedState } from '@/presentation/protocols/shared-state'; import { internalImplementationError, serverError } from '@/presentation/utils'; import { DICTIONARY, + apmSpan, convertCamelCaseKeysToSnakeCase, convertSnakeCaseKeysToCamelCase, logger @@ -245,17 +246,8 @@ export class WebServer { } } - return new Promise((resolve, reject) => { - this.fastify.server.close((error) => { - if (error) { - reject(error); - return; - } - - logger.log({ level: 'info', message: 'Shutting down server' }); - resolve(null); - }); - }); + await this.fastify.close(); + logger.log({ level: 'info', message: 'Shutting down server' }); } public setBaseUrl(url: string) { @@ -395,9 +387,51 @@ export class WebServer { this.makeSetStateInRequest(state) ]; - const response = await (typeof middleware !== 'function' - ? middleware.handle(request, stateHook, next) - : middleware(request, reply, next, stateHook)); + function handler() { + const decoratorOptions = { + options: { + name: '', + subType: 'handler' + }, + params: {} + }; + + if (typeof middleware === 'function') { + decoratorOptions.options.name = middleware.name; + decoratorOptions.params = { stateHook: 3 }; + const decorator = apmSpan(decoratorOptions); + const proto = {}; + + const methodName = middleware.name; + + const desc: PropertyDescriptor = { + value: middleware.bind(null), + writable: true, + configurable: true, + enumerable: false + }; + + const newDesc = decorator(proto, methodName, desc); + + return newDesc.value(request, reply, next, stateHook); + } + + decoratorOptions.options.name = middleware.constructor.name; + decoratorOptions.params = { stateHook: 1 }; + const decorator = apmSpan(decoratorOptions); + + const proto = middleware.constructor.prototype; + const methodName = 'handle'; + const desc = Object.getOwnPropertyDescriptor(proto, methodName)!; + + const newDesc = decorator(proto, methodName, desc); + + Object.defineProperty(proto, methodName, newDesc); + + return middleware.handle(request, stateHook, next); + } + + const response = await handler(); if (!response) return; diff --git a/src/infra/mq/utils/rabbitmq-server/rabbitmq-server.ts b/src/infra/mq/utils/rabbitmq-server/rabbitmq-server.ts index 12a7a283..6b47afd5 100755 --- a/src/infra/mq/utils/rabbitmq-server/rabbitmq-server.ts +++ b/src/infra/mq/utils/rabbitmq-server/rabbitmq-server.ts @@ -2,6 +2,7 @@ import { readdirSync } from 'fs'; import { resolve } from 'path'; import { EventEmitter } from 'stream'; import { Channel, Connection, Message, connect } from 'amqplib'; +import { randomInt, randomUUID } from 'crypto'; import { Job } from '@/job/protocols'; import { jobAdapter } from '@/main/adapters'; @@ -9,7 +10,8 @@ import { amqpLogger, CONSUMER, convertCamelCaseKeysToSnakeCase, - convertSnakeCaseKeysToCamelCase + convertSnakeCaseKeysToCamelCase, + RABBIT } from '@/util'; import { logger } from '@/util/observability'; import { apmSpan, apmTransaction } from '@/util/observability/apm'; @@ -25,11 +27,17 @@ import { export class RabbitMqServer { private connection: Connection | null = null; - private channel: Channel | null = null; - private consumers: Map = new Map(); + private channelPoolLength = 3; + private channelPool: Map = new Map(); + + private consumers: Map = + new Map(); + private messages: Set = new Set(); - private uri!: string; - private prefetch!: number; + private url!: string; + + private defaultPrefetch: number | undefined = undefined; + private queueLoaderOptions: { allowAll: boolean; denyAll: boolean; @@ -42,46 +50,44 @@ export class RabbitMqServer { allow: [] }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + private optionsFromEnv: Record | null = null; + private thereIsAPendingRestart = false; - private preventChannelRecover = false; private closing = false; - private theChannelIsActive = false; public Error = { CredentialsNotDefined: 'Credentials not defined, use setCredentials to define credentials', InvalidCredentialFieldValue: (field: string) => `Invalid value for (${field}) field`, - InvalidPrefetchValue: 'Prefetch must be a number' + InvalidPrefetchValue: 'Prefetch must be a number', + ConnectionNotDefined: + 'The connection has not been defined, you must start a connection before continuing', + EmptyChannelPool: + 'The channel pool is empty; for the class to work properly, the channel pool must have at least one channel ' }; private event = new EventEmitter(); - private events = { RESTART: Symbol('RESTART'), ACK: Symbol('ACK') }; + private events = { + RESTART: Symbol('RESTART'), + ACK: Symbol('ACK'), + CHECK_CHANNEL_POOL: Symbol('CHECK_CHANNEL_POOL'), + REBUILD_CONSUMER: Symbol('REBUILD_CONSUMER') + }; private static instance: RabbitMqServer; constructor(private credentials: Credentials | null = null) { if (this.credentials) this.setCredentials(this.credentials); - for (const item of CONSUMER.LIST) { - if (item === '*') { - this.queueLoaderOptions.allowAll = true; - this.queueLoaderOptions.denyAll = false; - continue; - } - if (item === '!*') { - this.queueLoaderOptions.allowAll = false; - this.queueLoaderOptions.denyAll = true; - continue; - } - - if (item[0] === '!') { - this.queueLoaderOptions.deny.push(item.substring(1)); - } else { - this.queueLoaderOptions.allow.push(item); - } + if (RABBIT.DEFAULT_PREFETCH) { + this.defaultPrefetch = RABBIT.DEFAULT_PREFETCH; } + + this.extractQueueOptions(); + this.loadOptionsFromEnv(); } public static getInstance(): RabbitMqServer { @@ -92,12 +98,28 @@ export class RabbitMqServer { return RabbitMqServer.instance; } - public setPrefetch(prefetch: number) { - if (typeof prefetch !== 'number') - throw new Error(this.Error.InvalidPrefetchValue); + private logger(error: Error): void; + private logger(args: { + level: string; + message: string; + payload?: Record; + }): void; + private logger( + args: Error & { + level: string; + message: string; + payload?: Record; + } + ) { + logger.log(args); + } + + private loadOptionsFromEnv() { + if (typeof process.env.RABBIT_OPTIONS !== 'string') return; - this.prefetch = prefetch; - return this; + this.optionsFromEnv = this.parseOptionsFromString( + process.env.RABBIT_OPTIONS + ); } public setCredentials(credentials: Credentials) { @@ -110,30 +132,26 @@ export class RabbitMqServer { ? `/${credentials.virtualHost}` : ''; - this.uri = `amqp://${credentials.user}:${credentials.password}@${credentials.host}:${credentials.port}${virtualHost}`; + this.url = `amqp://${credentials.user}:${credentials.password}@${credentials.host}:${credentials.port}${virtualHost}`; return this; } public async start() { - logger.log({ level: 'info', message: 'Starting connection' }); + this.logger({ level: 'info', message: 'Starting connection' }); if (!this.connection) { - if (!this.uri) throw new Error(this.Error.CredentialsNotDefined); - this.connection = await connect(this.uri); + if (!this.url) throw new Error(this.Error.CredentialsNotDefined); + this.connection = await connect(this.url); } - if (!this.channel) { - this.channel = await this.connection.createChannel(); - } - - this.theChannelIsActive = true; + await this.createChannelPool(this.channelPoolLength); - if (typeof this.prefetch === 'number') { - await this.channel.prefetch(this.prefetch); + if (this.channelPool.size === 0) { + throw new Error(this.Error.EmptyChannelPool); } this.startEventLiveners(); - logger.log({ level: 'info', message: 'Connection started' }); + this.logger({ level: 'info', message: 'Connection started' }); return this; } @@ -154,47 +172,32 @@ export class RabbitMqServer { } public async stop() { - logger.log({ level: 'info', message: 'Closing connection' }); + this.logger({ level: 'info', message: 'Closing connection' }); if (this.closing) { return; } this.closing = true; - this.preventChannelRecover = true; await this.waitForPendingProcessing(); - this.channel?.removeAllListeners(); this.connection?.removeAllListeners(); this.event.removeAllListeners(); - if (this.channel) { - try { - await this.channel.close(); - } catch (error) { - logger.log(error); - } finally { - this.theChannelIsActive = false; - } - } - if (this.connection) { try { await this.connection.close(); } catch (error) { - logger.log(error); + this.logger(error); } this.closing = false; } - this.channel = null; this.connection = null; - logger.log({ level: 'info', message: 'Connection closed' }); - - this.preventChannelRecover = false; + this.logger({ level: 'info', message: 'Connection closed' }); return this; } @@ -204,24 +207,22 @@ export class RabbitMqServer { } public async cancelConsumers() { - logger.log({ level: 'info', message: 'Closing consumers' }); - if (!this.channel) { - return; - } + this.logger({ level: 'info', message: 'Closing consumers' }); + const consumers = Array.from(this.consumers); - const promises = consumers.map(async ([key]) => { + const promises = consumers.map(async ([key, value]) => { try { - await this.channel?.cancel(key); - } catch (error) { - logger.log(error); - } finally { this.consumers.delete(key); + await value.channel.cancel(key); + await value.channel.close(); + } catch (error) { + this.logger(error); } }); await Promise.allSettled(promises); - logger.log({ + this.logger({ level: 'info', message: 'All consumer are closed' }); @@ -229,22 +230,23 @@ export class RabbitMqServer { public async restart() { const RESTAR_TIMEOUT = 1500; - logger.log({ level: 'info', message: 'Restart event' }); + this.logger({ level: 'info', message: 'Restart event' }); if (this.thereIsAPendingRestart) return; - logger.log({ level: 'info', message: 'Restart event accepted' }); + this.logger({ level: 'info', message: 'Restart event accepted' }); this.thereIsAPendingRestart = true; try { await this.stop(); await this.start(); } catch (error) { - logger.log({ level: 'info', message: 'Error when restarting' }); - logger.log(error); + this.logger({ level: 'info', message: 'Error when restarting' }); + this.logger(error); this.thereIsAPendingRestart = false; this.event.emit(this.events.RESTART); + return; } - await this.rebuildConsumers(true); + await this.rebuildConsumers(false); setTimeout(() => { this.thereIsAPendingRestart = false; @@ -253,20 +255,6 @@ export class RabbitMqServer { return this; } - private async recoverChannel(): Promise { - try { - logger.log({ level: 'info', message: 'Recover channel event' }); - if (this.preventChannelRecover) return false; - logger.log({ level: 'info', message: 'Recover channel event accepted' }); - await this.channel?.recover(); - return true; - } catch (error) { - logger.log({ level: 'info', message: 'Error when recover channel' }); - logger.log(error); - return false; - } - } - @loggerDecorator({ options: { name: 'Publish message in queue', subType: 'rabbitmq' }, input: { queue: 0, message: 1, headers: 2 } @@ -276,12 +264,33 @@ export class RabbitMqServer { params: { queue: 0, message: 1, headers: 2 } }) public async publishInQueue(queue: string, message: object, headers: object) { - if (!this.connection || !this.channel) await this.restart(); - const messageFromBuffer = this.convertMessageToBuffer( + if (!this.connection) { + this.logger({ + level: 'error', + message: 'Unable to publish message, connection not defined', + payload: { queue, message, headers } + }); + throw new Error(this.Error.ConnectionNotDefined); + } + + if (this.channelPool.size === 0) { + this.logger({ + level: 'error', + message: 'Unable to publish message, empty channel pool', + payload: { queue, message, headers } + }); + throw new Error(this.Error.EmptyChannelPool); + } + + const chanelIndex = randomInt(0, this.channelPool.size - 1); + + const buffer = this.convertMessageToBuffer( convertCamelCaseKeysToSnakeCase(message) ); - this.channel?.sendToQueue(queue, messageFromBuffer, { + const channelList = Array.from(this.channelPool).map(([, value]) => value); + + return channelList[chanelIndex].sendToQueue(queue, buffer, { headers }); } @@ -299,76 +308,130 @@ export class RabbitMqServer { routingKey: string, headers?: object ) { - if (!this.connection || !this.channel) await this.restart(); - this.channel?.publish( - exchange, - routingKey, - this.convertMessageToBuffer(convertCamelCaseKeysToSnakeCase(message)), - { headers } + if (!this.connection) { + this.logger({ + level: 'error', + message: 'Unable to publish message, connection not defined', + payload: { exchange, message, routingKey, headers } + }); + throw new Error(this.Error.ConnectionNotDefined); + } + + if (this.channelPool.size === 0) { + this.logger({ + level: 'error', + message: 'Unable to publish message, empty channel pool', + payload: { exchange, message, routingKey, headers } + }); + throw new Error(this.Error.EmptyChannelPool); + } + + const buffer = this.convertMessageToBuffer( + convertCamelCaseKeysToSnakeCase(message) ); + + const chanelIndex = randomInt(0, this.channelPool.size - 1); + + const channelList = Array.from(this.channelPool).map(([, value]) => value); + + return channelList[chanelIndex].publish(exchange, routingKey, buffer, { + headers + }); } - public async consume(queue: string, callback: ConsumerCallback) { - const consumer = await this.channel?.consume(queue, async (message) => { - if (!message) return; + public async consume( + queue: string, + callback: ConsumerCallback, + options?: { prefetch?: number } + ) { + try { + if (!this.connection) { + throw new Error(this.Error.ConnectionNotDefined); + } - try { - if (this.closing) this.reject(message); + const channel = await this.connection.createChannel(); - this.messages.add(message); + if (options?.prefetch) { + await channel.prefetch(options.prefetch); + } - this.setCustomMessageProperties(message, { rejected: false }); + const consumer = await channel.consume(queue, async (message) => { + if (!message) return; - const payload = { - body: convertSnakeCaseKeysToCamelCase( - this.convertMessageToJson(message) - ), - headers: message.properties.headers, - fields: { queue, ...message.fields }, - properties: message.properties, - reject: (requeue?: boolean) => { - this.reject(message, requeue ?? true); - } - }; + try { + if (this.closing) this.reject(message); - await this.transactionHandler(queue, payload, callback); - } catch (error) { - logger.log(error); - if (error.stack.includes('at JSON.parse')) { - logger.log({ - level: 'warn', - message: 'Unable to convert message to JSON', + this.messages.add(message); + + this.setCustomMessageProperties(message, { rejected: false }); + + const payload = { + body: convertSnakeCaseKeysToCamelCase( + this.convertMessageToJson(message) + ), + headers: message.properties.headers, + fields: { queue, ...message.fields }, + properties: message.properties, + reject: (requeue?: boolean) => { + this.reject(message, requeue ?? true); + } + }; + + await this.transactionHandler(queue, payload, callback); + } catch (error) { + this.logger(error); + if (error.stack.includes('at JSON.parse')) { + this.logger({ + level: 'warn', + message: 'Unable to convert message to JSON', + payload: { + message: message.content.toString(), + headers: message.properties.headers, + properties: { queue, ...message.fields } + } + }); + } + } finally { + this.logger({ + level: 'verbose', + message: 'The message left the queue', payload: { message: message.content.toString(), headers: message.properties.headers, properties: { queue, ...message.fields } } }); + + this.ack(message); + this.messages.delete(message); } - } finally { - logger.log({ - level: 'verbose', - message: 'The message left the queue', - payload: { - message: message.content.toString(), - headers: message.properties.headers, - properties: { queue, ...message.fields } - } - }); + }); - this.ack(message); - this.messages.delete(message); - } - }); + channel.once('close', () => { + const REBUILD_DELAY = 1_000; + setTimeout(() => { + this.event.emit(this.events.REBUILD_CONSUMER, consumer.consumerTag); + }, REBUILD_DELAY); + }); - if (!consumer) return; + channel.on('error', (error) => { + this.logger(error); + }); - this.consumers.set(consumer.consumerTag, { queue, callback }); + if (!consumer) return; - logger.log({ - level: 'info', - message: `Consumer to ${queue} is online!` - }); + this.consumers.set(consumer.consumerTag, { + channel, + consumer: { queue, callback, options } + }); + + this.logger({ + level: 'info', + message: `Consumer to ${queue} is online!` + }); + } catch (error) { + this.logger(error); + } } public makeConsumer(queue: string, ...callbacks: (Job | Function)[]): void; @@ -380,8 +443,21 @@ export class RabbitMqServer { arg1: string | ConsumerOptions, ...callbacks: (Job | Function)[] ): void { - const queue = typeof arg1 === 'object' ? arg1.queue : arg1; - const enabled = typeof arg1 === 'object' ? !!arg1?.enabled : true; + let queue; + let enabled = true; + let prefetch; + + if (typeof arg1 === 'object') { + queue = arg1.queue; + enabled = this.optionsFromEnv?.[queue]?.enabled ?? !!arg1.enabled; + + prefetch = + this.optionsFromEnv?.[queue]?.prefetch ?? + arg1?.prefetch ?? + this.defaultPrefetch; + } else { + queue = arg1; + } const { allow, allowAll, deny, denyAll } = this.queueLoaderOptions; @@ -391,7 +467,7 @@ export class RabbitMqServer { if (!enabled && !allowAll && !allow.includes(queue)) return; - this.consume(queue, jobAdapter(...callbacks)); + this.consume(queue, jobAdapter(...callbacks), { prefetch }); } public async consumersDirectory(path: string): Promise { @@ -422,6 +498,38 @@ export class RabbitMqServer { } } + private async createChannelPool(poolLength: number) { + if (!this.connection) { + throw new Error(this.Error.ConnectionNotDefined); + } + + const chanelPromises = []; + + for (let index = 0; index < poolLength; index++) { + const promise = this.connection.createChannel(); + chanelPromises.push(promise); + } + + const pool = await Promise.allSettled(chanelPromises); + + for (const item of pool) { + if (item.status === 'fulfilled') { + const channelId = randomUUID(); + + item.value.on('error', (error) => { + this.logger(error); + }); + + item.value.once('close', () => { + this.channelPool.delete(channelId); + this.event.emit(this.events.CHECK_CHANNEL_POOL); + }); + + this.channelPool.set(channelId, item.value); + } + } + } + private setCustomMessageProperties( message: Message, properties: Record @@ -455,17 +563,22 @@ export class RabbitMqServer { private reject(message: Message, requeue?: boolean) { try { if ( - !this.theChannelIsActive || - !this.channel || + !this.connection || this.getCustomMessageProperties(message, 'rejected') || this.getCustomMessageProperties(message, 'acked') ) return; + this.setCustomMessageProperties(message, { rejected: true }); - this.channel.reject(message, requeue); + + const { consumerTag } = message.fields; + + if (!consumerTag) return; + + this.consumers.get(consumerTag)?.channel.reject(message, requeue); } catch (error) { - logger.log(error); - logger.log({ + this.logger(error); + this.logger({ level: 'warn', message: 'Unable to reject message', payload: { message: message.content.toString() } @@ -476,17 +589,22 @@ export class RabbitMqServer { private ack(message: Message) { try { if ( - !this.theChannelIsActive || - !this.channel || + !this.connection || this.getCustomMessageProperties(message, 'rejected') || this.getCustomMessageProperties(message, 'acked') ) return; + this.setCustomMessageProperties(message, { acked: true }); - this.channel.ack(message); + + const { consumerTag } = message.fields; + + if (!consumerTag) return; + + this.consumers.get(consumerTag)?.channel.ack(message); } catch (error) { - logger.log(error); - logger.log({ + this.logger(error); + this.logger({ level: 'warn', message: 'Unable to ack message', payload: { message: message.content.toString() } @@ -496,78 +614,114 @@ export class RabbitMqServer { private startEventLiveners() { this.connection?.on('error', (error) => { - logger.log(error); - this.event.emit(this.events.RESTART); - }); - - this.channel?.on('error', async (error) => { - logger.log(error); - - this.theChannelIsActive = false; - - const wasItRecovered = await this.recoverChannel(); - - if (!wasItRecovered) { - this.event.emit(this.events.RESTART); - } + this.logger(error); }); - this.connection?.on('close', (error) => { - logger.log(error); + this.connection?.on('close', () => { if (!this.thereIsAPendingRestart) { this.event.emit(this.events.RESTART); } }); - this.channel?.on('close', async (error) => { - logger.log(error); - logger.log(error); - - this.theChannelIsActive = false; - - const wasItRecovered = await this.recoverChannel(); + this.event.on(this.events.RESTART, this.restart.bind(this)); + this.event.on(this.events.CHECK_CHANNEL_POOL, async () => { + if (this.channelPool.size !== 0) return; - if (!wasItRecovered && !this.preventChannelRecover) { - this.event.emit(this.events.RESTART); - } + await this.createChannelPool(this.channelPoolLength); }); - this.event.on(this.events.RESTART, this.restart.bind(this)); + this.event.on( + this.events.REBUILD_CONSUMER, + this.rebuildConsumer.bind(this) + ); } private async rebuildConsumers(deleteConsumers = true) { - logger.log({ level: 'info', message: 'Starting rebuilding consumers' }); - if (!this.channel) { - this.event.emit(this.events.RESTART); - return; - } + this.logger({ + level: 'info', + message: 'Starting rebuilding all consumers' + }); const consumers = Array.from(this.consumers); const promises = consumers.map(async ([key, value]) => { try { - if (deleteConsumers) await this.channel?.cancel(key); this.consumers.delete(key); - return this.consume(value.queue, value.callback); + if (deleteConsumers) { + await value.channel.cancel(key); + await value.channel.close(); + } + return this.consume( + value.consumer.queue, + value.consumer.callback, + value.consumer.options + ); } catch (error) { - logger.log(error); + this.logger(error); return null; } }); await Promise.allSettled(promises); - logger.log({ + + this.logger({ level: 'info', - message: 'Consumer reconstruction completed' + message: 'All consumers reconstruction completed' }); } + private async rebuildConsumer(consumerTag: string) { + if (!this.connection) { + throw new Error(this.Error.ConnectionNotDefined); + } + + if (this.thereIsAPendingRestart || this.closing) return; + + this.logger({ + level: 'info', + message: `Starting rebuilding consumer: ${consumerTag}` + }); + + const targetConsumer = this.consumers.get(consumerTag); + + if (!targetConsumer) { + this.logger({ + level: 'warn', + message: 'Unable to rebuild consumer, consumer not found', + payload: { consumerTag } + }); + return; + } + + try { + this.consumers.delete(consumerTag); + await targetConsumer.channel.cancel(consumerTag); + await targetConsumer.channel.close(); + + const promise = this.consume( + targetConsumer.consumer.queue, + targetConsumer.consumer.callback, + targetConsumer.consumer.options + ); + + this.logger({ + level: 'info', + message: 'Consumer reconstruction completed' + }); + + return promise; + } catch (error) { + this.logger(error); + return null; + } + } + private convertMessageToBuffer(message: Record): Buffer { const string = JSON.stringify(message); return Buffer.from(string); } - private convertMessageToJson(message: Message): object { + private convertMessageToJson(message: Message): Record { return JSON.parse(message.content.toString()); } @@ -592,4 +746,70 @@ export class RabbitMqServer { return callback({ ...bodyAndHeadersToCamelCase, ...restOfPayload }); } + + private extractQueueOptions() { + for (const item of CONSUMER.LIST) { + if (item === '*') { + this.queueLoaderOptions.allowAll = true; + this.queueLoaderOptions.denyAll = false; + continue; + } + if (item === '!*') { + this.queueLoaderOptions.allowAll = false; + this.queueLoaderOptions.denyAll = true; + continue; + } + + if (item[0] === '!') { + this.queueLoaderOptions.deny.push(item.substring(1)); + } else { + this.queueLoaderOptions.allow.push(item); + } + } + } + + private parseOptionsFromString( + stringOptions: string + ): Record { + const options: Record = {}; + + function parseValue(value: string) { + if (value.toLowerCase() === 'true') { + return true; + } + if (value.toLowerCase() === 'false') { + return false; + } + + const numberValue = Number(value); + + if (!Number.isNaN(numberValue)) { + return numberValue; + } + + return value; + } + + const keyValuePairs = stringOptions.split(',').map((item) => item.trim()); + + for (const pair of keyValuePairs) { + const [path, value] = pair.split('=').map((item) => item.trim()); + const pathParts = path.split('.'); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + pathParts.reduce((acc: any, part: string, index: number) => { + if (index === pathParts.length - 1) { + acc[part] = parseValue(value); + + return acc[part]; + } + + acc[part] = acc[part] || {}; + + return acc[part]; + }, options); + } + + return options; + } } diff --git a/src/infra/mq/utils/rabbitmq-server/types.ts b/src/infra/mq/utils/rabbitmq-server/types.ts index be82e486..1413ebd8 100644 --- a/src/infra/mq/utils/rabbitmq-server/types.ts +++ b/src/infra/mq/utils/rabbitmq-server/types.ts @@ -13,6 +13,11 @@ export type ConsumerCallback = (payload: Payload) => Promise; export type Consumer = { queue: string; callback: ConsumerCallback; + options?: { prefetch?: number }; }; -export type ConsumerOptions = { queue: string; enabled?: boolean }; +export type ConsumerOptions = { + queue: string; + enabled?: boolean; + prefetch?: number; +}; diff --git a/src/job/jobs/cache/get-cache-value-job.ts b/src/job/jobs/cache/get-cache-value-job.ts index fbc15304..4d7c6238 100644 --- a/src/job/jobs/cache/get-cache-value-job.ts +++ b/src/job/jobs/cache/get-cache-value-job.ts @@ -1,4 +1,4 @@ -import { Logger } from '@/data/protocols/utils'; +import { Logger } from '@/data/protocols/util'; import { ErrorHandler, GetCacheValue } from '@/domain/usecases'; import { Job } from '@/job/protocols'; import { ExtractValues } from '@/plugin'; diff --git a/src/job/jobs/event/create-event-job.ts b/src/job/jobs/event/create-event-job.ts index 05121878..f1f5166c 100755 --- a/src/job/jobs/event/create-event-job.ts +++ b/src/job/jobs/event/create-event-job.ts @@ -1,4 +1,4 @@ -import { Logger } from '@/data/protocols/utils'; +import { Logger } from '@/data/protocols/util'; import { CreateEvent, ErrorHandler } from '@/domain/usecases'; import { Job } from '@/job/protocols'; import { ELASTICSEARCH } from '@/util'; diff --git a/src/job/jobs/event/update-event-job.ts b/src/job/jobs/event/update-event-job.ts index 75be9ca7..d203c3d2 100755 --- a/src/job/jobs/event/update-event-job.ts +++ b/src/job/jobs/event/update-event-job.ts @@ -1,4 +1,4 @@ -import { Logger } from '@/data/protocols/utils'; +import { Logger } from '@/data/protocols/util'; import { ErrorHandler, UpdateEvent } from '@/domain/usecases'; import { Job } from '@/job/protocols'; import { ELASTICSEARCH } from '@/util'; diff --git a/src/job/jobs/example/example-job.ts b/src/job/jobs/example/example-job.ts index 14e8e620..787c5314 100755 --- a/src/job/jobs/example/example-job.ts +++ b/src/job/jobs/example/example-job.ts @@ -1,4 +1,4 @@ -import { Logger } from '@/data/protocols/utils'; +import { Logger } from '@/data/protocols/util'; import { ErrorHandler } from '@/domain/usecases'; import { Job } from '@/job/protocols'; diff --git a/src/job/jobs/example/trouble-example-job.ts b/src/job/jobs/example/trouble-example-job.ts index 16bdcb16..2c09469d 100644 --- a/src/job/jobs/example/trouble-example-job.ts +++ b/src/job/jobs/example/trouble-example-job.ts @@ -1,7 +1,7 @@ -import { Logger } from '@/data/protocols/utils'; +import { Logger } from '@/data/protocols/util'; import { TroubleExample } from '@/domain/usecases'; import { Job } from '@/job/protocols'; -import { reprocessing } from '@/job/utils'; +import { reprocessing } from '@/job/util'; export class TroubleExampleJob implements Job { constructor( diff --git a/src/job/utils/index.ts b/src/job/util/index.ts similarity index 100% rename from src/job/utils/index.ts rename to src/job/util/index.ts diff --git a/src/job/utils/override-state.ts b/src/job/util/override-state.ts similarity index 100% rename from src/job/utils/override-state.ts rename to src/job/util/override-state.ts diff --git a/src/job/utils/reprocessing.ts b/src/job/util/reprocessing.ts similarity index 93% rename from src/job/utils/reprocessing.ts rename to src/job/util/reprocessing.ts index e4e2e1d6..4a1c3891 100755 --- a/src/job/utils/reprocessing.ts +++ b/src/job/util/reprocessing.ts @@ -3,7 +3,7 @@ import { MqSendReprocessing } from '@/data/usecases/mq'; import { ReprocessingRepository } from '@/infra/db/mongodb/reprocessing'; import { rabbitMqServer } from '@/infra/mq/utils'; -import { overrideState } from '@/job/utils'; +import { overrideState } from '@/job/util'; import { REPROCESSING } from '@/util'; const mqServer = rabbitMqServer(); @@ -20,11 +20,11 @@ const skipMiddleware = ( ); }; -const normalizePayload = ( +export const normalizeReprocessingPayload = ( payload: Record, [state, setState]: [Record, Function] ) => { - if (!payload.body.reprocessing) return; + if (!payload.body?.reprocessing) return; if (payload.body.reprocessing && !state.reprocessing) setState({ reprocessing: payload.body.reprocessing }); @@ -75,7 +75,7 @@ export function reprocessing(options: Options = {}) { ); try { - normalizePayload(payload, [state, setState]); + normalizeReprocessingPayload(payload, [state, setState]); if ( skipMiddleware(state.reprocessing, target.constructor.name) && diff --git a/src/job/utils/state-dependencies.ts b/src/job/util/state-dependencies.ts similarity index 100% rename from src/job/utils/state-dependencies.ts rename to src/job/util/state-dependencies.ts diff --git a/src/main/adapters/flow-adapter.ts b/src/main/adapters/flow-adapter.ts index a5f2e4a9..df50d386 100755 --- a/src/main/adapters/flow-adapter.ts +++ b/src/main/adapters/flow-adapter.ts @@ -14,22 +14,23 @@ export default >(data: Data) => let resolve!: Function; - const nextFunction = () => { - event.emit(NEXT_EVENT_SYMBOL); - }; - const callStack = callbacks .map((middleware) => async () => { let nextFunctionWasCalled = false; - function nextFunctionDecorator() { + function nextFunction() { if (nextFunctionWasCalled) return; - nextFunction(); nextFunctionWasCalled = true; } try { - return await middleware(data, nextFunctionDecorator); + const response = await middleware(data, nextFunction); + + if (nextFunctionWasCalled) { + event.emit(NEXT_EVENT_SYMBOL); + } + + return response; } catch (error) { throw error; } finally { @@ -40,7 +41,7 @@ export default >(data: Data) => }) .reverse(); - event.on(NEXT_EVENT_SYMBOL, async () => { + event.on(NEXT_EVENT_SYMBOL, () => { const handler = callStack.pop(); if (handler === undefined) { @@ -48,7 +49,7 @@ export default >(data: Data) => return; } - await handler(); + handler(); }); event.once(RESOLVER_EVENT_SYMBOL, () => { diff --git a/src/main/adapters/flow-manager/flow-manager-adapter.ts b/src/main/adapters/flow-manager/flow-manager-adapter.ts index 5ae57f5f..f40a5a10 100755 --- a/src/main/adapters/flow-manager/flow-manager-adapter.ts +++ b/src/main/adapters/flow-manager/flow-manager-adapter.ts @@ -33,13 +33,18 @@ function coercion( throw new Error('INVALID_TYPE'); } -export default function flowManager(...options: Option[]): Function; +export default function flowManager(options: Option[]): Function; export default function flowManager( firstOption: Option, + ...options: Option[] +): Function; +export default function flowManager( + arg1: Option[] | Option, ...restOfOptions: Option[] ) { return (...args: unknown[]) => { - const options = [firstOption, ...restOfOptions]; + const firstOptions = Array.isArray(arg1) ? arg1 : [arg1]; + const options = [...firstOptions, ...restOfOptions]; for (const option of options) { if (!option.when) return option.handler(...args); diff --git a/src/main/adapters/flow-manager/util/index.ts b/src/main/adapters/flow-manager/util/index.ts new file mode 100755 index 00000000..685dc139 --- /dev/null +++ b/src/main/adapters/flow-manager/util/index.ts @@ -0,0 +1,2 @@ +export * from './skip'; +export * from './reprocessing'; diff --git a/src/main/adapters/flow-manager/util/reprocessing.ts b/src/main/adapters/flow-manager/util/reprocessing.ts new file mode 100755 index 00000000..9a943eec --- /dev/null +++ b/src/main/adapters/flow-manager/util/reprocessing.ts @@ -0,0 +1,38 @@ +import { normalizeReprocessingPayload } from '@/job/util'; + +import { Option } from '../flow-manager-adapter'; + +type Args = [Record, [Record, Function]]; + +export function normalize(callback: Function) { + return (...args: Args) => { + normalizeReprocessingPayload(...args); + return callback(...args); + }; +} + +export function normalizeOptions(options: Option[]): Option[]; +export function normalizeOptions( + firstOption: Option, + ...options: Option[] +): Option[]; +export function normalizeOptions( + arg1: Option[] | Option, + ...restOfOptions: Option[] +) { + const firstOptions = Array.isArray(arg1) ? arg1 : [arg1]; + const options = [...firstOptions, ...restOfOptions]; + + return options.map((option) => { + const { strict } = option; + + const handler = normalize(option.handler); + + const when = + typeof option.when === 'function' + ? normalize(option.when) + : option.handler; + + return { when, handler, strict }; + }); +} diff --git a/src/main/routes/flow-manager/utils/skip.ts b/src/main/adapters/flow-manager/util/skip.ts similarity index 100% rename from src/main/routes/flow-manager/utils/skip.ts rename to src/main/adapters/flow-manager/util/skip.ts diff --git a/src/main/adapters/job-adapter.ts b/src/main/adapters/job-adapter.ts index 28e0d1a9..24cac7f3 100755 --- a/src/main/adapters/job-adapter.ts +++ b/src/main/adapters/job-adapter.ts @@ -1,4 +1,5 @@ import { Job } from '@/job/protocols'; +import { apmSpan } from '@/util'; import makeFlow from './flow-adapter'; @@ -20,7 +21,43 @@ export const jobAdapter = (...jobs: (Job | Function)[]) => { // eslint-disable-next-line @typescript-eslint/no-explicit-any const stateHook = <[any, any]>[state, setState]; - if (typeof job === 'function') return job(payload, stateHook, next); + const decoratorOptions = { + options: { + name: '', + subType: 'handler' + }, + params: { payload: 0, stateHook: 1 } + }; + + if (typeof job === 'function') { + decoratorOptions.options.name = job.name; + const decorator = apmSpan(decoratorOptions); + const proto = {}; + + const methodName = job.name; + + const desc: PropertyDescriptor = { + value: job.bind(null), + writable: true, + configurable: true, + enumerable: false + }; + + const newDesc = decorator(proto, methodName, desc); + + return newDesc.value(payload, stateHook, next); + } + + decoratorOptions.options.name = job.constructor.name; + const decorator = apmSpan(decoratorOptions); + + const proto = job.constructor.prototype; + const methodName = 'handle'; + const desc = Object.getOwnPropertyDescriptor(proto, methodName)!; + + const newDesc = decorator(proto, methodName, desc); + + Object.defineProperty(proto, methodName, newDesc); return job.handle(payload, stateHook, next); }; diff --git a/src/main/adapters/job-validation-adapter.ts b/src/main/adapters/job-validation-adapter.ts index e80335a6..b5059914 100755 --- a/src/main/adapters/job-validation-adapter.ts +++ b/src/main/adapters/job-validation-adapter.ts @@ -1,4 +1,5 @@ import { Job } from '@/job/protocols'; +import { normalizeReprocessingPayload } from '@/job/util'; import { YupSchema } from '@/presentation/protocols'; import { convertCamelCaseKeysToSnakeCase, @@ -9,8 +10,10 @@ import { export function messageValidationAdapter( schema: YupSchema ): (payload: Job.Payload, state: Job.State, next: Job.Next) => Job.Result { - return async (payload: Job.Payload, _state: Job.State, next: Job.Next) => { + return async (payload: Job.Payload, state: Job.State, next: Job.Next) => { try { + normalizeReprocessingPayload(payload, state); + const messageInSnakeCase = convertCamelCaseKeysToSnakeCase(payload); await schema.validate(messageInSnakeCase, { diff --git a/src/main/agendash.ts b/src/main/agendash.ts index 1899f6dc..9da3ca5b 100644 --- a/src/main/agendash.ts +++ b/src/main/agendash.ts @@ -28,7 +28,7 @@ fastify.listen({ port: +PORT }, (error) => { logger.log( { level: 'info', - message: `Agendash started http://localhost:${PORT}${BASE_URI}/` + message: `Agendash started at: http://127.0.0.1:${PORT}${BASE_URI}/` }, 'offline' ); diff --git a/src/main/bootstrap.ts b/src/main/bootstrap.ts index eac64976..b8b3b6ed 100644 --- a/src/main/bootstrap.ts +++ b/src/main/bootstrap.ts @@ -1,9 +1,21 @@ import path from 'path'; +import { Mongoose } from 'mongoose'; +import { CacheServer } from '@/infra/cache/cache-server'; import knexSetup from '@/infra/db/mssql/util/knex'; -import { workerManager } from '@/infra/worker'; -import { CONSUMER, MEMCACHED, SERVER, WORKER, logger } from '@/util'; -import { makeCacheServer } from '@/infra/cache'; +import { RabbitMqServer } from '@/infra/mq/utils'; +import { WorkerManager, workerManager } from '@/infra/worker'; +import { + CONSUMER, + MEMCACHED, + MONGO, + RABBIT, + SERVER, + WORKER, + elasticAPM, + logger, + splitPromises +} from '@/util'; import { getArgs } from './cli'; import { @@ -29,6 +41,15 @@ const ENABLED_SERVICES = { const SHUTDOWN_TIMEOUT = 30_000; export async function bootstrap() { + elasticAPM(); + + const promises: Array<() => Promise> = []; + + let rabbitServer: RabbitMqServer | null = null; + let worker: WorkerManager | null = null; + let cacheServer: CacheServer | null = null; + let mongoose: Mongoose | null = null; + if (Object.values(ENABLED_SERVICES).every((item) => item === false)) { logger.log( { @@ -41,45 +62,67 @@ export async function bootstrap() { process.exit(0); } - const [rabbitServer, mongoose] = await Promise.all([ - getRabbitmqConnection(), - getMongooseConnection(), - checkDatabaseConnection() - ]); + async function connectToMongoose() { + mongoose = await getMongooseConnection(); + + logger.log({ + level: 'info', + message: `Mongoose connection opened!` + }); + } + + async function connectToRabbit() { + rabbitServer = await getRabbitmqConnection(); - const worker = workerManager(); + if (!ENABLED_SERVICES.CONSUMER) return; - if (ENABLED_SERVICES.CONSUMER) { const consumersFolder = path.resolve(__dirname, 'consumers'); rabbitServer.consumersDirectory(consumersFolder); logger.log({ level: 'info', message: 'Consumer started' }); } - if (ENABLED_SERVICES.SERVER) { + function connectToMemcached() { + cacheServer = setMemcachedConnection(); + cacheServer.connect(); + logger.log({ level: 'info', message: 'Memcached started!' }); + } + + async function startServer() { await webServer.listen(SERVER.PORT); logger.log({ level: 'info', - message: `Server is running on port: ${SERVER.PORT}` + message: `Server is running at: ${SERVER.PORT}` }); - } - if (ENABLED_SERVICES.WORKER) { - worker.start(); - const workersFolder = path.resolve(__dirname, 'workers'); - worker.tasksDirectory(workersFolder); + logger.log({ + level: 'info', + message: `Base URL: http://127.0.0.1:${SERVER.PORT}${SERVER.BASE_URI}` + }); } - if (ENABLED_SERVICES.DASHBOARD) { + async function startAgendaDashboard() { await import('./agendash'); } - if (MEMCACHED.ENABLED) { - setMemcachedConnection(); - makeCacheServer().connect(); - logger.log({ level: 'info', message: 'Memcached started!' }); + async function startWorker() { + worker = workerManager(); + await worker.start(); + const workersFolder = path.resolve(__dirname, 'workers'); + worker.tasksDirectory(workersFolder); } + promises.push(() => checkDatabaseConnection()); + + if (MONGO.ENABLED) promises.push(connectToMongoose); + if (RABBIT.ENABLED) promises.push(connectToRabbit); + if (ENABLED_SERVICES.SERVER) promises.push(startServer); + if (ENABLED_SERVICES.WORKER) promises.push(startWorker); + if (ENABLED_SERVICES.DASHBOARD) promises.push(startAgendaDashboard); + if (MEMCACHED.ENABLED) connectToMemcached(); + + await splitPromises(promises, 2); + async function gracefulShutdown() { try { setTimeout(() => { @@ -98,30 +141,38 @@ export async function bootstrap() { }); } - if (ENABLED_SERVICES.WORKER) { + if (worker) { await worker.stop(); } - await rabbitServer.stop(); + if (rabbitServer) { + await rabbitServer.stop(); + } + logger.log({ level: 'info', message: 'RabbitMq connection closed' }); - await mongoose.disconnect(); - logger.log( - { - level: 'info', - message: 'Mongoose connection closed' - }, - 'offline' - ); + if (mongoose) { + await mongoose.disconnect(); - makeCacheServer().disconnect(); - logger.log({ - level: 'info', - message: 'Memcached connection closed' - }); + logger.log( + { + level: 'info', + message: 'Mongoose connection closed' + }, + 'offline' + ); + } + + if (cacheServer) { + cacheServer.disconnect(); + logger.log({ + level: 'info', + message: 'Memcached connection closed' + }); + } process.exit(0); } catch (error) { diff --git a/src/main/routes/flow-manager/utils/index.ts b/src/main/routes/flow-manager/utils/index.ts deleted file mode 100755 index 586f30c8..00000000 --- a/src/main/routes/flow-manager/utils/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from './skip'; diff --git a/src/main/util/get-rabbitmq-connection.ts b/src/main/util/get-rabbitmq-connection.ts index e8489a83..b7c99fdf 100644 --- a/src/main/util/get-rabbitmq-connection.ts +++ b/src/main/util/get-rabbitmq-connection.ts @@ -11,6 +11,5 @@ export function getRabbitmqConnection() { host: RABBIT.HOST, port: RABBIT.PORT }) - .setPrefetch(RABBIT.PREFETCH) .start(); } diff --git a/src/main/util/set-memcached-connection.ts b/src/main/util/set-memcached-connection.ts index 7d818dbc..7c59e515 100644 --- a/src/main/util/set-memcached-connection.ts +++ b/src/main/util/set-memcached-connection.ts @@ -2,7 +2,7 @@ import { makeCacheServer } from '@/infra/cache'; import { MEMCACHED } from '@/util'; export function setMemcachedConnection() { - makeCacheServer().setCredentials({ + return makeCacheServer().setCredentials({ host: MEMCACHED.HOST, port: MEMCACHED.PORT, password: MEMCACHED.PASSWORD, diff --git a/src/main/web-server.ts b/src/main/web-server.ts index 4bdce3ed..a0fb8153 100755 --- a/src/main/web-server.ts +++ b/src/main/web-server.ts @@ -1,7 +1,6 @@ import path from 'path'; import { webServer as webServerFactory } from '@/infra/http/util/web-server'; -import { elasticAPM } from '@/util'; import { SERVER } from '@/util/constants'; import cors from '@fastify/cors'; import helmet from '@fastify/helmet'; @@ -12,8 +11,6 @@ import { injectApmTransactionIdOnHeadersMiddleware } from './middlewares'; -elasticAPM(); - const webServer = webServerFactory(); webServer.use(cors, { diff --git a/src/presentation/controllers/example/create-example-controller.ts b/src/presentation/controllers/example/create-example-controller.ts index 87a49b1c..59d5293d 100644 --- a/src/presentation/controllers/example/create-example-controller.ts +++ b/src/presentation/controllers/example/create-example-controller.ts @@ -1,4 +1,4 @@ -import { CommitAll } from '@/data/protocols/utils/transaction'; +import { CommitAll } from '@/data/protocols/util/transaction'; import { Controller } from '@/presentation/protocols'; import { created } from '@/presentation/utils'; import { DICTIONARY, template } from '@/util'; diff --git a/src/presentation/middlewares/authentication-key/validate-authentication-key-middleware.ts b/src/presentation/middlewares/authentication-key/validate-authentication-key-middleware.ts index ce60087f..ee4080e5 100755 --- a/src/presentation/middlewares/authentication-key/validate-authentication-key-middleware.ts +++ b/src/presentation/middlewares/authentication-key/validate-authentication-key-middleware.ts @@ -1,4 +1,4 @@ -import { Logger } from '@/data/protocols/utils'; +import { Logger } from '@/data/protocols/util'; import { ErrorHandler } from '@/domain/usecases'; import { ValidateAuthenticationKey } from '@/domain/usecases/authentication-key/validate-authentication-key'; import { Middleware } from '@/presentation/protocols/middleware'; diff --git a/src/presentation/middlewares/cache/get-cache-value-middleware.ts b/src/presentation/middlewares/cache/get-cache-value-middleware.ts index 4197fafc..8a070f37 100644 --- a/src/presentation/middlewares/cache/get-cache-value-middleware.ts +++ b/src/presentation/middlewares/cache/get-cache-value-middleware.ts @@ -1,4 +1,4 @@ -import { Logger } from '@/data/protocols/utils'; +import { Logger } from '@/data/protocols/util'; import { ErrorHandler, GetCacheValue } from '@/domain/usecases'; import { ExtractValues } from '@/plugin'; import { Middleware } from '@/presentation/protocols'; diff --git a/src/presentation/middlewares/example/create-example-middleware.ts b/src/presentation/middlewares/example/create-example-middleware.ts index 04520270..fe0c4b82 100755 --- a/src/presentation/middlewares/example/create-example-middleware.ts +++ b/src/presentation/middlewares/example/create-example-middleware.ts @@ -1,6 +1,6 @@ import { InferType } from 'yup'; -import { Logger } from '@/data/protocols/utils'; +import { Logger } from '@/data/protocols/util'; import { CreateExample, DataValidation, ErrorHandler } from '@/domain/usecases'; import { ExtractValues } from '@/plugin'; import { Middleware } from '@/presentation/protocols'; diff --git a/src/presentation/middlewares/example/get-example-middleware.ts b/src/presentation/middlewares/example/get-example-middleware.ts index 411a4aa3..82f5d074 100755 --- a/src/presentation/middlewares/example/get-example-middleware.ts +++ b/src/presentation/middlewares/example/get-example-middleware.ts @@ -1,4 +1,4 @@ -import { Logger } from '@/data/protocols/utils'; +import { Logger } from '@/data/protocols/util'; import { ErrorHandler, GetExample } from '@/domain/usecases'; import { ExtractValues } from '@/plugin'; import { Middleware } from '@/presentation/protocols'; diff --git a/src/presentation/middlewares/mq/mq-publish-in-exchange-middleware.ts b/src/presentation/middlewares/mq/mq-publish-in-exchange-middleware.ts index 87803481..571e7469 100644 --- a/src/presentation/middlewares/mq/mq-publish-in-exchange-middleware.ts +++ b/src/presentation/middlewares/mq/mq-publish-in-exchange-middleware.ts @@ -1,4 +1,4 @@ -import { Logger } from '@/data/protocols/utils'; +import { Logger } from '@/data/protocols/util'; import { ErrorHandler, PublishInExchange } from '@/domain/usecases'; import { ExtractValues } from '@/plugin'; import { Middleware } from '@/presentation/protocols'; diff --git a/src/presentation/middlewares/token/validate-token-middleware.ts b/src/presentation/middlewares/token/validate-token-middleware.ts index a3704b4a..08858bde 100755 --- a/src/presentation/middlewares/token/validate-token-middleware.ts +++ b/src/presentation/middlewares/token/validate-token-middleware.ts @@ -1,4 +1,4 @@ -import { Logger } from '@/data/protocols/utils'; +import { Logger } from '@/data/protocols/util'; import { ErrorHandler } from '@/domain/usecases'; import { ValidateToken } from '@/domain/usecases/token'; import { Middleware } from '@/presentation/protocols/middleware'; diff --git a/src/util/constants/environment.ts b/src/util/constants/environment.ts index f2a3e5f4..040fcc3c 100755 --- a/src/util/constants/environment.ts +++ b/src/util/constants/environment.ts @@ -62,15 +62,23 @@ export const DB = { }; export const RABBIT = { + ENABLED: stringToBoolean(process.env.RABBIT_ENABLED) ?? false, USER: process.env.RABBIT_USER || '', PASSWORD: process.env.RABBIT_PASSWORD || '', HOST: process.env.RABBIT_HOST || '', VIRTUAL_HOST: process.env.RABBIT_VIRTUAL_HOST || '', PORT: +(() => process.env.RABBIT_PORT || 5672)(), - PREFETCH: +(() => process.env.RABBIT_PREFETCH || 10)() + DEFAULT_PREFETCH: (() => { + if (process.env.RABBIT_DEFAULT_PREFETCH) { + return Number(process.env.RABBIT_DEFAULT_PREFETCH); + } + + return null; + })() }; export const MONGO = { + ENABLED: stringToBoolean(process.env.MONGO_ENABLED) ?? false, USER: process.env.MONGO_USER || '', PASSWORD: process.env.MONGO_PASSWORD || '', HOST: process.env.MONGO_HOST || '', @@ -83,7 +91,7 @@ export const MONGO = { }; export const MEMCACHED = { - ENABLED: stringToBoolean(process.env.MEMCACHED_ENABLED) || false, + ENABLED: stringToBoolean(process.env.MEMCACHED_ENABLED) ?? false, USER: process.env.MEMCACHED_USER || '', PASSWORD: process.env.MEMCACHED_PASSWORD || '', HOST: process.env.MEMCACHED_HOST || '', @@ -92,21 +100,21 @@ export const MEMCACHED = { }; export const APM = { - ENABLED: stringToBoolean(process.env.APM_ENABLED) || false, + ENABLED: stringToBoolean(process.env.APM_ENABLED) ?? false, SECRET_TOKEN: process.env.APM_SECRET_TOKEN || '', SERVER_URL: process.env.APM_SERVER_URL || '', ENVIRONMENT: process.env.APM_ENVIRONMENT || '' }; export const ELASTICSEARCH = { - ENABLED: stringToBoolean(process.env.ELASTICSEARCH_ENABLED) || false, + ENABLED: stringToBoolean(process.env.ELASTICSEARCH_ENABLED) ?? false, USERNAME: process.env.ELASTICSEARCH_USERNAME || '', PASSWORD: process.env.ELASTICSEARCH_PASSWORD || '', SERVER_URL: process.env.ELASTICSEARCH_SERVER_URL || '' }; export const REPROCESSING = { - ENABLED: stringToBoolean(process.env.REPROCESSING_ENABLED) || false, + ENABLED: stringToBoolean(process.env.REPROCESSING_ENABLED) ?? false, MAX_TRIES: +(() => process.env.REPROCESSING_MAX_TRIES || 1)(), DELAYS: process.env.REPROCESSING_DELAYS?.split(',').map(Number) || [], MODE: process.env.REPROCESSING_MODE || 'STOPPED_MIDDLEWARE' diff --git a/src/util/observability/apm/index.ts b/src/util/observability/apm/index.ts index 08cc4a23..891784e5 100755 --- a/src/util/observability/apm/index.ts +++ b/src/util/observability/apm/index.ts @@ -1,4 +1,4 @@ export * from './factory'; -export * from './utils/get-apm-transaction-id'; -export * from './utils/trace/span-decorator'; -export * from './utils/trace/transaction-decorator'; +export * from './util/get-apm-transaction-id'; +export * from './util/trace/span-decorator'; +export * from './util/trace/transaction-decorator'; diff --git a/src/util/observability/apm/utils/get-apm-transaction-id.ts b/src/util/observability/apm/util/get-apm-transaction-id.ts similarity index 100% rename from src/util/observability/apm/utils/get-apm-transaction-id.ts rename to src/util/observability/apm/util/get-apm-transaction-id.ts diff --git a/src/util/observability/apm/utils/index.ts b/src/util/observability/apm/util/index.ts similarity index 100% rename from src/util/observability/apm/utils/index.ts rename to src/util/observability/apm/util/index.ts diff --git a/src/util/observability/apm/util/trace/span-decorator.ts b/src/util/observability/apm/util/trace/span-decorator.ts new file mode 100755 index 00000000..a53ee38a --- /dev/null +++ b/src/util/observability/apm/util/trace/span-decorator.ts @@ -0,0 +1,111 @@ +import { Span } from 'elastic-apm-node'; + +import { getName, getType, labelParamsToString, searchLabels } from './util'; +import { elasticAPM } from '../../factory'; +import { SpanOptions, TraceLabels } from './types'; + +type TraceParams = { + options: SpanOptions; + params?: TraceLabels; + result?: TraceLabels; +}; + +export function apmSpan({ options, params, result }: TraceParams) { + return function ( + _target: Object, + _key: string | symbol, + descriptor: PropertyDescriptor + ) { + const apm = elasticAPM().getAPM(); + + const setTypeAndSubtype = (subType: string, instanceOfSpan: Span) => { + const type = getType(subType); + if (type) instanceOfSpan.type = type; + + instanceOfSpan.subtype = subType; + }; + const setParams = ( + params: TraceLabels, + args: unknown[], + instanceOfSpan: Span + ) => { + const labels = searchLabels(params, args); + const labelsToString = labelParamsToString(labels); + instanceOfSpan.addLabels(labelsToString, true); + }; + const setResult = ( + result: TraceLabels, + response: unknown, + instanceOfSpan: Span + ) => { + const labels = searchLabels(result, response); + const labelsToString = labelParamsToString(labels); + instanceOfSpan.addLabels(labelsToString, true); + }; + + const originalHandler = descriptor.value; + const isAsync = originalHandler.constructor.name === 'AsyncFunction'; + + if (isAsync) { + descriptor.value = async function (...args: T[]) { + if (!apm?.currentTransaction) return originalHandler.apply(this, args); + + const spanName = getName(args, options); + + const span = apm.currentTransaction.startSpan(spanName)!; + + const response = await originalHandler.apply(this, args); + + if (options.subType) { + setTypeAndSubtype(options.subType, span); + } + + if (params) { + setParams(params, args, span); + } + + if (!result) { + span.end(); + return response; + } + + setResult(result, response, span); + span.end(); + + return response; + }; + + return descriptor; + } + + descriptor.value = function (...args: T[]) { + if (!apm?.currentTransaction) return originalHandler.apply(this, args); + + const spanName = getName(args, options); + + const span = apm.currentTransaction.startSpan(spanName)!; + const response = originalHandler.apply(this, args); + + if (options.subType) { + setTypeAndSubtype(options.subType, span); + } + + if (params) { + setParams(params, args, span); + } + + if (!result) { + span.end(); + return response; + } + + setResult(result, response, span); + + span.end(); + + return response; + }; + + return descriptor; + }; +} diff --git a/src/util/observability/apm/util/trace/transaction-decorator.ts b/src/util/observability/apm/util/trace/transaction-decorator.ts new file mode 100755 index 00000000..ad2c03fb --- /dev/null +++ b/src/util/observability/apm/util/trace/transaction-decorator.ts @@ -0,0 +1,100 @@ +import { Transaction } from 'elastic-apm-node'; + +import { getName, labelParamsToString, searchLabels } from './util'; +import { elasticAPM } from '../../factory'; +import { TraceLabels, TransactionOptions } from './types'; + +type TransactionParams = { + options: TransactionOptions; + params?: TraceLabels; + result?: TraceLabels; +}; + +export function apmTransaction({ options, params, result }: TransactionParams) { + return function ( + _target: Object, + _key: string | symbol, + descriptor: PropertyDescriptor + ) { + const setParams = ( + params: TraceLabels, + args: unknown[], + instanceOfTransaction: Transaction + ) => { + const labels = searchLabels(params, args); + const labelsToString = labelParamsToString(labels); + instanceOfTransaction.addLabels(labelsToString, true); + }; + const setResult = ( + result: TraceLabels, + response: unknown, + instanceOfTransaction: Transaction + ) => { + const labels = searchLabels(result, response); + const labelsToString = labelParamsToString(labels); + instanceOfTransaction.addLabels(labelsToString, true); + }; + const apm = elasticAPM().getAPM(); + + const originalHandler = descriptor.value; + const isAsync = originalHandler.constructor.name === 'AsyncFunction'; + + if (isAsync) { + descriptor.value = async function (...args: T[]) { + const transactionName = getName(args, options); + const transaction = apm?.startTransaction(transactionName); + + const response = await originalHandler.apply(this, args); + + if (!transaction) return response; + + if (options.type) transaction.type = options.type; + + if (params) { + setParams(params, args, transaction); + } + + if (!result) { + transaction.end(); + return response; + } + + setResult(result, response, transaction); + + transaction.end(); + + return response; + }; + + return descriptor; + } + + descriptor.value = function (...args: T[]) { + const transactionName = getName(args, options); + const response = originalHandler.apply(this, args); + + const transaction = apm?.startTransaction(transactionName); + + if (!transaction) return response; + + if (options.type) transaction.type = options.type; + + if (params) { + setParams(params, args, transaction); + } + + if (!result) { + transaction.end(); + return response; + } + + setResult(result, response, transaction); + + transaction.end(); + + return response; + }; + + return descriptor; + }; +} diff --git a/src/util/observability/apm/utils/trace/trace-protocols.ts b/src/util/observability/apm/util/trace/types.ts similarity index 68% rename from src/util/observability/apm/utils/trace/trace-protocols.ts rename to src/util/observability/apm/util/trace/types.ts index 94fcc132..6f098d98 100755 --- a/src/util/observability/apm/utils/trace/trace-protocols.ts +++ b/src/util/observability/apm/util/trace/types.ts @@ -2,17 +2,18 @@ interface TraceObject { [label: string]: string | number; } -export type traceLabels = number[] | string[] | TraceObject; +export type TraceLabels = number[] | string[] | TraceObject; -type appSubtype = +type AppSubtype = | 'inferred' | 'controller' | 'graphql' | 'mailer' | 'resource' + | 'worker' | 'handler' - | 'worker'; -type dbSubtype = + | 'function'; +type DbSubtype = | 'cassandra' | 'cosmos-bd' | 'db2' @@ -35,7 +36,7 @@ type dbSubtype = | 'sqlite3' | 'sql-server' | 'unknown'; -type externalSubtype = 'dubbo' | 'grpc' | 'http'; +type ExternalSubtype = 'dubbo' | 'grpc' | 'http'; type jsonSubtype = 'parse' | 'generate'; type messagingSubtype = | 'azure-queue' @@ -45,8 +46,8 @@ type messagingSubtype = | 'rabbitmq' | 'sns' | 'sqs'; -type storageSubtype = 'azure-blob' | 'azure-file' | 'azure-table' | 's3'; -type websocketSubtype = 'send'; +type StorageSubtype = 'azure-blob' | 'azure-file' | 'azure-table' | 's3'; +type WebsocketSubtype = 'send'; type LiteralUnion = T | (U & Object); @@ -54,13 +55,13 @@ export interface SpanOptions { name?: string; nameByParameter?: string | number; subType?: LiteralUnion< - | appSubtype - | dbSubtype - | externalSubtype + | AppSubtype + | DbSubtype + | ExternalSubtype | jsonSubtype | messagingSubtype - | storageSubtype - | websocketSubtype + | StorageSubtype + | WebsocketSubtype >; } @@ -68,12 +69,12 @@ export interface TransactionOptions { name?: string; nameByParameter?: string | number; type?: LiteralUnion< - | appSubtype - | dbSubtype - | externalSubtype + | AppSubtype + | DbSubtype + | ExternalSubtype | jsonSubtype | messagingSubtype - | storageSubtype - | websocketSubtype + | StorageSubtype + | WebsocketSubtype >; } diff --git a/src/util/observability/apm/utils/trace/index.ts b/src/util/observability/apm/util/trace/util.ts similarity index 83% rename from src/util/observability/apm/utils/trace/index.ts rename to src/util/observability/apm/util/trace/util.ts index 2f0acafb..13857391 100755 --- a/src/util/observability/apm/utils/trace/index.ts +++ b/src/util/observability/apm/util/trace/util.ts @@ -1,12 +1,26 @@ // TODO: We should seek better alternatives in the future, but for now, it's not a problem. /* eslint-disable @typescript-eslint/no-explicit-any */ -import { traceLabels, TransactionOptions } from './trace-protocols'; +import { TraceLabels, TransactionOptions } from './types'; export const searchLabels = ( - labels: traceLabels | undefined, + labels: TraceLabels | undefined, args: any | undefined -): Object => - Object.entries(args ?? {}).reduce((acc, [key, value]) => { +): Object => { + const entries = Object.entries(labels || {}); + + const areTheyAllIndices = entries.every( + ([_, value]) => typeof value === 'number' + ); + + if (areTheyAllIndices) { + const newEntries = entries.map(([key, index]) => { + return [key, args[index]]; + }); + + return Object.fromEntries(newEntries); + } + + return Object.entries(args ?? {}).reduce((acc, [key, value]) => { const label = Object.entries(labels ?? {}).find( // eslint-disable-next-line eqeqeq ([, labelValue]) => labelValue == key @@ -21,6 +35,7 @@ export const searchLabels = ( return { ...acc, [label[0]]: value }; }, {}); +}; export const labelParamsToString = (params: object) => { return Object.entries(params).reduce((acc, [key, value]) => { @@ -55,7 +70,8 @@ export const getType = (subType: string): string | void => { 'graphql', 'mailer', 'resource', - 'handler' + 'handler', + 'function' ] }, { diff --git a/src/util/observability/apm/utils/trace/span-decorator.ts b/src/util/observability/apm/utils/trace/span-decorator.ts deleted file mode 100755 index 066c4bc3..00000000 --- a/src/util/observability/apm/utils/trace/span-decorator.ts +++ /dev/null @@ -1,56 +0,0 @@ -import { getName, getType, labelParamsToString, searchLabels } from '.'; -import { elasticAPM } from '../../factory'; -import { SpanOptions, traceLabels } from './trace-protocols'; - -type TraceParams = { - options: SpanOptions; - params?: traceLabels; - result?: traceLabels; -}; - -export function apmSpan({ options, params, result }: TraceParams) { - return function ( - _target: Object, - _key: string | symbol, - descriptor: PropertyDescriptor - ) { - const apm = elasticAPM().getAPM(); - - const originalMethod = descriptor.value; - - descriptor.value = async function (...args: T[]) { - const spanName = getName(args, options); - - const span = apm?.currentTransaction?.startSpan(spanName); - - const methodResult = await originalMethod.apply(this, args); - - if (!span) return methodResult; - - if (options?.subType) { - const type = getType(options.subType); - if (type) span.type = type; - - span.subtype = options.subType; - } - - if (params) { - const labels = searchLabels(params, args); - const labelsToString = labelParamsToString(labels); - span.addLabels(labelsToString, true); - } - - if (result) { - const labels = searchLabels(result, methodResult); - const labelsToString = labelParamsToString(labels); - span.addLabels(labelsToString, true); - } - - span.end(); - - return methodResult; - }; - - return descriptor; - }; -} diff --git a/src/util/observability/apm/utils/trace/transaction-decorator.ts b/src/util/observability/apm/utils/trace/transaction-decorator.ts deleted file mode 100755 index 5a761d48..00000000 --- a/src/util/observability/apm/utils/trace/transaction-decorator.ts +++ /dev/null @@ -1,51 +0,0 @@ -import { getName, labelParamsToString, searchLabels } from '.'; -import { elasticAPM } from '../../factory'; -import { traceLabels, TransactionOptions } from './trace-protocols'; - -type TransactionParams = { - options: TransactionOptions; - params?: traceLabels; - result?: traceLabels; -}; - -export function apmTransaction({ options, params, result }: TransactionParams) { - return function ( - _target: Object, - _key: string | symbol, - descriptor: PropertyDescriptor - ) { - const apm = elasticAPM().getAPM(); - - const originalMethod = descriptor.value; - - descriptor.value = async function (...args: T[]) { - const transactionName = getName(args, options); - - const transaction = apm?.startTransaction(transactionName); - - const methodResult = await originalMethod.apply(this, args); - - if (!transaction) return methodResult; - - if (options?.type) transaction.type = options?.type; - - if (params) { - const labels = searchLabels(params, args); - const labelsToString = labelParamsToString(labels); - transaction.addLabels(labelsToString, true); - } - - if (result) { - const labels = searchLabels(result, methodResult); - const labelsToString = labelParamsToString(labels); - transaction.addLabels(labelsToString, true); - } - - transaction.end(); - - return methodResult; - }; - - return descriptor; - }; -} diff --git a/src/util/observability/loggers/default/custom-logger.ts b/src/util/observability/loggers/default/custom-logger.ts index 9015fc2d..07bcedb5 100755 --- a/src/util/observability/loggers/default/custom-logger.ts +++ b/src/util/observability/loggers/default/custom-logger.ts @@ -6,7 +6,7 @@ import { ElasticsearchTransport } from 'winston-elasticsearch'; import ecsFormat from '@elastic/ecs-winston-format'; import pkg from '@/../package.json'; import { createMongoLog } from '@/main/facades'; -import { ELASTICSEARCH, LOGGER } from '@/util/constants'; +import { ELASTICSEARCH, LOGGER, MONGO } from '@/util/constants'; import { elasticAPM, getAPMTransactionIds } from '../../apm'; import { defaultIndexTemplate } from './elasticsearch'; @@ -117,7 +117,13 @@ export class CustomLogger { public log(params: LogParams, type?: LoggerType): void; public log(data: LogParams | Error, type: LoggerType = 'default'): void { if (!LOGGER.ENABLED) return; - const logger = type === 'offline' ? this.offlineLogger : this.defaultLogger; + + let logger = this.defaultLogger; + + if (MONGO.ENABLED && type === 'offline') { + logger = this.offlineLogger; + } + this.handler(data, logger); } diff --git a/test/integration/http/endpoint-with-flow-manager.spec.ts b/test/integration/http/endpoint-with-flow-manager.spec.ts index 2b901c92..6c4980c8 100644 --- a/test/integration/http/endpoint-with-flow-manager.spec.ts +++ b/test/integration/http/endpoint-with-flow-manager.spec.ts @@ -26,7 +26,7 @@ describe('Endpoint With Flow Manager', () => { beforeAll(async () => { const server = webServer(); - server.router().get('/test/:id', flowManager(...condition), () => { + server.router().get('/test/:id', flowManager(condition), () => { notBeCalledMock(); }); await server.ready(); diff --git a/test/unit/data/protocols/mq/publish-in-exchange-service-stub.ts b/test/unit/data/protocols/mq/publish-in-exchange-service-stub.ts index 268ff143..c8d15e0b 100644 --- a/test/unit/data/protocols/mq/publish-in-exchange-service-stub.ts +++ b/test/unit/data/protocols/mq/publish-in-exchange-service-stub.ts @@ -1,5 +1,7 @@ import { PublishInExchangeService } from '@/data/protocols/mq'; export class PublishInExchangeServiceStub implements PublishInExchangeService { - async publishInExchange(): Promise {} + async publishInExchange(): Promise { + return true; + } } diff --git a/test/unit/data/protocols/mq/publish-in-queue-service-stub.ts b/test/unit/data/protocols/mq/publish-in-queue-service-stub.ts index 2c2ee357..fe7287c5 100644 --- a/test/unit/data/protocols/mq/publish-in-queue-service-stub.ts +++ b/test/unit/data/protocols/mq/publish-in-queue-service-stub.ts @@ -1,5 +1,7 @@ import { PublishInQueueService } from '@/data/protocols/mq'; export class PublishInQueueServiceStub implements PublishInQueueService { - async publishInQueue(): Promise {} + async publishInQueue(): Promise { + return true; + } } diff --git a/test/unit/infra/http/utils/http-server.spec.ts b/test/unit/infra/http/utils/http-server.spec.ts index dc14aedb..d16dd9ad 100644 --- a/test/unit/infra/http/utils/http-server.spec.ts +++ b/test/unit/infra/http/utils/http-server.spec.ts @@ -145,7 +145,6 @@ describe('HttpServer', () => { await sut.refresh(); expect(close).toHaveBeenCalledTimes(1); - expect(close).toHaveBeenCalledWith(expect.any(Function)); }); }); describe('#close', () => { @@ -164,7 +163,6 @@ describe('HttpServer', () => { sut.close(); expect(close).toHaveBeenCalledTimes(1); - expect(close).toHaveBeenCalledWith(expect.any(Function)); }); }); describe('#route', () => { diff --git a/test/unit/job/jobs/cache/get-cache-value-job.spec.ts b/test/unit/job/jobs/cache/get-cache-value-job.spec.ts index 21814b2b..08c4331f 100644 --- a/test/unit/job/jobs/cache/get-cache-value-job.spec.ts +++ b/test/unit/job/jobs/cache/get-cache-value-job.spec.ts @@ -1,4 +1,4 @@ -import { Logger } from '@/data/protocols/utils'; +import { Logger } from '@/data/protocols/util'; import { ErrorHandler, GetCacheValue } from '@/domain/usecases'; import { GetCacheValueMiddleware } from '@/presentation/middlewares'; import { SharedState } from '@/job/protocols'; diff --git a/test/unit/job/jobs/event/create-event-job.spec.ts b/test/unit/job/jobs/event/create-event-job.spec.ts index 17b438af..b59dcd0e 100644 --- a/test/unit/job/jobs/event/create-event-job.spec.ts +++ b/test/unit/job/jobs/event/create-event-job.spec.ts @@ -1,4 +1,4 @@ -import { Logger } from '@/data/protocols/utils'; +import { Logger } from '@/data/protocols/util'; import { CreateEvent, ErrorHandler } from '@/domain/usecases'; import { CreateEventJob } from '@/job/jobs/event'; import { CreateEventStub } from '@/test/unit/domain/usecases'; diff --git a/test/unit/job/jobs/event/update-event-job.spec.ts b/test/unit/job/jobs/event/update-event-job.spec.ts index 6e87ebb8..d0bc5f59 100644 --- a/test/unit/job/jobs/event/update-event-job.spec.ts +++ b/test/unit/job/jobs/event/update-event-job.spec.ts @@ -1,4 +1,4 @@ -import { Logger } from '@/data/protocols/utils'; +import { Logger } from '@/data/protocols/util'; import { ErrorHandler, UpdateEvent } from '@/domain/usecases'; import { UpdateEventJob } from '@/job/jobs/event'; import { UpdateEventStub } from '@/test/unit/domain/usecases'; diff --git a/test/unit/main/adapters/flow-manager-adapter.spec.ts b/test/unit/main/adapters/flow-manager-adapter.spec.ts index 353a2a99..f7797d8c 100644 --- a/test/unit/main/adapters/flow-manager-adapter.spec.ts +++ b/test/unit/main/adapters/flow-manager-adapter.spec.ts @@ -36,7 +36,7 @@ describe('flowManager', () => { } }); - await flow(webServer().adapter(flowManager(...condition)))(); + await flow(webServer().adapter(flowManager(condition)))(); expect(statusMock).toHaveBeenCalledTimes(1); expect(statusMock).toHaveBeenCalledWith(200); @@ -60,7 +60,7 @@ describe('flowManager', () => { } }); - await flow(webServer().adapter(flowManager(...condition)))(); + await flow(webServer().adapter(flowManager(condition)))(); expect(statusMock).toHaveBeenCalledTimes(1); expect(statusMock).toHaveBeenCalledWith(200); diff --git a/test/unit/presentation/controllers/example/create-example-controller.spec.ts b/test/unit/presentation/controllers/example/create-example-controller.spec.ts index e0876d8c..f5a53471 100644 --- a/test/unit/presentation/controllers/example/create-example-controller.spec.ts +++ b/test/unit/presentation/controllers/example/create-example-controller.spec.ts @@ -1,4 +1,4 @@ -import { CommitAll } from '@/data/protocols/utils'; +import { CommitAll } from '@/data/protocols/util'; import { CreateExampleController } from '@/presentation/controllers'; import { commitAllStub } from '@/test/unit/data/protocols/utils'; import { databaseTransactionMock } from '@/test/unit/domain/models'; diff --git a/test/unit/presentation/middlewares/mq/mq-publish-in-exchange-middleware.spec.ts b/test/unit/presentation/middlewares/mq/mq-publish-in-exchange-middleware.spec.ts index ef83176b..5ca96152 100644 --- a/test/unit/presentation/middlewares/mq/mq-publish-in-exchange-middleware.spec.ts +++ b/test/unit/presentation/middlewares/mq/mq-publish-in-exchange-middleware.spec.ts @@ -1,4 +1,4 @@ -import { Logger } from '@/data/protocols/utils'; +import { Logger } from '@/data/protocols/util'; import { ErrorHandler } from '@/domain/usecases'; import { MqPublishInExchangeMiddleware } from '@/presentation/middlewares'; import { ModelMock } from '@/test/unit/domain/models'; diff --git a/test/util/logger-stub.ts b/test/util/logger-stub.ts index 3d69a34b..3bedb33a 100644 --- a/test/util/logger-stub.ts +++ b/test/util/logger-stub.ts @@ -1,4 +1,4 @@ -import { Logger } from '@/data/protocols/utils'; +import { Logger } from '@/data/protocols/util'; export class LoggerStub implements Logger { log(params: {