Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
45f474b
feat(transport): add rate limiting for messages and connections
sirtimid Jan 26, 2026
812d8b1
test(transport): add unit tests for rate limiter
sirtimid Jan 26, 2026
b3c635d
fix: add rate limit types to ResourceLimitError
sirtimid Jan 26, 2026
768eeea
fix(transport): record message rate after send, rate limit reconnections
sirtimid Jan 26, 2026
76e30e2
test(e2e): fix queue limit test to avoid rate limit interference
sirtimid Jan 26, 2026
6b90047
fix(transport): avoid duplicate getCurrentCount calls in rate limit e…
sirtimid Jan 26, 2026
040914f
refactor(transport): move rate limiter constants to constants.ts
sirtimid Jan 26, 2026
bf3b1d1
fix(transport): use atomic checkAndRecord to prevent TOCTOU race
sirtimid Jan 26, 2026
7092a0a
fix(transport): preserve retry quota on rate limit, close rejected ch…
sirtimid Jan 28, 2026
e9e725a
fix(transport): add input validation and optimize rate limiter
sirtimid Jan 28, 2026
63ee982
fix(transport): distinguish rate limit from connection limit errors
sirtimid Jan 28, 2026
621ecd6
refactor(kernel-errors): add ResourceLimitType and ResourceLimitError…
sirtimid Jan 28, 2026
9d29f68
fix(transport): reject NaN and Infinity in rate limiter constructor
sirtimid Jan 28, 2026
61ca207
fix(transport): handle connection limit errors as retryable during re…
sirtimid Jan 28, 2026
15dfc1e
refactor(kernel-errors): add isResourceLimitError type guard
sirtimid Jan 28, 2026
bf458d0
fix(transport): preserve ResourceLimitError when closeChannel fails
sirtimid Jan 29, 2026
c8a1768
refactor(transport): deduplicate wouldExceedLimit by reusing getCurre…
sirtimid Jan 29, 2026
c4950a0
fix(kernel-errors): add isResourceLimitError to exports test
sirtimid Jan 29, 2026
0242e6e
fix(transport): add missing isResourceLimitError mock to tests
sirtimid Jan 29, 2026
f93baba
refactor(transport): extract closeRejectedChannel helper
sirtimid Jan 29, 2026
fe715d8
test(reconnection-lifecycle): add explicit call count assertions
sirtimid Jan 29, 2026
f215020
simplify
sirtimid Jan 29, 2026
6be75fa
test(reconnection-lifecycle): add flushPromises to prevent test flaki…
sirtimid Jan 29, 2026
31427c3
fix(transport): close channel when connection limit exceeded after dial
sirtimid Jan 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ describe('ResourceLimitError', () => {
stack: 'stack trace',
} as unknown as MarshaledOcapError,
expectedError:
'At path: data.limitType -- Expected the value to satisfy a union of `literal | literal`, but received: "invalid"',
'At path: data.limitType -- Expected the value to satisfy a union of `literal | literal | literal | literal`, but received: "invalid"',
},
{
name: 'invalid current type',
Expand Down
39 changes: 26 additions & 13 deletions packages/kernel-errors/src/errors/ResourceLimitError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,24 @@ import { BaseError } from '../BaseError.ts';
import { marshaledErrorSchema, ErrorCode } from '../constants.ts';
import type { ErrorOptionsWithStack, MarshaledOcapError } from '../types.ts';

/**
* The type of resource limit that was exceeded.
*/
export type ResourceLimitType =
| 'connection'
| 'messageSize'
| 'messageRate'
| 'connectionRate';

/**
* Data associated with a ResourceLimitError.
*/
export type ResourceLimitErrorData = {
limitType?: ResourceLimitType;
current?: number;
limit?: number;
};

/**
* Error indicating a resource limit was exceeded.
*/
Expand All @@ -28,11 +46,7 @@ export class ResourceLimitError extends BaseError {
constructor(
message: string,
options?: ErrorOptionsWithStack & {
data?: {
limitType?: 'connection' | 'messageSize';
current?: number;
limit?: number;
};
data?: ResourceLimitErrorData;
},
) {
super(ErrorCode.ResourceLimitError, message, {
Expand All @@ -50,7 +64,12 @@ export class ResourceLimitError extends BaseError {
data: optional(
object({
limitType: optional(
union([literal('connection'), literal('messageSize')]),
union([
literal('connection'),
literal('messageSize'),
literal('messageRate'),
literal('connectionRate'),
]),
),
current: optional(number()),
limit: optional(number()),
Expand All @@ -73,13 +92,7 @@ export class ResourceLimitError extends BaseError {
): ResourceLimitError {
assert(marshaledError, this.struct);
const options = unmarshalErrorOptions(marshaledError);
const data = marshaledError.data as
| {
limitType?: 'connection' | 'messageSize';
current?: number;
limit?: number;
}
| undefined;
const data = marshaledError.data as ResourceLimitErrorData | undefined;
return new ResourceLimitError(marshaledError.message, {
...options,
...(data !== undefined && { data }),
Expand Down
1 change: 1 addition & 0 deletions packages/kernel-errors/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ describe('index', () => {
'isMarshaledError',
'isMarshaledOcapError',
'isOcapError',
'isResourceLimitError',
'isRetryableNetworkError',
'marshalError',
'toError',
Expand Down
7 changes: 6 additions & 1 deletion packages/kernel-errors/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ export { VatNotFoundError } from './errors/VatNotFoundError.ts';
export { StreamReadError } from './errors/StreamReadError.ts';
export { SubclusterNotFoundError } from './errors/SubclusterNotFoundError.ts';
export { AbortError } from './errors/AbortError.ts';
export { ResourceLimitError } from './errors/ResourceLimitError.ts';
export {
ResourceLimitError,
type ResourceLimitType,
type ResourceLimitErrorData,
} from './errors/ResourceLimitError.ts';
export {
ErrorCode,
ErrorSentinel,
Expand All @@ -23,3 +27,4 @@ export { unmarshalError } from './marshal/unmarshalError.ts';
export { isMarshaledError } from './marshal/isMarshaledError.ts';
export { isMarshaledOcapError } from './marshal/isMarshaledOcapError.ts';
export { isRetryableNetworkError } from './utils/isRetryableNetworkError.ts';
export { isResourceLimitError } from './utils/isResourceLimitError.ts';
80 changes: 80 additions & 0 deletions packages/kernel-errors/src/utils/isResourceLimitError.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { describe, it, expect } from 'vitest';

import { isResourceLimitError } from './isResourceLimitError.ts';
import { ResourceLimitError } from '../errors/ResourceLimitError.ts';

describe('isResourceLimitError', () => {
describe('without limitType parameter', () => {
it('returns true for ResourceLimitError', () => {
const error = new ResourceLimitError('limit exceeded');
expect(isResourceLimitError(error)).toBe(true);
});

it('returns true for ResourceLimitError with any limitType', () => {
const connectionError = new ResourceLimitError('connection limit', {
data: { limitType: 'connection' },
});
const rateError = new ResourceLimitError('rate limit', {
data: { limitType: 'connectionRate' },
});

expect(isResourceLimitError(connectionError)).toBe(true);
expect(isResourceLimitError(rateError)).toBe(true);
});

it('returns false for regular Error', () => {
const error = new Error('some error');
expect(isResourceLimitError(error)).toBe(false);
});

it('returns false for null', () => {
expect(isResourceLimitError(null)).toBe(false);
});

it('returns false for undefined', () => {
expect(isResourceLimitError(undefined)).toBe(false);
});

it('returns false for non-error objects', () => {
expect(isResourceLimitError({ message: 'fake error' })).toBe(false);
});
});

describe('with limitType parameter', () => {
it('returns true when limitType matches', () => {
const error = new ResourceLimitError('connection limit', {
data: { limitType: 'connection' },
});
expect(isResourceLimitError(error, 'connection')).toBe(true);
});

it('returns false when limitType does not match', () => {
const error = new ResourceLimitError('connection limit', {
data: { limitType: 'connection' },
});
expect(isResourceLimitError(error, 'connectionRate')).toBe(false);
});

it('returns false when error has no limitType', () => {
const error = new ResourceLimitError('limit exceeded');
expect(isResourceLimitError(error, 'connection')).toBe(false);
});

it('returns false for non-ResourceLimitError even with matching-like data', () => {
const error = new Error('some error');
expect(isResourceLimitError(error, 'connection')).toBe(false);
});

it.each([
'connection',
'connectionRate',
'messageSize',
'messageRate',
] as const)('correctly identifies %s limitType', (limitType) => {
const error = new ResourceLimitError('limit exceeded', {
data: { limitType },
});
expect(isResourceLimitError(error, limitType)).toBe(true);
});
});
});
Comment on lines +6 to +80
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LG in this PR.

This whole suite tests $x_i \mapsto y_i$. There must be a shorter way to write this moving forward, that reads faster.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its ok, not that difficult to read

28 changes: 28 additions & 0 deletions packages/kernel-errors/src/utils/isResourceLimitError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { ResourceLimitError } from '../errors/ResourceLimitError.ts';
import type {
ResourceLimitType,
ResourceLimitErrorData,
} from '../errors/ResourceLimitError.ts';

/**
* Check if an error is a ResourceLimitError, optionally with a specific limit type.
*
* @param error - The error to check.
* @param limitType - Optional limit type to match against.
* @returns True if the error is a ResourceLimitError (with matching limitType if specified).
*/
export function isResourceLimitError(
error: unknown,
limitType?: ResourceLimitType,
): error is ResourceLimitError {
if (!(error instanceof ResourceLimitError)) {
return false;
}

if (limitType === undefined) {
return true;
}

const data = error.data as ResourceLimitErrorData | undefined;
return data?.limitType === limitType;
}
2 changes: 2 additions & 0 deletions packages/nodejs/test/e2e/remote-comms.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -524,12 +524,14 @@ describe.sequential('Remote Communications E2E', () => {
it(
'rejects new messages when queue reaches MAX_QUEUE limit',
async () => {
// Use high rate limit to avoid rate limiting interference with queue limit test
const { aliceRef, bobURL } = await setupAliceAndBob(
kernel1,
kernel2,
kernelStore1,
kernelStore2,
testRelays,
{ maxMessagesPerSecond: 500 },
);

await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']);
Expand Down
12 changes: 9 additions & 3 deletions packages/nodejs/test/helpers/remote-comms.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import type { KernelDatabase } from '@metamask/kernel-store';
import { stringify } from '@metamask/kernel-utils';
import { Kernel, kunser, makeKernelStore } from '@metamask/ocap-kernel';
import type { ClusterConfig, KRef } from '@metamask/ocap-kernel';
import type {
ClusterConfig,
KRef,
RemoteCommsOptions,
} from '@metamask/ocap-kernel';

import { makeTestKernel } from './kernel.ts';

Expand Down Expand Up @@ -181,6 +185,7 @@ export async function wait(ms: number): Promise<void> {
* @param kernelStore1 - Kernel store for first kernel.
* @param kernelStore2 - Kernel store for second kernel.
* @param relays - Array of relay addresses.
* @param remoteCommsOptions - Optional additional options for initRemoteComms.
* @returns Object with all setup data including URLs and references.
*/
export async function setupAliceAndBob(
Expand All @@ -189,6 +194,7 @@ export async function setupAliceAndBob(
kernelStore1: ReturnType<typeof makeKernelStore>,
kernelStore2: ReturnType<typeof makeKernelStore>,
relays: string[],
remoteCommsOptions?: Omit<RemoteCommsOptions, 'relays'>,
): Promise<{
aliceURL: string;
bobURL: string;
Expand All @@ -197,8 +203,8 @@ export async function setupAliceAndBob(
peerId1: string;
peerId2: string;
}> {
await kernel1.initRemoteComms({ relays });
await kernel2.initRemoteComms({ relays });
await kernel1.initRemoteComms({ relays, ...remoteCommsOptions });
await kernel2.initRemoteComms({ relays, ...remoteCommsOptions });

const aliceConfig = makeRemoteVatConfig('Alice');
const bobConfig = makeRemoteVatConfig('Bob');
Expand Down
12 changes: 12 additions & 0 deletions packages/ocap-kernel/src/remotes/platform/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,15 @@ export const DEFAULT_WRITE_TIMEOUT_MS = 10_000;

/** SCTP user initiated abort code (RFC 4960) */
export const SCTP_USER_INITIATED_ABORT = 12;

/** Default message rate limit: 100 messages per second per peer */
export const DEFAULT_MESSAGE_RATE_LIMIT = 100;

/** Default message rate window in milliseconds (1 second) */
export const DEFAULT_MESSAGE_RATE_WINDOW_MS = 1000;

/** Default connection attempt rate limit: 10 attempts per minute per peer */
export const DEFAULT_CONNECTION_RATE_LIMIT = 10;

/** Default connection rate window in milliseconds (1 minute) */
export const DEFAULT_CONNECTION_RATE_WINDOW_MS = 60_000;
Loading
Loading