diff --git a/src/lib/queue/drivers/queue_postgres.ts b/src/lib/queue/drivers/queue_postgres.ts
index 9d02c0da..adece2c9 100644
--- a/src/lib/queue/drivers/queue_postgres.ts
+++ b/src/lib/queue/drivers/queue_postgres.ts
@@ -44,6 +44,7 @@ export default class KeetaAnchorQueueStorageDriverPostgres
 	Promise) | null = null;
 	private dbInitializationPromise: Promise<void> | null = null;
+	private serializationRetryCount = 0;
 	readonly name = 'KeetaAnchorQueueStorageDriverPostgres';
 	readonly id: string;
@@ -77,34 +78,114 @@ export default class KeetaAnchorQueueStorageDriverPostgres
+			const versionResult = await client.query<{ version: number }>('SELECT MAX(version) as version FROM queue_schema_version');
+			const currentVersion = versionResult.rows[0]?.version ?? 0;
+
+			logger?.debug(`Current queue schema version: ${currentVersion}`);
+
+			// Version 1: Initial schema
+			if (currentVersion < 1) {
+				logger?.debug('Applying schema version 1: Initial tables and indexes');
+
+				await client.query('BEGIN');
+				try {
+					await client.query(`
+						CREATE TABLE IF NOT EXISTS queue_entries (
+							id TEXT NOT NULL,
+							path TEXT NOT NULL,
+							request TEXT NOT NULL,
+							output TEXT,
+							last_error TEXT,
+							status TEXT NOT NULL,
+							created BIGINT NOT NULL,
+							updated BIGINT NOT NULL,
+							worker BIGINT,
+							failures INTEGER NOT NULL DEFAULT 0,
+							PRIMARY KEY (id, path)
+						)`);
+
+					await client.query(`
+						CREATE TABLE IF NOT EXISTS queue_idempotent_keys (
+							entry_id TEXT NOT NULL,
+							idempotent_id TEXT NOT NULL,
+							path TEXT NOT NULL,
+							UNIQUE (idempotent_id, path),
+							PRIMARY KEY (entry_id, idempotent_id, path),
+							FOREIGN KEY (entry_id, path) REFERENCES queue_entries(id, path)
+						)`);
+
+					// Old single-column indexes (for pre-version-2 schemas)
+					await client.query('CREATE INDEX IF NOT EXISTS idx_queue_entries_status ON queue_entries(status)');
+					await client.query('CREATE INDEX IF NOT EXISTS idx_queue_entries_updated ON queue_entries(updated)');
+					await client.query('CREATE INDEX IF NOT EXISTS idx_queue_idempotent_keys_idempotent_id ON queue_idempotent_keys(idempotent_id)');
+
+					await client.query('INSERT INTO queue_schema_version (version, applied_at) VALUES (1, $1)', [Date.now()]);
+					await client.query('COMMIT');
+					logger?.debug('Applied schema version 1');
+				} catch (error) {
+					await client.query('ROLLBACK');
+					throw(error);
+				}
+			}
+
+			// Version 2: Partition-aware composite indexes
+			if (currentVersion < 2) {
+				logger?.debug('Applying schema version 2: Partition-aware composite indexes');
+
+				// Create new partition-aware indexes (CONCURRENTLY must run outside a transaction)
+				logger?.debug('Creating partition-aware indexes...');
+				await client.query('CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_queue_entries_path_status ON queue_entries(path, status)');
+				await client.query('CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_queue_entries_path_updated ON queue_entries(path, updated)');
+				await client.query('CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_queue_entries_path_status_updated ON queue_entries(path, status, updated)');
+				await client.query('CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_queue_idempotent_keys_path_idempotent_id ON queue_idempotent_keys(path, idempotent_id)');
+
+				// Now transactionally drop old indexes and record the version
+				await client.query('BEGIN');
+				try {
+					// Drop old indexes that are now redundant (DROP INDEX IF EXISTS is a no-op if they don't exist)
+					logger?.debug('Dropping old single-column indexes...');
+					await client.query('DROP INDEX IF EXISTS idx_queue_entries_status');
+					await client.query('DROP INDEX IF EXISTS idx_queue_entries_updated');
+					await client.query('DROP INDEX IF EXISTS idx_queue_idempotent_keys_idempotent_id');
+
+					await client.query('INSERT INTO queue_schema_version (version, applied_at) VALUES (2, $1)', [Date.now()]);
+					await client.query('COMMIT');
+					logger?.debug('Applied schema version 2');
+				} catch (error) {
+					await client.query('ROLLBACK');
+					throw(error);
+				}
+			}
+
+			logger?.debug('Schema is up to date');
+		} finally {
+			// Always release the advisory lock.
+			// Note: Advisory locks are session-based and auto-release on disconnect,
+			// but we explicitly unlock for clarity and to avoid holding the lock longer than needed.
+			try {
+				logger?.debug('Releasing advisory lock');
+				await client.query('SELECT pg_advisory_unlock($1)', [lockId]);
+			} catch (unlockError) {
+				// Log but don't throw - the lock will be auto-released when the connection closes
+				logger?.debug('Failed to explicitly release advisory lock (will auto-release on disconnect):', unlockError);
+			}
+		}
 	} finally {
 		client.release();
 	}
@@ -153,6 +234,9 @@ export default class KeetaAnchorQueueStorageDriverPostgres
-			const existingEntry = await client.query<{ status: KeetaAnchorQueueStatus; failures: number; last_error: string | null; output: string | null }>('SELECT status, failures, last_error, output FROM queue_entries WHERE id = $1 AND path = $2', [id, this.pathStr]);
+			const existingEntry = await client.query<{ status: KeetaAnchorQueueStatus; failures: number; last_error: string | null; output: string | null }>('SELECT status, failures, last_error, output FROM queue_entries WHERE id = $1 AND path = $2 FOR UPDATE', [id, this.pathStr]);
 			if (existingEntry.rows.length === 0) {
 				throw(new Error(`Request with ID ${String(id)} not found`));
 			}
@@ -415,6 +499,10 @@ export default class KeetaAnchorQueueStorageDriverPostgres
 				this.toctouDelay = undefined;
+			},
+			getSerializationRetryCount: (): number => {
+				return(this.serializationRetryCount);
+			},
+			resetSerializationRetryCount: (): void => {
+				this.serializationRetryCount = 0;
+			}
 		});
 	}
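
Note on the migration hunk above: the new initialization path applies numbered schema versions while holding a Postgres advisory lock (released in the inner finally block), so only one process migrates the shared tables at a time. A minimal sketch of that pattern follows - it is illustrative only, not the driver's actual code; the function name, lock-id handling, and pool wiring are assumptions, while the queue_schema_version bookkeeping and the pg_advisory_lock/pg_advisory_unlock calls mirror the diff:

import { Pool } from 'pg';

/* Illustrative sketch: serialize schema migrations across processes with an advisory lock. */
async function migrateWithAdvisoryLock(pool: Pool, lockId: number): Promise<void> {
	const client = await pool.connect();
	try {
		/* Blocks until this session holds the lock; concurrent migrators wait here. */
		await client.query('SELECT pg_advisory_lock($1)', [lockId]);
		try {
			await client.query('CREATE TABLE IF NOT EXISTS queue_schema_version (version INTEGER NOT NULL, applied_at BIGINT NOT NULL)');
			const versionResult = await client.query<{ version: number }>('SELECT MAX(version) as version FROM queue_schema_version');
			const currentVersion = versionResult.rows[0]?.version ?? 0;
			if (currentVersion < 1) {
				/* Apply migration 1 inside BEGIN/COMMIT and record it, as in the hunk above. */
			}
		} finally {
			/* Advisory locks are session-scoped and released on disconnect, but unlock explicitly. */
			await client.query('SELECT pg_advisory_unlock($1)', [lockId]);
		}
	} finally {
		client.release();
	}
}
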
diff --git a/src/lib/queue/index.test.ts b/src/lib/queue/index.test.ts
index a4e41939..6930b6c8 100644
--- a/src/lib/queue/index.test.ts
+++ b/src/lib/queue/index.test.ts
@@ -46,6 +46,14 @@ function generateRequestID(): KeetaAnchorQueueRequestID {
 	return(crypto.randomUUID() as unknown as KeetaAnchorQueueRequestID);
 }
 
+/*
+ * Type guard to check if a queue driver is a PostgreSQL driver.
+ * This allows us to access PostgreSQL-specific testing methods safely.
+ */
+function isPostgresDriver(driver: unknown): driver is KeetaAnchorQueueStorageDriverPostgres {
+	return(driver instanceof KeetaAnchorQueueStorageDriverPostgres);
+}
+
 function getTestingRedisConfig(): { host: string; port: number; password: string | undefined; } | null {
 	const host = process.env['ANCHOR_TESTING_REDIS_HOST'];
 	const portStr = process.env['ANCHOR_TESTING_REDIS_PORT'];
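
The tests added in the next hunk assert on a serialization retry counter exposed through the new getSerializationRetryCount()/resetSerializationRetryCount() testing hooks. The diff does not show where the driver increments that counter, so the following is only a sketch of the usual shape of a SERIALIZABLE retry wrapper - the function name, retry limit, and onRetry callback are assumptions rather than the driver's API; only the SQLSTATE and the BEGIN/COMMIT/ROLLBACK flow are standard PostgreSQL:

import type { PoolClient } from 'pg';

/* PostgreSQL reports serialization failures with SQLSTATE 40001. */
function isSerializationFailure(error: unknown): boolean {
	return(typeof error === 'object' && error !== null && (error as { code?: string }).code === '40001');
}

/* Sketch: run work in a SERIALIZABLE transaction, retrying (and counting) serialization failures. */
async function retryOnSerializationFailure<T>(client: PoolClient, work: () => Promise<T>, onRetry: () => void, maxAttempts = 5): Promise<T> {
	for (let attempt = 1; ; attempt++) {
		try {
			await client.query('BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE');
			const result = await work();
			await client.query('COMMIT');
			return(result);
		} catch (error) {
			await client.query('ROLLBACK');
			if (!isSerializationFailure(error) || attempt >= maxAttempts) {
				throw(error);
			}
			onRetry(); /* e.g. increment a serializationRetryCount before trying again */
		}
	}
}
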
@@ -1792,6 +1800,270 @@ suite.sequential('Driver Tests', async function() {
 			expect(part111_entry?.request).toEqual({ partition: 'part1.1.1' });
 		});
 	}
+
+	/*
+	 * Test for serialization conflict prevention with multiple partitions.
+	 * Only run for the PostgreSQL driver, as it is the only one using SERIALIZABLE isolation.
+	 */
+	if (driver === 'PostgreSQL') {
+		for (const kind of ['single', 'multiple'] as const) {
+			testRunner(`Multi-Partition Concurrent Operations No Serialization Errors (${kind} worker${kind === 'multiple' ? 's' : ''})`, async function() {
+				await using driverInstance = await driverConfig.create(`multi_partition_serial_${kind}`);
+				const rootQueue = driverInstance.queue;
+
+				/* Create 3 partitioned queues sharing the same root table */
+				await using partition1 = await rootQueue.partition('partition1');
+				await using partition2 = await rootQueue.partition('partition2');
+				await using partition3 = await rootQueue.partition('partition3');
+
+				const queues = [partition1, partition2, partition3];
+				const queueNames = ['partition1', 'partition2', 'partition3'];
+
+				/*
+				 * Verify all queue instances are PostgreSQL drivers so we can use
+				 * PostgreSQL-specific testing methods.
+				 */
+				if (!isPostgresDriver(rootQueue)) {
+					throw(new Error('Root queue is not a PostgreSQL driver'));
+				}
+				if (!isPostgresDriver(partition1)) {
+					throw(new Error('Partition1 is not a PostgreSQL driver'));
+				}
+				if (!isPostgresDriver(partition2)) {
+					throw(new Error('Partition2 is not a PostgreSQL driver'));
+				}
+				if (!isPostgresDriver(partition3)) {
+					throw(new Error('Partition3 is not a PostgreSQL driver'));
+				}
+
+				/* Add multiple entries to each partition */
+				const entriesPerPartition = 2000;
+				for (let queueIdx = 0; queueIdx < queues.length; queueIdx++) {
+					const queue = queues[queueIdx];
+					const queueName = queueNames[queueIdx];
+					if (!queue || !queueName) {
+						throw(new Error(`Queue ${queueIdx} not found`));
+					}
+					for (let i = 0; i < entriesPerPartition; i++) {
+						await queue.add({
+							partition: queueName,
+							item: i
+						});
+					}
+				}
+
+				/* Reset counters after setup, before the concurrent processing test */
+				rootQueue._Testing(TestingKey).resetSerializationRetryCount();
+				partition1._Testing(TestingKey).resetSerializationRetryCount();
+				partition2._Testing(TestingKey).resetSerializationRetryCount();
+				partition3._Testing(TestingKey).resetSerializationRetryCount();
+
+				/* Track processed items */
+				const processedItems: { partition: string; item: number; workerID: number }[] = [];
+
+				/* Process function that simulates work */
+				const processEntry = async (queue: KeetaAnchorQueueStorageDriver, workerID: number): Promise<number> => {
+					let processed = 0;
+					for (let i = 0; i < 10; i++) {
+						/* Query for pending items */
+						const entries = await queue.query({ status: 'pending', limit: 1 });
+						if (entries.length === 0) {
+							break;
+						}
+
+						const entry = entries[0];
+						if (!entry) {
+							break;
+						}
+
+						/* Simulate some work being done */
+						await asleep(Math.random() * 5);
+
+						/* Update status to processing */
+						try {
+							await queue.setStatus(entry.id, 'processing', {
+								oldStatus: 'pending'
+							});
+						} catch (error: unknown) {
+							/* Another worker may have grabbed it, skip */
+							if (error instanceof Errors.IncorrectStateAssertedError) {
+								continue;
+							}
+							throw(error);
+						}
+
+						/* Simulate more work */
+						await asleep(Math.random() * 5);
+
+						/* Complete the work */
+						await queue.setStatus(entry.id, 'completed', {
+							oldStatus: 'processing',
+							output: { processed: true, by: workerID }
+						});
+
+						// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
+						const req = entry.request as { partition: string; item: number };
+						processedItems.push({
+							partition: req.partition,
+							item: req.item,
+							workerID: workerID
+						});
+						processed++;
+					}
+					return(processed);
+				};
+
+				/* Run workers concurrently on all partitions */
+				const workerCount = kind === 'single' ? 1 : 3;
+				const promises: Promise<number>[] = [];
+
+				for (let queueIdx = 0; queueIdx < queues.length; queueIdx++) {
+					const queue = queues[queueIdx];
+					if (!queue) {
+						throw(new Error(`Queue ${queueIdx} not found`));
+					}
+
+					for (let workerID = 0; workerID < workerCount; workerID++) {
+						promises.push(processEntry(queue, queueIdx * workerCount + workerID));
+					}
+				}
+
+				/* Wait for all workers to complete */
+				const results = await Promise.all(promises);
+				const totalProcessed = results.reduce((sum, count) => sum + count, 0);
+
+				/* Get serialization retry counts from all queues */
+				const rootRetries = rootQueue._Testing(TestingKey).getSerializationRetryCount();
+				const p1Retries = partition1._Testing(TestingKey).getSerializationRetryCount();
+				const p2Retries = partition2._Testing(TestingKey).getSerializationRetryCount();
+				const p3Retries = partition3._Testing(TestingKey).getSerializationRetryCount();
+				const totalRetries = rootRetries + p1Retries + p2Retries + p3Retries;
+
+				/*
+				 * With our fixes (partition-specific indexes, FOR UPDATE, random ordering),
+				 * cross-partition serialization conflicts are eliminated completely.
+				 * However, within a single partition, some conflicts are still possible when:
+				 * - Multiple workers query() for pending items (creates a read dependency)
+				 * - Then one worker's setStatus() with FOR UPDATE conflicts with another's read
+				 */
+				const maxExpectedRetries = kind === 'single' ? 3 : 10;
+				if (totalRetries > maxExpectedRetries) {
+					/* Log detailed information to help diagnose excessive retries */
+					console.error(`Excessive serialization retries detected!`);
+					console.error(`  Root: ${rootRetries}, P1: ${p1Retries}, P2: ${p2Retries}, P3: ${p3Retries}`);
+					console.error(`  Total: ${totalRetries} (max expected: ${maxExpectedRetries})`);
+					console.error(`  Worker type: ${kind}`);
+					console.error(`  Total processed: ${totalProcessed}`);
+				}
+				expect(totalRetries).toBeLessThanOrEqual(maxExpectedRetries);
+
+				/* Verify items were processed */
+				expect(totalProcessed).toBeGreaterThan(0);
+				expect(processedItems.length).toBe(totalProcessed);
+
+				/* Verify each partition processed some items */
+				const itemsByPartition = processedItems.reduce<{ [key: string]: number }>((acc, item) => {
+					acc[item.partition] = (acc[item.partition] ?? 0) + 1;
+					return(acc);
+				}, {});
+
+				for (const queueName of queueNames) {
+					expect(itemsByPartition[queueName]).toBeGreaterThan(0);
+				}
+
+				/* Verify no duplicate processing (each item processed exactly once) */
+				const processedItemKeys = processedItems.map(item => `${item.partition}-${item.item}`);
+				const uniqueKeys = new Set(processedItemKeys);
+				expect(uniqueKeys.size).toBe(processedItemKeys.length);
+
+				/* Verify entries are in the completed state */
+				for (const queue of queues) {
+					if (!queue) {
+						continue;
+					}
+					const completedEntries = await queue.query({ status: 'completed' });
+					expect(completedEntries.length).toBeGreaterThan(0);
+				}
+
+				if (kind === 'single') {
+					/*
+					 * Now test operations that should have ZERO serialization conflicts.
+					 * When operations don't include query() (which creates read dependencies)
+					 * and each partition operates independently, there should be no conflicts.
+					 */
+
+					/* Reset counters before the zero-conflict test */
+					rootQueue._Testing(TestingKey).resetSerializationRetryCount();
+					partition1._Testing(TestingKey).resetSerializationRetryCount();
+					partition2._Testing(TestingKey).resetSerializationRetryCount();
+					partition3._Testing(TestingKey).resetSerializationRetryCount();
+
+					/*
+					 * Concurrent add + setStatus operations on different partitions.
+					 * This should have zero conflicts because:
+					 * - No query() operations (no read dependencies)
+					 * - Each partition operates on its own data
+					 * - add() uses SERIALIZABLE but with partition-specific indexes
+					 * - setStatus() uses FOR UPDATE on specific IDs
+					 */
+					const zeroConflictPromises: Promise<void>[] = [];
+					const itemsPerPartition = 20;
+
+					for (let queueIdx = 0; queueIdx < queues.length; queueIdx++) {
+						const queue = queues[queueIdx];
+						const queueName = queueNames[queueIdx];
+						if (!queue || !queueName) {
+							throw(new Error(`Queue ${queueIdx} not found`));
+						}
+
+						/* Each partition adds items and updates status concurrently */
+						for (let workerIdx = 0; workerIdx < (kind === 'single' ? 1 : 3); workerIdx++) {
+							zeroConflictPromises.push((async () => {
+								for (let i = 0; i < itemsPerPartition; i++) {
+									/* Add a new item */
+									const id = await queue.add({
+										partition: queueName,
+										zeroConflictTest: true,
+										item: i,
+										worker: workerIdx
+									});
+
+									/* Immediately update its status - no query involved */
+									await queue.setStatus(id, 'processing');
+									await queue.setStatus(id, 'completed', {
+										output: { worker: workerIdx }
+									});
+								}
+							})());
+						}
+					}
+
+					/* Wait for all zero-conflict operations to complete */
+					await Promise.all(zeroConflictPromises);
+
+					/* Get serialization retry counts after the zero-conflict test */
+					const zeroRootRetries = rootQueue._Testing(TestingKey).getSerializationRetryCount();
+					const zeroP1Retries = partition1._Testing(TestingKey).getSerializationRetryCount();
+					const zeroP2Retries = partition2._Testing(TestingKey).getSerializationRetryCount();
+					const zeroP3Retries = partition3._Testing(TestingKey).getSerializationRetryCount();
+					const zeroTotalRetries = zeroRootRetries + zeroP1Retries + zeroP2Retries + zeroP3Retries;
+
+					/*
+					 * With partition-specific indexes, no query() operations, and a
+					 * single worker per partition, serialization conflicts should be zero.
+					 */
+					if (zeroTotalRetries > 0) {
+						console.error(`Unexpected serialization retries in zero-conflict test!`);
+						console.error(`  Root: ${zeroRootRetries}, P1: ${zeroP1Retries}, P2: ${zeroP2Retries}, P3: ${zeroP3Retries}`);
+						console.error(`  Total: ${zeroTotalRetries}`);
+						console.error(`  Worker type: ${kind}`);
+					}
+					expect(zeroTotalRetries).toBe(0);
+				}
+			}, 60_000); /* Longer timeout for concurrent operations */
+		}
+	}
 		});
 	}
 });
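
The FOR UPDATE added to the setStatus read in queue_postgres.ts is what these concurrency tests lean on: the entry row is locked before the oldStatus assertion, so two workers cannot both pass the check and double-complete an entry - the loser blocks until the winner commits and then sees the new status. A sketch of that shape, using the queue_entries columns from the version-1 schema and assuming the caller has already opened a transaction (for example via a retry wrapper like the one sketched earlier); this is illustrative only, not the driver's exact SQL:

import type { PoolClient } from 'pg';

/* Sketch: lock the row, assert the expected status, then transition it. Must run inside a transaction. */
async function assertAndSetStatus(client: PoolClient, id: string, path: string, oldStatus: string, newStatus: string): Promise<void> {
	const existing = await client.query<{ status: string }>(
		'SELECT status FROM queue_entries WHERE id = $1 AND path = $2 FOR UPDATE',
		[id, path]
	);
	const row = existing.rows[0];
	if (!row) {
		throw(new Error(`Request with ID ${id} not found`));
	}
	if (row.status !== oldStatus) {
		/* Another worker already transitioned this entry; surface a state error. */
		throw(new Error(`Expected status ${oldStatus} but found ${row.status}`));
	}
	await client.query(
		'UPDATE queue_entries SET status = $3, updated = $4 WHERE id = $1 AND path = $2',
		[id, path, newStatus, Date.now()]
	);
}
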