diff --git a/packages/kernel-errors/src/errors/ResourceLimitError.test.ts b/packages/kernel-errors/src/errors/ResourceLimitError.test.ts index ae2bf6da4..a37f3f57d 100644 --- a/packages/kernel-errors/src/errors/ResourceLimitError.test.ts +++ b/packages/kernel-errors/src/errors/ResourceLimitError.test.ts @@ -167,7 +167,7 @@ describe('ResourceLimitError', () => { stack: 'stack trace', } as unknown as MarshaledOcapError, expectedError: - 'At path: data.limitType -- Expected the value to satisfy a union of `literal | literal`, but received: "invalid"', + 'At path: data.limitType -- Expected the value to satisfy a union of `literal | literal | literal | literal`, but received: "invalid"', }, { name: 'invalid current type', diff --git a/packages/kernel-errors/src/errors/ResourceLimitError.ts b/packages/kernel-errors/src/errors/ResourceLimitError.ts index 13026a7c6..62dfd4536 100644 --- a/packages/kernel-errors/src/errors/ResourceLimitError.ts +++ b/packages/kernel-errors/src/errors/ResourceLimitError.ts @@ -11,6 +11,24 @@ import { BaseError } from '../BaseError.ts'; import { marshaledErrorSchema, ErrorCode } from '../constants.ts'; import type { ErrorOptionsWithStack, MarshaledOcapError } from '../types.ts'; +/** + * The type of resource limit that was exceeded. + */ +export type ResourceLimitType = + | 'connection' + | 'messageSize' + | 'messageRate' + | 'connectionRate'; + +/** + * Data associated with a ResourceLimitError. + */ +export type ResourceLimitErrorData = { + limitType?: ResourceLimitType; + current?: number; + limit?: number; +}; + /** * Error indicating a resource limit was exceeded. */ @@ -28,11 +46,7 @@ export class ResourceLimitError extends BaseError { constructor( message: string, options?: ErrorOptionsWithStack & { - data?: { - limitType?: 'connection' | 'messageSize'; - current?: number; - limit?: number; - }; + data?: ResourceLimitErrorData; }, ) { super(ErrorCode.ResourceLimitError, message, { @@ -50,7 +64,12 @@ export class ResourceLimitError extends BaseError { data: optional( object({ limitType: optional( - union([literal('connection'), literal('messageSize')]), + union([ + literal('connection'), + literal('messageSize'), + literal('messageRate'), + literal('connectionRate'), + ]), ), current: optional(number()), limit: optional(number()), @@ -73,13 +92,7 @@ export class ResourceLimitError extends BaseError { ): ResourceLimitError { assert(marshaledError, this.struct); const options = unmarshalErrorOptions(marshaledError); - const data = marshaledError.data as - | { - limitType?: 'connection' | 'messageSize'; - current?: number; - limit?: number; - } - | undefined; + const data = marshaledError.data as ResourceLimitErrorData | undefined; return new ResourceLimitError(marshaledError.message, { ...options, ...(data !== undefined && { data }), diff --git a/packages/kernel-errors/src/index.test.ts b/packages/kernel-errors/src/index.test.ts index 96a79688c..be7911cd7 100644 --- a/packages/kernel-errors/src/index.test.ts +++ b/packages/kernel-errors/src/index.test.ts @@ -23,6 +23,7 @@ describe('index', () => { 'isMarshaledError', 'isMarshaledOcapError', 'isOcapError', + 'isResourceLimitError', 'isRetryableNetworkError', 'marshalError', 'toError', diff --git a/packages/kernel-errors/src/index.ts b/packages/kernel-errors/src/index.ts index 9f3e40694..4f27db9eb 100644 --- a/packages/kernel-errors/src/index.ts +++ b/packages/kernel-errors/src/index.ts @@ -8,7 +8,11 @@ export { VatNotFoundError } from './errors/VatNotFoundError.ts'; export { StreamReadError } from './errors/StreamReadError.ts'; export { SubclusterNotFoundError } from './errors/SubclusterNotFoundError.ts'; export { AbortError } from './errors/AbortError.ts'; -export { ResourceLimitError } from './errors/ResourceLimitError.ts'; +export { + ResourceLimitError, + type ResourceLimitType, + type ResourceLimitErrorData, +} from './errors/ResourceLimitError.ts'; export { ErrorCode, ErrorSentinel, @@ -23,3 +27,4 @@ export { unmarshalError } from './marshal/unmarshalError.ts'; export { isMarshaledError } from './marshal/isMarshaledError.ts'; export { isMarshaledOcapError } from './marshal/isMarshaledOcapError.ts'; export { isRetryableNetworkError } from './utils/isRetryableNetworkError.ts'; +export { isResourceLimitError } from './utils/isResourceLimitError.ts'; diff --git a/packages/kernel-errors/src/utils/isResourceLimitError.test.ts b/packages/kernel-errors/src/utils/isResourceLimitError.test.ts new file mode 100644 index 000000000..af7dd9fb4 --- /dev/null +++ b/packages/kernel-errors/src/utils/isResourceLimitError.test.ts @@ -0,0 +1,80 @@ +import { describe, it, expect } from 'vitest'; + +import { isResourceLimitError } from './isResourceLimitError.ts'; +import { ResourceLimitError } from '../errors/ResourceLimitError.ts'; + +describe('isResourceLimitError', () => { + describe('without limitType parameter', () => { + it('returns true for ResourceLimitError', () => { + const error = new ResourceLimitError('limit exceeded'); + expect(isResourceLimitError(error)).toBe(true); + }); + + it('returns true for ResourceLimitError with any limitType', () => { + const connectionError = new ResourceLimitError('connection limit', { + data: { limitType: 'connection' }, + }); + const rateError = new ResourceLimitError('rate limit', { + data: { limitType: 'connectionRate' }, + }); + + expect(isResourceLimitError(connectionError)).toBe(true); + expect(isResourceLimitError(rateError)).toBe(true); + }); + + it('returns false for regular Error', () => { + const error = new Error('some error'); + expect(isResourceLimitError(error)).toBe(false); + }); + + it('returns false for null', () => { + expect(isResourceLimitError(null)).toBe(false); + }); + + it('returns false for undefined', () => { + expect(isResourceLimitError(undefined)).toBe(false); + }); + + it('returns false for non-error objects', () => { + expect(isResourceLimitError({ message: 'fake error' })).toBe(false); + }); + }); + + describe('with limitType parameter', () => { + it('returns true when limitType matches', () => { + const error = new ResourceLimitError('connection limit', { + data: { limitType: 'connection' }, + }); + expect(isResourceLimitError(error, 'connection')).toBe(true); + }); + + it('returns false when limitType does not match', () => { + const error = new ResourceLimitError('connection limit', { + data: { limitType: 'connection' }, + }); + expect(isResourceLimitError(error, 'connectionRate')).toBe(false); + }); + + it('returns false when error has no limitType', () => { + const error = new ResourceLimitError('limit exceeded'); + expect(isResourceLimitError(error, 'connection')).toBe(false); + }); + + it('returns false for non-ResourceLimitError even with matching-like data', () => { + const error = new Error('some error'); + expect(isResourceLimitError(error, 'connection')).toBe(false); + }); + + it.each([ + 'connection', + 'connectionRate', + 'messageSize', + 'messageRate', + ] as const)('correctly identifies %s limitType', (limitType) => { + const error = new ResourceLimitError('limit exceeded', { + data: { limitType }, + }); + expect(isResourceLimitError(error, limitType)).toBe(true); + }); + }); +}); diff --git a/packages/kernel-errors/src/utils/isResourceLimitError.ts b/packages/kernel-errors/src/utils/isResourceLimitError.ts new file mode 100644 index 000000000..486411454 --- /dev/null +++ b/packages/kernel-errors/src/utils/isResourceLimitError.ts @@ -0,0 +1,28 @@ +import { ResourceLimitError } from '../errors/ResourceLimitError.ts'; +import type { + ResourceLimitType, + ResourceLimitErrorData, +} from '../errors/ResourceLimitError.ts'; + +/** + * Check if an error is a ResourceLimitError, optionally with a specific limit type. + * + * @param error - The error to check. + * @param limitType - Optional limit type to match against. + * @returns True if the error is a ResourceLimitError (with matching limitType if specified). + */ +export function isResourceLimitError( + error: unknown, + limitType?: ResourceLimitType, +): error is ResourceLimitError { + if (!(error instanceof ResourceLimitError)) { + return false; + } + + if (limitType === undefined) { + return true; + } + + const data = error.data as ResourceLimitErrorData | undefined; + return data?.limitType === limitType; +} diff --git a/packages/nodejs/test/e2e/remote-comms.test.ts b/packages/nodejs/test/e2e/remote-comms.test.ts index 5d2774f6c..37da6c2df 100644 --- a/packages/nodejs/test/e2e/remote-comms.test.ts +++ b/packages/nodejs/test/e2e/remote-comms.test.ts @@ -524,12 +524,14 @@ describe.sequential('Remote Communications E2E', () => { it( 'rejects new messages when queue reaches MAX_QUEUE limit', async () => { + // Use high rate limit to avoid rate limiting interference with queue limit test const { aliceRef, bobURL } = await setupAliceAndBob( kernel1, kernel2, kernelStore1, kernelStore2, testRelays, + { maxMessagesPerSecond: 500 }, ); await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']); diff --git a/packages/nodejs/test/helpers/remote-comms.ts b/packages/nodejs/test/helpers/remote-comms.ts index bcd7b80f4..303a31726 100644 --- a/packages/nodejs/test/helpers/remote-comms.ts +++ b/packages/nodejs/test/helpers/remote-comms.ts @@ -1,7 +1,11 @@ import type { KernelDatabase } from '@metamask/kernel-store'; import { stringify } from '@metamask/kernel-utils'; import { Kernel, kunser, makeKernelStore } from '@metamask/ocap-kernel'; -import type { ClusterConfig, KRef } from '@metamask/ocap-kernel'; +import type { + ClusterConfig, + KRef, + RemoteCommsOptions, +} from '@metamask/ocap-kernel'; import { makeTestKernel } from './kernel.ts'; @@ -181,6 +185,7 @@ export async function wait(ms: number): Promise { * @param kernelStore1 - Kernel store for first kernel. * @param kernelStore2 - Kernel store for second kernel. * @param relays - Array of relay addresses. + * @param remoteCommsOptions - Optional additional options for initRemoteComms. * @returns Object with all setup data including URLs and references. */ export async function setupAliceAndBob( @@ -189,6 +194,7 @@ export async function setupAliceAndBob( kernelStore1: ReturnType, kernelStore2: ReturnType, relays: string[], + remoteCommsOptions?: Omit, ): Promise<{ aliceURL: string; bobURL: string; @@ -197,8 +203,8 @@ export async function setupAliceAndBob( peerId1: string; peerId2: string; }> { - await kernel1.initRemoteComms({ relays }); - await kernel2.initRemoteComms({ relays }); + await kernel1.initRemoteComms({ relays, ...remoteCommsOptions }); + await kernel2.initRemoteComms({ relays, ...remoteCommsOptions }); const aliceConfig = makeRemoteVatConfig('Alice'); const bobConfig = makeRemoteVatConfig('Bob'); diff --git a/packages/ocap-kernel/src/remotes/platform/constants.ts b/packages/ocap-kernel/src/remotes/platform/constants.ts index 608a3ebe7..ca66968fc 100644 --- a/packages/ocap-kernel/src/remotes/platform/constants.ts +++ b/packages/ocap-kernel/src/remotes/platform/constants.ts @@ -15,3 +15,15 @@ export const DEFAULT_WRITE_TIMEOUT_MS = 10_000; /** SCTP user initiated abort code (RFC 4960) */ export const SCTP_USER_INITIATED_ABORT = 12; + +/** Default message rate limit: 100 messages per second per peer */ +export const DEFAULT_MESSAGE_RATE_LIMIT = 100; + +/** Default message rate window in milliseconds (1 second) */ +export const DEFAULT_MESSAGE_RATE_WINDOW_MS = 1000; + +/** Default connection attempt rate limit: 10 attempts per minute per peer */ +export const DEFAULT_CONNECTION_RATE_LIMIT = 10; + +/** Default connection rate window in milliseconds (1 minute) */ +export const DEFAULT_CONNECTION_RATE_WINDOW_MS = 60_000; diff --git a/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts b/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts new file mode 100644 index 000000000..f5e0df56a --- /dev/null +++ b/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts @@ -0,0 +1,340 @@ +import { ResourceLimitError } from '@metamask/kernel-errors'; +import { describe, it, expect, beforeEach } from 'vitest'; + +import { + DEFAULT_CONNECTION_RATE_LIMIT, + DEFAULT_MESSAGE_RATE_LIMIT, + DEFAULT_MESSAGE_RATE_WINDOW_MS, +} from './constants.ts'; +import { + SlidingWindowRateLimiter, + makeMessageRateLimiter, + makeConnectionRateLimiter, +} from './rate-limiter.ts'; + +describe('SlidingWindowRateLimiter', () => { + let limiter: SlidingWindowRateLimiter; + + beforeEach(() => { + // Create a limiter allowing 3 events per 100ms window for faster tests + limiter = new SlidingWindowRateLimiter(3, 100); + }); + + describe('constructor', () => { + it.each([ + { + maxEvents: 0, + windowMs: 100, + error: 'maxEvents must be a positive finite number', + }, + { + maxEvents: -1, + windowMs: 100, + error: 'maxEvents must be a positive finite number', + }, + { + maxEvents: NaN, + windowMs: 100, + error: 'maxEvents must be a positive finite number', + }, + { + maxEvents: Infinity, + windowMs: 100, + error: 'maxEvents must be a positive finite number', + }, + { + maxEvents: 10, + windowMs: 0, + error: 'windowMs must be a positive finite number', + }, + { + maxEvents: 10, + windowMs: -1, + error: 'windowMs must be a positive finite number', + }, + { + maxEvents: 10, + windowMs: NaN, + error: 'windowMs must be a positive finite number', + }, + { + maxEvents: 10, + windowMs: Infinity, + error: 'windowMs must be a positive finite number', + }, + ])( + 'throws "$error" when maxEvents=$maxEvents and windowMs=$windowMs', + ({ maxEvents, windowMs, error }) => { + expect(() => new SlidingWindowRateLimiter(maxEvents, windowMs)).toThrow( + error, + ); + }, + ); + + it('accepts valid positive values', () => { + expect(() => new SlidingWindowRateLimiter(1, 1)).not.toThrow(); + }); + }); + + describe('wouldExceedLimit', () => { + it('returns false when no events recorded', () => { + expect(limiter.wouldExceedLimit('peer1')).toBe(false); + }); + + it('returns false when under the limit', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.wouldExceedLimit('peer1')).toBe(false); + }); + + it('returns true when at the limit', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + }); + + it('tracks limits independently per key', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + expect(limiter.wouldExceedLimit('peer2')).toBe(false); + }); + + it('allows events after window expires', async () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + + // Wait for window to expire + await new Promise((resolve) => setTimeout(resolve, 110)); + expect(limiter.wouldExceedLimit('peer1')).toBe(false); + }); + }); + + describe('recordEvent', () => { + it('records events for a key', () => { + limiter.recordEvent('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + + it('accumulates events', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(2); + }); + + it('prunes old events when recording', async () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 110)); + limiter.recordEvent('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + }); + + describe('checkAndRecord', () => { + it('records event when under limit', () => { + limiter.checkAndRecord('peer1', 'messageRate'); + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + + it('throws ResourceLimitError when limit exceeded', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + + expect(() => limiter.checkAndRecord('peer1', 'messageRate')).toThrow( + ResourceLimitError, + ); + }); + + it('includes limit details in error', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + + let caughtError: ResourceLimitError | undefined; + try { + limiter.checkAndRecord('peer1', 'messageRate'); + } catch (error) { + caughtError = error as ResourceLimitError; + } + + expect(caughtError).toBeInstanceOf(ResourceLimitError); + expect(caughtError?.data).toStrictEqual({ + limitType: 'messageRate', + current: 3, + limit: 3, + }); + }); + + it('does not record when limit exceeded', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + + try { + limiter.checkAndRecord('peer1', 'messageRate'); + } catch { + // Expected + } + + expect(limiter.getCurrentCount('peer1')).toBe(3); + }); + }); + + describe('getCurrentCount', () => { + it('returns 0 for unknown key', () => { + expect(limiter.getCurrentCount('unknown')).toBe(0); + }); + + it('returns count of events within window', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(2); + }); + + it('excludes events outside window', async () => { + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 60)); + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 50)); + // First event is now outside window (110ms ago) + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + }); + + describe('clearKey', () => { + it('removes all events for a key', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.clearKey('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(0); + }); + + it('does not affect other keys', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer2'); + limiter.clearKey('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(0); + expect(limiter.getCurrentCount('peer2')).toBe(1); + }); + }); + + describe('clear', () => { + it('removes all events for all keys', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer2'); + limiter.clear(); + expect(limiter.getCurrentCount('peer1')).toBe(0); + expect(limiter.getCurrentCount('peer2')).toBe(0); + }); + }); + + describe('pruneStale', () => { + it('removes keys with no recent events', async () => { + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 110)); + limiter.pruneStale(); + expect(limiter.getCurrentCount('peer1')).toBe(0); + }); + + it('keeps keys with recent events', () => { + limiter.recordEvent('peer1'); + limiter.pruneStale(); + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + + it('prunes old events from active keys', async () => { + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 60)); + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 50)); + limiter.pruneStale(); + // Only the second event should remain (first is >100ms old) + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + }); + + describe('sliding window behavior', () => { + it('allows burst followed by sustained rate', async () => { + // Burst 3 events + limiter.checkAndRecord('peer1', 'messageRate'); + limiter.checkAndRecord('peer1', 'messageRate'); + limiter.checkAndRecord('peer1', 'messageRate'); + + // Should be at limit + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + + // Wait for first event to expire + await new Promise((resolve) => setTimeout(resolve, 110)); + + // Now slots available + expect(limiter.wouldExceedLimit('peer1')).toBe(false); + limiter.checkAndRecord('peer1', 'messageRate'); + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + }); +}); + +describe('makeMessageRateLimiter', () => { + it('creates limiter with default settings', () => { + const limiter = makeMessageRateLimiter(); + + // Should allow DEFAULT_MESSAGE_RATE_LIMIT events + for (let idx = 0; idx < DEFAULT_MESSAGE_RATE_LIMIT; idx++) { + limiter.recordEvent('peer1'); + } + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + }); + + it('creates limiter with custom rate', () => { + const limiter = makeMessageRateLimiter(5); + + for (let idx = 0; idx < 5; idx++) { + limiter.recordEvent('peer1'); + } + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + }); + + it('uses 1 second window', async () => { + // Use a small limit to make test faster + const limiter = makeMessageRateLimiter(2); + + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + + // Window is 1 second, so after 1 second events should be allowed + await new Promise((resolve) => + setTimeout(resolve, DEFAULT_MESSAGE_RATE_WINDOW_MS + 10), + ); + expect(limiter.wouldExceedLimit('peer1')).toBe(false); + }); +}); + +describe('makeConnectionRateLimiter', () => { + it('creates limiter with default settings', () => { + const limiter = makeConnectionRateLimiter(); + + // Should allow DEFAULT_CONNECTION_RATE_LIMIT events + for (let idx = 0; idx < DEFAULT_CONNECTION_RATE_LIMIT; idx++) { + limiter.recordEvent('peer1'); + } + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + }); + + it('creates limiter with custom rate', () => { + const limiter = makeConnectionRateLimiter(3); + + for (let idx = 0; idx < 3; idx++) { + limiter.recordEvent('peer1'); + } + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + }); + + // Skip window expiration test for connection limiter as it would take 60 seconds +}); diff --git a/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts b/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts new file mode 100644 index 000000000..c86f3b322 --- /dev/null +++ b/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts @@ -0,0 +1,182 @@ +import { ResourceLimitError } from '@metamask/kernel-errors'; + +import { + DEFAULT_CONNECTION_RATE_LIMIT, + DEFAULT_CONNECTION_RATE_WINDOW_MS, + DEFAULT_MESSAGE_RATE_LIMIT, + DEFAULT_MESSAGE_RATE_WINDOW_MS, +} from './constants.ts'; + +/** + * A sliding window rate limiter that tracks event counts per key within a time window. + * Events older than the window are automatically pruned when checking or recording. + */ +export class SlidingWindowRateLimiter { + readonly #maxEvents: number; + + readonly #windowMs: number; + + readonly #timestamps: Map; + + /** + * Create a new sliding window rate limiter. + * + * @param maxEvents - Maximum number of events allowed within the window. + * @param windowMs - Window size in milliseconds. + * @throws Error if maxEvents or windowMs is not a positive finite number. + */ + constructor(maxEvents: number, windowMs: number) { + if (!Number.isFinite(maxEvents) || maxEvents <= 0) { + throw new Error('maxEvents must be a positive finite number'); + } + if (!Number.isFinite(windowMs) || windowMs <= 0) { + throw new Error('windowMs must be a positive finite number'); + } + this.#maxEvents = maxEvents; + this.#windowMs = windowMs; + this.#timestamps = new Map(); + } + + /** + * Check if an event would exceed the rate limit for a given key. + * This does not record the event. + * + * @param key - The key to check (e.g., peer ID). + * @returns True if the event would exceed the rate limit. + */ + wouldExceedLimit(key: string): boolean { + return this.getCurrentCount(key) >= this.#maxEvents; + } + + /** + * Record an event for a given key. + * Automatically prunes old timestamps outside the window. + * + * @param key - The key to record (e.g., peer ID). + */ + recordEvent(key: string): void { + const now = Date.now(); + const cutoff = now - this.#windowMs; + let timestamps = this.#timestamps.get(key); + + if (!timestamps) { + timestamps = []; + this.#timestamps.set(key, timestamps); + } + + // Prune old timestamps and add new one + const pruned = timestamps.filter((ts) => ts > cutoff); + pruned.push(now); + this.#timestamps.set(key, pruned); + } + + /** + * Check the rate limit and record the event if allowed. + * Throws ResourceLimitError if the limit would be exceeded. + * + * @param key - The key to check and record (e.g., peer ID). + * @param limitType - The type of limit for error reporting. + * @throws ResourceLimitError if the rate limit would be exceeded. + */ + checkAndRecord( + key: string, + limitType: 'messageRate' | 'connectionRate', + ): void { + const currentCount = this.getCurrentCount(key); + if (currentCount >= this.#maxEvents) { + throw new ResourceLimitError( + `Rate limit exceeded: ${currentCount}/${this.#maxEvents} ${limitType} in ${this.#windowMs}ms window`, + { + data: { + limitType, + current: currentCount, + limit: this.#maxEvents, + }, + }, + ); + } + this.recordEvent(key); + } + + /** + * Get the current count of events within the window for a key. + * + * @param key - The key to check. + * @returns The number of events within the current window. + */ + getCurrentCount(key: string): number { + const now = Date.now(); + const cutoff = now - this.#windowMs; + const timestamps = this.#timestamps.get(key); + + if (!timestamps) { + return 0; + } + + return timestamps.filter((ts) => ts > cutoff).length; + } + + /** + * Clear all recorded events for a specific key. + * + * @param key - The key to clear. + */ + clearKey(key: string): void { + this.#timestamps.delete(key); + } + + /** + * Clear all recorded events. + */ + clear(): void { + this.#timestamps.clear(); + } + + /** + * Prune old timestamps for all keys. + * Removes keys that have no recent events. + */ + pruneStale(): void { + const now = Date.now(); + const cutoff = now - this.#windowMs; + + for (const [key, timestamps] of this.#timestamps.entries()) { + const pruned = timestamps.filter((ts) => ts > cutoff); + if (pruned.length === 0) { + this.#timestamps.delete(key); + } else { + this.#timestamps.set(key, pruned); + } + } + } +} + +/** + * Factory function to create a message rate limiter. + * + * @param maxMessagesPerSecond - Maximum messages per second per peer. + * @returns A configured SlidingWindowRateLimiter for message rate limiting. + */ +export function makeMessageRateLimiter( + maxMessagesPerSecond: number = DEFAULT_MESSAGE_RATE_LIMIT, +): SlidingWindowRateLimiter { + return new SlidingWindowRateLimiter( + maxMessagesPerSecond, + DEFAULT_MESSAGE_RATE_WINDOW_MS, + ); +} + +/** + * Factory function to create a connection attempt rate limiter. + * + * @param maxAttemptsPerMinute - Maximum connection attempts per minute per peer. + * @returns A configured SlidingWindowRateLimiter for connection rate limiting. + */ +export function makeConnectionRateLimiter( + maxAttemptsPerMinute: number = DEFAULT_CONNECTION_RATE_LIMIT, +): SlidingWindowRateLimiter { + return new SlidingWindowRateLimiter( + maxAttemptsPerMinute, + DEFAULT_CONNECTION_RATE_WINDOW_MS, + ); +} diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts index 5c45729e2..3653992ee 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts @@ -28,6 +28,10 @@ vi.mock('@metamask/kernel-errors', async () => { }; }); +// Helper to flush pending promises/microtasks +const flushPromises = async (): Promise => + new Promise((resolve) => setImmediate(resolve)); + describe('reconnection-lifecycle', () => { let deps: ReconnectionLifecycleDeps; let abortController: AbortController; @@ -69,6 +73,7 @@ describe('reconnection-lifecycle', () => { isReconnecting: vi.fn().mockReturnValue(true), shouldRetry: vi.fn().mockReturnValue(true), incrementAttempt: vi.fn().mockReturnValue(1), + decrementAttempt: vi.fn(), calculateBackoff: vi.fn().mockReturnValue(100), startReconnection: vi.fn(), stopReconnection: vi.fn(), @@ -79,6 +84,8 @@ describe('reconnection-lifecycle', () => { dialPeer: vi.fn().mockResolvedValue(mockChannel), reuseOrReturnChannel: vi.fn().mockResolvedValue(mockChannel), checkConnectionLimit: vi.fn(), + checkConnectionRateLimit: vi.fn(), + closeChannel: vi.fn().mockResolvedValue(undefined), registerChannel: vi.fn(), } as unknown as ReconnectionLifecycleDeps; }); @@ -119,13 +126,14 @@ describe('reconnection-lifecycle', () => { expect(state.channel).toBeUndefined(); }); - it('starts reconnection if not already reconnecting', () => { + it('starts reconnection if not already reconnecting', async () => { ( deps.reconnectionManager.isReconnecting as ReturnType ).mockReturnValue(false); const lifecycle = makeReconnectionLifecycle(deps); lifecycle.handleConnectionLoss('peer1'); + await flushPromises(); expect(deps.reconnectionManager.startReconnection).toHaveBeenCalledWith( 'peer1', @@ -143,13 +151,14 @@ describe('reconnection-lifecycle', () => { expect(deps.reconnectionManager.startReconnection).not.toHaveBeenCalled(); }); - it('logs connection loss message', () => { + it('logs connection loss message', async () => { ( deps.reconnectionManager.isReconnecting as ReturnType ).mockReturnValue(false); const lifecycle = makeReconnectionLifecycle(deps); lifecycle.handleConnectionLoss('peer1'); + await flushPromises(); expect(deps.logger.log).toHaveBeenCalledWith( 'peer1:: connection lost, initiating reconnection', @@ -318,7 +327,200 @@ describe('reconnection-lifecycle', () => { await lifecycle.attemptReconnection('peer1'); - expect(deps.checkConnectionLimit).toHaveBeenCalled(); + expect(deps.checkConnectionLimit).toHaveBeenCalledTimes(1); + }); + + it('checks connection rate limit before dialing', async () => { + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + expect(deps.checkConnectionRateLimit).toHaveBeenCalledTimes(1); + expect(deps.checkConnectionRateLimit).toHaveBeenCalledWith('peer1'); + }); + + it('continues loop on ResourceLimitError instead of giving up', async () => { + const { ResourceLimitError } = kernelErrors; + ( + deps.checkConnectionRateLimit as ReturnType + ).mockImplementationOnce(() => { + throw new ResourceLimitError('Rate limit exceeded', { + data: { limitType: 'connectionRate', current: 10, limit: 10 }, + }); + }); + + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) // First attempt - rate limited + .mockReturnValueOnce(true) // Second attempt - success + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + // Should not call onRemoteGiveUp because rate limit is retryable + expect(deps.onRemoteGiveUp).not.toHaveBeenCalled(); + // Should have tried twice (once rate limited, once successful) + expect(deps.reconnectionManager.incrementAttempt).toHaveBeenCalledTimes( + 2, + ); + expect(deps.logger.log).toHaveBeenCalledWith( + expect.stringContaining('rate limited'), + ); + }); + + it('decrements attempt count when rate limited to preserve retry quota', async () => { + const { ResourceLimitError } = kernelErrors; + ( + deps.checkConnectionRateLimit as ReturnType + ).mockImplementationOnce(() => { + throw new ResourceLimitError('Rate limit exceeded', { + data: { limitType: 'connectionRate', current: 10, limit: 10 }, + }); + }); + + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) // First attempt - rate limited + .mockReturnValueOnce(true) // Second attempt - success + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + // Should have decremented attempt count when rate limited + // so that rate-limited attempts don't consume retry quota + expect(deps.reconnectionManager.decrementAttempt).toHaveBeenCalledWith( + 'peer1', + ); + expect(deps.reconnectionManager.decrementAttempt).toHaveBeenCalledTimes( + 1, + ); + }); + + it('does not decrement attempt count for connection limit errors', async () => { + const { ResourceLimitError } = kernelErrors; + ( + deps.checkConnectionLimit as ReturnType + ).mockImplementationOnce(() => { + throw new ResourceLimitError('Connection limit exceeded', { + data: { limitType: 'connection', current: 100, limit: 100 }, + }); + }); + + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) // First attempt - connection limit + .mockReturnValueOnce(true) // Second attempt - success + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + // Should NOT decrement attempt count because dial was performed + expect(deps.reconnectionManager.decrementAttempt).not.toHaveBeenCalled(); + }); + + it('retries connection limit errors without calling isRetryableNetworkError', async () => { + const { ResourceLimitError } = kernelErrors; + // Mock isRetryableNetworkError to return false - connection limit errors + // should be retried regardless via explicit handling + ( + kernelErrors.isRetryableNetworkError as ReturnType + ).mockReturnValue(false); + + ( + deps.checkConnectionLimit as ReturnType + ).mockImplementationOnce(() => { + throw new ResourceLimitError('Connection limit exceeded', { + data: { limitType: 'connection', current: 100, limit: 100 }, + }); + }); + + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) // First attempt - connection limit + .mockReturnValueOnce(true) // Second attempt - success + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + // Should NOT call onRemoteGiveUp - connection limit errors are retryable + expect(deps.onRemoteGiveUp).not.toHaveBeenCalled(); + // Should have retried and succeeded + expect(deps.reconnectionManager.resetBackoff).toHaveBeenCalledWith( + 'peer1', + ); + expect(deps.logger.log).toHaveBeenCalledWith( + expect.stringContaining('hit connection limit'), + ); + }); + + it('closes channel when connection limit is exceeded after dial', async () => { + const { ResourceLimitError } = kernelErrors; + ( + deps.checkConnectionLimit as ReturnType + ).mockImplementationOnce(() => { + throw new ResourceLimitError('Connection limit exceeded', { + data: { limitType: 'connection', current: 100, limit: 100 }, + }); + }); + + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) // First attempt - connection limit + .mockReturnValueOnce(true) // Second attempt - success + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + // Should close the channel to prevent resource leak + expect(deps.closeChannel).toHaveBeenCalledWith(mockChannel, 'peer1'); + }); + + it('propagates ResourceLimitError even when closeChannel fails', async () => { + const { ResourceLimitError } = kernelErrors; + // Mock isRetryableNetworkError to return false - we want to verify the + // ResourceLimitError is still correctly identified + ( + kernelErrors.isRetryableNetworkError as ReturnType + ).mockReturnValue(false); + + ( + deps.checkConnectionLimit as ReturnType + ).mockImplementationOnce(() => { + throw new ResourceLimitError('Connection limit exceeded', { + data: { limitType: 'connection', current: 100, limit: 100 }, + }); + }); + + // closeChannel throws an error + (deps.closeChannel as ReturnType).mockRejectedValueOnce( + new Error('Channel already closed'), + ); + + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) // First attempt - connection limit + close error + .mockReturnValueOnce(true) // Second attempt - success + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + // Should NOT call onRemoteGiveUp - the original ResourceLimitError should + // be preserved even though closeChannel failed + expect(deps.onRemoteGiveUp).not.toHaveBeenCalled(); + // Should have retried and succeeded + expect(deps.reconnectionManager.resetBackoff).toHaveBeenCalledWith( + 'peer1', + ); }); it('logs reconnection attempts', async () => { diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts index 5cc950f52..a780c51f9 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts @@ -1,4 +1,7 @@ -import { isRetryableNetworkError } from '@metamask/kernel-errors'; +import { + isRetryableNetworkError, + isResourceLimitError, +} from '@metamask/kernel-errors'; import { abortableDelay, DEFAULT_MAX_RETRY_ATTEMPTS, @@ -27,6 +30,8 @@ export type ReconnectionLifecycleDeps = { dialedChannel: Channel, ) => Promise; checkConnectionLimit: () => void; + checkConnectionRateLimit: (peerId: string) => void; + closeChannel: (channel: Channel, peerId: string) => Promise; registerChannel: ( peerId: string, channel: Channel, @@ -62,6 +67,8 @@ export function makeReconnectionLifecycle( dialPeer, reuseOrReturnChannel, checkConnectionLimit, + checkConnectionRateLimit, + closeChannel, registerChannel, } = deps; @@ -124,6 +131,23 @@ export function makeReconnectionLifecycle( reconnectionManager.stopReconnection(peerId); return; } + // Handle rate limit errors (connectionRate) - these are temporary and + // occur before any dial was performed, so don't count against retry quota + if (isResourceLimitError(problem, 'connectionRate')) { + reconnectionManager.decrementAttempt(peerId); + logger.log( + `${peerId}:: reconnection attempt ${nextAttempt} rate limited, will retry after backoff`, + ); + continue; + } + // Connection limit errors (limitType: 'connection') occur after dial - + // the attempt counts and channel cleanup is handled in tryReconnect + if (isResourceLimitError(problem, 'connection')) { + logger.log( + `${peerId}:: reconnection attempt ${nextAttempt} hit connection limit, will retry after backoff`, + ); + continue; + } if (!isRetryableNetworkError(problem)) { outputError(peerId, `non-retryable failure`, problem); reconnectionManager.stopReconnection(peerId); @@ -151,6 +175,9 @@ export function makeReconnectionLifecycle( state: PeerState, peerId: string, ): Promise { + // Check connection rate limit before attempting dial + checkConnectionRateLimit(peerId); + const { locationHints: hints } = state; const dialedChannel = await dialPeer(peerId, hints); @@ -162,7 +189,18 @@ export function makeReconnectionLifecycle( // Re-check connection limit and register if this is a new channel if (state.channel !== channel) { - checkConnectionLimit(); + try { + checkConnectionLimit(); + } catch (error) { + // Connection limit exceeded after dial - close the channel to prevent leak + // Use try-catch to ensure the original error is always re-thrown + try { + await closeChannel(channel, peerId); + } catch { + // Ignore close errors - the original ResourceLimitError takes priority + } + throw error; + } registerChannel(peerId, channel, 'reading channel to'); } diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection.test.ts b/packages/ocap-kernel/src/remotes/platform/reconnection.test.ts index c4469e182..7ca47e634 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection.test.ts @@ -126,6 +126,43 @@ describe('ReconnectionManager', () => { }); }); + describe('decrementAttempt', () => { + it('decrements attempt count', () => { + manager.incrementAttempt('peer1'); + manager.incrementAttempt('peer1'); + expect(manager.getAttemptCount('peer1')).toBe(2); + + manager.decrementAttempt('peer1'); + expect(manager.getAttemptCount('peer1')).toBe(1); + + manager.decrementAttempt('peer1'); + expect(manager.getAttemptCount('peer1')).toBe(0); + }); + + it('does not go below zero', () => { + expect(manager.getAttemptCount('peer1')).toBe(0); + + manager.decrementAttempt('peer1'); + expect(manager.getAttemptCount('peer1')).toBe(0); + }); + + it('handles decrement on new peer without prior state', () => { + manager.decrementAttempt('newpeer'); + expect(manager.getAttemptCount('newpeer')).toBe(0); + }); + + it('only affects specified peer', () => { + manager.incrementAttempt('peer1'); + manager.incrementAttempt('peer1'); + manager.incrementAttempt('peer2'); + + manager.decrementAttempt('peer1'); + + expect(manager.getAttemptCount('peer1')).toBe(1); + expect(manager.getAttemptCount('peer2')).toBe(1); + }); + }); + describe('resetBackoff', () => { it('resets attempt count to zero', () => { manager.incrementAttempt('peer1'); diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection.ts b/packages/ocap-kernel/src/remotes/platform/reconnection.ts index 3664a2472..8393cd6ab 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection.ts @@ -75,6 +75,20 @@ export class ReconnectionManager { return state.attemptCount; } + /** + * Decrement the attempt count (minimum 0). + * Used to "undo" an attempt that didn't actually perform a dial + * (e.g., when rate-limited before the connection was attempted). + * + * @param peerId - The peer ID to decrement attempts for. + */ + decrementAttempt(peerId: string): void { + const state = this.#getState(peerId); + if (state.attemptCount > 0) { + state.attemptCount -= 1; + } + } + /** * Reset the backoff counter for a peer. * diff --git a/packages/ocap-kernel/src/remotes/platform/transport.test.ts b/packages/ocap-kernel/src/remotes/platform/transport.test.ts index 8924cfeb9..a96724427 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.test.ts @@ -20,6 +20,7 @@ const mockReconnectionManager = { stopReconnection: vi.fn(), shouldRetry: vi.fn().mockReturnValue(true), incrementAttempt: vi.fn().mockReturnValue(1), + decrementAttempt: vi.fn(), calculateBackoff: vi.fn().mockReturnValue(100), resetBackoff: vi.fn(), resetAllBackoffs: vi.fn(), @@ -39,6 +40,8 @@ vi.mock('./reconnection.ts', () => { incrementAttempt = mockReconnectionManager.incrementAttempt; + decrementAttempt = mockReconnectionManager.decrementAttempt; + calculateBackoff = mockReconnectionManager.calculateBackoff; resetBackoff = mockReconnectionManager.resetBackoff; @@ -125,6 +128,7 @@ vi.mock('@metamask/kernel-errors', () => ({ errorWithCode?.code === 'ETIMEDOUT' ); }), + isResourceLimitError: vi.fn().mockReturnValue(false), })); // Mock uint8arrays @@ -162,6 +166,7 @@ describe('transport.initTransport', () => { mockReconnectionManager.stopReconnection.mockClear(); mockReconnectionManager.shouldRetry.mockClear(); mockReconnectionManager.incrementAttempt.mockClear(); + mockReconnectionManager.decrementAttempt.mockClear(); mockReconnectionManager.calculateBackoff.mockClear(); mockReconnectionManager.resetBackoff.mockClear(); mockReconnectionManager.resetAllBackoffs.mockClear(); @@ -894,6 +899,11 @@ describe('transport.initTransport', () => { ); // Should not start reading from this channel expect(mockChannel.msgStream.read).not.toHaveBeenCalled(); + // Should close the channel to prevent resource leaks + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( + mockChannel, + 'peer-1', + ); }); }); }); @@ -1795,6 +1805,11 @@ describe('transport.initTransport', () => { expect(mockLogger.log).toHaveBeenCalledWith( 'peer-3:: rejecting inbound connection due to connection limit', ); + // Should close the channel to prevent resource leaks + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( + inboundChannel, + 'peer-3', + ); }); // Should not have started reading from the rejected channel @@ -1812,6 +1827,67 @@ describe('transport.initTransport', () => { await sendRemoteMessage('peer-1', makeTestMessage('msg')); expect(mockChannel.msgStream.write).toHaveBeenCalled(); }); + + it('closes channel when connection limit exceeded after dial due to race condition', async () => { + const maxConcurrentConnections = 1; + const dialedChannels: MockChannel[] = []; + + // Track when dial completes so we can inject a race condition + let resolveFirstDial: ((channel: MockChannel) => void) | undefined; + const firstDialPromise = new Promise((resolve) => { + resolveFirstDial = resolve; + }); + + mockConnectionFactory.dialIdempotent.mockImplementation( + async (peerId: string) => { + const channel = createMockChannel(peerId); + dialedChannels.push(channel); + + if (peerId === 'peer-1') { + // First dial waits to be resolved manually + return firstDialPromise; + } + // Second dial completes immediately + return channel; + }, + ); + + const { sendRemoteMessage } = await initTransport( + '0x1234', + { maxConcurrentConnections }, + vi.fn(), + ); + + // Start first send (will wait at dial) + const firstSendPromise = sendRemoteMessage( + 'peer-1', + makeTestMessage('msg1'), + ); + + // Wait for first dial to start + await vi.waitFor(() => { + expect(dialedChannels).toHaveLength(1); + }); + + // Start and complete second send while first is still dialing + // This establishes a connection, filling the limit + await sendRemoteMessage('peer-2', makeTestMessage('msg2')); + + // Now complete the first dial - post-dial check should fail + // because we're now at the connection limit + resolveFirstDial?.(dialedChannels[0] as MockChannel); + + // First send should fail with connection limit error + await expect(firstSendPromise).rejects.toThrow( + /Connection limit reached/u, + ); + + // The channel from the first dial should have been closed to prevent leak + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( + dialedChannels[0], + 'peer-1', + ); + }); }); describe('stale peer cleanup', () => { diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index dfc4e8282..e57a062ed 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -7,13 +7,19 @@ import { makeErrorLogger, writeWithTimeout } from './channel-utils.ts'; import { ConnectionFactory } from './connection-factory.ts'; import { DEFAULT_CLEANUP_INTERVAL_MS, + DEFAULT_CONNECTION_RATE_LIMIT, DEFAULT_MAX_CONCURRENT_CONNECTIONS, DEFAULT_MAX_MESSAGE_SIZE_BYTES, + DEFAULT_MESSAGE_RATE_LIMIT, DEFAULT_STALE_PEER_TIMEOUT_MS, DEFAULT_WRITE_TIMEOUT_MS, SCTP_USER_INITIATED_ABORT, } from './constants.ts'; import { PeerStateManager } from './peer-state-manager.ts'; +import { + makeConnectionRateLimiter, + makeMessageRateLimiter, +} from './rate-limiter.ts'; import { makeReconnectionLifecycle } from './reconnection-lifecycle.ts'; import { ReconnectionManager } from './reconnection.ts'; import { @@ -41,6 +47,8 @@ import type { * @param options.maxMessageSizeBytes - Maximum message size in bytes (default: 1MB). * @param options.cleanupIntervalMs - Stale peer cleanup interval in milliseconds (default: 15 minutes). * @param options.stalePeerTimeoutMs - Stale peer timeout in milliseconds (default: 1 hour). + * @param options.maxMessagesPerSecond - Maximum messages per second per peer (default: 100). + * @param options.maxConnectionAttemptsPerMinute - Maximum connection attempts per minute per peer (default: 10). * @param remoteMessageHandler - Handler to be called when messages are received from elsewhere. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote (after max retries or non-retryable error). * @@ -65,6 +73,8 @@ export async function initTransport( maxMessageSizeBytes = DEFAULT_MAX_MESSAGE_SIZE_BYTES, cleanupIntervalMs = DEFAULT_CLEANUP_INTERVAL_MS, stalePeerTimeoutMs = DEFAULT_STALE_PEER_TIMEOUT_MS, + maxMessagesPerSecond = DEFAULT_MESSAGE_RATE_LIMIT, + maxConnectionAttemptsPerMinute = DEFAULT_CONNECTION_RATE_LIMIT, } = options; let cleanupWakeDetector: (() => void) | undefined; const stopController = new AbortController(); @@ -78,6 +88,10 @@ export async function initTransport( maxConcurrentConnections, () => peerStateManager.countActiveConnections(), ); + const messageRateLimiter = makeMessageRateLimiter(maxMessagesPerSecond); + const connectionRateLimiter = makeConnectionRateLimiter( + maxConnectionAttemptsPerMinute, + ); let cleanupIntervalId: ReturnType | undefined; // Holder for handleConnectionLoss - initialized later after all dependencies are defined // This breaks the circular dependency between readChannel → handleConnectionLoss → registerChannel @@ -109,7 +123,12 @@ export async function initTransport( for (const peerId of stalePeers) { peerStateManager.removePeer(peerId); reconnectionManager.stopReconnection(peerId); + messageRateLimiter.clearKey(peerId); + connectionRateLimiter.clearKey(peerId); } + // Also prune stale rate limiter entries that may not have peer state + messageRateLimiter.pruneStale(); + connectionRateLimiter.pruneStale(); } /** @@ -297,6 +316,10 @@ export async function initTransport( connectionFactory.dialIdempotent(peerId, hints, false), reuseOrReturnChannel, checkConnectionLimit, + checkConnectionRateLimit: (peerId: string) => + connectionRateLimiter.checkAndRecord(peerId, 'connectionRate'), + closeChannel: async (channel, peerId) => + connectionFactory.closeChannel(channel, peerId), registerChannel, }); reconnectionHolder.handleConnectionLoss = @@ -326,6 +349,12 @@ export async function initTransport( // Validate message size before sending validateMessageSize(message); + // Check and record message rate limit atomically to prevent TOCTOU race + // Note: This records before send completes, so failed sends consume quota. + // This is intentional - recording after send would allow concurrent sends + // to bypass the rate limit via TOCTOU attacks. + messageRateLimiter.checkAndRecord(targetPeerId, 'messageRate'); + const state = peerStateManager.getState(targetPeerId); // Get or establish channel @@ -334,6 +363,9 @@ export async function initTransport( // Check connection limit before attempting to dial checkConnectionLimit(); + // Check connection attempt rate limit + connectionRateLimiter.checkAndRecord(targetPeerId, 'connectionRate'); + try { const { locationHints: hints } = state; channel = await connectionFactory.dialIdempotent( @@ -355,14 +387,21 @@ export async function initTransport( // Re-check connection limit after reuseOrReturnChannel to prevent race conditions if (state.channel !== channel) { - checkConnectionLimit(); + try { + checkConnectionLimit(); + } catch (error) { + // Connection limit exceeded after dial - close the channel to prevent leak + // Use try-catch to ensure the original error is always re-thrown + try { + await connectionFactory.closeChannel(channel, targetPeerId); + } catch { + // Ignore close errors - the original ResourceLimitError takes priority + } + throw error; + } registerChannel(targetPeerId, channel, 'reading channel to'); } } catch (problem) { - // Re-throw ResourceLimitError to propagate to caller - if (problem instanceof ResourceLimitError) { - throw problem; - } outputError(targetPeerId, `opening connection`, problem); handleConnectionLoss(targetPeerId); throw problem; @@ -392,6 +431,17 @@ export async function initTransport( reconnectionManager.resetAllBackoffs(); } + /** + * Close a rejected inbound channel, logging any errors. + * + * @param channel - The channel to close. + */ + function closeRejectedChannel(channel: Channel): void { + connectionFactory.closeChannel(channel, channel.peerId).catch((problem) => { + outputError(channel.peerId, 'closing rejected inbound channel', problem); + }); + } + // Set up inbound connection handler connectionFactory.onInboundConnection((channel) => { // Reject inbound connections from intentionally closed peers @@ -399,7 +449,7 @@ export async function initTransport( logger.log( `${channel.peerId}:: rejecting inbound connection from intentionally closed peer`, ); - // Don't add to channels map and don't start reading - connection will naturally close + closeRejectedChannel(channel); return; } @@ -411,6 +461,7 @@ export async function initTransport( logger.log( `${channel.peerId}:: rejecting inbound connection due to connection limit`, ); + closeRejectedChannel(channel); return; } throw error; @@ -508,6 +559,8 @@ export async function initTransport( await connectionFactory.stop(); peerStateManager.clear(); reconnectionManager.clear(); + messageRateLimiter.clear(); + connectionRateLimiter.clear(); } // Return the sender with a stop handle and connection management functions diff --git a/packages/ocap-kernel/src/remotes/types.ts b/packages/ocap-kernel/src/remotes/types.ts index b91523904..f278ff93a 100644 --- a/packages/ocap-kernel/src/remotes/types.ts +++ b/packages/ocap-kernel/src/remotes/types.ts @@ -77,6 +77,18 @@ export type RemoteCommsOptions = { * `resetStorage: true` when creating the kernel to clear existing identity first. */ mnemonic?: string | undefined; + /** + * Maximum messages per second per peer (default: 100). + * Messages exceeding this rate are rejected with ResourceLimitError. + * Uses a sliding 1-second window. + */ + maxMessagesPerSecond?: number | undefined; + /** + * Maximum connection attempts per minute per peer (default: 10). + * Connection attempts exceeding this rate are rejected with ResourceLimitError. + * Uses a sliding 1-minute window. + */ + maxConnectionAttemptsPerMinute?: number | undefined; }; export type RemoteInfo = {