diff --git a/src/cache-tags/provider/base.ts b/src/cache-tags/provider/base.ts new file mode 100644 index 0000000..7a7c191 --- /dev/null +++ b/src/cache-tags/provider/base.ts @@ -0,0 +1,48 @@ +import type { CacheTag, CacheTagsProvider, CacheTagsProviderErrorHandlingConfig } from '../types.js'; + +/** + * An abstract base class for `CacheTagsProvider` implementations that adds error handling and logging. + */ +export abstract class AbstractErrorHandlingCacheTagsProvider implements CacheTagsProvider { + protected readonly throwOnError: boolean; + protected readonly onError?: CacheTagsProviderErrorHandlingConfig['onError']; + + protected constructor( + protected readonly providerName: string, + config: CacheTagsProviderErrorHandlingConfig = {}, + ) { + this.throwOnError = config.throwOnError ?? true; + this.onError = config.onError; + } + + public abstract storeQueryCacheTags(queryId: string, cacheTags: CacheTag[]): Promise; + + public abstract queriesReferencingCacheTags(cacheTags: CacheTag[]): Promise; + + public abstract deleteCacheTags(cacheTags: CacheTag[]): Promise; + + public abstract truncateCacheTags(): Promise; + + protected async wrap(method: keyof CacheTagsProvider, args: unknown[], fn: () => Promise, fallback: T): Promise { + try { + return await fn(); + } catch (error) { + const provider = this.providerName; + + // Call onError callback if provided, but guard against exceptions + // to prevent masking the original provider error + try { + this.onError?.(error, { provider, method, args }); + } catch (handlerError) { + console.error(`Error handler itself failed in ${provider}.${method}.`, { handlerError }); + } + + if (this.throwOnError) { + throw error; + } + console.debug(`Error occurred in ${provider}.${method}.`, { error, args }); + + return fallback; + } + } +} diff --git a/src/cache-tags/provider/neon.ts b/src/cache-tags/provider/neon.ts index f02ca3e..f49e5e6 100644 --- a/src/cache-tags/provider/neon.ts +++ b/src/cache-tags/provider/neon.ts @@ -1,7 +1,8 @@ import { neon } from '@neondatabase/serverless'; -import { type CacheTag, type CacheTagsProvider } from '../types.js'; +import type { CacheTag, CacheTagsProvider, CacheTagsProviderErrorHandlingConfig } from '../types.js'; +import { AbstractErrorHandlingCacheTagsProvider } from './base.js'; -export type NeonCacheTagsProviderConfig = { +type NeonCacheTagsProviderBaseConfig = { /** * Neon connection string. You can find it in the "Connection" tab of your Neon project dashboard. * Has the format `postgresql://user:pass@host/db` @@ -21,61 +22,94 @@ export type NeonCacheTagsProviderConfig = { readonly table: string; }; +export type NeonCacheTagsProviderConfig = NeonCacheTagsProviderBaseConfig & CacheTagsProviderErrorHandlingConfig; + /** * A `CacheTagsProvider` implementation that uses Neon as the storage backend. */ -export class NeonCacheTagsProvider implements CacheTagsProvider { +export class NeonCacheTagsProvider extends AbstractErrorHandlingCacheTagsProvider implements CacheTagsProvider { private readonly sql; private readonly table; - constructor({ connectionUrl, table }: NeonCacheTagsProviderConfig) { + constructor({ connectionUrl, table, throwOnError, onError }: NeonCacheTagsProviderConfig) { + super('NeonCacheTagsProvider', { throwOnError, onError }); this.sql = neon(connectionUrl, { fullResults: true }); this.table = NeonCacheTagsProvider.quoteIdentifier(table); } public async storeQueryCacheTags(queryId: string, cacheTags: CacheTag[]) { - if (!cacheTags?.length) { - return; - } - - const tags = cacheTags.flatMap((_, i) => [queryId, cacheTags[i]]); - const placeholders = cacheTags.map((_, i) => `($${2 * i + 1}, $${2 * i + 2})`).join(','); - - await this.sql.query(`INSERT INTO ${this.table} VALUES ${placeholders} ON CONFLICT DO NOTHING`, tags); + return this.wrap( + 'storeQueryCacheTags', + [queryId, cacheTags], + async () => { + if (!cacheTags?.length) { + return; + } + + const tags = cacheTags.flatMap((_, i) => [queryId, cacheTags[i]]); + const placeholders = cacheTags.map((_, i) => `($${2 * i + 1}, $${2 * i + 2})`).join(','); + + await this.sql.query(`INSERT INTO ${this.table} VALUES ${placeholders} ON CONFLICT DO NOTHING`, tags); + }, + undefined, + ); } public async queriesReferencingCacheTags(cacheTags: CacheTag[]): Promise { - if (!cacheTags?.length) { - return []; - } - - const placeholders = cacheTags.map((_, i) => `$${i + 1}`).join(','); - - const { rows } = await this.sql.query( - `SELECT DISTINCT query_id FROM ${this.table} WHERE cache_tag IN (${placeholders})`, - cacheTags, + return this.wrap( + 'queriesReferencingCacheTags', + [cacheTags], + async () => { + if (!cacheTags?.length) { + return []; + } + + const placeholders = cacheTags.map((_, i) => `$${i + 1}`).join(','); + + const { rows } = await this.sql.query( + `SELECT DISTINCT query_id FROM ${this.table} WHERE cache_tag IN (${placeholders})`, + cacheTags, + ); + + return rows.reduce((queryIds, row) => { + if (typeof row.query_id === 'string') { + queryIds.push(row.query_id); + } + + return queryIds; + }, []); + }, + [], ); - - return rows.reduce((queryIds, row) => { - if (typeof row.query_id === 'string') { - queryIds.push(row.query_id); - } - - return queryIds; - }, []); } public async deleteCacheTags(cacheTags: CacheTag[]) { - if (!cacheTags?.length) { - return 0; - } - const placeholders = cacheTags.map((_, i) => `$${i + 1}`).join(','); - - return (await this.sql.query(`DELETE FROM ${this.table} WHERE cache_tag IN (${placeholders})`, cacheTags)).rowCount ?? 0; + return this.wrap( + 'deleteCacheTags', + [cacheTags], + async () => { + if (!cacheTags?.length) { + return 0; + } + const placeholders = cacheTags.map((_, i) => `$${i + 1}`).join(','); + + return ( + (await this.sql.query(`DELETE FROM ${this.table} WHERE cache_tag IN (${placeholders})`, cacheTags)).rowCount ?? 0 + ); + }, + 0, + ); } public async truncateCacheTags() { - return (await this.sql.query(`DELETE FROM ${this.table}`)).rowCount ?? 0; + return this.wrap( + 'truncateCacheTags', + [], + async () => { + return (await this.sql.query(`DELETE FROM ${this.table}`)).rowCount ?? 0; + }, + 0, + ); } /** diff --git a/src/cache-tags/provider/redis.ts b/src/cache-tags/provider/redis.ts index a112401..78e2e79 100644 --- a/src/cache-tags/provider/redis.ts +++ b/src/cache-tags/provider/redis.ts @@ -1,7 +1,8 @@ import { Redis } from 'ioredis'; -import { type CacheTag, type CacheTagsProvider } from '../types.js'; +import type { CacheTag, CacheTagsProvider, CacheTagsProviderErrorHandlingConfig } from '../types.js'; +import { AbstractErrorHandlingCacheTagsProvider } from './base.js'; -export type RedisCacheTagsProviderConfig = { +type RedisCacheTagsProviderBaseConfig = { /** * Redis connection string. For example, `redis://user:pass@host:port/db`. */ @@ -14,14 +15,17 @@ export type RedisCacheTagsProviderConfig = { readonly keyPrefix?: string; }; +export type RedisCacheTagsProviderConfig = RedisCacheTagsProviderBaseConfig & CacheTagsProviderErrorHandlingConfig; + /** * A `CacheTagsProvider` implementation that uses Redis as the storage backend. */ -export class RedisCacheTagsProvider implements CacheTagsProvider { +export class RedisCacheTagsProvider extends AbstractErrorHandlingCacheTagsProvider implements CacheTagsProvider { private readonly redis; private readonly keyPrefix; - constructor({ connectionUrl, keyPrefix }: RedisCacheTagsProviderConfig) { + constructor({ connectionUrl, keyPrefix, throwOnError, onError }: RedisCacheTagsProviderConfig) { + super('RedisCacheTagsProvider', { throwOnError, onError }); this.redis = new Redis(connectionUrl, { maxRetriesPerRequest: 3, lazyConnect: true, @@ -30,51 +34,79 @@ export class RedisCacheTagsProvider implements CacheTagsProvider { } public async storeQueryCacheTags(queryId: string, cacheTags: CacheTag[]) { - if (!cacheTags?.length) { - return; - } - - const pipeline = this.redis.pipeline(); - - for (const tag of cacheTags) { - pipeline.sadd(`${this.keyPrefix}${tag}`, queryId); - } - - const results = await pipeline.exec(); - const error = results?.find(([err]) => err)?.[0]; - if (error) { - throw error; - } + return this.wrap( + 'storeQueryCacheTags', + [queryId, cacheTags], + async () => { + if (!cacheTags?.length) { + return; + } + + const pipeline = this.redis.pipeline(); + + for (const tag of cacheTags) { + pipeline.sadd(`${this.keyPrefix}${tag}`, queryId); + } + + const results = await pipeline.exec(); + const error = results?.find(([err]) => err)?.[0]; + if (error) { + throw error; + } + }, + undefined, + ); } public async queriesReferencingCacheTags(cacheTags: CacheTag[]) { - if (!cacheTags?.length) { - return []; - } - - const keys = cacheTags.map((tag) => `${this.keyPrefix}${tag}`); - - return this.redis.sunion(...keys); + return this.wrap( + 'queriesReferencingCacheTags', + [cacheTags], + async () => { + if (!cacheTags?.length) { + return []; + } + + const keys = cacheTags.map((tag) => `${this.keyPrefix}${tag}`); + + return this.redis.sunion(...keys); + }, + [], + ); } public async deleteCacheTags(cacheTags: CacheTag[]) { - if (!cacheTags?.length) { - return 0; - } - - const keys = cacheTags.map((tag) => `${this.keyPrefix}${tag}`); - - return this.redis.del(...keys); + return this.wrap( + 'deleteCacheTags', + [cacheTags], + async () => { + if (!cacheTags?.length) { + return 0; + } + + const keys = cacheTags.map((tag) => `${this.keyPrefix}${tag}`); + + return this.redis.del(...keys); + }, + 0, + ); } public async truncateCacheTags() { - const keys = await this.getKeys(); - - if (keys.length === 0) { - return 0; - } - - return await this.redis.del(...keys); + return this.wrap( + 'truncateCacheTags', + [], + async () => { + const keys = await this.getKeys(); + + if (keys.length === 0) { + return 0; + } + + return await this.redis.del(...keys); + }, + 0, + ); } /** diff --git a/src/cache-tags/types.ts b/src/cache-tags/types.ts index 8e74898..b874c7b 100644 --- a/src/cache-tags/types.ts +++ b/src/cache-tags/types.ts @@ -64,3 +64,20 @@ export interface CacheTagsProvider { */ truncateCacheTags(): Promise; } + +export type CacheTagsProviderErrorHandlingConfig = { + /** + * If false, errors are suppressed and a fallback value is returned. + * Default: true + */ + throwOnError?: boolean; + + /** + * Optional callback invoked when an error occurs in a `CacheTagsProvider` method, + * useful for logging and telemetry. + * + * Called before the error is either thrown (when `throwOnError` is true or + * undefined) or suppressed (when `throwOnError` is false). + */ + onError?: (error: unknown, ctx: { provider: string; method: keyof CacheTagsProvider; args: unknown[] }) => void; +};