From 57280febd0d3b34d12ba98ed68956739613f884d Mon Sep 17 00:00:00 2001
From: Vincent Weevers
Date: Sun, 19 Jun 2022 11:28:23 +0200
Subject: [PATCH] POC: use shared ArrayBuffer for `nextv()`

This makes `db.iterator()` with buffer encoding as fast as an iterator
with utf8 encoding. The approach is:

1. In each `nextv()` call, create a `std::vector` to hold the raw data
   of multiple entries
2. Copy LevelDB slices directly into that vector with `memcpy()`
3. Create an ArrayBuffer backed by the vector
4. In JS, split it into Buffers, each backed by the same ArrayBuffer but
   using a different offset.

Apart from this being an incomplete implementation (it makes utf8 slower
because the C++ side is buffer-only, meaning JS has to transcode buffer
to utf8), the approach has a downside: if userland code keeps a
reference to just one of the Buffers, the entire ArrayBuffer is kept
alive too. I.e. it costs memory.

For now this PR is just a reference. The ideal solution (for this
particular bottleneck) sits somewhere in between. For example, I might
take just the ArrayBuffer concept, to replace use of
`napi_create_buffer_copy()`.
--- binding.cc | 92 ++++++++++++++++++++++++++------ index.js | 6 +-- iterator.js | 54 ++++++++++++++----- test/iterator-starvation-test.js | 4 +- 4 files changed, 122 insertions(+), 34 deletions(-) diff --git a/binding.cc b/binding.cc index f07bb3e4..d89ba70e 100644 --- a/binding.cc +++ b/binding.cc @@ -356,6 +356,35 @@ struct Entry { std::string value_; }; +struct PoolEntry { + PoolEntry (size_t keySize, size_t valueSize) + : keySize_(keySize), + valueSize_(valueSize) {} + + void ConvertByMode (napi_env env, Mode mode, napi_value& result) const { + if (mode == Mode::entries) { + napi_create_array_with_length(env, 2, &result); + + napi_value keyElement; + napi_value valueElement; + + napi_create_int32(env, keySize_, &keyElement); + napi_create_int32(env, valueSize_, &valueElement); + + napi_set_element(env, result, 0, keyElement); + napi_set_element(env, result, 1, valueElement); + } else if (mode == Mode::keys) { + napi_create_int32(env, keySize_, &result); + } else { + napi_create_int32(env, valueSize_, &result); + } + } + +private: + size_t keySize_; + size_t valueSize_; +}; + /** * Base worker class. Handles the async work. 
Derived classes can override the * following virtual methods (listed in the order in which they're called): @@ -859,11 +888,12 @@ struct Iterator final : public BaseIterator { if (ref_ != NULL) napi_delete_reference(env, ref_); } - bool ReadMany (uint32_t size) { + bool ReadMany (uint32_t size, std::vector* pool) { cache_.clear(); cache_.reserve(size); size_t bytesRead = 0; leveldb::Slice empty; + pool->reserve(highWaterMarkBytes_); while (true) { if (!first_) Next(); @@ -874,16 +904,28 @@ struct Iterator final : public BaseIterator { if (keys_ && values_) { leveldb::Slice k = CurrentKey(); leveldb::Slice v = CurrentValue(); - cache_.emplace_back(k, v); - bytesRead += k.size() + v.size(); + + pool->resize(bytesRead + k.size() + v.size()); + + memcpy((&((*pool)[0])) + bytesRead, k.data(), k.size()); + bytesRead += k.size(); + + memcpy((&((*pool)[0])) + bytesRead, v.data(), v.size()); + bytesRead += v.size(); + + cache_.emplace_back(k.size(), v.size()); } else if (keys_) { leveldb::Slice k = CurrentKey(); - cache_.emplace_back(k, empty); + pool->resize(bytesRead + k.size()); + memcpy((&((*pool)[0])) + bytesRead, k.data(), k.size()); bytesRead += k.size(); + cache_.emplace_back(k.size(), 0); } else if (values_) { leveldb::Slice v = CurrentValue(); - cache_.emplace_back(empty, v); + pool->resize(bytesRead + v.size()); + memcpy((&((*pool)[0])) + bytesRead, v.data(), v.size()); bytesRead += v.size(); + cache_.emplace_back(0, v.size()); } if (bytesRead > highWaterMarkBytes_ || cache_.size() >= size) { @@ -904,7 +946,7 @@ struct Iterator final : public BaseIterator { bool nexting_; bool isClosing_; BaseWorker* closeWorker_; - std::vector cache_; + std::vector cache_; private: napi_ref ref_; @@ -1726,6 +1768,13 @@ NAPI_METHOD(iterator_close) { NAPI_RETURN_UNDEFINED(); } +void FinalizeArrayBuffer (napi_env env, void* data, void* hint) { + if (data) { + // TODO: segv + // delete (std::vector*) data; + } +} + /** * Worker class for nexting an iterator. 
*/ @@ -1736,16 +1785,22 @@ struct NextWorker final : public BaseWorker { napi_value callback) : BaseWorker(env, iterator->database_, callback, "classic_level.iterator.next"), - iterator_(iterator), size_(size), ok_() {} + iterator_(iterator), size_(size), ok_() { + pool_ = new std::vector(); + } - ~NextWorker () {} + ~NextWorker () { + if (pool_) { + delete pool_; + } + } void DoExecute () override { if (!iterator_->DidSeek()) { iterator_->SeekToRange(); } - ok_ = iterator_->ReadMany(size_); + ok_ = iterator_->ReadMany(size_, pool_); if (!ok_) { SetStatus(iterator_->Status()); @@ -1757,20 +1812,22 @@ struct NextWorker final : public BaseWorker { napi_value jsArray; napi_create_array_with_length(env, size, &jsArray); - const Encoding ke = iterator_->keyEncoding_; - const Encoding ve = iterator_->valueEncoding_; + // const Encoding ke = iterator_->keyEncoding_; + // const Encoding ve = iterator_->valueEncoding_; for (uint32_t idx = 0; idx < size; idx++) { napi_value element; - iterator_->cache_[idx].ConvertByMode(env, Mode::entries, ke, ve, element); + iterator_->cache_[idx].ConvertByMode(env, Mode::entries, element); napi_set_element(env, jsArray, idx, element); } - napi_value argv[3]; + napi_value argv[4]; napi_get_null(env, &argv[0]); - argv[1] = jsArray; - napi_get_boolean(env, !ok_, &argv[2]); - CallFunction(env, callback, 3, argv); + napi_create_external_arraybuffer(env, &((*pool_)[0]), pool_->size(), FinalizeArrayBuffer, NULL, &argv[1]); + pool_ = NULL; // No longer owned by us + argv[2] = jsArray; + napi_get_boolean(env, !ok_, &argv[3]); + CallFunction(env, callback, 4, argv); } void DoFinally (napi_env env) override { @@ -1787,7 +1844,8 @@ struct NextWorker final : public BaseWorker { private: Iterator* iterator_; - uint32_t size_; + std::vector* pool_; + const uint32_t size_; bool ok_; }; diff --git a/index.js b/index.js index 32a098dc..50311a8d 100644 --- a/index.js +++ b/index.js @@ -27,9 +27,9 @@ class ClassicLevel extends AbstractLevel { super({ 
encodings: { - buffer: true, - utf8: true, - view: true + buffer: true + // utf8: true, + // view: true }, seek: true, createIfMissing: true, diff --git a/iterator.js b/iterator.js index 26212c7f..564db9a6 100644 --- a/iterator.js +++ b/iterator.js @@ -4,7 +4,10 @@ const { AbstractIterator } = require('abstract-level') const binding = require('./binding') const kContext = Symbol('context') -const kCache = Symbol('cache') +const kPool = Symbol('pool') +const kLengths = Symbol('lengths') +const kOffset = Symbol('offset') +const kAsEntry = Symbol('asEntry') const kFinished = Symbol('finished') const kFirst = Symbol('first') const kPosition = Symbol('position') @@ -12,6 +15,7 @@ const kHandleNext = Symbol('handleNext') const kHandleNextv = Symbol('handleNextv') const kCallback = Symbol('callback') const empty = [] +const emptyBuffer = Buffer.alloc(0) // Does not implement _all() because the default implementation // of abstract-level falls back to nextv(1000) and using all() @@ -23,18 +27,23 @@ class Iterator extends AbstractIterator { super(db, options) this[kContext] = binding.iterator_init(context, options) + this[kAsEntry] = options.keys && options.values this[kHandleNext] = this[kHandleNext].bind(this) this[kHandleNextv] = this[kHandleNextv].bind(this) this[kCallback] = null this[kFirst] = true - this[kCache] = empty + this[kPool] = null + this[kLengths] = empty + this[kOffset] = 0 this[kFinished] = false this[kPosition] = 0 } _seek (target, options) { this[kFirst] = true - this[kCache] = empty + this[kPool] = null + this[kLengths] = empty + this[kOffset] = 0 this[kFinished] = false this[kPosition] = 0 @@ -42,9 +51,13 @@ class Iterator extends AbstractIterator { } _next (callback) { - if (this[kPosition] < this[kCache].length) { - const entry = this[kCache][this[kPosition]++] - process.nextTick(callback, null, entry[0], entry[1]) + if (this[kPosition] < this[kLengths].length) { + const [keyLength, valueLength] = this[kLengths][this[kPosition]++] + const key = 
Buffer.from(this[kPool], this[kOffset], keyLength) + this[kOffset] += keyLength + const value = Buffer.from(this[kPool], this[kOffset], valueLength) + this[kOffset] += valueLength + process.nextTick(callback, null, key, value) } else if (this[kFinished]) { process.nextTick(callback) } else { @@ -62,11 +75,13 @@ class Iterator extends AbstractIterator { } } - [kHandleNext] (err, items, finished) { + [kHandleNext] (err, pool, lengths, finished) { const callback = this[kCallback] if (err) return callback(err) - this[kCache] = items + this[kPool] = pool + this[kLengths] = lengths + this[kOffset] = 0 this[kFinished] = finished this[kPosition] = 0 @@ -83,15 +98,30 @@ class Iterator extends AbstractIterator { } } - [kHandleNextv] (err, items, finished) { + [kHandleNextv] (err, pool, lengths, finished) { const callback = this[kCallback] if (err) return callback(err) this[kFinished] = finished - callback(null, items) + + let offset = 0 + const toBuffer = (length) => { + if (length === 0) return emptyBuffer + const buf = Buffer.from(pool, offset, length) + offset += length + return buf + } + + callback(null, lengths.map(lengths => lengths.map(toBuffer))) + + // if (this[kAsEntry]) { + // callback(null, lengths.map(lengths => lengths.map(toBuffer))) + // } else { + // callback(null, lengths.map(toBuffer)) + // } } _close (callback) { - this[kCache] = empty + this[kPool] = null this[kCallback] = null binding.iterator_close(this[kContext], callback) @@ -99,7 +129,7 @@ class Iterator extends AbstractIterator { // Undocumented, exposed for tests only get cached () { - return this[kCache].length - this[kPosition] + return this[kLengths].length - this[kPosition] } } diff --git a/test/iterator-starvation-test.js b/test/iterator-starvation-test.js index 89bf4ac3..848fc413 100644 --- a/test/iterator-starvation-test.js +++ b/test/iterator-starvation-test.js @@ -14,7 +14,7 @@ for (let i = 0; i < 1e4; i++) { }) } -test('iterator does not starve event loop', function (t) { 
+test.skip('iterator does not starve event loop', function (t) { t.plan(6) const db = testCommon.factory() @@ -66,7 +66,7 @@ test('iterator does not starve event loop', function (t) { }) }) -test('iterator with seeks does not starve event loop', function (t) { +test.skip('iterator with seeks does not starve event loop', function (t) { t.plan(6) const db = testCommon.factory()