Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 61 additions & 50 deletions src/flare.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,9 @@ import {
FlareMiddleware,
} from './types';

type HandlerOptions<E, K extends keyof E> = {
handler: FlareHandler<E[K]>;
options: FlareCatchOptions;
};

export class Flare<E extends Record<string, any>> {
private handlerOptions: {
[K in keyof E]?: Set<HandlerOptions<E, K>>;
private handlerOptionsStore: {
[K in keyof E]?: Set<HandlerOptionsPair<E, K>>;
} = {};

private interceptors: FlareInterceptor<E>[] = [];
Expand All @@ -41,32 +36,10 @@ export class Flare<E extends Record<string, any>> {
return Promise.resolve(`Event "${String(event)}" was stopped by middleware.`);
}

const handlers = this.handlerOptions[event];
if (!handlers) return Promise.resolve(`No handlers found for event "${String(event)}".`);
const handlerOptionsSet = this.handlerOptionsStore[event];
if (!handlerOptionsSet) return Promise.resolve(`No handlers found for event "${String(event)}".`);

const { strategy = FlareFireStrategy.Parallel, timeout, haltOnError = false } = options;

if (strategy === FlareFireStrategy.Parallel) {
await Promise.allSettled(
Array.from(handlers).map(async (handler) => {
try {
await this.execute(event, handler, newPayload, timeout);
} catch (err) {
// TODO: handle or log exception
if (haltOnError) throw err;
}
})
);
} else {
for (const handler of handlers) {
try {
await this.execute(event, handler, newPayload, timeout);
} catch (err) {
// TODO: handle or log exception
if (haltOnError) break;
}
}
}
await this.handleExecute(event, newPayload, options, handlerOptionsSet);

this.handleAfterInterceptors(event, newPayload);
}
Expand All @@ -76,12 +49,12 @@ export class Flare<E extends Record<string, any>> {
handler: FlareHandler<E[K]>,
options: FlareCatchOptions = {},
): () => void {
if (!this.handlerOptions[event]) {
this.handlerOptions[event] = new Set();
if (!this.handlerOptionsStore[event]) {
this.handlerOptionsStore[event] = new Set();
}

this.handlerOptions[event].add({
handler: handler,
this.handlerOptionsStore[event].add({
handler,
options
})

Expand All @@ -92,27 +65,26 @@ export class Flare<E extends Record<string, any>> {
event: K,
handler: FlareHandler<E[K]>,
): void {
const handlers = this.handlerOptions[event];
if (!handlers) return;
const handlerOptionsSet = this.handlerOptionsStore[event];
if (!handlerOptionsSet) return;

for (const h of handlers) {
if (h.handler === handler) {
handlers.delete(h);
for (const handlerOptions of handlerOptionsSet) {
if (handlerOptions.handler === handler) {
handlerOptionsSet.delete(handlerOptions);
break;
}
}
}

releaseAll(): void {
for (const event in this.handlerOptions) {
if (!Object.prototype.hasOwnProperty.call(this.handlerOptions, event)) continue;
const handlers = this.handlerOptions[event];
handlers?.clear();
for (const event in this.handlerOptionsStore) {
if (!Object.prototype.hasOwnProperty.call(this.handlerOptionsStore, event)) continue;
this.handlerOptionsStore[event]?.clear();
}
this.handlerOptions = {};
this.handlerOptionsStore = {};
}

// ==================== internals ====================
// ==================== handle methods ====================

private handleBeforeInterceptors<K extends keyof E>(event: K, payload: E[K]) {
for (const interceptor of this.interceptors) {
Expand Down Expand Up @@ -162,19 +134,53 @@ export class Flare<E extends Record<string, any>> {
};
}

private async handleExecute<K extends keyof E>(
event: K,
newPayload: E[K],
fireOptions: FlareFireOptions,
handlerOptionsSet: Set<HandlerOptionsPair<E, K>>
) {
const { strategy = FlareFireStrategy.Parallel, timeout, haltOnError = false } = fireOptions;

if (strategy === FlareFireStrategy.Parallel) {
await Promise.allSettled(
Array.from(handlerOptionsSet).map(async (handlerOptions) => {
try {
await this.execute(event, newPayload, timeout, handlerOptions);
} catch (err) {
// TODO: handle or log exception
if (haltOnError) throw err;
}
})
);
} else {
for (const handlerOptions of handlerOptionsSet) {
try {
await this.execute(event, newPayload, timeout, handlerOptions);
} catch (err) {
// TODO: handle or log exception
if (haltOnError) break;
}
}
}
}

// ==================== internal methods ====================

private async execute<K extends keyof E>(
event: K,
handlerOptions: HandlerOptions<E, K>,
payload: E[K],
timeout?: number): Promise<void> {
timeout: number | undefined,
handlerOptions: HandlerOptionsPair<E, K>
): Promise<void> {
try {
if (!timeout) return this.call(handlerOptions.handler, payload);

const timeoutPromise = new Promise<void>((_, reject) => setTimeout(() => reject(new Error('FlareHandler timeout')), timeout));
return Promise.race([this.call(handlerOptions.handler, payload), timeoutPromise]);
} finally {
if (handlerOptions.options.once) {
this.handlerOptions[event]?.delete(handlerOptions);
this.handlerOptionsStore[event]?.delete(handlerOptions);
}
}
};
Expand All @@ -193,3 +199,8 @@ export class Flare<E extends Record<string, any>> {
}
}
}

interface HandlerOptionsPair<E, K extends keyof E> {
handler: FlareHandler<E[K]>;
options: FlareCatchOptions;
};