diff --git a/index.js b/index.js index d181e27..87bde0e 100644 --- a/index.js +++ b/index.js @@ -1,7 +1,7 @@ -const Hyperbee = require('hyperbee') +const Hyperbee = require('hyperbee2') const Hyperblobs = require('hyperblobs') const isOptions = require('is-options') -const { Writable, Readable } = require('streamx') +const { Writable, Readable, Transform, pipeline } = require('streamx') const unixPathResolve = require('unix-path-resolve') const MirrorDrive = require('mirror-drive') const SubEncoder = require('sub-encoder') @@ -12,8 +12,10 @@ const Hypercore = require('hypercore') const { BLOCK_NOT_AVAILABLE, BAD_ARGUMENT } = require('hypercore-errors') const Monitor = require('./lib/monitor') const Download = require('./lib/download') +const c = require('compact-encoding') const keyEncoding = new SubEncoder('files', 'utf-8') +const writeEncoding = require('./lib/encoding') const [BLOBS] = crypto.namespace('hyperdrive', 1) @@ -37,8 +39,8 @@ module.exports = class Hyperdrive extends ReadyResource { this._active = opts.active !== false this._openingBlobs = null this._onwait = opts.onwait || null - this._batching = !!(opts._checkout === null && opts._db) this._checkout = opts._checkout || null + this._batch = null this.ready().catch(safetyCatch) } @@ -50,7 +52,7 @@ module.exports = class Hyperdrive extends ReadyResource { static async getDriveKey(corestore) { const core = makeBee(undefined, corestore) await core.ready() - const key = core.key + const key = core.core.key await core.close() return key } @@ -72,7 +74,7 @@ module.exports = class Hyperdrive extends ReadyResource { } _generateBlobsManifest() { - const m = this.db.core.manifest + const m = this.db.core.core.manifest if (this.db.core.core.compat) return null return generateContentManifest(m, this.core.key) @@ -95,7 +97,7 @@ module.exports = class Hyperdrive extends ReadyResource { } get version() { - return this.db.version + return this.db.core.length } get writable() { @@ -125,6 +127,8 @@ module.exports = class Hyperdrive extends ReadyResource { } await this.core.truncate(version) + await this.db.update() + await bl.core.truncate(blobsVersion) } @@ -133,7 +137,7 @@ module.exports = class Hyperdrive extends ReadyResource { if (!checkout) checkout = this.version - const c = this.db.checkout(checkout) + const c = this.db.checkout({ length: checkout }) try { return await getBlobsLength(c) @@ -160,16 +164,12 @@ module.exports = class Hyperdrive extends ReadyResource { } checkout(version) { - return this._makeCheckout(this.db.checkout(version)) + return this._makeCheckout(this.db.checkout({ length: version })) } batch() { - return new Hyperdrive(this.corestore, this.key, { - onwait: this._onwait, - encryptionKey: this.encryptionKey, - _checkout: null, - _db: this.db.batch() - }) + this._batch = this.db.write() + return this } setActive(bool) { @@ -181,8 +181,8 @@ module.exports = class Hyperdrive extends ReadyResource { } async flush() { - await this.db.flush() - return this.close() + await this._batch.flush() + this._batch = null } async _close() { @@ -192,24 +192,19 @@ module.exports = class Hyperdrive extends ReadyResource { await this.db.close() - if (!this._checkout && !this._batching) { + if (!this._checkout && !this._batch) { await this.corestore.close() } await this.closeMonitors() } - async _openBlobsFromHeader(opts) { + async _ensureBlobs() { if (this.blobs) return true - const header = await getBee(this.db).getHeader(opts) - if (!header) return false + await this.core.ready() - if (this.blobs) return true - - const contentKey = - header.metadata && header.metadata.contentFeed && header.metadata.contentFeed.subarray(0, 32) - const blobsKey = contentKey || Hypercore.key(this._generateBlobsManifest()) + const blobsKey = Hypercore.key(this._generateBlobsManifest()) if (!blobsKey || blobsKey.length < 32) throw new Error('Invalid or no Blob store key set') const blobsCore = this.corestore.get({ @@ -217,7 +212,7 @@ module.exports = class Hyperdrive extends ReadyResource { cache: false, onwait: this._onwait, encryptionKey: this.encryptionKey, - keyPair: !contentKey && this.db.core.writable ? this.db.core.keyPair : null, + keyPair: this.db.core.writable ? this.db.core.keyPair : null, active: this._active }) await blobsCore.ready() @@ -242,7 +237,7 @@ module.exports = class Hyperdrive extends ReadyResource { return } - await this._openBlobsFromHeader({ wait: false }) + await this.db.ready() if (this.db.core.writable && !this.blobs) { const m = this._generateBlobsManifest() @@ -265,17 +260,10 @@ module.exports = class Hyperdrive extends ReadyResource { this.emit('blobs', this.blobs) this.emit('content-key', blobsCore.key) } - - await this.db.ready() - - if (!this.blobs) { - // eagerly load the blob store.... - this._openingBlobs = this._openBlobsFromHeader() - this._openingBlobs.catch(safetyCatch) - } } async getBlobs() { + await this._ensureBlobs() if (this.blobs) return this.blobs if (this._checkout) { @@ -301,6 +289,7 @@ module.exports = class Hyperdrive extends ReadyResource { } async get(name, opts) { + await this._ensureBlobs() const node = await this.entry(name, opts) if (!node?.value.blob) return null await this.getBlobs() @@ -313,15 +302,21 @@ module.exports = class Hyperdrive extends ReadyResource { async put(name, buf, { executable = false, metadata = null } = {}) { await this.getBlobs() const blob = await this.blobs.put(buf) - return this.db.put( - std(name, false), - { executable, linkname: null, blob, metadata }, - { keyEncoding } + const w = this._batch || this.db.write() + w.tryPut( + Buffer.from(std(name, false)), + c.encode(writeEncoding, { executable, linkname: null, blob, metadata }) ) + + if (!this._batch) { + return w.flush() + } } async del(name) { - return this.db.del(std(name, false), { keyEncoding }) + const w = this.db.write() + w.tryDelete(Buffer.from(std(name, false))) + return w.flush() } compare(a, b) { @@ -369,14 +364,17 @@ module.exports = class Hyperdrive extends ReadyResource { } async symlink(name, dst, { metadata = null } = {}) { - return this.db.put( - std(name, false), - { executable: false, linkname: dst, blob: null, metadata }, - { keyEncoding } + const w = this.db.write() + w.tryPut( + Buffer.from(std(name, false)), + c.encode(writeEncoding, { executable: false, linkname: dst, metadata }) ) + return w.flush() } async entry(name, opts) { + await this._ensureBlobs() + if (!opts || !opts.follow) return this._entry(name, opts) for (let i = 0; i < 16; i++) { @@ -392,34 +390,49 @@ module.exports = class Hyperdrive extends ReadyResource { async _entry(name, opts) { if (typeof name !== 'string') return name - return this.db.get(std(name, false), { ...opts, keyEncoding }) + const node = await this.db.get(Buffer.from(std(name, false)), { ...opts }) + if (!node) return null + + return { key: node.key.toString(), seq: node.seq, value: c.decode(writeEncoding, node.value) } } async exists(name) { return (await this.entry(name)) !== null } - watch(folder) { - folder = std(folder || '/', true) - - return this.db.watch(prefixRange(folder), { - keyEncoding, - map: (snap) => this._makeCheckout(snap) - }) - } - diff(length, folder, opts) { if (typeof folder === 'object' && folder && !opts) return this.diff(length, null, folder) folder = std(folder || '/', true) - return this.db.createDiffStream(length, prefixRange(folder), { - ...opts, - keyEncoding + const snap = this.db.checkout({ length }) + const range = prefixRange(folder) + + const decodeNode = (node) => { + if (!node) return null + + return { + key: node.key.toString(), + value: c.decode(writeEncoding, node.value) + } + } + + const transform = new Transform({ + transform(from, cb) { + this.push({ + left: decodeNode(from.left), + right: decodeNode(from.right) + }) + cb() + } }) + + const stream = pipeline(this.db.createDiffStream(snap, { ...opts, ...range }), transform) + return stream } async downloadDiff(length, folder, opts) { + await this._ensureBlobs() const dls = [] for await (const entry of this.diff(length, folder, opts)) { @@ -434,6 +447,7 @@ module.exports = class Hyperdrive extends ReadyResource { } async downloadRange(dbRanges, blobRanges) { + await this._ensureBlobs() const dls = [] await this.ready() @@ -451,8 +465,17 @@ module.exports = class Hyperdrive extends ReadyResource { return new Download(this, null, { downloads: dls }) } - entries(range, opts) { - const stream = this.db.createReadStream(range, { ...opts, keyEncoding }) + entries(range = {}, opts = {}) { + const transform = new Transform({ + transform(from, cb) { + this.push({ key: from.key.toString(), value: c.decode(writeEncoding, from.value) }) + cb() + } + }) + const stream = pipeline( + this.db.createReadStream({ ...opts, ...transformRange(range) }), + transform + ) if (opts && opts.ignore) stream._readableState.map = createStreamMapIgnore(opts.ignore) return stream } @@ -464,6 +487,7 @@ module.exports = class Hyperdrive extends ReadyResource { } async has(path) { + await this._ensureBlobs() const blobs = await this.getBlobs() const entry = !path || path.endsWith('/') ? null : await this.entry(path) if (entry) { @@ -618,13 +642,13 @@ module.exports = class Hyperdrive extends ReadyResource { onfinish = null if (err) return cb(err) - self.db - .put( - std(name, false), - { executable, linkname: null, blob: ws.id, metadata }, - { keyEncoding } - ) - .then(() => cb(null), cb) + + const w = self.db.write() + w.tryPut( + Buffer.from(std(name, false)), + c.encode(writeEncoding, { executable, linkname: null, blob: ws.id, metadata }) + ) + w.flush().then(() => cb(null), cb) } function callOndrain(err) { @@ -663,6 +687,7 @@ function shallowReadStream(files, folder, keys, ignore, opts) { return cb(null) } + node.key = node.key.toString() const suffix = node.key.slice(folder.length + 1) const i = suffix.indexOf('/') const name = i === -1 ? suffix : suffix.slice(0, i) @@ -682,29 +707,17 @@ function shallowReadStream(files, folder, keys, ignore, opts) { return } - this.push(keys ? name : node) + this.push(keys ? name.toString() : node) cb(null) } }) } function makeBee(key, corestore, opts = {}) { - const name = key ? undefined : 'db' - const core = corestore.get({ + return new Hyperbee(corestore, { key, - name, - exclusive: true, - onwait: opts.onwait, - encryptionKey: opts.encryptionKey, - compat: opts.compat, - active: opts.active - }) - - return new Hyperbee(core, { - keyEncoding: 'utf-8', - valueEncoding: 'json', - metadata: { contentFeed: null }, - extension: opts.extension + autoUpdate: true, + ...opts }) } @@ -727,7 +740,7 @@ function validateFilename(name) { function prefixRange(name, prev = '/') { // '0' is binary +1 of / - return { gt: name + prev, lt: name + '0' } + return { gt: Buffer.from(name + prev), lt: Buffer.from(name + '0') } } function generateContentManifest(m, key) { @@ -754,11 +767,10 @@ function generateContentManifest(m, key) { async function getBlobsLength(db) { let length = 0 - for await (const { value } of db.createReadStream()) { - const b = value && value.blob + const b = value ? c.decode(writeEncoding, value) : null if (!b) continue - const len = b.blockOffset + b.blockLength + const len = b.blob.blockOffset + b.blob.blockLength if (len > length) length = len } @@ -769,9 +781,19 @@ function toIgnoreFunction(ignore) { if (typeof ignore === 'function') return ignore const all = [].concat(ignore).map((e) => unixPathResolve('/', e)) - return (key) => all.some((path) => path === key || key.startsWith(path + '/')) + return (key) => + all.some((path) => path === key.toString() || key.toString().startsWith(path + '/')) } function createStreamMapIgnore(ignore) { return (node) => (ignore(node.key) ? null : node) } + +function transformRange(range) { + return { + gt: range.gt ? Buffer.from(range.gt) : range.gt, + gte: range.gte ? Buffer.from(range.gte) : range.gte, + lt: range.lt ? Buffer.from(range.lt) : range.lt, + lte: range.lte ? Buffer.from(range.lte) : range.lte + } +} diff --git a/lib/encoding.js b/lib/encoding.js new file mode 100644 index 0000000..1fe973f --- /dev/null +++ b/lib/encoding.js @@ -0,0 +1,16 @@ +const c = require('compact-encoding') +const { compile, opt } = require('compact-encoding-struct') + +const blob = compile({ + byteOffset: c.uint, + blockOffset: c.uint, + blockLength: c.uint, + byteLength: c.uint +}) + +module.exports = compile({ + executable: c.bool, + linkname: opt(c.string), + blob: opt(blob), + metadata: opt(c.json) +}) diff --git a/package.json b/package.json index 00c4d88..df790de 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,9 @@ }, "homepage": "https://github.com/holepunchto/hyperdrive#readme", "dependencies": { + "compact-encoding-struct": "^1.3.0", "hyperbee": "^2.11.1", + "hyperbee2": "github:holepunchto/hyperbee2", "hyperblobs": "^2.3.0", "hypercore": "^11.0.0", "hypercore-errors": "^1.0.0", diff --git a/test.js b/test.js index 083945d..09d2b0d 100644 --- a/test.js +++ b/test.js @@ -15,18 +15,18 @@ const Hyperdrive = require('./index.js') test('drive.core', async (t) => { const { drive } = await testenv(t) - t.is(drive.db.feed, drive.core) + t.is(drive.db.core, drive.core) }) test('drive.version', async (t) => { const { drive } = await testenv(t) await drive.put(__filename, fs.readFileSync(__filename)) - t.is(drive.db.feed.length, drive.version) + t.is(drive.db.core.length, drive.version) }) test('drive.key', async (t) => { const { drive } = await testenv(t) - t.is(b4a.compare(drive.db.feed.key, drive.key), 0) + t.is(b4a.compare(drive.db.core.key, drive.key), 0) }) test('drive.discoveryKey', async (t) => { @@ -373,85 +373,6 @@ test('symlink(key, linkname) resolve key path', async function (t) { await symlinkAndEntry('\\examples\\more\\h.txt', '/examples/more/h.txt') }) -test('watch() basic', async function (t) { - t.plan(5) - - const { drive } = await testenv(t) - const buf = b4a.from('hi') - - const watcher = drive.watch() - - eventFlush().then(async () => { - await drive.put('/a.txt', buf) - }) - - for await (const [current, previous] of watcher) { - // eslint-disable-line no-unreachable-loop - t.ok(current instanceof Hyperdrive) - t.ok(previous instanceof Hyperdrive) - t.is(current.version, 2) - t.is(previous.version, 1) - t.alike(await current.get('/a.txt'), buf) - break - } -}) - -test('watch(folder) basic', async function (t) { - t.plan(1) - - const { drive } = await testenv(t) - const buf = b4a.from('hi') - - await drive.put('/README.md', buf) - await drive.put('/examples/a.txt', buf) - await drive.put('/examples/more/a.txt', buf) - - const watcher = drive.watch('/examples') - - let next = watcher.next() - let onchange = null - next.then((data) => { - next = watcher.next() - onchange(data) - }) - - onchange = () => t.fail('should not trigger changes') - await drive.put('/b.txt', buf) - await eventFlush() - onchange = null - - onchange = () => t.pass('change') - await drive.put('/examples/b.txt', buf) - await eventFlush() - onchange = null -}) - -test('watch(folder) should normalize folder', async function (t) { - t.plan(1) - - const { drive } = await testenv(t) - const buf = b4a.from('hi') - - const watcher = drive.watch('examples//more//') - - let next = watcher.next() - let onchange = null - next.then((data) => { - next = watcher.next() - onchange(data) - }) - - onchange = () => t.fail('should not trigger changes') - await drive.put('/examples/a.txt', buf) - await eventFlush() - onchange = null - - onchange = () => t.pass('change') - await drive.put('/examples/more/a.txt', buf) - await eventFlush() - onchange = null -}) - test('drive.diff(length)', async (t) => { const { drive, @@ -497,7 +418,10 @@ test('drive.entries()', async (t) => { for await (const entry of drive.entries()) { for (const _entry of entries) { - if (JSON.stringify(_entry) === JSON.stringify(entry)) { + if ( + JSON.stringify(_entry.value) === JSON.stringify(entry.value) && + _entry.key === entry.key + ) { entries.delete(_entry) break } @@ -863,9 +787,8 @@ test('drive.batch() & drive.flush()', async (t) => { await batch.put('/file.txt', b4a.from('abc')) t.absent(await drive.get('/file.txt')) - await batch.flush() - t.ok(batch.blobs.core.closed) + t.absent(drive.blobs.core.closed) t.absent(drive.db.closed) t.absent(drive.db.core.closed) @@ -904,7 +827,7 @@ test('drive.close() on snapshots--does not close parent', async (t) => { await drive.put('/foo', b4a.from('bar')) - const checkout = drive.checkout(2) + const checkout = drive.checkout(1) await checkout.get('/foo') await checkout.close() @@ -920,13 +843,13 @@ test('drive.batch() on non-ready drive', async (t) => { await batch.put('/x', 'something') await batch.flush() - t.is(batch.blobs.core.closed, true) t.ok(await drive.get('/x')) await drive.close() }) +// TODO fix in hyperbee2 test('drive.close() for future checkout', async (t) => { const { drive } = await testenv(t) await drive.put('some', 'thing') @@ -1088,6 +1011,7 @@ test.skip('drive.clearAll() with diff', async (t) => { await b.close() }) +// TODO fix test('entry(key) cancelled when checkout closes', async function (t) { const { drive } = await testenv(t) await drive.put('some', '1') @@ -1157,6 +1081,8 @@ test('basic writable option', async function (t) { const b = new Hyperdrive(store.session({ writable: false }), a.key) await b.ready() + await b.getBlobs() + t.is(b.writable, false) t.is(b.blobs.core.writable, false) @@ -1240,7 +1166,7 @@ test('basic follow entry', async function (t) { t.is((await drive.entry('/file.shortcut')).value.linkname, '/file.txt') t.alike(await drive.entry('/file.shortcut', { follow: true }), { - seq: 1, + seq: 0, key: '/file.txt', value: { executable: false, @@ -1264,7 +1190,7 @@ test('multiple follow entry', async function (t) { t.is((await drive.entry('/file.shortcut.shortcut')).value.linkname, '/file.shortcut') t.alike(await drive.entry('/file.shortcut.shortcut', { follow: true }), { - seq: 1, + seq: 0, key: '/file.txt', value: { executable: false, @@ -1339,6 +1265,8 @@ test('drive.entry(key, { wait })', async (t) => { await replicate(drive, swarm, mirror) await drive.put('/file.txt', b4a.from('hi')) + await eventFlush() + await mirror.drive.getBlobs() await swarm.destroy() @@ -1359,6 +1287,8 @@ test('drive.get(key, { timeout })', async (t) => { await replicate(drive, swarm, mirror) await drive.put('/file.txt', b4a.from('hi')) + await eventFlush() + await mirror.drive.getBlobs() const entry = await mirror.drive.entry('/file.txt') @@ -1383,9 +1313,11 @@ test('drive.get(key, { wait }) with entry but no blob', async (t) => { await replicate(drive, swarm, mirror) await drive.put('/file.txt', b4a.from('hi')) + await eventFlush() + await mirror.drive.getBlobs() - const mirrorCheckout = mirror.drive.checkout(2) + const mirrorCheckout = mirror.drive.checkout(1) const entry = await mirrorCheckout.entry('/file.txt') t.ok(entry) t.ok(entry.value.blob) @@ -1409,6 +1341,8 @@ test('drive.get(key, { wait }) without entry', async (t) => { await replicate(drive, swarm, mirror) await drive.put('/file.txt', b4a.from('hi')) + await eventFlush() + await mirror.drive.getBlobs() await swarm.destroy() @@ -1465,9 +1399,9 @@ test('getBlobsLength happy paths', async (t) => { await drive.put('./file', 'here') t.is(await drive.getBlobsLength(), 2, 'Correct blobs length 2') - t.is(drive.version, 3, 'sanity check') - t.is(await drive.getBlobsLength(2), 1, 'Correct blobs length on explicit checkout') - t.is(await drive.getBlobsLength(3), 2, 'Correct blobs length on explicit checkout to latest') + t.is(drive.version, 2, 'sanity check') + t.is(await drive.getBlobsLength(2), 2, 'Correct blobs length on explicit checkout') + t.is(await drive.getBlobsLength(1), 1, 'Correct blobs length on explicit checkout to latest') await corestore.close() }) @@ -1511,15 +1445,15 @@ test('truncate happy path', async (t) => { await drive.put('file2', 'here2') await drive.put('file3', 'here3') - t.is(drive.version, 4, 'sanity check') + t.is(drive.version, 3, 'sanity check') t.is(await drive.getBlobsLength(), 3, 'sanity check') - await drive.truncate(3) - t.is(drive.version, 3, 'truncated db correctly') + await drive.truncate(2) + t.is(drive.version, 2, 'truncated db correctly') t.is(await drive.getBlobsLength(), 2, 'truncated blobs correctly') await drive.put('file3', 'here file 3 post truncation') - t.is(drive.version, 4, 'correct version when putting after truncate') + t.is(drive.version, 3, 'correct version when putting after truncate') t.is(await drive.getBlobsLength(), 3, 'correct blobsLength when putting after truncate') t.is(b4a.toString(await drive.get('file3')), 'here file 3 post truncation', 'Sanity check')