Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 60 additions & 24 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,12 @@ exports.Team = class {

exports.Events = class {

#pending = null;
#queue = [];
#iterators = new Set();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be safer to also use a WeakSet here just in case it's iterated through weird means?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be safer, but not possible since you can't iterate a WeakSet.


static _iterators(instance) {

return instance.#iterators;
}

static isIterator(iterator) {

Expand All @@ -84,45 +88,40 @@ exports.Events = class {

iterator() {

return new internals.EventsIterator(this);
}
const iterator = new internals.EventsIterator(this);

emit(value) {
this.#iterators.add(iterator);

this._queue({ value, done: false });
return iterator;
}

end() {
emit(value) {

this._queue({ done: true });
for (const iterator of this.#iterators) {
iterator._queue({ value, done: false });
}
}

_next() {
end() {

if (this.#queue.length) {
return Promise.resolve(this.#queue.shift());
for (const iterator of this.#iterators) {
iterator._queue({ done: true });
}

this.#pending = new exports.Team();
return this.#pending.work;
}

_queue(item) {
_remove(iterator) {

if (this.#pending) {
this.#pending.attend(item);
this.#pending = null;
}
else {
this.#queue.push(item);
}
this.#iterators.delete(iterator);
}
};


internals.EventsIterator = class {

#events = null;
#events;

#pending = null;
#queue = [];

constructor(events) {

Expand All @@ -136,6 +135,43 @@ internals.EventsIterator = class {

next() {

return this.#events._next();
if (this.#queue.length) {
return Promise.resolve(this.#queue.shift());
}

if (!this.#events) {
return { done: true };
}

this.#pending = new exports.Team();
return this.#pending.work;
}

return() {

this._cleanup();

return { done: true };
}

_cleanup() {

this.#events?._remove(this);
this.#events = null;
}

_queue(item) {

if (item.done) {
this._cleanup();
}

if (this.#pending) {
this.#pending.attend(item);
this.#pending = null;
}
else {
this.#queue.push(item);
}
}
};
137 changes: 136 additions & 1 deletion test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -224,21 +224,156 @@ describe('Events', () => {
events.end();

expect(await collect).to.equal([1, 2, 3]);
expect(Teamwork.Events._iterators(events)).to.equal(new Set());
});

it('iterates over events (queued)', async () => {

const events = new Teamwork.Events();
const iterator = events.iterator();

events.emit(1);
events.emit(2);
events.emit(3);
events.end();

const items = [];
for await (const item of events.iterator()) {
for await (const item of iterator) {
items.push(item);
}

expect(items).to.equal([1, 2, 3]);
expect(Teamwork.Events._iterators(events)).to.equal(new Set());
});

it('only iterates over new events after iterator() call', async () => {

const events = new Teamwork.Events();

events.emit(1);

const iterator = events.iterator();
events.emit(2);
events.emit(3);
events.end();

const items = [];
for await (const item of iterator) {
items.push(item);
}

expect(items).to.equal([2, 3]);
expect(Teamwork.Events._iterators(events)).to.equal(new Set());
});

it('returns done for consumed iterators', async () => {

const events = new Teamwork.Events();
const iterator = events.iterator();

events.emit(1);
events.emit(2);
events.emit(3);
events.end();

const items = [];
for await (const item of iterator) {
items.push(item);
}

expect(iterator.next()).to.equal({ done: true });

expect(items).to.equal([1, 2, 3]);
expect(Teamwork.Events._iterators(events)).to.equal(new Set());
});

it('can use break without leaking', async () => {

const events = new Teamwork.Events();
const iterator = events.iterator();

events.emit(1);
events.emit(2);

const items = [];
for await (const item of iterator) {
items.push(item);
break;
}

expect(items).to.equal([1]);
expect(Teamwork.Events._iterators(events)).to.equal(new Set());
});

it('can throw without leaking', async () => {

const events = new Teamwork.Events();
const iterator = events.iterator();

events.emit(1);
events.emit(2);

const items = [];
await expect((async () => {

for await (const item of iterator) {
items.push(item);
throw new Error('fail');
}
})()).to.reject('fail');

expect(items).to.equal([1]);
expect(Teamwork.Events._iterators(events)).to.equal(new Set());
});

it('works with multiple iterators (serial)', async () => {

const events = new Teamwork.Events();
const iter1 = events.iterator();
const iter2 = events.iterator();

events.emit(1);
events.emit(2);
events.emit(3);
events.end();

const items1 = [];
for await (const item1 of iter1) {
items1.push(item1);
}

const items2 = [];
for await (const item2 of iter2) {
items2.push(item2);
}

expect(items1).to.equal([1, 2, 3]);
expect(items2).to.equal([1, 2, 3]);
});

it('works with multiple iterators (interleaved)', async () => {

const events = new Teamwork.Events();
const iter1 = events.iterator();
const iter2 = events.iterator();

events.emit(1);
events.emit(2);
events.emit(3);
events.end();

const items1 = [];
const items2 = [];
for await (const item1 of iter1) {
items1.push(item1);
if (items2.length === 0) {
for await (const item2 of iter2) {
items2.push(item2);
}
}
}

expect(items1).to.equal([1, 2, 3]);
expect(items2).to.equal([1, 2, 3]);
});
});
Loading