diff --git a/lib/index.js b/lib/index.js index 9e72cf0..c766422 100755 --- a/lib/index.js +++ b/lib/index.js @@ -74,8 +74,12 @@ exports.Team = class { exports.Events = class { - #pending = null; - #queue = []; + #iterators = new Set(); + + static _iterators(instance) { + + return instance.#iterators; + } static isIterator(iterator) { @@ -84,45 +88,40 @@ exports.Events = class { iterator() { - return new internals.EventsIterator(this); - } + const iterator = new internals.EventsIterator(this); - emit(value) { + this.#iterators.add(iterator); - this._queue({ value, done: false }); + return iterator; } - end() { + emit(value) { - this._queue({ done: true }); + for (const iterator of this.#iterators) { + iterator._queue({ value, done: false }); + } } - _next() { + end() { - if (this.#queue.length) { - return Promise.resolve(this.#queue.shift()); + for (const iterator of this.#iterators) { + iterator._queue({ done: true }); } - - this.#pending = new exports.Team(); - return this.#pending.work; } - _queue(item) { + _remove(iterator) { - if (this.#pending) { - this.#pending.attend(item); - this.#pending = null; - } - else { - this.#queue.push(item); - } + this.#iterators.delete(iterator); } }; internals.EventsIterator = class { - #events = null; + #events; + + #pending = null; + #queue = []; constructor(events) { @@ -136,6 +135,43 @@ internals.EventsIterator = class { next() { - return this.#events._next(); + if (this.#queue.length) { + return Promise.resolve(this.#queue.shift()); + } + + if (!this.#events) { + return { done: true }; + } + + this.#pending = new exports.Team(); + return this.#pending.work; + } + + return() { + + this._cleanup(); + + return { done: true }; + } + + _cleanup() { + + this.#events?._remove(this); + this.#events = null; + } + + _queue(item) { + + if (item.done) { + this._cleanup(); + } + + if (this.#pending) { + this.#pending.attend(item); + this.#pending = null; + } + else { + this.#queue.push(item); + } } }; diff --git a/test/index.js b/test/index.js index cc204fd..dae7587 100755 --- a/test/index.js +++ b/test/index.js @@ -224,21 +224,156 @@ describe('Events', () => { events.end(); expect(await collect).to.equal([1, 2, 3]); + expect(Teamwork.Events._iterators(events)).to.equal(new Set()); }); it('iterates over events (queued)', async () => { const events = new Teamwork.Events(); + const iterator = events.iterator(); + events.emit(1); events.emit(2); events.emit(3); events.end(); const items = []; - for await (const item of events.iterator()) { + for await (const item of iterator) { items.push(item); } expect(items).to.equal([1, 2, 3]); + expect(Teamwork.Events._iterators(events)).to.equal(new Set()); + }); + + it('only iterates over new events after iterator() call', async () => { + + const events = new Teamwork.Events(); + + events.emit(1); + + const iterator = events.iterator(); + events.emit(2); + events.emit(3); + events.end(); + + const items = []; + for await (const item of iterator) { + items.push(item); + } + + expect(items).to.equal([2, 3]); + expect(Teamwork.Events._iterators(events)).to.equal(new Set()); + }); + + it('returns done for consumed iterators', async () => { + + const events = new Teamwork.Events(); + const iterator = events.iterator(); + + events.emit(1); + events.emit(2); + events.emit(3); + events.end(); + + const items = []; + for await (const item of iterator) { + items.push(item); + } + + expect(iterator.next()).to.equal({ done: true }); + + expect(items).to.equal([1, 2, 3]); + expect(Teamwork.Events._iterators(events)).to.equal(new Set()); + }); + + it('can use break without leaking', async () => { + + const events = new Teamwork.Events(); + const iterator = events.iterator(); + + events.emit(1); + events.emit(2); + + const items = []; + for await (const item of iterator) { + items.push(item); + break; + } + + expect(items).to.equal([1]); + expect(Teamwork.Events._iterators(events)).to.equal(new Set()); + }); + + it('can throw without leaking', async () => { + + const events = new Teamwork.Events(); + const iterator = events.iterator(); + + events.emit(1); + events.emit(2); + + const items = []; + await expect((async () => { + + for await (const item of iterator) { + items.push(item); + throw new Error('fail'); + } + })()).to.reject('fail'); + + expect(items).to.equal([1]); + expect(Teamwork.Events._iterators(events)).to.equal(new Set()); + }); + + it('works with multiple iterators (serial)', async () => { + + const events = new Teamwork.Events(); + const iter1 = events.iterator(); + const iter2 = events.iterator(); + + events.emit(1); + events.emit(2); + events.emit(3); + events.end(); + + const items1 = []; + for await (const item1 of iter1) { + items1.push(item1); + } + + const items2 = []; + for await (const item2 of iter2) { + items2.push(item2); + } + + expect(items1).to.equal([1, 2, 3]); + expect(items2).to.equal([1, 2, 3]); + }); + + it('works with multiple iterators (interleaved)', async () => { + + const events = new Teamwork.Events(); + const iter1 = events.iterator(); + const iter2 = events.iterator(); + + events.emit(1); + events.emit(2); + events.emit(3); + events.end(); + + const items1 = []; + const items2 = []; + for await (const item1 of iter1) { + items1.push(item1); + if (items2.length === 0) { + for await (const item2 of iter2) { + items2.push(item2); + } + } + } + + expect(items1).to.equal([1, 2, 3]); + expect(items2).to.equal([1, 2, 3]); }); });