From e05c634f00eb5b96822956281b6c00eda054a6d6 Mon Sep 17 00:00:00 2001 From: leoc11 Date: Sun, 19 Oct 2025 10:15:11 +0700 Subject: [PATCH 1/4] * 1 Endpoint per thread communication (reduce MessageChannel) * pool + other feature --- src/comlink.ts | 672 ++-------------------------------- src/common.ts | 454 +++++++++++++++++++++++ src/consumer-pool.ts | 13 + src/consumer.ts | 154 ++++++++ src/node-adapter.ts | 12 +- src/protocol.ts | 73 ++-- src/provider.ts | 414 +++++++++++++++++++++ src/remote-controller.ts | 282 ++++++++++++++ src/remote-pool-controller.ts | 211 +++++++++++ src/transfer-handler.ts | 200 ++++++++++ src/window-adapter.ts | 28 ++ 11 files changed, 1837 insertions(+), 676 deletions(-) create mode 100644 src/common.ts create mode 100644 src/consumer-pool.ts create mode 100644 src/consumer.ts create mode 100644 src/provider.ts create mode 100644 src/remote-controller.ts create mode 100644 src/remote-pool-controller.ts create mode 100644 src/transfer-handler.ts create mode 100644 src/window-adapter.ts diff --git a/src/comlink.ts b/src/comlink.ts index 27c13694..e8d41c9a 100644 --- a/src/comlink.ts +++ b/src/comlink.ts @@ -4,651 +4,27 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { - Endpoint, - EventSource, - Message, - MessageType, - PostMessageWithOrigin, - WireValue, - WireValueType, -} from "./protocol"; -export type { Endpoint }; - -export const proxyMarker = Symbol("Comlink.proxy"); -export const createEndpoint = Symbol("Comlink.endpoint"); -export const releaseProxy = Symbol("Comlink.releaseProxy"); -export const finalizer = Symbol("Comlink.finalizer"); - -const throwMarker = Symbol("Comlink.thrown"); - -/** - * Interface of values that were marked to be proxied with `comlink.proxy()`. - * Can also be implemented by classes. - */ -export interface ProxyMarked { - [proxyMarker]: true; -} - -/** - * Takes a type and wraps it in a Promise, if it not already is one. - * This is to avoid `Promise>`. - * - * This is the inverse of `Unpromisify`. - */ -type Promisify = T extends Promise ? T : Promise; -/** - * Takes a type that may be Promise and unwraps the Promise type. - * If `P` is not a Promise, it returns `P`. - * - * This is the inverse of `Promisify`. - */ -type Unpromisify

= P extends Promise ? T : P; - -/** - * Takes the raw type of a remote property and returns the type that is visible to the local thread on the proxy. - * - * Note: This needs to be its own type alias, otherwise it will not distribute over unions. - * See https://www.typescriptlang.org/docs/handbook/advanced-types.html#distributive-conditional-types - */ -type RemoteProperty = - // If the value is a method, comlink will proxy it automatically. - // Objects are only proxied if they are marked to be proxied. - // Otherwise, the property is converted to a Promise that resolves the cloned value. - T extends Function | ProxyMarked ? Remote : Promisify; - -/** - * Takes the raw type of a property as a remote thread would see it through a proxy (e.g. when passed in as a function - * argument) and returns the type that the local thread has to supply. - * - * This is the inverse of `RemoteProperty`. - * - * Note: This needs to be its own type alias, otherwise it will not distribute over unions. See - * https://www.typescriptlang.org/docs/handbook/advanced-types.html#distributive-conditional-types - */ -type LocalProperty = T extends Function | ProxyMarked - ? Local - : Unpromisify; - -/** - * Proxies `T` if it is a `ProxyMarked`, clones it otherwise (as handled by structured cloning and transfer handlers). - */ -export type ProxyOrClone = T extends ProxyMarked ? Remote : T; -/** - * Inverse of `ProxyOrClone`. - */ -export type UnproxyOrClone = T extends RemoteObject - ? Local - : T; - -/** - * Takes the raw type of a remote object in the other thread and returns the type as it is visible to the local thread - * when proxied with `Comlink.proxy()`. - * - * This does not handle call signatures, which is handled by the more general `Remote` type. - * - * @template T The raw type of a remote object as seen in the other thread. - */ -export type RemoteObject = { [P in keyof T]: RemoteProperty }; -/** - * Takes the type of an object as a remote thread would see it through a proxy (e.g. when passed in as a function - * argument) and returns the type that the local thread has to supply. - * - * This does not handle call signatures, which is handled by the more general `Local` type. - * - * This is the inverse of `RemoteObject`. - * - * @template T The type of a proxied object. - */ -export type LocalObject = { [P in keyof T]: LocalProperty }; - -/** - * Additional special comlink methods available on each proxy returned by `Comlink.wrap()`. - */ -export interface ProxyMethods { - [createEndpoint]: () => Promise; - [releaseProxy]: () => void; -} - -/** - * Takes the raw type of a remote object, function or class in the other thread and returns the type as it is visible to - * the local thread from the proxy return value of `Comlink.wrap()` or `Comlink.proxy()`. - */ -export type Remote = - // Handle properties - RemoteObject & - // Handle call signature (if present) - (T extends (...args: infer TArguments) => infer TReturn - ? ( - ...args: { [I in keyof TArguments]: UnproxyOrClone } - ) => Promisify>> - : unknown) & - // Handle construct signature (if present) - // The return of construct signatures is always proxied (whether marked or not) - (T extends { new (...args: infer TArguments): infer TInstance } - ? { - new ( - ...args: { - [I in keyof TArguments]: UnproxyOrClone; - } - ): Promisify>; - } - : unknown) & - // Include additional special comlink methods available on the proxy. - ProxyMethods; - -/** - * Expresses that a type can be either a sync or async. - */ -type MaybePromise = Promise | T; - -/** - * Takes the raw type of a remote object, function or class as a remote thread would see it through a proxy (e.g. when - * passed in as a function argument) and returns the type the local thread has to supply. - * - * This is the inverse of `Remote`. It takes a `Remote` and returns its original input `T`. - */ -export type Local = - // Omit the special proxy methods (they don't need to be supplied, comlink adds them) - Omit, keyof ProxyMethods> & - // Handle call signatures (if present) - (T extends (...args: infer TArguments) => infer TReturn - ? ( - ...args: { [I in keyof TArguments]: ProxyOrClone } - ) => // The raw function could either be sync or async, but is always proxied automatically - MaybePromise>> - : unknown) & - // Handle construct signature (if present) - // The return of construct signatures is always proxied (whether marked or not) - (T extends { new (...args: infer TArguments): infer TInstance } - ? { - new ( - ...args: { - [I in keyof TArguments]: ProxyOrClone; - } - ): // The raw constructor could either be sync or async, but is always proxied automatically - MaybePromise>>; - } - : unknown); - -const isObject = (val: unknown): val is object => - (typeof val === "object" && val !== null) || typeof val === "function"; - -/** - * Customizes the serialization of certain values as determined by `canHandle()`. - * - * @template T The input type being handled by this transfer handler. - * @template S The serialized type sent over the wire. - */ -export interface TransferHandler { - /** - * Gets called for every value to determine whether this transfer handler - * should serialize the value, which includes checking that it is of the right - * type (but can perform checks beyond that as well). - */ - canHandle(value: unknown): value is T; - - /** - * Gets called with the value if `canHandle()` returned `true` to produce a - * value that can be sent in a message, consisting of structured-cloneable - * values and/or transferrable objects. - */ - serialize(value: T): [S, Transferable[]]; - - /** - * Gets called to deserialize an incoming value that was serialized in the - * other thread with this transfer handler (known through the name it was - * registered under). - */ - deserialize(value: S): T; -} - -/** - * Internal transfer handle to handle objects marked to proxy. - */ -const proxyTransferHandler: TransferHandler = { - canHandle: (val): val is ProxyMarked => - isObject(val) && (val as ProxyMarked)[proxyMarker], - serialize(obj) { - const { port1, port2 } = new MessageChannel(); - expose(obj, port1); - return [port2, [port2]]; - }, - deserialize(port) { - port.start(); - return wrap(port); - }, -}; - -interface ThrownValue { - [throwMarker]: unknown; // just needs to be present - value: unknown; -} -type SerializedThrownValue = - | { isError: true; value: Error } - | { isError: false; value: unknown }; -type PendingListenersMap = Map< - string, - (value: WireValue | PromiseLike) => void ->; -type EndpointWithPendingListeners = { - endpoint: Endpoint; - pendingListeners: PendingListenersMap; -}; - -/** - * Internal transfer handler to handle thrown exceptions. - */ -const throwTransferHandler: TransferHandler< - ThrownValue, - SerializedThrownValue -> = { - canHandle: (value): value is ThrownValue => - isObject(value) && throwMarker in value, - serialize({ value }) { - let serialized: SerializedThrownValue; - if (value instanceof Error) { - serialized = { - isError: true, - value: { - message: value.message, - name: value.name, - stack: value.stack, - }, - }; - } else { - serialized = { isError: false, value }; - } - return [serialized, []]; - }, - deserialize(serialized) { - if (serialized.isError) { - throw Object.assign( - new Error(serialized.value.message), - serialized.value - ); - } - throw serialized.value; - }, -}; - -/** - * Allows customizing the serialization of certain values. - */ -export const transferHandlers = new Map< - string, - TransferHandler ->([ - ["proxy", proxyTransferHandler], - ["throw", throwTransferHandler], -]); - -function isAllowedOrigin( - allowedOrigins: (string | RegExp)[], - origin: string -): boolean { - for (const allowedOrigin of allowedOrigins) { - if (origin === allowedOrigin || allowedOrigin === "*") { - return true; - } - if (allowedOrigin instanceof RegExp && allowedOrigin.test(origin)) { - return true; - } - } - return false; -} - -export function expose( - obj: any, - ep: Endpoint = globalThis as any, - allowedOrigins: (string | RegExp)[] = ["*"] -) { - ep.addEventListener("message", function callback(ev: MessageEvent) { - if (!ev || !ev.data) { - return; - } - if (!isAllowedOrigin(allowedOrigins, ev.origin)) { - console.warn(`Invalid origin '${ev.origin}' for comlink proxy`); - return; - } - const { id, type, path } = { - path: [] as string[], - ...(ev.data as Message), - }; - const argumentList = (ev.data.argumentList || []).map(fromWireValue); - let returnValue; - try { - const parent = path.slice(0, -1).reduce((obj, prop) => obj[prop], obj); - const rawValue = path.reduce((obj, prop) => obj[prop], obj); - switch (type) { - case MessageType.GET: - { - returnValue = rawValue; - } - break; - case MessageType.SET: - { - parent[path.slice(-1)[0]] = fromWireValue(ev.data.value); - returnValue = true; - } - break; - case MessageType.APPLY: - { - returnValue = rawValue.apply(parent, argumentList); - } - break; - case MessageType.CONSTRUCT: - { - const value = new rawValue(...argumentList); - returnValue = proxy(value); - } - break; - case MessageType.ENDPOINT: - { - const { port1, port2 } = new MessageChannel(); - expose(obj, port2); - returnValue = transfer(port1, [port1]); - } - break; - case MessageType.RELEASE: - { - returnValue = undefined; - } - break; - default: - return; - } - } catch (value) { - returnValue = { value, [throwMarker]: 0 }; - } - Promise.resolve(returnValue) - .catch((value) => { - return { value, [throwMarker]: 0 }; - }) - .then((returnValue) => { - const [wireValue, transferables] = toWireValue(returnValue); - ep.postMessage({ ...wireValue, id }, transferables); - if (type === MessageType.RELEASE) { - // detach and deactive after sending release response above. - ep.removeEventListener("message", callback as any); - closeEndPoint(ep); - if (finalizer in obj && typeof obj[finalizer] === "function") { - obj[finalizer](); - } - } - }) - .catch((error) => { - // Send Serialization Error To Caller - const [wireValue, transferables] = toWireValue({ - value: new TypeError("Unserializable return value"), - [throwMarker]: 0, - }); - ep.postMessage({ ...wireValue, id }, transferables); - }); - } as any); - if (ep.start) { - ep.start(); - } -} - -function isMessagePort(endpoint: Endpoint): endpoint is MessagePort { - return endpoint.constructor.name === "MessagePort"; -} - -function closeEndPoint(endpoint: Endpoint) { - if (isMessagePort(endpoint)) endpoint.close(); -} - -export function wrap(ep: Endpoint, target?: any): Remote { - const pendingListeners : PendingListenersMap = new Map(); - - ep.addEventListener("message", function handleMessage(ev: Event) { - const { data } = ev as MessageEvent; - if (!data || !data.id) { - return; - } - const resolver = pendingListeners.get(data.id); - if (!resolver) { - return; - } - - try { - resolver(data); - } finally { - pendingListeners.delete(data.id); - } - }); - - return createProxy({ endpoint: ep, pendingListeners }, [], target) as any; -} - -function throwIfProxyReleased(isReleased: boolean) { - if (isReleased) { - throw new Error("Proxy has been released and is not useable"); - } -} - -function releaseEndpoint(epWithPendingListeners: EndpointWithPendingListeners) { - return requestResponseMessage(epWithPendingListeners, { - type: MessageType.RELEASE, - }).then(() => { - closeEndPoint(epWithPendingListeners.endpoint); - }); -} - -interface FinalizationRegistry { - new (cb: (heldValue: T) => void): FinalizationRegistry; - register( - weakItem: object, - heldValue: T, - unregisterToken?: object | undefined - ): void; - unregister(unregisterToken: object): void; -} -declare var FinalizationRegistry: FinalizationRegistry; - -const proxyCounter = new WeakMap(); -const proxyFinalizers = - "FinalizationRegistry" in globalThis && - new FinalizationRegistry( - (epWithPendingListeners: EndpointWithPendingListeners) => { - const newCount = (proxyCounter.get(epWithPendingListeners) || 0) - 1; - proxyCounter.set(epWithPendingListeners, newCount); - if (newCount === 0) { - releaseEndpoint(epWithPendingListeners).finally(() => { - epWithPendingListeners.pendingListeners.clear(); - }); - } - } - ); - -function registerProxy( - proxy: object, - epWithPendingListeners: EndpointWithPendingListeners -) { - const newCount = (proxyCounter.get(epWithPendingListeners) || 0) + 1; - proxyCounter.set(epWithPendingListeners, newCount); - if (proxyFinalizers) { - proxyFinalizers.register(proxy, epWithPendingListeners, proxy); - } -} - -function unregisterProxy(proxy: object) { - if (proxyFinalizers) { - proxyFinalizers.unregister(proxy); - } -} - -function createProxy( - epWithPendingListeners: EndpointWithPendingListeners, - path: (string | number | symbol)[] = [], - target: object = function () {} -): Remote { - let isProxyReleased = false; - const proxy = new Proxy(target, { - get(_target, prop) { - throwIfProxyReleased(isProxyReleased); - if (prop === releaseProxy) { - return () => { - unregisterProxy(proxy); - releaseEndpoint(epWithPendingListeners).finally(() => { - epWithPendingListeners.pendingListeners.clear(); - }); - isProxyReleased = true; - }; - } - if (prop === "then") { - if (path.length === 0) { - return { then: () => proxy }; - } - const r = requestResponseMessage(epWithPendingListeners, { - type: MessageType.GET, - path: path.map((p) => p.toString()), - }).then(fromWireValue); - return r.then.bind(r); - } - return createProxy(epWithPendingListeners, [...path, prop]); - }, - set(_target, prop, rawValue) { - throwIfProxyReleased(isProxyReleased); - // FIXME: ES6 Proxy Handler `set` methods are supposed to return a - // boolean. To show good will, we return true asynchronously ¯\_(ツ)_/¯ - const [value, transferables] = toWireValue(rawValue); - return requestResponseMessage( - epWithPendingListeners, - { - type: MessageType.SET, - path: [...path, prop].map((p) => p.toString()), - value, - }, - transferables - ).then(fromWireValue) as any; - }, - apply(_target, _thisArg, rawArgumentList) { - throwIfProxyReleased(isProxyReleased); - const last = path[path.length - 1]; - if ((last as any) === createEndpoint) { - return requestResponseMessage(epWithPendingListeners, { - type: MessageType.ENDPOINT, - }).then(fromWireValue); - } - // We just pretend that `bind()` didn’t happen. - if (last === "bind") { - return createProxy(epWithPendingListeners, path.slice(0, -1)); - } - const [argumentList, transferables] = processArguments(rawArgumentList); - return requestResponseMessage( - epWithPendingListeners, - { - type: MessageType.APPLY, - path: path.map((p) => p.toString()), - argumentList, - }, - transferables - ).then(fromWireValue); - }, - construct(_target, rawArgumentList) { - throwIfProxyReleased(isProxyReleased); - const [argumentList, transferables] = processArguments(rawArgumentList); - return requestResponseMessage( - epWithPendingListeners, - { - type: MessageType.CONSTRUCT, - path: path.map((p) => p.toString()), - argumentList, - }, - transferables - ).then(fromWireValue); - }, - }); - registerProxy(proxy, epWithPendingListeners); - return proxy as any; -} - -function myFlat(arr: (T | T[])[]): T[] { - return Array.prototype.concat.apply([], arr); -} - -function processArguments(argumentList: any[]): [WireValue[], Transferable[]] { - const processed = argumentList.map(toWireValue); - return [processed.map((v) => v[0]), myFlat(processed.map((v) => v[1]))]; -} - -const transferCache = new WeakMap(); -export function transfer(obj: T, transfers: Transferable[]): T { - transferCache.set(obj, transfers); - return obj; -} - -export function proxy(obj: T): T & ProxyMarked { - return Object.assign(obj, { [proxyMarker]: true }) as any; -} - -export function windowEndpoint( - w: PostMessageWithOrigin, - context: EventSource = globalThis, - targetOrigin = "*" -): Endpoint { - return { - postMessage: (msg: any, transferables: Transferable[]) => - w.postMessage(msg, targetOrigin, transferables), - addEventListener: context.addEventListener.bind(context), - removeEventListener: context.removeEventListener.bind(context), - }; -} - -function toWireValue(value: any): [WireValue, Transferable[]] { - for (const [name, handler] of transferHandlers) { - if (handler.canHandle(value)) { - const [serializedValue, transferables] = handler.serialize(value); - return [ - { - type: WireValueType.HANDLER, - name, - value: serializedValue, - }, - transferables, - ]; - } - } - return [ - { - type: WireValueType.RAW, - value, - }, - transferCache.get(value) || [], - ]; -} - -function fromWireValue(value: WireValue): any { - switch (value.type) { - case WireValueType.HANDLER: - return transferHandlers.get(value.name)!.deserialize(value.value); - case WireValueType.RAW: - return value.value; - } -} - -function requestResponseMessage( - epWithPendingListeners: EndpointWithPendingListeners, - msg: Message, - transfers?: Transferable[] -): Promise { - const ep = epWithPendingListeners.endpoint; - const pendingListeners = epWithPendingListeners.pendingListeners; - return new Promise((resolve) => { - const id = generateUUID(); - pendingListeners.set(id, resolve); - if (ep.start) { - ep.start(); - } - ep.postMessage({ id, ...msg }, transfers); -}); -} - -function generateUUID(): string { - return new Array(4) - .fill(0) - .map(() => Math.floor(Math.random() * Number.MAX_SAFE_INTEGER).toString(16)) - .join("-"); -} +export type { Endpoint } from "./protocol"; +export type { + ProxyMarked, + UnProxyMarked, + ProxyOrClone, + UnproxyOrClone, + Remote, + Local, + TransferHandler, +} from "./common"; +export type { PoolOptions } from "./consumer-pool"; +export { + proxyMarker, + proxyRemoteData, + opch, + transferHandlers, + transfer, +} from "./common"; +export { windowEndpoint } from "./window-adapter"; +export { nodeEndpoint } from "./node-adapter"; +export { expose, unexpose, proxy } from "./provider"; +export { wrap } from "./consumer"; +export { pool } from "./consumer-pool"; +import "./transfer-handler"; diff --git a/src/common.ts b/src/common.ts new file mode 100644 index 00000000..dd945732 --- /dev/null +++ b/src/common.ts @@ -0,0 +1,454 @@ +/** + * @license + * Copyright 2019 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { + Endpoint, + WireValue, + WireValueType, + ThreadID, + ProxyID, +} from "./protocol"; + +export const opch = Symbol("Comlink.optionalChaining"); +export const proxyRemoteData = Symbol("Comlink.proxyRemoteData"); +export const proxyMarker = Symbol("Comlink.proxy"); + +export const throwMarker = Symbol("Comlink.thrown"); +export const threadId: ThreadID = crypto + .getRandomValues(new Uint8Array(6)) + .reduce((p, v) => p * 256 + v, 0); + +export const tidEndPointMap: Map = new Map(); + +export type ProxyRemoteData = { + controller: IRemoteController; + pid: ProxyID; + path: (string | number | symbol)[]; +}; + +export interface IRemoteController { + register(proxy: Remote, pid: ProxyID, isRoot: boolean): void; + unregister(pid: ProxyID): Promise; + get(pid: ProxyID, path: (string | number | symbol)[]): Promise; + set( + pid: ProxyID, + path: (string | number | symbol)[], + rawValue: any + ): Promise; + apply( + pid: ProxyID, + path: (string | number | symbol)[], + argArray: any[] + ): Promise; + construct( + pid: ProxyID, + path: (string | number | symbol)[], + argArray: any[] + ): Promise; +} + +/** + * Interface of values that were marked to be proxied with `comlink.proxy()`. + * Can also be implemented by classes. + */ +export type ProxyMarked = T & { + [proxyMarker]: true; +}; +export type UnProxyMarked = T extends ProxyMarked ? U : never; + +/** + * Takes a type and wraps it in a Promise, if it not already is one. + * This is to avoid `Promise>`. + * + * This is the inverse of `Unpromisify`. + */ +type Promisify = T extends Promise ? T : Promise; +/** + * Takes a type that may be Promise and unwraps the Promise type. + * If `P` is not a Promise, it returns `P`. + * + * This is the inverse of `Promisify`. + */ +type Unpromisify

= P extends Promise ? T : P; + +/** + * Expresses that a type can be either a sync or async. + */ +type MaybePromise = Promise | T; + +/** + * Proxies `T` if it is a `ProxyMarked`, clones it otherwise (as handled by structured cloning and transfer handlers). + */ +export type ProxyOrClone = T extends ProxyMarked + ? Remote + : typeof proxyMarker extends keyof T + ? Remote> + : T; +/** + * Inverse of `ProxyOrClone`. + */ +export type UnproxyOrClone = T extends ProxyMethods + ? U extends Function + ? U | T | ProxyMarked> + : T | ProxyMarked> + : T; + +declare const remoteMarker: unique symbol; +/** + * Additional special comlink methods available on each proxy returned by `Comlink.wrap()`. + */ +export interface ProxyMethods { + [Symbol.dispose]: () => void; + [Symbol.asyncDispose]: () => Promise; + [proxyRemoteData]: ProxyRemoteData; + [remoteMarker]: T; +} + +/** + * Takes the raw type of a remote property and returns the type that is visible to the local thread on the proxy. + * + * Note: This needs to be its own type alias, otherwise it will not distribute over unions. + * See https://www.typescriptlang.org/docs/handbook/advanced-types.html#distributive-conditional-types + */ +type RemoteProperty = + // If the value is a method, comlink will proxy it automatically. + // Objects are only proxied if they are marked to be proxied. + // Otherwise, the property is converted to a Promise that resolves the cloned value. + T extends ProxyMarked + ? Remote & Promisify> + : typeof proxyMarker extends keyof T + ? Remote> & + Promisify>> + : T extends Function + ? Remote + : T extends object + ? Remote & Promisify + : Promisify; + +/** + * Takes the raw type of a remote object in the other thread and returns the type as it is visible to the local thread + * when proxied with `Comlink.proxy()`. + * + * This does not handle call signatures, which is handled by the more general `RemoteCall` type. + * + * @template T The raw type of a remote object as seen in the other thread. + */ +type RemoteObject = { + [P in keyof T]-?: undefined extends T[P] + ? RemoteProperty & { + [opch]: RemoteProperty>; + } + : null extends T[P] + ? RemoteProperty & { + [opch]: RemoteProperty>; + } + : RemoteProperty; +} & (T extends { [Symbol.iterator](): Iterator } + ? { [Symbol.asyncIterator](): AsyncIterableIterator } + : T extends { [Symbol.asyncIterator](): AsyncIterableIterator } + ? { [Symbol.asyncIterator](): AsyncIterableIterator } + : {}); + +export type RemoteCall = T extends { + (...args: infer TArguments1): infer TReturn1; + (...args: infer TArguments2): infer TReturn2; + (...args: infer TArguments3): infer TReturn3; +} + ? { + ( + ...args: { + [I in keyof TArguments1]: UnproxyOrClone; + } + ): Promisify>>; + ( + ...args: { + [I in keyof TArguments2]: UnproxyOrClone; + } + ): Promisify>>; + ( + ...args: { + [I in keyof TArguments3]: UnproxyOrClone; + } + ): Promisify>>; + } + : unknown; + +export type RemoteConstruct = T extends { + new (...args: infer TArguments1): infer TInstance1; + new (...args: infer TArguments2): infer TInstance2; + new (...args: infer TArguments3): infer TInstance3; +} + ? { + new ( + ...args: { + [I in keyof TArguments1]: UnproxyOrClone; + } + ): Promisify>; + new ( + ...args: { + [I in keyof TArguments2]: UnproxyOrClone; + } + ): Promisify>; + new ( + ...args: { + [I in keyof TArguments3]: UnproxyOrClone; + } + ): Promisify>; + } + : unknown; + +/** + * Takes the raw type of a remote object, function or class in the other thread and returns the type as it is visible to + * the local thread from the proxy return value of `Comlink.wrap()` or `Comlink.proxy()`. + */ +export type Remote = + // Handle properties + RemoteObject & + // Handle call signature (if present) + RemoteCall & + // Handle construct signature (if present) + // The return of construct signatures is always proxied (whether marked or not) + RemoteConstruct & + // Include additional special comlink methods available on the proxy. + ProxyMethods; + +/** + * Takes the raw type of a property as a remote thread would see it through a proxy (e.g. when passed in as a function + * argument) and returns the type that the local thread has to supply. + * + * This is the inverse of `RemoteProperty`. + * + * Note: This needs to be its own type alias, otherwise it will not distribute over unions. See + * https://www.typescriptlang.org/docs/handbook/advanced-types.html#distributive-conditional-types + */ +type LocalProperty = T extends Function | ProxyMarked + ? Local + : Unpromisify; + +/** + * Takes the type of an object as a remote thread would see it through a proxy (e.g. when passed in as a function + * argument) and returns the type that the local thread has to supply. + * + * This does not handle call signatures, which is handled by the more general `LocalCall` type. + * + * This is the inverse of `RemoteObject`. + * + * @template T The type of a proxied object. + */ +export type LocalObject = { [P in keyof T]: LocalProperty }; + +// The raw function could either be sync or async, but is always proxied automatically +export type LocalCall = T extends { + (...args: infer TArguments1): infer TReturn1; + (...args: infer TArguments2): infer TReturn2; + (...args: infer TArguments3): infer TReturn3; +} + ? { + ( + ...args: { + [I in keyof TArguments1]: ProxyOrClone; + } + ): MaybePromise>>; + ( + ...args: { + [I in keyof TArguments2]: ProxyOrClone; + } + ): MaybePromise>>; + ( + ...args: { + [I in keyof TArguments3]: ProxyOrClone; + } + ): MaybePromise>>; + } + : unknown; + +// The raw constructor could either be sync or async, but is always proxied automatically +export type LocalConstruct = T extends { + new (...args: infer TArguments1): infer TInstance1; + new (...args: infer TArguments2): infer TInstance2; + new (...args: infer TArguments3): infer TInstance3; +} + ? { + new ( + ...args: { + [I in keyof TArguments1]: ProxyOrClone; + } + ): MaybePromise>>; + new ( + ...args: { + [I in keyof TArguments2]: ProxyOrClone; + } + ): MaybePromise>>; + new ( + ...args: { + [I in keyof TArguments3]: ProxyOrClone; + } + ): MaybePromise>>; + } + : unknown; + +/** + * Takes the raw type of a remote object, function or class as a remote thread would see it through a proxy (e.g. when + * passed in as a function argument) and returns the type the local thread has to supply. + * + * This is the inverse of `Remote`. It takes a `Remote` and returns its original input `T`. + */ +export type Local = + // Omit the special proxy methods (they don't need to be supplied, comlink adds them) + T extends ProxyMethods + ? U | Local + : LocalObject & LocalCall & LocalConstruct; + +/** + * Customizes the serialization of certain values as determined by `canHandle()`. + * + * @template T The input type being handled by this transfer handler. + * @template S The serialized type sent over the wire. + */ +export interface TransferHandler { + /** + * Gets called for every value to determine whether this transfer handler + * should serialize the value, which includes checking that it is of the right + * type (but can perform checks beyond that as well). + */ + canHandle(value: unknown): value is T; + + /** + * Gets called with the value if `canHandle()` returned `true` to produce a + * value that can be sent in a message, consisting of structured-cloneable + * values and/or transferrable objects. + */ + serialize(value: T, providerEp: Endpoint): Promise<[S, Transferable[]]>; + + /** + * Gets called to deserialize an incoming value that was serialized in the + * other thread with this transfer handler (known through the name it was + * registered under). + */ + deserialize(value: S, consumerEp: Endpoint): Promise; +} + +/** + * Allows customizing the serialization of certain values. + */ +export const transferHandlers = new Map< + string, + TransferHandler +>(); + +export function isMessagePort(endpoint: Endpoint): endpoint is MessagePort { + return endpoint.constructor.name === "MessagePort"; +} + +export function closeEndPoint(endpoint: Endpoint) { + if (isMessagePort(endpoint)) endpoint.close(); +} + +const transferCache = new WeakMap(); +export function transfer(obj: T, transfers: Transferable[]): T { + transferCache.set(obj, transfers); + return obj; +} + +export async function toWireValue( + ep: Endpoint, + value: any +): Promise<[WireValue, Transferable[]]> { + for (const [name, handler] of transferHandlers) { + if (handler.canHandle(value)) { + const [serializedValue, transferables] = await handler.serialize( + value, + ep + ); + return [ + { + type: WireValueType.HANDLER, + name, + value: serializedValue, + }, + transferables, + ]; + } + } + return [ + { + type: WireValueType.RAW, + value, + }, + transferCache.get(value) || [], + ]; +} + +export async function fromWireValue( + ep: Endpoint, + value: WireValue +): Promise { + switch (value.type) { + case WireValueType.HANDLER: { + const handler = transferHandlers.get(value.name)!; + return (await handler.deserialize(value.value, ep)) as T; + } + case WireValueType.RAW: + return value.value as T; + } +} + +/** + * Takes the raw type of a remote property and returns the type that is visible to the local thread on the proxy. + * + * `Comlink.proxy()` always Promisify + * + * Note: This needs to be its own type alias, otherwise it will not distribute over unions. + * See https://www.typescriptlang.org/docs/handbook/advanced-types.html#distributive-conditional-types + */ +type RemotePoolProperty = T extends ProxyMarked + ? Promisify> + : typeof proxyMarker extends keyof T + ? Promisify>> + : T extends Function + ? RemotePool + : T extends object + ? RemotePool & Promisify + : Promisify; + +/** + * Takes the raw type of a remote object in the other thread and returns the type as it is visible to the local thread. + * All property is marked as readonly. + * + * This does not handle call signatures, which is handled by `RemoteCall` type. + * + * @template T The raw type of a remote object as seen in the other thread. + */ +type RemotePoolObject = { + readonly [P in keyof T]-?: undefined extends T[P] + ? RemotePoolProperty & { + [opch]: RemotePoolProperty, null>>; + } + : null extends T[P] + ? RemotePoolProperty & { + [opch]: RemotePoolProperty, null>>; + } + : RemotePoolProperty; +} & (T extends { [Symbol.iterator](): Iterator } + ? { [Symbol.asyncIterator](): AsyncIterableIterator } + : T extends { [Symbol.asyncIterator](): AsyncIterableIterator } + ? { [Symbol.asyncIterator](): AsyncIterableIterator } + : {}); + +/** + * Takes the raw type of a remote object, function or class in the other thread and returns the type as it is visible to + * the local thread from the proxy return value of `Comlink.pool()`. + */ +export type RemotePool = + // Handle properties + RemotePoolObject & + // Handle call signature (if present) + RemoteCall & + // Handle construct signature (if present) + // The return of construct signatures is always proxied (whether marked or not) + RemoteConstruct & + // Include additional special comlink methods available on the proxy. + ProxyMethods; diff --git a/src/consumer-pool.ts b/src/consumer-pool.ts new file mode 100644 index 00000000..328bb90b --- /dev/null +++ b/src/consumer-pool.ts @@ -0,0 +1,13 @@ +export type { PoolOptions } from "./remote-pool-controller"; +import { ProxyMarked, RemotePool, UnProxyMarked } from "./common"; +import { createRemote } from "./consumer"; +import { Endpoint } from "./protocol"; +import { PoolOptions, RemotePoolController } from "./remote-pool-controller"; + +export function pool( + spawn: () => Endpoint, + options?: PoolOptions +): RemotePool> { + const poolControler = new RemotePoolController(spawn, options); + return createRemote>(poolControler, 0) as any; +} diff --git a/src/consumer.ts b/src/consumer.ts new file mode 100644 index 00000000..52f56594 --- /dev/null +++ b/src/consumer.ts @@ -0,0 +1,154 @@ +/** + * @license + * Copyright 2019 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { + ProxyMarked, + opch, + Remote, + UnProxyMarked, + IRemoteController, + proxyRemoteData, + ProxyRemoteData, +} from "./common"; +import { Endpoint, ProxyID } from "./protocol"; +import { RemoteController } from "./remote-controller"; + +interface TypedFinalizationRegistry + extends FinalizationRegistry { + new (cleanupCallback: (heldValue: T) => void): TypedFinalizationRegistry< + TW, + T + >; + register(target: TW, heldValue: T, unregisterToken?: TW): void; + unregister(unregisterToken: TW): boolean; +} + +export const remoteFinalizers = !("FinalizationRegistry" in globalThis) + ? undefined + : new (FinalizationRegistry as TypedFinalizationRegistry< + Remote, + [IRemoteController, ProxyID] + >)(([ep, pid]) => setTimeout(() => ep.unregister(pid))); +const controllerMap = new Map(); +const proxyTarget = function () {} as object; + +function throwIfRemoteReleased(isReleased: boolean) { + if (isReleased) { + throw new Error("Proxy has been released and is not useable"); + } +} + +export function wrap( + ep: Endpoint +): Remote> { + const controller = getRemoteController(ep); + return createRemote>(controller, 0); +} + +export function getRemoteController(ep: Endpoint, isAutoCreate = true) { + let controller = controllerMap.get(ep)!; + if (isAutoCreate && !controller) { + controller = new RemoteController(ep); + controllerMap.set(ep, controller); + } + return controller; +} + +export function createRemote( + controller: IRemoteController, + pid: ProxyID, + path: (string | number | symbol)[] = [] +): Remote { + const data: ProxyRemoteData = { + controller, + pid, + path, + }; + let isProxyReleased = false; + const proxy = new Proxy(proxyTarget, { + get(_target, prop) { + throwIfRemoteReleased(isProxyReleased); + if (prop === Symbol.asyncDispose || prop === Symbol.dispose) { + return async () => { + if (remoteFinalizers) { + remoteFinalizers.unregister(proxy); + } + try { + await controller.unregister(pid); + } finally { + isProxyReleased = true; + } + }; + } + if (prop === proxyRemoteData) { + return data; + } + if (prop === "then") { + if (path.length > 0 && path[path.length - 1] === opch) { + path = path.slice(0, -1); + } + if (path.length === 0) { + return { then: () => proxy }; + } + + const r = controller.get(pid, path); + return r.then.bind(r); + } + return createRemote(controller, pid, [...path, prop]); + }, + set(_target, prop, rawValue) { + throwIfRemoteReleased(isProxyReleased); + // FIXME: ES6 Proxy Handler `set` methods are supposed to return a + // boolean. To show good will, we return true asynchronously ¯\_(ツ)_/¯ + + return controller.set(pid, [...path, prop], rawValue) as any; + }, + apply(_target, _thisArg, rawArgumentList) { + throwIfRemoteReleased(isProxyReleased); + const last = path[path.length - 1]; + // We just pretend that `bind()` didn’t happen. + if (last === "bind") { + return createRemote(controller, pid, path.slice(0, -1)); + } + if (last === Symbol.asyncIterator) { + let remoteIterator: Remote> | undefined; + const getIterator = async () => { + if (!remoteIterator) { + remoteIterator = await controller.apply(pid, path, []); + } + return remoteIterator!; + }; + + return { + async next(args) { + const obj = await getIterator(); + return obj.next(args); + }, + async return(value?: any) { + const obj = await getIterator(); + return obj.return[opch](value); + }, + async throw(e?: any) { + const obj = await getIterator(); + return obj.throw[opch](e); + }, + } as AsyncIterator; + } + + return controller.apply(pid, path, rawArgumentList); + }, + construct(_target, rawArgumentList) { + throwIfRemoteReleased(isProxyReleased); + return controller.construct(pid, path, rawArgumentList); + }, + }) as Remote; + + controller.register(proxy, pid, path.length === 0); + if (remoteFinalizers) { + remoteFinalizers.register(proxy, [controller, pid], proxy); + } + return proxy as Remote; +} diff --git a/src/node-adapter.ts b/src/node-adapter.ts index c86c0f70..73f62a59 100644 --- a/src/node-adapter.ts +++ b/src/node-adapter.ts @@ -21,11 +21,11 @@ export interface NodeEndpoint { start?: () => void; } -export default function nodeEndpoint(nep: NodeEndpoint): Endpoint { +export function nodeEndpoint(nep: NodeEndpoint): Endpoint { const listeners = new WeakMap(); return { postMessage: nep.postMessage.bind(nep), - addEventListener: (_, eh) => { + addEventListener: (type, eh) => { const l = (data: any) => { if ("handleEvent" in eh) { eh.handleEvent({ data } as MessageEvent); @@ -33,17 +33,17 @@ export default function nodeEndpoint(nep: NodeEndpoint): Endpoint { eh({ data } as MessageEvent); } }; - nep.on("message", l); + nep.on(type, l); listeners.set(eh, l); }, - removeEventListener: (_, eh) => { + removeEventListener: (type, eh) => { const l = listeners.get(eh); if (!l) { return; } - nep.off("message", l); + nep.off(type, l); listeners.delete(eh); }, - start: nep.start && nep.start.bind(nep), + start: nep.start?.bind(nep), }; } diff --git a/src/protocol.ts b/src/protocol.ts index d3eb2692..27b76d17 100644 --- a/src/protocol.ts +++ b/src/protocol.ts @@ -18,35 +18,24 @@ export interface EventSource { ): void; } -export interface PostMessageWithOrigin { - postMessage( - message: any, - targetOrigin: string, - transfer?: Transferable[] - ): void; -} - export interface Endpoint extends EventSource { postMessage(message: any, transfer?: Transferable[]): void; - start?: () => void; } export const enum WireValueType { - RAW = "RAW", - PROXY = "PROXY", - THROW = "THROW", - HANDLER = "HANDLER", + RAW = -1, + HANDLER = -2, } export interface RawWireValue { - id?: string; + id?: number; type: WireValueType.RAW; value: {}; } export interface HandlerWireValue { - id?: string; + id?: number; type: WireValueType.HANDLER; name: string; value: unknown; @@ -54,26 +43,45 @@ export interface HandlerWireValue { export type WireValue = RawWireValue | HandlerWireValue; -export type MessageID = string; +export type MessageID = number; +export type ProxyID = number; +export type ThreadID = number; export const enum MessageType { - GET = "GET", - SET = "SET", - APPLY = "APPLY", - CONSTRUCT = "CONSTRUCT", - ENDPOINT = "ENDPOINT", - RELEASE = "RELEASE", + READY = 0, + EXCHANGETID = 1, + MERGE = 2, + GET = 3, + SET = 4, + APPLY = 5, + CONSTRUCT = 6, + ENDPOINT = 7, + HEARTBEAT = 8, + RELEASE = 9, +} + +export interface PingMessage { + id?: MessageType.READY; + type: MessageType.READY; +} + +export interface HeartBeatMessage { + id?: MessageID; + type: MessageType.HEARTBEAT; + lock?: string; } export interface GetMessage { id?: MessageID; type: MessageType.GET; + pid: ProxyID; path: string[]; } export interface SetMessage { id?: MessageID; type: MessageType.SET; + pid: ProxyID; path: string[]; value: WireValue; } @@ -81,6 +89,7 @@ export interface SetMessage { export interface ApplyMessage { id?: MessageID; type: MessageType.APPLY; + pid: ProxyID; path: string[]; argumentList: WireValue[]; } @@ -88,6 +97,7 @@ export interface ApplyMessage { export interface ConstructMessage { id?: MessageID; type: MessageType.CONSTRUCT; + pid: ProxyID; path: string[]; argumentList: WireValue[]; } @@ -95,17 +105,36 @@ export interface ConstructMessage { export interface EndpointMessage { id?: MessageID; type: MessageType.ENDPOINT; + pid?: ProxyID; } export interface ReleaseMessage { id?: MessageID; type: MessageType.RELEASE; + pid: ProxyID; +} + +export interface ShareIdMessage { + id?: MessageID; + type: MessageType.EXCHANGETID; + tid: ThreadID; +} + +export interface MergeMessage { + id?: MessageID; + type: MessageType.MERGE; + ep: MessagePort; + tid: ThreadID; } export type Message = + | PingMessage + | MergeMessage + | ShareIdMessage | GetMessage | SetMessage | ApplyMessage | ConstructMessage | EndpointMessage + | HeartBeatMessage | ReleaseMessage; diff --git a/src/provider.ts b/src/provider.ts new file mode 100644 index 00000000..fa39df6d --- /dev/null +++ b/src/provider.ts @@ -0,0 +1,414 @@ +/** + * @license + * Copyright 2019 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { + Endpoint, + ProxyID, + Message, + MessageType, + WireValue, + WireValueType, + ThreadID, + MergeMessage, +} from "./protocol"; +import { + fromWireValue, + toWireValue, + opch, + ProxyMarked, + transfer, + isMessagePort, + throwMarker, + closeEndPoint, + proxyMarker, + threadId, + tidEndPointMap, +} from "./common"; + +type ProviderResource = { + proxyIdMap: Map; // id, object + callback: EventListenerOrEventListenerObject; + timeout?: number; + tid?: ThreadID; +}; + +const symbolOpchStr = opch.toString(); +const symbolMap = new Map([ + [Symbol.asyncIterator.toString(), Symbol.asyncIterator], +]); +const isSharedWorker = + typeof SharedWorkerGlobalScope !== "undefined" && + globalThis instanceof SharedWorkerGlobalScope; + +export const objectMap = new WeakMap< + any, + { pid?: ProxyID; proxy: ProxyMarked; count: number } +>(); // object => pid, proxy +export const proxyMap = new WeakMap(); // proxy => pid, obj +export const providerResourceMap = new Map(); + +export function expose( + obj: T, + ep: Endpoint = globalThis as any, + allowedOrigins: (string | RegExp)[] = ["*"], + timeout?: number +) { + let resource = providerResourceMap.get(ep)!; + if (!resource) { + if (!isSharedWorker) { + timeout = undefined; + } + + resource = { + proxyIdMap: new Map(), + callback: async function callback(ev: MessageEvent) { + if (!ev || !ev.data) { + return; + } + + if (!isAllowedOrigin(allowedOrigins, ev.origin)) { + console.warn(`Invalid origin '${ev.origin}' for comlink proxy`); + return; + } + + // console.log(`>> [${threadId}] ${JSON.stringify(ev.data)}`); + const { id, type } = ev.data as Message; + if ( + typeof id !== "number" || + typeof type !== "number" || + type <= WireValueType.RAW + ) { + return; + } + + const path: string[] = ev.data.path ?? []; + let pid: ProxyID = ev.data.pid ?? 0; + if (pid === 0) { + pid = resource.proxyIdMap.keys().next().value ?? 0; + } + + const argumentList = await Promise.all( + (ev.data.argumentList || []).map((val: any) => fromWireValue(ep, val)) + ); + let returnValue: any, parent: any; + try { + let lastpath: string, rawValue: any; + if (path.length === 0) { + parent = rawValue = resource.proxyIdMap.get(pid); + lastpath = undefined as any; + } else { + parent = resource.proxyIdMap.get(pid); + const walkResult = walk(parent, path); + parent = walkResult.parent; + rawValue = walkResult.value; + lastpath = path[path.length - 1]; + } + switch (type) { + case MessageType.READY: + { + returnValue = undefined; + } + break; + case MessageType.EXCHANGETID: + { + const tid = ev.data.tid; + tidEndPointMap.set(tid, ep); + resource.tid = tid; + returnValue = { + tid: threadId, + timeout: timeout && Math.floor(timeout / 3), + }; + } + break; + case MessageType.MERGE: + { + const { tid } = ev.data as MergeMessage; + const targetEP = tidEndPointMap.get(tid)!; + const { proxyIdMap } = providerResourceMap.get(targetEP)!; + for (const [key, obj] of resource.proxyIdMap) { + if (!proxyIdMap.has(key)) { + proxyIdMap.set(key, obj); + } + } + returnValue = undefined; + } + break; + case MessageType.GET: + { + returnValue = rawValue; + } + break; + case MessageType.SET: + { + parent[lastpath] = await fromWireValue(ep, ev.data.value); + returnValue = true; + } + break; + case MessageType.APPLY: + { + const symbol = symbolMap.get(lastpath); + if (symbol) { + switch (symbol) { + case Symbol.asyncIterator: { + if (Symbol.asyncIterator in parent) { + rawValue = parent[Symbol.asyncIterator]; + } else { + rawValue = parent[Symbol.iterator]; + } + returnValue = expose( + rawValue.apply(parent, argumentList), + ep + ); + break; + } + } + } else { + returnValue = rawValue.apply(parent, argumentList); + } + } + break; + case MessageType.CONSTRUCT: + { + const value = new rawValue(...argumentList); + returnValue = proxy(value); + } + break; + case MessageType.ENDPOINT: + { + const { port1, port2 } = new MessageChannel(); + expose(parent, port2); + returnValue = transfer(port1, [port1]); + } + break; + case MessageType.HEARTBEAT: + { + if (!timeout) { + returnValue = undefined; + } else { + clearTimeout(resource.timeout); + if (ev.data.lock) { + navigator.locks.request(ev.data.lock, () => { + unexpose(ep); + }); + resource.timeout = undefined; + returnValue = undefined; + } else { + resource.timeout = setTimeout(() => { + unexpose(ep); + }, timeout) as any; + returnValue = Math.floor(timeout / 3); + } + } + } + break; + case MessageType.RELEASE: + { + returnValue = undefined; + } + break; + default: + return; + } + } catch (value) { + returnValue = { value, [throwMarker]: 0 }; + } + Promise.resolve(returnValue) + .catch((value) => { + return { value, [throwMarker]: 0 }; + }) + .then(async (returnValue) => { + const [wireValue, transferables] = await toWireValue( + ep, + returnValue + ); + wireValue.id = id; + //console.log(`[${threadId}] ${JSON.stringify(wireValue)} >>>>`); + ep.postMessage(wireValue, transferables); + + switch (type) { + case MessageType.RELEASE: { + // detach after sending release response above. + unexpose(ep, [parent]); + break; + } + case MessageType.MERGE: { + // detach and deactive after sending release response above. + unexpose(ep); + break; + } + } + }) + .catch(async (error) => { + // Send Serialization Error To Caller + const [wireValue, transferables] = await toWireValue(ep, { + value: new TypeError("Unserializable return value", { + cause: error, + }), + [throwMarker]: 0, + }); + wireValue.id = id; + ep.postMessage(wireValue, transferables); + }); + } as any, + }; + + providerResourceMap.set(ep, resource); + ep.addEventListener("message", resource.callback); + if (ep.start) { + ep.start(); + } + + if (timeout) { + resource.timeout = setTimeout(function () { + unexpose(ep); + }, timeout) as any; + } + + if (!isMessagePort(ep)) { + ep.postMessage({ + id: MessageType.READY, + type: WireValueType.RAW, + } as WireValue); + } + } + + const proxyObj = proxy(obj); + const proxyData = registerProxy(proxyObj); + const id = proxyData.pid!; + resource.proxyIdMap.set(id, obj); + objectMap.get(obj)!.count++; + return proxyObj; +} + +export function unexpose(ep: Endpoint = globalThis as any, objs?: any[]) { + const resource = providerResourceMap.get(ep); + if (!resource) { + return; + } + + const rproxyMap = resource.proxyIdMap; + if (!objs) { + objs = Array.from(rproxyMap.values()); + } + + for (const obj of objs) { + const objData = objectMap.get(obj)!; + const id = objData.pid!; + if (!rproxyMap.has(id)) { + continue; + } + + rproxyMap.delete(id); + objData.count--; + if (objData.count === 0) { + if ( + Symbol.asyncDispose in obj && + typeof obj[Symbol.asyncDispose] === "function" + ) { + obj[Symbol.asyncDispose](); + } else if ( + Symbol.dispose in obj && + typeof obj[Symbol.dispose] === "function" + ) { + obj[Symbol.dispose](); + } + } + } + + if (rproxyMap.size > 0) { + return; + } + + //console.warn(`[${threadId}] unexpose ${resource.tid}`); + const { callback, timeout, tid } = resource; + callback && ep.removeEventListener("message", callback); + closeEndPoint(ep); + providerResourceMap.delete(ep); + tid && tidEndPointMap.delete(tid); + typeof timeout !== "undefined" && clearTimeout(timeout); +} + +function isAllowedOrigin( + allowedOrigins: (string | RegExp)[], + origin: string +): boolean { + for (const allowedOrigin of allowedOrigins) { + if (origin === allowedOrigin || allowedOrigin === "*") { + return true; + } + if (allowedOrigin instanceof RegExp && allowedOrigin.test(origin)) { + return true; + } + } + return false; +} + +export function proxy(obj: T): ProxyMarked { + if (proxyMap.has(obj)) { + return obj as ProxyMarked; + } + + let objectData = objectMap.get(obj); + if (!objectData) { + objectData = { proxy: undefined, count: 0 }; + objectMap.set(obj, objectData); + } + if (!objectData.proxy) { + objectData.proxy = isProxy(obj) + ? obj + : (new Proxy(obj as any, {}) as ProxyMarked); + proxyMap.set(objectData.proxy, { obj }); + } + + return objectData.proxy; +} + +let proxyIdCounter = 1; +export function registerProxy(obj: ProxyMarked) { + const proxyData = proxyMap.get(obj)!; + if (typeof proxyData.pid !== "number") { + proxyData.pid = proxyIdCounter++; + objectMap.get(proxyData.obj)!.pid = proxyData.pid; + } + + return proxyData; +} + +export function isProxy(obj: T): obj is ProxyMarked { + return ( + proxyMap.has(obj) || + (obj as ProxyMarked)?.[proxyMarker] || + typeof obj === "function" + ); +} + +export function walk(parent: any, path: string[]) { + let isOpch = false; + let value = parent; + for (let i = 0, len = path.length; i < len; i++) { + const prop = path[i]; + if (!isOpch && (value === null || value === undefined)) { + // stop early + break; + } + if (prop === symbolOpchStr) { + isOpch = true; + // continue chain + continue; + } + + parent = value; + value = isOpch ? value?.[prop] : value[prop]; + if (isOpch && value !== undefined && value !== null) { + isOpch = false; + } + } + + return { + parent: parent, + value: value, + }; +} diff --git a/src/remote-controller.ts b/src/remote-controller.ts new file mode 100644 index 00000000..7cb4df72 --- /dev/null +++ b/src/remote-controller.ts @@ -0,0 +1,282 @@ +import { + IRemoteController, + Remote, + closeEndPoint, + toWireValue, + threadId, + fromWireValue, + isMessagePort, + tidEndPointMap, +} from "./common"; +import { + WireValue, + Message, + ThreadID, + ProxyID, + Endpoint, + MessageType, +} from "./protocol"; + +export const remotePidMap = new WeakMap(); +const hasLockApi = () => + navigator && + "locks" in navigator && + typeof navigator.locks?.request === "function"; + +export class RemoteController implements IRemoteController { + protected messageIdSequence = 1; + protected pendingListeners = new Map void>(); + protected isReady = false; + protected queue?: [Message, Transferable[]?][] = []; + protected heartBeat?: number; + public tid?: ThreadID; + protected pidRemoteMap = new Map< + ProxyID, + { remote: WeakRef; count: number } + >(); + + constructor(protected ep: Endpoint) { + this.handler = this.handler.bind(this); + this.initialize(); + } + + public getRemote(pid: ProxyID): Remote | undefined { + return this.pidRemoteMap.get(pid)?.remote?.deref(); + } + public async unregister(pid: ProxyID) { + const counterData = this.pidRemoteMap.get(pid); + if (counterData) { + counterData.count--; + if (counterData.count === 0) { + this.pidRemoteMap.delete(pid); + + try { + await this.requestResponse({ + type: MessageType.RELEASE, + pid: pid, + }); + } catch { + } finally { + if (this.pidRemoteMap.size === 0) { + this.ep.removeEventListener("message", this.handler as any); + closeEndPoint(this.ep); + this.heartBeat ?? clearTimeout(this.heartBeat); + this.pendingListeners.clear(); + this.queue = undefined; + this.tid && tidEndPointMap.delete(this.tid); + } + } + } + } + + return this.pidRemoteMap.size === 0; + } + public register(proxy: Remote, pid: ProxyID, isRoot: boolean) { + remotePidMap.set(proxy, pid); + if (!this.pidRemoteMap.has(pid)) { + this.pidRemoteMap.set(pid, { remote: new WeakRef(proxy), count: 0 }); + } + const remoteData = this.pidRemoteMap.get(pid)!; + if (isRoot && !remoteData.remote.deref()) { + remoteData.remote = new WeakRef(proxy); + } + remoteData.count++; + } + public get(pid: ProxyID, path: (string | number | symbol)[]) { + return this.requestResponse({ + type: MessageType.GET, + pid: pid, + path: path.map((p) => p.toString()), + }); + } + public async set( + pid: ProxyID, + path: (string | number | symbol)[], + rawValue: any + ) { + if (rawValue instanceof Promise) { + rawValue = await rawValue; + } + + const [value, transferables] = await toWireValue(this.ep, rawValue); + return await this.requestResponse( + { + type: MessageType.SET, + pid: pid, + path: path.map((p) => p.toString()), + value, + }, + transferables + ); + } + public async apply( + pid: ProxyID, + path: (string | number | symbol)[], + argArray: any[] + ) { + const [argumentList, transferables] = await this.processArguments( + this.ep, + argArray + ); + return this.requestResponse( + { + type: MessageType.APPLY, + pid: pid, + path: path.map((p) => p.toString()), + argumentList, + }, + transferables + ); + } + public async construct( + pid: ProxyID, + path: (string | number | symbol)[], + argArray: any[] + ) { + const [argumentList, transferables] = await this.processArguments( + this.ep, + argArray + ); + return this.requestResponse( + { + type: MessageType.CONSTRUCT, + pid: pid, + path: path.map((p) => p.toString()), + argumentList, + }, + transferables + ); + } + public createEndPoint(pid: ProxyID) { + return this.requestResponse({ + type: MessageType.ENDPOINT, + pid: pid, + }); + } + protected requestResponse( + msg: Message, + transfers?: Transferable[] + ): Promise { + return new Promise((resolve) => { + msg.id = this.messageIdSequence++; + //console.log(`[${threadId}] ${JSON.stringify(msg)} >>`); + if (this.isReady) { + this.ep.postMessage(msg, transfers); + } else { + this.queue?.push([msg, transfers]); + } + + this.pendingListeners.set(msg.id, resolve); + }).then((wireValue) => fromWireValue(this.ep, wireValue)); + } + protected async processArguments( + ep: Endpoint, + argumentList: any[] + ): Promise<[WireValue[], Transferable[]]> { + const processed = await Promise.all( + argumentList.map((val) => toWireValue(ep, val)) + ); + const mapped = processed.reduce( + (mapped, arg) => { + mapped[0].push(arg[0]); + if (Array.isArray(arg[1])) { + mapped[1].push(...arg[1]); + } + + return mapped; + }, + [[], []] as [WireValue[], Transferable[]] + ); + return mapped; + } + protected initialize() { + this.ep.addEventListener("message", this.handler as any); + if (this.ep.start) { + this.ep.start(); + } + + this.isReady = isMessagePort(this.ep); + if (this.isReady) { + this.queue = undefined; + } else { + this.pendingListeners.set(MessageType.READY, (async () => { + this.isReady = true; + const queue = this.queue; + this.queue = undefined; + if (queue) { + for (const q of queue) { + this.ep.postMessage(q[0], q[1]); + } + } + }) as any); + this.ep.postMessage({ id: MessageType.READY, type: MessageType.READY }); + } + + this.sendTID(); + } + + protected async handler(ev: MessageEvent) { + const data = ev.data; + if ( + !data || + typeof data.id !== "number" || + data.type >= MessageType.READY + ) { + return; + } + //console.log(`>>>> [${threadId}] ${JSON.stringify(ev.data)}`); + const resolver = this.pendingListeners.get(data.id); + if (!resolver) { + return; + } + + try { + resolver(data); + } finally { + this.pendingListeners.delete(data.id); + } + } + + protected async sendTID() { + const resp = await this.requestResponse<{ + tid: ThreadID; + timeout?: number; + }>({ + type: MessageType.EXCHANGETID, + tid: threadId, + }); + this.tid = resp.tid; + tidEndPointMap.set(resp.tid, this.ep); + + if (resp.timeout) { + if (hasLockApi()) { + const lockId = `Comlink.id:${Date.now()}:${ + Math.random() * Number.MAX_SAFE_INTEGER + }`; + navigator.locks.request( + lockId, + { mode: "exclusive" }, + () => new Promise(() => {}) + ); + this.requestResponse({ + type: MessageType.HEARTBEAT, + lock: lockId, + }); + } else { + this.sendHeartBeat(resp.timeout); + } + } + } + protected sendHeartBeat(timeout: number) { + if (typeof timeout !== "number" || timeout <= 0) { + return; + } + + this.heartBeat ?? clearTimeout(this.heartBeat); + this.heartBeat = setTimeout(() => { + this.requestResponse({ + type: MessageType.HEARTBEAT, + }).then((val: number) => this.sendHeartBeat(val)); + }, timeout) as any; + } +} diff --git a/src/remote-pool-controller.ts b/src/remote-pool-controller.ts new file mode 100644 index 00000000..16a9c2e7 --- /dev/null +++ b/src/remote-pool-controller.ts @@ -0,0 +1,211 @@ +import { IRemoteController, Remote } from "./common"; +import { remoteFinalizers, getRemoteController } from "./consumer"; +import { Endpoint, ProxyID, WireValue } from "./protocol"; + +export type PoolOptions = { + min?: number; + max?: number; + idleTimeout?: number; + maxQueue?: number; +}; + +type PoolConfig = { + spawn: () => Endpoint; + min: number; + max: number; + idleTimeout: number; + maxQueue?: number; +}; +type TaskType = "set" | "get" | "apply" | "construct"; +type Task = { + executor: (value?: any) => void; + type: TaskType; + pid: ProxyID; + path: (string | number | symbol)[]; + args?: any; +}; + +export class RemotePoolController implements IRemoteController { + protected taskQueue: Task[] = []; + protected idleWorkers = new Map(); + protected workers = new Set(); + protected rootProxy?: WeakRef; + protected idleTimeoutId?: number; + protected proxyCount: number; + protected config: PoolConfig; + + constructor(spawn: () => Endpoint, options?: PoolOptions) { + const min = options?.min || 1; + this.config = { + spawn: spawn, + min: min, + max: Math.max(min, options?.max || navigator.hardwareConcurrency), + idleTimeout: Math.max(0, options?.idleTimeout || 30_000), + maxQueue: options?.maxQueue, + }; + this.proxyCount = 0; + for (let i = 0; i < this.config.min; i++) { + this.spawnWorker(); + } + } + + public async unregister(pid: ProxyID) { + this.proxyCount--; + if (this.proxyCount === 0) { + const unregisterPromises: Promise[] = []; + for (const worker of this.workers) { + const prom = worker.unregister(pid); + unregisterPromises.push(prom); + this.idleWorkers.delete(worker); + this.workers.delete(worker); + } + await Promise.all(unregisterPromises); + } + + return this.workers.size == 0; + } + public register(proxy: Remote, pid: ProxyID, isRoot: boolean) { + if (isRoot) { + this.rootProxy = new WeakRef(proxy); + for (const worker of this.workers) { + worker.register(proxy, pid, isRoot); + } + } + + this.proxyCount++; + } + public get(pid: ProxyID, path: (string | number | symbol)[]) { + return this.requestResponse("get", pid, path); + } + public set( + _pid: ProxyID, + _path: (string | number | symbol)[], + _rawValue: WireValue + ): Promise { + throw new Error("pool don't support set"); + } + public apply( + pid: ProxyID, + path: (string | number | symbol)[], + argArray: any[] + ) { + return this.requestResponse("apply", pid, path, argArray); + } + public construct( + pid: ProxyID, + path: (string | number | symbol)[], + argArray: any[] + ) { + return this.requestResponse("construct", pid, path, argArray); + } + + protected async requestResponse( + type: TaskType, + pid: ProxyID, + path: (string | number | symbol)[], + args?: any + ): Promise { + if ( + typeof this.config.maxQueue === "number" && + this.taskQueue.length >= this.config.maxQueue + ) { + throw new Error("max queue reached"); + } + + return new Promise((resolve) => { + this.taskQueue.push({ + executor: resolve, + type: type, + pid: pid, + path: path, + args: args, + }); + this.runTask(); + }); + } + + protected runTask() { + if (this.taskQueue.length <= 0) { + return; + } + if (this.idleWorkers.size <= 0) { + if (this.config.max <= this.workers.size) { + return; + } + + this.spawnWorker(); + } + + const [worker] = this.idleWorkers.entries().next().value!; + this.idleWorkers.delete(worker); + const job = this.taskQueue.shift()!; + + this.scheduleIdleTimer(); + worker[job.type](job.pid, job.path, job.args) + .then((val: any) => { + job.executor(val); + }) + .finally(() => { + const requireSchedule = this.idleWorkers.size === 0; + this.idleWorkers.set(worker, Date.now() + this.config.idleTimeout); + if (this.taskQueue.length > 0) { + this.runTask(); + } else if (requireSchedule) { + this.scheduleIdleTimer(); + } + }); + } + + protected scheduleIdleTimer() { + clearTimeout(this.idleTimeoutId); + if (this.workers.size <= this.config.min) { + return; + } + if (this.idleWorkers.size === 0) { + return; + } + + const timeoutAt = Math.max( + 0, + this.idleWorkers.entries().next().value![1] - Date.now() + ); + this.idleTimeoutId = setTimeout(() => { + const now = Date.now(); + for (const [worker, expireAt] of this.idleWorkers) { + if (expireAt > now) { + break; + } + if (this.workers.size <= this.config.min) { + break; + } + + this.releaseWorker(worker); + } + + this.scheduleIdleTimer(); + }, timeoutAt) as any; + } + + protected releaseWorker(worker: IRemoteController) { + const rootProxy = this.rootProxy?.deref(); + if (rootProxy && remoteFinalizers) { + remoteFinalizers.unregister(rootProxy); + } + worker.unregister(0); + this.workers.delete(worker); + this.idleWorkers.delete(worker); + //console.log(`release worker ${this.workers.size}`); + } + protected spawnWorker() { + const ep = this.config.spawn(); + const worker = getRemoteController(ep)!; + const proxy = this.rootProxy?.deref(); + if (proxy) { + worker.register(proxy, 0, true); + } + + this.workers.add(worker); + this.idleWorkers.set(worker, Date.now() + this.config.idleTimeout); + //console.log(`spawn worker ${this.workers.size}`); + } +} diff --git a/src/transfer-handler.ts b/src/transfer-handler.ts new file mode 100644 index 00000000..75aa0349 --- /dev/null +++ b/src/transfer-handler.ts @@ -0,0 +1,200 @@ +/** + * @license + * Copyright 2019 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { + TransferHandler, + Remote, + throwMarker, + transferHandlers, + threadId, + proxyRemoteData, + tidEndPointMap, + proxyMarker, + ProxyMarked, +} from "./common"; +import { createRemote, getRemoteController } from "./consumer"; +import { RemoteController, remotePidMap } from "./remote-controller"; +import { + providerResourceMap, + registerProxy, + objectMap, + expose, + walk, + proxyMap, + proxy, +} from "./provider"; +import { MergeMessage, MessageType, ProxyID, ThreadID } from "./protocol"; + +/** + * Internal transfer handle to handle proxy object. + */ +type SeralizedRemoteValue = { + pid: ProxyID; + mp?: MessagePort; + tid?: ThreadID; + path: string[]; +}; +const remoteTransferHandler: TransferHandler = { + canHandle: (val): val is Remote => { + return ( + remotePidMap.has(val as any) && + (val as Remote)[proxyRemoteData]?.controller instanceof RemoteController + ); + }, + async serialize(obj, targetEp) { + const pid = remotePidMap.get(obj)!; + const data = obj[proxyRemoteData]; + const result: SeralizedRemoteValue = { + pid: pid!, + path: data.path.map((o) => o.toString()), + }; + const controller = data.controller as RemoteController; + if ( + controller?.tid != threadId && + controller !== getRemoteController(targetEp, false) + ) { + result.mp = await controller.createEndPoint(pid); + result.tid = controller.tid; + } + const transferables = []; + if (result.mp) { + transferables.push(result.mp); + } + return [result, transferables]; + }, + async deserialize({ pid, mp, tid, path }: SeralizedRemoteValue, ep) { + if (mp && tid) { + ep = mp; + const existingEp = tidEndPointMap.get(tid!); + if (existingEp) { + await new Promise((res, rej) => { + mp.start?.(); + mp.postMessage({ + id: -1, + type: MessageType.MERGE, + tid: threadId, + } as MergeMessage); + mp.onmessage = () => { + res(); + mp.onmessage = null; + mp.close(); + }; + mp.onmessageerror = rej; + }); + ep = existingEp; + } else { + tidEndPointMap.set(tid!, ep); + } + + const controller = getRemoteController(ep); + const remote = createRemote(controller, pid, path); + controller.tid = tid; + return remote; + } else { + const resource = providerResourceMap.get(ep)!; + if (pid === 0) { + pid = resource.proxyIdMap.keys().next().value ?? 0; + } + const root = resource.proxyIdMap.get(pid); + let d = walk(root, path).value; + if (proxyMap.has(d)) { + d = proxyMap.get(d)?.obj ?? d; + } + + return d; + } + }, +}; + +/** + * Internal transfer handle to handle objects marked to proxy. + */ +const proxyTransferHandler: TransferHandler = { + canHandle: (val): val is ProxyMarked => { + if (proxyMap.has(val)) { + return true; + } + if ((val as ProxyMarked)?.[proxyMarker] || typeof val === "function") { + proxy(val); + return true; + } + + return false; + }, + async serialize(obj, ep) { + const proxyData = registerProxy(obj); + let resource = providerResourceMap.get(ep); + if (!resource) { + expose(proxyData.obj, ep); + resource = providerResourceMap.get(ep)!; + } + + if (!resource.proxyIdMap.has(proxyData.pid!)) { + resource.proxyIdMap.set(proxyData.pid!, proxyData.obj); + objectMap.get(proxyData.obj)!.count++; + } + return [proxyData.pid!, []]; + }, + async deserialize(pid, ep) { + const controller = getRemoteController(ep); + return controller.getRemote(pid) ?? createRemote(controller, pid); + }, +}; + +interface ThrownValue { + [throwMarker]: unknown; // just needs to be present + value: unknown; +} +type SerializedThrownValue = + | { isError: true; value: Error } + | { isError: false; value: unknown }; + +const isObject = (val: unknown): val is object => + (typeof val === "object" && val !== null) || typeof val === "function"; + +/** + * Internal transfer handler to handle thrown exceptions. + */ +const throwTransferHandler: TransferHandler< + ThrownValue, + SerializedThrownValue +> = { + canHandle: (value): value is ThrownValue => + isObject(value) && throwMarker in value, + async serialize({ value }) { + let serialized: SerializedThrownValue; + if (value instanceof Error) { + serialized = { + isError: true, + value: { + message: value.message, + name: value.name, + stack: value.stack, + cause: value.cause, + }, + }; + } else { + serialized = { isError: false, value }; + } + return [serialized, []]; + }, + async deserialize(serialized) { + if (serialized.isError) { + throw Object.assign( + new Error(serialized.value.message), + serialized.value + ); + } + throw serialized.value; + }, +}; + +/** + * Allows customizing the serialization of certain values. + */ +transferHandlers.set("remote", remoteTransferHandler); +transferHandlers.set("proxy", proxyTransferHandler); +transferHandlers.set("throw", throwTransferHandler); diff --git a/src/window-adapter.ts b/src/window-adapter.ts new file mode 100644 index 00000000..880b30df --- /dev/null +++ b/src/window-adapter.ts @@ -0,0 +1,28 @@ +/** + * @license + * Copyright 2019 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { Endpoint, EventSource } from "./protocol"; + +export interface PostMessageWithOrigin { + postMessage( + message: any, + targetOrigin: string, + transfer?: Transferable[] + ): void; +} + +export function windowEndpoint( + w: PostMessageWithOrigin, + context: EventSource = globalThis, + targetOrigin = "*" +): Endpoint { + return { + postMessage: (msg: any, transferables: Transferable[]) => + w.postMessage(msg, targetOrigin, transferables), + addEventListener: context.addEventListener.bind(context), + removeEventListener: context.removeEventListener.bind(context), + }; +} From 7a6f42de93d4dd1043295078dc591395d725c4e8 Mon Sep 17 00:00:00 2001 From: leoc11 Date: Sun, 19 Oct 2025 10:41:23 +0700 Subject: [PATCH 2/4] * remove symbol.dispose --- src/common.ts | 1 - src/consumer.ts | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/common.ts b/src/common.ts index dd945732..c6a8dccc 100644 --- a/src/common.ts +++ b/src/common.ts @@ -101,7 +101,6 @@ declare const remoteMarker: unique symbol; * Additional special comlink methods available on each proxy returned by `Comlink.wrap()`. */ export interface ProxyMethods { - [Symbol.dispose]: () => void; [Symbol.asyncDispose]: () => Promise; [proxyRemoteData]: ProxyRemoteData; [remoteMarker]: T; diff --git a/src/consumer.ts b/src/consumer.ts index 52f56594..2dbc5e73 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -71,7 +71,7 @@ export function createRemote( const proxy = new Proxy(proxyTarget, { get(_target, prop) { throwIfRemoteReleased(isProxyReleased); - if (prop === Symbol.asyncDispose || prop === Symbol.dispose) { + if (prop === Symbol.asyncDispose) { return async () => { if (remoteFinalizers) { remoteFinalizers.unregister(proxy); From 733a39569811932f0bf8dc1e9fd9c09c7df362bb Mon Sep 17 00:00:00 2001 From: leoc11 Date: Sun, 19 Oct 2025 10:42:19 +0700 Subject: [PATCH 3/4] * adjust test --- tests/same_window.comlink.test.js | 27 ++-------- tests/type-checks.ts | 89 ++++++++++++++++++------------- tests/worker.comlink.test.js | 19 ------- 3 files changed, 55 insertions(+), 80 deletions(-) diff --git a/tests/same_window.comlink.test.js b/tests/same_window.comlink.test.js index 9fea1292..529820fe 100644 --- a/tests/same_window.comlink.test.js +++ b/tests/same_window.comlink.test.js @@ -552,30 +552,11 @@ describe("Comlink in the same realm", function () { port2.postMessage({ a: 1 }); }); - it("can tunnels a new endpoint with createEndpoint", async function () { - Comlink.expose( - { - a: 4, - c() { - return 5; - }, - }, - this.port2 - ); - const proxy = Comlink.wrap(this.port1); - const otherEp = await proxy[Comlink.createEndpoint](); - const otherProxy = Comlink.wrap(otherEp); - expect(await otherProxy.a).to.equal(4); - expect(await proxy.a).to.equal(4); - expect(await otherProxy.c()).to.equal(5); - expect(await proxy.c()).to.equal(5); - }); - it("released proxy should no longer be useable and throw an exception", async function () { const thing = Comlink.wrap(this.port1); Comlink.expose(SampleClass, this.port2); const instance = await new thing(); - await instance[Comlink.releaseProxy](); + await instance[Symbol.asyncDispose](); expect(() => instance.method()).to.throw(); }); @@ -584,7 +565,7 @@ describe("Comlink in the same realm", function () { Comlink.expose( { a: "thing", - [Comlink.finalizer]: () => { + [Symbol.dispose]: () => { finalized = true; }, }, @@ -592,7 +573,7 @@ describe("Comlink in the same realm", function () { ); const instance = Comlink.wrap(this.port1); expect(await instance.a).to.equal("thing"); - await instance[Comlink.releaseProxy](); + await instance[Symbol.asyncDispose](); // wait a beat to let the events process await new Promise((resolve) => setTimeout(resolve, 1)); expect(finalized).to.be.true; @@ -606,7 +587,7 @@ describe("Comlink in the same realm", function () { Comlink.expose( { a: "thing", - [Comlink.finalizer]: () => { + [Symbol.dispose]: () => { finalized = true; }, }, diff --git a/tests/type-checks.ts b/tests/type-checks.ts index 1a33cc55..dbf8da52 100644 --- a/tests/type-checks.ts +++ b/tests/type-checks.ts @@ -1,6 +1,7 @@ import { assert, Has, NotHas, IsAny, IsExact } from "conditional-type-checks"; import * as Comlink from "../src/comlink.js"; +import { RemoteController } from "../src/remote-controller.js"; async function closureSoICanUseAwait() { { @@ -8,7 +9,8 @@ async function closureSoICanUseAwait() { return 4; } - const proxy = Comlink.wrap(0 as any); + const ex = Comlink.expose(simpleNumberFunction); + const proxy = Comlink.wrap(0 as any); assert>(false); const v = proxy(); assert>>(true); @@ -19,7 +21,8 @@ async function closureSoICanUseAwait() { return { a: 3 }; } - const proxy = Comlink.wrap(0 as any); + const ex = Comlink.expose(simpleObjectFunction); + const proxy = Comlink.wrap(0 as any); const v = await proxy(); assert>(true); } @@ -29,7 +32,8 @@ async function closureSoICanUseAwait() { return { a: 3 }; } - const proxy = Comlink.wrap(0 as any); + const ex = Comlink.expose(simpleAsyncFunction); + const proxy = Comlink.wrap(0 as any); const v = await proxy(); assert>(true); } @@ -39,7 +43,8 @@ async function closureSoICanUseAwait() { return Comlink.proxy({ a: 3 }); } - const proxy = Comlink.wrap(0 as any); + const ex = Comlink.expose(functionWithProxy); + const proxy = Comlink.wrap(0 as any); const subproxy = await proxy(); const prop = subproxy.a; assert>>(true); @@ -57,7 +62,8 @@ async function closureSoICanUseAwait() { } } - const proxy = Comlink.wrap(0 as any); + const ex = Comlink.expose(X); + const proxy = Comlink.wrap(0 as any); assert Promise }>>(true); const instance = await new proxy(); assert Promise }>>(true); @@ -77,7 +83,8 @@ async function closureSoICanUseAwait() { }, }; - const proxy = Comlink.wrap(0 as any); + const ex = Comlink.expose(x); + const proxy = Comlink.wrap(0 as any); assert>(false); const a = proxy.a; assert>>(true); @@ -104,17 +111,17 @@ async function closureSoICanUseAwait() { class Foo { constructor(cParam: string) { const self = this; - assert>(true); + assert>>(true); } prop1: string = "abc"; proxyProp = Comlink.proxy(new Bar()); methodWithTupleParams(...args: [string] | [number, string]): number { return 123; } - methodWithProxiedReturnValue(): Baz & Comlink.ProxyMarked { + methodWithProxiedReturnValue(): Comlink.ProxyMarked { return Comlink.proxy({ baz: 123, method: () => 123 }); } - methodWithProxyParameter(param: Baz & Comlink.ProxyMarked): void {} + methodWithProxyParameter(param: Comlink.ProxyMarked): void {} } class Bar { @@ -126,11 +133,13 @@ async function closureSoICanUseAwait() { return Comlink.proxy({ baz: 123, method: () => 123 }); } } - const proxy = Comlink.wrap(Comlink.windowEndpoint(self)); + const ex = Comlink.expose(new Foo("")); + const proxy = Comlink.wrap(Comlink.windowEndpoint(self)); assert>>(true); - proxy[Comlink.releaseProxy](); - const endp = proxy[Comlink.createEndpoint](); + proxy[Symbol.asyncDispose](); + const controller = proxy[Comlink.proxyRemoteData].controller as RemoteController; + const endp = controller.createEndPoint(0); assert>>(true); assert>(false); @@ -143,7 +152,7 @@ async function closureSoICanUseAwait() { assert>>(true); assert< - IsExact> + IsExact & Promise>> >(true); assert>(false); @@ -163,7 +172,7 @@ async function closureSoICanUseAwait() { const r4 = proxy.methodWithProxiedReturnValue(); assert>(false); assert< - IsExact>> + IsExact>> >(true); const r5 = proxy.proxyProp.methodWithProxiedReturnValue(); @@ -179,13 +188,14 @@ async function closureSoICanUseAwait() { assert>(false); assert>>(true); - const ProxiedFooClass = Comlink.wrap( + const thingFoo = Comlink.expose(Foo); + const ProxiedFooClass = Comlink.wrap( Comlink.windowEndpoint(self) ); const inst1 = await new ProxiedFooClass("test"); assert>>(true); - inst1[Comlink.releaseProxy](); - inst1[Comlink.createEndpoint](); + inst1[Symbol.asyncDispose](); + inst1[Comlink.proxyRemoteData]; // @ts-expect-error await new ProxiedFooClass(123); @@ -223,10 +233,11 @@ async function closureSoICanUseAwait() { unsubscribe(): void; } /** A Subscribable that can get proxied by Comlink */ - interface ProxyableSubscribable extends Comlink.ProxyMarked { + interface ProxyableSubscribable { + [Comlink.proxyMarker]: true; subscribe( - subscriber: Comlink.Remote & Comlink.ProxyMarked> - ): Unsubscribable & Comlink.ProxyMarked; + subscriber: Comlink.Remote> + ): Comlink.ProxyMarked; } /** Simple parameter object that gets cloned (not proxied) */ @@ -237,8 +248,7 @@ async function closureSoICanUseAwait() { class Registry { async registerProvider( provider: Comlink.Remote< - ((params: Params) => ProxyableSubscribable) & - Comlink.ProxyMarked + (params: Params) => ProxyableSubscribable > ) { const resultPromise = provider({ textDocument: "foo" }); @@ -259,7 +269,7 @@ async function closureSoICanUseAwait() { assert< IsExact< typeof subscriptionPromise, - Promise> + Promise> > >(true); const subscriber = Comlink.proxy({ @@ -271,15 +281,16 @@ async function closureSoICanUseAwait() { assert>>(true); } } - const proxy2 = Comlink.wrap(Comlink.windowEndpoint(self)); + const exposedRegistry = Comlink.expose(new Registry()); + const proxy2 = Comlink.wrap(Comlink.windowEndpoint(self)); proxy2.registerProvider( // Synchronous callback Comlink.proxy(({ textDocument }: Params) => { const subscribable = Comlink.proxy({ subscribe( - subscriber: Comlink.Remote & Comlink.ProxyMarked> - ): Unsubscribable & Comlink.ProxyMarked { + subscriber: Comlink.Remote> + ): Comlink.ProxyMarked { // Important to test here is that union types (such as Function | undefined) distribute properly // when wrapped in Promises/proxied @@ -287,7 +298,9 @@ async function closureSoICanUseAwait() { assert< IsExact< typeof subscriber.closed, - Promise | Promise | Promise | undefined + (Promise | Promise | Promise) & { + [Comlink.opch]: Promise | Promise; + } > >(true); @@ -295,9 +308,9 @@ async function closureSoICanUseAwait() { assert< IsExact< typeof subscriber.next, - | Comlink.Remote<(value: string) => void> - | Promise - | undefined + (Comlink.Remote<(value: string) => void> | Promise) & { + [Comlink.opch]: Comlink.Remote<(value: string) => void>; + } > >(true); @@ -317,7 +330,7 @@ async function closureSoICanUseAwait() { return Comlink.proxy({ unsubscribe() {} }); }, }); - assert>(true); + assert>>(true); return subscribable; }) ); @@ -326,15 +339,15 @@ async function closureSoICanUseAwait() { Comlink.proxy(async ({ textDocument }: Params) => { const subscribable = Comlink.proxy({ subscribe( - subscriber: Comlink.Remote & Comlink.ProxyMarked> - ): Unsubscribable & Comlink.ProxyMarked { + subscriber: Comlink.Remote> + ): Comlink.ProxyMarked { assert>(false); assert< IsExact< typeof subscriber.next, - | Comlink.Remote<(value: string) => void> - | Promise - | undefined + (Comlink.Remote<(value: string) => void> | Promise) & { + [Comlink.opch]: Comlink.Remote<(value: string) => void>; + } > >(true); @@ -357,11 +370,11 @@ async function closureSoICanUseAwait() { assert>(true); return val instanceof URL; }, - serialize: (url) => { + serialize: async (url) => { assert>(true); return [url.href, []]; }, - deserialize: (str) => { + deserialize: async (str) => { assert>(true); return new URL(str); }, diff --git a/tests/worker.comlink.test.js b/tests/worker.comlink.test.js index 4c445706..debcf472 100644 --- a/tests/worker.comlink.test.js +++ b/tests/worker.comlink.test.js @@ -26,23 +26,4 @@ describe("Comlink across workers", function () { const proxy = Comlink.wrap(this.worker); expect(await proxy(1, 3)).to.equal(4); }); - - it("can tunnels a new endpoint with createEndpoint", async function () { - const proxy = Comlink.wrap(this.worker); - const otherEp = await proxy[Comlink.createEndpoint](); - const otherProxy = Comlink.wrap(otherEp); - expect(await otherProxy(20, 1)).to.equal(21); - }); - - it("releaseProxy closes MessagePort created by createEndpoint", async function () { - const proxy = Comlink.wrap(this.worker); - const otherEp = await proxy[Comlink.createEndpoint](); - const otherProxy = Comlink.wrap(otherEp); - expect(await otherProxy(20, 1)).to.equal(21); - - await new Promise((resolve) => { - otherEp.close = resolve; // Resolve the promise when the MessagePort is closed. - otherProxy[Comlink.releaseProxy](); // Release the proxy, which should close the MessagePort. - }); - }); }); From 78f04f11f7472d1c2a43eee40e7cf3c183d78de0 Mon Sep 17 00:00:00 2001 From: leoc11 Date: Sun, 19 Oct 2025 11:25:56 +0700 Subject: [PATCH 4/4] * adjust test --- tests/type-checks.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/type-checks.ts b/tests/type-checks.ts index dbf8da52..df4552fb 100644 --- a/tests/type-checks.ts +++ b/tests/type-checks.ts @@ -129,7 +129,7 @@ async function closureSoICanUseAwait() { method(param: string): number { return 123; } - methodWithProxiedReturnValue(): Baz & Comlink.ProxyMarked { + methodWithProxiedReturnValue(): Comlink.ProxyMarked { return Comlink.proxy({ baz: 123, method: () => 123 }); } } @@ -177,7 +177,7 @@ async function closureSoICanUseAwait() { const r5 = proxy.proxyProp.methodWithProxiedReturnValue(); assert< - IsExact>> + IsExact>> >(true); const r6 = (await proxy.methodWithProxiedReturnValue()).baz;