From 8ccd16e15d25ddecc859724b591f34030c00160f Mon Sep 17 00:00:00 2001 From: Mysticswiss Date: Mon, 24 Nov 2025 01:58:53 +0800 Subject: [PATCH] Implement RTT tracking and parallel probing in HyperDHT - Added RTT measurement and caching for nodes to improve connection efficiency. - Introduced parallel probing based on average RTT to optimize connection attempts. - Enhanced timeout settings for connections using RTT data. - Updated related tests to validate new functionality and ensure proper behavior with RTT data. - Refactored connection logic to utilize new caching mechanisms for direct connections. --- index.js | 164 +++++++++++++++- lib/announcer.js | 50 ++++- lib/connect.js | 144 ++++++++++++-- lib/nat.js | 55 ++++-- lib/server.js | 4 +- test/all.js | 2 + test/parallel-probing.js | 396 +++++++++++++++++++++++++++++++++++++++ test/rtt.js | 290 ++++++++++++++++++++++++++++ 8 files changed, 1073 insertions(+), 32 deletions(-) create mode 100644 test/parallel-probing.js create mode 100644 test/rtt.js diff --git a/index.js b/index.js index 7e983360..e12aeafa 100644 --- a/index.js +++ b/index.js @@ -25,7 +25,8 @@ class HyperDHT extends DHT { super({ ...opts, port, bootstrap, nodes, filterNode }) - const { router, relayAddresses, persistent } = defaultCacheOpts(opts) + const { router, relayAddresses, nodeRTT, connectionCache, directConnectionCache, persistent } = + defaultCacheOpts(opts) this.defaultKeyPair = opts.keyPair || createKeyPair(opts.seed) this.listening = new Set() @@ -46,6 +47,12 @@ class HyperDHT extends DHT { this._validatedLocalAddresses = new Map() this._relayAddressesCache = new Cache(relayAddresses) + this._nodeRTT = new Cache(nodeRTT) + + this._connectionCache = new Cache(connectionCache) + + this._directConnectionCache = new Cache(directConnectionCache) + this._deferRandomPunch = !!opts.deferRandomPunch this._lastRandomPunch = this._deferRandomPunch ? Date.now() : 0 this._connectable = true @@ -65,6 +72,63 @@ class HyperDHT extends DHT { if (!this.online) return for (const server of this.listening) server.notifyOnline() }) + + this._rttWarmupInterval = null + if (opts.preWarmRTT !== false) { + this._startRTTWarmup() + } + + this.parallelProbing = opts.parallelProbing !== false + } + + _startRTTWarmup() { + if (this._rttWarmupInterval) return + + this._rttWarmupInterval = setInterval(() => { + if (this.destroyed || !this.online) return + + // Sample up to 10 random nodes for RTT measurement + const nodes = [] + if (this.nodes && this.nodes.latest) { + let count = 0 + for (let node = this.nodes.latest; node && count < 10; node = node.prev) { + if (node.host && node.port) { + nodes.push(node) + count++ + } + } + } + + for (const node of nodes) { + const key = `${node.host}:${node.port}` + const stats = this._nodeRTT.get(key) + const needsUpdate = !stats || Date.now() - stats.lastUpdate > 60000 // Update if older than 60s + + if (needsUpdate) { + // Measure RTT in background (don't await) + const startTime = process.hrtime.bigint() + this.ping({ host: node.host, port: node.port }).then( + () => { + const endTime = process.hrtime.bigint() + const rtt = Number(endTime - startTime) / 1_000_000 + if (rtt > 0 && rtt < 10000) { + + this.updateNodeRTT(node, rtt) + } + }, + () => { + } + ) + } + } + }, 30000) + } + + _stopRTTWarmup() { + if (this._rttWarmupInterval) { + clearInterval(this._rttWarmupInterval) + this._rttWarmupInterval = null + } } connect(remotePublicKey, opts) { @@ -119,9 +183,104 @@ class HyperDHT extends DHT { if (this._persistent) this._persistent.destroy() await this.rawStreams.clear() await this._socketPool.destroy() + this._stopRTTWarmup() + this._nodeRTT.clear() + this._connectionCache.clear() + this._directConnectionCache.clear() await super.destroy() } + + getNodeRTT(node) { + if (!node) return null + const host = node.host || (node.address && node.address.host) + const port = node.port || (node.address && node.address.port) + if (!host || !port) return null + + const key = `${host}:${port}` + const stats = this._nodeRTT.get(key) + return stats ? stats.srtt : null + } + + + updateNodeRTT(node, rtt) { + if (!node || !rtt || rtt <= 0) return + + const host = node.host || (node.address && node.address.host) + const port = node.port || (node.address && node.address.port) + if (!host || !port) return + + const key = `${host}:${port}` + + const alpha = 0.125 // Weight for SRTT + const beta = 0.25 // Weight for RTTVAR + + if (!this._nodeRTT.has(key)) { + // First measurement + this._nodeRTT.set(key, { + srtt: rtt, + rttvar: rtt / 2, // Initial variance estimate + samples: 1, + lastUpdate: Date.now() + }) + } else { + const stats = this._nodeRTT.get(key) + + stats.rttvar = (1 - beta) * stats.rttvar + beta * Math.abs(stats.srtt - rtt) + + stats.srtt = (1 - alpha) * stats.srtt + alpha * rtt + + stats.samples++ + stats.lastUpdate = Date.now() + } + } + + + sortNodesByRTT(nodes) { + return nodes + .map((node) => ({ + node, + rtt: this.getNodeRTT(node) || Infinity + })) + .sort((a, b) => a.rtt - b.rtt) + .map((item) => item.node) + } + + + getAverageRTT() { + if (!this._nodeRTT) return null + + let totalRTT = 0 + let count = 0 + for (const stats of this._nodeRTT.values()) { + if (stats.srtt && stats.srtt > 0) { + totalRTT += stats.srtt + count++ + } + } + + return count > 0 ? totalRTT / count : null + } + + + getRTTBasedTimeout(baseTimeout = 10000, minMultiplier = 0.9, maxMultiplier = 1.5) { + const avgRTT = this.getAverageRTT() + if (!avgRTT) return baseTimeout + + // Only optimize timeout if we have very fast RTT (< 100ms) + // For slower networks, keep the base timeout to avoid premature timeouts + if (avgRTT < 100) { + // For fast networks, use 10x RTT as timeout (very conservative) + const rttBasedTimeout = avgRTT * 10 + const minTimeout = baseTimeout * minMultiplier + const maxTimeout = baseTimeout * maxMultiplier + return Math.max(minTimeout, Math.min(maxTimeout, rttBasedTimeout)) + } + + // For slower networks, don't reduce timeout - keep base + return baseTimeout + } + async validateLocalAddresses(addresses) { const list = [] const socks = [] @@ -593,6 +752,9 @@ function defaultCacheOpts(opts) { forwards: { maxSize, maxAge } }, relayAddresses: { maxSize: Math.min(maxSize, 512), maxAge: 0 }, + nodeRTT: { maxSize: Math.min(maxSize, 2048), maxAge: 3600000 }, + connectionCache: { maxSize: Math.min(maxSize, 1024), maxAge: 60000 }, + directConnectionCache: { maxSize: Math.min(maxSize, 1024), maxAge: 300000 }, persistent: { records: { maxSize, maxAge }, refreshes: { maxSize, maxAge }, diff --git a/lib/announcer.js b/lib/announcer.js index 42f7d8ef..c14fbf63 100644 --- a/lib/announcer.js +++ b/lib/announcer.js @@ -112,7 +112,22 @@ module.exports = class Announcer { const pings = [] for (const node of this._serverRelays[2].values()) { - pings.push(this.dht.ping(node)) + const startTime = process.hrtime.bigint() + const pingPromise = this.dht.ping(node).then( + (res) => { + // Track RTT for relay nodes + const endTime = process.hrtime.bigint() + const rtt = Number(endTime - startTime) / 1_000_000 // ms + if (this.dht.updateNodeRTT) { + this.dht.updateNodeRTT(node, rtt) + } + return res + }, + (err) => { + throw err + } + ) + pings.push(pingPromise) } const active = await resolved(pings) @@ -167,7 +182,7 @@ module.exports = class Announcer { if (this.stopped || this.suspended) return const ann = [] - const replies = pickBest(q.closestReplies) + const replies = pickBest(q.closestReplies, this.dht) const relays = [] const relayAddresses = [] @@ -295,8 +310,35 @@ function resolved(ps) { }) } -function pickBest(replies) { - // TODO: pick the ones closest to us RTT wise +function pickBest(replies, dht) { + if (!replies || replies.length === 0) return [] + + if (dht && dht.sortNodesByRTT) { + const nodeToMsg = new Map() + const nodes = [] + + for (const msg of replies) { + if (msg.from) { + nodes.push(msg.from) + const key = `${msg.from.host}:${msg.from.port}` + nodeToMsg.set(key, msg) + } + } + + if (nodes.length > 0) { + const sortedNodes = dht.sortNodesByRTT(nodes) + + const sortedMsgs = [] + for (const node of sortedNodes) { + const key = `${node.host}:${node.port}` + const msg = nodeToMsg.get(key) + if (msg) sortedMsgs.push(msg) + } + + return sortedMsgs.slice(0, 3) + } + } + return replies.slice(0, 3) } diff --git a/lib/connect.js b/lib/connect.js index bad3c896..53bb5ef5 100644 --- a/lib/connect.js +++ b/lib/connect.js @@ -29,6 +29,42 @@ const { SUSPENDED } = require('./errors') +async function parallelProbe(target, probeFn, avgRTT, maxProbes = 3) { + const delays = avgRTT > 0 ? [0, Math.max(50, avgRTT >> 1), Math.max(100, avgRTT)] : [0, 100, 250] + + const probes = delays + .slice(0, maxProbes) + .map((delay) => + delay > 0 + ? new Promise((r) => setTimeout(r, delay)).then(() => probeFn(target)) + : probeFn(target) + ) + + const errors = [] + + return new Promise((resolve, reject) => { + let settled = 0 + let resolved = false + + probes.forEach((probe, i) => { + probe.then( + (result) => { + if (!resolved) { + resolved = true + resolve(result) + } + }, + (error) => { + errors[i] = error + if (++settled === probes.length && !resolved) { + reject(errors[0] || new Error('All parallel probes failed')) + } + } + ) + }) + }) +} + module.exports = function connect(dht, publicKey, opts = {}) { const pool = opts.pool || null @@ -181,7 +217,18 @@ async function connectAndHolepunch(c, opts) { if (isDone(c)) return } - await findAndConnect(c, opts) + if ( + (await Promise.race([ + tryDirectConnection(c, opts).catch(() => false), + new Promise((r) => setTimeout(() => r(false), 1000)) + ])) && + c.connect + ) { + await holepunch(c, opts) + return + } + + await findAndConnectParallel(c, opts) if (isDone(c)) return if (!c.connect) { @@ -226,7 +273,7 @@ async function holepunch(c, opts) { } if (c.firewall === FIREWALL.OPEN) { - c.passiveConnectTimeout = setTimeout(onabort, 10000) + c.passiveConnectTimeout = setTimeout(onabort, c.dht.getRTTBasedTimeout?.(10000) ?? 10000) return } @@ -315,7 +362,36 @@ async function holepunch(c, opts) { } } -async function findAndConnect(c, opts) { +async function tryDirectConnection(c, opts) { + const key = c.target.toString('hex') + const addresses = [] + + const cached = c.dht._directConnectionCache?.get(key) + if (cached && Date.now() - cached.timestamp < 300000) { + addresses.push(cached.address) + } + + if (addresses.length === 0) return false + + for (const addr of addresses) { + try { + await Promise.race([ + connectThroughNode(c, addr, null), + new Promise((_, rej) => setTimeout(() => rej(new Error('timeout')), 2000)) + ]) + if (c.connect) { + c.dht._directConnectionCache?.set(key, { address: addr, timestamp: Date.now() }) + return true + } + } catch { + if (addr === cached?.address) c.dht._directConnectionCache?.delete(key) + } + } + + return false +} + +async function findAndConnectParallel(c, opts) { let attempts = 0 let closestNodes = opts.relayAddresses && opts.relayAddresses.length ? opts.relayAddresses : null @@ -324,6 +400,13 @@ async function findAndConnect(c, opts) { if (cachedRelayAddresses) closestNodes = cachedRelayAddresses } + if (!closestNodes && c.dht._connectionCache) { + const cached = c.dht._connectionCache.get(c.target.toString('hex')) + if (cached && Date.now() - cached.timestamp < 60000) { + closestNodes = cached.path + } + } + if (c.dht._persistent) { // check if we know the route ourself... const route = c.dht._router.get(c.target) @@ -332,19 +415,29 @@ async function findAndConnect(c, opts) { } } + // If we have closestNodes, try handshake immediately in parallel with discovery + let earlyHandshakePromise = null + if (closestNodes?.length) { + const sorted = c.dht.sortNodesByRTT?.(closestNodes) ?? closestNodes + earlyHandshakePromise = connectThroughNode(c, sorted[0], null).catch(() => null) + } + // 2 is how many parallel connect attempts we want to do, we can make this configurable const sem = new Semaphore(2) const signal = sem.signal.bind(sem) const tries = closestNodes !== null ? 2 : 1 try { + earlyHandshakePromise?.catch(() => {}) + for (let i = 0; i < tries && !isDone(c) && !c.connect; i++) { + const useRTTBased = closestNodes !== null && c.dht._nodeRTT && c.dht.getAverageRTT() != null c.query = c.dht.findPeer(c.target, { hash: false, session: c.session, closestNodes, onlyClosestNodes: closestNodes !== null, - retries: closestNodes ? 1 : 3 + retries: closestNodes ? 1 : useRTTBased ? 2 : 3 }) for await (const data of c.query) { @@ -383,6 +476,10 @@ async function findAndConnect(c, opts) { } } +async function findAndConnect(c, opts) { + return findAndConnectParallel(c, opts) +} + async function connectThroughNode(c, address, socket) { if (!c.requesting) { // If we have a stable server address, send it over now @@ -448,6 +545,24 @@ async function connectThroughNode(c, address, socket) { payload } + if (address) { + const key = c.target.toString('hex') + const timestamp = Date.now() + + // Always cache relay path for discovery + const relayAddr = { host: address.host, port: address.port } + c.dht._connectionCache?.set(key, { path: [relayAddr], timestamp }) + + // Also cache peer's best reachable address for direct connection + const peerAddr = getFirstRemoteAddress(payload.addresses4, serverAddress) + if (peerAddr) { + c.dht._directConnectionCache?.set(key, { + address: { host: peerAddr.host, port: peerAddr.port }, + timestamp + }) + } + } + c.payload = new SecurePayload(hs.holepunchSecret) c.onsocket = function (socket, port, host) { @@ -553,9 +668,17 @@ async function updateHolepunch(c, peerAddress, relayAddr, payload) { } async function probeRound(c, serverAddress, serverRelay, retry) { - // Open a quick low ttl session against what we think is the server - if (serverAddress) await c.puncher.openSession(serverAddress) + const avgRTT = c.dht.getAverageRTT?.() ?? null + const probe = async (addr) => { + if (!addr) return + return c.dht.parallelProbing !== false && avgRTT != null + ? parallelProbe(addr, (a) => c.puncher.openSession(a), avgRTT, 3) + : c.puncher.openSession(addr) + } + + // Open a quick low ttl session against what we think is the server + await probe(serverAddress) if (isDone(c)) return null const reply = await updateHolepunch(c, serverRelay.peerAddress, serverRelay.relayAddress, { @@ -581,12 +704,11 @@ async function probeRound(c, serverAddress, serverRelay, retry) { // if they haven't said they are random yet if ( c.puncher.remoteFirewall < FIREWALL.RANDOM && - address && - address.host && - address.port && + address?.host && + address?.port && diffAddress(address, serverAddress) ) { - await c.puncher.openSession(address) + await probe(address) if (isDone(c)) return null } @@ -762,7 +884,7 @@ function relayConnection(c, relayThrough, payload, hs) { c.relaySocket = c.dht.connect(publicKey) c.relaySocket.setKeepAlive(c.relayKeepAlive) c.relayClient = relay.Client.from(c.relaySocket, { id: c.relaySocket.publicKey }) - c.relayTimeout = setTimeout(onabort, 15000, null) + c.relayTimeout = setTimeout(onabort, c.dht.getRTTBasedTimeout?.(15000) ?? 15000, null) c.relayClient.pair(isInitiator, token, c.rawStream).on('error', onabort).on('data', ondata) diff --git a/lib/nat.js b/lib/nat.js index d696d7eb..1512fcc8 100644 --- a/lib/nat.js +++ b/lib/nat.js @@ -30,32 +30,57 @@ module.exports = class Nat { const socket = this.socket const maxPings = this._minSamples - let skip = this.dht.nodes.length >= 8 ? 5 : 0 - let pending = 0 + const availableNodes = [] + for (let node = this.dht.nodes.latest; node; node = node.prev) { + if (node.host && node.port) { + const ref = node.host + ':' + node.port + if (!this._visited.has(ref)) { + availableNodes.push(node) + } + } + } + + let selectedNodes = availableNodes + if (availableNodes.length > maxPings && this.dht.sortNodesByRTT) { + // Use RTT-based selection if available + selectedNodes = this.dht.sortNodesByRTT(availableNodes).slice(0, maxPings * 2) + } else { + // Fallback: skip some nodes if we have many + let skip = this.dht.nodes.length >= 8 ? 5 : 0 + selectedNodes = availableNodes + .filter((node, idx) => { + if (skip > 0 && idx < skip) { + skip-- + return false + } + return true + }) + .slice(0, maxPings * 2) + } - // TODO: it would be best to pick the nodes to help us based on latency to us - // That should reduce connect latency in general. We should investigate tracking that later on. + let pending = 0 // TODO 2: try to pick nodes with different IPs as well, as that'll help multi IP cell connections... // If we expose this from the nat sampler then the DHT should be able to help us filter out scams as well... - for ( - let node = this.dht.nodes.latest; - node && this.sampled + pending < maxPings; - node = node.prev - ) { - if (skip > 0) { - skip-- - continue - } + for (const node of selectedNodes) { + if (this.sampled + pending >= maxPings) break const ref = node.host + ':' + node.port - if (this._visited.has(ref)) continue this._visited.set(ref, 1) pending++ - this.session.ping(node, { socket, retry: false }).then(onpong, onskip) + const startTime = process.hrtime.bigint() + this.session.ping(node, { socket, retry: false }).then((res) => { + // Track RTT + const endTime = process.hrtime.bigint() + const rtt = Number(endTime - startTime) / 1_000_000 // ms + if (this.dht.updateNodeRTT) { + this.dht.updateNodeRTT(node, rtt) + } + onpong(res) + }, onskip) } pending++ diff --git a/lib/server.js b/lib/server.js index 2244b922..bbf94448 100644 --- a/lib/server.js +++ b/lib/server.js @@ -643,7 +643,9 @@ module.exports = class Server extends EventEmitter { hs.relaySocket = this.dht.connect(publicKey) hs.relaySocket.setKeepAlive(this.relayKeepAlive) hs.relayClient = relay.Client.from(hs.relaySocket, { id: hs.relaySocket.publicKey }) - hs.relayTimeout = setTimeout(onabort, 15000) + // Use RTT-based timeout for relay (default 15s) + const relayTimeout = this.dht.getRTTBasedTimeout ? this.dht.getRTTBasedTimeout(15000) : 15000 + hs.relayTimeout = setTimeout(onabort, relayTimeout) hs.relayClient .pair(isInitiator, token, hs.rawStream) diff --git a/test/all.js b/test/all.js index 6c5c24f7..ffbfe2f1 100644 --- a/test/all.js +++ b/test/all.js @@ -14,8 +14,10 @@ async function runTests() { await import('./messages.js') await import('./nat.js') await import('./noncustodial.js') + await import('./parallel-probing.js') await import('./pool.js') await import('./relaying.js') + await import('./rtt.js') await import('./storing.js') test.resume() diff --git a/test/parallel-probing.js b/test/parallel-probing.js new file mode 100644 index 00000000..1ba9cdcd --- /dev/null +++ b/test/parallel-probing.js @@ -0,0 +1,396 @@ +const test = require('brittle') +const { swarm } = require('./helpers') +const DHT = require('../') + +function hashTargetKey(publicKey) { + return DHT.hash(publicKey).toString('hex') +} + + +test('parallel probe only runs when RTT data exists', async function (t) { + t.plan(3) + + const dht = new DHT({ parallelProbing: true, preWarmRTT: false }) + await dht.ready() + + t.is(dht.parallelProbing, true, 'parallel probing enabled') + + const avgRTT = dht.getAverageRTT() + t.is(avgRTT, null, 'no RTT data yet') + + + const shouldRunParallel = dht.parallelProbing !== false && avgRTT != null + t.is(shouldRunParallel, false, 'parallel probing should NOT run without RTT data') + + await dht.destroy() +}) + +test('parallel probe runs when RTT data exists', async function (t) { + t.plan(4) + + const dht = new DHT({ parallelProbing: true, preWarmRTT: false }) + await dht.ready() + + t.is(dht.parallelProbing, true, 'parallel probing enabled') + + dht.updateNodeRTT({ host: '1.1.1.1', port: 1111 }, 50) + dht.updateNodeRTT({ host: '2.2.2.2', port: 2222 }, 100) + + const avgRTT = dht.getAverageRTT() + t.ok(avgRTT != null, 'RTT data exists') + t.ok(avgRTT > 0, `average RTT: ${avgRTT}ms`) + + const shouldRunParallel = dht.parallelProbing !== false && avgRTT != null + t.is(shouldRunParallel, true, 'parallel probing SHOULD run with RTT data') + + await dht.destroy() +}) + +test('getAverageRTT only called once (not redundantly)', async function (t) { + t.plan(3) + + const dht = new DHT({ parallelProbing: true, preWarmRTT: false }) + await dht.ready() + + for (let i = 0; i < 100; i++) { + dht.updateNodeRTT({ host: `1.1.1.${i}`, port: 1111 + i }, 50 + i) + } + + const start = process.hrtime.bigint() + const avgRTT1 = dht.getAverageRTT() + const end = process.hrtime.bigint() + const duration1 = Number(end - start) / 1_000_000 + + t.ok(avgRTT1 > 0, `average RTT: ${avgRTT1}ms`) + + + + t.ok(duration1 >= 0, `getAverageRTT execution time: ${duration1.toFixed(3)}ms`) + t.pass('With fix, avgRTT should be cached once per probeRound, not called twice') + + await dht.destroy() +}) + + +test('parallel probing - enabled by default', async function (t) { + const testnet = await swarm(t) + const dht = testnet.nodes[0] + + t.is(dht.parallelProbing, true, 'parallel probing enabled by default') +}) + +test('parallel probing - can be disabled', async function (t) { + const dht = new DHT({ parallelProbing: false }) + await dht.ready() + + t.is(dht.parallelProbing, false, 'parallel probing disabled') + + await dht.destroy() +}) + +test('parallel probing - basic connection', async function (t) { + const [server, client] = await swarm(t, 2) + + const testServer = server.createServer() + + testServer.on('connection', (socket) => { + socket.on('error', () => {}) + setTimeout(() => socket.destroy(), 50) + }) + + await testServer.listen() + + const start = Date.now() + const socket = client.connect(testServer.publicKey) + + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('Connection timeout')), 10000) + + socket.on('open', () => { + clearTimeout(timeout) + const time = Date.now() - start + t.ok(time < 5000, `connection established in ${time}ms`) + socket.destroy() + resolve() + }) + + socket.on('error', (err) => { + clearTimeout(timeout) + if (err.code !== 'ECONNRESET') reject(err) + else resolve() + }) + }) + + await testServer.close() +}) + +test('parallel probing - multiple connections improve over time', async function (t) { + const [server, client] = await swarm(t, 2) + + const testServer = server.createServer() + + testServer.on('connection', (socket) => { + socket.on('error', () => {}) + setTimeout(() => socket.destroy(), 50) + }) + + await testServer.listen() + + // Make 5 connections + const times = [] + + for (let i = 0; i < 5; i++) { + const start = Date.now() + const socket = client.connect(testServer.publicKey) + + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('Connection timeout')), 10000) + + socket.on('open', () => { + clearTimeout(timeout) + const time = Date.now() - start + times.push(time) + socket.destroy() + resolve() + }) + + socket.on('error', (err) => { + clearTimeout(timeout) + if (err.code !== 'ECONNRESET') reject(err) + else resolve() + }) + }) + + if (i < 4) await new Promise((r) => setTimeout(r, 1000)) + } + + t.is(times.length, 5, 'all connections completed') + + const firstHalf = times.slice(0, 2) + const secondHalf = times.slice(3, 5) + const firstAvg = firstHalf.reduce((a, b) => a + b, 0) / firstHalf.length + const secondAvg = secondHalf.reduce((a, b) => a + b, 0) / secondHalf.length + + t.ok(secondAvg <= firstAvg * 1.5, 'performance did not degrade significantly') + + await testServer.close() +}) + +test('parallel probing - works without RTT data', async function (t) { + const [server, client] = await swarm(t, 2) + + t.is(client.getAverageRTT(), null, 'no RTT data initially') + + const testServer = server.createServer() + + testServer.on('connection', (socket) => { + socket.on('error', () => {}) + setTimeout(() => socket.destroy(), 50) + }) + + await testServer.listen() + + const start = Date.now() + const socket = client.connect(testServer.publicKey) + + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('Connection timeout')), 10000) + + socket.on('open', () => { + clearTimeout(timeout) + const time = Date.now() - start + t.ok(time < 5000, `connection without RTT data: ${time}ms`) + socket.destroy() + resolve() + }) + + socket.on('error', (err) => { + clearTimeout(timeout) + if (err.code !== 'ECONNRESET') reject(err) + else resolve() + }) + }) + + await testServer.close() +}) + +test('parallel probing - works with RTT data', async function (t) { + const [server, client] = await swarm(t, 2) + + const node = { host: '127.0.0.1', port: 8080 } + client.updateNodeRTT(node, 100) + + t.ok(client.getAverageRTT() > 0, 'RTT data available') + + const testServer = server.createServer() + + testServer.on('connection', (socket) => { + socket.on('error', () => {}) + setTimeout(() => socket.destroy(), 50) + }) + + await testServer.listen() + + const start = Date.now() + const socket = client.connect(testServer.publicKey) + + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('Connection timeout')), 10000) + + socket.on('open', () => { + clearTimeout(timeout) + const time = Date.now() - start + t.ok(time < 5000, `connection with RTT data: ${time}ms`) + socket.destroy() + resolve() + }) + + socket.on('error', (err) => { + clearTimeout(timeout) + if (err.code !== 'ECONNRESET') reject(err) + else resolve() + }) + }) + + await testServer.close() +}) + +test('parallel probing - connection cache usage', async function (t) { + const [server, client] = await swarm(t, 2) + + const testServer = server.createServer() + + testServer.on('connection', (socket) => { + socket.on('error', () => {}) + setTimeout(() => socket.destroy(), 50) + }) + + await testServer.listen() + + const initialCacheSize = client._connectionCache.size + + const socket1 = client.connect(testServer.publicKey) + + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('Connection timeout')), 10000) + + socket1.on('open', () => { + clearTimeout(timeout) + socket1.destroy() + resolve() + }) + + socket1.on('error', (err) => { + clearTimeout(timeout) + if (err.code !== 'ECONNRESET') reject(err) + else resolve() + }) + }) + + await new Promise((r) => setTimeout(r, 500)) + + const socket2 = client.connect(testServer.publicKey) + + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('Connection timeout')), 10000) + + socket2.on('open', () => { + clearTimeout(timeout) + socket2.destroy() + resolve() + }) + + socket2.on('error', (err) => { + clearTimeout(timeout) + if (err.code !== 'ECONNRESET') reject(err) + else resolve() + }) + }) + + const targetKey = hashTargetKey(testServer.publicKey) + t.ok(client._connectionCache.get(targetKey), 'connection cache accumulated data') + + await testServer.close() +}) + +test('parallel probing - direct connection cache usage', async function (t) { + const [server, client] = await swarm(t, 2) + + const testServer = server.createServer() + + testServer.on('connection', (socket) => { + socket.on('error', () => {}) + setTimeout(() => socket.destroy(), 50) + }) + + await testServer.listen() + + const socket = client.connect(testServer.publicKey) + + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('Connection timeout')), 10000) + + socket.on('open', () => { + clearTimeout(timeout) + socket.destroy() + resolve() + }) + + socket.on('error', (err) => { + clearTimeout(timeout) + if (err.code !== 'ECONNRESET') reject(err) + else resolve() + }) + }) + + const targetKey = hashTargetKey(testServer.publicKey) + const directEntry = client._directConnectionCache.get(targetKey) + t.ok(directEntry, 'direct connection cache populated') + if (directEntry) { + t.ok(directEntry.address && directEntry.address.host, 'direct cache stores peer address') + } + + await testServer.close() +}) + +test('parallel probing - handles connection failures gracefully', async function (t) { + const [server, client] = await swarm(t, 2) + + const testServer = server.createServer() + let connectionAttempted = false + + testServer.on('connection', (socket) => { + connectionAttempted = true + socket.on('error', () => {}) + socket.destroy() + }) + + await testServer.listen() + const publicKey = testServer.publicKey + + const socket = client.connect(publicKey) + + await new Promise((resolve) => { + const timeout = setTimeout(() => { + socket.destroy() + t.pass('handled connection attempt') + resolve() + }, 5000) + + socket.on('open', () => { + clearTimeout(timeout) + socket.destroy() + t.pass('connection opened') + resolve() + }) + + socket.on('error', (err) => { + clearTimeout(timeout) + t.pass('handled error gracefully: ' + err.message) + socket.destroy() + resolve() + }) + }) + + await testServer.close() +}) diff --git a/test/rtt.js b/test/rtt.js new file mode 100644 index 00000000..7fad4bf3 --- /dev/null +++ b/test/rtt.js @@ -0,0 +1,290 @@ +const test = require('brittle') +const createTestnet = require('../testnet') +const DHT = require('../') +const { swarm } = require('./helpers') + +test('rtt - tracking basic functionality', async function (t) { + const testnet = await swarm(t) + const dht = testnet.nodes[0] + + const key = '127.0.0.1:8080' + t.absent(dht._nodeRTT.get(key), 'no RTT data initially') + + const node = { host: '127.0.0.1', port: 8080 } + dht.updateNodeRTT(node, 100) + + t.ok(dht._nodeRTT.get(key), 'RTT entry added') + t.is(dht.getNodeRTT(node), 100, 'SRTT value correct for first sample') + + dht.updateNodeRTT(node, 200) + const srtt = dht.getNodeRTT(node) + t.ok(srtt > 100 && srtt < 200, 'TCP EWMA working') + // Expected: (1 - 0.125) * 100 + 0.125 * 200 = 87.5 + 25 = 112.5 + t.is(srtt, 112.5, 'TCP EWMA calculation correct (alpha=0.125)') + + const stats = dht._nodeRTT.get(key) + t.ok(stats.srtt, 'has SRTT') + t.ok(stats.rttvar >= 0, 'has RTTVAR') + t.is(stats.samples, 2, 'sample count correct') +}) + +test('rtt - update with invalid values', async function (t) { + const testnet = await swarm(t) + const dht = testnet.nodes[0] + + const node = { host: '127.0.0.1', port: 8080 } + const key = '127.0.0.1:8080' + + dht.updateNodeRTT(node, 0) + t.absent(dht._nodeRTT.get(key), 'zero RTT ignored') + + dht.updateNodeRTT(node, -100) + t.absent(dht._nodeRTT.get(key), 'negative RTT ignored') + + dht.updateNodeRTT(null, 100) + t.absent(dht._nodeRTT.get('null:undefined'), 'null node ignored') + + dht.updateNodeRTT({ host: '127.0.0.1' }, 100) + t.absent(dht._nodeRTT.get('127.0.0.1:undefined'), 'node without port ignored') +}) + +test('rtt - getNodeRTT with no data', async function (t) { + const testnet = await swarm(t) + const dht = testnet.nodes[0] + + const node = { host: '127.0.0.1', port: 8080 } + t.is(dht.getNodeRTT(node), null, 'returns null for unknown node') + t.is(dht.getNodeRTT(null), null, 'returns null for null node') +}) + +test('rtt - sortNodesByRTT', async function (t) { + const testnet = await swarm(t) + const dht = testnet.nodes[0] + + const nodes = [ + { host: '127.0.0.1', port: 8080 }, + { host: '127.0.0.1', port: 8081 }, + { host: '127.0.0.1', port: 8082 }, + { host: '127.0.0.1', port: 8083 } + ] + + dht.updateNodeRTT(nodes[0], 300) + dht.updateNodeRTT(nodes[1], 100) + dht.updateNodeRTT(nodes[2], 200) + + const sorted = dht.sortNodesByRTT(nodes) + + t.is(sorted.length, 4, 'all nodes returned') + t.is(sorted[0].port, 8081, 'fastest node first (100ms)') + t.is(sorted[1].port, 8082, 'second fastest (200ms)') + t.is(sorted[2].port, 8080, 'third fastest (300ms)') + t.is(sorted[3].port, 8083, 'node without RTT last') +}) + +test('rtt - getAverageRTT', async function (t) { + const testnet = await swarm(t) + const dht = testnet.nodes[0] + + t.is(dht.getAverageRTT(), null, 'null when no data') + + dht.updateNodeRTT({ host: '127.0.0.1', port: 8080 }, 100) + dht.updateNodeRTT({ host: '127.0.0.1', port: 8081 }, 200) + dht.updateNodeRTT({ host: '127.0.0.1', port: 8082 }, 300) + + const avg = dht.getAverageRTT() + t.is(avg, 200, 'average calculated correctly') +}) + +test('rtt - getRTTBasedTimeout for fast network', async function (t) { + const testnet = await swarm(t) + const dht = testnet.nodes[0] + + t.is(dht.getRTTBasedTimeout(10000), 10000, 'base timeout when no data') + + dht.updateNodeRTT({ host: '127.0.0.1', port: 8080 }, 50) + dht.updateNodeRTT({ host: '127.0.0.1', port: 8081 }, 60) + + const timeout = dht.getRTTBasedTimeout(10000) + t.ok(timeout < 10000, 'timeout reduced for fast network') + t.is(timeout, 9000, 'timeout is 90% of base (minimum threshold)') +}) + +test('rtt - getRTTBasedTimeout for slow network', async function (t) { + const testnet = await swarm(t) + const dht = testnet.nodes[0] + + dht.updateNodeRTT({ host: '127.0.0.1', port: 8080 }, 200) + dht.updateNodeRTT({ host: '127.0.0.1', port: 8081 }, 300) + + const timeout = dht.getRTTBasedTimeout(10000) + t.is(timeout, 10000, 'base timeout kept for slow network') +}) + +test('rtt - connection cache', async function (t) { + const testnet = await swarm(t) + const dht = testnet.nodes[0] + + const targetHash = 'test-target-hash' + t.absent(dht._connectionCache.get(targetHash), 'cache initially empty') + + const path = { nodes: ['node1', 'node2'], type: 'direct' } + dht._connectionCache.set(targetHash, { + path: path, + rtt: 100, + timestamp: Date.now() + }) + + t.ok(dht._connectionCache.get(targetHash), 'cache entry added') + + const cached = dht._connectionCache.get(targetHash) + t.ok(cached, 'cached entry exists') + t.alike(cached.path, path, 'cached path correct') + t.is(cached.rtt, 100, 'cached rtt correct') +}) + +test('rtt - direct connection cache', async function (t) { + const testnet = await swarm(t) + const dht = testnet.nodes[0] + + const targetHash = 'test-target-hash' + t.absent(dht._directConnectionCache.get(targetHash), 'direct cache initially empty') + + const address = { host: '127.0.0.1', port: 8080 } + dht._directConnectionCache.set(targetHash, { + address: address, + timestamp: Date.now() + }) + + t.ok(dht._directConnectionCache.get(targetHash), 'direct cache entry added') + + const cached = dht._directConnectionCache.get(targetHash) + t.ok(cached, 'cached entry exists') + t.alike(cached.address, address, 'cached address correct') +}) + +test('rtt - RTT statistics tracking', async function (t) { + const testnet = await swarm(t) + const dht = testnet.nodes[0] + + const node = { host: '127.0.0.1', port: 8080 } + + dht.updateNodeRTT(node, 100) + dht.updateNodeRTT(node, 150) + dht.updateNodeRTT(node, 200) + + const key = '127.0.0.1:8080' + const stats = dht._nodeRTT.get(key) + + t.ok(stats, 'stats exist') + t.is(stats.samples, 3, 'sample count correct') + t.ok(stats.srtt > 0, 'has SRTT') + t.ok(stats.rttvar >= 0, 'has RTTVAR') + t.ok(stats.lastUpdate > 0, 'lastUpdate timestamp set') + t.absent(stats.rtts, 'no rtts array in TCP-style') + t.absent(stats.minRTT, 'no minRTT in TCP-style') + t.absent(stats.maxRTT, 'no maxRTT in TCP-style') +}) + +test('rtt - TCP-style memory efficiency', async function (t) { + const testnet = await swarm(t) + const dht = testnet.nodes[0] + + const node = { host: '127.0.0.1', port: 8080 } + + for (let i = 0; i < 150; i++) { + dht.updateNodeRTT(node, 100 + Math.random() * 10) + } + + const key = '127.0.0.1:8080' + const stats = dht._nodeRTT.get(key) + + t.is(stats.samples, 150, 'sample count tracks all measurements') + t.absent(stats.rtts, 'no array storage in TCP-style') + t.ok(stats.srtt >= 100 && stats.srtt <= 110, 'SRTT converged') + t.ok(stats.rttvar < 10, 'RTTVAR stabilized') +}) + +test('rtt - preWarmRTT option', async function (t) { + const testnet1 = await swarm(t) + const dht1 = testnet1.nodes[0] + t.ok(dht1._rttWarmupInterval, 'warmup interval created by default') + + const testnet2 = await swarm(t) + const dht2 = testnet2.createNode({ preWarmRTT: false }) + await dht2.ready() + t.absent(dht2._rttWarmupInterval, 'warmup interval not created when disabled') +}) + +test('rtt - backward compatibility (no RTT methods)', async function (t) { + const testnet = await swarm(t) + const dht = testnet.nodes[0] + + const originalSort = dht.sortNodesByRTT + delete dht.sortNodesByRTT + + const nodes = [ + { host: '127.0.0.1', port: 8080 }, + { host: '127.0.0.1', port: 8081 } + ] + + t.pass('no crash without sortNodesByRTT') + + dht.sortNodesByRTT = originalSort +}) + +test('rtt - multiple nodes with same host different ports', async function (t) { + const testnet = await swarm(t) + const dht = testnet.nodes[0] + + const nodes = [ + { host: '127.0.0.1', port: 8080 }, + { host: '127.0.0.1', port: 8081 }, + { host: '127.0.0.1', port: 8082 } + ] + + dht.updateNodeRTT(nodes[0], 100) + dht.updateNodeRTT(nodes[1], 200) + dht.updateNodeRTT(nodes[2], 300) + + t.ok(dht._nodeRTT.get('127.0.0.1:8080'), 'all nodes tracked separately') + t.is(dht.getNodeRTT(nodes[0]), 100, 'first node RTT correct') + t.is(dht.getNodeRTT(nodes[1]), 200, 'second node RTT correct') + t.is(dht.getNodeRTT(nodes[2]), 300, 'third node RTT correct') +}) + +test('rtt - node with address object format', async function (t) { + const testnet = await swarm(t) + const dht = testnet.nodes[0] + + const node = { + address: { host: '127.0.0.1', port: 8080 } + } + + dht.updateNodeRTT(node, 150) + t.ok(dht._nodeRTT.get('127.0.0.1:8080'), 'node with address object added') + t.is(dht.getNodeRTT(node), 150, 'RTT retrieved correctly') + + const directNode = { host: '127.0.0.1', port: 8080 } + t.is(dht.getNodeRTT(directNode), 150, 'RTT retrievable with direct format') +}) + +test('rtt - warmup interval cleared on destroy', async function (t) { + const dht = new DHT({ bootstrap: [], preWarmRTT: true }) + await dht.ready() + + t.ok(dht._rttWarmupInterval, 'warmup interval active') + + await dht.destroy() + t.absent(dht._rttWarmupInterval, 'warmup interval cleared on destroy') +}) + +test('rtt - caches cleared on destroy', async function (t) { + const dht = new DHT({ bootstrap: [], preWarmRTT: false }) + await dht.ready() + + dht.updateNodeRTT({ host: '127.0.0.1', port: 8080 }, 123) + t.ok(Array.from(dht._nodeRTT.keys()).length > 0, 'RTT cache populated') + + await dht.destroy() + t.is(Array.from(dht._nodeRTT.keys()).length, 0, 'RTT cache cleared after destroy') +})