Skip to content

Comments

fix(streamer): await sendToManySubscriptions async dispatch#38681

Open
Shreyas2004wagh wants to merge 8 commits intoRocketChat:developfrom
Shreyas2004wagh:fix/38680-streamer-sendtomanysubscriptions
Open

fix(streamer): await sendToManySubscriptions async dispatch#38681
Shreyas2004wagh wants to merge 8 commits intoRocketChat:developfrom
Shreyas2004wagh:fix/38680-streamer-sendtomanysubscriptions

Conversation

@Shreyas2004wagh
Copy link
Contributor

@Shreyas2004wagh Shreyas2004wagh commented Feb 14, 2026

Proposed changes (including videos or screenshots)

This PR fixes async dispatch handling in the streamer delivery path.

  • Replaced forEach(async ...) in sendToManySubscriptions with Promise.allSettled(...) over mapped async tasks.
  • Ensures the method now waits for all per-subscription async permission checks and message dispatch attempts before resolving.
  • Prevents dropped/swallowed async failures by handling rejected tasks explicitly.
  • Added structured error logging (SystemLogger.error) for failed subscription deliveries.
  • Kept the existing call-site behavior (void this.sendToManySubscriptions(...)) unchanged, so this remains a scoped reliability fix with no API contract changes.

Issue(s)

Steps to test or reproduce

  1. Start the server with this branch.
  2. Subscribe multiple clients to a streamer event where allowEmit is async (for example, existing async allowEmit rules like __my_messages__).
  3. Trigger streamer emissions for that event.
  4. Verify:
  • Deliveries continue to work for valid subscribers.
  • Async failures are logged instead of being silently detached.
  • No unhandled promise rejection is produced by this delivery path.

Further comments

  • Change is intentionally scoped to apps/meteor/server/modules/streamer/streamer.module.ts.
  • No schema/API changes.
  • No UI impact.

Summary by CodeRabbit

  • Bug Fixes

    • Delivery now processes subscribers concurrently and isolates per-recipient failures so one failing delivery won't stop others; added per-recipient error capture and centralized handling while preserving permission and retransmit guards and origin-skip behavior.
  • Tests

    • Added unit tests covering permission checks (including delayed resolution), origin-skip behavior, error resilience, and logging when permission checks fail.

Copilot AI review requested due to automatic review settings February 14, 2026 09:44
@Shreyas2004wagh Shreyas2004wagh requested a review from a team as a code owner February 14, 2026 09:44
@changeset-bot
Copy link

changeset-bot bot commented Feb 14, 2026

⚠️ No Changeset found

Latest commit: dc8784a

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@dionisio-bot
Copy link
Contributor

dionisio-bot bot commented Feb 14, 2026

Looks like this PR is not ready to merge, because of the following issues:

  • This PR is missing the 'stat: QA assured' label
  • This PR is missing the required milestone or project

Please fix the issues and try again

If you have any trouble, please check the PR guidelines

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 14, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Reworked Streamer.sendToManySubscriptions to perform per-subscription async permission checks and sends concurrently via Promise.allSettled, skip the origin subscription, and log per-subscription permission or delivery failures while continuing delivery to other subscribers.

Changes

Cohort / File(s) Summary
Streamer Delivery Logic
apps/meteor/server/modules/streamer/streamer.module.ts
Replaced per-subscription async forEach with concurrent processing (Promise.allSettled). Each subscription runs isEmitAllowed and send in its own try/catch; permission or send rejections are caught and logged with eventName/streamName. Retains retransmitToSelf guard and origin-skip logic.
Unit tests for sendToManySubscriptions
apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts
Added tests exercising delayed permission checks (gate Promise), verifying origin skip, selective sends to allowed subs, and resilience to permission-check rejections (errors logged, other deliveries continue).

Sequence Diagram(s)

sequenceDiagram
    participant Streamer as Streamer
    participant SubA as Subscription A
    participant SubB as Subscription B
    participant SubC as Subscription C
    participant Logger as SystemLogger

    Streamer->>SubA: isEmitAllowed(subA) (async)
    Streamer->>SubB: isEmitAllowed(subB) (async)
    Streamer->>SubC: isEmitAllowed(subC) (async)
    Note over SubA,SubC: permission checks resolve/reject independently
    alt allowed
        SubA-->>Streamer: allowed
        Streamer->>SubA: send(msg)
        SubA-->>Streamer: send fulfilled / rejected
    else denied
        SubB-->>Streamer: denied (skip send)
    end
    alt isEmitAllowed rejects
        SubC-->>Streamer: error
        Streamer->>Logger: log permission error (eventName, streamName)
    end
    Streamer->>Logger: log any per-subscription send failures from allSettled
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

🐇 I hop through checks with gentle zest,
Some gates delay, some pass the test.
One stumbles — I note with a wink,
The rest bound on, no chain to sink.
AllSettled keeps our meadow blessed. 🥕

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and specifically describes the main change: converting sendToManySubscriptions to await async dispatch using Promise.allSettled instead of forEach.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes a critical async handling bug in the streamer module where forEach(async ...) was not awaiting async operations, causing unreliable message delivery and potentially dropped errors.

Changes:

  • Replaced forEach(async ...) with Promise.allSettled() to properly await all subscription deliveries
  • Added explicit error handling and logging for failed subscription deliveries
  • Ensures async permission checks complete deterministically before the method resolves

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No issues found across 1 file

@codecov
Copy link

codecov bot commented Feb 14, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 70.60%. Comparing base (11e1c51) to head (dc8784a).
⚠️ Report is 62 commits behind head on develop.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff             @@
##           develop   #38681      +/-   ##
===========================================
+ Coverage    70.49%   70.60%   +0.10%     
===========================================
  Files         3175     3189      +14     
  Lines       111094   112705    +1611     
  Branches     20045    20402     +357     
===========================================
+ Hits         78311    79570    +1259     
- Misses       30738    31077     +339     
- Partials      2045     2058      +13     
Flag Coverage Δ
e2e 60.36% <ø> (-0.11%) ⬇️
e2e-api 48.88% <0.00%> (+1.11%) ⬆️
unit 71.63% <90.00%> (+0.15%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@Shreyas2004wagh
Copy link
Contributor Author

Codecov is flagging the new async branches introduced in sendToManySubscriptions.

I’m adding focused unit tests to cover the updated behavior (async per-subscription handling, error logging, and delivery semantics) and will push them shortly to address patch coverage.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts`:
- Around line 55-58: The afterEach cleanup uses
createdStreamers.splice(0).forEach((name) => delete
StreamerCentral.instances[name]) which returns a boolean from the forEach
callback and triggers Biome lint; change the cleanup to iterate with a for...of
loop (or a simple for loop) over createdStreamers.splice(0) and perform delete
StreamerCentral.instances[name] inside the loop so the callback does not return
a value; update the afterEach that also calls sinon.restore() accordingly.
🧹 Nitpick comments (1)
apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts (1)

7-19: Nit: Remove inline comments per coding guidelines.

The // no-op for unit test comments on the stub methods can be dropped — the empty body already conveys the intent. As per coding guidelines, **/*.{ts,tsx,js}: "Avoid code comments in the implementation."

Suggested diff
 class TestStreamer extends Streamer<any> {
-	registerPublication(): void {
-		// no-op for unit test
-	}
+	registerPublication(): void {}
 
-	registerMethod(): void {
-		// no-op for unit test
-	}
+	registerMethod(): void {}
 
 	changedPayload(): string {
 		return 'payload';
 	}
 }
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4602c06 and b0dee87.

📒 Files selected for processing (1)
  • apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts
🧰 Additional context used
📓 Path-based instructions (2)
**/*.{ts,tsx,js}

📄 CodeRabbit inference engine (.cursor/rules/playwright.mdc)

**/*.{ts,tsx,js}: Write concise, technical TypeScript/JavaScript with accurate typing in Playwright tests
Avoid code comments in the implementation

Files:

  • apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts
**/*.spec.ts

📄 CodeRabbit inference engine (.cursor/rules/playwright.mdc)

**/*.spec.ts: Use descriptive test names that clearly communicate expected behavior in Playwright tests
Use .spec.ts extension for test files (e.g., login.spec.ts)

Files:

  • apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts
🧠 Learnings (9)
📚 Learning: 2025-11-24T17:08:17.065Z
Learnt from: CR
Repo: RocketChat/Rocket.Chat PR: 0
File: .cursor/rules/playwright.mdc:0-0
Timestamp: 2025-11-24T17:08:17.065Z
Learning: Applies to apps/meteor/tests/e2e/**/*.spec.ts : Ensure tests run reliably in parallel without shared state conflicts

Applied to files:

  • apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts
📚 Learning: 2025-11-24T17:08:17.065Z
Learnt from: CR
Repo: RocketChat/Rocket.Chat PR: 0
File: .cursor/rules/playwright.mdc:0-0
Timestamp: 2025-11-24T17:08:17.065Z
Learning: Applies to apps/meteor/tests/e2e/**/*.spec.ts : Group related tests in the same file

Applied to files:

  • apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts
📚 Learning: 2025-11-24T17:08:17.065Z
Learnt from: CR
Repo: RocketChat/Rocket.Chat PR: 0
File: .cursor/rules/playwright.mdc:0-0
Timestamp: 2025-11-24T17:08:17.065Z
Learning: Applies to apps/meteor/tests/e2e/**/*.spec.ts : Maintain test isolation between test cases in Playwright tests

Applied to files:

  • apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts
📚 Learning: 2025-11-24T17:08:17.065Z
Learnt from: CR
Repo: RocketChat/Rocket.Chat PR: 0
File: .cursor/rules/playwright.mdc:0-0
Timestamp: 2025-11-24T17:08:17.065Z
Learning: Applies to apps/meteor/tests/e2e/**/*.spec.ts : All test files must be created in `apps/meteor/tests/e2e/` directory

Applied to files:

  • apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts
📚 Learning: 2025-12-10T21:00:54.909Z
Learnt from: KevLehman
Repo: RocketChat/Rocket.Chat PR: 37091
File: ee/packages/abac/jest.config.ts:4-7
Timestamp: 2025-12-10T21:00:54.909Z
Learning: Rocket.Chat monorepo: Jest testMatch pattern '<rootDir>/src/**/*.spec.(ts|js|mjs)' is valid in this repo and used across multiple packages (e.g., packages/tools, ee/packages/omnichannel-services). Do not flag it as invalid in future reviews.

Applied to files:

  • apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts
📚 Learning: 2025-11-24T17:08:17.065Z
Learnt from: CR
Repo: RocketChat/Rocket.Chat PR: 0
File: .cursor/rules/playwright.mdc:0-0
Timestamp: 2025-11-24T17:08:17.065Z
Learning: Applies to apps/meteor/tests/e2e/**/*.spec.ts : Utilize Playwright fixtures (`test`, `page`, `expect`) for consistency in test files

Applied to files:

  • apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts
📚 Learning: 2025-11-24T17:08:17.065Z
Learnt from: CR
Repo: RocketChat/Rocket.Chat PR: 0
File: .cursor/rules/playwright.mdc:0-0
Timestamp: 2025-11-24T17:08:17.065Z
Learning: Applies to apps/meteor/tests/e2e/**/*.spec.ts : Use `test.step()` for complex test scenarios to improve organization in Playwright tests

Applied to files:

  • apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts
📚 Learning: 2025-11-24T17:08:17.065Z
Learnt from: CR
Repo: RocketChat/Rocket.Chat PR: 0
File: .cursor/rules/playwright.mdc:0-0
Timestamp: 2025-11-24T17:08:17.065Z
Learning: Applies to apps/meteor/tests/e2e/**/*.spec.ts : Use `expect` matchers for assertions (`toEqual`, `toContain`, `toBeTruthy`, `toHaveLength`, etc.) instead of `assert` statements in Playwright tests

Applied to files:

  • apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts
📚 Learning: 2025-11-24T17:08:17.065Z
Learnt from: CR
Repo: RocketChat/Rocket.Chat PR: 0
File: .cursor/rules/playwright.mdc:0-0
Timestamp: 2025-11-24T17:08:17.065Z
Learning: Applies to apps/meteor/tests/e2e/**/*.spec.ts : Ensure clean state for each test execution in Playwright tests

Applied to files:

  • apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts
🪛 Biome (2.3.14)
apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts

[error] 57-57: This callback passed to forEach() iterable method should not return a value.

Either remove this return or remove the returned value.

(lint/suspicious/useIterableCallbackReturn)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: 📦 Build Packages
  • GitHub Check: cubic · AI code reviewer
  • GitHub Check: CodeQL-Build
🔇 Additional comments (3)
apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts (3)

60-80: LGTM!

The gate-based pattern cleanly validates that dispatch is deferred until the async permission check resolves. Well structured.


82-104: LGTM!

Origin filtering, allowed dispatch, and denied-subscription skipping are all well covered. The test correctly relies on Set insertion-order iteration for deterministic stub sequencing.


106-130: LGTM!

Good resilience test — confirms that a rejected permission check is logged with the expected structured payload and does not block delivery to other subscribers.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

@Shreyas2004wagh
Copy link
Contributor Author

Added focused unit tests covering async dispatch and error handling in sendToManySubscriptions. This should address the Codecov patch coverage.

@Shreyas2004wagh
Copy link
Contributor Author

I reverted all changes outside the streamer scope.

After reviewing CI failures:

  • API EE failure (07-queue.ts) reproduces on recent develop and is unrelated to streamer changes (test/data assumption issue).
  • UI CE and UI EE failures are Playwright timeouts and appear flaky; no streamer-layer behavior is involved.

Given the streamer-only scope of this PR, there is no appropriate cross-layer fix to apply here.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0aba435 and 20eb057.

📒 Files selected for processing (2)
  • apps/meteor/server/modules/streamer/streamer.module.ts
  • apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • apps/meteor/tests/unit/server/modules/streamer/streamer.module.spec.ts
🧰 Additional context used
📓 Path-based instructions (1)
**/*.{ts,tsx,js}

📄 CodeRabbit inference engine (.cursor/rules/playwright.mdc)

**/*.{ts,tsx,js}: Write concise, technical TypeScript/JavaScript with accurate typing in Playwright tests
Avoid code comments in the implementation

Files:

  • apps/meteor/server/modules/streamer/streamer.module.ts
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: 📦 Build Packages
  • GitHub Check: cubic · AI code reviewer
  • GitHub Check: CodeQL-Build
🔇 Additional comments (1)
apps/meteor/server/modules/streamer/streamer.module.ts (1)

293-316: Core concurrent-dispatch fix is correct; note Promise.all vs Promise.allSettled in PR description.

The use of await Promise.all([...subscriptions].map(async ...)) correctly awaits all per-subscription async permission checks and sends concurrently, with one failure not blocking others (enforced by the internal try/catch). The retransmitToSelf guard, truthy allowed check, and optional-chained socket?.send() are all preserved correctly.

Minor note: the PR description references Promise.allSettled, but since the entire callback body is wrapped in try/catch, no mapped promise can reject — Promise.all and Promise.allSettled are functionally equivalent in this shape. No change needed, but the PR description should be updated for accuracy.

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/meteor/server/modules/streamer/streamer.module.ts`:
- Around line 307-313: The catch in the streamer delivery path currently calls
SystemLogger.debug which silences real failures; update the catch in the
delivery block inside streamer.module.ts to log at a higher level (use
SystemLogger.error per PR intent, or at minimum SystemLogger.warn), keeping the
same structured fields (msg, eventName, streamName: this.name) and include the
caught err details so failures from isEmitAllowed or the TransformMessage/getMsg
callback are visible in production; adjust the log call where the catch
currently invokes SystemLogger.debug to SystemLogger.error and ensure err is
passed through in the log payload.

Comment on lines 65 to 78
let release!: (value: boolean) => void;
const gate = new Promise<boolean>((resolve) => {
release = resolve;
});

sinon.stub(streamer, 'isEmitAllowed').returns(gate as any);

const sendPromise = streamer.sendToManySubscriptions(new Set([sub.entry]), undefined, 'event', [], 'test-msg');

await Promise.resolve();
expect(sub.send.called).to.equal(false);

release(true);
await sendPromise;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can u simplify these test? this part is a bit confusing, i'm sure there's some work that can be done diff

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants