From 2383ed93741aefccda4dc4c8ff07cb92d10dad8d Mon Sep 17 00:00:00 2001 From: cyri113 Date: Wed, 28 May 2025 11:33:53 +0200 Subject: [PATCH 1/5] Refactor docker-compose.yml by removing commented-out services and cleaning up configuration. --- docker-compose.yml | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 2f8c75d..2a047d0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -32,21 +32,4 @@ services: interval: 1m30s timeout: 10s retries: 2 - start_period: 30s - - # telegram-bot: - # <<: [ *telegram-common ] - # container_name: telegram-bot - # command: npm run start:dev bot - - question-service-ext: - image: ghcr.io/togethercrew/question-service:main - platform: linux/x86_64 - ports: - - 9999:80 - -# volumes: -# mongo_data: -# neo4j_data: -# neo4j_import: -# neo4j_plugins: + start_period: 30s \ No newline at end of file From 20360bd47d87cd427aa7db96272049877525db0d Mon Sep 17 00:00:00 2001 From: cyri113 Date: Wed, 28 May 2025 11:34:06 +0200 Subject: [PATCH 2/5] Add VerifyHandler to handle Telegram verification requests with error handling and response management. --- apps/bot/src/handlers/verify.handler.ts | 33 +++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 apps/bot/src/handlers/verify.handler.ts diff --git a/apps/bot/src/handlers/verify.handler.ts b/apps/bot/src/handlers/verify.handler.ts new file mode 100644 index 0000000..2a36bfc --- /dev/null +++ b/apps/bot/src/handlers/verify.handler.ts @@ -0,0 +1,33 @@ +import { Context } from 'grammy'; +import { BaseHandler } from './base.handler'; + +export class VerifyHandler extends BaseHandler { + async handle(ctx: Context): Promise { + const token = ctx.match; + const chat = ctx.chat; + const from = ctx.from; + + if (!token || !chat || !from) { + await ctx.reply( + 'Not enough data to complete this request. Reach out to our support team.', + ); + return; + } + + try { + const handle = await this.temporalClient.start('TelegramVerifyWorkflow', { + taskQueue: this.configService.get('temporal.queue'), + args: [{ token, chat, from }], + workflowId: `telegram:verify:${token}`, + }); + + const result = await handle.result(); + await ctx.reply(result); + } catch (error) { + this.logger.error(error); + await ctx.reply( + 'Something unexpected happened. Reach out to our support team.', + ); + } + } +} From 674e4a8da7fc87f5e092ab758f1e4a74549a3a46 Mon Sep 17 00:00:00 2001 From: cyri113 Date: Wed, 28 May 2025 11:34:19 +0200 Subject: [PATCH 3/5] Add BaseHandler class to establish a foundation for Telegram event handling with Temporal integration. --- apps/bot/src/handlers/base.handler.ts | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 apps/bot/src/handlers/base.handler.ts diff --git a/apps/bot/src/handlers/base.handler.ts b/apps/bot/src/handlers/base.handler.ts new file mode 100644 index 0000000..145eb39 --- /dev/null +++ b/apps/bot/src/handlers/base.handler.ts @@ -0,0 +1,16 @@ +import { Logger } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { WorkflowClient } from '@temporalio/client'; +import { Context, FilterQuery } from 'grammy'; +import { InjectTemporalClient } from 'nestjs-temporal'; + +export abstract class BaseHandler { + protected readonly logger = new Logger(this.constructor.name); + + constructor( + @InjectTemporalClient() + protected readonly temporalClient: WorkflowClient, + protected readonly configService: ConfigService, + ) { } + abstract handle(ctx: Context, event?: FilterQuery): Promise; +} From d941d894becaad94286b011ab01ac5406b134df5 Mon Sep 17 00:00:00 2001 From: cyri113 Date: Wed, 28 May 2025 11:34:32 +0200 Subject: [PATCH 4/5] Add HandlersModule and implement QuestionHandler, UpdateHandler, and SummaryHandler for Telegram event processing with Temporal integration. --- apps/bot/src/handlers/handlers.module.ts | 14 ++++++ apps/bot/src/handlers/question.handler.ts | 53 +++++++++++++++++++++++ apps/bot/src/handlers/summary.handler.ts | 38 ++++++++++++++++ apps/bot/src/handlers/update.handler.ts | 17 ++++++++ 4 files changed, 122 insertions(+) create mode 100644 apps/bot/src/handlers/handlers.module.ts create mode 100644 apps/bot/src/handlers/question.handler.ts create mode 100644 apps/bot/src/handlers/summary.handler.ts create mode 100644 apps/bot/src/handlers/update.handler.ts diff --git a/apps/bot/src/handlers/handlers.module.ts b/apps/bot/src/handlers/handlers.module.ts new file mode 100644 index 0000000..b7204af --- /dev/null +++ b/apps/bot/src/handlers/handlers.module.ts @@ -0,0 +1,14 @@ +import { Module } from '@nestjs/common'; +import { ConfigModule } from '@nestjs/config'; +import { TemporalModule } from 'nestjs-temporal'; +import { QuestionHandler } from './question.handler'; +import { UpdateHandler } from './update.handler'; +import { VerifyHandler } from './verify.handler'; +import { SummaryHandler } from './summary.handler'; + +@Module({ + imports: [ConfigModule, TemporalModule], + providers: [QuestionHandler, UpdateHandler, VerifyHandler, SummaryHandler], + exports: [QuestionHandler, UpdateHandler, VerifyHandler, SummaryHandler], +}) +export class HandlersModule { } diff --git a/apps/bot/src/handlers/question.handler.ts b/apps/bot/src/handlers/question.handler.ts new file mode 100644 index 0000000..17ff786 --- /dev/null +++ b/apps/bot/src/handlers/question.handler.ts @@ -0,0 +1,53 @@ +import { Context, FilterQuery } from 'grammy'; +import { BaseHandler } from './base.handler'; +import { UpdateEvent } from '@app/common'; + +const HIVEMIND_BLOCKLIST = ['-1002141367711']; // TODO: Move this to config + +export class QuestionHandler extends BaseHandler { + async handle(ctx: Context, event?: FilterQuery): Promise { + try { + if ( + event !== UpdateEvent.CHAT_MEMBER && + ctx.update.message?.from && + !ctx.update.message.from.is_bot && + ctx.update.message.text?.trim().length > 0 + ) { + const update = ctx.update; + + if (HIVEMIND_BLOCKLIST.includes(update.message?.chat.id.toString())) { + this.logger.log( + `${update.message?.chat.id} is on a blocklist. Skipping question.`, + ); + return; + } + + const result = await this.temporalClient.execute( + 'TelegramQuestionWorkflow', + { + taskQueue: 'TEMPORAL_QUEUE_HEAVY', + args: [{ update }], + workflowId: `telegram:question:${update.update_id}`, + }, + ); + + if (!result || result.length === 0) { + this.logger.log(`No reply from hivemind. Skipping question.`); + return; + } + + const other = { + reply_parameters: { + message_id: update.message.message_id, + }, + }; + + this.logger.log('Reply from hivemind:', result); + await ctx.reply(result, other); + } + } catch (error) { + this.logger.error(error); + await ctx.reply('Something unexpected happened. Please try again later.'); + } + } +} diff --git a/apps/bot/src/handlers/summary.handler.ts b/apps/bot/src/handlers/summary.handler.ts new file mode 100644 index 0000000..2a5a9a6 --- /dev/null +++ b/apps/bot/src/handlers/summary.handler.ts @@ -0,0 +1,38 @@ +import { Context } from 'grammy'; +import { BaseHandler } from './base.handler'; + +export class SummaryHandler extends BaseHandler { + async handle(ctx: Context): Promise { + try { + console.log('SummaryHandler', ctx.update.update_id); + const update = ctx.update; + + const result = await this.temporalClient.execute( + 'TelegramSummaryWorkflow', + { + taskQueue: 'TEMPORAL_QUEUE_HEAVY', + args: [{ update }], + workflowId: `telegram:summary:${update.update_id}`, + }, + ); + + if (!result || result.length === 0) { + // this.logger.log(`No reply from hivemind. Skipping question.`); + return; + } + + const other = { + reply_parameters: { + message_id: update.message.message_id, + }, + }; + + // this.logger.log('Reply from hivemind:', result); + await ctx.reply(result, other); + } catch (error) { + // this.logger.error(error); + console.error('SummaryHandler error', error); + await ctx.reply('Something unexpected happened. Please try again later.'); + } + } +} diff --git a/apps/bot/src/handlers/update.handler.ts b/apps/bot/src/handlers/update.handler.ts new file mode 100644 index 0000000..527150f --- /dev/null +++ b/apps/bot/src/handlers/update.handler.ts @@ -0,0 +1,17 @@ +import { Context } from 'grammy'; +import { BaseHandler } from './base.handler'; + +export class UpdateHandler extends BaseHandler { + async handle(ctx: Context): Promise { + try { + const update = ctx.update; + await this.temporalClient.start('TelegramUpdateWorkflow', { + taskQueue: 'TEMPORAL_QUEUE_LIGHT', + args: [{ update }], + workflowId: `telegram:update:${update.update_id}`, + }); + } catch (error) { + this.logger.error(error); + } + } +} From 4bc80feb3551484616b6b340c2430badab305435 Mon Sep 17 00:00:00 2001 From: cyri113 Date: Wed, 28 May 2025 11:34:41 +0200 Subject: [PATCH 5/5] Refactor BotService to utilize new handlers for command processing and remove unused Temporal client integration. Add HandlersModule to manage event handling more effectively. --- apps/bot/src/bot.module.ts | 17 +-- apps/bot/src/bot.service.ts | 249 +++++++++++------------------------- 2 files changed, 82 insertions(+), 184 deletions(-) diff --git a/apps/bot/src/bot.module.ts b/apps/bot/src/bot.module.ts index c1a42f3..5733b37 100644 --- a/apps/bot/src/bot.module.ts +++ b/apps/bot/src/bot.module.ts @@ -8,6 +8,7 @@ import { TemporalModule } from 'nestjs-temporal'; import { temporalConfig } from '@app/common/config/temporal.config'; import { Connection } from '@temporalio/client'; import { RabbitMQModule } from './rabbitmq/rabbitmq.module'; +import { HandlersModule } from './handlers/handlers.module'; @Module({ imports: [ @@ -17,20 +18,14 @@ import { RabbitMQModule } from './rabbitmq/rabbitmq.module'; isGlobal: true, }), RabbitMQModule, + HandlersModule, TemporalModule.registerClientAsync({ inject: [ConfigService], useFactory: async (config: ConfigService) => { const uri = config.get('temporal.uri'); - console.log('uri', uri); - let connection: Connection; - try { - connection = await Connection.connect({ - address: uri, - }); - } catch (error) { - console.error('Failed to connect to Temporal:', error); - throw error; - } + const connection: Connection = await Connection.connect({ + address: uri, + }); return { connection }; }, }), @@ -38,4 +33,4 @@ import { RabbitMQModule } from './rabbitmq/rabbitmq.module'; controllers: [BotController], providers: [BotService], }) -export class BotModule {} +export class BotModule { } diff --git a/apps/bot/src/bot.service.ts b/apps/bot/src/bot.service.ts index b481500..b3473be 100644 --- a/apps/bot/src/bot.service.ts +++ b/apps/bot/src/bot.service.ts @@ -1,13 +1,13 @@ import { IgnoreEvent, UpdateEvent } from '@app/common'; import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; -import { WorkflowClient } from '@temporalio/client'; import { API_CONSTANTS, Bot, Context, RawApi } from 'grammy'; import { Other } from 'grammy/out/core/api'; -import { Message, Update } from 'grammy/types'; -import { InjectTemporalClient } from 'nestjs-temporal'; - -const HIVEMIND_BLOCKLIST = ['-1002141367711']; +import { Message } from 'grammy/types'; +import { QuestionHandler } from './handlers/question.handler'; +import { UpdateHandler } from './handlers/update.handler'; +import { VerifyHandler } from './handlers/verify.handler'; +import { SummaryHandler } from './handlers/summary.handler'; @Injectable() export class BotService implements OnModuleInit { @@ -16,14 +16,23 @@ export class BotService implements OnModuleInit { constructor( private readonly configService: ConfigService, - @InjectTemporalClient() - private readonly temporalClient: WorkflowClient, + private readonly questionHandler: QuestionHandler, + private readonly updateHandler: UpdateHandler, + private readonly verifyHandler: VerifyHandler, + private readonly summaryHandler: SummaryHandler, ) { - this.bot = new Bot(configService.get('telegram.token')); + this.bot = new Bot(this.configService.get('telegram.token')); this.bot.command('start', this.start); - this.bot.command('verify', this.isAdmin, this.verify); - this.bot.command('summary', this.getSummary); + this.bot.command( + 'verify', + this.isAdmin, + this.verifyHandler.handle.bind(this.verifyHandler), + ); + this.bot.command( + 'summary', + this.summaryHandler.handle.bind(this.summaryHandler), + ); Object.values(IgnoreEvent).map((event) => { this.bot.on(event, () => { @@ -34,20 +43,8 @@ export class BotService implements OnModuleInit { Object.values(UpdateEvent).map((event) => { this.bot.on(event, (ctx: Context) => { this.logger.log(`Received ${event} from ${ctx.chat.id}`); - this.processEvent(event, ctx.update); - try { - if ( - event !== UpdateEvent.CHAT_MEMBER && - ctx.update.message?.from && - !ctx.update.message.from.is_bot && - ctx.update.message.text?.trim().length > 0 - ) { - this.processQuestion(ctx); - } - } catch (error) { - this.logger.error(error); - } - + this.updateHandler.handle(ctx); + this.questionHandler.handle(ctx); return; }); }); @@ -123,153 +120,59 @@ export class BotService implements OnModuleInit { await ctx.reply(text, { parse_mode: 'HTML' }); }; - protected verify = async (ctx: Context) => { - const token = ctx.match; - const chat = ctx.chat; - const from = ctx.from; - - if (!token || !chat || !from) { - await ctx.reply( - 'Not enough data to complete this request. Reach out to our support team.', - ); - return; - } - - try { - const handle = await this.temporalClient.start('TelegramVerifyWorkflow', { - taskQueue: this.configService.get('temporal.queue'), - args: [{ token, chat, from }], - workflowId: `telegram:verify:${token}`, - }); - - const result = await handle.result(); - await ctx.reply(result); - } catch (error) { - console.error(error); - await ctx.reply( - 'Something unexpected happened. Reach out to our support team.', - ); - } - }; - - protected processEvent = async (event: string, update: Update) => { - try { - await this.temporalClient.start('TelegramEventWorkflow', { - taskQueue: this.configService.get('temporal.queue'), - args: [{ event, update }], - workflowId: `telegram:event:${update.update_id}`, - }); - } catch (error) { - console.error(error); - } - }; - - protected processQuestion = async (ctx: Context) => { - try { - if (HIVEMIND_BLOCKLIST.includes(ctx.chat.id.toString())) { - console.log('Skipping question from HIVEMIND for', ctx.chat.id); - return; - } - - const community = await this.temporalClient.execute( - 'TelegramGetCommunityWorkflow', - { - taskQueue: 'TEMPORAL_QUEUE_LIGHT', - args: [{ chatId: ctx.chat.id }], - workflowId: `telegram:getcommunity:${ctx.update.update_id}`, - }, - ); - if (!community) { - console.log('No community found for', ctx.chat.id); - return; - } - - const reply = await this.temporalClient.execute( - 'AgenticHivemindTemporalWorkflow', - { - taskQueue: 'HIVEMIND_AGENT_QUEUE', - args: [ - { - community_id: community.id, - query: ctx.update.message.text, - enable_answer_skipping: true, - }, - ], - workflowId: `telegram:hivemind:${ctx.update.update_id}`, - }, - ); - - if (!reply || reply.length === 0) { - console.log('No reply from hivemind.'); - return; - } - - const other = { - reply_parameters: { - message_id: ctx.update.message.message_id, - }, - }; - - console.log('Reply from hivemind:', reply); - await ctx.reply(reply, other); - } catch (error) { - console.error(error); - } - }; - - protected getSummary = async (ctx: Context) => { - try { - const community = await this.temporalClient.execute( - 'TelegramGetCommunityWorkflow', - { - taskQueue: 'TEMPORAL_QUEUE_LIGHT', - args: [{ chatId: ctx.chat.id }], - workflowId: `telegram:getcommunity:${ctx.update.update_id}`, - }, - ); - if (!community) { - console.log('No community found for', ctx.chat.id); - return; - } - - const platform = await this.temporalClient.execute( - 'TelegramGetPlatformWorkflow', - { - taskQueue: 'TEMPORAL_QUEUE_LIGHT', - args: [{ chatId: ctx.chat.id }], - workflowId: `telegram:getplatform:${ctx.update.update_id}`, - }, - ); - if (!platform) { - console.log('No platform found for', ctx.chat.id); - return; - } - - const summary = await this.temporalClient.execute( - 'PlatformSummariesWorkflow', - { - taskQueue: 'TEMPORAL_QUEUE_PYTHON_LIGHT', - args: [ - { - platform_id: platform.id, - community_id: community.id, - start_date: null, - end_date: null, - extract_text_only: true, - }, - ], - workflowId: `telegram:summaries:${ctx.update.update_id}`, - }, - ); - - if (!summary || summary.length === 0) { - console.log('No summary found for', ctx.chat.id); - return; - } - - await ctx.reply(summary); - } catch (error) { - console.error(error); - } - }; + // protected getSummary = async (ctx: Context) => { + // try { + // const community = await this.temporalClient.execute( + // 'TelegramGetCommunityWorkflow', + // { + // taskQueue: 'TEMPORAL_QUEUE_LIGHT', + // args: [{ chatId: ctx.chat.id }], + // workflowId: `telegram:getcommunity:${ctx.update.update_id}`, + // }, + // ); + // if (!community) { + // console.log('No community found for', ctx.chat.id); + // return; + // } + + // const platform = await this.temporalClient.execute( + // 'TelegramGetPlatformWorkflow', + // { + // taskQueue: 'TEMPORAL_QUEUE_LIGHT', + // args: [{ chatId: ctx.chat.id }], + // workflowId: `telegram:getplatform:${ctx.update.update_id}`, + // }, + // ); + // if (!platform) { + // console.log('No platform found for', ctx.chat.id); + // return; + // } + + // const summary = await this.temporalClient.execute( + // 'PlatformSummariesWorkflow', + // { + // taskQueue: 'TEMPORAL_QUEUE_PYTHON_LIGHT', + // args: [ + // { + // platform_id: platform.id, + // community_id: community.id, + // start_date: null, + // end_date: null, + // extract_text_only: true, + // }, + // ], + // workflowId: `telegram:summaries:${ctx.update.update_id}`, + // }, + // ); + + // if (!summary || summary.length === 0) { + // console.log('No summary found for', ctx.chat.id); + // return; + // } + + // await ctx.reply(summary); + // } catch (error) { + // console.error(error); + // } + // }; }