Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 114 additions & 7 deletions sdk/src/lib/OmotesSDK.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Workflow } from '@omotes/proto';
import { Connection, connect } from 'amqplib';
import { Observable } from 'rxjs';
import { BehaviorSubject, Observable } from 'rxjs';
import { getChannel } from './channel';
import { Job } from './Job';
import { getProfile } from './profiles';
Expand All @@ -10,7 +10,22 @@ import { setupAvailableWorkflows } from './workflow';

export class OmotesSDK {
private _connection: Connection | null = null;
public workflows!: Observable<Workflow.AsObject[]>;
private _isConnected = false;
private _isReconnecting = false;
private _manualDisconnect = false;
private readonly workflows$ = new BehaviorSubject<Workflow.AsObject[]>([]);

public get workflows(): Observable<Workflow.AsObject[]> {
// Trigger connection attempt if not connected and not already reconnecting
if (
!this._isConnected &&
!this._isReconnecting &&
!this._manualDisconnect
) {
this.handleReconnect();
}
return this.workflows$.asObservable();
}

private get connection() {
if (!this._connection) {
Expand All @@ -19,19 +34,48 @@ export class OmotesSDK {
return this._connection as Connection;
}

constructor(private readonly options: OmotesSDKOptions) { }
constructor(private readonly options: OmotesSDKOptions) {}

public async connect() {
this._manualDisconnect = false;

this._connection = await connect({
hostname: this.options.rabbitMQUrl,
username: this.options.rabbitMQUsername,
password: this.options.rabbitMQPassword,
port: this.options.rabbitMQPort,
vhost: 'omotes',
});
const { trigger, workflows } = await setupAvailableWorkflows(this.connection, this.options.id);
this.workflows = workflows;

this._connection.on('error', (err) => {
console.error('[OmotesSDK] Connection error:', err.message);
});

this._connection.on('close', () => {
this._isConnected = false;
this.workflows$.next([]);
if (!this._manualDisconnect) {
console.warn(
'[OmotesSDK] Connection closed unexpectedly, attempting reconnect...'
);
this.handleReconnect();
}
});

const { trigger, workflows, ready } = await setupAvailableWorkflows(
this.connection,
this.options.id
);
workflows.subscribe((wf) => {
this.workflows$.next(wf);
});

// Prevents race condition where trigger is called before the channel is ready
await ready();
trigger();

this._isConnected = true;
console.log('[OmotesSDK] Connected successfully');
}

public async createJob(type: Workflow.AsObject['typeName'], esdl: string) {
Expand All @@ -42,8 +86,71 @@ export class OmotesSDK {
}

public async getProfile(
dbName: string, host: string, port: number, measurement: string, field: string
dbName: string,
host: string,
port: number,
measurement: string,
field: string
) {
return getProfile(dbName, host, port, measurement, field, this.options.influxUser, this.options.influxPassword);
return getProfile(
dbName,
host,
port,
measurement,
field,
this.options.influxUser,
this.options.influxPassword
);
}

public async disconnect() {
this._manualDisconnect = true;
this._isConnected = false;
this.workflows$.next([]);
if (this._connection) {
try {
await this._connection.close();
} catch {
// Ignore errors during intentional close
}
this._connection = null;
}
}

private async handleReconnect() {
if (this._isReconnecting) return;
this._isReconnecting = true;

const maxRetries = 10;
const baseDelay = 1000; // 1 second
const maxDelay = 30000; // 30 seconds

for (let attempt = 1; attempt <= maxRetries; attempt++) {
const delay = Math.min(baseDelay * Math.pow(2, attempt - 1), maxDelay);
console.log(
`[OmotesSDK] Reconnection attempt ${attempt}/${maxRetries} in ${delay}ms...`
);

await this.sleep(delay);

try {
await this.connect();
console.log('[OmotesSDK] Reconnected successfully');
this._isReconnecting = false;
return;
} catch (err) {
console.error(
`[OmotesSDK] Reconnection attempt ${attempt} failed:`,
(err as Error).message
);
}
}

this._isReconnecting = false;
console.error('[OmotesSDK] Max reconnection attempts reached. Giving up.');
}

private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
8 changes: 5 additions & 3 deletions sdk/src/lib/workflow.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
import { RequestAvailableWorkflows } from '@omotes/proto';
import { Connection } from 'amqplib';
import { from, map, shareReplay } from 'rxjs';
import { firstValueFrom, from, map, shareReplay } from 'rxjs';
import { getChannel } from './channel';
import { AvailableWorkflowsHandler } from './handlers/AvailableWorkflowsHandler';

export async function setupAvailableWorkflows(connection: Connection, clientId: string) {
const availableChannel$ = from(getChannel(connection, `available_workflows.${clientId}`, 'available_workflows')).pipe(
map(({ channel }) => channel)
map(({ channel }) => channel),
shareReplay(1)
);
const workflowsHandler = new AvailableWorkflowsHandler(availableChannel$, clientId);
const requestChannel = await connection.createChannel();
return {
workflows: workflowsHandler.getWorkflows().pipe(shareReplay(1)),
trigger: () => requestChannel.sendToQueue('request_available_workflows', Buffer.from(new RequestAvailableWorkflows().serializeBinary()))
trigger: () => requestChannel.sendToQueue('request_available_workflows', Buffer.from(new RequestAvailableWorkflows().serializeBinary())),
ready: () => firstValueFrom(availableChannel$)
};
}