Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 125 additions & 29 deletions src/lib/queue/drivers/queue_postgres.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export default class KeetaAnchorQueueStorageDriverPostgres<QueueRequest extends
private readonly logger: Logger | undefined;
private poolInternal: (() => Promise<pg.Pool>) | null = null;
private dbInitializationPromise: Promise<boolean> | null = null;
private serializationRetryCount = 0;

readonly name = 'KeetaAnchorQueueStorageDriverPostgres';
readonly id: string;
Expand Down Expand Up @@ -77,34 +78,114 @@ export default class KeetaAnchorQueueStorageDriverPostgres<QueueRequest extends

const client = await pool.connect();
try {
await client.query(`
CREATE TABLE IF NOT EXISTS queue_entries (
id TEXT NOT NULL,
path TEXT NOT NULL,
request TEXT NOT NULL,
output TEXT,
last_error TEXT,
status TEXT NOT NULL,
created BIGINT NOT NULL,
updated BIGINT NOT NULL,
worker BIGINT,
failures INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (id, path)
)`);

await client.query(`
CREATE TABLE IF NOT EXISTS queue_idempotent_keys (
entry_id TEXT NOT NULL,
idempotent_id TEXT NOT NULL,
path TEXT NOT NULL,
UNIQUE (idempotent_id, path),
PRIMARY KEY (entry_id, idempotent_id, path),
FOREIGN KEY (entry_id, path) REFERENCES queue_entries(id, path)
)`);

await client.query('CREATE INDEX IF NOT EXISTS idx_queue_entries_status ON queue_entries(status)');
await client.query('CREATE INDEX IF NOT EXISTS idx_queue_entries_updated ON queue_entries(updated)');
await client.query('CREATE INDEX IF NOT EXISTS idx_queue_idempotent_keys_idempotent_id ON queue_idempotent_keys(idempotent_id)');
// Use advisory lock to ensure only one process migrates at a time
// Lock ID: the ASCII bytes of 'queu' packed into a 32-bit integer (an arbitrary but stable key for this migration)
const lockId = 0x71756575; // 'queu' in hex
logger?.debug('Acquiring advisory lock for schema migration');
await client.query('SELECT pg_advisory_lock($1)', [lockId]);

try {
// Create schema version table if it doesn't exist
await client.query(`
CREATE TABLE IF NOT EXISTS queue_schema_version (
version INTEGER NOT NULL,
applied_at BIGINT NOT NULL,
PRIMARY KEY (version)
)`);

// Check current schema version
const versionResult = await client.query<{ version: number }>('SELECT MAX(version) as version FROM queue_schema_version');
const currentVersion = versionResult.rows[0]?.version ?? 0;

logger?.debug(`Current queue schema version: ${currentVersion}`);

// Version 1: Initial schema
if (currentVersion < 1) {
logger?.debug('Applying schema version 1: Initial tables and indexes');

await client.query('BEGIN');
Copy link

Copilot AI Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using BEGIN without specifying isolation level may cause confusion since other transactions in the codebase explicitly use BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE. For consistency and clarity, consider explicitly specifying the isolation level even for DDL operations (e.g., BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED).

Suggested change
await client.query('BEGIN');
await client.query('BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED');

Copilot uses AI. Check for mistakes.
try {
await client.query(`
CREATE TABLE IF NOT EXISTS queue_entries (
id TEXT NOT NULL,
path TEXT NOT NULL,
request TEXT NOT NULL,
output TEXT,
last_error TEXT,
status TEXT NOT NULL,
created BIGINT NOT NULL,
updated BIGINT NOT NULL,
worker BIGINT,
failures INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (id, path)
)`);

await client.query(`
CREATE TABLE IF NOT EXISTS queue_idempotent_keys (
entry_id TEXT NOT NULL,
idempotent_id TEXT NOT NULL,
path TEXT NOT NULL,
UNIQUE (idempotent_id, path),
PRIMARY KEY (entry_id, idempotent_id, path),
FOREIGN KEY (entry_id, path) REFERENCES queue_entries(id, path)
)`);

// Old single-column indexes (for pre-version-2 schemas)
await client.query('CREATE INDEX IF NOT EXISTS idx_queue_entries_status ON queue_entries(status)');
await client.query('CREATE INDEX IF NOT EXISTS idx_queue_entries_updated ON queue_entries(updated)');
await client.query('CREATE INDEX IF NOT EXISTS idx_queue_idempotent_keys_idempotent_id ON queue_idempotent_keys(idempotent_id)');

await client.query('INSERT INTO queue_schema_version (version, applied_at) VALUES (1, $1)', [Date.now()]);
await client.query('COMMIT');
logger?.debug('Applied schema version 1');
} catch (error) {
await client.query('ROLLBACK');
throw(error);
}
}

// Version 2: Partition-aware composite indexes
if (currentVersion < 2) {
logger?.debug('Applying schema version 2: Partition-aware composite indexes');

// Create new partition-aware indexes (CREATE INDEX CONCURRENTLY cannot run inside a transaction block)
logger?.debug('Creating partition-aware indexes...');
await client.query('CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_queue_entries_path_status ON queue_entries(path, status)');
await client.query('CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_queue_entries_path_updated ON queue_entries(path, updated)');
await client.query('CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_queue_entries_path_status_updated ON queue_entries(path, status, updated)');
await client.query('CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_queue_idempotent_keys_path_idempotent_id ON queue_idempotent_keys(path, idempotent_id)');

// Now transactionally drop old indexes and record version
await client.query('BEGIN');
Copy link

Copilot AI Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using BEGIN without specifying isolation level may cause confusion since other transactions in the codebase explicitly use BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE. For consistency and clarity, consider explicitly specifying the isolation level even for DDL operations (e.g., BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED).

Suggested change
await client.query('BEGIN');
await client.query('BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED');

Copilot uses AI. Check for mistakes.
try {
// Drop old indexes that are now redundant (IF EXISTS makes each drop a no-op when the index is absent)
logger?.debug('Dropping old single-column indexes...');
await client.query('DROP INDEX IF EXISTS idx_queue_entries_status');
await client.query('DROP INDEX IF EXISTS idx_queue_entries_updated');
await client.query('DROP INDEX IF EXISTS idx_queue_idempotent_keys_idempotent_id');

await client.query('INSERT INTO queue_schema_version (version, applied_at) VALUES (2, $1)', [Date.now()]);
await client.query('COMMIT');
logger?.debug('Applied schema version 2');
} catch (error) {
await client.query('ROLLBACK');
throw(error);
}
}

logger?.debug('Schema is up to date');
} finally {
// Always release the advisory lock
// Note: Advisory locks are session-based and auto-release on disconnect,
// but we explicitly unlock for clarity and to avoid holding locks longer than needed
try {
logger?.debug('Releasing advisory lock');
await client.query('SELECT pg_advisory_unlock($1)', [lockId]);
} catch (unlockError) {
// Log but don't throw - the lock will be auto-released when connection closes
logger?.debug('Failed to explicitly release advisory lock (will auto-release on disconnect):', unlockError);
}
}
} finally {
client.release();
}
Expand Down Expand Up @@ -153,6 +234,9 @@ export default class KeetaAnchorQueueStorageDriverPostgres<QueueRequest extends
if (errorCode === '40001' || errorCode === '40P01') {
logger?.debug('Serialization failure or deadlock detected');

// Track serialization retries for instrumentation
this.serializationRetryCount++;

const minBackoff = 100;
const maxBackoff = 30_000;
const backoffIntervalSize = Math.min(maxBackoff - minBackoff, (retry + 50) ** 2);
Expand Down Expand Up @@ -289,7 +373,7 @@ export default class KeetaAnchorQueueStorageDriverPostgres<QueueRequest extends
const { oldStatus } = ancillary ?? {};

return(await this.dbTransaction('setStatus', async (client, logger): Promise<void> => {
const existingEntry = await client.query<{ status: KeetaAnchorQueueStatus; failures: number; last_error: string | null; output: string | null }>('SELECT status, failures, last_error, output FROM queue_entries WHERE id = $1 AND path = $2', [id, this.pathStr]);
const existingEntry = await client.query<{ status: KeetaAnchorQueueStatus; failures: number; last_error: string | null; output: string | null }>('SELECT status, failures, last_error, output FROM queue_entries WHERE id = $1 AND path = $2 FOR UPDATE', [id, this.pathStr]);
if (existingEntry.rows.length === 0) {
throw(new Error(`Request with ID ${String(id)} not found`));
}
Expand Down Expand Up @@ -415,6 +499,10 @@ export default class KeetaAnchorQueueStorageDriverPostgres<QueueRequest extends
query += ' WHERE ' + conditions.join(' AND ');
}

// Use random ordering to prevent multiple workers from contending for the same rows
// This spreads the load when multiple workers query simultaneously
query += ' ORDER BY RANDOM()';

if (filter?.limit !== undefined) {
query += ` LIMIT $${paramIndex++}`;
params.push(filter.limit);
Expand Down Expand Up @@ -490,6 +578,8 @@ export default class KeetaAnchorQueueStorageDriverPostgres<QueueRequest extends
_Testing(key: string): {
setToctouDelay(delay: number): void;
unsetToctouDelay(): void;
getSerializationRetryCount(): number;
resetSerializationRetryCount(): void;
} {
if (key !== 'bc81abf8-e43b-490b-b486-744fb49a5082') {
throw(new Error('This is a testing only method'));
Expand All @@ -503,6 +593,12 @@ export default class KeetaAnchorQueueStorageDriverPostgres<QueueRequest extends
},
unsetToctouDelay: (): void => {
this.toctouDelay = undefined;
},
getSerializationRetryCount: (): number => {
return(this.serializationRetryCount);
},
resetSerializationRetryCount: (): void => {
this.serializationRetryCount = 0;
}
});
}
Expand Down
Loading
Loading