From 7a690d87ae61196b2e98b7a666a6eaf9c3fde7f1 Mon Sep 17 00:00:00 2001 From: Behzad-rabiei Date: Sun, 13 Apr 2025 12:41:32 +0200 Subject: [PATCH 1/4] feat: create mediawiki schedule on hivemind module activation --- src/services/mediawiki/core.service.ts | 43 +++++++++++++ src/services/mediawiki/index.ts | 5 ++ src/services/module.service.ts | 37 ++++++++++- src/services/temporal/mediawiki.service.ts | 73 ++++++++++++++++++++++ 4 files changed, 155 insertions(+), 3 deletions(-) create mode 100644 src/services/mediawiki/core.service.ts create mode 100644 src/services/mediawiki/index.ts create mode 100644 src/services/temporal/mediawiki.service.ts diff --git a/src/services/mediawiki/core.service.ts b/src/services/mediawiki/core.service.ts new file mode 100644 index 00000000..8a32800a --- /dev/null +++ b/src/services/mediawiki/core.service.ts @@ -0,0 +1,43 @@ +import { Types } from 'mongoose'; + +import parentLogger from '../../config/logger'; +import { ApiError } from '../../utils'; +import temporalMediaWiki from '../temporal/mediawiki.service'; + +const logger = parentLogger.child({ module: 'MediaWikiCoreService' }); + +async function createMediaWikiSchedule(platformId: Types.ObjectId): Promise { + try { + const schedule = await temporalMediaWiki.createSchedule(platformId); + logger.info(`Started schedule '${schedule.scheduleId}'`); + await schedule.trigger(); + return schedule.scheduleId; + } catch (error) { + logger.error(error, 'Failed to trigger mediawiki schedule.'); + throw new ApiError(590, 'Failed to create mediawiki schedule.'); + } +} + +async function deleteMediaWikiSchedule(scheduleId: string): Promise { + try { + await temporalMediaWiki.deleteSchedule(scheduleId); + } catch (error) { + logger.error(error, 'Failed to delete mediawiki schedule.'); + throw new ApiError(590, 'Failed to delete mediawiki schedule.'); + } +} + +async function terminateMediaWikiWorkflow(communityId: string): Promise { + try { + await temporalMediaWiki.terminateWorkflow(`api:mediawiki:${communityId}`); + } catch (error) { + logger.error(error, 'Failed to terminate mediawiki workflow.'); + throw new ApiError(590, 'Failed to terminate mediawiki workflow.'); + } +} + +export default { + createMediaWikiSchedule, + deleteMediaWikiSchedule, + terminateMediaWikiWorkflow, +}; diff --git a/src/services/mediawiki/index.ts b/src/services/mediawiki/index.ts new file mode 100644 index 00000000..2c70ec6e --- /dev/null +++ b/src/services/mediawiki/index.ts @@ -0,0 +1,5 @@ +import coreService from './core.service'; + +export default { + coreService, +}; diff --git a/src/services/module.service.ts b/src/services/module.service.ts index 151c706f..041087a7 100644 --- a/src/services/module.service.ts +++ b/src/services/module.service.ts @@ -4,6 +4,7 @@ import { IModule, IModuleUpdateBody, Module, PlatformNames, ModuleNames } from ' import platformService from './platform.service'; import websiteService from './website'; +import mediawikiService from './mediawiki'; /** * Create a module @@ -91,9 +92,13 @@ const updateModule = async ( const existingPlatform = module.options.platforms.find((p) => p.name === newPlatform.name); if (existingPlatform) { - // if (module.name === ModuleNames.Hivemind && newPlatform.name === PlatformNames.Website) { - // await handleHivemindWebsiteCase(newPlatform); - // } + if (module.name === ModuleNames.Hivemind) { + if (newPlatform.name === PlatformNames.Website) { + await handleHivemindWebsiteCase(newPlatform); + } else if (newPlatform.name === PlatformNames.MediaWiki) { + await handleHivemindMediaWikiCase(newPlatform); + } + } existingPlatform.metadata = newPlatform.metadata; } else { module.options.platforms.push(newPlatform); @@ -132,6 +137,32 @@ const handleHivemindWebsiteCase = async (platform: any) => { } }; +const handleHivemindMediaWikiCase = async (platform: any) => { + const platformDoc = await platformService.getPlatformById(platform.platform); + + if (!platformDoc) return; + + const isActivated = platform.metadata?.activated; + const existingScheduleId = platformDoc.get('metadata.scheduleId'); + + if (isActivated === true) { + if (!existingScheduleId) { + const scheduleId = await mediawikiService.coreService.createMediaWikiSchedule(platform.platform); + platformDoc.set('metadata.scheduleId', scheduleId); + + await platformDoc.save(); + } + } else if (isActivated === false) { + if (existingScheduleId) { + await mediawikiService.coreService.deleteMediaWikiSchedule(existingScheduleId); + await mediawikiService.coreService.terminateMediaWikiWorkflow(platformDoc.community.toString()); + platformDoc.set('metadata.scheduleId', null); + + await platformDoc.save(); + } + } +}; + /** * Delete module * @param {HydratedDocument} module - module doc diff --git a/src/services/temporal/mediawiki.service.ts b/src/services/temporal/mediawiki.service.ts new file mode 100644 index 00000000..eadc2b91 --- /dev/null +++ b/src/services/temporal/mediawiki.service.ts @@ -0,0 +1,73 @@ +import { Types } from 'mongoose'; + +import { CalendarSpec, Client, ScheduleHandle, ScheduleOverlapPolicy } from '@temporalio/client'; + +import parentLogger from '../../config/logger'; +import { queues } from './configs/temporal.config'; +import { TemporalCoreService } from './core.service'; + +const logger = parentLogger.child({ module: 'MediawikiTemporalService' }); + +class TemporalWMediaWikiService extends TemporalCoreService { + public async createSchedule(platformId: Types.ObjectId): Promise { + const initiationTime = new Date(); + const dayNumber = initiationTime.getUTCDay(); + const hour = initiationTime.getUTCHours(); + const minute = initiationTime.getUTCMinutes(); + const DAY_NAMES = ['SUNDAY', 'MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY'] as const; + const dayOfWeek = DAY_NAMES[dayNumber]; + + const calendarSpec: CalendarSpec = { + dayOfWeek, + hour, + minute, + comment: `Weekly schedule for ${dayOfWeek} at ${hour}:${minute} UTC`, + }; + + try { + const client: Client = await this.getClient(); + return client.schedule.create({ + scheduleId: `api:mediawiki:${platformId}`, + spec: { + calendars: [calendarSpec], + }, + action: { + type: 'startWorkflow', + workflowType: 'MediaWikiETLWorkflow', + args: [platformId.toString()], + taskQueue: queues.TEMPORAL_QUEUE_PYTHON_HEAVY, + }, + policies: { + catchupWindow: '1 day', + overlap: ScheduleOverlapPolicy.SKIP, + }, + }); + } catch (error) { + throw new Error(`Failed to create or update mediawiki ingestion schedule: ${(error as Error).message}`); + } + } + + public async pauseSchedule(scheduleId: string): Promise { + const client: Client = await this.getClient(); + const handle = client.schedule.getHandle(scheduleId); + await handle.pause(); + } + + public async deleteSchedule(scheduleId: string): Promise { + const client: Client = await this.getClient(); + const handle = client.schedule.getHandle(scheduleId); + + await handle.delete(); + } + + public async terminateWorkflow(workflowId: string): Promise { + const client: Client = await this.getClient(); + const handle = client.workflow.getHandle(workflowId); + const description = await handle.describe(); + if (description.status.name !== 'TERMINATED' && description.status.name !== 'COMPLETED') { + await handle.terminate('Terminated due to schedule deletion'); + } + } +} + +export default new TemporalWMediaWikiService(); From d54f5e1f1896a3b85e2f7f84076472df71bc429a Mon Sep 17 00:00:00 2001 From: Behzad-rabiei Date: Mon, 14 Apr 2025 19:43:45 +0200 Subject: [PATCH 2/4] chore: add migration script to add module names --- .migrate | 8 ++- package-lock.json | 8 +-- package.json | 2 +- .../db/1744651967487-add-modules.ts | 64 +++++++++++++++++++ 4 files changed, 75 insertions(+), 7 deletions(-) create mode 100644 src/migrations/db/1744651967487-add-modules.ts diff --git a/.migrate b/.migrate index ad553683..4babc25c 100644 --- a/.migrate +++ b/.migrate @@ -1,9 +1,13 @@ { - "lastRun": "1738072787797-add-activated-field-to-modules.ts", + "lastRun": "1744651967487-add-modules.ts", "migrations": [ { "title": "1738072787797-add-activated-field-to-modules.ts", - "timestamp": 1744199444528 + "timestamp": 1744652563680 + }, + { + "title": "1744651967487-add-modules.ts", + "timestamp": 1744652563722 } ] } \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index 7c3fb7a3..885005b9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,7 +14,7 @@ "@notionhq/client": "^2.2.3", "@sentry/node": "^7.50.0", "@temporalio/client": "^1.11.3", - "@togethercrew.dev/db": "^3.4.0", + "@togethercrew.dev/db": "^3.5.0", "@togethercrew.dev/tc-messagebroker": "^0.0.50", "@types/express-session": "^1.17.7", "@types/morgan": "^1.9.5", @@ -3659,9 +3659,9 @@ "dev": true }, "node_modules/@togethercrew.dev/db": { - "version": "3.4.0", - "resolved": "https://registry.npmjs.org/@togethercrew.dev/db/-/db-3.4.0.tgz", - "integrity": "sha512-7XoagWGLwuh7SsY6C7awobzF7JZ7wnsex06+iovRC+gJGNM4G5ppP/iNSnEgyFHUm9j/7iBJc5ZrzW/m5RCNhA==", + "version": "3.5.0", + "resolved": "https://registry.npmjs.org/@togethercrew.dev/db/-/db-3.5.0.tgz", + "integrity": "sha512-DfrcNd71QPZsxIqSssQJhbGuxCLKm3pPE951c/AG/bzd5qLhmPsTsgq4qTXr37eetlF5WUMS0JlnSE+CITDNuA==", "license": "ISC", "dependencies": { "discord.js": "^14.7.1", diff --git a/package.json b/package.json index 960981ff..e3a77609 100644 --- a/package.json +++ b/package.json @@ -28,7 +28,7 @@ "@notionhq/client": "^2.2.3", "@sentry/node": "^7.50.0", "@temporalio/client": "^1.11.3", - "@togethercrew.dev/db": "^3.4.0", + "@togethercrew.dev/db": "^3.5.0", "@togethercrew.dev/tc-messagebroker": "^0.0.50", "@types/express-session": "^1.17.7", "@types/morgan": "^1.9.5", diff --git a/src/migrations/db/1744651967487-add-modules.ts b/src/migrations/db/1744651967487-add-modules.ts new file mode 100644 index 00000000..793eacc8 --- /dev/null +++ b/src/migrations/db/1744651967487-add-modules.ts @@ -0,0 +1,64 @@ +import 'dotenv/config'; + +import mongoose from 'mongoose'; + +import { Community, Module, ModuleNames } from '@togethercrew.dev/db'; + +import config from '../../config'; +import logger from '../../config/logger'; + +async function connectToMongoDB() { + try { + await mongoose.connect(config.mongoose.serverURL); + logger.info('Connected to MongoDB!'); + } catch (error) { + logger.fatal('Failed to connect to MongoDB!'); + throw error; + } +} + +export const up = async () => { + await connectToMongoDB(); + + try { + const communities = await Community.find({}).exec(); + logger.info(`Found ${communities.length} communities.`); + + const newModuleNames = [ModuleNames.Announcements, ModuleNames.CommunityHealth, ModuleNames.CommunityInsights]; + + for (const community of communities) { + for (const moduleName of newModuleNames) { + const newModule = new Module({ + name: moduleName, + community: community._id, + activated: true, + options: { platforms: [] }, + }); + await newModule.save(); + logger.info(`Created module "${moduleName}" for community ${community._id}`); + } + } + } catch (error) { + logger.error('Error during migration up:', error); + throw error; + } finally { + await mongoose.connection.close(); + logger.info('MongoDB connection closed.'); + } +}; + +export const down = async () => { + await connectToMongoDB(); + + try { + const moduleNamesToDelete = [ModuleNames.Announcements, ModuleNames.CommunityHealth, ModuleNames.CommunityInsights]; + const result = await Module.deleteMany({ name: { $in: moduleNamesToDelete } }); + logger.info(`Migration down: deleted ${result.deletedCount} modules.`); + } catch (error) { + logger.error('Error during migration down:', error); + throw error; + } finally { + await mongoose.connection.close(); + logger.info('MongoDB connection closed.'); + } +}; From 5d5d3e6e45a1f7930417db16eb213b0e2a182907 Mon Sep 17 00:00:00 2001 From: Behzad-rabiei Date: Mon, 14 Apr 2025 19:49:28 +0200 Subject: [PATCH 3/4] Revert "feat: create mediawiki schedule on hivemind module activation" This reverts commit 7a690d87ae61196b2e98b7a666a6eaf9c3fde7f1. --- src/services/mediawiki/core.service.ts | 43 ------------- src/services/mediawiki/index.ts | 5 -- src/services/module.service.ts | 37 +---------- src/services/temporal/mediawiki.service.ts | 73 ---------------------- 4 files changed, 3 insertions(+), 155 deletions(-) delete mode 100644 src/services/mediawiki/core.service.ts delete mode 100644 src/services/mediawiki/index.ts delete mode 100644 src/services/temporal/mediawiki.service.ts diff --git a/src/services/mediawiki/core.service.ts b/src/services/mediawiki/core.service.ts deleted file mode 100644 index 8a32800a..00000000 --- a/src/services/mediawiki/core.service.ts +++ /dev/null @@ -1,43 +0,0 @@ -import { Types } from 'mongoose'; - -import parentLogger from '../../config/logger'; -import { ApiError } from '../../utils'; -import temporalMediaWiki from '../temporal/mediawiki.service'; - -const logger = parentLogger.child({ module: 'MediaWikiCoreService' }); - -async function createMediaWikiSchedule(platformId: Types.ObjectId): Promise { - try { - const schedule = await temporalMediaWiki.createSchedule(platformId); - logger.info(`Started schedule '${schedule.scheduleId}'`); - await schedule.trigger(); - return schedule.scheduleId; - } catch (error) { - logger.error(error, 'Failed to trigger mediawiki schedule.'); - throw new ApiError(590, 'Failed to create mediawiki schedule.'); - } -} - -async function deleteMediaWikiSchedule(scheduleId: string): Promise { - try { - await temporalMediaWiki.deleteSchedule(scheduleId); - } catch (error) { - logger.error(error, 'Failed to delete mediawiki schedule.'); - throw new ApiError(590, 'Failed to delete mediawiki schedule.'); - } -} - -async function terminateMediaWikiWorkflow(communityId: string): Promise { - try { - await temporalMediaWiki.terminateWorkflow(`api:mediawiki:${communityId}`); - } catch (error) { - logger.error(error, 'Failed to terminate mediawiki workflow.'); - throw new ApiError(590, 'Failed to terminate mediawiki workflow.'); - } -} - -export default { - createMediaWikiSchedule, - deleteMediaWikiSchedule, - terminateMediaWikiWorkflow, -}; diff --git a/src/services/mediawiki/index.ts b/src/services/mediawiki/index.ts deleted file mode 100644 index 2c70ec6e..00000000 --- a/src/services/mediawiki/index.ts +++ /dev/null @@ -1,5 +0,0 @@ -import coreService from './core.service'; - -export default { - coreService, -}; diff --git a/src/services/module.service.ts b/src/services/module.service.ts index 041087a7..151c706f 100644 --- a/src/services/module.service.ts +++ b/src/services/module.service.ts @@ -4,7 +4,6 @@ import { IModule, IModuleUpdateBody, Module, PlatformNames, ModuleNames } from ' import platformService from './platform.service'; import websiteService from './website'; -import mediawikiService from './mediawiki'; /** * Create a module @@ -92,13 +91,9 @@ const updateModule = async ( const existingPlatform = module.options.platforms.find((p) => p.name === newPlatform.name); if (existingPlatform) { - if (module.name === ModuleNames.Hivemind) { - if (newPlatform.name === PlatformNames.Website) { - await handleHivemindWebsiteCase(newPlatform); - } else if (newPlatform.name === PlatformNames.MediaWiki) { - await handleHivemindMediaWikiCase(newPlatform); - } - } + // if (module.name === ModuleNames.Hivemind && newPlatform.name === PlatformNames.Website) { + // await handleHivemindWebsiteCase(newPlatform); + // } existingPlatform.metadata = newPlatform.metadata; } else { module.options.platforms.push(newPlatform); @@ -137,32 +132,6 @@ const handleHivemindWebsiteCase = async (platform: any) => { } }; -const handleHivemindMediaWikiCase = async (platform: any) => { - const platformDoc = await platformService.getPlatformById(platform.platform); - - if (!platformDoc) return; - - const isActivated = platform.metadata?.activated; - const existingScheduleId = platformDoc.get('metadata.scheduleId'); - - if (isActivated === true) { - if (!existingScheduleId) { - const scheduleId = await mediawikiService.coreService.createMediaWikiSchedule(platform.platform); - platformDoc.set('metadata.scheduleId', scheduleId); - - await platformDoc.save(); - } - } else if (isActivated === false) { - if (existingScheduleId) { - await mediawikiService.coreService.deleteMediaWikiSchedule(existingScheduleId); - await mediawikiService.coreService.terminateMediaWikiWorkflow(platformDoc.community.toString()); - platformDoc.set('metadata.scheduleId', null); - - await platformDoc.save(); - } - } -}; - /** * Delete module * @param {HydratedDocument} module - module doc diff --git a/src/services/temporal/mediawiki.service.ts b/src/services/temporal/mediawiki.service.ts deleted file mode 100644 index eadc2b91..00000000 --- a/src/services/temporal/mediawiki.service.ts +++ /dev/null @@ -1,73 +0,0 @@ -import { Types } from 'mongoose'; - -import { CalendarSpec, Client, ScheduleHandle, ScheduleOverlapPolicy } from '@temporalio/client'; - -import parentLogger from '../../config/logger'; -import { queues } from './configs/temporal.config'; -import { TemporalCoreService } from './core.service'; - -const logger = parentLogger.child({ module: 'MediawikiTemporalService' }); - -class TemporalWMediaWikiService extends TemporalCoreService { - public async createSchedule(platformId: Types.ObjectId): Promise { - const initiationTime = new Date(); - const dayNumber = initiationTime.getUTCDay(); - const hour = initiationTime.getUTCHours(); - const minute = initiationTime.getUTCMinutes(); - const DAY_NAMES = ['SUNDAY', 'MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY'] as const; - const dayOfWeek = DAY_NAMES[dayNumber]; - - const calendarSpec: CalendarSpec = { - dayOfWeek, - hour, - minute, - comment: `Weekly schedule for ${dayOfWeek} at ${hour}:${minute} UTC`, - }; - - try { - const client: Client = await this.getClient(); - return client.schedule.create({ - scheduleId: `api:mediawiki:${platformId}`, - spec: { - calendars: [calendarSpec], - }, - action: { - type: 'startWorkflow', - workflowType: 'MediaWikiETLWorkflow', - args: [platformId.toString()], - taskQueue: queues.TEMPORAL_QUEUE_PYTHON_HEAVY, - }, - policies: { - catchupWindow: '1 day', - overlap: ScheduleOverlapPolicy.SKIP, - }, - }); - } catch (error) { - throw new Error(`Failed to create or update mediawiki ingestion schedule: ${(error as Error).message}`); - } - } - - public async pauseSchedule(scheduleId: string): Promise { - const client: Client = await this.getClient(); - const handle = client.schedule.getHandle(scheduleId); - await handle.pause(); - } - - public async deleteSchedule(scheduleId: string): Promise { - const client: Client = await this.getClient(); - const handle = client.schedule.getHandle(scheduleId); - - await handle.delete(); - } - - public async terminateWorkflow(workflowId: string): Promise { - const client: Client = await this.getClient(); - const handle = client.workflow.getHandle(workflowId); - const description = await handle.describe(); - if (description.status.name !== 'TERMINATED' && description.status.name !== 'COMPLETED') { - await handle.terminate('Terminated due to schedule deletion'); - } - } -} - -export default new TemporalWMediaWikiService(); From 981890d80a04040fceb5a195b1fa33a8b0c2138c Mon Sep 17 00:00:00 2001 From: Behzad-rabiei Date: Mon, 14 Apr 2025 19:50:57 +0200 Subject: [PATCH 4/4] chore: remove last migration from migration list --- .migrate | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/.migrate b/.migrate index 4babc25c..78be70fb 100644 --- a/.migrate +++ b/.migrate @@ -1,13 +1,9 @@ { "lastRun": "1744651967487-add-modules.ts", "migrations": [ - { - "title": "1738072787797-add-activated-field-to-modules.ts", - "timestamp": 1744652563680 - }, { "title": "1744651967487-add-modules.ts", "timestamp": 1744652563722 } ] -} \ No newline at end of file +}