Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions lib/base/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -835,27 +835,31 @@ class BaseConnection extends EventEmitter {
if (!cb) {
return;
}

if (this._fatalError || this._protocolError) {
return cb(this._fatalError || this._protocolError);
}

if (this._handshakePacket) {
return cb(null, this);
}
let connectCalled = 0;
function callbackOnce(isErrorHandler) {
return function (param) {
if (!connectCalled) {
if (isErrorHandler) {
cb(param);
} else {
cb(null, param);
}
}
connectCalled = 1;
};
}
this.once('error', callbackOnce(true));
this.once('connect', callbackOnce(false));

/* eslint-disable prefer-const */
let onError, onConnect;

onError = (param) => {
this.removeListener('connect', onConnect);
cb(param);
};

onConnect = (param) => {
this.removeListener('error', onError);
cb(null, param);
};
/* eslint-enable prefer-const */

this.once('error', onError);
this.once('connect', onConnect);
}

// ===================================
Expand Down
90 changes: 90 additions & 0 deletions test/integration/test-pool-memory-leak.test.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import type { Pool, PoolConnection } from '../../index.js';
import { assert, describe, it } from 'poku';
import { createPool } from '../common.test.mjs';

/** Returns the raw `PoolConnection` with access to `EventEmitter` listeners. */
const getConnection = (pool: Pool) =>
new Promise<PoolConnection>((resolve, reject) => {
pool.getConnection((err, conn) => {
if (err) return reject(err);
resolve(conn);
});
});

await describe('Pool Memory Leak (issue #3904)', async () => {
await it('should not retain stale connect/error listeners after pool connection is established', async () => {
const pool = createPool({ connectionLimit: 1 });
const conn = await getConnection(pool);

try {
const errorListenerCount = conn.listenerCount('error');

assert.strictEqual(
errorListenerCount,
1,
`Expected 1 error listener, but found ${errorListenerCount}.`
);
} finally {
conn.release();
await pool.promise().end();
}
});

await it('should not accumulate listeners across multiple pool.query calls', async () => {
const pool = createPool({ connectionLimit: 1 });
const promisePool = pool.promise();

await promisePool.query('SELECT 1');

const conn = await getConnection(pool);

try {
const errorListeners = conn.listenerCount('error');
const connectListeners = conn.listenerCount('connect');

assert.strictEqual(
errorListeners,
1,
`Expected 1 error listener after queries, but found ${errorListeners}.`
);

assert.strictEqual(
connectListeners,
0,
`Expected 0 connect listeners, but found ${connectListeners}.`
);
} finally {
conn.release();
await promisePool.end();
}
});

await it('should release query result references after query completes', async () => {
const pool = createPool({ connectionLimit: 1 });
const promisePool = pool.promise();

await promisePool.query(
'SELECT REPEAT("x", 1024) AS padding FROM (SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4) t'
);

const conn = await getConnection(pool);

try {
const errorListeners = conn.rawListeners('error');

const hasStaleListener = errorListeners.some((listener) => {
// @ts-expect-error: TODO: implement typings
const original = listener.listener ?? listener;
return original.toString().includes('connectCalled');
});

assert(
!hasStaleListener,
'Found a stale callbackOnce error listener that retains query result references.'
);
} finally {
conn.release();
await promisePool.end();
}
});
});
Loading