Skip to content
67 changes: 67 additions & 0 deletions examples/search/client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
const DHT = require('hyperdht')
const vocabulary = require('./vocabulary')
const { SimHash } = require('simhash-vocabulary')
const Hyperdrive = require('hyperdrive')
const Hyperswarm = require('hyperswarm')
const Corestore = require('corestore')
const { spawn } = require('bare-subprocess')
const fs = require('bare-fs')
const process = require('bare-process')
const Hyperbee = require('hyperbee')

const beeKey = process.argv[2]
const searchTokens = process.argv.slice(3)

/**
 * Example search client.
 *
 * Joins the local test DHT (bootstrap node at 127.0.0.1:49739), replicates the
 * lookup Hyperbee identified by `beeKey`, searches the DHT for the simhash of
 * `['gif', ...searchTokens]`, prints every hit that has a lookup entry and
 * downloads + opens the closest one under `search-results/`.
 *
 * Usage: client.js <bee-key> [search-token ...]
 */
async function main() {
  if (!beeKey) {
    throw new Error('Usage: client.js <bee-key> [search-token ...]')
  }

  const node = new DHT({
    ephemeral: true,
    host: '127.0.0.1',
    bootstrap: [{ host: '127.0.0.1', port: 49739 }],
    experimentalSearch: true
  })

  const simhash = new SimHash(vocabulary)
  const swarm = new Hyperswarm()
  const store = new Corestore('./client')
  const bee = new Hyperbee(store.get(beeKey), {
    keyEncoding: 'utf-8',
    valueEncoding: 'json'
  })
  await bee.ready()

  swarm.on('connection', (conn) => {
    console.log('connection')
    store.replicate(conn)
  })

  const beeDiscovery = swarm.join(bee.discoveryKey, { client: true, server: false })
  await beeDiscovery.flushed()

  // NOTE(review): fixed 2s pause, presumably to let the bee start replicating
  // before it is queried - confirm whether an explicit update would be better.
  await new Promise((resolve) => setTimeout(resolve, 2000))

  const res = await node.search(simhash.hash(['gif', ...searchTokens]))

  // Results arrive sorted by distance, so the first hit with a lookup entry is
  // the best match. Hits without an entry are skipped instead of crashing.
  let best = null
  for (const r of res) {
    const entry = await bee.get(r.values[0].toString('hex'))
    if (!entry) continue

    console.log('Found', entry.value.path, ', distance:', r.distance)
    if (best === null) best = entry.value
  }

  if (best === null) return

  const drive = new Hyperdrive(store, Buffer.from(best.key, 'hex'))
  await drive.ready()
  const driveDiscovery = swarm.join(drive.discoveryKey, { client: true, server: false })
  await driveDiscovery.flushed()

  const fileData = await drive.get(best.path)
  if (!fileData) {
    console.error('File not found in drive:', best.path)
    return
  }

  // Create the parent directory of the output file. `recursive` makes this a
  // no-op when the directory already exists and also covers nested drive paths.
  const outPath = `search-results/${best.path}`
  fs.mkdirSync(outPath.slice(0, outPath.lastIndexOf('/')), { recursive: true })
  fs.writeFileSync(outPath, fileData)

  spawn('open', [outPath])
}

main()
18 changes: 18 additions & 0 deletions examples/search/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
  "dependencies": {
    "bare-crypto": "^1.13.0",
    "bare-fs": "^4.5.2",
    "bare-path": "^3.0.0",
    "bare-process": "^4.2.2",
    "bare-subprocess": "^5.2.2",
    "corestore": "^7.7.0",
    "hyperbee": "^2.26.5",
    "hyperdht": "^6.28.0",
    "hyperdrive": "^13.1.1",
    "hyperswarm": "^4.16.0",
    "localdrive": "^2.2.0",
    "mirror-drive": "^1.10.0",
    "searchable-record-cache": "^0.0.6",
    "simhash-vocabulary": "^1.0.2",
    "streamx": "^2.23.0"
  }
}
84 changes: 84 additions & 0 deletions examples/search/server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
const { SimHash } = require('simhash-vocabulary')
const createTestnet = require('hyperdht/testnet')
const path = require('bare-path')
const { randomBytes } = require('bare-crypto')
const MirrorDrive = require('mirror-drive')
const Localdrive = require('localdrive')
const Hyperdrive = require('hyperdrive')
const Corestore = require('corestore')
const Hyperswarm = require('hyperswarm')
const { Transform } = require('streamx')
const vocabulary = require('./vocabulary')
const Hyperbee = require('hyperbee')

/**
 * Example search server.
 *
 * Starts a 10-node local testnet on port 49739 with experimental search
 * enabled, mirrors `./images` into a Hyperdrive and, for every mirrored file,
 * publishes a searchable record on the DHT plus a lookup entry in a Hyperbee.
 * Finally announces both the bee and the drive on the swarm and prints the
 * bee key that clients need.
 */
async function main() {
  const testnet = await createTestnet(
    10,
    {
      port: 49739
    },
    { experimentalSearch: true }
  )

  const simhash = new SimHash(vocabulary)

  /**
   * Builds a pass-through transform for one mirrored file. The file is indexed
   * when its first chunk arrives; all chunks are forwarded unchanged.
   */
  function pushDHT(file) {
    // The transform runs once per chunk; index the file only once so that
    // multi-chunk files do not produce duplicate records.
    let indexed = false

    return new Transform({
      async transform(chunk, cb) {
        try {
          if (!indexed) {
            indexed = true

            // File name without extension, split on '_', gives the search tokens
            const tokens = path.basename(file).replace(/\..+$/, '').split('_').filter(Boolean)

            const key = randomBytes(32)
            await testnet.nodes[0].searchableRecordPut(simhash.hash(['gif', ...tokens]), key)
            await bee.put(key.toString('hex'), {
              path: file,
              key: dst.key.toString('hex')
            })
          }
        } catch (err) {
          // Surface the failure through the stream instead of leaving it stalled
          cb(err)
          return
        }

        this.push(chunk)
        cb(null)
      }
    })
  }

  const swarm = new Hyperswarm()
  const store = new Corestore('./server')
  const bee = new Hyperbee(store.get({ name: 'lookup' }), {
    keyEncoding: 'utf-8',
    valueEncoding: 'json'
  })
  await bee.ready()

  swarm.on('connection', (conn) => {
    console.log('connection')
    store.replicate(conn)
  })
  const src = new Localdrive('./images')
  const dst = new Hyperdrive(store)

  const mirror = new MirrorDrive(src, dst, {
    transformers: [
      (file) => {
        return pushDHT(file)
      }
    ]
  })

  await mirror.done()

  {
    const discovery = swarm.join(bee.discoveryKey)
    await discovery.flushed()
  }

  {
    const discovery = swarm.join(dst.discoveryKey)
    await discovery.flushed()
  }

  for await (const file of dst.list('.')) {
    console.log('list', file) // => { key, value }
  }

  console.log('Serving DHT on', bee.key.toString('hex'), testnet.nodes[0].port)
}

main()
74 changes: 74 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const connect = require('./lib/connect')
const { FIREWALL, BOOTSTRAP_NODES, KNOWN_NODES, COMMANDS } = require('./lib/constants')
const { hash, createKeyPair } = require('./lib/crypto')
const { decode } = require('hypercore-id-encoding')
const { hammingDistance } = require('searchable-record-cache')
const RawStreamSet = require('./lib/raw-stream-set')
const ConnectionPool = require('./lib/connection-pool')
const { STREAM_NOT_CONNECTED } = require('./lib/errors')
Expand Down Expand Up @@ -52,6 +53,7 @@ class HyperDHT extends DHT {
this._randomPunchInterval = opts.randomPunchInterval || 20000 // min 20s between random punches...
this._randomPunches = 0
this._randomPunchLimit = 1 // set to one for extra safety for now
this._experimentalSearch = opts.experimentalSearch === true

this.once('persistent', () => {
this._persistent = new Persistent(this, persistent)
Expand Down Expand Up @@ -184,6 +186,65 @@ class HyperDHT extends DHT {
return this.query({ target, command: COMMANDS.LOOKUP, value: null }, opts)
}

async searchableRecordPut(target, value, opts = {}) {
if (!this._experimentalSearch) return

const query = this.query({ target, command: COMMANDS.SEARCH, value: null }, opts)
await query.finished()

for (const closest of query.closestReplies) {
await this.request(
{
target,
command: COMMANDS.SEARCHABLE_RECORD_PUT,
value: c.encode(m.searchableRecord, { value, key: target })
},
closest.from
)
}

return target
}

async search(target, opts = {}) {
if (!this._experimentalSearch) return

const query = this.query(
{
target,
command: COMMANDS.SEARCH,
value: c.encode(m.searchOptions, {
closest: opts.closest || 5,
values: opts.values || 5
})
},
opts
)

const results = []
const seen = new Set()

for await (const reply of query) {
if (reply.value) {
const res = c.decode(m.searchResponse, reply.value)

for (const r of res) {
const key = r.key.toString('hex')
if (seen.has(key)) continue

const distance = hammingDistance(r.key, target)
seen.add(key)
results.push({ ...r, distance, from: reply.from })
}
}
}

results.sort((a, b) => a.distance - b.distance)
while (results.length > 5) results.pop()

return results
}

lookupAndUnannounce(target, keyPair, opts = {}) {
const unannounces = []
const dht = this
Expand Down Expand Up @@ -426,6 +487,14 @@ class HyperDHT extends DHT {
this._persistent.onimmutableget(req)
return true
}
case COMMANDS.SEARCH: {
this._persistent.onsearch(req)
return true
}
case COMMANDS.SEARCHABLE_RECORD_PUT: {
this._persistent.onsearchablerecordput(req)
return true
}
}

return false
Expand Down Expand Up @@ -594,6 +663,7 @@ function defaultCacheOpts(opts) {
},
relayAddresses: { maxSize: Math.min(maxSize, 512), maxAge: 0 },
persistent: {
experimentalSearch: opts.experimentalSearch,
records: { maxSize, maxAge },
refreshes: { maxSize, maxAge },
mutables: {
Expand All @@ -604,6 +674,10 @@ function defaultCacheOpts(opts) {
maxSize: (maxSize / 2) | 0,
maxAge: opts.maxAge || 48 * 60 * 60 * 1000 // 48 hours
},
searchableRecords: {
maxSize: (maxSize / 2) | 0,
maxAge: opts.maxAge || 48 * 60 * 60 * 1000 // 48 hours
},
bumps: { maxSize, maxAge }
}
}
Expand Down
4 changes: 3 additions & 1 deletion lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ const COMMANDS = (exports.COMMANDS = {
MUTABLE_PUT: 6,
MUTABLE_GET: 7,
IMMUTABLE_PUT: 8,
IMMUTABLE_GET: 9
IMMUTABLE_GET: 9,
SEARCHABLE_RECORD_PUT: 10,
SEARCH: 11
})

exports.BOOTSTRAP_NODES = global.Pear?.config.dht?.bootstrap || [
Expand Down
53 changes: 53 additions & 0 deletions lib/messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -420,3 +420,56 @@ exports.mutableGetResponse = {
}
}
}

exports.searchOptions = {
preencode(state, m) {
c.uint.preencode(state, m.closest)
c.uint.preencode(state, m.values)
},
encode(state, m) {
c.uint.encode(state, m.closest)
c.uint.encode(state, m.values)
},
decode(state) {
return {
closest: c.uint.decode(state),
values: c.uint.decode(state)
}
}
}

exports.searchableRecord = {
preencode(state, m) {
c.fixed32.preencode(state, m.key)
c.fixed32.preencode(state, m.value)
},
encode(state, m) {
c.fixed32.encode(state, m.key)
c.fixed32.encode(state, m.value)
},
decode(state) {
return {
key: c.fixed32.decode(state),
value: c.fixed32.decode(state)
}
}
}

// Codec for a list of 32-byte values, built once and shared by all three
// methods below instead of being re-created on every call.
const searchValueArray = c.array(c.fixed32)

// Codec for one search hit: a 32-byte record `key` followed by the array of
// 32-byte `values` stored under it.
const searchResponseItem = {
  preencode(state, m) {
    c.fixed32.preencode(state, m.key)
    searchValueArray.preencode(state, m.values)
  },
  encode(state, m) {
    c.fixed32.encode(state, m.key)
    searchValueArray.encode(state, m.values)
  },
  decode(state) {
    return {
      key: c.fixed32.decode(state),
      values: searchValueArray.decode(state)
    }
  }
}

// A search response is an array of search hits
exports.searchResponse = c.array(searchResponseItem)
Loading