Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
672 changes: 24 additions & 648 deletions src/comlink.ts

Large diffs are not rendered by default.

453 changes: 453 additions & 0 deletions src/common.ts

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions src/consumer-pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
export type { PoolOptions } from "./remote-pool-controller";
import { ProxyMarked, RemotePool, UnProxyMarked } from "./common";
import { createRemote } from "./consumer";
import { Endpoint } from "./protocol";
import { PoolOptions, RemotePoolController } from "./remote-pool-controller";

export function pool<T extends ProxyMarked>(
spawn: () => Endpoint,
options?: PoolOptions
): RemotePool<UnProxyMarked<T>> {
const poolControler = new RemotePoolController(spawn, options);
return createRemote<UnProxyMarked<T>>(poolControler, 0) as any;
}
154 changes: 154 additions & 0 deletions src/consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/**
* @license
* Copyright 2019 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/

import {
ProxyMarked,
opch,
Remote,
UnProxyMarked,
IRemoteController,
proxyRemoteData,
ProxyRemoteData,
} from "./common";
import { Endpoint, ProxyID } from "./protocol";
import { RemoteController } from "./remote-controller";

interface TypedFinalizationRegistry<TW extends WeakKey, T>
extends FinalizationRegistry<T> {
new (cleanupCallback: (heldValue: T) => void): TypedFinalizationRegistry<
TW,
T
>;
register(target: TW, heldValue: T, unregisterToken?: TW): void;
unregister(unregisterToken: TW): boolean;
}

export const remoteFinalizers = !("FinalizationRegistry" in globalThis)
? undefined
: new (FinalizationRegistry as TypedFinalizationRegistry<
Remote,
[IRemoteController, ProxyID]
>)(([ep, pid]) => setTimeout(() => ep.unregister(pid)));
const controllerMap = new Map<Endpoint, RemoteController>();
const proxyTarget = function () {} as object;

function throwIfRemoteReleased(isReleased: boolean) {
if (isReleased) {
throw new Error("Proxy has been released and is not useable");
}
}

export function wrap<T extends ProxyMarked>(
ep: Endpoint
): Remote<UnProxyMarked<T>> {
const controller = getRemoteController(ep);
return createRemote<UnProxyMarked<T>>(controller, 0);
}

export function getRemoteController(ep: Endpoint, isAutoCreate = true) {
let controller = controllerMap.get(ep)!;
if (isAutoCreate && !controller) {
controller = new RemoteController(ep);
controllerMap.set(ep, controller);
}
return controller;
}

export function createRemote<T>(
controller: IRemoteController,
pid: ProxyID,
path: (string | number | symbol)[] = []
): Remote<T> {
const data: ProxyRemoteData = {
controller,
pid,
path,
};
let isProxyReleased = false;
const proxy = new Proxy(proxyTarget, {
get(_target, prop) {
throwIfRemoteReleased(isProxyReleased);
if (prop === Symbol.asyncDispose) {
return async () => {
if (remoteFinalizers) {
remoteFinalizers.unregister(proxy);
}
try {
await controller.unregister(pid);
} finally {
isProxyReleased = true;
}
};
}
if (prop === proxyRemoteData) {
return data;
}
if (prop === "then") {
if (path.length > 0 && path[path.length - 1] === opch) {
path = path.slice(0, -1);
}
if (path.length === 0) {
return { then: () => proxy };
}

const r = controller.get(pid, path);
return r.then.bind(r);
}
return createRemote(controller, pid, [...path, prop]);
},
set(_target, prop, rawValue) {
throwIfRemoteReleased(isProxyReleased);
// FIXME: ES6 Proxy Handler `set` methods are supposed to return a
// boolean. To show good will, we return true asynchronously ¯\_(ツ)_/¯

return controller.set(pid, [...path, prop], rawValue) as any;
},
apply(_target, _thisArg, rawArgumentList) {
throwIfRemoteReleased(isProxyReleased);
const last = path[path.length - 1];
// We just pretend that `bind()` didn’t happen.
if (last === "bind") {
return createRemote(controller, pid, path.slice(0, -1));
}
if (last === Symbol.asyncIterator) {
let remoteIterator: Remote<AsyncIterator<any, any>> | undefined;
const getIterator = async () => {
if (!remoteIterator) {
remoteIterator = await controller.apply(pid, path, []);
}
return remoteIterator!;
};

return {
async next(args) {
const obj = await getIterator();
return obj.next(args);
},
async return(value?: any) {
const obj = await getIterator();
return obj.return[opch](value);
},
async throw(e?: any) {
const obj = await getIterator();
return obj.throw[opch](e);
},
} as AsyncIterator<any, any, any>;
}

return controller.apply(pid, path, rawArgumentList);
},
construct(_target, rawArgumentList) {
throwIfRemoteReleased(isProxyReleased);
return controller.construct(pid, path, rawArgumentList);
},
}) as Remote;

controller.register(proxy, pid, path.length === 0);
if (remoteFinalizers) {
remoteFinalizers.register(proxy, [controller, pid], proxy);
}
return proxy as Remote<T>;
}
12 changes: 6 additions & 6 deletions src/node-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,29 @@ export interface NodeEndpoint {
start?: () => void;
}

export default function nodeEndpoint(nep: NodeEndpoint): Endpoint {
export function nodeEndpoint(nep: NodeEndpoint): Endpoint {
const listeners = new WeakMap();
return {
postMessage: nep.postMessage.bind(nep),
addEventListener: (_, eh) => {
addEventListener: (type, eh) => {
const l = (data: any) => {
if ("handleEvent" in eh) {
eh.handleEvent({ data } as MessageEvent);
} else {
eh({ data } as MessageEvent);
}
};
nep.on("message", l);
nep.on(type, l);
listeners.set(eh, l);
},
removeEventListener: (_, eh) => {
removeEventListener: (type, eh) => {
const l = listeners.get(eh);
if (!l) {
return;
}
nep.off("message", l);
nep.off(type, l);
listeners.delete(eh);
},
start: nep.start && nep.start.bind(nep),
start: nep.start?.bind(nep),
};
}
73 changes: 51 additions & 22 deletions src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,94 +18,123 @@ export interface EventSource {
): void;
}

export interface PostMessageWithOrigin {
postMessage(
message: any,
targetOrigin: string,
transfer?: Transferable[]
): void;
}

export interface Endpoint extends EventSource {
postMessage(message: any, transfer?: Transferable[]): void;

start?: () => void;
}

export const enum WireValueType {
RAW = "RAW",
PROXY = "PROXY",
THROW = "THROW",
HANDLER = "HANDLER",
RAW = -1,
HANDLER = -2,
}

export interface RawWireValue {
id?: string;
id?: number;
type: WireValueType.RAW;
value: {};
}

export interface HandlerWireValue {
id?: string;
id?: number;
type: WireValueType.HANDLER;
name: string;
value: unknown;
}

export type WireValue = RawWireValue | HandlerWireValue;

export type MessageID = string;
export type MessageID = number;
export type ProxyID = number;
export type ThreadID = number;

export const enum MessageType {
GET = "GET",
SET = "SET",
APPLY = "APPLY",
CONSTRUCT = "CONSTRUCT",
ENDPOINT = "ENDPOINT",
RELEASE = "RELEASE",
READY = 0,
EXCHANGETID = 1,
MERGE = 2,
GET = 3,
SET = 4,
APPLY = 5,
CONSTRUCT = 6,
ENDPOINT = 7,
HEARTBEAT = 8,
RELEASE = 9,
}

export interface PingMessage {
id?: MessageType.READY;
type: MessageType.READY;
}

export interface HeartBeatMessage {
id?: MessageID;
type: MessageType.HEARTBEAT;
lock?: string;
}

export interface GetMessage {
id?: MessageID;
type: MessageType.GET;
pid: ProxyID;
path: string[];
}

export interface SetMessage {
id?: MessageID;
type: MessageType.SET;
pid: ProxyID;
path: string[];
value: WireValue;
}

export interface ApplyMessage {
id?: MessageID;
type: MessageType.APPLY;
pid: ProxyID;
path: string[];
argumentList: WireValue[];
}

export interface ConstructMessage {
id?: MessageID;
type: MessageType.CONSTRUCT;
pid: ProxyID;
path: string[];
argumentList: WireValue[];
}

export interface EndpointMessage {
id?: MessageID;
type: MessageType.ENDPOINT;
pid?: ProxyID;
}

export interface ReleaseMessage {
id?: MessageID;
type: MessageType.RELEASE;
pid: ProxyID;
}

export interface ShareIdMessage {
id?: MessageID;
type: MessageType.EXCHANGETID;
tid: ThreadID;
}

export interface MergeMessage {
id?: MessageID;
type: MessageType.MERGE;
ep: MessagePort;
tid: ThreadID;
}

export type Message =
| PingMessage
| MergeMessage
| ShareIdMessage
| GetMessage
| SetMessage
| ApplyMessage
| ConstructMessage
| EndpointMessage
| HeartBeatMessage
| ReleaseMessage;
Loading