diff --git a/src/services/mediawiki/core.service.ts b/src/services/mediawiki/core.service.ts new file mode 100644 index 0000000..8a32800 --- /dev/null +++ b/src/services/mediawiki/core.service.ts @@ -0,0 +1,43 @@ +import { Types } from 'mongoose'; + +import parentLogger from '../../config/logger'; +import { ApiError } from '../../utils'; +import temporalMediaWiki from '../temporal/mediawiki.service'; + +const logger = parentLogger.child({ module: 'MediaWikiCoreService' }); + +async function createMediaWikiSchedule(platformId: Types.ObjectId): Promise { + try { + const schedule = await temporalMediaWiki.createSchedule(platformId); + logger.info(`Started schedule '${schedule.scheduleId}'`); + await schedule.trigger(); + return schedule.scheduleId; + } catch (error) { + logger.error(error, 'Failed to trigger mediawiki schedule.'); + throw new ApiError(590, 'Failed to create mediawiki schedule.'); + } +} + +async function deleteMediaWikiSchedule(scheduleId: string): Promise { + try { + await temporalMediaWiki.deleteSchedule(scheduleId); + } catch (error) { + logger.error(error, 'Failed to delete mediawiki schedule.'); + throw new ApiError(590, 'Failed to delete mediawiki schedule.'); + } +} + +async function terminateMediaWikiWorkflow(communityId: string): Promise { + try { + await temporalMediaWiki.terminateWorkflow(`api:mediawiki:${communityId}`); + } catch (error) { + logger.error(error, 'Failed to terminate mediawiki workflow.'); + throw new ApiError(590, 'Failed to terminate mediawiki workflow.'); + } +} + +export default { + createMediaWikiSchedule, + deleteMediaWikiSchedule, + terminateMediaWikiWorkflow, +}; diff --git a/src/services/mediawiki/index.ts b/src/services/mediawiki/index.ts new file mode 100644 index 0000000..2c70ec6 --- /dev/null +++ b/src/services/mediawiki/index.ts @@ -0,0 +1,5 @@ +import coreService from './core.service'; + +export default { + coreService, +}; diff --git a/src/services/module.service.ts b/src/services/module.service.ts index 151c706..041087a 100644 --- a/src/services/module.service.ts +++ b/src/services/module.service.ts @@ -4,6 +4,7 @@ import { IModule, IModuleUpdateBody, Module, PlatformNames, ModuleNames } from ' import platformService from './platform.service'; import websiteService from './website'; +import mediawikiService from './mediawiki'; /** * Create a module @@ -91,9 +92,13 @@ const updateModule = async ( const existingPlatform = module.options.platforms.find((p) => p.name === newPlatform.name); if (existingPlatform) { - // if (module.name === ModuleNames.Hivemind && newPlatform.name === PlatformNames.Website) { - // await handleHivemindWebsiteCase(newPlatform); - // } + if (module.name === ModuleNames.Hivemind) { + if (newPlatform.name === PlatformNames.Website) { + await handleHivemindWebsiteCase(newPlatform); + } else if (newPlatform.name === PlatformNames.MediaWiki) { + await handleHivemindMediaWikiCase(newPlatform); + } + } existingPlatform.metadata = newPlatform.metadata; } else { module.options.platforms.push(newPlatform); @@ -132,6 +137,32 @@ const handleHivemindWebsiteCase = async (platform: any) => { } }; +const handleHivemindMediaWikiCase = async (platform: any) => { + const platformDoc = await platformService.getPlatformById(platform.platform); + + if (!platformDoc) return; + + const isActivated = platform.metadata?.activated; + const existingScheduleId = platformDoc.get('metadata.scheduleId'); + + if (isActivated === true) { + if (!existingScheduleId) { + const scheduleId = await mediawikiService.coreService.createMediaWikiSchedule(platform.platform); + platformDoc.set('metadata.scheduleId', scheduleId); + + await platformDoc.save(); + } + } else if (isActivated === false) { + if (existingScheduleId) { + await mediawikiService.coreService.deleteMediaWikiSchedule(existingScheduleId); + await mediawikiService.coreService.terminateMediaWikiWorkflow(platformDoc.community.toString()); + platformDoc.set('metadata.scheduleId', null); + + await platformDoc.save(); + } + } +}; + /** * Delete module * @param {HydratedDocument} module - module doc diff --git a/src/services/temporal/mediawiki.service.ts b/src/services/temporal/mediawiki.service.ts new file mode 100644 index 0000000..eadc2b9 --- /dev/null +++ b/src/services/temporal/mediawiki.service.ts @@ -0,0 +1,73 @@ +import { Types } from 'mongoose'; + +import { CalendarSpec, Client, ScheduleHandle, ScheduleOverlapPolicy } from '@temporalio/client'; + +import parentLogger from '../../config/logger'; +import { queues } from './configs/temporal.config'; +import { TemporalCoreService } from './core.service'; + +const logger = parentLogger.child({ module: 'MediawikiTemporalService' }); + +class TemporalWMediaWikiService extends TemporalCoreService { + public async createSchedule(platformId: Types.ObjectId): Promise { + const initiationTime = new Date(); + const dayNumber = initiationTime.getUTCDay(); + const hour = initiationTime.getUTCHours(); + const minute = initiationTime.getUTCMinutes(); + const DAY_NAMES = ['SUNDAY', 'MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY'] as const; + const dayOfWeek = DAY_NAMES[dayNumber]; + + const calendarSpec: CalendarSpec = { + dayOfWeek, + hour, + minute, + comment: `Weekly schedule for ${dayOfWeek} at ${hour}:${minute} UTC`, + }; + + try { + const client: Client = await this.getClient(); + return client.schedule.create({ + scheduleId: `api:mediawiki:${platformId}`, + spec: { + calendars: [calendarSpec], + }, + action: { + type: 'startWorkflow', + workflowType: 'MediaWikiETLWorkflow', + args: [platformId.toString()], + taskQueue: queues.TEMPORAL_QUEUE_PYTHON_HEAVY, + }, + policies: { + catchupWindow: '1 day', + overlap: ScheduleOverlapPolicy.SKIP, + }, + }); + } catch (error) { + throw new Error(`Failed to create or update mediawiki ingestion schedule: ${(error as Error).message}`); + } + } + + public async pauseSchedule(scheduleId: string): Promise { + const client: Client = await this.getClient(); + const handle = client.schedule.getHandle(scheduleId); + await handle.pause(); + } + + public async deleteSchedule(scheduleId: string): Promise { + const client: Client = await this.getClient(); + const handle = client.schedule.getHandle(scheduleId); + + await handle.delete(); + } + + public async terminateWorkflow(workflowId: string): Promise { + const client: Client = await this.getClient(); + const handle = client.workflow.getHandle(workflowId); + const description = await handle.describe(); + if (description.status.name !== 'TERMINATED' && description.status.name !== 'COMPLETED') { + await handle.terminate('Terminated due to schedule deletion'); + } + } +} + +export default new TemporalWMediaWikiService();