From 32f0048587fb9a0203cba36e324d79b3df079ebb Mon Sep 17 00:00:00 2001 From: Ingo Fischer Date: Thu, 22 Jan 2026 09:27:25 +0100 Subject: [PATCH 1/2] Add Invoke batching invokes are now batched (when device supports it and non-root-endopoint and not groups) within the same macro-task --- CHANGELOG.md | 5 + packages/node/src/node/ClientNode.ts | 4 +- .../src/node/client/ClientCommandMethod.ts | 56 ----- .../src/node/client/ClientGroupInteraction.ts | 10 + .../src/node/client/ClientNodeInteraction.ts | 43 +++- packages/node/src/node/client/PeerBehavior.ts | 2 +- .../client/commands/ClientCommandMethod.ts | 35 +++ .../node/client/commands/CommandBatcher.ts | 238 ++++++++++++++++++ .../node/client/commands/CommandInvoker.ts | 93 +++++++ .../GroupKeyManagementServerTest.ts | 1 + .../ScenesManagementServerTest.ts | 58 +++-- packages/node/test/node/ClientNodeTest.ts | 7 +- packages/node/test/node/CommandBatcherTest.ts | 199 +++++++++++++++ 13 files changed, 662 insertions(+), 89 deletions(-) delete mode 100644 packages/node/src/node/client/ClientCommandMethod.ts create mode 100644 packages/node/src/node/client/commands/ClientCommandMethod.ts create mode 100644 packages/node/src/node/client/commands/CommandBatcher.ts create mode 100644 packages/node/src/node/client/commands/CommandInvoker.ts create mode 100644 packages/node/test/node/CommandBatcherTest.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 79a6ab7de6..5aa2ad87fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,11 @@ The main work (all changes without a GitHub username in brackets in the below li ## __WORK IN PROGRESS__ --> +## __WORK IN PROGRESS__ + +- @matter/node + - Enhancement: Added automatic Command batching for non-root-endpoint commands when a node supports it and commands come in within the same macro-tick + ## 0.16.6 (2026-01-22) - @matter/general diff --git a/packages/node/src/node/ClientNode.ts b/packages/node/src/node/ClientNode.ts index e226adc5df..7e247a1252 100644 --- a/packages/node/src/node/ClientNode.ts +++ b/packages/node/src/node/ClientNode.ts @@ -200,7 +200,9 @@ export class ClientNode extends Node { return new ClientNetworkRuntime(this); } - async prepareRuntimeShutdown() {} + async prepareRuntimeShutdown() { + await this.#interaction?.close(); + } protected override get container() { return this.owner?.peers; diff --git a/packages/node/src/node/client/ClientCommandMethod.ts b/packages/node/src/node/client/ClientCommandMethod.ts deleted file mode 100644 index bcadc98b24..0000000000 --- a/packages/node/src/node/client/ClientCommandMethod.ts +++ /dev/null @@ -1,56 +0,0 @@ -/** - * @license - * Copyright 2022-2026 Matter.js Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import { ClusterBehavior } from "#behavior/cluster/ClusterBehavior.js"; -import type { ClientNode } from "#node/ClientNode.js"; -import { Node } from "#node/Node.js"; -import { ClientInteraction, Invoke } from "#protocol"; -import { Status, StatusResponseError } from "#types"; - -/** - * Create the command method for a client behavior. - */ -export function ClientCommandMethod(name: string) { - // This is our usual hack to give a function a proper name in stack traces - const temp = { - // The actual implementation - async [name](this: ClusterBehavior, fields?: {}) { - const node = this.env.get(Node) as ClientNode; - - // TODO when implementing TCP add needed logic for Large messages - const chunks = (node.interaction as ClientInteraction).invoke( - Invoke({ - commands: [ - Invoke.ConcreteCommandRequest({ - endpoint: this.endpoint, - cluster: this.cluster, - command: name, - fields, - }), - ], - }), - ); - - for await (const chunk of chunks) { - for (const entry of chunk) { - // We send only one command, so we only get one response back - switch (entry.kind) { - case "cmd-status": - if (entry.status !== Status.Success) { - throw StatusResponseError.create(entry.status, undefined, entry.clusterStatus); - } - return; - - case "cmd-response": - return entry.data; - } - } - } - }, - }; - - return temp[name]; -} diff --git a/packages/node/src/node/client/ClientGroupInteraction.ts b/packages/node/src/node/client/ClientGroupInteraction.ts index e4abd0ed14..c177478fb0 100644 --- a/packages/node/src/node/client/ClientGroupInteraction.ts +++ b/packages/node/src/node/client/ClientGroupInteraction.ts @@ -17,10 +17,20 @@ import { WriteResult, } from "#protocol"; import { ClientNodeInteraction } from "./ClientNodeInteraction.js"; +import { CommandInvoker } from "./commands/CommandInvoker.js"; export class InvalidGroupOperationError extends ImplementationError {} export class ClientGroupInteraction extends ClientNodeInteraction { + /** + * Groups use a plain {@link CommandInvoker} without batching. + * Group invokes always use suppressResponse and don't have device details + * like maxPathsPerInvoke, so batching is not applicable. + */ + protected override createInvoker() { + return new CommandInvoker(this.node); + } + /** Groups do not support reading or subscribing to attributes */ override read(_request: Read, _context?: ActionContext): ReadResult { throw new InvalidGroupOperationError("Groups do not support reading attributes"); diff --git a/packages/node/src/node/client/ClientNodeInteraction.ts b/packages/node/src/node/client/ClientNodeInteraction.ts index 0d4d205614..281c8fd843 100644 --- a/packages/node/src/node/client/ClientNodeInteraction.ts +++ b/packages/node/src/node/client/ClientNodeInteraction.ts @@ -6,6 +6,7 @@ import type { ActionContext } from "#behavior/context/ActionContext.js"; import { EndpointInitializer } from "#endpoint/properties/EndpointInitializer.js"; +import type { CommandInvoker } from "#node/client/commands/CommandInvoker.js"; import type { ClientNode } from "#node/ClientNode.js"; import { NodePhysicalProperties } from "#node/NodePhysicalProperties.js"; import { @@ -27,18 +28,58 @@ import { } from "#protocol"; import { EndpointNumber } from "#types"; import { ClientEndpointInitializer } from "./ClientEndpointInitializer.js"; +import { CommandBatcher } from "./commands/CommandBatcher.js"; /** * A {@link ClientInteraction} that brings the node online before attempting interaction. */ export class ClientNodeInteraction implements Interactable { - #node: ClientNode; + readonly #node: ClientNode; #physicalProps?: PhysicalDeviceProperties; + #invoker?: CommandInvoker; constructor(node: ClientNode) { this.#node = node; } + /** + * The node this interaction is associated with. + */ + protected get node(): ClientNode { + return this.#node; + } + + /** + * Command invoker for this interaction. + * + * For regular client nodes, returns a {@link CommandBatcher} that collects commands + * invoked within the same timer tick and sends them as a single batched invoke-request. + * + * Override in subclasses to provide different invoker behavior (e.g., groups use plain + * {@link CommandInvoker} without batching). + */ + get invoker() { + if (this.#invoker === undefined) { + this.#invoker = this.createInvoker(); + } + return this.#invoker; + } + + /** + * Create the command invoker for this interaction. + * Override in subclasses to provide different invoker types. + */ + protected createInvoker(): CommandInvoker { + return new CommandBatcher(this.#node); + } + + /** + * Close the interaction and release resources. + */ + async close() { + await this.#invoker?.close(); + } + /** * The current session used for interaction with the node if any session is established, otherwise undefined. */ diff --git a/packages/node/src/node/client/PeerBehavior.ts b/packages/node/src/node/client/PeerBehavior.ts index 52509af5e3..f131c05d9d 100644 --- a/packages/node/src/node/client/PeerBehavior.ts +++ b/packages/node/src/node/client/PeerBehavior.ts @@ -32,7 +32,7 @@ import { UnknownAttribute, UnknownCommand, } from "#types"; -import { ClientCommandMethod } from "./ClientCommandMethod.js"; +import { ClientCommandMethod } from "./commands/ClientCommandMethod.js"; const BIT_BLOCK_SIZE = Math.log2(Number.MAX_SAFE_INTEGER); diff --git a/packages/node/src/node/client/commands/ClientCommandMethod.ts b/packages/node/src/node/client/commands/ClientCommandMethod.ts new file mode 100644 index 0000000000..b3030c0637 --- /dev/null +++ b/packages/node/src/node/client/commands/ClientCommandMethod.ts @@ -0,0 +1,35 @@ +/** + * @license + * Copyright 2022-2026 Matter.js Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { ClusterBehavior } from "#behavior/cluster/ClusterBehavior.js"; +import type { ClientNode } from "#node/ClientNode.js"; +import { Node } from "#node/Node.js"; +import type { ClientNodeInteraction } from "../ClientNodeInteraction.js"; + +/** + * Create the command method for a client behavior. + * + * Commands are batched automatically - multiple commands invoked within the same timer tick + * are sent as a single batched invoke request for efficiency. + */ +export function ClientCommandMethod(name: string) { + // This is our usual hack to give a function a proper name in stack traces + const temp = { + // The actual implementation + async [name](this: ClusterBehavior, fields?: {}) { + const node = this.env.get(Node) as ClientNode; + + return (node.interaction as ClientNodeInteraction).invoker.invoke({ + endpoint: this.endpoint, + cluster: this.cluster, + command: name, + fields, + }); + }, + }; + + return temp[name]; +} diff --git a/packages/node/src/node/client/commands/CommandBatcher.ts b/packages/node/src/node/client/commands/CommandBatcher.ts new file mode 100644 index 0000000000..e787a0aed6 --- /dev/null +++ b/packages/node/src/node/client/commands/CommandBatcher.ts @@ -0,0 +1,238 @@ +/** + * @license + * Copyright 2022-2026 Matter.js Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { ActionContext } from "#behavior/context/ActionContext.js"; +import { BasicInformationClient } from "#behaviors/basic-information"; +import { createPromise, ImplementationError, Instant, Logger, Mutex, Time, Timer } from "#general"; +import { ClientInteraction, DecodedInvokeResult, Invoke, QueuedClientInteraction } from "#protocol"; +import type { ClientNode } from "../../ClientNode.js"; +import { NodePhysicalProperties } from "../../NodePhysicalProperties.js"; +import { CommandInvoker, InvokableCommand } from "./CommandInvoker.js"; + +const logger = Logger.get("CommandBatcher"); + +/** Maximum value for commandRef (uint16) */ +const MAX_COMMAND_REF = 0xffff; + +interface PendingCommand { + request: Invoke.ConcreteCommandRequest; + resolve: (data: any) => void; + reject: (error: Error) => void; + context?: ActionContext; +} + +/** + * Batches commands invoked within the same timer tick into a single invoke request. + * + * Commands invoked "near together" (within the same 0ms timer callback) are collected and + * sent as a batched-invoke with unique `commandRef` values for response correlation. + * + * Extends {@link CommandInvoker} to add batching capability on top of single command invocation. + */ +export class CommandBatcher extends CommandInvoker { + readonly #pendingCommands = new Map(); + readonly #mutex: Mutex; + readonly #flushTimer: Timer; + #nextCommandRef = 1; + #closed = false; + #supportsMultipleInvokes?: boolean; + + constructor(node: ClientNode) { + super(node); + this.#mutex = new Mutex(this); + this.#flushTimer = Time.getTimer("command-batcher", Instant, () => this.#flush()); + + // Clear cached maxPathsPerInvoke when the node goes offline + node.lifecycle.offline.on(() => { + this.#supportsMultipleInvokes = undefined; + }); + } + + /** + * Check if batching is enabled based on maxPathsPerInvoke value. + * Returns false if the value is not available or equals 1 (no batching support). + */ + get #enabled(): boolean { + if (this.#supportsMultipleInvokes === undefined) { + this.#supportsMultipleInvokes = + (this.node.maybeStateOf(BasicInformationClient)?.maxPathsPerInvoke ?? 1) > 1; + } + return this.#supportsMultipleInvokes; + } + + /** + * Queue a command for batched execution. + * Returns a promise that resolves when the command completes with its response data. + * + * Commands bypass batching and execute immediately when: + * - Target is endpoint 0 (root endpoint, typically administrative/commissioning operations) + * - Device only supports maxPathsPerInvoke=1 (no batching capability) + */ + override async invoke(request: InvokableCommand, context?: ActionContext): Promise { + if (this.#closed) { + throw new ImplementationError("CommandBatcher is closed"); + } + + // Bypass batching for endpoint 0 (root endpoint) - these are typically + // administrative/commissioning commands that should execute immediately + // Also bypass when multiple invokes are not supported + const endpointId = + typeof request.endpoint === "number" ? request.endpoint : (request.endpoint as { number?: number })?.number; + if (!endpointId || !this.#enabled) { + return this.executeImmediate(request, context); + } + + const commandRef = this.#allocateCommandRef(); + const { promise, resolver, rejecter } = createPromise(); + + this.#pendingCommands.set(commandRef, { + request: { ...request, commandRef } as Invoke.ConcreteCommandRequest, + resolve: resolver, + reject: rejecter, + context, + }); + + this.#scheduleFlush(); + + return promise; + } + + /** + * Allocate a unique commandRef, wrapping around at uint16 max. + */ + #allocateCommandRef(): number { + const ref = this.#nextCommandRef; + this.#nextCommandRef = this.#nextCommandRef >= MAX_COMMAND_REF ? 1 : this.#nextCommandRef + 1; + + // Ensure no collision with pending commands (very unlikely but possible after wrap) + if (this.#pendingCommands.has(ref)) { + return this.#allocateCommandRef(); + } + + return ref; + } + + #scheduleFlush() { + if (!this.#flushTimer.isRunning) { + this.#flushTimer.start(); + } + } + + async #flush() { + if (this.#pendingCommands.size === 0) { + return; + } + + // Snapshot current commands and clear for next batch + const commands = new Map(this.#pendingCommands); + this.#pendingCommands.clear(); + + // Run flush within the mutex to ensure proper sequencing + await this.#mutex.produce(async () => { + await this.#executeBatch(commands); + }); + } + + async #executeBatch(commands: Map) { + try { + const client = await this.#connect(); + + const commandList = [...commands.values()]; + + // Use context from first command (they should all be from the same tick anyway) + const context = commandList[0]?.context; + + // For single commands, don't include commandRef (optimization) + const isSingleCommand = commandList.length === 1; + const invokeRequests = isSingleCommand + ? [{ ...commandList[0].request, commandRef: undefined }] + : commandList.map(c => c.request); + + logger.debug(`Executing ${invokeRequests.length} command(s)${isSingleCommand ? "" : " (batched)"}`); + + const chunks: DecodedInvokeResult = client.invoke(Invoke({ commands: invokeRequests }), context); + + // Process responses and route to correct callers + for await (const chunk of chunks) { + for (const entry of chunk) { + let pending: PendingCommand | undefined; + + if (isSingleCommand) { + // Single command - take the only pending command + pending = commandList[0]; + commands.clear(); + } else { + // Batched - match by commandRef + pending = commands.get(entry.commandRef!); + if (!pending) { + logger.warn(`Received response for unknown commandRef ${entry.commandRef}`); + continue; + } + commands.delete(entry.commandRef!); + } + + this.#resolvePending(pending, entry); + } + } + + // Resolve any remaining commands with undefined (valid for suppressResponse) + for (const [, pending] of commands) { + pending.resolve(undefined); + } + } catch (error) { + // If the entire batch fails, reject all pending commands + for (const [, pending] of commands) { + pending.reject(error as Error); + } + } + } + + /** + * Resolve a pending command with its response entry. + */ + #resolvePending(pending: PendingCommand, entry: Parameters[0]) { + try { + pending.resolve(this.resolveEntry(entry)); + } catch (error) { + pending.reject(error as Error); + } + } + + /** + * Connect to the device, respecting thread queue behavior. + */ + async #connect(): Promise { + if (!this.node.lifecycle.isOnline) { + await this.node.start(); + } + + const props = NodePhysicalProperties(this.node); + + // When we have a thread device or don't know anything yet, use the queue + return props.threadConnected || !props.rootEndpointServerList.length + ? this.node.env.get(QueuedClientInteraction) + : this.node.env.get(ClientInteraction); + } + + /** + * Close the batcher and wait for pending commands to complete. + */ + override async close() { + this.#closed = true; + this.#flushTimer.stop(); + + // Reject any remaining pending commands + for (const [, pending] of this.#pendingCommands) { + pending.reject(new ImplementationError("CommandBatcher closed")); + } + this.#pendingCommands.clear(); + + // Wait for any in-flight batch to complete + await this.#mutex.close(); + + await super.close(); + } +} diff --git a/packages/node/src/node/client/commands/CommandInvoker.ts b/packages/node/src/node/client/commands/CommandInvoker.ts new file mode 100644 index 0000000000..f9a3fd00a7 --- /dev/null +++ b/packages/node/src/node/client/commands/CommandInvoker.ts @@ -0,0 +1,93 @@ +/** + * @license + * Copyright 2022-2026 Matter.js Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { ActionContext } from "#behavior/context/ActionContext.js"; +import { Invoke, InvokeResult } from "#protocol"; +import { Status, StatusResponseError } from "#types"; +import type { ClientNode } from "../../ClientNode.js"; + +/** + * Input for a command to be invoked. + * This is the same as `Invoke.ConcreteCommandRequest` but without `commandRef` + * since the batcher assigns it automatically when batching. + */ +export interface InvokableCommand { + endpoint: unknown; + cluster: unknown; + command: string | unknown; + fields?: unknown; +} + +/** + * Base class for command invocation. + * Handles single command invokes via the node's interaction interface. + * + * This class is used directly for groups (which don't support batching) + * and extended by {@link CommandBatcher} for nodes that support batched invokes. + */ +export class CommandInvoker { + readonly #node: ClientNode; + + constructor(node: ClientNode) { + this.#node = node; + } + + protected get node(): ClientNode { + return this.#node; + } + + /** + * Invoke a command on the node. + * Returns a promise that resolves when the command completes with its response data. + */ + async invoke(request: InvokableCommand, context?: ActionContext): Promise { + return this.executeImmediate(request, context); + } + + /** + * Execute a single command immediately via the node's interaction. + * Uses node.interaction to ensure proper handling for both regular nodes and groups. + */ + protected async executeImmediate(request: InvokableCommand, context?: ActionContext): Promise { + const chunks = this.#node.interaction.invoke( + Invoke({ commands: [request as Invoke.ConcreteCommandRequest] }), + context, + ); + + for await (const chunk of chunks) { + for (const entry of chunk) { + return this.resolveEntry(entry) as T; + } + } + + // No response received - this is valid for suppressResponse commands + return undefined as T; + } + + /** + * Resolve a single invoke response entry to its result value. + * Throws StatusResponseError for error statuses. + */ + protected resolveEntry(entry: InvokeResult.DecodedCommandResponse | InvokeResult.CommandStatus): unknown { + switch (entry.kind) { + case "cmd-status": + if (entry.status !== Status.Success) { + throw StatusResponseError.create(entry.status, undefined, entry.clusterStatus); + } + return undefined; + + case "cmd-response": + return entry.data; + } + } + + /** + * Close the invoker. Override in subclasses if cleanup is needed. + */ + async close() { + // Base class has nothing to clean up + } +} diff --git a/packages/node/test/behaviors/group-key-management/GroupKeyManagementServerTest.ts b/packages/node/test/behaviors/group-key-management/GroupKeyManagementServerTest.ts index a128b255bf..27e2d320ad 100644 --- a/packages/node/test/behaviors/group-key-management/GroupKeyManagementServerTest.ts +++ b/packages/node/test/behaviors/group-key-management/GroupKeyManagementServerTest.ts @@ -35,6 +35,7 @@ describe("GroupKeyManagementServer", () => { const cmds = peer1.commandsOf(GroupKeyManagementClient); + // Commands to endpoint 0 (root) bypass batching, so no MockTime.resolve() needed await cmds.keySetWrite({ groupKeySet: { groupKeySetId: 1, diff --git a/packages/node/test/behaviors/scenes-management/ScenesManagementServerTest.ts b/packages/node/test/behaviors/scenes-management/ScenesManagementServerTest.ts index 68b70d8cdc..5437d31b6d 100644 --- a/packages/node/test/behaviors/scenes-management/ScenesManagementServerTest.ts +++ b/packages/node/test/behaviors/scenes-management/ScenesManagementServerTest.ts @@ -34,44 +34,48 @@ describe("ScenesManagementServer", () => { expect(peer1).not.undefined; const onoff = peer1.endpoints.for(EndpointNumber(1)); - // Ensure off - await onoff.commandsOf(OnOffClient).off(); + // Ensure off - commands are batched via a timer, so we need MockTime.resolve() to advance time + await MockTime.resolve(onoff.commandsOf(OnOffClient).off()); const cmds = onoff.commandsOf(ScenesManagementClient); expect( - await cmds.addScene({ - groupId: GroupId(0), - sceneId: 1, - transitionTime: 1000, - sceneName: "Scene1", - extensionFieldSetStructs: [ - { - clusterId: ClusterId(6), - attributeValueList: [{ attributeId: AttributeId(0), valueUnsigned8: 1 }], - }, - ], - }), + await MockTime.resolve( + cmds.addScene({ + groupId: GroupId(0), + sceneId: 1, + transitionTime: 1000, + sceneName: "Scene1", + extensionFieldSetStructs: [ + { + clusterId: ClusterId(6), + attributeValueList: [{ attributeId: AttributeId(0), valueUnsigned8: 1 }], + }, + ], + }), + ), ).deep.equals({ status: 0, groupId: GroupId(0), sceneId: 1 }); expect( - await cmds.addScene({ - groupId: GroupId(0), - sceneId: 2, - transitionTime: 60000000, - sceneName: "Scene2", - extensionFieldSetStructs: [ - { - clusterId: ClusterId(6), - attributeValueList: [{ attributeId: AttributeId(0), valueUnsigned8: 1 }], - }, - ], - }), + await MockTime.resolve( + cmds.addScene({ + groupId: GroupId(0), + sceneId: 2, + transitionTime: 60000000, + sceneName: "Scene2", + extensionFieldSetStructs: [ + { + clusterId: ClusterId(6), + attributeValueList: [{ attributeId: AttributeId(0), valueUnsigned8: 1 }], + }, + ], + }), + ), ).deep.equals({ status: 0, groupId: GroupId(0), sceneId: 2 }); const waiter = MockTime.resolve(device.endpoints.for(1).eventsOf(OnOffServer).onOff$Changed); - await cmds.recallScene({ groupId: GroupId(0), sceneId: 1 }); + await MockTime.resolve(cmds.recallScene({ groupId: GroupId(0), sceneId: 1 })); await MockTime.advance(1500); diff --git a/packages/node/test/node/ClientNodeTest.ts b/packages/node/test/node/ClientNodeTest.ts index f64c1d2447..739aabd635 100644 --- a/packages/node/test/node/ClientNodeTest.ts +++ b/packages/node/test/node/ClientNodeTest.ts @@ -182,15 +182,16 @@ describe("ClientNode", () => { const receivedUpdate = new Promise(resolve => ep1.eventsOf(OnOffClient).onOff$Changed.on(resolve)); // *** INVOCATION *** + // Commands are batched via a timer, so we need MockTime.resolve() to advance time - await ep1.commandsOf(OnOffClient).toggle(); + await MockTime.resolve(ep1.commandsOf(OnOffClient).toggle()); // *** UPDATE *** await MockTime.resolve(receivedUpdate); // *** Test another command also in the feature-set *** - await ep1.commandsOf(OnOffClient).offWithEffect({ effectIdentifier: 0, effectVariant: 0 }); + await MockTime.resolve(ep1.commandsOf(OnOffClient).offWithEffect({ effectIdentifier: 0, effectVariant: 0 })); }); it("decommissions", async () => { @@ -336,7 +337,7 @@ describe("ClientNode", () => { ep1.eventsOf(OnOffClient).onOff$Changed.once(resolve); }); - await ep1.commandsOf(OnOffClient).toggle(); + await MockTime.resolve(ep1.commandsOf(OnOffClient).toggle()); await MockTime.resolve(toggled); diff --git a/packages/node/test/node/CommandBatcherTest.ts b/packages/node/test/node/CommandBatcherTest.ts new file mode 100644 index 0000000000..f610124d63 --- /dev/null +++ b/packages/node/test/node/CommandBatcherTest.ts @@ -0,0 +1,199 @@ +/** + * @license + * Copyright 2022-2026 Matter.js Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { OnOffClient, OnOffServer } from "#behaviors/on-off"; +import { ClientNodeInteraction } from "#node/client/ClientNodeInteraction.js"; +import { ServerNode } from "#node/ServerNode.js"; +import { MockSite } from "./mock-site.js"; + +describe("CommandBatcher", () => { + before(() => { + MockTime.init(); + + // Required for crypto to succeed + MockTime.macrotasks = true; + }); + + it("executes commands via the batcher", async () => { + await using site = new MockSite(); + // Enable batching with maxPathsPerInvoke=10 + const { controller, device } = await site.addCommissionedPair({ + device: { + type: ServerNode.RootEndpoint, + basicInformation: { + maxPathsPerInvoke: 10, + }, + }, + }); + + const peer1 = controller.peers.get("peer1")!; + expect(peer1).not.undefined; + + const ep1 = peer1.endpoints.for(1); + const cmds = ep1.commandsOf(OnOffClient); + + // Get initial state + const initialState = device.parts.get(1)!.stateOf(OnOffServer).onOff; + + // Execute a command via the batcher + await MockTime.resolve(cmds.toggle()); + + // State should be toggled + const finalState = device.parts.get(1)!.stateOf(OnOffServer).onOff; + expect(finalState).equals(!initialState); + }); + + it("requires MockTime.resolve for batched commands (non-root endpoints)", async () => { + await using site = new MockSite(); + // Enable batching with maxPathsPerInvoke=10 + const { controller } = await site.addCommissionedPair({ + device: { + type: ServerNode.RootEndpoint, + basicInformation: { + maxPathsPerInvoke: 10, + }, + }, + }); + + const peer1 = controller.peers.get("peer1")!; + expect(peer1).not.undefined; + + // Commands to non-root endpoints require batching + const ep1 = peer1.endpoints.for(1); + const cmds = ep1.commandsOf(OnOffClient); + + // Start a command but don't resolve the timer yet + const pendingPromise = cmds.toggle(); + + // The command should be pending in the batcher + let resolved = false; + // eslint-disable-next-line @typescript-eslint/no-floating-promises + pendingPromise.then(() => (resolved = true)); + + // Give microtasks a chance to run + await Promise.resolve(); + expect(resolved).equals(false); + + // Now resolve with MockTime + await MockTime.resolve(pendingPromise); + expect(resolved).equals(true); + }); + + it("bypasses batching when maxPathsPerInvoke is 1", async () => { + await using site = new MockSite(); + // Default device has maxPathsPerInvoke=1 (no batching) + const { controller } = await site.addCommissionedPair(); + + const peer1 = controller.peers.get("peer1")!; + expect(peer1).not.undefined; + + const ep1 = peer1.endpoints.for(1); + const cmds = ep1.commandsOf(OnOffClient); + + // With maxPathsPerInvoke=1, commands bypass batching but still need MockTime + // for the underlying async operations + await MockTime.resolve(cmds.toggle()); + }); + + it("clears maxPathsPerInvoke cache when node goes offline", async () => { + await using site = new MockSite(); + // Enable batching with maxPathsPerInvoke=10 + const { controller, device } = await site.addCommissionedPair({ + device: { + type: ServerNode.RootEndpoint, + basicInformation: { + maxPathsPerInvoke: 10, + }, + }, + }); + + const peer1 = controller.peers.get("peer1")!; + expect(peer1).not.undefined; + + const ep1 = peer1.endpoints.for(1); + const cmds = ep1.commandsOf(OnOffClient); + + // First command - should use batching (maxPathsPerInvoke=10) + await MockTime.resolve(cmds.toggle()); + + // Take the device offline + await MockTime.resolve(device.cancel()); + + // Bring it back online with different maxPathsPerInvoke + await device.act(agent => { + agent.basicInformation.state.maxPathsPerInvoke = 1; + }); + await MockTime.resolve(device.start()); + + // The peer should have reconnected and the cache should be cleared + // Next command should see the new maxPathsPerInvoke=1 + await MockTime.resolve(cmds.toggle()); + }); + + it("executes multiple commands sequentially", async () => { + await using site = new MockSite(); + // Enable batching with maxPathsPerInvoke=10 + const { controller, device } = await site.addCommissionedPair({ + device: { + type: ServerNode.RootEndpoint, + basicInformation: { + maxPathsPerInvoke: 10, + }, + }, + }); + + const peer1 = controller.peers.get("peer1")!; + expect(peer1).not.undefined; + + const ep1 = peer1.endpoints.for(1); + const cmds = ep1.commandsOf(OnOffClient); + + // Execute commands sequentially + await MockTime.resolve(cmds.off()); + expect(device.parts.get(1)!.stateOf(OnOffServer).onOff).equals(false); + + await MockTime.resolve(cmds.on()); + expect(device.parts.get(1)!.stateOf(OnOffServer).onOff).equals(true); + + await MockTime.resolve(cmds.toggle()); + expect(device.parts.get(1)!.stateOf(OnOffServer).onOff).equals(false); + }); + + it("rejects pending commands when batcher is closed", async () => { + await using site = new MockSite(); + // Enable batching with maxPathsPerInvoke=10 + const { controller } = await site.addCommissionedPair({ + device: { + type: ServerNode.RootEndpoint, + basicInformation: { + maxPathsPerInvoke: 10, + }, + }, + }); + + const peer1 = controller.peers.get("peer1")!; + expect(peer1).not.undefined; + + const ep1 = peer1.endpoints.for(1); + const cmds = ep1.commandsOf(OnOffClient); + + // Queue some commands and immediately attach rejection handlers to avoid unhandled rejection + const promise1 = cmds.toggle().catch(e => e); + const promise2 = cmds.toggle().catch(e => e); + + // Close the invoker directly - this should reject pending commands + await (peer1.interaction as ClientNodeInteraction).invoker.close(); + + // Both promises should have resolved to errors + const error1 = await MockTime.resolve(promise1); + const error2 = await MockTime.resolve(promise2); + + expect(error1).instanceOf(Error); + expect(error1.message).equals("CommandBatcher closed"); + expect(error2).instanceOf(Error); + expect(error2.message).equals("CommandBatcher closed"); + }); +}); From c5715864a930090904f446ac1c41142a428100a6 Mon Sep 17 00:00:00 2001 From: Ingo Fischer Date: Sat, 24 Jan 2026 19:17:08 +0100 Subject: [PATCH 2/2] review feedback --- .../src/node/client/commands/CommandBatcher.ts | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/packages/node/src/node/client/commands/CommandBatcher.ts b/packages/node/src/node/client/commands/CommandBatcher.ts index e787a0aed6..f77562ec56 100644 --- a/packages/node/src/node/client/commands/CommandBatcher.ts +++ b/packages/node/src/node/client/commands/CommandBatcher.ts @@ -39,6 +39,9 @@ export class CommandBatcher extends CommandInvoker { #nextCommandRef = 1; #closed = false; #supportsMultipleInvokes?: boolean; + #resetMultipleInvokes = () => { + this.#supportsMultipleInvokes = undefined; + }; constructor(node: ClientNode) { super(node); @@ -46,9 +49,7 @@ export class CommandBatcher extends CommandInvoker { this.#flushTimer = Time.getTimer("command-batcher", Instant, () => this.#flush()); // Clear cached maxPathsPerInvoke when the node goes offline - node.lifecycle.offline.on(() => { - this.#supportsMultipleInvokes = undefined; - }); + node.lifecycle.offline.on(this.#resetMultipleInvokes); } /** @@ -76,9 +77,10 @@ export class CommandBatcher extends CommandInvoker { throw new ImplementationError("CommandBatcher is closed"); } - // Bypass batching for endpoint 0 (root endpoint) - these are typically - // administrative/commissioning commands that should execute immediately - // Also bypass when multiple invokes are not supported + // Bypass batching for: + // * endpoint 0 (root endpoint) - (typically administrative/commissioning commands, better to execute immediately) + // * endpoint is undefined - multi-endpoint invoke, better also execute directly + // * when multiple invokes are not supported const endpointId = typeof request.endpoint === "number" ? request.endpoint : (request.endpoint as { number?: number })?.number; if (!endpointId || !this.#enabled) { @@ -222,6 +224,7 @@ export class CommandBatcher extends CommandInvoker { */ override async close() { this.#closed = true; + this.node.lifecycle.offline.off(this.#resetMultipleInvokes); this.#flushTimer.stop(); // Reject any remaining pending commands