From 45f474b5112de48a4ff71986f15a25ea17d3c5ec Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 26 Jan 2026 19:50:42 +0200 Subject: [PATCH 01/24] feat(transport): add rate limiting for messages and connections Add sliding window rate limiting to protect against flooding attacks: - Message rate limiting: 100 messages/second per peer (configurable) - Connection attempt rate limiting: 10 attempts/minute per peer (configurable) Implementation: - Add SlidingWindowRateLimiter class with automatic pruning - Add maxMessagesPerSecond and maxConnectionAttemptsPerMinute options - Integrate rate checks in sendRemoteMessage before sending - Integrate connection rate checks before dialing new connections - Clean up rate limiter state when peers become stale or transport stops Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/platform/rate-limiter.ts | 191 ++++++++++++++++++ .../src/remotes/platform/transport.ts | 27 +++ packages/ocap-kernel/src/remotes/types.ts | 12 ++ 3 files changed, 230 insertions(+) create mode 100644 packages/ocap-kernel/src/remotes/platform/rate-limiter.ts diff --git a/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts b/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts new file mode 100644 index 000000000..6a3e00545 --- /dev/null +++ b/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts @@ -0,0 +1,191 @@ +import { ResourceLimitError } from '@metamask/kernel-errors'; + +/** Default message rate limit: 100 messages per second per peer */ +export const DEFAULT_MESSAGE_RATE_LIMIT = 100; + +/** Default message rate window in milliseconds (1 second) */ +export const DEFAULT_MESSAGE_RATE_WINDOW_MS = 1000; + +/** Default connection attempt rate limit: 10 attempts per minute per peer */ +export const DEFAULT_CONNECTION_RATE_LIMIT = 10; + +/** Default connection rate window in milliseconds (1 minute) */ +export const DEFAULT_CONNECTION_RATE_WINDOW_MS = 60_000; + +/** + * A sliding window rate limiter that tracks event counts per key within a time window. + * Events older than the window are automatically pruned when checking or recording. + */ +export class SlidingWindowRateLimiter { + readonly #maxEvents: number; + + readonly #windowMs: number; + + readonly #timestamps: Map; + + /** + * Create a new sliding window rate limiter. + * + * @param maxEvents - Maximum number of events allowed within the window. + * @param windowMs - Window size in milliseconds. + */ + constructor(maxEvents: number, windowMs: number) { + this.#maxEvents = maxEvents; + this.#windowMs = windowMs; + this.#timestamps = new Map(); + } + + /** + * Check if an event would exceed the rate limit for a given key. + * This does not record the event. + * + * @param key - The key to check (e.g., peer ID). + * @returns True if the event would exceed the rate limit. + */ + wouldExceedLimit(key: string): boolean { + const now = Date.now(); + const cutoff = now - this.#windowMs; + const timestamps = this.#timestamps.get(key); + + if (!timestamps) { + return false; + } + + // Count events within the window + const recentCount = timestamps.filter((ts) => ts > cutoff).length; + return recentCount >= this.#maxEvents; + } + + /** + * Record an event for a given key. + * Automatically prunes old timestamps outside the window. + * + * @param key - The key to record (e.g., peer ID). 
+ */ + recordEvent(key: string): void { + const now = Date.now(); + const cutoff = now - this.#windowMs; + let timestamps = this.#timestamps.get(key); + + if (!timestamps) { + timestamps = []; + this.#timestamps.set(key, timestamps); + } + + // Prune old timestamps and add new one + const pruned = timestamps.filter((ts) => ts > cutoff); + pruned.push(now); + this.#timestamps.set(key, pruned); + } + + /** + * Check the rate limit and record the event if allowed. + * Throws ResourceLimitError if the limit would be exceeded. + * + * @param key - The key to check and record (e.g., peer ID). + * @param limitType - The type of limit for error reporting. + * @throws ResourceLimitError if the rate limit would be exceeded. + */ + checkAndRecord(key: string, limitType: string): void { + if (this.wouldExceedLimit(key)) { + const timestamps = this.#timestamps.get(key) ?? []; + const cutoff = Date.now() - this.#windowMs; + const currentCount = timestamps.filter((ts) => ts > cutoff).length; + + throw new ResourceLimitError( + `Rate limit exceeded: ${currentCount}/${this.#maxEvents} ${limitType} in ${this.#windowMs}ms window`, + { + data: { + limitType, + current: currentCount, + limit: this.#maxEvents, + windowMs: this.#windowMs, + }, + }, + ); + } + this.recordEvent(key); + } + + /** + * Get the current count of events within the window for a key. + * + * @param key - The key to check. + * @returns The number of events within the current window. + */ + getCurrentCount(key: string): number { + const now = Date.now(); + const cutoff = now - this.#windowMs; + const timestamps = this.#timestamps.get(key); + + if (!timestamps) { + return 0; + } + + return timestamps.filter((ts) => ts > cutoff).length; + } + + /** + * Clear all recorded events for a specific key. + * + * @param key - The key to clear. + */ + clearKey(key: string): void { + this.#timestamps.delete(key); + } + + /** + * Clear all recorded events. + */ + clear(): void { + this.#timestamps.clear(); + } + + /** + * Prune old timestamps for all keys. + * Removes keys that have no recent events. + */ + pruneStale(): void { + const now = Date.now(); + const cutoff = now - this.#windowMs; + + for (const [key, timestamps] of this.#timestamps.entries()) { + const pruned = timestamps.filter((ts) => ts > cutoff); + if (pruned.length === 0) { + this.#timestamps.delete(key); + } else { + this.#timestamps.set(key, pruned); + } + } + } +} + +/** + * Factory function to create a message rate limiter. + * + * @param maxMessagesPerSecond - Maximum messages per second per peer. + * @returns A configured SlidingWindowRateLimiter for message rate limiting. + */ +export function makeMessageRateLimiter( + maxMessagesPerSecond: number = DEFAULT_MESSAGE_RATE_LIMIT, +): SlidingWindowRateLimiter { + return new SlidingWindowRateLimiter( + maxMessagesPerSecond, + DEFAULT_MESSAGE_RATE_WINDOW_MS, + ); +} + +/** + * Factory function to create a connection attempt rate limiter. + * + * @param maxAttemptsPerMinute - Maximum connection attempts per minute per peer. + * @returns A configured SlidingWindowRateLimiter for connection rate limiting. 
+ */ +export function makeConnectionRateLimiter( + maxAttemptsPerMinute: number = DEFAULT_CONNECTION_RATE_LIMIT, +): SlidingWindowRateLimiter { + return new SlidingWindowRateLimiter( + maxAttemptsPerMinute, + DEFAULT_CONNECTION_RATE_WINDOW_MS, + ); +} diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index dfc4e8282..7426d732d 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -14,6 +14,12 @@ import { SCTP_USER_INITIATED_ABORT, } from './constants.ts'; import { PeerStateManager } from './peer-state-manager.ts'; +import { + DEFAULT_CONNECTION_RATE_LIMIT, + DEFAULT_MESSAGE_RATE_LIMIT, + makeConnectionRateLimiter, + makeMessageRateLimiter, +} from './rate-limiter.ts'; import { makeReconnectionLifecycle } from './reconnection-lifecycle.ts'; import { ReconnectionManager } from './reconnection.ts'; import { @@ -41,6 +47,8 @@ import type { * @param options.maxMessageSizeBytes - Maximum message size in bytes (default: 1MB). * @param options.cleanupIntervalMs - Stale peer cleanup interval in milliseconds (default: 15 minutes). * @param options.stalePeerTimeoutMs - Stale peer timeout in milliseconds (default: 1 hour). + * @param options.maxMessagesPerSecond - Maximum messages per second per peer (default: 100). + * @param options.maxConnectionAttemptsPerMinute - Maximum connection attempts per minute per peer (default: 10). * @param remoteMessageHandler - Handler to be called when messages are received from elsewhere. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote (after max retries or non-retryable error). * @@ -65,6 +73,8 @@ export async function initTransport( maxMessageSizeBytes = DEFAULT_MAX_MESSAGE_SIZE_BYTES, cleanupIntervalMs = DEFAULT_CLEANUP_INTERVAL_MS, stalePeerTimeoutMs = DEFAULT_STALE_PEER_TIMEOUT_MS, + maxMessagesPerSecond = DEFAULT_MESSAGE_RATE_LIMIT, + maxConnectionAttemptsPerMinute = DEFAULT_CONNECTION_RATE_LIMIT, } = options; let cleanupWakeDetector: (() => void) | undefined; const stopController = new AbortController(); @@ -78,6 +88,10 @@ export async function initTransport( maxConcurrentConnections, () => peerStateManager.countActiveConnections(), ); + const messageRateLimiter = makeMessageRateLimiter(maxMessagesPerSecond); + const connectionRateLimiter = makeConnectionRateLimiter( + maxConnectionAttemptsPerMinute, + ); let cleanupIntervalId: ReturnType | undefined; // Holder for handleConnectionLoss - initialized later after all dependencies are defined // This breaks the circular dependency between readChannel → handleConnectionLoss → registerChannel @@ -109,7 +123,12 @@ export async function initTransport( for (const peerId of stalePeers) { peerStateManager.removePeer(peerId); reconnectionManager.stopReconnection(peerId); + messageRateLimiter.clearKey(peerId); + connectionRateLimiter.clearKey(peerId); } + // Also prune stale rate limiter entries that may not have peer state + messageRateLimiter.pruneStale(); + connectionRateLimiter.pruneStale(); } /** @@ -326,6 +345,9 @@ export async function initTransport( // Validate message size before sending validateMessageSize(message); + // Check message rate limit + messageRateLimiter.checkAndRecord(targetPeerId, 'messages'); + const state = peerStateManager.getState(targetPeerId); // Get or establish channel @@ -334,6 +356,9 @@ export async function initTransport( // Check connection limit before attempting to dial checkConnectionLimit(); + // 
Check connection attempt rate limit + connectionRateLimiter.checkAndRecord(targetPeerId, 'connectionAttempts'); + try { const { locationHints: hints } = state; channel = await connectionFactory.dialIdempotent( @@ -508,6 +533,8 @@ export async function initTransport( await connectionFactory.stop(); peerStateManager.clear(); reconnectionManager.clear(); + messageRateLimiter.clear(); + connectionRateLimiter.clear(); } // Return the sender with a stop handle and connection management functions diff --git a/packages/ocap-kernel/src/remotes/types.ts b/packages/ocap-kernel/src/remotes/types.ts index b91523904..f278ff93a 100644 --- a/packages/ocap-kernel/src/remotes/types.ts +++ b/packages/ocap-kernel/src/remotes/types.ts @@ -77,6 +77,18 @@ export type RemoteCommsOptions = { * `resetStorage: true` when creating the kernel to clear existing identity first. */ mnemonic?: string | undefined; + /** + * Maximum messages per second per peer (default: 100). + * Messages exceeding this rate are rejected with ResourceLimitError. + * Uses a sliding 1-second window. + */ + maxMessagesPerSecond?: number | undefined; + /** + * Maximum connection attempts per minute per peer (default: 10). + * Connection attempts exceeding this rate are rejected with ResourceLimitError. + * Uses a sliding 1-minute window. + */ + maxConnectionAttemptsPerMinute?: number | undefined; }; export type RemoteInfo = { From 812d8b17da3e1796c29a76aa95e55730237f604d Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 26 Jan 2026 19:53:20 +0200 Subject: [PATCH 02/24] test(transport): add unit tests for rate limiter Add comprehensive test coverage for SlidingWindowRateLimiter: - Basic limit checking (wouldExceedLimit) - Event recording and pruning - checkAndRecord with error handling - getCurrentCount with window expiration - clearKey and clear methods - pruneStale for cleanup - Sliding window behavior with real timing Also test factory functions and constants. 
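
For reference, a minimal usage sketch of the API under test, using the test
configuration of 3 events per 100ms window (the limit values here are
illustrative only):

    const limiter = new SlidingWindowRateLimiter(3, 100);
    limiter.checkAndRecord('peer1', 'messages'); // event 1: allowed and recorded
    limiter.checkAndRecord('peer1', 'messages'); // event 2: allowed and recorded
    limiter.checkAndRecord('peer1', 'messages'); // event 3: allowed and recorded
    limiter.checkAndRecord('peer1', 'messages'); // at the limit: throws ResourceLimitError
    // Once the oldest event ages out of the 100ms window, calls succeed again.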
Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/platform/rate-limiter.test.ts | 293 ++++++++++++++++++ 1 file changed, 293 insertions(+) create mode 100644 packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts diff --git a/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts b/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts new file mode 100644 index 000000000..834a540e6 --- /dev/null +++ b/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts @@ -0,0 +1,293 @@ +import { ResourceLimitError } from '@metamask/kernel-errors'; +import { describe, it, expect, beforeEach } from 'vitest'; + +import { + SlidingWindowRateLimiter, + makeMessageRateLimiter, + makeConnectionRateLimiter, + DEFAULT_MESSAGE_RATE_LIMIT, + DEFAULT_MESSAGE_RATE_WINDOW_MS, + DEFAULT_CONNECTION_RATE_LIMIT, + DEFAULT_CONNECTION_RATE_WINDOW_MS, +} from './rate-limiter.ts'; + +describe('SlidingWindowRateLimiter', () => { + let limiter: SlidingWindowRateLimiter; + + beforeEach(() => { + // Create a limiter allowing 3 events per 100ms window for faster tests + limiter = new SlidingWindowRateLimiter(3, 100); + }); + + describe('wouldExceedLimit', () => { + it('returns false when no events recorded', () => { + expect(limiter.wouldExceedLimit('peer1')).toBe(false); + }); + + it('returns false when under the limit', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.wouldExceedLimit('peer1')).toBe(false); + }); + + it('returns true when at the limit', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + }); + + it('tracks limits independently per key', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + expect(limiter.wouldExceedLimit('peer2')).toBe(false); + }); + + it('allows events after window expires', async () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + + // Wait for window to expire + await new Promise((resolve) => setTimeout(resolve, 110)); + expect(limiter.wouldExceedLimit('peer1')).toBe(false); + }); + }); + + describe('recordEvent', () => { + it('records events for a key', () => { + limiter.recordEvent('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + + it('accumulates events', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(2); + }); + + it('prunes old events when recording', async () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 110)); + limiter.recordEvent('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + }); + + describe('checkAndRecord', () => { + it('records event when under limit', () => { + limiter.checkAndRecord('peer1', 'messages'); + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + + it('throws ResourceLimitError when limit exceeded', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + + expect(() => limiter.checkAndRecord('peer1', 'messages')).toThrow( + ResourceLimitError, + ); + }); + + it('includes limit details in error', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + + let caughtError: 
ResourceLimitError | undefined; + try { + limiter.checkAndRecord('peer1', 'messages'); + } catch (error) { + caughtError = error as ResourceLimitError; + } + + expect(caughtError).toBeInstanceOf(ResourceLimitError); + expect(caughtError?.data).toStrictEqual({ + limitType: 'messages', + current: 3, + limit: 3, + windowMs: 100, + }); + }); + + it('does not record when limit exceeded', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + + try { + limiter.checkAndRecord('peer1', 'messages'); + } catch { + // Expected + } + + expect(limiter.getCurrentCount('peer1')).toBe(3); + }); + }); + + describe('getCurrentCount', () => { + it('returns 0 for unknown key', () => { + expect(limiter.getCurrentCount('unknown')).toBe(0); + }); + + it('returns count of events within window', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(2); + }); + + it('excludes events outside window', async () => { + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 60)); + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 50)); + // First event is now outside window (110ms ago) + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + }); + + describe('clearKey', () => { + it('removes all events for a key', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.clearKey('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(0); + }); + + it('does not affect other keys', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer2'); + limiter.clearKey('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(0); + expect(limiter.getCurrentCount('peer2')).toBe(1); + }); + }); + + describe('clear', () => { + it('removes all events for all keys', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer2'); + limiter.clear(); + expect(limiter.getCurrentCount('peer1')).toBe(0); + expect(limiter.getCurrentCount('peer2')).toBe(0); + }); + }); + + describe('pruneStale', () => { + it('removes keys with no recent events', async () => { + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 110)); + limiter.pruneStale(); + expect(limiter.getCurrentCount('peer1')).toBe(0); + }); + + it('keeps keys with recent events', () => { + limiter.recordEvent('peer1'); + limiter.pruneStale(); + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + + it('prunes old events from active keys', async () => { + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 60)); + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 50)); + limiter.pruneStale(); + // Only the second event should remain (first is >100ms old) + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + }); + + describe('sliding window behavior', () => { + it('allows burst followed by sustained rate', async () => { + // Burst 3 events + limiter.checkAndRecord('peer1', 'test'); + limiter.checkAndRecord('peer1', 'test'); + limiter.checkAndRecord('peer1', 'test'); + + // Should be at limit + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + + // Wait for first event to expire + await new Promise((resolve) => setTimeout(resolve, 110)); + + // Now slots available + expect(limiter.wouldExceedLimit('peer1')).toBe(false); + limiter.checkAndRecord('peer1', 'test'); + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + }); +}); + 
+describe('makeMessageRateLimiter', () => { + it('creates limiter with default settings', () => { + const limiter = makeMessageRateLimiter(); + + // Should allow DEFAULT_MESSAGE_RATE_LIMIT events + for (let idx = 0; idx < DEFAULT_MESSAGE_RATE_LIMIT; idx++) { + limiter.recordEvent('peer1'); + } + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + }); + + it('creates limiter with custom rate', () => { + const limiter = makeMessageRateLimiter(5); + + for (let idx = 0; idx < 5; idx++) { + limiter.recordEvent('peer1'); + } + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + }); + + it('uses 1 second window', async () => { + // Use a small limit to make test faster + const limiter = makeMessageRateLimiter(2); + + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + + // Window is 1 second, so after 1 second events should be allowed + await new Promise((resolve) => + setTimeout(resolve, DEFAULT_MESSAGE_RATE_WINDOW_MS + 10), + ); + expect(limiter.wouldExceedLimit('peer1')).toBe(false); + }); +}); + +describe('makeConnectionRateLimiter', () => { + it('creates limiter with default settings', () => { + const limiter = makeConnectionRateLimiter(); + + // Should allow DEFAULT_CONNECTION_RATE_LIMIT events + for (let idx = 0; idx < DEFAULT_CONNECTION_RATE_LIMIT; idx++) { + limiter.recordEvent('peer1'); + } + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + }); + + it('creates limiter with custom rate', () => { + const limiter = makeConnectionRateLimiter(3); + + for (let idx = 0; idx < 3; idx++) { + limiter.recordEvent('peer1'); + } + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + }); + + // Skip window expiration test for connection limiter as it would take 60 seconds +}); + +describe('constants', () => { + it('exports expected default values', () => { + expect(DEFAULT_MESSAGE_RATE_LIMIT).toBe(100); + expect(DEFAULT_MESSAGE_RATE_WINDOW_MS).toBe(1000); + expect(DEFAULT_CONNECTION_RATE_LIMIT).toBe(10); + expect(DEFAULT_CONNECTION_RATE_WINDOW_MS).toBe(60_000); + }); +}); From b3c635d52557ac5cc3bc04ab13ab3d9d9cd1dd76 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 26 Jan 2026 22:13:12 +0200 Subject: [PATCH 03/24] fix: add rate limit types to ResourceLimitError - Add 'messageRate' and 'connectionRate' to ResourceLimitError limitType - Update rate limiter to use correct limit type enum values - Update tests to match new limit types Co-Authored-By: Claude Opus 4.5 --- .../src/errors/ResourceLimitError.test.ts | 2 +- .../src/errors/ResourceLimitError.ts | 19 ++++++++++++++++--- .../src/remotes/platform/rate-limiter.test.ts | 19 +++++++++---------- .../src/remotes/platform/rate-limiter.ts | 6 ++++-- .../src/remotes/platform/transport.ts | 4 ++-- 5 files changed, 32 insertions(+), 18 deletions(-) diff --git a/packages/kernel-errors/src/errors/ResourceLimitError.test.ts b/packages/kernel-errors/src/errors/ResourceLimitError.test.ts index ae2bf6da4..a37f3f57d 100644 --- a/packages/kernel-errors/src/errors/ResourceLimitError.test.ts +++ b/packages/kernel-errors/src/errors/ResourceLimitError.test.ts @@ -167,7 +167,7 @@ describe('ResourceLimitError', () => { stack: 'stack trace', } as unknown as MarshaledOcapError, expectedError: - 'At path: data.limitType -- Expected the value to satisfy a union of `literal | literal`, but received: "invalid"', + 'At path: data.limitType -- Expected the value to satisfy a union of `literal | literal | literal | literal`, but received: "invalid"', }, { name: 'invalid current 
type', diff --git a/packages/kernel-errors/src/errors/ResourceLimitError.ts b/packages/kernel-errors/src/errors/ResourceLimitError.ts index 13026a7c6..d5cc5f39f 100644 --- a/packages/kernel-errors/src/errors/ResourceLimitError.ts +++ b/packages/kernel-errors/src/errors/ResourceLimitError.ts @@ -29,7 +29,11 @@ export class ResourceLimitError extends BaseError { message: string, options?: ErrorOptionsWithStack & { data?: { - limitType?: 'connection' | 'messageSize'; + limitType?: + | 'connection' + | 'messageSize' + | 'messageRate' + | 'connectionRate'; current?: number; limit?: number; }; @@ -50,7 +54,12 @@ export class ResourceLimitError extends BaseError { data: optional( object({ limitType: optional( - union([literal('connection'), literal('messageSize')]), + union([ + literal('connection'), + literal('messageSize'), + literal('messageRate'), + literal('connectionRate'), + ]), ), current: optional(number()), limit: optional(number()), @@ -75,7 +84,11 @@ export class ResourceLimitError extends BaseError { const options = unmarshalErrorOptions(marshaledError); const data = marshaledError.data as | { - limitType?: 'connection' | 'messageSize'; + limitType?: + | 'connection' + | 'messageSize' + | 'messageRate' + | 'connectionRate'; current?: number; limit?: number; } diff --git a/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts b/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts index 834a540e6..aa3507c6b 100644 --- a/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts @@ -80,7 +80,7 @@ describe('SlidingWindowRateLimiter', () => { describe('checkAndRecord', () => { it('records event when under limit', () => { - limiter.checkAndRecord('peer1', 'messages'); + limiter.checkAndRecord('peer1', 'messageRate'); expect(limiter.getCurrentCount('peer1')).toBe(1); }); @@ -89,7 +89,7 @@ describe('SlidingWindowRateLimiter', () => { limiter.recordEvent('peer1'); limiter.recordEvent('peer1'); - expect(() => limiter.checkAndRecord('peer1', 'messages')).toThrow( + expect(() => limiter.checkAndRecord('peer1', 'messageRate')).toThrow( ResourceLimitError, ); }); @@ -101,17 +101,16 @@ describe('SlidingWindowRateLimiter', () => { let caughtError: ResourceLimitError | undefined; try { - limiter.checkAndRecord('peer1', 'messages'); + limiter.checkAndRecord('peer1', 'messageRate'); } catch (error) { caughtError = error as ResourceLimitError; } expect(caughtError).toBeInstanceOf(ResourceLimitError); expect(caughtError?.data).toStrictEqual({ - limitType: 'messages', + limitType: 'messageRate', current: 3, limit: 3, - windowMs: 100, }); }); @@ -121,7 +120,7 @@ describe('SlidingWindowRateLimiter', () => { limiter.recordEvent('peer1'); try { - limiter.checkAndRecord('peer1', 'messages'); + limiter.checkAndRecord('peer1', 'messageRate'); } catch { // Expected } @@ -206,9 +205,9 @@ describe('SlidingWindowRateLimiter', () => { describe('sliding window behavior', () => { it('allows burst followed by sustained rate', async () => { // Burst 3 events - limiter.checkAndRecord('peer1', 'test'); - limiter.checkAndRecord('peer1', 'test'); - limiter.checkAndRecord('peer1', 'test'); + limiter.checkAndRecord('peer1', 'messageRate'); + limiter.checkAndRecord('peer1', 'messageRate'); + limiter.checkAndRecord('peer1', 'messageRate'); // Should be at limit expect(limiter.wouldExceedLimit('peer1')).toBe(true); @@ -218,7 +217,7 @@ describe('SlidingWindowRateLimiter', () => { // Now slots available 
expect(limiter.wouldExceedLimit('peer1')).toBe(false); - limiter.checkAndRecord('peer1', 'test'); + limiter.checkAndRecord('peer1', 'messageRate'); expect(limiter.getCurrentCount('peer1')).toBe(1); }); }); diff --git a/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts b/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts index 6a3e00545..6776ec317 100644 --- a/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts +++ b/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts @@ -86,7 +86,10 @@ export class SlidingWindowRateLimiter { * @param limitType - The type of limit for error reporting. * @throws ResourceLimitError if the rate limit would be exceeded. */ - checkAndRecord(key: string, limitType: string): void { + checkAndRecord( + key: string, + limitType: 'messageRate' | 'connectionRate', + ): void { if (this.wouldExceedLimit(key)) { const timestamps = this.#timestamps.get(key) ?? []; const cutoff = Date.now() - this.#windowMs; @@ -99,7 +102,6 @@ export class SlidingWindowRateLimiter { limitType, current: currentCount, limit: this.#maxEvents, - windowMs: this.#windowMs, }, }, ); diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index 7426d732d..95fe066ea 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -346,7 +346,7 @@ export async function initTransport( validateMessageSize(message); // Check message rate limit - messageRateLimiter.checkAndRecord(targetPeerId, 'messages'); + messageRateLimiter.checkAndRecord(targetPeerId, 'messageRate'); const state = peerStateManager.getState(targetPeerId); @@ -357,7 +357,7 @@ export async function initTransport( checkConnectionLimit(); // Check connection attempt rate limit - connectionRateLimiter.checkAndRecord(targetPeerId, 'connectionAttempts'); + connectionRateLimiter.checkAndRecord(targetPeerId, 'connectionRate'); try { const { locationHints: hints } = state; From 768eeeaec53f65fbbfeba9521c2c32173fdc6708 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 26 Jan 2026 22:52:09 +0200 Subject: [PATCH 04/24] fix(transport): record message rate after send, rate limit reconnections - Move message rate recording to after successful write instead of before send attempt. This prevents failed sends from consuming rate quota. - Add connection rate limiting to automatic reconnection attempts via checkConnectionRateLimit dependency in reconnection lifecycle. - Handle ResourceLimitError gracefully during reconnection by continuing the loop after backoff instead of giving up on the peer. 
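
In sketch form, the reconnection loop now treats rate-limit rejections as
retryable (simplified from the diff below; the surrounding retry loop and
backoff handling are abbreviated):

    try {
      checkConnectionRateLimit(peerId); // throws ResourceLimitError when over quota
      const channel = await dialPeer(peerId, hints);
      // ...reuse or register the channel and stop reconnecting...
    } catch (problem) {
      if (problem instanceof ResourceLimitError) {
        // Temporary condition: skip this attempt and let the backoff delay
        // space out the next one instead of giving up on the peer.
        continue;
      }
      // Non-retryable errors still stop reconnection and trigger onRemoteGiveUp.
    }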
Co-Authored-By: Claude Opus 4.5 --- .../platform/reconnection-lifecycle.test.ts | 43 +++++++++++++++++++ .../platform/reconnection-lifecycle.ts | 18 +++++++- .../src/remotes/platform/transport.ts | 19 +++++++- 3 files changed, 77 insertions(+), 3 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts index 5c45729e2..732136f59 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts @@ -79,6 +79,7 @@ describe('reconnection-lifecycle', () => { dialPeer: vi.fn().mockResolvedValue(mockChannel), reuseOrReturnChannel: vi.fn().mockResolvedValue(mockChannel), checkConnectionLimit: vi.fn(), + checkConnectionRateLimit: vi.fn(), registerChannel: vi.fn(), } as unknown as ReconnectionLifecycleDeps; }); @@ -321,6 +322,48 @@ describe('reconnection-lifecycle', () => { expect(deps.checkConnectionLimit).toHaveBeenCalled(); }); + it('checks connection rate limit before dialing', async () => { + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + expect(deps.checkConnectionRateLimit).toHaveBeenCalledWith('peer1'); + }); + + it('continues loop on ResourceLimitError instead of giving up', async () => { + const { ResourceLimitError } = kernelErrors; + ( + deps.checkConnectionRateLimit as ReturnType + ).mockImplementationOnce(() => { + throw new ResourceLimitError('Rate limit exceeded', { + data: { limitType: 'connectionRate', current: 10, limit: 10 }, + }); + }); + + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) // First attempt - rate limited + .mockReturnValueOnce(true) // Second attempt - success + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + // Should not call onRemoteGiveUp because rate limit is retryable + expect(deps.onRemoteGiveUp).not.toHaveBeenCalled(); + // Should have tried twice (once rate limited, once successful) + expect(deps.reconnectionManager.incrementAttempt).toHaveBeenCalledTimes( + 2, + ); + expect(deps.logger.log).toHaveBeenCalledWith( + expect.stringContaining('rate limited'), + ); + }); + it('logs reconnection attempts', async () => { (deps.reconnectionManager.isReconnecting as ReturnType) .mockReturnValueOnce(true) diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts index 5cc950f52..9a0653948 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts @@ -1,4 +1,7 @@ -import { isRetryableNetworkError } from '@metamask/kernel-errors'; +import { + isRetryableNetworkError, + ResourceLimitError, +} from '@metamask/kernel-errors'; import { abortableDelay, DEFAULT_MAX_RETRY_ATTEMPTS, @@ -27,6 +30,7 @@ export type ReconnectionLifecycleDeps = { dialedChannel: Channel, ) => Promise; checkConnectionLimit: () => void; + checkConnectionRateLimit: (peerId: string) => void; registerChannel: ( peerId: string, channel: Channel, @@ -62,6 +66,7 @@ export function makeReconnectionLifecycle( dialPeer, reuseOrReturnChannel, checkConnectionLimit, + checkConnectionRateLimit, registerChannel, } = 
deps; @@ -124,6 +129,14 @@ export function makeReconnectionLifecycle( reconnectionManager.stopReconnection(peerId); return; } + // Rate limit errors are temporary - skip this attempt but continue the loop + // The backoff delay will naturally space out attempts + if (problem instanceof ResourceLimitError) { + logger.log( + `${peerId}:: reconnection attempt ${nextAttempt} rate limited, will retry after backoff`, + ); + continue; + } if (!isRetryableNetworkError(problem)) { outputError(peerId, `non-retryable failure`, problem); reconnectionManager.stopReconnection(peerId); @@ -151,6 +164,9 @@ export function makeReconnectionLifecycle( state: PeerState, peerId: string, ): Promise { + // Check connection rate limit before attempting dial + checkConnectionRateLimit(peerId); + const { locationHints: hints } = state; const dialedChannel = await dialPeer(peerId, hints); diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index 95fe066ea..c4ee367e5 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -316,6 +316,8 @@ export async function initTransport( connectionFactory.dialIdempotent(peerId, hints, false), reuseOrReturnChannel, checkConnectionLimit, + checkConnectionRateLimit: (peerId: string) => + connectionRateLimiter.checkAndRecord(peerId, 'connectionRate'), registerChannel, }); reconnectionHolder.handleConnectionLoss = @@ -345,8 +347,19 @@ export async function initTransport( // Validate message size before sending validateMessageSize(message); - // Check message rate limit - messageRateLimiter.checkAndRecord(targetPeerId, 'messageRate'); + // Check message rate limit (check only, record after successful send) + if (messageRateLimiter.wouldExceedLimit(targetPeerId)) { + throw new ResourceLimitError( + `Rate limit exceeded: ${messageRateLimiter.getCurrentCount(targetPeerId)}/${maxMessagesPerSecond} messageRate in ${1000}ms window`, + { + data: { + limitType: 'messageRate', + current: messageRateLimiter.getCurrentCount(targetPeerId), + limit: maxMessagesPerSecond, + }, + }, + ); + } const state = peerStateManager.getState(targetPeerId); @@ -400,6 +413,8 @@ export async function initTransport( fromString(message), DEFAULT_WRITE_TIMEOUT_MS, ); + // Record message rate only after successful send + messageRateLimiter.recordEvent(targetPeerId); peerStateManager.updateConnectionTime(targetPeerId); reconnectionManager.resetBackoff(targetPeerId); } catch (problem) { From 76e30e2857ef3bec24135d954fb0fcbbd2b10e55 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 26 Jan 2026 23:12:57 +0200 Subject: [PATCH 05/24] test(e2e): fix queue limit test to avoid rate limit interference - Add remoteCommsOptions parameter to setupAliceAndBob helper - Configure higher maxMessagesPerSecond for queue limit test to ensure rate limiting doesn't interfere with queue limit testing Co-Authored-By: Claude Opus 4.5 --- packages/nodejs/test/e2e/remote-comms.test.ts | 2 ++ packages/nodejs/test/helpers/remote-comms.ts | 12 +++++++++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/packages/nodejs/test/e2e/remote-comms.test.ts b/packages/nodejs/test/e2e/remote-comms.test.ts index 5d2774f6c..37da6c2df 100644 --- a/packages/nodejs/test/e2e/remote-comms.test.ts +++ b/packages/nodejs/test/e2e/remote-comms.test.ts @@ -524,12 +524,14 @@ describe.sequential('Remote Communications E2E', () => { it( 'rejects new messages when queue reaches MAX_QUEUE limit', 
async () => { + // Use high rate limit to avoid rate limiting interference with queue limit test const { aliceRef, bobURL } = await setupAliceAndBob( kernel1, kernel2, kernelStore1, kernelStore2, testRelays, + { maxMessagesPerSecond: 500 }, ); await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']); diff --git a/packages/nodejs/test/helpers/remote-comms.ts b/packages/nodejs/test/helpers/remote-comms.ts index bcd7b80f4..303a31726 100644 --- a/packages/nodejs/test/helpers/remote-comms.ts +++ b/packages/nodejs/test/helpers/remote-comms.ts @@ -1,7 +1,11 @@ import type { KernelDatabase } from '@metamask/kernel-store'; import { stringify } from '@metamask/kernel-utils'; import { Kernel, kunser, makeKernelStore } from '@metamask/ocap-kernel'; -import type { ClusterConfig, KRef } from '@metamask/ocap-kernel'; +import type { + ClusterConfig, + KRef, + RemoteCommsOptions, +} from '@metamask/ocap-kernel'; import { makeTestKernel } from './kernel.ts'; @@ -181,6 +185,7 @@ export async function wait(ms: number): Promise { * @param kernelStore1 - Kernel store for first kernel. * @param kernelStore2 - Kernel store for second kernel. * @param relays - Array of relay addresses. + * @param remoteCommsOptions - Optional additional options for initRemoteComms. * @returns Object with all setup data including URLs and references. */ export async function setupAliceAndBob( @@ -189,6 +194,7 @@ export async function setupAliceAndBob( kernelStore1: ReturnType, kernelStore2: ReturnType, relays: string[], + remoteCommsOptions?: Omit, ): Promise<{ aliceURL: string; bobURL: string; @@ -197,8 +203,8 @@ export async function setupAliceAndBob( peerId1: string; peerId2: string; }> { - await kernel1.initRemoteComms({ relays }); - await kernel2.initRemoteComms({ relays }); + await kernel1.initRemoteComms({ relays, ...remoteCommsOptions }); + await kernel2.initRemoteComms({ relays, ...remoteCommsOptions }); const aliceConfig = makeRemoteVatConfig('Alice'); const bobConfig = makeRemoteVatConfig('Bob'); From 6b90047b53cad4532e4f554da0777b5fb1c59722 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 26 Jan 2026 23:14:25 +0200 Subject: [PATCH 06/24] fix(transport): avoid duplicate getCurrentCount calls in rate limit error - Call getCurrentCount once and reuse the value for both message and data - Use DEFAULT_MESSAGE_RATE_WINDOW_MS constant instead of hardcoded 1000 Co-Authored-By: Claude Opus 4.5 --- packages/ocap-kernel/src/remotes/platform/transport.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index c4ee367e5..fb230f3d9 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -17,6 +17,7 @@ import { PeerStateManager } from './peer-state-manager.ts'; import { DEFAULT_CONNECTION_RATE_LIMIT, DEFAULT_MESSAGE_RATE_LIMIT, + DEFAULT_MESSAGE_RATE_WINDOW_MS, makeConnectionRateLimiter, makeMessageRateLimiter, } from './rate-limiter.ts'; @@ -349,12 +350,13 @@ export async function initTransport( // Check message rate limit (check only, record after successful send) if (messageRateLimiter.wouldExceedLimit(targetPeerId)) { + const currentCount = messageRateLimiter.getCurrentCount(targetPeerId); throw new ResourceLimitError( - `Rate limit exceeded: ${messageRateLimiter.getCurrentCount(targetPeerId)}/${maxMessagesPerSecond} messageRate in ${1000}ms window`, + `Rate limit exceeded: 
${currentCount}/${maxMessagesPerSecond} messageRate in ${DEFAULT_MESSAGE_RATE_WINDOW_MS}ms window`, { data: { limitType: 'messageRate', - current: messageRateLimiter.getCurrentCount(targetPeerId), + current: currentCount, limit: maxMessagesPerSecond, }, }, From 040914f407c47264944de931664855c55de6b08b Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 26 Jan 2026 23:17:10 +0200 Subject: [PATCH 07/24] refactor(transport): move rate limiter constants to constants.ts Move DEFAULT_MESSAGE_RATE_LIMIT, DEFAULT_MESSAGE_RATE_WINDOW_MS, DEFAULT_CONNECTION_RATE_LIMIT, and DEFAULT_CONNECTION_RATE_WINDOW_MS to constants.ts for consistency with other default constants. Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/platform/constants.ts | 12 ++++++++++++ .../src/remotes/platform/rate-limiter.test.ts | 10 ++++++---- .../src/remotes/platform/rate-limiter.ts | 17 ++++++----------- .../src/remotes/platform/transport.ts | 6 +++--- 4 files changed, 27 insertions(+), 18 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/platform/constants.ts b/packages/ocap-kernel/src/remotes/platform/constants.ts index 608a3ebe7..ca66968fc 100644 --- a/packages/ocap-kernel/src/remotes/platform/constants.ts +++ b/packages/ocap-kernel/src/remotes/platform/constants.ts @@ -15,3 +15,15 @@ export const DEFAULT_WRITE_TIMEOUT_MS = 10_000; /** SCTP user initiated abort code (RFC 4960) */ export const SCTP_USER_INITIATED_ABORT = 12; + +/** Default message rate limit: 100 messages per second per peer */ +export const DEFAULT_MESSAGE_RATE_LIMIT = 100; + +/** Default message rate window in milliseconds (1 second) */ +export const DEFAULT_MESSAGE_RATE_WINDOW_MS = 1000; + +/** Default connection attempt rate limit: 10 attempts per minute per peer */ +export const DEFAULT_CONNECTION_RATE_LIMIT = 10; + +/** Default connection rate window in milliseconds (1 minute) */ +export const DEFAULT_CONNECTION_RATE_WINDOW_MS = 60_000; diff --git a/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts b/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts index aa3507c6b..57f443113 100644 --- a/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts @@ -1,14 +1,16 @@ import { ResourceLimitError } from '@metamask/kernel-errors'; import { describe, it, expect, beforeEach } from 'vitest'; +import { + DEFAULT_CONNECTION_RATE_LIMIT, + DEFAULT_CONNECTION_RATE_WINDOW_MS, + DEFAULT_MESSAGE_RATE_LIMIT, + DEFAULT_MESSAGE_RATE_WINDOW_MS, +} from './constants.ts'; import { SlidingWindowRateLimiter, makeMessageRateLimiter, makeConnectionRateLimiter, - DEFAULT_MESSAGE_RATE_LIMIT, - DEFAULT_MESSAGE_RATE_WINDOW_MS, - DEFAULT_CONNECTION_RATE_LIMIT, - DEFAULT_CONNECTION_RATE_WINDOW_MS, } from './rate-limiter.ts'; describe('SlidingWindowRateLimiter', () => { diff --git a/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts b/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts index 6776ec317..b3e79a19c 100644 --- a/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts +++ b/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts @@ -1,16 +1,11 @@ import { ResourceLimitError } from '@metamask/kernel-errors'; -/** Default message rate limit: 100 messages per second per peer */ -export const DEFAULT_MESSAGE_RATE_LIMIT = 100; - -/** Default message rate window in milliseconds (1 second) */ -export const DEFAULT_MESSAGE_RATE_WINDOW_MS = 1000; - -/** Default connection attempt rate limit: 10 attempts per minute per peer */ -export const 
DEFAULT_CONNECTION_RATE_LIMIT = 10; - -/** Default connection rate window in milliseconds (1 minute) */ -export const DEFAULT_CONNECTION_RATE_WINDOW_MS = 60_000; +import { + DEFAULT_CONNECTION_RATE_LIMIT, + DEFAULT_CONNECTION_RATE_WINDOW_MS, + DEFAULT_MESSAGE_RATE_LIMIT, + DEFAULT_MESSAGE_RATE_WINDOW_MS, +} from './constants.ts'; /** * A sliding window rate limiter that tracks event counts per key within a time window. diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index fb230f3d9..7e0880f58 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -7,17 +7,17 @@ import { makeErrorLogger, writeWithTimeout } from './channel-utils.ts'; import { ConnectionFactory } from './connection-factory.ts'; import { DEFAULT_CLEANUP_INTERVAL_MS, + DEFAULT_CONNECTION_RATE_LIMIT, DEFAULT_MAX_CONCURRENT_CONNECTIONS, DEFAULT_MAX_MESSAGE_SIZE_BYTES, + DEFAULT_MESSAGE_RATE_LIMIT, + DEFAULT_MESSAGE_RATE_WINDOW_MS, DEFAULT_STALE_PEER_TIMEOUT_MS, DEFAULT_WRITE_TIMEOUT_MS, SCTP_USER_INITIATED_ABORT, } from './constants.ts'; import { PeerStateManager } from './peer-state-manager.ts'; import { - DEFAULT_CONNECTION_RATE_LIMIT, - DEFAULT_MESSAGE_RATE_LIMIT, - DEFAULT_MESSAGE_RATE_WINDOW_MS, makeConnectionRateLimiter, makeMessageRateLimiter, } from './rate-limiter.ts'; From bf3b1d11090507489bbf6147fc50e6a6e77347ad Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 26 Jan 2026 23:56:43 +0200 Subject: [PATCH 08/24] fix(transport): use atomic checkAndRecord to prevent TOCTOU race Revert to using checkAndRecord() for message rate limiting instead of separate check and record calls. The separated approach had a TOCTOU race where concurrent sends could all pass the check before any recorded, bypassing the rate limit. Yes, failed sends now consume quota, but this is necessary for security - recording after send would allow attackers to make unlimited concurrent attempts that bypass the rate limit. Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/platform/transport.ts | 22 +++++-------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index 7e0880f58..dd36c1ea9 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -11,7 +11,6 @@ import { DEFAULT_MAX_CONCURRENT_CONNECTIONS, DEFAULT_MAX_MESSAGE_SIZE_BYTES, DEFAULT_MESSAGE_RATE_LIMIT, - DEFAULT_MESSAGE_RATE_WINDOW_MS, DEFAULT_STALE_PEER_TIMEOUT_MS, DEFAULT_WRITE_TIMEOUT_MS, SCTP_USER_INITIATED_ABORT, @@ -348,20 +347,11 @@ export async function initTransport( // Validate message size before sending validateMessageSize(message); - // Check message rate limit (check only, record after successful send) - if (messageRateLimiter.wouldExceedLimit(targetPeerId)) { - const currentCount = messageRateLimiter.getCurrentCount(targetPeerId); - throw new ResourceLimitError( - `Rate limit exceeded: ${currentCount}/${maxMessagesPerSecond} messageRate in ${DEFAULT_MESSAGE_RATE_WINDOW_MS}ms window`, - { - data: { - limitType: 'messageRate', - current: currentCount, - limit: maxMessagesPerSecond, - }, - }, - ); - } + // Check and record message rate limit atomically to prevent TOCTOU race + // Note: This records before send completes, so failed sends consume quota. 
+ // This is intentional - recording after send would allow concurrent sends + // to bypass the rate limit via TOCTOU attacks. + messageRateLimiter.checkAndRecord(targetPeerId, 'messageRate'); const state = peerStateManager.getState(targetPeerId); @@ -415,8 +405,6 @@ export async function initTransport( fromString(message), DEFAULT_WRITE_TIMEOUT_MS, ); - // Record message rate only after successful send - messageRateLimiter.recordEvent(targetPeerId); peerStateManager.updateConnectionTime(targetPeerId); reconnectionManager.resetBackoff(targetPeerId); } catch (problem) { From 7092a0a39c816c138053ed13c0104d847e8dd365 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Wed, 28 Jan 2026 16:25:16 +0100 Subject: [PATCH 09/24] fix(transport): preserve retry quota on rate limit, close rejected channels Two bug fixes: 1. Rate-limited reconnection attempts no longer consume retry quota. Previously, incrementAttempt was called before the rate limit check, so rate-limited attempts counted against maxRetryAttempts even though no dial was performed. Now, decrementAttempt is called when rate limited to undo the premature increment. 2. Rejected inbound connections are now properly closed. When an inbound connection is rejected (due to intentional close or connection limit), the channel is now closed via closeChannel() to prevent resource leaks from dangling libp2p streams. Co-Authored-By: Claude Opus 4.5 --- .../platform/reconnection-lifecycle.test.ts | 30 +++++++++++++++ .../platform/reconnection-lifecycle.ts | 3 ++ .../src/remotes/platform/reconnection.test.ts | 37 +++++++++++++++++++ .../src/remotes/platform/reconnection.ts | 14 +++++++ .../src/remotes/platform/transport.test.ts | 14 +++++++ .../src/remotes/platform/transport.ts | 21 ++++++++++- 6 files changed, 118 insertions(+), 1 deletion(-) diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts index 732136f59..9d1d72cb7 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts @@ -69,6 +69,7 @@ describe('reconnection-lifecycle', () => { isReconnecting: vi.fn().mockReturnValue(true), shouldRetry: vi.fn().mockReturnValue(true), incrementAttempt: vi.fn().mockReturnValue(1), + decrementAttempt: vi.fn(), calculateBackoff: vi.fn().mockReturnValue(100), startReconnection: vi.fn(), stopReconnection: vi.fn(), @@ -364,6 +365,35 @@ describe('reconnection-lifecycle', () => { ); }); + it('decrements attempt count when rate limited to preserve retry quota', async () => { + const { ResourceLimitError } = kernelErrors; + ( + deps.checkConnectionRateLimit as ReturnType + ).mockImplementationOnce(() => { + throw new ResourceLimitError('Rate limit exceeded', { + data: { limitType: 'connectionRate', current: 10, limit: 10 }, + }); + }); + + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) // First attempt - rate limited + .mockReturnValueOnce(true) // Second attempt - success + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + // Should have decremented attempt count when rate limited + // so that rate-limited attempts don't consume retry quota + expect(deps.reconnectionManager.decrementAttempt).toHaveBeenCalledWith( + 'peer1', + ); + expect(deps.reconnectionManager.decrementAttempt).toHaveBeenCalledTimes( + 1, + ); + }); + 
it('logs reconnection attempts', async () => { (deps.reconnectionManager.isReconnecting as ReturnType) .mockReturnValueOnce(true) diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts index 9a0653948..2f3fc28ea 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts @@ -132,6 +132,9 @@ export function makeReconnectionLifecycle( // Rate limit errors are temporary - skip this attempt but continue the loop // The backoff delay will naturally space out attempts if (problem instanceof ResourceLimitError) { + // Don't count rate-limited attempts against the retry quota since + // no actual dial was performed + reconnectionManager.decrementAttempt(peerId); logger.log( `${peerId}:: reconnection attempt ${nextAttempt} rate limited, will retry after backoff`, ); diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection.test.ts b/packages/ocap-kernel/src/remotes/platform/reconnection.test.ts index c4469e182..7ca47e634 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection.test.ts @@ -126,6 +126,43 @@ describe('ReconnectionManager', () => { }); }); + describe('decrementAttempt', () => { + it('decrements attempt count', () => { + manager.incrementAttempt('peer1'); + manager.incrementAttempt('peer1'); + expect(manager.getAttemptCount('peer1')).toBe(2); + + manager.decrementAttempt('peer1'); + expect(manager.getAttemptCount('peer1')).toBe(1); + + manager.decrementAttempt('peer1'); + expect(manager.getAttemptCount('peer1')).toBe(0); + }); + + it('does not go below zero', () => { + expect(manager.getAttemptCount('peer1')).toBe(0); + + manager.decrementAttempt('peer1'); + expect(manager.getAttemptCount('peer1')).toBe(0); + }); + + it('handles decrement on new peer without prior state', () => { + manager.decrementAttempt('newpeer'); + expect(manager.getAttemptCount('newpeer')).toBe(0); + }); + + it('only affects specified peer', () => { + manager.incrementAttempt('peer1'); + manager.incrementAttempt('peer1'); + manager.incrementAttempt('peer2'); + + manager.decrementAttempt('peer1'); + + expect(manager.getAttemptCount('peer1')).toBe(1); + expect(manager.getAttemptCount('peer2')).toBe(1); + }); + }); + describe('resetBackoff', () => { it('resets attempt count to zero', () => { manager.incrementAttempt('peer1'); diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection.ts b/packages/ocap-kernel/src/remotes/platform/reconnection.ts index 3664a2472..8393cd6ab 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection.ts @@ -75,6 +75,20 @@ export class ReconnectionManager { return state.attemptCount; } + /** + * Decrement the attempt count (minimum 0). + * Used to "undo" an attempt that didn't actually perform a dial + * (e.g., when rate-limited before the connection was attempted). + * + * @param peerId - The peer ID to decrement attempts for. + */ + decrementAttempt(peerId: string): void { + const state = this.#getState(peerId); + if (state.attemptCount > 0) { + state.attemptCount -= 1; + } + } + /** * Reset the backoff counter for a peer. 
* diff --git a/packages/ocap-kernel/src/remotes/platform/transport.test.ts b/packages/ocap-kernel/src/remotes/platform/transport.test.ts index 8924cfeb9..825f84413 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.test.ts @@ -20,6 +20,7 @@ const mockReconnectionManager = { stopReconnection: vi.fn(), shouldRetry: vi.fn().mockReturnValue(true), incrementAttempt: vi.fn().mockReturnValue(1), + decrementAttempt: vi.fn(), calculateBackoff: vi.fn().mockReturnValue(100), resetBackoff: vi.fn(), resetAllBackoffs: vi.fn(), @@ -39,6 +40,8 @@ vi.mock('./reconnection.ts', () => { incrementAttempt = mockReconnectionManager.incrementAttempt; + decrementAttempt = mockReconnectionManager.decrementAttempt; + calculateBackoff = mockReconnectionManager.calculateBackoff; resetBackoff = mockReconnectionManager.resetBackoff; @@ -162,6 +165,7 @@ describe('transport.initTransport', () => { mockReconnectionManager.stopReconnection.mockClear(); mockReconnectionManager.shouldRetry.mockClear(); mockReconnectionManager.incrementAttempt.mockClear(); + mockReconnectionManager.decrementAttempt.mockClear(); mockReconnectionManager.calculateBackoff.mockClear(); mockReconnectionManager.resetBackoff.mockClear(); mockReconnectionManager.resetAllBackoffs.mockClear(); @@ -894,6 +898,11 @@ describe('transport.initTransport', () => { ); // Should not start reading from this channel expect(mockChannel.msgStream.read).not.toHaveBeenCalled(); + // Should close the channel to prevent resource leaks + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( + mockChannel, + 'peer-1', + ); }); }); }); @@ -1795,6 +1804,11 @@ describe('transport.initTransport', () => { expect(mockLogger.log).toHaveBeenCalledWith( 'peer-3:: rejecting inbound connection due to connection limit', ); + // Should close the channel to prevent resource leaks + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( + inboundChannel, + 'peer-3', + ); }); // Should not have started reading from the rejected channel diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index dd36c1ea9..464016ab3 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -429,7 +429,16 @@ export async function initTransport( logger.log( `${channel.peerId}:: rejecting inbound connection from intentionally closed peer`, ); - // Don't add to channels map and don't start reading - connection will naturally close + // Close the channel to release network resources and prevent leaks + connectionFactory + .closeChannel(channel, channel.peerId) + .catch((problem) => { + outputError( + channel.peerId, + 'closing rejected inbound channel', + problem, + ); + }); return; } @@ -441,6 +450,16 @@ export async function initTransport( logger.log( `${channel.peerId}:: rejecting inbound connection due to connection limit`, ); + // Close the channel to release network resources and prevent leaks + connectionFactory + .closeChannel(channel, channel.peerId) + .catch((problem) => { + outputError( + channel.peerId, + 'closing rejected inbound channel', + problem, + ); + }); return; } throw error; From e9e725a4a2483336eb3fac9fe412e0a877a4473c Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Wed, 28 Jan 2026 16:37:53 +0100 Subject: [PATCH 10/24] fix(transport): add input validation and optimize rate limiter - Add validation in SlidingWindowRateLimiter 
constructor to reject non-positive values for maxEvents and windowMs, preventing misconfiguration that would cause unexpected behavior - Optimize checkAndRecord to use getCurrentCount instead of duplicating the Date.now() call and timestamp filtering logic - Refactor constructor validation tests to use it.each for conciseness Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/platform/rate-limiter.test.ts | 36 +++++++++++++++++++ .../src/remotes/platform/rate-limiter.ts | 14 +++++--- 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts b/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts index 57f443113..7f53b5a55 100644 --- a/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts @@ -21,6 +21,42 @@ describe('SlidingWindowRateLimiter', () => { limiter = new SlidingWindowRateLimiter(3, 100); }); + describe('constructor', () => { + it.each([ + { + maxEvents: 0, + windowMs: 100, + error: 'maxEvents must be a positive number', + }, + { + maxEvents: -1, + windowMs: 100, + error: 'maxEvents must be a positive number', + }, + { + maxEvents: 10, + windowMs: 0, + error: 'windowMs must be a positive number', + }, + { + maxEvents: 10, + windowMs: -1, + error: 'windowMs must be a positive number', + }, + ])( + 'throws "$error" when maxEvents=$maxEvents and windowMs=$windowMs', + ({ maxEvents, windowMs, error }) => { + expect(() => new SlidingWindowRateLimiter(maxEvents, windowMs)).toThrow( + error, + ); + }, + ); + + it('accepts valid positive values', () => { + expect(() => new SlidingWindowRateLimiter(1, 1)).not.toThrow(); + }); + }); + describe('wouldExceedLimit', () => { it('returns false when no events recorded', () => { expect(limiter.wouldExceedLimit('peer1')).toBe(false); diff --git a/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts b/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts index b3e79a19c..cf732934f 100644 --- a/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts +++ b/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts @@ -23,8 +23,15 @@ export class SlidingWindowRateLimiter { * * @param maxEvents - Maximum number of events allowed within the window. * @param windowMs - Window size in milliseconds. + * @throws Error if maxEvents or windowMs is not a positive number. */ constructor(maxEvents: number, windowMs: number) { + if (maxEvents <= 0) { + throw new Error('maxEvents must be a positive number'); + } + if (windowMs <= 0) { + throw new Error('windowMs must be a positive number'); + } this.#maxEvents = maxEvents; this.#windowMs = windowMs; this.#timestamps = new Map(); @@ -85,11 +92,8 @@ export class SlidingWindowRateLimiter { key: string, limitType: 'messageRate' | 'connectionRate', ): void { - if (this.wouldExceedLimit(key)) { - const timestamps = this.#timestamps.get(key) ?? 
[]; - const cutoff = Date.now() - this.#windowMs; - const currentCount = timestamps.filter((ts) => ts > cutoff).length; - + const currentCount = this.getCurrentCount(key); + if (currentCount >= this.#maxEvents) { throw new ResourceLimitError( `Rate limit exceeded: ${currentCount}/${this.#maxEvents} ${limitType} in ${this.#windowMs}ms window`, { From 63ee9820969479e0e61c291107d146fa354be6f9 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Wed, 28 Jan 2026 18:33:04 +0100 Subject: [PATCH 11/24] fix(transport): distinguish rate limit from connection limit errors The ResourceLimitError catch block incorrectly treated all such errors as rate limit errors where "no actual dial was performed." However, checkConnectionLimit() throws with limitType: 'connection' AFTER dial succeeds. This caused: 1. decrementAttempt incorrectly called (dial was performed, should count) 2. Log message incorrectly said "rate limited" 3. Dialed channel leaked since never closed or registered Fix: - Check error.data.limitType to distinguish 'connectionRate' (before dial) from 'connection' (after dial) - Only decrement attempt count for rate limit errors (connectionRate) - Add closeChannel dependency to close leaked channels - Wrap checkConnectionLimit in try/catch to close channel on error Co-Authored-By: Claude Opus 4.5 --- .../platform/reconnection-lifecycle.test.ts | 47 +++++++++++++++++++ .../platform/reconnection-lifecycle.ts | 24 +++++++--- .../src/remotes/platform/transport.ts | 2 + 3 files changed, 67 insertions(+), 6 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts index 9d1d72cb7..d3265b034 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts @@ -81,6 +81,7 @@ describe('reconnection-lifecycle', () => { reuseOrReturnChannel: vi.fn().mockResolvedValue(mockChannel), checkConnectionLimit: vi.fn(), checkConnectionRateLimit: vi.fn(), + closeChannel: vi.fn().mockResolvedValue(undefined), registerChannel: vi.fn(), } as unknown as ReconnectionLifecycleDeps; }); @@ -394,6 +395,52 @@ describe('reconnection-lifecycle', () => { ); }); + it('does not decrement attempt count for connection limit errors', async () => { + const { ResourceLimitError } = kernelErrors; + ( + deps.checkConnectionLimit as ReturnType + ).mockImplementationOnce(() => { + throw new ResourceLimitError('Connection limit exceeded', { + data: { limitType: 'connection', current: 100, limit: 100 }, + }); + }); + + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) // First attempt - connection limit + .mockReturnValueOnce(true) // Second attempt - success + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + // Should NOT decrement attempt count because dial was performed + expect(deps.reconnectionManager.decrementAttempt).not.toHaveBeenCalled(); + }); + + it('closes channel when connection limit is exceeded after dial', async () => { + const { ResourceLimitError } = kernelErrors; + ( + deps.checkConnectionLimit as ReturnType + ).mockImplementationOnce(() => { + throw new ResourceLimitError('Connection limit exceeded', { + data: { limitType: 'connection', current: 100, limit: 100 }, + }); + }); + + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) // First 
attempt - connection limit + .mockReturnValueOnce(true) // Second attempt - success + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + // Should close the channel to prevent resource leak + expect(deps.closeChannel).toHaveBeenCalledWith(mockChannel, 'peer1'); + }); + it('logs reconnection attempts', async () => { (deps.reconnectionManager.isReconnecting as ReturnType) .mockReturnValueOnce(true) diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts index 2f3fc28ea..c14231425 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts @@ -31,6 +31,7 @@ export type ReconnectionLifecycleDeps = { ) => Promise; checkConnectionLimit: () => void; checkConnectionRateLimit: (peerId: string) => void; + closeChannel: (channel: Channel, peerId: string) => Promise; registerChannel: ( peerId: string, channel: Channel, @@ -67,6 +68,7 @@ export function makeReconnectionLifecycle( reuseOrReturnChannel, checkConnectionLimit, checkConnectionRateLimit, + closeChannel, registerChannel, } = deps; @@ -129,17 +131,21 @@ export function makeReconnectionLifecycle( reconnectionManager.stopReconnection(peerId); return; } - // Rate limit errors are temporary - skip this attempt but continue the loop - // The backoff delay will naturally space out attempts - if (problem instanceof ResourceLimitError) { - // Don't count rate-limited attempts against the retry quota since - // no actual dial was performed + // Handle rate limit errors (connectionRate) - these are temporary and + // occur before any dial was performed, so don't count against retry quota + if ( + problem instanceof ResourceLimitError && + (problem.data as { limitType?: string } | undefined)?.limitType === + 'connectionRate' + ) { reconnectionManager.decrementAttempt(peerId); logger.log( `${peerId}:: reconnection attempt ${nextAttempt} rate limited, will retry after backoff`, ); continue; } + // Connection limit errors (limitType: 'connection') occur after dial - + // the attempt counts and channel cleanup is handled in tryReconnect if (!isRetryableNetworkError(problem)) { outputError(peerId, `non-retryable failure`, problem); reconnectionManager.stopReconnection(peerId); @@ -181,7 +187,13 @@ export function makeReconnectionLifecycle( // Re-check connection limit and register if this is a new channel if (state.channel !== channel) { - checkConnectionLimit(); + try { + checkConnectionLimit(); + } catch (error) { + // Connection limit exceeded after dial - close the channel to prevent leak + await closeChannel(channel, peerId); + throw error; + } registerChannel(peerId, channel, 'reading channel to'); } diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index 464016ab3..84bc3ba19 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -318,6 +318,8 @@ export async function initTransport( checkConnectionLimit, checkConnectionRateLimit: (peerId: string) => connectionRateLimiter.checkAndRecord(peerId, 'connectionRate'), + closeChannel: async (channel, peerId) => + connectionFactory.closeChannel(channel, peerId), registerChannel, }); reconnectionHolder.handleConnectionLoss = From 621ecd6aa1c251a0171fd3e8bc4ab9044c353d35 Mon Sep 17 00:00:00 
2001 From: Dimitris Marlagkoutsos Date: Wed, 28 Jan 2026 18:48:02 +0100 Subject: [PATCH 12/24] refactor(kernel-errors): add ResourceLimitType and ResourceLimitErrorData types Extract reusable types from ResourceLimitError to improve type safety and reduce inline type casts when checking error data. - Add ResourceLimitType union type for limit types - Add ResourceLimitErrorData type for error data structure - Export both types from kernel-errors package - Use ResourceLimitErrorData in reconnection-lifecycle for cleaner code Co-Authored-By: Claude Opus 4.5 --- .../src/errors/ResourceLimitError.ts | 40 +++++++++---------- packages/kernel-errors/src/index.ts | 6 ++- .../platform/reconnection-lifecycle.ts | 3 +- 3 files changed, 27 insertions(+), 22 deletions(-) diff --git a/packages/kernel-errors/src/errors/ResourceLimitError.ts b/packages/kernel-errors/src/errors/ResourceLimitError.ts index d5cc5f39f..62dfd4536 100644 --- a/packages/kernel-errors/src/errors/ResourceLimitError.ts +++ b/packages/kernel-errors/src/errors/ResourceLimitError.ts @@ -11,6 +11,24 @@ import { BaseError } from '../BaseError.ts'; import { marshaledErrorSchema, ErrorCode } from '../constants.ts'; import type { ErrorOptionsWithStack, MarshaledOcapError } from '../types.ts'; +/** + * The type of resource limit that was exceeded. + */ +export type ResourceLimitType = + | 'connection' + | 'messageSize' + | 'messageRate' + | 'connectionRate'; + +/** + * Data associated with a ResourceLimitError. + */ +export type ResourceLimitErrorData = { + limitType?: ResourceLimitType; + current?: number; + limit?: number; +}; + /** * Error indicating a resource limit was exceeded. */ @@ -28,15 +46,7 @@ export class ResourceLimitError extends BaseError { constructor( message: string, options?: ErrorOptionsWithStack & { - data?: { - limitType?: - | 'connection' - | 'messageSize' - | 'messageRate' - | 'connectionRate'; - current?: number; - limit?: number; - }; + data?: ResourceLimitErrorData; }, ) { super(ErrorCode.ResourceLimitError, message, { @@ -82,17 +92,7 @@ export class ResourceLimitError extends BaseError { ): ResourceLimitError { assert(marshaledError, this.struct); const options = unmarshalErrorOptions(marshaledError); - const data = marshaledError.data as - | { - limitType?: - | 'connection' - | 'messageSize' - | 'messageRate' - | 'connectionRate'; - current?: number; - limit?: number; - } - | undefined; + const data = marshaledError.data as ResourceLimitErrorData | undefined; return new ResourceLimitError(marshaledError.message, { ...options, ...(data !== undefined && { data }), diff --git a/packages/kernel-errors/src/index.ts b/packages/kernel-errors/src/index.ts index 9f3e40694..cd7ffc048 100644 --- a/packages/kernel-errors/src/index.ts +++ b/packages/kernel-errors/src/index.ts @@ -8,7 +8,11 @@ export { VatNotFoundError } from './errors/VatNotFoundError.ts'; export { StreamReadError } from './errors/StreamReadError.ts'; export { SubclusterNotFoundError } from './errors/SubclusterNotFoundError.ts'; export { AbortError } from './errors/AbortError.ts'; -export { ResourceLimitError } from './errors/ResourceLimitError.ts'; +export { + ResourceLimitError, + type ResourceLimitType, + type ResourceLimitErrorData, +} from './errors/ResourceLimitError.ts'; export { ErrorCode, ErrorSentinel, diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts index c14231425..67b8b5d3c 100644 --- 
a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts @@ -2,6 +2,7 @@ import { isRetryableNetworkError, ResourceLimitError, } from '@metamask/kernel-errors'; +import type { ResourceLimitErrorData } from '@metamask/kernel-errors'; import { abortableDelay, DEFAULT_MAX_RETRY_ATTEMPTS, @@ -135,7 +136,7 @@ export function makeReconnectionLifecycle( // occur before any dial was performed, so don't count against retry quota if ( problem instanceof ResourceLimitError && - (problem.data as { limitType?: string } | undefined)?.limitType === + (problem.data as ResourceLimitErrorData | undefined)?.limitType === 'connectionRate' ) { reconnectionManager.decrementAttempt(peerId); From 9d29f68aafe4b6fd035f40f0aeec5360ae2a0b61 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Wed, 28 Jan 2026 18:50:33 +0100 Subject: [PATCH 13/24] fix(transport): reject NaN and Infinity in rate limiter constructor The validation `maxEvents <= 0` passes for NaN and Infinity since both comparisons return false. This silently disables rate limiting entirely as `currentCount >= NaN` and `currentCount >= Infinity` are always false. Fix by using Number.isFinite() which rejects NaN, Infinity, and -Infinity, ensuring rate limiting cannot be bypassed via faulty configuration parsing. Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/platform/rate-limiter.test.ts | 28 ++++++++++++++++--- .../src/remotes/platform/rate-limiter.ts | 10 +++---- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts b/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts index 7f53b5a55..c28466a29 100644 --- a/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts @@ -26,22 +26,42 @@ describe('SlidingWindowRateLimiter', () => { { maxEvents: 0, windowMs: 100, - error: 'maxEvents must be a positive number', + error: 'maxEvents must be a positive finite number', }, { maxEvents: -1, windowMs: 100, - error: 'maxEvents must be a positive number', + error: 'maxEvents must be a positive finite number', + }, + { + maxEvents: NaN, + windowMs: 100, + error: 'maxEvents must be a positive finite number', + }, + { + maxEvents: Infinity, + windowMs: 100, + error: 'maxEvents must be a positive finite number', }, { maxEvents: 10, windowMs: 0, - error: 'windowMs must be a positive number', + error: 'windowMs must be a positive finite number', }, { maxEvents: 10, windowMs: -1, - error: 'windowMs must be a positive number', + error: 'windowMs must be a positive finite number', + }, + { + maxEvents: 10, + windowMs: NaN, + error: 'windowMs must be a positive finite number', + }, + { + maxEvents: 10, + windowMs: Infinity, + error: 'windowMs must be a positive finite number', }, ])( 'throws "$error" when maxEvents=$maxEvents and windowMs=$windowMs', diff --git a/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts b/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts index cf732934f..68bfe9617 100644 --- a/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts +++ b/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts @@ -23,14 +23,14 @@ export class SlidingWindowRateLimiter { * * @param maxEvents - Maximum number of events allowed within the window. * @param windowMs - Window size in milliseconds. - * @throws Error if maxEvents or windowMs is not a positive number. 
+ * @throws Error if maxEvents or windowMs is not a positive finite number. */ constructor(maxEvents: number, windowMs: number) { - if (maxEvents <= 0) { - throw new Error('maxEvents must be a positive number'); + if (!Number.isFinite(maxEvents) || maxEvents <= 0) { + throw new Error('maxEvents must be a positive finite number'); } - if (windowMs <= 0) { - throw new Error('windowMs must be a positive number'); + if (!Number.isFinite(windowMs) || windowMs <= 0) { + throw new Error('windowMs must be a positive finite number'); } this.#maxEvents = maxEvents; this.#windowMs = windowMs; From 61ca207e6fc9bae11869c01795c5754e4beae4f8 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Wed, 28 Jan 2026 23:45:56 +0100 Subject: [PATCH 14/24] fix(transport): handle connection limit errors as retryable during reconnection Connection limit errors (ResourceLimitError with limitType: 'connection') were falling through to isRetryableNetworkError(), which doesn't recognize them, causing permanent reconnection failure via onRemoteGiveUp. Added explicit handling to continue the retry loop for connection limit errors. Co-Authored-By: Claude Opus 4.5 --- .../platform/reconnection-lifecycle.test.ts | 36 +++++++++++++++++++ .../platform/reconnection-lifecycle.ts | 10 ++++++ 2 files changed, 46 insertions(+) diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts index d3265b034..db0a3f0f5 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts @@ -418,6 +418,42 @@ describe('reconnection-lifecycle', () => { expect(deps.reconnectionManager.decrementAttempt).not.toHaveBeenCalled(); }); + it('retries connection limit errors without calling isRetryableNetworkError', async () => { + const { ResourceLimitError } = kernelErrors; + // Mock isRetryableNetworkError to return false - connection limit errors + // should be retried regardless via explicit handling + ( + kernelErrors.isRetryableNetworkError as ReturnType + ).mockReturnValue(false); + + ( + deps.checkConnectionLimit as ReturnType + ).mockImplementationOnce(() => { + throw new ResourceLimitError('Connection limit exceeded', { + data: { limitType: 'connection', current: 100, limit: 100 }, + }); + }); + + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) // First attempt - connection limit + .mockReturnValueOnce(true) // Second attempt - success + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + // Should NOT call onRemoteGiveUp - connection limit errors are retryable + expect(deps.onRemoteGiveUp).not.toHaveBeenCalled(); + // Should have retried and succeeded + expect(deps.reconnectionManager.resetBackoff).toHaveBeenCalledWith( + 'peer1', + ); + expect(deps.logger.log).toHaveBeenCalledWith( + expect.stringContaining('hit connection limit'), + ); + }); + it('closes channel when connection limit is exceeded after dial', async () => { const { ResourceLimitError } = kernelErrors; ( diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts index 67b8b5d3c..a8b60cfa9 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts @@ -147,6 +147,16 @@ 
export function makeReconnectionLifecycle( } // Connection limit errors (limitType: 'connection') occur after dial - // the attempt counts and channel cleanup is handled in tryReconnect + if ( + problem instanceof ResourceLimitError && + (problem.data as ResourceLimitErrorData | undefined)?.limitType === + 'connection' + ) { + logger.log( + `${peerId}:: reconnection attempt ${nextAttempt} hit connection limit, will retry after backoff`, + ); + continue; + } if (!isRetryableNetworkError(problem)) { outputError(peerId, `non-retryable failure`, problem); reconnectionManager.stopReconnection(peerId); From 15dfc1ebcf5c6e2eebd73024d002cd962f21cb52 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Wed, 28 Jan 2026 23:53:52 +0100 Subject: [PATCH 15/24] refactor(kernel-errors): add isResourceLimitError type guard Extracts the ResourceLimitError check pattern into a reusable type guard function that optionally checks for a specific limitType. This simplifies the error handling code in reconnection-lifecycle.ts. Co-Authored-By: Claude Opus 4.5 --- packages/kernel-errors/src/index.ts | 1 + .../src/utils/isResourceLimitError.test.ts | 80 +++++++++++++++++++ .../src/utils/isResourceLimitError.ts | 28 +++++++ .../platform/reconnection-lifecycle.ts | 15 +--- 4 files changed, 112 insertions(+), 12 deletions(-) create mode 100644 packages/kernel-errors/src/utils/isResourceLimitError.test.ts create mode 100644 packages/kernel-errors/src/utils/isResourceLimitError.ts diff --git a/packages/kernel-errors/src/index.ts b/packages/kernel-errors/src/index.ts index cd7ffc048..4f27db9eb 100644 --- a/packages/kernel-errors/src/index.ts +++ b/packages/kernel-errors/src/index.ts @@ -27,3 +27,4 @@ export { unmarshalError } from './marshal/unmarshalError.ts'; export { isMarshaledError } from './marshal/isMarshaledError.ts'; export { isMarshaledOcapError } from './marshal/isMarshaledOcapError.ts'; export { isRetryableNetworkError } from './utils/isRetryableNetworkError.ts'; +export { isResourceLimitError } from './utils/isResourceLimitError.ts'; diff --git a/packages/kernel-errors/src/utils/isResourceLimitError.test.ts b/packages/kernel-errors/src/utils/isResourceLimitError.test.ts new file mode 100644 index 000000000..af7dd9fb4 --- /dev/null +++ b/packages/kernel-errors/src/utils/isResourceLimitError.test.ts @@ -0,0 +1,80 @@ +import { describe, it, expect } from 'vitest'; + +import { isResourceLimitError } from './isResourceLimitError.ts'; +import { ResourceLimitError } from '../errors/ResourceLimitError.ts'; + +describe('isResourceLimitError', () => { + describe('without limitType parameter', () => { + it('returns true for ResourceLimitError', () => { + const error = new ResourceLimitError('limit exceeded'); + expect(isResourceLimitError(error)).toBe(true); + }); + + it('returns true for ResourceLimitError with any limitType', () => { + const connectionError = new ResourceLimitError('connection limit', { + data: { limitType: 'connection' }, + }); + const rateError = new ResourceLimitError('rate limit', { + data: { limitType: 'connectionRate' }, + }); + + expect(isResourceLimitError(connectionError)).toBe(true); + expect(isResourceLimitError(rateError)).toBe(true); + }); + + it('returns false for regular Error', () => { + const error = new Error('some error'); + expect(isResourceLimitError(error)).toBe(false); + }); + + it('returns false for null', () => { + expect(isResourceLimitError(null)).toBe(false); + }); + + it('returns false for undefined', () => { + expect(isResourceLimitError(undefined)).toBe(false); + 
}); + + it('returns false for non-error objects', () => { + expect(isResourceLimitError({ message: 'fake error' })).toBe(false); + }); + }); + + describe('with limitType parameter', () => { + it('returns true when limitType matches', () => { + const error = new ResourceLimitError('connection limit', { + data: { limitType: 'connection' }, + }); + expect(isResourceLimitError(error, 'connection')).toBe(true); + }); + + it('returns false when limitType does not match', () => { + const error = new ResourceLimitError('connection limit', { + data: { limitType: 'connection' }, + }); + expect(isResourceLimitError(error, 'connectionRate')).toBe(false); + }); + + it('returns false when error has no limitType', () => { + const error = new ResourceLimitError('limit exceeded'); + expect(isResourceLimitError(error, 'connection')).toBe(false); + }); + + it('returns false for non-ResourceLimitError even with matching-like data', () => { + const error = new Error('some error'); + expect(isResourceLimitError(error, 'connection')).toBe(false); + }); + + it.each([ + 'connection', + 'connectionRate', + 'messageSize', + 'messageRate', + ] as const)('correctly identifies %s limitType', (limitType) => { + const error = new ResourceLimitError('limit exceeded', { + data: { limitType }, + }); + expect(isResourceLimitError(error, limitType)).toBe(true); + }); + }); +}); diff --git a/packages/kernel-errors/src/utils/isResourceLimitError.ts b/packages/kernel-errors/src/utils/isResourceLimitError.ts new file mode 100644 index 000000000..486411454 --- /dev/null +++ b/packages/kernel-errors/src/utils/isResourceLimitError.ts @@ -0,0 +1,28 @@ +import { ResourceLimitError } from '../errors/ResourceLimitError.ts'; +import type { + ResourceLimitType, + ResourceLimitErrorData, +} from '../errors/ResourceLimitError.ts'; + +/** + * Check if an error is a ResourceLimitError, optionally with a specific limit type. + * + * @param error - The error to check. + * @param limitType - Optional limit type to match against. + * @returns True if the error is a ResourceLimitError (with matching limitType if specified). 
+ */ +export function isResourceLimitError( + error: unknown, + limitType?: ResourceLimitType, +): error is ResourceLimitError { + if (!(error instanceof ResourceLimitError)) { + return false; + } + + if (limitType === undefined) { + return true; + } + + const data = error.data as ResourceLimitErrorData | undefined; + return data?.limitType === limitType; +} diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts index a8b60cfa9..ceb700208 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts @@ -1,8 +1,7 @@ import { isRetryableNetworkError, - ResourceLimitError, + isResourceLimitError, } from '@metamask/kernel-errors'; -import type { ResourceLimitErrorData } from '@metamask/kernel-errors'; import { abortableDelay, DEFAULT_MAX_RETRY_ATTEMPTS, @@ -134,11 +133,7 @@ export function makeReconnectionLifecycle( } // Handle rate limit errors (connectionRate) - these are temporary and // occur before any dial was performed, so don't count against retry quota - if ( - problem instanceof ResourceLimitError && - (problem.data as ResourceLimitErrorData | undefined)?.limitType === - 'connectionRate' - ) { + if (isResourceLimitError(problem, 'connectionRate')) { reconnectionManager.decrementAttempt(peerId); logger.log( `${peerId}:: reconnection attempt ${nextAttempt} rate limited, will retry after backoff`, @@ -147,11 +142,7 @@ export function makeReconnectionLifecycle( } // Connection limit errors (limitType: 'connection') occur after dial - // the attempt counts and channel cleanup is handled in tryReconnect - if ( - problem instanceof ResourceLimitError && - (problem.data as ResourceLimitErrorData | undefined)?.limitType === - 'connection' - ) { + if (isResourceLimitError(problem, 'connection')) { logger.log( `${peerId}:: reconnection attempt ${nextAttempt} hit connection limit, will retry after backoff`, ); From bf458d0bafa2ea7a7fbc9e78908a2d34ed452eb5 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Thu, 29 Jan 2026 12:39:34 +0100 Subject: [PATCH 16/24] fix(transport): preserve ResourceLimitError when closeChannel fails When checkConnectionLimit() throws during reconnection, the code closes the channel before re-throwing. If closeChannel itself throws, that error would propagate instead of the original ResourceLimitError, causing reconnection to give up prematurely via onRemoteGiveUp. Wrap closeChannel in try-catch to ensure the original error is always re-thrown regardless of cleanup success. 
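The guard now takes roughly this shape (mirroring the change below; an illustrative excerpt, not additional code):

    try {
      checkConnectionLimit();
    } catch (error) {
      try {
        await closeChannel(channel, peerId);
      } catch {
        // Ignore close errors - the original ResourceLimitError takes priority
      }
      throw error;
    }
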
Co-Authored-By: Claude Opus 4.5 --- .../platform/reconnection-lifecycle.test.ts | 39 +++++++++++++++++++ .../platform/reconnection-lifecycle.ts | 7 +++- 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts index db0a3f0f5..ab55e59c5 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts @@ -477,6 +477,45 @@ describe('reconnection-lifecycle', () => { expect(deps.closeChannel).toHaveBeenCalledWith(mockChannel, 'peer1'); }); + it('propagates ResourceLimitError even when closeChannel fails', async () => { + const { ResourceLimitError } = kernelErrors; + // Mock isRetryableNetworkError to return false - we want to verify the + // ResourceLimitError is still correctly identified + ( + kernelErrors.isRetryableNetworkError as ReturnType + ).mockReturnValue(false); + + ( + deps.checkConnectionLimit as ReturnType + ).mockImplementationOnce(() => { + throw new ResourceLimitError('Connection limit exceeded', { + data: { limitType: 'connection', current: 100, limit: 100 }, + }); + }); + + // closeChannel throws an error + (deps.closeChannel as ReturnType).mockRejectedValueOnce( + new Error('Channel already closed'), + ); + + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) // First attempt - connection limit + close error + .mockReturnValueOnce(true) // Second attempt - success + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + // Should NOT call onRemoteGiveUp - the original ResourceLimitError should + // be preserved even though closeChannel failed + expect(deps.onRemoteGiveUp).not.toHaveBeenCalled(); + // Should have retried and succeeded + expect(deps.reconnectionManager.resetBackoff).toHaveBeenCalledWith( + 'peer1', + ); + }); + it('logs reconnection attempts', async () => { (deps.reconnectionManager.isReconnecting as ReturnType) .mockReturnValueOnce(true) diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts index ceb700208..a780c51f9 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts @@ -193,7 +193,12 @@ export function makeReconnectionLifecycle( checkConnectionLimit(); } catch (error) { // Connection limit exceeded after dial - close the channel to prevent leak - await closeChannel(channel, peerId); + // Use try-catch to ensure the original error is always re-thrown + try { + await closeChannel(channel, peerId); + } catch { + // Ignore close errors - the original ResourceLimitError takes priority + } throw error; } registerChannel(peerId, channel, 'reading channel to'); From c8a17681006cb06d743956f9b113be6a6480cde7 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Thu, 29 Jan 2026 12:42:40 +0100 Subject: [PATCH 17/24] refactor(transport): deduplicate wouldExceedLimit by reusing getCurrentCount Co-Authored-By: Claude Opus 4.5 --- .../ocap-kernel/src/remotes/platform/rate-limiter.ts | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts b/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts index 68bfe9617..c86f3b322 
100644 --- a/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts +++ b/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts @@ -45,17 +45,7 @@ export class SlidingWindowRateLimiter { * @returns True if the event would exceed the rate limit. */ wouldExceedLimit(key: string): boolean { - const now = Date.now(); - const cutoff = now - this.#windowMs; - const timestamps = this.#timestamps.get(key); - - if (!timestamps) { - return false; - } - - // Count events within the window - const recentCount = timestamps.filter((ts) => ts > cutoff).length; - return recentCount >= this.#maxEvents; + return this.getCurrentCount(key) >= this.#maxEvents; } /** From c4950a0f2b3d340ff648d2d8b611ec3b1dbc4d12 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Thu, 29 Jan 2026 13:51:51 +0100 Subject: [PATCH 18/24] fix(kernel-errors): add isResourceLimitError to exports test Co-Authored-By: Claude Opus 4.5 --- packages/kernel-errors/src/index.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/kernel-errors/src/index.test.ts b/packages/kernel-errors/src/index.test.ts index 96a79688c..be7911cd7 100644 --- a/packages/kernel-errors/src/index.test.ts +++ b/packages/kernel-errors/src/index.test.ts @@ -23,6 +23,7 @@ describe('index', () => { 'isMarshaledError', 'isMarshaledOcapError', 'isOcapError', + 'isResourceLimitError', 'isRetryableNetworkError', 'marshalError', 'toError', From 0242e6ebbbafc4acc79b635d45d684cd52b9b138 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Thu, 29 Jan 2026 14:02:32 +0100 Subject: [PATCH 19/24] fix(transport): add missing isResourceLimitError mock to tests The transport tests were failing because the mock for @metamask/kernel-errors did not include isResourceLimitError, which was recently added. When the reconnection lifecycle code tried to call this function, it failed with undefined is not a function, causing the reconnection loop to crash. Co-Authored-By: Claude Opus 4.5 --- packages/ocap-kernel/src/remotes/platform/transport.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/ocap-kernel/src/remotes/platform/transport.test.ts b/packages/ocap-kernel/src/remotes/platform/transport.test.ts index 825f84413..4298a1d01 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.test.ts @@ -128,6 +128,7 @@ vi.mock('@metamask/kernel-errors', () => ({ errorWithCode?.code === 'ETIMEDOUT' ); }), + isResourceLimitError: vi.fn().mockReturnValue(false), })); // Mock uint8arrays From f93baba4f10a1538728c59e9b891f32e06cb1c8b Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Thu, 29 Jan 2026 14:50:34 +0100 Subject: [PATCH 20/24] refactor(transport): extract closeRejectedChannel helper Extract duplicated channel closing logic in the inbound connection handler into a dedicated helper function to reduce duplication and ensure consistent error handling. Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/platform/transport.ts | 33 ++++++++----------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index 84bc3ba19..64a439449 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -424,6 +424,17 @@ export async function initTransport( reconnectionManager.resetAllBackoffs(); } + /** + * Close a rejected inbound channel, logging any errors. 
+ * + * @param channel - The channel to close. + */ + function closeRejectedChannel(channel: Channel): void { + connectionFactory.closeChannel(channel, channel.peerId).catch((problem) => { + outputError(channel.peerId, 'closing rejected inbound channel', problem); + }); + } + // Set up inbound connection handler connectionFactory.onInboundConnection((channel) => { // Reject inbound connections from intentionally closed peers @@ -431,16 +442,7 @@ export async function initTransport( logger.log( `${channel.peerId}:: rejecting inbound connection from intentionally closed peer`, ); - // Close the channel to release network resources and prevent leaks - connectionFactory - .closeChannel(channel, channel.peerId) - .catch((problem) => { - outputError( - channel.peerId, - 'closing rejected inbound channel', - problem, - ); - }); + closeRejectedChannel(channel); return; } @@ -452,16 +454,7 @@ export async function initTransport( logger.log( `${channel.peerId}:: rejecting inbound connection due to connection limit`, ); - // Close the channel to release network resources and prevent leaks - connectionFactory - .closeChannel(channel, channel.peerId) - .catch((problem) => { - outputError( - channel.peerId, - 'closing rejected inbound channel', - problem, - ); - }); + closeRejectedChannel(channel); return; } throw error; From fe715d8761c1ef5c363ebfd22c4f575f626b4bc9 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Thu, 29 Jan 2026 19:55:58 +0100 Subject: [PATCH 21/24] test(reconnection-lifecycle): add explicit call count assertions Make test assertions more precise by verifying exact call counts for checkConnectionLimit and checkConnectionRateLimit. In a single successful reconnection attempt, each should be called exactly once. Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/platform/reconnection-lifecycle.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts index ab55e59c5..ba540a8c5 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts @@ -321,7 +321,7 @@ describe('reconnection-lifecycle', () => { await lifecycle.attemptReconnection('peer1'); - expect(deps.checkConnectionLimit).toHaveBeenCalled(); + expect(deps.checkConnectionLimit).toHaveBeenCalledTimes(1); }); it('checks connection rate limit before dialing', async () => { @@ -333,6 +333,7 @@ describe('reconnection-lifecycle', () => { await lifecycle.attemptReconnection('peer1'); + expect(deps.checkConnectionRateLimit).toHaveBeenCalledTimes(1); expect(deps.checkConnectionRateLimit).toHaveBeenCalledWith('peer1'); }); From f215020f5e7fcf1037b74a5989f4beb4611f17cc Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Thu, 29 Jan 2026 19:56:26 +0100 Subject: [PATCH 22/24] simplify --- .../src/remotes/platform/rate-limiter.test.ts | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts b/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts index c28466a29..f5e0df56a 100644 --- a/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts @@ -3,7 +3,6 @@ import { describe, it, expect, beforeEach } from 'vitest'; import { DEFAULT_CONNECTION_RATE_LIMIT, - DEFAULT_CONNECTION_RATE_WINDOW_MS, 
DEFAULT_MESSAGE_RATE_LIMIT, DEFAULT_MESSAGE_RATE_WINDOW_MS, } from './constants.ts'; @@ -339,12 +338,3 @@ describe('makeConnectionRateLimiter', () => { // Skip window expiration test for connection limiter as it would take 60 seconds }); - -describe('constants', () => { - it('exports expected default values', () => { - expect(DEFAULT_MESSAGE_RATE_LIMIT).toBe(100); - expect(DEFAULT_MESSAGE_RATE_WINDOW_MS).toBe(1000); - expect(DEFAULT_CONNECTION_RATE_LIMIT).toBe(10); - expect(DEFAULT_CONNECTION_RATE_WINDOW_MS).toBe(60_000); - }); -}); From 6be75fa5445709ccd25a0fe323e4aac425d94d50 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Thu, 29 Jan 2026 20:01:45 +0100 Subject: [PATCH 23/24] test(reconnection-lifecycle): add flushPromises to prevent test flakiness Add a flushPromises helper and use it in handleConnectionLoss tests that trigger fire-and-forget async reconnection work. This ensures all pending microtasks complete before assertions run, preventing potential flakiness from async operations bleeding into subsequent tests. Co-Authored-By: Claude Opus 4.5 --- .../remotes/platform/reconnection-lifecycle.test.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts index ba540a8c5..3653992ee 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts @@ -28,6 +28,10 @@ vi.mock('@metamask/kernel-errors', async () => { }; }); +// Helper to flush pending promises/microtasks +const flushPromises = async (): Promise => + new Promise((resolve) => setImmediate(resolve)); + describe('reconnection-lifecycle', () => { let deps: ReconnectionLifecycleDeps; let abortController: AbortController; @@ -122,13 +126,14 @@ describe('reconnection-lifecycle', () => { expect(state.channel).toBeUndefined(); }); - it('starts reconnection if not already reconnecting', () => { + it('starts reconnection if not already reconnecting', async () => { ( deps.reconnectionManager.isReconnecting as ReturnType ).mockReturnValue(false); const lifecycle = makeReconnectionLifecycle(deps); lifecycle.handleConnectionLoss('peer1'); + await flushPromises(); expect(deps.reconnectionManager.startReconnection).toHaveBeenCalledWith( 'peer1', @@ -146,13 +151,14 @@ describe('reconnection-lifecycle', () => { expect(deps.reconnectionManager.startReconnection).not.toHaveBeenCalled(); }); - it('logs connection loss message', () => { + it('logs connection loss message', async () => { ( deps.reconnectionManager.isReconnecting as ReturnType ).mockReturnValue(false); const lifecycle = makeReconnectionLifecycle(deps); lifecycle.handleConnectionLoss('peer1'); + await flushPromises(); expect(deps.logger.log).toHaveBeenCalledWith( 'peer1:: connection lost, initiating reconnection', From 31427c3430e4a8c3385f006187b501c64cf2d51b Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Thu, 29 Jan 2026 20:59:24 +0100 Subject: [PATCH 24/24] fix(transport): close channel when connection limit exceeded after dial When checkConnectionLimit() throws after a successful dial due to a race condition (another connection was established during dial), the dialed channel was not being closed, causing a resource leak. 
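Illustrative sequence of the race (peer IDs and message names are only examples, taken from the new test):

    // maxConcurrentConnections = 1
    const first = sendRemoteMessage('peer-1', msg1); // dial still in flight
    await sendRemoteMessage('peer-2', msg2);         // completes and fills the connection limit
    await first; // rejects: the post-dial checkConnectionLimit() throws, and previously
                 // the freshly dialed peer-1 channel was never closed
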
Apply the same pattern used in reconnection-lifecycle.ts: wrap the post-dial checkConnectionLimit() call in its own try-catch to close the channel before rethrowing the error. Added a test that simulates this race condition by having two concurrent sends where the second completes while the first is still dialing. Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/platform/transport.test.ts | 61 +++++++++++++++++++ .../src/remotes/platform/transport.ts | 17 ++++-- 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/platform/transport.test.ts b/packages/ocap-kernel/src/remotes/platform/transport.test.ts index 4298a1d01..a96724427 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.test.ts @@ -1827,6 +1827,67 @@ describe('transport.initTransport', () => { await sendRemoteMessage('peer-1', makeTestMessage('msg')); expect(mockChannel.msgStream.write).toHaveBeenCalled(); }); + + it('closes channel when connection limit exceeded after dial due to race condition', async () => { + const maxConcurrentConnections = 1; + const dialedChannels: MockChannel[] = []; + + // Track when dial completes so we can inject a race condition + let resolveFirstDial: ((channel: MockChannel) => void) | undefined; + const firstDialPromise = new Promise((resolve) => { + resolveFirstDial = resolve; + }); + + mockConnectionFactory.dialIdempotent.mockImplementation( + async (peerId: string) => { + const channel = createMockChannel(peerId); + dialedChannels.push(channel); + + if (peerId === 'peer-1') { + // First dial waits to be resolved manually + return firstDialPromise; + } + // Second dial completes immediately + return channel; + }, + ); + + const { sendRemoteMessage } = await initTransport( + '0x1234', + { maxConcurrentConnections }, + vi.fn(), + ); + + // Start first send (will wait at dial) + const firstSendPromise = sendRemoteMessage( + 'peer-1', + makeTestMessage('msg1'), + ); + + // Wait for first dial to start + await vi.waitFor(() => { + expect(dialedChannels).toHaveLength(1); + }); + + // Start and complete second send while first is still dialing + // This establishes a connection, filling the limit + await sendRemoteMessage('peer-2', makeTestMessage('msg2')); + + // Now complete the first dial - post-dial check should fail + // because we're now at the connection limit + resolveFirstDial?.(dialedChannels[0] as MockChannel); + + // First send should fail with connection limit error + await expect(firstSendPromise).rejects.toThrow( + /Connection limit reached/u, + ); + + // The channel from the first dial should have been closed to prevent leak + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( + dialedChannels[0], + 'peer-1', + ); + }); }); describe('stale peer cleanup', () => { diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index 64a439449..e57a062ed 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -387,14 +387,21 @@ export async function initTransport( // Re-check connection limit after reuseOrReturnChannel to prevent race conditions if (state.channel !== channel) { - checkConnectionLimit(); + try { + checkConnectionLimit(); + } catch (error) { + // Connection limit exceeded after dial - close the channel to prevent leak + // Use try-catch to ensure the original error is always re-thrown + try { + await 
connectionFactory.closeChannel(channel, targetPeerId); + } catch { + // Ignore close errors - the original ResourceLimitError takes priority + } + throw error; + } registerChannel(targetPeerId, channel, 'reading channel to'); } } catch (problem) { - // Re-throw ResourceLimitError to propagate to caller - if (problem instanceof ResourceLimitError) { - throw problem; - } outputError(targetPeerId, `opening connection`, problem); handleConnectionLoss(targetPeerId); throw problem;