Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions src/cache-tags/provider/base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import type { CacheTag, CacheTagsProvider, CacheTagsProviderErrorHandlingConfig } from '../types.js';

/**
* An abstract base class for `CacheTagsProvider` implementations that adds error handling and logging.
*/
export abstract class AbstractErrorHandlingCacheTagsProvider implements CacheTagsProvider {
protected readonly throwOnError: boolean;
protected readonly onError?: CacheTagsProviderErrorHandlingConfig['onError'];

protected constructor(
protected readonly providerName: string,
config: CacheTagsProviderErrorHandlingConfig = {},
) {
this.throwOnError = config.throwOnError ?? true;
this.onError = config.onError;
}

public abstract storeQueryCacheTags(queryId: string, cacheTags: CacheTag[]): Promise<void>;

public abstract queriesReferencingCacheTags(cacheTags: CacheTag[]): Promise<string[]>;

public abstract deleteCacheTags(cacheTags: CacheTag[]): Promise<number>;

public abstract truncateCacheTags(): Promise<number>;

protected async wrap<T>(method: keyof CacheTagsProvider, args: unknown[], fn: () => Promise<T>, fallback: T): Promise<T> {
try {
return await fn();
} catch (error) {
const provider = this.providerName;

// Call onError callback if provided, but guard against exceptions
// to prevent masking the original provider error
try {
this.onError?.(error, { provider, method, args });
} catch (handlerError) {
console.error(`Error handler itself failed in ${provider}.${method}.`, { handlerError });
}

if (this.throwOnError) {
throw error;
}
console.debug(`Error occurred in ${provider}.${method}.`, { error, args });

return fallback;
}
}
}
106 changes: 70 additions & 36 deletions src/cache-tags/provider/neon.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { neon } from '@neondatabase/serverless';
import { type CacheTag, type CacheTagsProvider } from '../types.js';
import type { CacheTag, CacheTagsProvider, CacheTagsProviderErrorHandlingConfig } from '../types.js';
import { AbstractErrorHandlingCacheTagsProvider } from './base.js';

export type NeonCacheTagsProviderConfig = {
type NeonCacheTagsProviderBaseConfig = {
/**
* Neon connection string. You can find it in the "Connection" tab of your Neon project dashboard.
* Has the format `postgresql://user:pass@host/db`
Expand All @@ -21,61 +22,94 @@ export type NeonCacheTagsProviderConfig = {
readonly table: string;
};

export type NeonCacheTagsProviderConfig = NeonCacheTagsProviderBaseConfig & CacheTagsProviderErrorHandlingConfig;

/**
* A `CacheTagsProvider` implementation that uses Neon as the storage backend.
*/
export class NeonCacheTagsProvider implements CacheTagsProvider {
export class NeonCacheTagsProvider extends AbstractErrorHandlingCacheTagsProvider implements CacheTagsProvider {
private readonly sql;
private readonly table;

constructor({ connectionUrl, table }: NeonCacheTagsProviderConfig) {
constructor({ connectionUrl, table, throwOnError, onError }: NeonCacheTagsProviderConfig) {
super('NeonCacheTagsProvider', { throwOnError, onError });
this.sql = neon(connectionUrl, { fullResults: true });
this.table = NeonCacheTagsProvider.quoteIdentifier(table);
}

public async storeQueryCacheTags(queryId: string, cacheTags: CacheTag[]) {
if (!cacheTags?.length) {
return;
}

const tags = cacheTags.flatMap((_, i) => [queryId, cacheTags[i]]);
const placeholders = cacheTags.map((_, i) => `($${2 * i + 1}, $${2 * i + 2})`).join(',');

await this.sql.query(`INSERT INTO ${this.table} VALUES ${placeholders} ON CONFLICT DO NOTHING`, tags);
return this.wrap(
'storeQueryCacheTags',
[queryId, cacheTags],
async () => {
if (!cacheTags?.length) {
return;
}

const tags = cacheTags.flatMap((_, i) => [queryId, cacheTags[i]]);
const placeholders = cacheTags.map((_, i) => `($${2 * i + 1}, $${2 * i + 2})`).join(',');

await this.sql.query(`INSERT INTO ${this.table} VALUES ${placeholders} ON CONFLICT DO NOTHING`, tags);
},
undefined,
);
}

public async queriesReferencingCacheTags(cacheTags: CacheTag[]): Promise<string[]> {
if (!cacheTags?.length) {
return [];
}

const placeholders = cacheTags.map((_, i) => `$${i + 1}`).join(',');

const { rows } = await this.sql.query(
`SELECT DISTINCT query_id FROM ${this.table} WHERE cache_tag IN (${placeholders})`,
cacheTags,
return this.wrap(
'queriesReferencingCacheTags',
[cacheTags],
async () => {
if (!cacheTags?.length) {
return [];
}

const placeholders = cacheTags.map((_, i) => `$${i + 1}`).join(',');

const { rows } = await this.sql.query(
`SELECT DISTINCT query_id FROM ${this.table} WHERE cache_tag IN (${placeholders})`,
cacheTags,
);

return rows.reduce<string[]>((queryIds, row) => {
if (typeof row.query_id === 'string') {
queryIds.push(row.query_id);
}

return queryIds;
}, []);
},
[],
);

return rows.reduce<string[]>((queryIds, row) => {
if (typeof row.query_id === 'string') {
queryIds.push(row.query_id);
}

return queryIds;
}, []);
}

public async deleteCacheTags(cacheTags: CacheTag[]) {
if (!cacheTags?.length) {
return 0;
}
const placeholders = cacheTags.map((_, i) => `$${i + 1}`).join(',');

return (await this.sql.query(`DELETE FROM ${this.table} WHERE cache_tag IN (${placeholders})`, cacheTags)).rowCount ?? 0;
return this.wrap(
'deleteCacheTags',
[cacheTags],
async () => {
if (!cacheTags?.length) {
return 0;
}
const placeholders = cacheTags.map((_, i) => `$${i + 1}`).join(',');

return (
(await this.sql.query(`DELETE FROM ${this.table} WHERE cache_tag IN (${placeholders})`, cacheTags)).rowCount ?? 0
);
},
0,
);
}

public async truncateCacheTags() {
return (await this.sql.query(`DELETE FROM ${this.table}`)).rowCount ?? 0;
return this.wrap(
'truncateCacheTags',
[],
async () => {
return (await this.sql.query(`DELETE FROM ${this.table}`)).rowCount ?? 0;
},
0,
);
}

/**
Expand Down
112 changes: 72 additions & 40 deletions src/cache-tags/provider/redis.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Redis } from 'ioredis';
import { type CacheTag, type CacheTagsProvider } from '../types.js';
import type { CacheTag, CacheTagsProvider, CacheTagsProviderErrorHandlingConfig } from '../types.js';
import { AbstractErrorHandlingCacheTagsProvider } from './base.js';

export type RedisCacheTagsProviderConfig = {
type RedisCacheTagsProviderBaseConfig = {
/**
* Redis connection string. For example, `redis://user:pass@host:port/db`.
*/
Expand All @@ -14,14 +15,17 @@ export type RedisCacheTagsProviderConfig = {
readonly keyPrefix?: string;
};

export type RedisCacheTagsProviderConfig = RedisCacheTagsProviderBaseConfig & CacheTagsProviderErrorHandlingConfig;

/**
* A `CacheTagsProvider` implementation that uses Redis as the storage backend.
*/
export class RedisCacheTagsProvider implements CacheTagsProvider {
export class RedisCacheTagsProvider extends AbstractErrorHandlingCacheTagsProvider implements CacheTagsProvider {
private readonly redis;
private readonly keyPrefix;

constructor({ connectionUrl, keyPrefix }: RedisCacheTagsProviderConfig) {
constructor({ connectionUrl, keyPrefix, throwOnError, onError }: RedisCacheTagsProviderConfig) {
super('RedisCacheTagsProvider', { throwOnError, onError });
this.redis = new Redis(connectionUrl, {
maxRetriesPerRequest: 3,
lazyConnect: true,
Expand All @@ -30,51 +34,79 @@ export class RedisCacheTagsProvider implements CacheTagsProvider {
}

public async storeQueryCacheTags(queryId: string, cacheTags: CacheTag[]) {
if (!cacheTags?.length) {
return;
}

const pipeline = this.redis.pipeline();

for (const tag of cacheTags) {
pipeline.sadd(`${this.keyPrefix}${tag}`, queryId);
}

const results = await pipeline.exec();
const error = results?.find(([err]) => err)?.[0];
if (error) {
throw error;
}
return this.wrap(
'storeQueryCacheTags',
[queryId, cacheTags],
async () => {
if (!cacheTags?.length) {
return;
}

const pipeline = this.redis.pipeline();

for (const tag of cacheTags) {
pipeline.sadd(`${this.keyPrefix}${tag}`, queryId);
}

const results = await pipeline.exec();
const error = results?.find(([err]) => err)?.[0];
if (error) {
throw error;
}
},
undefined,
);
}

public async queriesReferencingCacheTags(cacheTags: CacheTag[]) {
if (!cacheTags?.length) {
return [];
}

const keys = cacheTags.map((tag) => `${this.keyPrefix}${tag}`);

return this.redis.sunion(...keys);
return this.wrap(
'queriesReferencingCacheTags',
[cacheTags],
async () => {
if (!cacheTags?.length) {
return [];
}

const keys = cacheTags.map((tag) => `${this.keyPrefix}${tag}`);

return this.redis.sunion(...keys);
},
[],
);
}

public async deleteCacheTags(cacheTags: CacheTag[]) {
if (!cacheTags?.length) {
return 0;
}

const keys = cacheTags.map((tag) => `${this.keyPrefix}${tag}`);

return this.redis.del(...keys);
return this.wrap(
'deleteCacheTags',
[cacheTags],
async () => {
if (!cacheTags?.length) {
return 0;
}

const keys = cacheTags.map((tag) => `${this.keyPrefix}${tag}`);

return this.redis.del(...keys);
},
0,
);
}

public async truncateCacheTags() {
const keys = await this.getKeys();

if (keys.length === 0) {
return 0;
}

return await this.redis.del(...keys);
return this.wrap(
'truncateCacheTags',
[],
async () => {
const keys = await this.getKeys();

if (keys.length === 0) {
return 0;
}

return await this.redis.del(...keys);
},
0,
);
}

/**
Expand Down
17 changes: 17 additions & 0 deletions src/cache-tags/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,20 @@ export interface CacheTagsProvider {
*/
truncateCacheTags(): Promise<number>;
}

export type CacheTagsProviderErrorHandlingConfig = {
/**
* If false, errors are suppressed and a fallback value is returned.
* Default: true
*/
throwOnError?: boolean;

/**
* Optional callback invoked when an error occurs in a `CacheTagsProvider` method,
* useful for logging and telemetry.
*
* Called before the error is either thrown (when `throwOnError` is true or
* undefined) or suppressed (when `throwOnError` is false).
*/
onError?: (error: unknown, ctx: { provider: string; method: keyof CacheTagsProvider; args: unknown[] }) => void;
};