diff --git a/sdk/src/lib/OmotesSDK.ts b/sdk/src/lib/OmotesSDK.ts index 7b1d670..80bfe16 100644 --- a/sdk/src/lib/OmotesSDK.ts +++ b/sdk/src/lib/OmotesSDK.ts @@ -1,6 +1,6 @@ import { Workflow } from '@omotes/proto'; import { Connection, connect } from 'amqplib'; -import { Observable } from 'rxjs'; +import { BehaviorSubject, Observable } from 'rxjs'; import { getChannel } from './channel'; import { Job } from './Job'; import { getProfile } from './profiles'; @@ -10,7 +10,22 @@ import { setupAvailableWorkflows } from './workflow'; export class OmotesSDK { private _connection: Connection | null = null; - public workflows!: Observable; + private _isConnected = false; + private _isReconnecting = false; + private _manualDisconnect = false; + private readonly workflows$ = new BehaviorSubject([]); + + public get workflows(): Observable { + // Trigger connection attempt if not connected and not already reconnecting + if ( + !this._isConnected && + !this._isReconnecting && + !this._manualDisconnect + ) { + this.handleReconnect(); + } + return this.workflows$.asObservable(); + } private get connection() { if (!this._connection) { @@ -19,9 +34,11 @@ export class OmotesSDK { return this._connection as Connection; } - constructor(private readonly options: OmotesSDKOptions) { } + constructor(private readonly options: OmotesSDKOptions) {} public async connect() { + this._manualDisconnect = false; + this._connection = await connect({ hostname: this.options.rabbitMQUrl, username: this.options.rabbitMQUsername, @@ -29,9 +46,36 @@ export class OmotesSDK { port: this.options.rabbitMQPort, vhost: 'omotes', }); - const { trigger, workflows } = await setupAvailableWorkflows(this.connection, this.options.id); - this.workflows = workflows; + + this._connection.on('error', (err) => { + console.error('[OmotesSDK] Connection error:', err.message); + }); + + this._connection.on('close', () => { + this._isConnected = false; + this.workflows$.next([]); + if (!this._manualDisconnect) { + console.warn( + '[OmotesSDK] Connection closed unexpectedly, attempting reconnect...' + ); + this.handleReconnect(); + } + }); + + const { trigger, workflows, ready } = await setupAvailableWorkflows( + this.connection, + this.options.id + ); + workflows.subscribe((wf) => { + this.workflows$.next(wf); + }); + + // Prevents race condition where trigger is called before the channel is ready + await ready(); trigger(); + + this._isConnected = true; + console.log('[OmotesSDK] Connected successfully'); } public async createJob(type: Workflow.AsObject['typeName'], esdl: string) { @@ -42,8 +86,71 @@ export class OmotesSDK { } public async getProfile( - dbName: string, host: string, port: number, measurement: string, field: string + dbName: string, + host: string, + port: number, + measurement: string, + field: string ) { - return getProfile(dbName, host, port, measurement, field, this.options.influxUser, this.options.influxPassword); + return getProfile( + dbName, + host, + port, + measurement, + field, + this.options.influxUser, + this.options.influxPassword + ); + } + + public async disconnect() { + this._manualDisconnect = true; + this._isConnected = false; + this.workflows$.next([]); + if (this._connection) { + try { + await this._connection.close(); + } catch { + // Ignore errors during intentional close + } + this._connection = null; + } + } + + private async handleReconnect() { + if (this._isReconnecting) return; + this._isReconnecting = true; + + const maxRetries = 10; + const baseDelay = 1000; // 1 second + const maxDelay = 30000; // 30 seconds + + for (let attempt = 1; attempt <= maxRetries; attempt++) { + const delay = Math.min(baseDelay * Math.pow(2, attempt - 1), maxDelay); + console.log( + `[OmotesSDK] Reconnection attempt ${attempt}/${maxRetries} in ${delay}ms...` + ); + + await this.sleep(delay); + + try { + await this.connect(); + console.log('[OmotesSDK] Reconnected successfully'); + this._isReconnecting = false; + return; + } catch (err) { + console.error( + `[OmotesSDK] Reconnection attempt ${attempt} failed:`, + (err as Error).message + ); + } + } + + this._isReconnecting = false; + console.error('[OmotesSDK] Max reconnection attempts reached. Giving up.'); + } + + private sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); } } diff --git a/sdk/src/lib/workflow.ts b/sdk/src/lib/workflow.ts index 40b79bf..a35ec57 100644 --- a/sdk/src/lib/workflow.ts +++ b/sdk/src/lib/workflow.ts @@ -1,18 +1,20 @@ import { RequestAvailableWorkflows } from '@omotes/proto'; import { Connection } from 'amqplib'; -import { from, map, shareReplay } from 'rxjs'; +import { firstValueFrom, from, map, shareReplay } from 'rxjs'; import { getChannel } from './channel'; import { AvailableWorkflowsHandler } from './handlers/AvailableWorkflowsHandler'; export async function setupAvailableWorkflows(connection: Connection, clientId: string) { const availableChannel$ = from(getChannel(connection, `available_workflows.${clientId}`, 'available_workflows')).pipe( - map(({ channel }) => channel) + map(({ channel }) => channel), + shareReplay(1) ); const workflowsHandler = new AvailableWorkflowsHandler(availableChannel$, clientId); const requestChannel = await connection.createChannel(); return { workflows: workflowsHandler.getWorkflows().pipe(shareReplay(1)), - trigger: () => requestChannel.sendToQueue('request_available_workflows', Buffer.from(new RequestAvailableWorkflows().serializeBinary())) + trigger: () => requestChannel.sendToQueue('request_available_workflows', Buffer.from(new RequestAvailableWorkflows().serializeBinary())), + ready: () => firstValueFrom(availableChannel$) }; }