Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .eslintignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ dist
types
*.test.ts
*.test.js
.yarn/install-state.gz
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ types/*

.yarnrc.yml
.yarn
*.bak
10 changes: 10 additions & 0 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
.DS_Store
node_modules
yarn-error.log
coverage/*
dist/*
types/*

.yarnrc.yml
.yarn
*.bak
Binary file removed .yarn/install-state.gz
Binary file not shown.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "vait",
"version": "4.2.1",
"version": "4.3.0",
"main": "./dist/vait.common.js",
"module": "./dist/vait.esm.js",
"typings": "./types/index.d.ts",
Expand Down
131 changes: 113 additions & 18 deletions src/concurrency.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import { nextTick } from './next-tick'
import { Signal } from './signal'
import { Timer } from './timer'
import { concurrency } from './concurrency'
import { Memo } from './memo'

test('concurrency.Number', async () => {
const { get, set } = concurrency.Number(1)
test('concurrency.Valve', async () => {
const { get, set } = concurrency.Valve(1)
assert(get() === 1)
set(99)
assert(get() === 99)
Expand Down Expand Up @@ -141,11 +142,11 @@ test('concurrent_limit should be integer', async () => {
}
})

test('concurrent_limit should >= 1', async () => {
test('concurrent_limit should >= 0', async () => {
{
let val = 0
try {
await concurrency(0, [][Symbol.iterator](), () => Promise.resolve())
await concurrency(-1, [][Symbol.iterator](), () => Promise.resolve())
val = 111
} catch (err) {
assert( err instanceof RangeError )
Expand All @@ -156,7 +157,7 @@ test('concurrent_limit should >= 1', async () => {
{
let val = 0
try {
await concurrency(0, [2,4,2,1,'a'][Symbol.iterator](), () => Promise.resolve())
await concurrency(-9, [2,4,2,1,'a'][Symbol.iterator](), () => Promise.resolve())
val = 111
} catch (err) {
assert( err instanceof RangeError )
Expand All @@ -174,7 +175,7 @@ function range(start: number, end: number) {
return list
}

test('concurrency should support Infinity max concurrency', async () => {
test('concurrency() should support Infinity max concurrency', async () => {
let revoke_count = 0
const [waiting, ok] = Wait()
const promise = concurrency(Infinity, range(0, 9)[Symbol.iterator](), () => {
Expand All @@ -196,14 +197,14 @@ test('concurrency should support Infinity max concurrency', async () => {
await promise
})

test('concurrency should dynamically change the max concurrency', async () => {
test('concurrency() should dynamically change the max concurrency', async () => {
let revoke_count = 0
const maxConcurrency = concurrency.Number(1)
const concurrencyValve = concurrency.Valve(1)
const resolved_idx_list: Array<number> = []
const pedding_tasks: Array<Wait> = []

const concurrentPromise = concurrency(
maxConcurrency,
concurrencyValve,
range(0, 9)[Symbol.iterator](),
(_, idx) => {
revoke_count += 1
Expand All @@ -226,22 +227,28 @@ test('concurrency should dynamically change the max concurrency', async () => {
expect(revoke_count).toBe(1)

expect(() => {
maxConcurrency.set(-1)
concurrencyValve.set(-1)
}).toThrow()
expect(() => {
maxConcurrency.set(0)
concurrencyValve.set(-2)
}).toThrow()
expect(() => {
maxConcurrency.set('' as any)
concurrencyValve.set(-10)
}).toThrow()
// expect(() => {
// concurrencyValve.set(0)
// }).toThrow()
expect(() => {
concurrencyValve.set('' as any)
}).toThrow()

expect(() => {
maxConcurrency.set(1)
concurrencyValve.set(1)
}).not.toThrow()
await timeout(100)
expect(revoke_count).toBe(1)

maxConcurrency.set(2)
concurrencyValve.set(2)
await timeout(100)
expect(revoke_count).toBe(2)

Expand All @@ -268,15 +275,15 @@ test('concurrency should dynamically change the max concurrency', async () => {
expect(resolved_idx_list.slice(1, resolved_idx_list.length - 1)).toStrictEqual(range(2, 9)) // 中间的是 2~9
})

test('concurrency should dynamically change the max concurrency(change to small)', async () => {
test('concurrency() should dynamically change the max concurrency(change to small)', async () => {
let revoke_count = 0
const resolved_idx_list: Array<number> = []
const waiting_list = range(0, 9).map(() => Wait())

const maxConcurrency = concurrency.Number(3)
const concurrencyValve = concurrency.Valve(3)

const concurrentPromise = concurrency(
maxConcurrency,
concurrencyValve,
waiting_list[Symbol.iterator](),
(w, idx) => {
revoke_count += 1
Expand All @@ -292,7 +299,7 @@ test('concurrency should dynamically change the max concurrency(change to small)
await timeout(100)
expect(resolved_idx_list.length).toBe(0)

maxConcurrency.set(1)
concurrencyValve.set(1)
expect(revoke_count).toBe(3)
expect(resolved_idx_list.length).toBe(0)

Expand Down Expand Up @@ -334,3 +341,91 @@ test('concurrency should dynamically change the max concurrency(change to small)

await concurrentPromise
})

test('concurrency() should support 0 max concurrency', async () => {
let revoke_count = 0
const concurrencyValve = concurrency.Valve(0)
const concurrentPromise = concurrency(
concurrencyValve,
range(0, 9)[Symbol.iterator](),
async (_, idx) => {
revoke_count += 1
},
)

await timeout(100)
expect(revoke_count).toBe(0)
concurrencyValve.set(1)

await timeout(100)
expect(revoke_count).toBe(10)
await concurrentPromise
})

test('concurrency() should support pause', async () => {
let revoke_count = 0
const resolved_idx_list: Array<number> = []
const waiting_list = range(0, 9).map(() => Wait())

const concurrencyValve = concurrency.Valve(3)

const concurrentPromise = concurrency(
concurrencyValve,
waiting_list[Symbol.iterator](),
(w, idx) => {
revoke_count += 1
const [ waiting ] = w
return waiting.then(() => {
resolved_idx_list.push(idx)
})
},
)

expect(revoke_count).toBe(3)

concurrencyValve.set(0)
waiting_list.slice(0, 3).forEach(([, go]) => go())
await timeout(100)
expect(revoke_count).toBe(3)

waiting_list.forEach(([, go]) => go())
concurrencyValve.set(1)

await concurrentPromise
})

test('concurrency() should set valve in async function', async () => {
let revoke_count = 0
const resolved_idx_list: Array<number> = []
const concurrencyValve = concurrency.Valve(1)

// const m = Memo(2)
// const cancel = Memo.watch(m, () => {
// cancel()
// Memo.change(m, 9)
// })
// Memo.change(m, 1)

const concurrentPromise = concurrency(
concurrencyValve,
range(0, 9)[Symbol.iterator](),
async (_, idx) => {
revoke_count += 1
concurrencyValve.set(0)
Timer(
Math.floor(Math.random() * 100),
() => {
concurrencyValve.set(1)
resolved_idx_list.push(idx)
}
)
},
)

await concurrentPromise

expect(revoke_count).toBe(10)
for (let i = 0; i < resolved_idx_list.length; ++i) {
expect(resolved_idx_list[i]).toBe(i)
}
})
43 changes: 26 additions & 17 deletions src/concurrency.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import { Memo, WithValidating } from './memo'
import { nextTick } from './next-tick';
import { Wait } from './wait'

const NONE_ERROR = Symbol('NONE_ERROR')

export type ConcurrencyNumber = {
export type ConcurrencyValve = {
get(): number;
set(v: number): void
memo: Memo<number>
}

concurrency.Number = (v: number): ConcurrencyNumber => {
concurrency.Valve = (v: number): ConcurrencyValve => {
const memo = WithValidating(Memo(v), v => {
if (v < 1) {
throw new RangeError('concurrent_limit should >= 1')
if (v < 0) {
throw new RangeError('concurrent_limit should >= 0')
} else if (!Number.isInteger(v)) {
if (v !== Infinity) {
throw new TypeError('concurrent_limit should be integer')
Expand All @@ -28,34 +29,42 @@ concurrency.Number = (v: number): ConcurrencyNumber => {
}

concurrency.each = <T>(
MaxConcurrency: ConcurrencyNumber | number,
valve: ConcurrencyValve | number,
list: Array<T>,
asyncFn: (item: T, idx: number, list: T[]) => Promise<void>
) => (
concurrency(
MaxConcurrency,
valve,
list[Symbol.iterator](),
(item, idx) => asyncFn(item, idx, list)
)
)

export async function concurrency<T>(
MaxConcurrency: ConcurrencyNumber | number,
valOrNum: ConcurrencyValve | number,
iterator: IterableIterator<T>,
asyncFn: (item: T, idx: number) => Promise<void>
): Promise<void> {
if (typeof MaxConcurrency === 'number') {
return concurrency(concurrency.Number(MaxConcurrency), iterator, asyncFn)
if (typeof valOrNum === 'number') {
return concurrencyWithValve(concurrency.Valve(valOrNum), iterator, asyncFn)
} else {
return concurrencyWithValve(valOrNum, iterator, asyncFn)
}
}

const [ getMaxConcurrency ] = MaxConcurrency.memo

function concurrencyWithValve<T>(
valve: ConcurrencyValve,
iterator: IterableIterator<T>,
asyncFn: (item: T, idx: number) => Promise<void>
): Promise<void> {
let current_concurrency = 0
let __idx = 0
let error_info: (typeof NONE_ERROR) | Exclude<unknown, typeof NONE_ERROR> = NONE_ERROR
const [ waiting, done ] = Wait()
let result: IteratorResult<T, void>
const cancelWatch = Memo.watch(MaxConcurrency.memo, callConcurrent)
const cancelWatch = Memo.watch(valve.memo, () => {
nextTick().then(run)
})

function after() {
if (error_info === NONE_ERROR) {
Expand All @@ -66,7 +75,7 @@ export async function concurrency<T>(
) {
done()
} else {
callConcurrent()
run()
}
}
}
Expand All @@ -78,10 +87,10 @@ export async function concurrency<T>(
}
}

callConcurrent()
function callConcurrent() {
run()
function run() {
while (
(current_concurrency < getMaxConcurrency()) &&
(current_concurrency < valve.get()) &&
(!result || !result?.done)
) {
current_concurrency += 1
Expand All @@ -103,7 +112,7 @@ export async function concurrency<T>(

return (
waiting.then(() => {
cancelWatch && cancelWatch()
cancelWatch()
if (error_info !== NONE_ERROR) {
throw error_info
}
Expand Down
6 changes: 3 additions & 3 deletions src/concurrent-map.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ConcurrencyNumber, concurrency } from './concurrency'
import { ConcurrencyValve, concurrency } from './concurrency'

export function concurrentMap<T, NT>(
conccurrencyNumber: ConcurrencyNumber | number,
valve: ConcurrencyValve | number,
list: T[],
asyncFn: (item: T, idx: number, total: T[]) => Promise<NT>,
): Promise<NT[]> {
Expand All @@ -10,7 +10,7 @@ export function concurrentMap<T, NT>(
} else {
const new_list: NT[] = []
return concurrency.each(
conccurrencyNumber,
valve,
list,
async (item, idx) => {
new_list[idx] = await asyncFn(item, idx, list)
Expand Down
19 changes: 19 additions & 0 deletions src/memo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -425,3 +425,22 @@ test('watching ValidatingMemo By WatchMemo(ValidatingMemo(Memo', () => {
set(999)
expect(changed).toBe( 2 )
})

test('memo should support cancel in watch handle', () => {
const m = Memo(2)
let i: number = 0
const watchHandler = () => {
cancel()
i += 1
}
const cancel = Memo.watch(m, watchHandler)

expect(i).toBe(0)
Memo.change(m, 1)
expect(i).toBe(1)

assert(m.length === 2)

Memo.change(m, 2)
expect(i).toBe(1)
})
Loading