diff --git a/examples/search/client.js b/examples/search/client.js new file mode 100644 index 00000000..135f4d8c --- /dev/null +++ b/examples/search/client.js @@ -0,0 +1,67 @@ +const DHT = require('hyperdht') +const vocabulary = require('./vocabulary') +const { SimHash } = require('simhash-vocabulary') +const Hyperdrive = require('hyperdrive') +const Hyperswarm = require('hyperswarm') +const Corestore = require('corestore') +const { spawn } = require('bare-subprocess') +const fs = require('bare-fs') +const process = require('bare-process') +const Hyperbee = require('hyperbee') + +const beeKey = process.argv[2] +const searchTokens = process.argv.slice(3) + +async function main() { + const node = new DHT({ + ephemeral: true, + host: '127.0.0.1', + bootstrap: [{ host: '127.0.0.1', port: 49739 }], + experimentalSearch: true + }) + + const simhash = new SimHash(vocabulary) + const swarm = new Hyperswarm() + const store = new Corestore('./client') + const bee = new Hyperbee(store.get(beeKey), { + keyEncoding: 'utf-8', + valueEncoding: 'json' + }) + await bee.ready() + + swarm.on('connection', (conn) => { + console.log('connection') + store.replicate(conn) + }) + + const discovery = swarm.join(bee.discoveryKey, { client: true, server: false }) + await discovery.flushed() + + await new Promise((res) => setTimeout(res, 2000)) + + const res = await node.search(simhash.hash(['gif', ...searchTokens])) + + for (const r of res) { + const file = await bee.get(r.values[0].toString('hex')) + console.log('Found', file.value.path, ', distance:', r.distance) + } + + if (res.length > 0) { + const { value: file } = await bee.get(res[0].values[0].toString('hex')) + + const drive = new Hyperdrive(store, Buffer.from(file.key, 'hex')) + await drive.ready() + const discovery = swarm.join(drive.discoveryKey, { client: true, server: false }) + await discovery.flushed() + + const fileData = await drive.get(file.path) + if (!fs.existsSync(`search-results/${file.path}`)) { + fs.mkdirSync(`search-results`) + } + fs.writeFileSync(`search-results/${file.path}`, fileData) + + spawn('open', [`search-results/${file.path}`]) + } +} + +main() diff --git a/examples/search/package.json b/examples/search/package.json new file mode 100644 index 00000000..09f10cd0 --- /dev/null +++ b/examples/search/package.json @@ -0,0 +1,18 @@ +{ + "dependencies": { + "bare-crypto": "^1.13.0", + "bare-fs": "^4.5.2", + "bare-process": "^4.2.2", + "bare-subprocess": "^5.2.2", + "corestore": "^7.7.0", + "hyperbee": "^2.26.5", + "hyperdht": "^6.28.0", + "hyperdrive": "^13.1.1", + "hyperswarm": "^4.16.0", + "localdrive": "^2.2.0", + "mirror-drive": "^1.10.0", + "searchable-record-cache": "^0.0.6", + "simhash-vocabulary": "^1.0.2", + "streamx": "^2.23.0" + } +} diff --git a/examples/search/server.js b/examples/search/server.js new file mode 100644 index 00000000..5f402582 --- /dev/null +++ b/examples/search/server.js @@ -0,0 +1,84 @@ +const { SimHash } = require('simhash-vocabulary') +const createTestnet = require('hyperdht/testnet') +const path = require('bare-path') +const { randomBytes } = require('bare-crypto') +const MirrorDrive = require('mirror-drive') +const Localdrive = require('localdrive') +const Hyperdrive = require('hyperdrive') +const Corestore = require('corestore') +const Hyperswarm = require('hyperswarm') +const { Transform } = require('streamx') +const vocabulary = require('./vocabulary') +const Hyperbee = require('hyperbee') + +async function main() { + const testnet = await createTestnet( + 10, + { + port: 49739 + }, + { experimentalSearch: true } + ) + + const simhash = new SimHash(vocabulary) + + function pushDHT(file) { + return new Transform({ + async transform(chunk, cb) { + const tokens = path.basename(file).replace(/\..+$/, '').split('_').filter(Boolean) + + const key = randomBytes(32) + await testnet.nodes[0].searchableRecordPut(simhash.hash(['gif', ...tokens]), key) + await bee.put(key.toString('hex'), { + path: file, + key: dst.key.toString('hex') + }) + this.push(chunk) + cb(null) + } + }) + } + + const swarm = new Hyperswarm() + const store = new Corestore('./server') + const bee = new Hyperbee(store.get({ name: 'lookup' }), { + keyEncoding: 'utf-8', + valueEncoding: 'json' + }) + await bee.ready() + + swarm.on('connection', (conn) => { + console.log('connection') + store.replicate(conn) + }) + const src = new Localdrive('./images') + const dst = new Hyperdrive(store) + + const mirror = new MirrorDrive(src, dst, { + transformers: [ + (file) => { + return pushDHT(file) + } + ] + }) + + await mirror.done() + + { + const discovery = swarm.join(bee.discoveryKey) + await discovery.flushed() + } + + { + const discovery = swarm.join(dst.discoveryKey) + await discovery.flushed() + } + + for await (const file of dst.list('.')) { + console.log('list', file) // => { key, value } + } + + console.log('Serving DHT on', bee.key.toString('hex'), testnet.nodes[0].port) +} + +main() diff --git a/index.js b/index.js index 7e983360..bb6dfd3a 100644 --- a/index.js +++ b/index.js @@ -13,6 +13,7 @@ const connect = require('./lib/connect') const { FIREWALL, BOOTSTRAP_NODES, KNOWN_NODES, COMMANDS } = require('./lib/constants') const { hash, createKeyPair } = require('./lib/crypto') const { decode } = require('hypercore-id-encoding') +const { hammingDistance } = require('searchable-record-cache') const RawStreamSet = require('./lib/raw-stream-set') const ConnectionPool = require('./lib/connection-pool') const { STREAM_NOT_CONNECTED } = require('./lib/errors') @@ -52,6 +53,7 @@ class HyperDHT extends DHT { this._randomPunchInterval = opts.randomPunchInterval || 20000 // min 20s between random punches... this._randomPunches = 0 this._randomPunchLimit = 1 // set to one for extra safety for now + this._experimentalSearch = opts.experimentalSearch === true this.once('persistent', () => { this._persistent = new Persistent(this, persistent) @@ -184,6 +186,65 @@ class HyperDHT extends DHT { return this.query({ target, command: COMMANDS.LOOKUP, value: null }, opts) } + async searchableRecordPut(target, value, opts = {}) { + if (!this._experimentalSearch) return + + const query = this.query({ target, command: COMMANDS.SEARCH, value: null }, opts) + await query.finished() + + for (const closest of query.closestReplies) { + await this.request( + { + target, + command: COMMANDS.SEARCHABLE_RECORD_PUT, + value: c.encode(m.searchableRecord, { value, key: target }) + }, + closest.from + ) + } + + return target + } + + async search(target, opts = {}) { + if (!this._experimentalSearch) return + + const query = this.query( + { + target, + command: COMMANDS.SEARCH, + value: c.encode(m.searchOptions, { + closest: opts.closest || 5, + values: opts.values || 5 + }) + }, + opts + ) + + const results = [] + const seen = new Set() + + for await (const reply of query) { + if (reply.value) { + const res = c.decode(m.searchResponse, reply.value) + + for (const r of res) { + const key = r.key.toString('hex') + if (seen.has(key)) continue + + const distance = hammingDistance(r.key, target) + seen.add(key) + results.push({ ...r, distance, from: reply.from }) + } + } + } + + results.sort((a, b) => a.distance - b.distance) + while (results.length > 5) results.pop() + + return results + } + lookupAndUnannounce(target, keyPair, opts = {}) { const unannounces = [] const dht = this @@ -426,6 +487,14 @@ class HyperDHT extends DHT { this._persistent.onimmutableget(req) return true } + case COMMANDS.SEARCH: { + this._persistent.onsearch(req) + return true + } + case COMMANDS.SEARCHABLE_RECORD_PUT: { + this._persistent.onsearchablerecordput(req) + return true + } } return false @@ -594,6 +663,7 @@ function defaultCacheOpts(opts) { }, relayAddresses: { maxSize: Math.min(maxSize, 512), maxAge: 0 }, persistent: { + experimentalSearch: opts.experimentalSearch, records: { maxSize, maxAge }, refreshes: { maxSize, maxAge }, mutables: { @@ -604,6 +674,10 @@ function defaultCacheOpts(opts) { maxSize: (maxSize / 2) | 0, maxAge: opts.maxAge || 48 * 60 * 60 * 1000 // 48 hours }, + searchableRecords: { + maxSize: (maxSize / 2) | 0, + maxAge: opts.maxAge || 48 * 60 * 60 * 1000 // 48 hours + }, bumps: { maxSize, maxAge } } } diff --git a/lib/constants.js b/lib/constants.js index 0d43cd5e..8f44b9a5 100644 --- a/lib/constants.js +++ b/lib/constants.js @@ -10,7 +10,9 @@ const COMMANDS = (exports.COMMANDS = { MUTABLE_PUT: 6, MUTABLE_GET: 7, IMMUTABLE_PUT: 8, - IMMUTABLE_GET: 9 + IMMUTABLE_GET: 9, + SEARCHABLE_RECORD_PUT: 10, + SEARCH: 11 }) exports.BOOTSTRAP_NODES = global.Pear?.config.dht?.bootstrap || [ diff --git a/lib/messages.js b/lib/messages.js index b7fb4612..e3c246fa 100644 --- a/lib/messages.js +++ b/lib/messages.js @@ -420,3 +420,56 @@ exports.mutableGetResponse = { } } } + +exports.searchOptions = { + preencode(state, m) { + c.uint.preencode(state, m.closest) + c.uint.preencode(state, m.values) + }, + encode(state, m) { + c.uint.encode(state, m.closest) + c.uint.encode(state, m.values) + }, + decode(state) { + return { + closest: c.uint.decode(state), + values: c.uint.decode(state) + } + } +} + +exports.searchableRecord = { + preencode(state, m) { + c.fixed32.preencode(state, m.key) + c.fixed32.preencode(state, m.value) + }, + encode(state, m) { + c.fixed32.encode(state, m.key) + c.fixed32.encode(state, m.value) + }, + decode(state) { + return { + key: c.fixed32.decode(state), + value: c.fixed32.decode(state) + } + } +} + +const searchResponseItem = { + preencode(state, m) { + c.fixed32.preencode(state, m.key) + c.array(c.fixed32).preencode(state, m.values) + }, + encode(state, m) { + c.fixed32.encode(state, m.key) + c.array(c.fixed32).encode(state, m.values) + }, + decode(state) { + return { + key: c.fixed32.decode(state), + values: c.array(c.fixed32).decode(state) + } + } +} + +exports.searchResponse = c.array(searchResponseItem) diff --git a/lib/persistent.js b/lib/persistent.js index a97fea23..f63c9046 100644 --- a/lib/persistent.js +++ b/lib/persistent.js @@ -1,6 +1,7 @@ const c = require('compact-encoding') const sodium = require('sodium-universal') const RecordCache = require('record-cache') +const { SearchableRecordCache } = require('searchable-record-cache') const Cache = require('xache') const b4a = require('b4a') const unslab = require('unslab') @@ -21,6 +22,8 @@ module.exports = class Persistent { this.refreshes = new Cache(opts.refreshes) this.mutables = new Cache(opts.mutables) this.immutables = new Cache(opts.immutables) + this.searchableRecords = + opts.experimentalSearch === true ? new SearchableRecordCache(opts.searchableRecords) : null } onlookup(req) { @@ -226,11 +229,47 @@ module.exports = class Persistent { req.reply(null) } + async onsearchablerecordput(req) { + if (!req.target || !this.searchableRecords) return + + const doc = c.decode(m.searchableRecord, req.value) + const { key, value } = doc + + this.searchableRecords.add(key, value) + + req.reply(null) + } + + onsearch(req) { + if (!req.target || !req.value || !this.searchableRecords) { + // allow searchable record put to do an empty search + req.reply(null) + return + } + + const opts = c.decode(m.searchOptions, req.value) + const results = [] + + const res = this.searchableRecords.search(req.target, opts) + + for (const r of res) { + results.push({ + values: r.values, + key: r.key, + distance: r.distance + }) + } + + results.sort((a, b) => a.distance - b.distance) + req.reply(c.encode(m.searchResponse, results.slice(0, 12))) // max we can fit + } + destroy() { this.records.destroy() this.refreshes.destroy() this.mutables.destroy() this.immutables.destroy() + if (this.searchableRecords) this.searchableRecords.destroy() } static signMutable(seq, value, keyPair) { diff --git a/package.json b/package.json index 32d1351e..0d8255d0 100644 --- a/package.json +++ b/package.json @@ -39,6 +39,7 @@ "noise-handshake": "^4.0.0", "record-cache": "^1.1.1", "safety-catch": "^1.0.1", + "searchable-record-cache": "^1.0.2", "signal-promise": "^1.0.3", "sodium-universal": "^5.0.1", "streamx": "^2.16.1", @@ -51,7 +52,8 @@ "graceful-goodbye": "^1.3.0", "newline-decoder": "^1.0.2", "prettier": "^3.6.2", - "prettier-config-holepunch": "^2.0.0" + "prettier-config-holepunch": "^2.0.0", + "simhash-vocabulary": "^1.0.2" }, "scripts": { "format": "prettier --write .", diff --git a/test/all.js b/test/all.js index 6c5c24f7..35fc39d3 100644 --- a/test/all.js +++ b/test/all.js @@ -16,6 +16,7 @@ async function runTests() { await import('./noncustodial.js') await import('./pool.js') await import('./relaying.js') + await import('./search.js') await import('./storing.js') test.resume() diff --git a/test/helpers/index.js b/test/helpers/index.js index 658a53c8..01724d4d 100644 --- a/test/helpers/index.js +++ b/test/helpers/index.js @@ -12,8 +12,8 @@ async function toArray(iterable) { return result } -async function swarm(t, n = 32, bootstrap = []) { - return createTestnet(n, { bootstrap, teardown: t.teardown }) +async function swarm(t, n = 32, bootstrap = [], opts = {}) { + return createTestnet(n, { bootstrap, teardown: t.teardown }, opts) } async function* spawnFixture(t, args) { diff --git a/test/messages.js b/test/messages.js index 72bacbf1..4fc805cc 100644 --- a/test/messages.js +++ b/test/messages.js @@ -440,3 +440,26 @@ test('announce with refresh', function (t) { t.alike(d, ann) }) + +test('search', function (t) { + const state = { start: 0, end: 0, buffer: null } + + const searchResponse = [] + for (let i = 0; i < 12; i++) { + searchResponse.push({ + key: Buffer.alloc(32).fill(i), + values: [Buffer.alloc(32).fill(i)] + }) + } + + m.searchResponse.preencode(state, searchResponse) + state.buffer = b4a.allocUnsafe(state.end) + m.searchResponse.encode(state, searchResponse) + + t.is(state.end, state.start, 'fully encoded') + + state.start = 0 + const d = m.searchResponse.decode(state) + + t.alike(d, searchResponse) +}) diff --git a/test/search.js b/test/search.js new file mode 100644 index 00000000..ad952dea --- /dev/null +++ b/test/search.js @@ -0,0 +1,358 @@ +const test = require('brittle') +const { swarm } = require('./helpers') + +const { SimHash } = require('simhash-vocabulary') +const { randomBytes } = require('hypercore-crypto') + +test('search - disabled', async function (t) { + const simhash = new SimHash(vocabulary) + const { nodes } = await swarm(t, 100, []) + + // disabled + t.is(nodes[30]._experimentalSearch, false) + t.is(nodes[30]._persistent.searchableRecords, null) + + const pointer = randomBytes(32) + + await nodes[30].searchableRecordPut(simhash.hash(['planet', 'satellite']), pointer) + + const res = await nodes[30].search(simhash.hash(['planet', 'satellite'])) + t.is(res, undefined) +}) + +test('search - enabled', async function (t) { + const simhash = new SimHash(vocabulary) + const { nodes } = await swarm(t, 100, [], { experimentalSearch: true }) + + // enabled + t.is(nodes[30]._experimentalSearch, true) + t.ok(nodes[30]._persistent.searchableRecords) + + const pointer = randomBytes(32) + + await nodes[30].searchableRecordPut(simhash.hash(['planet', 'satellite']), pointer) + + const res = await nodes[30].search(simhash.hash(['planet', 'satellite'])) + t.is(res.length, 1) + t.is(res[0].values[0].toString('hex'), pointer.toString('hex')) +}) + +test('search - gc', async function (t) { + const simhash = new SimHash(vocabulary) + const { nodes } = await swarm(t, 100, [], { + maxAge: 100, // give us some time to do a search + experimentalSearch: true + }) + + const pointer = randomBytes(32) + + await nodes[30].searchableRecordPut(simhash.hash(['planet', 'satellite']), pointer) + + const res = await nodes[30].search(simhash.hash(['planet', 'satellite'])) + t.is(res.length, 1) + t.is(res[0].values[0].toString('hex'), pointer.toString('hex')) + + await new Promise((res) => setTimeout(res, 250)) + + // after gc + { + const res = await nodes[30].search(['planet', 'satellite']) + t.is(res.length, 0) + } +}) + +const vocabulary = [ + 'apple', + 'table', + 'window', + 'pencil', + 'chair', + 'water', + 'book', + 'garden', + 'cloud', + 'bridge', + 'mountain', + 'river', + 'ocean', + 'forest', + 'stone', + 'mirror', + 'candle', + 'basket', + 'flower', + 'blanket', + 'pillow', + 'lamp', + 'carpet', + 'clock', + 'drawer', + 'kitchen', + 'ceiling', + 'hallway', + 'doorway', + 'staircase', + 'bedroom', + 'bathroom', + 'garage', + 'driveway', + 'sidewalk', + 'pavement', + 'street', + 'highway', + 'tunnel', + 'building', + 'tower', + 'castle', + 'temple', + 'statue', + 'fountain', + 'plaza', + 'market', + 'bakery', + 'grocery', + 'pharmacy', + 'library', + 'museum', + 'theater', + 'stadium', + 'airport', + 'station', + 'platform', + 'terminal', + 'vessel', + 'anchor', + 'harbor', + 'lighthouse', + 'island', + 'peninsula', + 'continent', + 'planet', + 'satellite', + 'telescope', + 'microscope', + 'thermometer', + 'compass', + 'calculator', + 'computer', + 'keyboard', + 'monitor', + 'printer', + 'scanner', + 'camera', + 'photograph', + 'painting', + 'sculpture', + 'drawing', + 'sketch', + 'canvas', + 'palette', + 'brush', + 'crayon', + 'marker', + 'eraser', + 'ruler', + 'notebook', + 'folder', + 'envelope', + 'package', + 'container', + 'bottle', + 'jar', + 'cup', + 'plate', + 'bowl', + 'spoon', + 'fork', + 'knife', + 'napkin', + 'tablecloth', + 'counter', + 'cabinet', + 'shelf', + 'closet', + 'wardrobe', + 'dresser', + 'nightstand', + 'sofa', + 'armchair', + 'cushion', + 'curtain', + 'blinds', + 'shutter', + 'rooftop', + 'chimney', + 'gutter', + 'fence', + 'gate', + 'pathway', + 'meadow', + 'valley', + 'hillside', + 'plateau', + 'canyon', + 'desert', + 'tundra', + 'glacier', + 'waterfall', + 'stream', + 'pond', + 'swamp', + 'marsh', + 'prairie', + 'woodland', + 'grove', + 'orchard', + 'vineyard', + 'farmland', + 'pasture', + 'barnyard', + 'stable', + 'henhouse', + 'silo', + 'tractor', + 'wagon', + 'barrel', + 'bucket', + 'rake', + 'hatchet', + 'hammer', + 'wrench', + 'screwdriver', + 'pliers', + 'toolbox', + 'workbench', + 'sawdust', + 'timber', + 'plank', + 'beam', + 'brick', + 'sand', + 'clay', + 'soil', + 'dust', + 'pebble', + 'boulder', + 'granite', + 'marble', + 'limestone', + 'quartz', + 'crystal', + 'diamond', + 'emerald', + 'sapphire', + 'ruby', + 'topaz', + 'amber', + 'pearl', + 'coral', + 'seashell', + 'starfish', + 'dolphin', + 'whale', + 'shark', + 'octopus', + 'jellyfish', + 'seahorse', + 'turtle', + 'penguin', + 'pelican', + 'seagull', + 'sparrow', + 'robin', + 'cardinal', + 'bluebird', + 'finch', + 'hummingbird', + 'butterfly', + 'dragonfly', + 'beetle', + 'grasshopper', + 'cricket', + 'firefly', + 'caterpillar', + 'ladybug', + 'spider', + 'squirrel', + 'rabbit', + 'chipmunk', + 'raccoon', + 'beaver', + 'otter', + 'hedgehog', + 'porcupine', + 'badger', + 'weasel', + 'mole', + 'gopher', + 'hamster', + 'gerbil', + 'guinea', + 'ferret', + 'parrot', + 'canary', + 'parakeet', + 'macaw', + 'cockatoo', + 'pigeon', + 'dove', + 'falcon', + 'eagle', + 'hawk', + 'owl', + 'vulture', + 'ostrich', + 'flamingo', + 'peacock', + 'pheasant', + 'quail', + 'turkey', + 'chicken', + 'rooster', + 'duckling', + 'gosling', + 'cygnet', + 'foal', + 'calf', + 'lamb', + 'piglet', + 'kitten', + 'puppy' +] + +test.skip('search - big', async function (t) { + const simhash = new SimHash(vocabulary) + const { nodes } = await swarm(t, 2000, [], { experimentalSearch: true }) + + const targetPointer = randomBytes(32) + + const target = generateDoc() + await nodes[30].searchableRecordPut(simhash.hash(target), targetPointer) + + t.comment('creating docs') + + for (let i = 0; i < 10_000; i++) { + const tokens = generateDoc() + const pointer = randomBytes(32) + await nodes[30].searchableRecordPut(simhash.hash(tokens), pointer) + } + + t.pass('setup') + + const time = Date.now() + const res = await nodes[30].search(simhash.hash(target), { closest: 10, values: 1 }) + + t.ok(res.length, 5) + + t.is(res[0].values[0].toString('hex'), targetPointer.toString('hex')) + t.comment('searched', Date.now() - time) +}) + +function generateDoc(tokenSize = 256) { + const words = [] + for (let i = 0; i < tokenSize; i++) { + words.push(vocabulary[Math.floor(Math.random() * vocabulary.length)]) + } + + return words +} diff --git a/testnet.js b/testnet.js index 7c6cf787..e71aee9a 100644 --- a/testnet.js +++ b/testnet.js @@ -1,6 +1,6 @@ const DHT = require('.') -module.exports = async function createTestnet(size = 10, opts = {}) { +module.exports = async function createTestnet(size = 10, opts = {}, dhtOpts = {}) { const swarm = [] const teardown = typeof opts === 'function' ? opts : opts.teardown ? opts.teardown.bind(opts) : noop @@ -12,6 +12,7 @@ module.exports = async function createTestnet(size = 10, opts = {}) { if (size === 0) return new Testnet(swarm) const first = new DHT({ + ...dhtOpts, ephemeral: false, firewalled: false, bootstrap, @@ -27,6 +28,7 @@ module.exports = async function createTestnet(size = 10, opts = {}) { while (swarm.length < size) { const node = new DHT({ + ...dhtOpts, ephemeral: false, firewalled: false, bootstrap,