From 9c709b1fcbb2f51aa146d8f34da9fc4d51eb53f0 Mon Sep 17 00:00:00 2001 From: Stijn Beukers-Roozen Date: Mon, 26 Jan 2026 16:10:12 +0100 Subject: [PATCH 1/2] added disconnect handling for RabbitMQ --- sdk/src/lib/OmotesSDK.ts | 70 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 67 insertions(+), 3 deletions(-) diff --git a/sdk/src/lib/OmotesSDK.ts b/sdk/src/lib/OmotesSDK.ts index 7b1d670..77eb03c 100644 --- a/sdk/src/lib/OmotesSDK.ts +++ b/sdk/src/lib/OmotesSDK.ts @@ -1,6 +1,6 @@ import { Workflow } from '@omotes/proto'; import { Connection, connect } from 'amqplib'; -import { Observable } from 'rxjs'; +import { BehaviorSubject, Observable } from 'rxjs'; import { getChannel } from './channel'; import { Job } from './Job'; import { getProfile } from './profiles'; @@ -10,7 +10,13 @@ import { setupAvailableWorkflows } from './workflow'; export class OmotesSDK { private _connection: Connection | null = null; - public workflows!: Observable; + private _isReconnecting = false; + private _manualDisconnect = false; + private readonly workflows$ = new BehaviorSubject([]); + + public get workflows(): Observable { + return this.workflows$.asObservable(); + } private get connection() { if (!this._connection) { @@ -22,6 +28,8 @@ export class OmotesSDK { constructor(private readonly options: OmotesSDKOptions) { } public async connect() { + this._manualDisconnect = false; + this._connection = await connect({ hostname: this.options.rabbitMQUrl, username: this.options.rabbitMQUsername, @@ -29,8 +37,20 @@ export class OmotesSDK { port: this.options.rabbitMQPort, vhost: 'omotes', }); + + this._connection.on('error', (err) => { + console.error('[OmotesSDK] Connection error:', err.message); + }); + + this._connection.on('close', () => { + if (!this._manualDisconnect) { + console.warn('[OmotesSDK] Connection closed unexpectedly, attempting reconnect...'); + this.handleReconnect(); + } + }); + const { trigger, workflows } = await setupAvailableWorkflows(this.connection, this.options.id); - this.workflows = workflows; + workflows.subscribe((wf) => this.workflows$.next(wf)); trigger(); } @@ -46,4 +66,48 @@ export class OmotesSDK { ) { return getProfile(dbName, host, port, measurement, field, this.options.influxUser, this.options.influxPassword); } + + public async disconnect() { + this._manualDisconnect = true; + if (this._connection) { + try { + await this._connection.close(); + } catch { + // Ignore errors during intentional close + } + this._connection = null; + } + } + + private async handleReconnect() { + if (this._isReconnecting) return; + this._isReconnecting = true; + + const maxRetries = 10; + const baseDelay = 1000; // 1 second + const maxDelay = 30000; // 30 seconds + + for (let attempt = 1; attempt <= maxRetries; attempt++) { + const delay = Math.min(baseDelay * Math.pow(2, attempt - 1), maxDelay); + console.log(`[OmotesSDK] Reconnection attempt ${attempt}/${maxRetries} in ${delay}ms...`); + + await this.sleep(delay); + + try { + await this.connect(); + console.log('[OmotesSDK] Reconnected successfully'); + this._isReconnecting = false; + return; + } catch (err) { + console.error(`[OmotesSDK] Reconnection attempt ${attempt} failed:`, (err as Error).message); + } + } + + this._isReconnecting = false; + console.error('[OmotesSDK] Max reconnection attempts reached. Giving up.'); + } + + private sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } } From daaf520eb65bbcf46a11d18b4f43e769ad0dbcba Mon Sep 17 00:00:00 2001 From: Stijn Beukers-Roozen Date: Tue, 27 Jan 2026 13:29:24 +0100 Subject: [PATCH 2/2] Fixed workflow observable to emit everytime new data is available, a reconnect happens and to clear upon disconnecting from RabbitMQ --- sdk/src/lib/OmotesSDK.ts | 59 ++++++++++++++++++++++++++++++++++------ sdk/src/lib/workflow.ts | 8 ++++-- 2 files changed, 56 insertions(+), 11 deletions(-) diff --git a/sdk/src/lib/OmotesSDK.ts b/sdk/src/lib/OmotesSDK.ts index 77eb03c..80bfe16 100644 --- a/sdk/src/lib/OmotesSDK.ts +++ b/sdk/src/lib/OmotesSDK.ts @@ -10,11 +10,20 @@ import { setupAvailableWorkflows } from './workflow'; export class OmotesSDK { private _connection: Connection | null = null; + private _isConnected = false; private _isReconnecting = false; private _manualDisconnect = false; private readonly workflows$ = new BehaviorSubject([]); public get workflows(): Observable { + // Trigger connection attempt if not connected and not already reconnecting + if ( + !this._isConnected && + !this._isReconnecting && + !this._manualDisconnect + ) { + this.handleReconnect(); + } return this.workflows$.asObservable(); } @@ -25,7 +34,7 @@ export class OmotesSDK { return this._connection as Connection; } - constructor(private readonly options: OmotesSDKOptions) { } + constructor(private readonly options: OmotesSDKOptions) {} public async connect() { this._manualDisconnect = false; @@ -43,15 +52,30 @@ export class OmotesSDK { }); this._connection.on('close', () => { + this._isConnected = false; + this.workflows$.next([]); if (!this._manualDisconnect) { - console.warn('[OmotesSDK] Connection closed unexpectedly, attempting reconnect...'); + console.warn( + '[OmotesSDK] Connection closed unexpectedly, attempting reconnect...' + ); this.handleReconnect(); } }); - const { trigger, workflows } = await setupAvailableWorkflows(this.connection, this.options.id); - workflows.subscribe((wf) => this.workflows$.next(wf)); + const { trigger, workflows, ready } = await setupAvailableWorkflows( + this.connection, + this.options.id + ); + workflows.subscribe((wf) => { + this.workflows$.next(wf); + }); + + // Prevents race condition where trigger is called before the channel is ready + await ready(); trigger(); + + this._isConnected = true; + console.log('[OmotesSDK] Connected successfully'); } public async createJob(type: Workflow.AsObject['typeName'], esdl: string) { @@ -62,13 +86,27 @@ export class OmotesSDK { } public async getProfile( - dbName: string, host: string, port: number, measurement: string, field: string + dbName: string, + host: string, + port: number, + measurement: string, + field: string ) { - return getProfile(dbName, host, port, measurement, field, this.options.influxUser, this.options.influxPassword); + return getProfile( + dbName, + host, + port, + measurement, + field, + this.options.influxUser, + this.options.influxPassword + ); } public async disconnect() { this._manualDisconnect = true; + this._isConnected = false; + this.workflows$.next([]); if (this._connection) { try { await this._connection.close(); @@ -89,7 +127,9 @@ export class OmotesSDK { for (let attempt = 1; attempt <= maxRetries; attempt++) { const delay = Math.min(baseDelay * Math.pow(2, attempt - 1), maxDelay); - console.log(`[OmotesSDK] Reconnection attempt ${attempt}/${maxRetries} in ${delay}ms...`); + console.log( + `[OmotesSDK] Reconnection attempt ${attempt}/${maxRetries} in ${delay}ms...` + ); await this.sleep(delay); @@ -99,7 +139,10 @@ export class OmotesSDK { this._isReconnecting = false; return; } catch (err) { - console.error(`[OmotesSDK] Reconnection attempt ${attempt} failed:`, (err as Error).message); + console.error( + `[OmotesSDK] Reconnection attempt ${attempt} failed:`, + (err as Error).message + ); } } diff --git a/sdk/src/lib/workflow.ts b/sdk/src/lib/workflow.ts index 40b79bf..a35ec57 100644 --- a/sdk/src/lib/workflow.ts +++ b/sdk/src/lib/workflow.ts @@ -1,18 +1,20 @@ import { RequestAvailableWorkflows } from '@omotes/proto'; import { Connection } from 'amqplib'; -import { from, map, shareReplay } from 'rxjs'; +import { firstValueFrom, from, map, shareReplay } from 'rxjs'; import { getChannel } from './channel'; import { AvailableWorkflowsHandler } from './handlers/AvailableWorkflowsHandler'; export async function setupAvailableWorkflows(connection: Connection, clientId: string) { const availableChannel$ = from(getChannel(connection, `available_workflows.${clientId}`, 'available_workflows')).pipe( - map(({ channel }) => channel) + map(({ channel }) => channel), + shareReplay(1) ); const workflowsHandler = new AvailableWorkflowsHandler(availableChannel$, clientId); const requestChannel = await connection.createChannel(); return { workflows: workflowsHandler.getWorkflows().pipe(shareReplay(1)), - trigger: () => requestChannel.sendToQueue('request_available_workflows', Buffer.from(new RequestAvailableWorkflows().serializeBinary())) + trigger: () => requestChannel.sendToQueue('request_available_workflows', Buffer.from(new RequestAvailableWorkflows().serializeBinary())), + ready: () => firstValueFrom(availableChannel$) }; }