diff --git a/.eslintignore b/.eslintignore index 6d0e285..4d61c21 100644 --- a/.eslintignore +++ b/.eslintignore @@ -5,3 +5,4 @@ dist types *.test.ts *.test.js +.yarn/install-state.gz diff --git a/.gitignore b/.gitignore index 41de265..4a697b4 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ types/* .yarnrc.yml .yarn +*.bak diff --git a/.npmignore b/.npmignore new file mode 100644 index 0000000..4a697b4 --- /dev/null +++ b/.npmignore @@ -0,0 +1,10 @@ +.DS_Store +node_modules +yarn-error.log +coverage/* +dist/* +types/* + +.yarnrc.yml +.yarn +*.bak diff --git a/.yarn/install-state.gz b/.yarn/install-state.gz deleted file mode 100644 index 51ed101..0000000 Binary files a/.yarn/install-state.gz and /dev/null differ diff --git a/package.json b/package.json index 62c8b21..b5d12a1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "vait", - "version": "4.2.1", + "version": "4.3.0", "main": "./dist/vait.common.js", "module": "./dist/vait.esm.js", "typings": "./types/index.d.ts", diff --git a/src/concurrency.test.ts b/src/concurrency.test.ts index 1f5cff0..d86766e 100644 --- a/src/concurrency.test.ts +++ b/src/concurrency.test.ts @@ -5,9 +5,10 @@ import { nextTick } from './next-tick' import { Signal } from './signal' import { Timer } from './timer' import { concurrency } from './concurrency' +import { Memo } from './memo' -test('concurrency.Number', async () => { - const { get, set } = concurrency.Number(1) +test('concurrency.Valve', async () => { + const { get, set } = concurrency.Valve(1) assert(get() === 1) set(99) assert(get() === 99) @@ -141,11 +142,11 @@ test('concurrent_limit should be integer', async () => { } }) -test('concurrent_limit should >= 1', async () => { +test('concurrent_limit should >= 0', async () => { { let val = 0 try { - await concurrency(0, [][Symbol.iterator](), () => Promise.resolve()) + await concurrency(-1, [][Symbol.iterator](), () => Promise.resolve()) val = 111 } catch (err) { assert( err instanceof RangeError ) @@ -156,7 +157,7 @@ test('concurrent_limit should >= 1', async () => { { let val = 0 try { - await concurrency(0, [2,4,2,1,'a'][Symbol.iterator](), () => Promise.resolve()) + await concurrency(-9, [2,4,2,1,'a'][Symbol.iterator](), () => Promise.resolve()) val = 111 } catch (err) { assert( err instanceof RangeError ) @@ -174,7 +175,7 @@ function range(start: number, end: number) { return list } -test('concurrency should support Infinity max concurrency', async () => { +test('concurrency() should support Infinity max concurrency', async () => { let revoke_count = 0 const [waiting, ok] = Wait() const promise = concurrency(Infinity, range(0, 9)[Symbol.iterator](), () => { @@ -196,14 +197,14 @@ test('concurrency should support Infinity max concurrency', async () => { await promise }) -test('concurrency should dynamically change the max concurrency', async () => { +test('concurrency() should dynamically change the max concurrency', async () => { let revoke_count = 0 - const maxConcurrency = concurrency.Number(1) + const concurrencyValve = concurrency.Valve(1) const resolved_idx_list: Array = [] const pedding_tasks: Array = [] const concurrentPromise = concurrency( - maxConcurrency, + concurrencyValve, range(0, 9)[Symbol.iterator](), (_, idx) => { revoke_count += 1 @@ -226,22 +227,28 @@ test('concurrency should dynamically change the max concurrency', async () => { expect(revoke_count).toBe(1) expect(() => { - maxConcurrency.set(-1) + concurrencyValve.set(-1) }).toThrow() expect(() => { - maxConcurrency.set(0) + concurrencyValve.set(-2) }).toThrow() expect(() => { - maxConcurrency.set('' as any) + concurrencyValve.set(-10) + }).toThrow() + // expect(() => { + // concurrencyValve.set(0) + // }).toThrow() + expect(() => { + concurrencyValve.set('' as any) }).toThrow() expect(() => { - maxConcurrency.set(1) + concurrencyValve.set(1) }).not.toThrow() await timeout(100) expect(revoke_count).toBe(1) - maxConcurrency.set(2) + concurrencyValve.set(2) await timeout(100) expect(revoke_count).toBe(2) @@ -268,15 +275,15 @@ test('concurrency should dynamically change the max concurrency', async () => { expect(resolved_idx_list.slice(1, resolved_idx_list.length - 1)).toStrictEqual(range(2, 9)) // 中间的是 2~9 }) -test('concurrency should dynamically change the max concurrency(change to small)', async () => { +test('concurrency() should dynamically change the max concurrency(change to small)', async () => { let revoke_count = 0 const resolved_idx_list: Array = [] const waiting_list = range(0, 9).map(() => Wait()) - const maxConcurrency = concurrency.Number(3) + const concurrencyValve = concurrency.Valve(3) const concurrentPromise = concurrency( - maxConcurrency, + concurrencyValve, waiting_list[Symbol.iterator](), (w, idx) => { revoke_count += 1 @@ -292,7 +299,7 @@ test('concurrency should dynamically change the max concurrency(change to small) await timeout(100) expect(resolved_idx_list.length).toBe(0) - maxConcurrency.set(1) + concurrencyValve.set(1) expect(revoke_count).toBe(3) expect(resolved_idx_list.length).toBe(0) @@ -334,3 +341,91 @@ test('concurrency should dynamically change the max concurrency(change to small) await concurrentPromise }) + +test('concurrency() should support 0 max concurrency', async () => { + let revoke_count = 0 + const concurrencyValve = concurrency.Valve(0) + const concurrentPromise = concurrency( + concurrencyValve, + range(0, 9)[Symbol.iterator](), + async (_, idx) => { + revoke_count += 1 + }, + ) + + await timeout(100) + expect(revoke_count).toBe(0) + concurrencyValve.set(1) + + await timeout(100) + expect(revoke_count).toBe(10) + await concurrentPromise +}) + +test('concurrency() should support pause', async () => { + let revoke_count = 0 + const resolved_idx_list: Array = [] + const waiting_list = range(0, 9).map(() => Wait()) + + const concurrencyValve = concurrency.Valve(3) + + const concurrentPromise = concurrency( + concurrencyValve, + waiting_list[Symbol.iterator](), + (w, idx) => { + revoke_count += 1 + const [ waiting ] = w + return waiting.then(() => { + resolved_idx_list.push(idx) + }) + }, + ) + + expect(revoke_count).toBe(3) + + concurrencyValve.set(0) + waiting_list.slice(0, 3).forEach(([, go]) => go()) + await timeout(100) + expect(revoke_count).toBe(3) + + waiting_list.forEach(([, go]) => go()) + concurrencyValve.set(1) + + await concurrentPromise +}) + +test('concurrency() should set valve in async function', async () => { + let revoke_count = 0 + const resolved_idx_list: Array = [] + const concurrencyValve = concurrency.Valve(1) + + // const m = Memo(2) + // const cancel = Memo.watch(m, () => { + // cancel() + // Memo.change(m, 9) + // }) + // Memo.change(m, 1) + + const concurrentPromise = concurrency( + concurrencyValve, + range(0, 9)[Symbol.iterator](), + async (_, idx) => { + revoke_count += 1 + concurrencyValve.set(0) + Timer( + Math.floor(Math.random() * 100), + () => { + concurrencyValve.set(1) + resolved_idx_list.push(idx) + } + ) + }, + ) + + await concurrentPromise + + expect(revoke_count).toBe(10) + for (let i = 0; i < resolved_idx_list.length; ++i) { + expect(resolved_idx_list[i]).toBe(i) + } +}) diff --git a/src/concurrency.ts b/src/concurrency.ts index 62d537c..94037d9 100644 --- a/src/concurrency.ts +++ b/src/concurrency.ts @@ -1,18 +1,19 @@ import { Memo, WithValidating } from './memo' +import { nextTick } from './next-tick'; import { Wait } from './wait' const NONE_ERROR = Symbol('NONE_ERROR') -export type ConcurrencyNumber = { +export type ConcurrencyValve = { get(): number; set(v: number): void memo: Memo } -concurrency.Number = (v: number): ConcurrencyNumber => { +concurrency.Valve = (v: number): ConcurrencyValve => { const memo = WithValidating(Memo(v), v => { - if (v < 1) { - throw new RangeError('concurrent_limit should >= 1') + if (v < 0) { + throw new RangeError('concurrent_limit should >= 0') } else if (!Number.isInteger(v)) { if (v !== Infinity) { throw new TypeError('concurrent_limit should be integer') @@ -28,34 +29,42 @@ concurrency.Number = (v: number): ConcurrencyNumber => { } concurrency.each = ( - MaxConcurrency: ConcurrencyNumber | number, + valve: ConcurrencyValve | number, list: Array, asyncFn: (item: T, idx: number, list: T[]) => Promise ) => ( concurrency( - MaxConcurrency, + valve, list[Symbol.iterator](), (item, idx) => asyncFn(item, idx, list) ) ) export async function concurrency( - MaxConcurrency: ConcurrencyNumber | number, + valOrNum: ConcurrencyValve | number, iterator: IterableIterator, asyncFn: (item: T, idx: number) => Promise ): Promise { - if (typeof MaxConcurrency === 'number') { - return concurrency(concurrency.Number(MaxConcurrency), iterator, asyncFn) + if (typeof valOrNum === 'number') { + return concurrencyWithValve(concurrency.Valve(valOrNum), iterator, asyncFn) + } else { + return concurrencyWithValve(valOrNum, iterator, asyncFn) } +} - const [ getMaxConcurrency ] = MaxConcurrency.memo - +function concurrencyWithValve( + valve: ConcurrencyValve, + iterator: IterableIterator, + asyncFn: (item: T, idx: number) => Promise +): Promise { let current_concurrency = 0 let __idx = 0 let error_info: (typeof NONE_ERROR) | Exclude = NONE_ERROR const [ waiting, done ] = Wait() let result: IteratorResult - const cancelWatch = Memo.watch(MaxConcurrency.memo, callConcurrent) + const cancelWatch = Memo.watch(valve.memo, () => { + nextTick().then(run) + }) function after() { if (error_info === NONE_ERROR) { @@ -66,7 +75,7 @@ export async function concurrency( ) { done() } else { - callConcurrent() + run() } } } @@ -78,10 +87,10 @@ export async function concurrency( } } - callConcurrent() - function callConcurrent() { + run() + function run() { while ( - (current_concurrency < getMaxConcurrency()) && + (current_concurrency < valve.get()) && (!result || !result?.done) ) { current_concurrency += 1 @@ -103,7 +112,7 @@ export async function concurrency( return ( waiting.then(() => { - cancelWatch && cancelWatch() + cancelWatch() if (error_info !== NONE_ERROR) { throw error_info } diff --git a/src/concurrent-map.ts b/src/concurrent-map.ts index 1c4a4cd..4c6090e 100644 --- a/src/concurrent-map.ts +++ b/src/concurrent-map.ts @@ -1,7 +1,7 @@ -import { ConcurrencyNumber, concurrency } from './concurrency' +import { ConcurrencyValve, concurrency } from './concurrency' export function concurrentMap( - conccurrencyNumber: ConcurrencyNumber | number, + valve: ConcurrencyValve | number, list: T[], asyncFn: (item: T, idx: number, total: T[]) => Promise, ): Promise { @@ -10,7 +10,7 @@ export function concurrentMap( } else { const new_list: NT[] = [] return concurrency.each( - conccurrencyNumber, + valve, list, async (item, idx) => { new_list[idx] = await asyncFn(item, idx, list) diff --git a/src/memo.test.ts b/src/memo.test.ts index ee78326..057f55f 100644 --- a/src/memo.test.ts +++ b/src/memo.test.ts @@ -425,3 +425,22 @@ test('watching ValidatingMemo By WatchMemo(ValidatingMemo(Memo', () => { set(999) expect(changed).toBe( 2 ) }) + +test('memo should support cancel in watch handle', () => { + const m = Memo(2) + let i: number = 0 + const watchHandler = () => { + cancel() + i += 1 + } + const cancel = Memo.watch(m, watchHandler) + + expect(i).toBe(0) + Memo.change(m, 1) + expect(i).toBe(1) + + assert(m.length === 2) + + Memo.change(m, 2) + expect(i).toBe(1) +}) diff --git a/src/memo.ts b/src/memo.ts index c194f45..beabb35 100644 --- a/src/memo.ts +++ b/src/memo.ts @@ -27,8 +27,6 @@ type MemoAfter = TypedFunc<'after', D, void> export type MemoLike = [ MemoGetter, MemoSetter, ...Array | MemoAfter> ] export type Memo = ID, 'memo'> -// type TypeMemo = [ MemoGetter ] - export class MemoError extends Error { } export const Memo = (data: D): Memo => { diff --git a/src/next-tick.test.ts b/src/next-tick.test.ts index d0af11f..236b1a5 100644 --- a/src/next-tick.test.ts +++ b/src/next-tick.test.ts @@ -1,6 +1,6 @@ import { nextTick } from './next-tick' -test.only('nextTick', async () => { +test('nextTick', async () => { let value: number | undefined = undefined const promise = nextTick().then(() => { diff --git a/src/queue-pool.ts b/src/queue-pool.ts index ee18965..e3cb8ca 100644 --- a/src/queue-pool.ts +++ b/src/queue-pool.ts @@ -29,7 +29,7 @@ export function QueuePool(): QueuePool { const signal = { ALL_DONE: Signal(), - ERROR: Signal<{ id: ID, payload: Payload, error: unknown }>() + ERROR: Signal<{ id: ID, payload: Payload, error: unknown }>(), } function createQueue(id: ID) { @@ -59,28 +59,22 @@ export function QueuePool(): QueuePool { if (typeof func !== 'function') { return addTask(id, undefined as Payload, payload as (() => Promise)) } else { - // const pause = getPause() - // if (pause !== null) { - // const [waiting] = pause - // waiting.then(() => { - // addTask(id, payload as Payload, func) - // }) - // } const q = getQueue(id) q.task(payload as Payload, func) - const cancelReceiveError = q.signal.ERROR.receive(({ task, error }) => { - signal.ERROR.trigger({ - id, - payload: q.getPayload(task), - error, + const cancelReceiveError = ( + q.signal.ERROR.receive(({ task, error }) => { + signal.ERROR.trigger({ + id, + payload: q.getPayload(task), + error, + }) }) - }) + ) if (q.signal.ALL_DONE.isEmpty()) { - const cancel = q.signal.ALL_DONE.receive(() => { + Signal.once(q.signal.ALL_DONE, () => { cancelReceiveError() - cancel() pool.delete(id) if (pool.size === 0) { signal.ALL_DONE.trigger() diff --git a/src/queue.test.ts b/src/queue.test.ts index 808e883..5113d45 100644 --- a/src/queue.test.ts +++ b/src/queue.test.ts @@ -117,9 +117,6 @@ test('Queue', () => { expect(() => { Queue().setMaxConcurrent(0.9) }).toThrow() - expect(() => { - Queue().setMaxConcurrent(0) - }).toThrow() expect(() => { Queue().setMaxConcurrent('0' as any) }).toThrow() diff --git a/src/queue.ts b/src/queue.ts index 4d2ef3b..bf16946 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -94,8 +94,8 @@ export function Queue(signal?: S) { const TasksMemo = Memo([]) const [getTasks, setTasks] = TasksMemo const [getStatus, setStatus] = Memo('pause') - const MaxConcurrency = concurrency.Number(1) - const { memo: [, setMaxConcurrent] } = MaxConcurrency + const concurrencyValve = concurrency.Valve(1) + const { memo: [, setMaxConcurrent] } = concurrencyValve function dropTask(task: QueueTask) { setTasks( @@ -128,19 +128,18 @@ export function Queue(signal?: S) { return ( nextTick().then(() => ( concurrency( - MaxConcurrency, + concurrencyValve, QueueTaskIterator(), - (task) => ( - task() - .then(() => { - signal?.SUCCESS.trigger(task) - signal?.PROCESSED.trigger(task) - }) - .catch((error) => { - signal?.ERROR.trigger({ task, error }) - signal?.PROCESSED.trigger(task) - }) - ) + async task => { + try { + await task() + signal?.SUCCESS.trigger(task) + } catch (error) { + signal?.ERROR.trigger({ task, error }) + } finally { + signal?.PROCESSED.trigger(task) + } + } ) )).then(() => { if (getTasks().length) { diff --git a/src/signal.ts b/src/signal.ts index 7048138..f6d877c 100644 --- a/src/signal.ts +++ b/src/signal.ts @@ -11,53 +11,51 @@ export class SignalError extends Error {} // 进而会影响调用方的代码执行,这是不太好的 // 故 executeHandlers 将会忽略错误,并将跳过当前出错的函数 // 继续执行队列中余下的函数 -export function executeHandlers

(handlers: Handlers

, payload: P) { - for (let i = 0; i < handlers.length; ++i) { - try { - handlers[i](payload) - } catch (err) { - console.warn( - 'Signal has caught an error, but it is ignored by default and not thrown. ' + - 'If you want to throw this error, please use triggerCareError(). Error details:', err - ) - } - } -} - -function executeHandlersCareError

(handlers: Handlers

, payload: P) { - try { +export const ExecuteHandlers =

(ignore_error: boolean, getHandlers: () => Handler

[]) => ( + (payload: P) => { + const handlers = getHandlers() for (let i = 0; i < handlers.length; ++i) { - handlers[i](payload) + try { handlers[i](payload) } catch (cause) { + if (ignore_error) { + console.warn( + 'Signal has caught an error, but it is ignored by default and not thrown. ' + + 'If you want to throw this error, please use triggerCareError(). Error details:', cause + ) + } else { + throw new SignalError('An error occurred while executing the function queue registered in the signal.', { cause }) + } + } } - } catch (cause) { - throw new SignalError('An error occurred while executing the function queue registered in the signal.', { cause }) } -} +) -export interface Signal

{ +export type Signal

= Readonly<{ + trigger: P extends void ? () => void : (payload: P) => void + triggerCareError: P extends void ? () => void : (payload: P) => void + receive(fn: (payload: P) => void): () => void + cancelReceive(fn: (payload: P) => void): void isEmpty(): boolean - trigger(payload: P): void - triggerCareError(payload: P): void - receive(fn: Handler

): () => void - cancelReceive(fn: Handler

): void -} +}> + export function Signal(): Signal export function Signal

(): Signal

export function Signal

(): Signal

{ const [getHandlers, setHandlers] = Memo>([]) - const cancelReceive = (fn: Handler

) => { + + const cancelReceive = (fn: Handler

) => ( setHandlers( removeByItem(getHandlers(), fn) ) - } + ) + return { - isEmpty: () => (getHandlers().length === 0), - triggerCareError: payload => executeHandlersCareError(getHandlers(), payload), - trigger: payload => executeHandlers(getHandlers(), payload), + isEmpty: () => getHandlers().length === 0, + trigger: ExecuteHandlers(true, getHandlers) as Signal

['trigger'], + triggerCareError: ExecuteHandlers(false, getHandlers) as Signal

['triggerCareError'], cancelReceive, receive(fn) { setHandlers(getHandlers().concat(fn)) return () => cancelReceive(fn) }, - } as const + } } Signal.once =

(sig: Signal

, fn: Handler

) => {