Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 163 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class HyperDHT extends DHT {

super({ ...opts, port, bootstrap, nodes, filterNode })

const { router, relayAddresses, persistent } = defaultCacheOpts(opts)
const { router, relayAddresses, nodeRTT, connectionCache, directConnectionCache, persistent } =
defaultCacheOpts(opts)

this.defaultKeyPair = opts.keyPair || createKeyPair(opts.seed)
this.listening = new Set()
Expand All @@ -46,6 +47,12 @@ class HyperDHT extends DHT {
this._validatedLocalAddresses = new Map()
this._relayAddressesCache = new Cache(relayAddresses)

this._nodeRTT = new Cache(nodeRTT)

this._connectionCache = new Cache(connectionCache)

this._directConnectionCache = new Cache(directConnectionCache)

this._deferRandomPunch = !!opts.deferRandomPunch
this._lastRandomPunch = this._deferRandomPunch ? Date.now() : 0
this._connectable = true
Expand All @@ -65,6 +72,63 @@ class HyperDHT extends DHT {
if (!this.online) return
for (const server of this.listening) server.notifyOnline()
})

this._rttWarmupInterval = null
if (opts.preWarmRTT !== false) {
this._startRTTWarmup()
}

this.parallelProbing = opts.parallelProbing !== false
}

_startRTTWarmup() {
if (this._rttWarmupInterval) return

this._rttWarmupInterval = setInterval(() => {
if (this.destroyed || !this.online) return

// Sample up to 10 random nodes for RTT measurement
const nodes = []
if (this.nodes && this.nodes.latest) {
let count = 0
for (let node = this.nodes.latest; node && count < 10; node = node.prev) {
if (node.host && node.port) {
nodes.push(node)
count++
}
}
}

for (const node of nodes) {
const key = `${node.host}:${node.port}`
const stats = this._nodeRTT.get(key)
const needsUpdate = !stats || Date.now() - stats.lastUpdate > 60000 // Update if older than 60s

if (needsUpdate) {
// Measure RTT in background (don't await)
const startTime = process.hrtime.bigint()
this.ping({ host: node.host, port: node.port }).then(
() => {
const endTime = process.hrtime.bigint()
const rtt = Number(endTime - startTime) / 1_000_000
if (rtt > 0 && rtt < 10000) {

this.updateNodeRTT(node, rtt)
}
},
() => {
}
)
}
}
}, 30000)
}

_stopRTTWarmup() {
if (this._rttWarmupInterval) {
clearInterval(this._rttWarmupInterval)
this._rttWarmupInterval = null
}
}

connect(remotePublicKey, opts) {
Expand Down Expand Up @@ -119,9 +183,104 @@ class HyperDHT extends DHT {
if (this._persistent) this._persistent.destroy()
await this.rawStreams.clear()
await this._socketPool.destroy()
this._stopRTTWarmup()
this._nodeRTT.clear()
this._connectionCache.clear()
this._directConnectionCache.clear()
await super.destroy()
}


getNodeRTT(node) {
if (!node) return null
const host = node.host || (node.address && node.address.host)
const port = node.port || (node.address && node.address.port)
if (!host || !port) return null

const key = `${host}:${port}`
const stats = this._nodeRTT.get(key)
return stats ? stats.srtt : null
}


updateNodeRTT(node, rtt) {
if (!node || !rtt || rtt <= 0) return

const host = node.host || (node.address && node.address.host)
const port = node.port || (node.address && node.address.port)
if (!host || !port) return

const key = `${host}:${port}`

const alpha = 0.125 // Weight for SRTT
const beta = 0.25 // Weight for RTTVAR

if (!this._nodeRTT.has(key)) {
// First measurement
this._nodeRTT.set(key, {
srtt: rtt,
rttvar: rtt / 2, // Initial variance estimate
samples: 1,
lastUpdate: Date.now()
})
} else {
const stats = this._nodeRTT.get(key)

stats.rttvar = (1 - beta) * stats.rttvar + beta * Math.abs(stats.srtt - rtt)

stats.srtt = (1 - alpha) * stats.srtt + alpha * rtt

stats.samples++
stats.lastUpdate = Date.now()
}
}


sortNodesByRTT(nodes) {
return nodes
.map((node) => ({
node,
rtt: this.getNodeRTT(node) || Infinity
}))
.sort((a, b) => a.rtt - b.rtt)
.map((item) => item.node)
}


getAverageRTT() {
if (!this._nodeRTT) return null

let totalRTT = 0
let count = 0
for (const stats of this._nodeRTT.values()) {
if (stats.srtt && stats.srtt > 0) {
totalRTT += stats.srtt
count++
}
}

return count > 0 ? totalRTT / count : null
}


getRTTBasedTimeout(baseTimeout = 10000, minMultiplier = 0.9, maxMultiplier = 1.5) {
const avgRTT = this.getAverageRTT()
if (!avgRTT) return baseTimeout

// Only optimize timeout if we have very fast RTT (< 100ms)
// For slower networks, keep the base timeout to avoid premature timeouts
if (avgRTT < 100) {
// For fast networks, use 10x RTT as timeout (very conservative)
const rttBasedTimeout = avgRTT * 10
const minTimeout = baseTimeout * minMultiplier
const maxTimeout = baseTimeout * maxMultiplier
return Math.max(minTimeout, Math.min(maxTimeout, rttBasedTimeout))
}

// For slower networks, don't reduce timeout - keep base
return baseTimeout
}

async validateLocalAddresses(addresses) {
const list = []
const socks = []
Expand Down Expand Up @@ -593,6 +752,9 @@ function defaultCacheOpts(opts) {
forwards: { maxSize, maxAge }
},
relayAddresses: { maxSize: Math.min(maxSize, 512), maxAge: 0 },
nodeRTT: { maxSize: Math.min(maxSize, 2048), maxAge: 3600000 },
connectionCache: { maxSize: Math.min(maxSize, 1024), maxAge: 60000 },
directConnectionCache: { maxSize: Math.min(maxSize, 1024), maxAge: 300000 },
persistent: {
records: { maxSize, maxAge },
refreshes: { maxSize, maxAge },
Expand Down
50 changes: 46 additions & 4 deletions lib/announcer.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,22 @@ module.exports = class Announcer {
const pings = []

for (const node of this._serverRelays[2].values()) {
pings.push(this.dht.ping(node))
const startTime = process.hrtime.bigint()
const pingPromise = this.dht.ping(node).then(
(res) => {
// Track RTT for relay nodes
const endTime = process.hrtime.bigint()
const rtt = Number(endTime - startTime) / 1_000_000 // ms
if (this.dht.updateNodeRTT) {
this.dht.updateNodeRTT(node, rtt)
}
return res
},
(err) => {
throw err
}
)
pings.push(pingPromise)
}

const active = await resolved(pings)
Expand Down Expand Up @@ -167,7 +182,7 @@ module.exports = class Announcer {
if (this.stopped || this.suspended) return

const ann = []
const replies = pickBest(q.closestReplies)
const replies = pickBest(q.closestReplies, this.dht)

const relays = []
const relayAddresses = []
Expand Down Expand Up @@ -295,8 +310,35 @@ function resolved(ps) {
})
}

function pickBest(replies) {
// TODO: pick the ones closest to us RTT wise
function pickBest(replies, dht) {
if (!replies || replies.length === 0) return []

if (dht && dht.sortNodesByRTT) {
const nodeToMsg = new Map()
const nodes = []

for (const msg of replies) {
if (msg.from) {
nodes.push(msg.from)
const key = `${msg.from.host}:${msg.from.port}`
nodeToMsg.set(key, msg)
}
}

if (nodes.length > 0) {
const sortedNodes = dht.sortNodesByRTT(nodes)

const sortedMsgs = []
for (const node of sortedNodes) {
const key = `${node.host}:${node.port}`
const msg = nodeToMsg.get(key)
if (msg) sortedMsgs.push(msg)
}

return sortedMsgs.slice(0, 3)
}
}

return replies.slice(0, 3)
}

Expand Down
Loading