diff --git a/.github/workflows/chat-sync-live-nightly.yaml b/.github/workflows/chat-sync-live-nightly.yaml new file mode 100644 index 000000000..d4d9c9aea --- /dev/null +++ b/.github/workflows/chat-sync-live-nightly.yaml @@ -0,0 +1,53 @@ +name: Chat Sync Live Discord Nightly + +on: + schedule: + - cron: "17 8 * * *" + workflow_dispatch: + +jobs: + chat-sync-live: + runs-on: ubuntu-latest + timeout-minutes: 45 + permissions: + contents: read + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup Bun + uses: oven-sh/setup-bun@v1 + with: + bun-version: latest + + - name: Cache dependencies + uses: actions/cache@v4 + with: + path: | + ~/.bun/install/cache + node_modules + */node_modules + key: ${{ runner.os }}-bun-${{ hashFiles('**/bun.lock') }} + restore-keys: | + ${{ runner.os }}-bun- + + - name: Install dependencies + run: bun install --frozen-lockfile + + - name: Run live Discord sync suite + run: CHAT_SYNC_TEST_DIAGNOSTICS_DIR=apps/backend/.artifacts/chat-sync-live bun run --cwd apps/backend test:sync:live + env: + DISCORD_SYNC_TEST_GUILD_ID: ${{ secrets.DISCORD_SYNC_TEST_GUILD_ID }} + DISCORD_SYNC_TEST_CHANNEL_ID: ${{ secrets.DISCORD_SYNC_TEST_CHANNEL_ID }} + DISCORD_SYNC_TEST_CHANNEL_ID_2: ${{ secrets.DISCORD_SYNC_TEST_CHANNEL_ID_2 }} + DISCORD_SYNC_TEST_BOT_TOKEN: ${{ secrets.DISCORD_SYNC_TEST_BOT_TOKEN }} + DISCORD_BOT_TOKEN: ${{ secrets.DISCORD_BOT_TOKEN }} + + - name: Upload live sync diagnostics + if: failure() + uses: actions/upload-artifact@v4 + with: + name: chat-sync-live-diagnostics-${{ github.run_id }} + path: apps/backend/.artifacts/chat-sync-live + if-no-files-found: ignore diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 5715742d6..6c21d0c11 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -97,11 +97,22 @@ jobs: - name: Install dependencies run: bun install --frozen-lockfile + - name: Run sync regression suite + run: CHAT_SYNC_TEST_DIAGNOSTICS_DIR=apps/backend/.artifacts/chat-sync bun run --cwd apps/backend test:sync + - name: Run tests run: bun run test:once --coverage.enabled true env: VITE_BACKEND_URL: ${{ vars.VITE_BACKEND_URL }} + - name: Upload sync diagnostics + if: failure() + uses: actions/upload-artifact@v4 + with: + name: chat-sync-diagnostics-${{ github.run_id }} + path: apps/backend/.artifacts/chat-sync + if-no-files-found: ignore + - name: "Report Coverage" if: always() uses: davelosert/vitest-coverage-report-action@v2 diff --git a/apps/backend/package.json b/apps/backend/package.json index ca26b6c5e..4a035feca 100644 --- a/apps/backend/package.json +++ b/apps/backend/package.json @@ -13,6 +13,8 @@ "start": "bun run src/index.ts", "typecheck": "tsc --noEmit", "test": "vitest run src/**/*.test.ts", + "test:sync": "vitest run src/services/chat-sync/*.test.ts src/services/chat-sync/*.integration.test.ts src/routes/webhooks.http.test.ts --exclude src/services/chat-sync/*.e2e.test.ts", + "test:sync:live": "vitest run src/services/chat-sync/chat-sync-live-discord.e2e.test.ts", "test:watch": "vitest src/**/*.test.ts", "test:policies": "vitest run src/lib/policy-utils.test.ts src/policies/*.test.ts", "setup": "bun run scripts/setup.ts", @@ -47,6 +49,7 @@ "pg": "^8.16.3" }, "devDependencies": { + "@testcontainers/postgresql": "^10.18.0", "@effect/language-service": "catalog:effect", "@types/bun": "1.3.8", "drizzle-kit": "^0.31.8", diff --git a/apps/backend/src/routes/webhooks.http.test.ts b/apps/backend/src/routes/webhooks.http.test.ts index ef9a65cd7..85dcae9a8 100644 --- a/apps/backend/src/routes/webhooks.http.test.ts +++ b/apps/backend/src/routes/webhooks.http.test.ts @@ -7,6 +7,7 @@ import { syncSequinWebhookEventToDiscord, } from "./webhooks.http.ts" import { SequinWebhookPayload, type SequinWebhookEvent } from "@hazel/domain/http" +import { recordChatSyncDiagnostic } from "../test/chat-sync-test-diagnostics" const metadataDefaults = { idempotency_key: "idempotency-default", @@ -74,6 +75,54 @@ const makeEvent = ( } as unknown as SequinWebhookEvent } +const makeWorkerSpy = () => { + const calls: Array<{ + method: string + id: string + dedupeKey?: string + }> = [] + + const worker: Parameters[2] = { + syncHazelMessageCreateToAllConnections: (messageId: string, dedupeKey?: string) => + Effect.sync(() => { + calls.push({ method: "syncHazelMessageCreateToAllConnections", id: messageId, dedupeKey }) + return { synced: 1, failed: 0 } + }), + syncHazelMessageUpdateToAllConnections: (messageId: string, dedupeKey?: string) => + Effect.sync(() => { + calls.push({ method: "syncHazelMessageUpdateToAllConnections", id: messageId, dedupeKey }) + return { synced: 1, failed: 0 } + }), + syncHazelMessageDeleteToAllConnections: (messageId: string, dedupeKey?: string) => + Effect.sync(() => { + calls.push({ method: "syncHazelMessageDeleteToAllConnections", id: messageId, dedupeKey }) + return { synced: 1, failed: 0 } + }), + syncHazelReactionCreateToAllConnections: (reactionId: string, dedupeKey?: string) => + Effect.sync(() => { + calls.push({ method: "syncHazelReactionCreateToAllConnections", id: reactionId, dedupeKey }) + return { synced: 1, failed: 0 } + }), + syncHazelReactionDeleteToAllConnections: ( + payload: { hazelMessageId: string }, + dedupeKey?: string, + ) => + Effect.sync(() => { + calls.push({ + method: "syncHazelReactionDeleteToAllConnections", + id: payload.hazelMessageId, + dedupeKey, + }) + return { synced: 1, failed: 0 } + }), + } + + return { + worker, + calls, + } +} + describe("sequin webhook payload decoding", () => { it("accepts message_reactions payloads without updatedAt", () => { Schema.decodeUnknownSync(SequinWebhookPayload)({ @@ -309,3 +358,236 @@ describe("sequin webhook processing order", () => { expect(workerCalls).toEqual(["create:msg-attachment-backed"]) }) }) + +describe("sequin webhook sync routing matrix", () => { + it("routes message insert/update/delete and soft-delete update to expected worker methods", async () => { + const { worker, calls } = makeWorkerSpy() + const messageId = "msg-route-1" + + const insertEvent = makeEvent(makeMessageRecord(messageId), "messages", { + action: "insert", + commit_timestamp: "2026-02-01T12:00:00.000Z", + commit_lsn: 10, + commit_idx: 0, + }) + const updateEvent = makeEvent( + { + ...makeMessageRecord(messageId), + content: "updated", + }, + "messages", + { + action: "update", + commit_timestamp: "2026-02-01T12:00:01.000Z", + commit_lsn: 10, + commit_idx: 1, + }, + ) + const deleteEvent = makeEvent(makeMessageRecord(messageId), "messages", { + action: "delete", + commit_timestamp: "2026-02-01T12:00:02.000Z", + commit_lsn: 10, + commit_idx: 2, + }) + const softDeleteUpdateEvent = makeEvent( + makeMessageRecord(messageId, "2026-02-01T12:00:03.000Z"), + "messages", + { + action: "update", + commit_timestamp: "2026-02-01T12:00:03.000Z", + commit_lsn: 10, + commit_idx: 3, + }, + ) + + await Effect.runPromise( + syncSequinWebhookEventToDiscord(insertEvent, "integration-bot", worker), + ) + await Effect.runPromise( + syncSequinWebhookEventToDiscord(updateEvent, "integration-bot", worker), + ) + await Effect.runPromise( + syncSequinWebhookEventToDiscord(deleteEvent, "integration-bot", worker), + ) + await Effect.runPromise( + syncSequinWebhookEventToDiscord(softDeleteUpdateEvent, "integration-bot", worker), + ) + + recordChatSyncDiagnostic({ + suite: "webhooks.http", + testCase: "message-routing-matrix", + workerMethod: calls[0]?.method ?? "unknown", + action: "route", + dedupeKey: calls[0]?.dedupeKey, + expected: "create/update/delete/delete", + actual: calls.map((call) => call.method).join("/"), + metadata: { + messageId, + }, + }) + + expect(calls.map((call) => call.method)).toEqual([ + "syncHazelMessageCreateToAllConnections", + "syncHazelMessageUpdateToAllConnections", + "syncHazelMessageDeleteToAllConnections", + "syncHazelMessageDeleteToAllConnections", + ]) + expect(calls.every((call) => call.dedupeKey?.includes("hazel:sequin:messages"))).toBe(true) + }) + + it("routes reaction insert/delete and ignores reaction update action", async () => { + const { worker, calls } = makeWorkerSpy() + + const insertEvent = makeEvent(makeReactionRecord("react-route-1"), "message_reactions", { + action: "insert", + commit_timestamp: "2026-02-01T12:10:00.000Z", + commit_lsn: 20, + commit_idx: 0, + }) + const deleteEvent = makeEvent(makeReactionRecord("react-route-2"), "message_reactions", { + action: "delete", + commit_timestamp: "2026-02-01T12:10:01.000Z", + commit_lsn: 20, + commit_idx: 1, + }) + const updateEvent = makeEvent(makeReactionRecord("react-route-3"), "message_reactions", { + action: "update", + commit_timestamp: "2026-02-01T12:10:02.000Z", + commit_lsn: 20, + commit_idx: 2, + }) + + await Effect.runPromise( + syncSequinWebhookEventToDiscord(insertEvent, "integration-bot", worker), + ) + await Effect.runPromise( + syncSequinWebhookEventToDiscord(deleteEvent, "integration-bot", worker), + ) + await Effect.runPromise( + syncSequinWebhookEventToDiscord(updateEvent, "integration-bot", worker), + ) + + expect(calls.map((call) => call.method)).toEqual([ + "syncHazelReactionCreateToAllConnections", + "syncHazelReactionDeleteToAllConnections", + ]) + expect(calls.every((call) => call.dedupeKey?.includes("hazel:sequin:message_reactions"))).toBe( + true, + ) + }) + + it("filters integration bot authored message/reaction events to prevent loops", async () => { + const { worker, calls } = makeWorkerSpy() + const integrationBotUserId = "integration-bot" + + await Effect.runPromise( + syncSequinWebhookEventToDiscord( + makeEvent( + { + ...makeMessageRecord("msg-loop"), + authorId: integrationBotUserId, + }, + "messages", + { + action: "insert", + commit_timestamp: "2026-02-01T12:20:00.000Z", + commit_lsn: 30, + commit_idx: 0, + }, + ), + integrationBotUserId, + worker, + ), + ) + await Effect.runPromise( + syncSequinWebhookEventToDiscord( + makeEvent( + { + ...makeReactionRecord("reaction-loop"), + userId: integrationBotUserId, + }, + "message_reactions", + { + action: "insert", + commit_timestamp: "2026-02-01T12:20:01.000Z", + commit_lsn: 30, + commit_idx: 1, + }, + ), + integrationBotUserId, + worker, + ), + ) + + expect(calls).toHaveLength(0) + }) + + it("isolates per-event failures across message and reaction action types", async () => { + const workerCalls: string[] = [] + const failingWorker: Parameters[2] = { + syncHazelMessageCreateToAllConnections: () => Effect.fail(new Error("create failed")), + syncHazelMessageUpdateToAllConnections: (messageId: string) => + Effect.sync(() => { + workerCalls.push(`update:${messageId}`) + return { synced: 1, failed: 0 } + }), + syncHazelMessageDeleteToAllConnections: () => Effect.fail(new Error("delete failed")), + syncHazelReactionCreateToAllConnections: () => Effect.fail(new Error("reaction create failed")), + syncHazelReactionDeleteToAllConnections: (payload: { hazelMessageId: string }) => + Effect.sync(() => { + workerCalls.push(`reaction-delete:${payload.hazelMessageId}`) + return { synced: 1, failed: 0 } + }), + } + + const events: SequinWebhookEvent[] = [ + makeEvent(makeMessageRecord("msg-fail-create"), "messages", { + action: "insert", + commit_timestamp: "2026-02-01T13:00:00.000Z", + commit_lsn: 40, + commit_idx: 0, + }), + makeEvent(makeMessageRecord("msg-ok-update"), "messages", { + action: "update", + commit_timestamp: "2026-02-01T13:00:01.000Z", + commit_lsn: 40, + commit_idx: 1, + }), + makeEvent(makeMessageRecord("msg-fail-delete"), "messages", { + action: "delete", + commit_timestamp: "2026-02-01T13:00:02.000Z", + commit_lsn: 40, + commit_idx: 2, + }), + makeEvent(makeReactionRecord("reaction-fail-create"), "message_reactions", { + action: "insert", + commit_timestamp: "2026-02-01T13:00:03.000Z", + commit_lsn: 40, + commit_idx: 3, + }), + makeEvent(makeReactionRecord("reaction-ok-delete"), "message_reactions", { + action: "delete", + commit_timestamp: "2026-02-01T13:00:04.000Z", + commit_lsn: 40, + commit_idx: 4, + }), + ] + + await Effect.runPromise( + processSequinWebhookEventsInCommitOrder(events, (event) => + syncSequinWebhookEventToDiscord(event, "integration-bot", failingWorker), + ), + ) + + recordChatSyncDiagnostic({ + suite: "webhooks.http", + testCase: "failure-isolation", + workerMethod: "processSequinWebhookEventsInCommitOrder", + action: "continue_on_error", + expected: "update:msg-ok-update,reaction-delete:message-1", + actual: workerCalls.join(","), + }) + + expect(workerCalls).toEqual(["update:msg-ok-update", "reaction-delete:message-1"]) + }) +}) diff --git a/apps/backend/src/services/chat-sync/chat-sync-core-worker.integration.test.ts b/apps/backend/src/services/chat-sync/chat-sync-core-worker.integration.test.ts new file mode 100644 index 000000000..4bdaac8ce --- /dev/null +++ b/apps/backend/src/services/chat-sync/chat-sync-core-worker.integration.test.ts @@ -0,0 +1,865 @@ +import { randomUUID } from "node:crypto" +import { + ChannelRepo, + ChatSyncChannelLinkRepo, + ChatSyncConnectionRepo, + ChatSyncEventReceiptRepo, + ChatSyncMessageLinkRepo, + IntegrationConnectionRepo, + MessageReactionRepo, + MessageRepo, + OrganizationMemberRepo, + UserRepo, +} from "@hazel/backend-core" +import { and, Database, eq, isNull, schema } from "@hazel/db" +import type { + ChannelId, + ExternalChannelId, + ExternalMessageId, + ExternalThreadId, + ExternalUserId, + ExternalWebhookId, + MessageId, + MessageReactionId, + OrganizationId, + SyncChannelLinkId, + SyncConnectionId, + UserId, +} from "@hazel/schema" +import { Discord } from "@hazel/integrations" +import { Effect, Layer } from "effect" +import { afterAll, beforeAll, beforeEach, describe, expect, it } from "vitest" +import { createChatSyncDbHarness, type ChatSyncDbHarness } from "../../test/chat-sync-db-harness" +import { recordChatSyncDiagnostic } from "../../test/chat-sync-test-diagnostics" +import { ChannelAccessSyncService } from "../channel-access-sync" +import { IntegrationBotService } from "../integrations/integration-bot-service" +import { ChatSyncProviderRegistry, type ChatSyncProviderAdapter } from "./chat-sync-provider-registry" +import { ChatSyncCoreWorker } from "./chat-sync-core-worker" + +type SeedContext = { + organizationId: OrganizationId + authorUserId: UserId + botUserId: UserId + channelId: ChannelId +} + +type AdapterRecorder = { + readonly calls: { + createMessage: Array<{ + externalChannelId: ExternalChannelId + content: string + replyToExternalMessageId?: ExternalMessageId + }> + updateMessage: Array<{ + externalChannelId: ExternalChannelId + externalMessageId: ExternalMessageId + content: string + }> + deleteMessage: Array<{ + externalChannelId: ExternalChannelId + externalMessageId: ExternalMessageId + }> + addReaction: Array<{ + externalChannelId: ExternalChannelId + externalMessageId: ExternalMessageId + emoji: string + }> + removeReaction: Array<{ + externalChannelId: ExternalChannelId + externalMessageId: ExternalMessageId + emoji: string + }> + createThread: Array<{ + externalChannelId: ExternalChannelId + externalMessageId: ExternalMessageId + name: string + }> + } + readonly adapter: ChatSyncProviderAdapter +} + +const runEffect = (effect: Effect.Effect) => + Effect.runPromise((effect as Effect.Effect).pipe(Effect.scoped)) + +const uuid = () => randomUUID() as T + +const makeAdapterRecorder = (): AdapterRecorder => { + let messageSeq = 0 + let threadSeq = 0 + + const calls: AdapterRecorder["calls"] = { + createMessage: [], + updateMessage: [], + deleteMessage: [], + addReaction: [], + removeReaction: [], + createThread: [], + } + + const adapter: ChatSyncProviderAdapter = { + provider: "discord", + createMessage: (params) => + Effect.sync(() => { + calls.createMessage.push(params) + messageSeq += 1 + return `20000000000000000${messageSeq}` as ExternalMessageId + }), + createMessageWithAttachments: (params) => + Effect.sync(() => { + calls.createMessage.push({ + externalChannelId: params.externalChannelId, + content: params.content, + replyToExternalMessageId: params.replyToExternalMessageId, + }) + messageSeq += 1 + return `20000000000000000${messageSeq}` as ExternalMessageId + }), + updateMessage: (params) => + Effect.sync(() => { + calls.updateMessage.push(params) + }), + deleteMessage: (params) => + Effect.sync(() => { + calls.deleteMessage.push(params) + }), + addReaction: (params) => + Effect.sync(() => { + calls.addReaction.push(params) + }), + removeReaction: (params) => + Effect.sync(() => { + calls.removeReaction.push(params) + }), + createThread: (params) => + Effect.sync(() => { + calls.createThread.push(params) + threadSeq += 1 + return `30000000000000000${threadSeq}` as ExternalThreadId + }), + } + + return { calls, adapter } +} + +const insertBaseContext = (harness: ChatSyncDbHarness) => + harness.run( + Effect.gen(function* () { + const db = yield* Database.Database + + const organizationId = uuid() + const authorUserId = uuid() + const botUserId = uuid() + const channelId = uuid() + + yield* db.execute((client) => + client.insert(schema.organizationsTable).values({ + id: organizationId, + name: "Chat Sync Org", + slug: `chat-sync-${organizationId.slice(0, 8)}`, + logoUrl: null, + settings: null, + isPublic: false, + deletedAt: null, + }), + ) + + yield* db.execute((client) => + client.insert(schema.usersTable).values([ + { + id: authorUserId, + externalId: `user-${authorUserId}`, + email: `author-${authorUserId}@example.com`, + firstName: "Author", + lastName: "User", + avatarUrl: null, + userType: "user", + settings: null, + isOnboarded: true, + timezone: "UTC", + deletedAt: null, + }, + { + id: botUserId, + externalId: `bot-${botUserId}`, + email: `bot-${botUserId}@example.com`, + firstName: "Bot", + lastName: "User", + avatarUrl: null, + userType: "machine", + settings: null, + isOnboarded: true, + timezone: "UTC", + deletedAt: null, + }, + ]), + ) + + yield* db.execute((client) => + client.insert(schema.channelsTable).values({ + id: channelId, + name: "general", + icon: null, + type: "public", + organizationId, + parentChannelId: null, + sectionId: null, + deletedAt: null, + }), + ) + + return { + organizationId, + authorUserId, + botUserId, + channelId, + } as const + }), + ) + +const insertConnection = ( + harness: ChatSyncDbHarness, + params: { + organizationId: OrganizationId + createdBy: UserId + status?: "active" | "paused" | "error" | "disabled" + externalWorkspaceId?: string + }, +) => + harness.run( + Effect.gen(function* () { + const db = yield* Database.Database + const syncConnectionId = uuid() + + yield* db.execute((client) => + client.insert(schema.chatSyncConnectionsTable).values({ + id: syncConnectionId, + organizationId: params.organizationId, + integrationConnectionId: null, + provider: "discord", + externalWorkspaceId: params.externalWorkspaceId ?? "guild-1", + externalWorkspaceName: "Guild", + status: params.status ?? "active", + settings: null, + metadata: null, + errorMessage: null, + lastSyncedAt: null, + createdBy: params.createdBy, + deletedAt: null, + }), + ) + + return syncConnectionId + }), + ) + +const insertLink = ( + harness: ChatSyncDbHarness, + params: { + syncConnectionId: SyncConnectionId + hazelChannelId: ChannelId + externalChannelId: ExternalChannelId + direction?: "both" | "hazel_to_external" | "external_to_hazel" + settings?: Record | null + isActive?: boolean + }, +) => + harness.run( + Effect.gen(function* () { + const db = yield* Database.Database + const linkId = uuid() + yield* db.execute((client) => + client.insert(schema.chatSyncChannelLinksTable).values({ + id: linkId, + syncConnectionId: params.syncConnectionId, + hazelChannelId: params.hazelChannelId, + externalChannelId: params.externalChannelId, + externalChannelName: "external-general", + direction: params.direction ?? "both", + isActive: params.isActive ?? true, + settings: params.settings ?? null, + lastSyncedAt: null, + deletedAt: null, + }), + ) + return linkId + }), + ) + +const insertMessage = ( + harness: ChatSyncDbHarness, + params: { + channelId: ChannelId + authorId: UserId + content: string + replyToMessageId?: MessageId | null + threadChannelId?: ChannelId | null + }, +) => + harness.run( + Effect.gen(function* () { + const db = yield* Database.Database + const messageId = uuid() + yield* db.execute((client) => + client.insert(schema.messagesTable).values({ + id: messageId, + channelId: params.channelId, + authorId: params.authorId, + content: params.content, + embeds: null, + replyToMessageId: params.replyToMessageId ?? null, + threadChannelId: params.threadChannelId ?? null, + deletedAt: null, + }), + ) + return messageId + }), + ) + +const insertReaction = ( + harness: ChatSyncDbHarness, + params: { + messageId: MessageId + channelId: ChannelId + userId: UserId + emoji: string + }, +) => + harness.run( + Effect.gen(function* () { + const db = yield* Database.Database + const reactionId = uuid() + yield* db.execute((client) => + client.insert(schema.messageReactionsTable).values({ + id: reactionId, + messageId: params.messageId, + channelId: params.channelId, + userId: params.userId, + emoji: params.emoji, + }), + ) + return reactionId + }), + ) + +const makeWorkerLayer = ( + harness: ChatSyncDbHarness, + params: { + botUserId: UserId + providerAdapter: ChatSyncProviderAdapter + onSyncChannel?: (channelId: ChannelId) => void + }, +) => { + const repoLayer = Layer.mergeAll( + ChatSyncConnectionRepo.Default, + ChatSyncChannelLinkRepo.Default, + ChatSyncMessageLinkRepo.Default, + ChatSyncEventReceiptRepo.Default, + MessageRepo.Default, + MessageReactionRepo.Default, + ChannelRepo.Default, + IntegrationConnectionRepo.Default, + UserRepo.Default, + OrganizationMemberRepo.Default, + ).pipe(Layer.provide(harness.dbLayer)) + + const deps = Layer.mergeAll( + harness.dbLayer, + repoLayer, + Layer.succeed(ChatSyncProviderRegistry, { + getAdapter: () => Effect.succeed(params.providerAdapter), + } as unknown as ChatSyncProviderRegistry), + Layer.succeed(IntegrationBotService, { + getOrCreateBotUser: () => Effect.succeed({ id: params.botUserId }), + } as unknown as IntegrationBotService), + Layer.succeed(ChannelAccessSyncService, { + syncChannel: (channelId: ChannelId) => + Effect.sync(() => { + params.onSyncChannel?.(channelId) + }), + } as unknown as ChannelAccessSyncService), + Layer.succeed(Discord.DiscordApiClient, { + createWebhook: () => Effect.fail(new Error("not used in deterministic integration tests")), + executeWebhookMessage: () => Effect.fail(new Error("not used in deterministic integration tests")), + updateWebhookMessage: () => Effect.fail(new Error("not used in deterministic integration tests")), + deleteWebhookMessage: () => Effect.fail(new Error("not used in deterministic integration tests")), + createMessage: () => Effect.fail(new Error("not used in deterministic integration tests")), + updateMessage: () => Effect.fail(new Error("not used in deterministic integration tests")), + deleteMessage: () => Effect.fail(new Error("not used in deterministic integration tests")), + addReaction: () => Effect.fail(new Error("not used in deterministic integration tests")), + removeReaction: () => Effect.fail(new Error("not used in deterministic integration tests")), + createThread: () => Effect.fail(new Error("not used in deterministic integration tests")), + } as unknown as Discord.DiscordApiClient), + ) + + return ChatSyncCoreWorker.DefaultWithoutDependencies.pipe(Layer.provide(deps)) +} + +describe("ChatSyncCoreWorker integration", () => { + let harness: ChatSyncDbHarness + + beforeAll(async () => { + harness = await createChatSyncDbHarness() + }, 120_000) + + afterAll(async () => { + await harness.stop() + }, 60_000) + + beforeEach(async () => { + await harness.reset() + }) + + it("syncs Hazel message create/update/delete lifecycle and records receipts", async () => { + const ctx = await insertBaseContext(harness) + const recorder = makeAdapterRecorder() + const workerLayer = makeWorkerLayer(harness, { + botUserId: ctx.botUserId, + providerAdapter: recorder.adapter, + }) + + const syncConnectionId = await insertConnection(harness, { + organizationId: ctx.organizationId, + createdBy: ctx.authorUserId, + }) + const linkId = await insertLink(harness, { + syncConnectionId, + hazelChannelId: ctx.channelId, + externalChannelId: "11111111111111111" as ExternalChannelId, + }) + const messageId = await insertMessage(harness, { + channelId: ctx.channelId, + authorId: ctx.authorUserId, + content: "hello", + }) + + const createResult = await runEffect( + ChatSyncCoreWorker.syncHazelMessageToProvider(syncConnectionId, messageId).pipe( + Effect.provide(workerLayer), + ), + ) + recordChatSyncDiagnostic({ + suite: "chat-sync-core-worker.integration", + testCase: "hazel-message-create-update-delete", + workerMethod: "syncHazelMessageToProvider", + action: "create", + syncConnectionId, + expected: "synced", + actual: createResult.status, + }) + expect(createResult.status).toBe("synced") + expect(recorder.calls.createMessage).toHaveLength(1) + + const dedupedResult = await runEffect( + ChatSyncCoreWorker.syncHazelMessageToProvider(syncConnectionId, messageId).pipe( + Effect.provide(workerLayer), + ), + ) + expect(dedupedResult.status).toBe("deduped") + + await harness.run( + Effect.gen(function* () { + const db = yield* Database.Database + yield* db.execute((client) => + client + .update(schema.messagesTable) + .set({ content: "hello-updated", updatedAt: new Date() }) + .where(eq(schema.messagesTable.id, messageId)), + ) + }), + ) + + const updateResult = await runEffect( + ChatSyncCoreWorker.syncHazelMessageUpdateToProvider(syncConnectionId, messageId).pipe( + Effect.provide(workerLayer), + ), + ) + expect(updateResult.status).toBe("updated") + expect(recorder.calls.updateMessage).toHaveLength(1) + + const deleteResult = await runEffect( + ChatSyncCoreWorker.syncHazelMessageDeleteToProvider(syncConnectionId, messageId).pipe( + Effect.provide(workerLayer), + ), + ) + expect(deleteResult.status).toBe("deleted") + expect(recorder.calls.deleteMessage).toHaveLength(1) + + await harness.run( + Effect.gen(function* () { + const db = yield* Database.Database + const messageLinkRows = yield* db.execute((client) => + client + .select() + .from(schema.chatSyncMessageLinksTable) + .where(eq(schema.chatSyncMessageLinksTable.channelLinkId, linkId)), + ) + expect(messageLinkRows).toHaveLength(1) + expect(messageLinkRows[0]?.deletedAt).not.toBeNull() + + const receiptRows = yield* db.execute((client) => + client + .select() + .from(schema.chatSyncEventReceiptsTable) + .where(eq(schema.chatSyncEventReceiptsTable.syncConnectionId, syncConnectionId)), + ) + expect(receiptRows.length).toBeGreaterThanOrEqual(3) + expect( + receiptRows.some((receipt) => receipt.status === "processed" && receipt.source === "hazel"), + ).toBe(true) + }), + ) + }) + + it("syncs Hazel reactions and ignores delete while reaction still exists", async () => { + const ctx = await insertBaseContext(harness) + const recorder = makeAdapterRecorder() + const workerLayer = makeWorkerLayer(harness, { + botUserId: ctx.botUserId, + providerAdapter: recorder.adapter, + }) + + const syncConnectionId = await insertConnection(harness, { + organizationId: ctx.organizationId, + createdBy: ctx.authorUserId, + }) + const linkId = await insertLink(harness, { + syncConnectionId, + hazelChannelId: ctx.channelId, + externalChannelId: "11111111111111111" as ExternalChannelId, + }) + const messageId = await insertMessage(harness, { + channelId: ctx.channelId, + authorId: ctx.authorUserId, + content: "hello", + }) + const createMessageResult = await runEffect( + ChatSyncCoreWorker.syncHazelMessageToProvider(syncConnectionId, messageId).pipe( + Effect.provide(workerLayer), + ), + ) + expect(createMessageResult.status).toBe("synced") + const externalMessageId = createMessageResult.externalMessageId + + await harness.run( + Effect.gen(function* () { + const db = yield* Database.Database + const links = yield* db.execute((client) => + client + .select() + .from(schema.chatSyncMessageLinksTable) + .where( + eq(schema.chatSyncMessageLinksTable.channelLinkId, linkId), + ), + ) + expect(links[0]?.externalMessageId).toBe(externalMessageId) + }), + ) + + const reactionId = await insertReaction(harness, { + messageId, + channelId: ctx.channelId, + userId: ctx.authorUserId, + emoji: "🔥", + }) + + const createReactionResult = await runEffect( + ChatSyncCoreWorker.syncHazelReactionCreateToProvider(syncConnectionId, reactionId).pipe( + Effect.provide(workerLayer), + ), + ) + expect(createReactionResult.status).toBe("created") + expect(recorder.calls.addReaction).toHaveLength(1) + + const ignoredDelete = await runEffect( + ChatSyncCoreWorker.syncHazelReactionDeleteToProvider(syncConnectionId, { + hazelChannelId: ctx.channelId, + hazelMessageId: messageId, + emoji: "🔥", + userId: ctx.authorUserId, + }).pipe(Effect.provide(workerLayer)), + ) + expect(ignoredDelete.status).toBe("ignored_remaining_reactions") + + await harness.run( + Effect.gen(function* () { + const db = yield* Database.Database + yield* db.execute((client) => + client + .delete(schema.messageReactionsTable) + .where(eq(schema.messageReactionsTable.id, reactionId)), + ) + }), + ) + + const deleteReactionResult = await runEffect( + ChatSyncCoreWorker.syncHazelReactionDeleteToProvider(syncConnectionId, { + hazelChannelId: ctx.channelId, + hazelMessageId: messageId, + emoji: "🔥", + userId: ctx.authorUserId, + }, "hazel:reaction:delete:second-pass").pipe(Effect.provide(workerLayer)), + ) + expect(deleteReactionResult.status).toBe("deleted") + expect(recorder.calls.removeReaction).toHaveLength(1) + }) + + it("syncs Discord ingress message/reaction/thread lifecycle into Hazel", async () => { + const ctx = await insertBaseContext(harness) + const recorder = makeAdapterRecorder() + const syncedChannels: ChannelId[] = [] + const workerLayer = makeWorkerLayer(harness, { + botUserId: ctx.botUserId, + providerAdapter: recorder.adapter, + onSyncChannel: (channelId) => syncedChannels.push(channelId), + }) + + const syncConnectionId = await insertConnection(harness, { + organizationId: ctx.organizationId, + createdBy: ctx.authorUserId, + }) + const externalParentChannelId = "11111111111111111" as ExternalChannelId + const parentLinkId = await insertLink(harness, { + syncConnectionId, + hazelChannelId: ctx.channelId, + externalChannelId: externalParentChannelId, + }) + + const ingressCreate = await runEffect( + ChatSyncCoreWorker.ingestMessageCreate({ + syncConnectionId, + externalChannelId: externalParentChannelId, + externalMessageId: "22222222222222221" as ExternalMessageId, + externalAuthorId: "ext-user-1" as ExternalUserId, + externalAuthorDisplayName: "External User", + content: "from discord", + dedupeKey: "ext:create:1", + }).pipe(Effect.provide(workerLayer)), + ) + expect(ingressCreate.status).toBe("created") + if (!ingressCreate.hazelMessageId) { + throw new Error("ingestMessageCreate did not return hazelMessageId") + } + const hazelMessageId = ingressCreate.hazelMessageId + + const ingressUpdate = await runEffect( + ChatSyncCoreWorker.ingestMessageUpdate({ + syncConnectionId, + externalChannelId: externalParentChannelId, + externalMessageId: "22222222222222221" as ExternalMessageId, + content: "from discord updated", + dedupeKey: "ext:update:1", + }).pipe(Effect.provide(workerLayer)), + ) + expect(ingressUpdate.status).toBe("updated") + expect(ingressUpdate.hazelMessageId).toBe(hazelMessageId) + + const reactionAdd = await runEffect( + ChatSyncCoreWorker.ingestReactionAdd({ + syncConnectionId, + externalChannelId: externalParentChannelId, + externalMessageId: "22222222222222221" as ExternalMessageId, + externalUserId: "ext-user-2" as ExternalUserId, + externalAuthorDisplayName: "React User", + emoji: "🔥", + dedupeKey: "ext:reaction:add:1", + }).pipe(Effect.provide(workerLayer)), + ) + expect(reactionAdd.status).toBe("created") + + const reactionDelete = await runEffect( + ChatSyncCoreWorker.ingestReactionRemove({ + syncConnectionId, + externalChannelId: externalParentChannelId, + externalMessageId: "22222222222222221" as ExternalMessageId, + externalUserId: "ext-user-2" as ExternalUserId, + externalAuthorDisplayName: "React User", + emoji: "🔥", + dedupeKey: "ext:reaction:remove:1", + }).pipe(Effect.provide(workerLayer)), + ) + expect(reactionDelete.status).toBe("deleted") + + const threadCreate = await runEffect( + ChatSyncCoreWorker.ingestThreadCreate({ + syncConnectionId, + externalParentChannelId, + externalThreadId: "33333333333333331" as ExternalThreadId, + externalRootMessageId: "22222222222222221" as ExternalMessageId, + name: "Thread From Discord", + dedupeKey: "ext:thread:create:1", + }).pipe(Effect.provide(workerLayer)), + ) + expect(threadCreate.status).toBe("created") + expect(syncedChannels).toContain(threadCreate.hazelThreadChannelId) + + const ingressDelete = await runEffect( + ChatSyncCoreWorker.ingestMessageDelete({ + syncConnectionId, + externalChannelId: externalParentChannelId, + externalMessageId: "22222222222222221" as ExternalMessageId, + dedupeKey: "ext:delete:1", + }).pipe(Effect.provide(workerLayer)), + ) + expect(ingressDelete.status).toBe("deleted") + expect(ingressDelete.hazelMessageId).toBe(hazelMessageId) + + await harness.run( + Effect.gen(function* () { + const db = yield* Database.Database + const linkRows = yield* db.execute((client) => + client + .select() + .from(schema.chatSyncMessageLinksTable) + .where(eq(schema.chatSyncMessageLinksTable.channelLinkId, parentLinkId)), + ) + expect(linkRows).toHaveLength(1) + expect(linkRows[0]?.hazelMessageId).toBe(hazelMessageId) + + const deletedMessageRows = yield* db.execute((client) => + client + .select() + .from(schema.messagesTable) + .where( + eq(schema.messagesTable.id, hazelMessageId), + ), + ) + expect(deletedMessageRows[0]?.deletedAt).not.toBeNull() + }), + ) + }) + + it("enforces direction/inactive/webhook-origin guards", async () => { + const ctx = await insertBaseContext(harness) + const recorder = makeAdapterRecorder() + const workerLayer = makeWorkerLayer(harness, { + botUserId: ctx.botUserId, + providerAdapter: recorder.adapter, + }) + + const activeBothConnection = await insertConnection(harness, { + organizationId: ctx.organizationId, + createdBy: ctx.authorUserId, + }) + const activeInboundOnlyConnection = await insertConnection(harness, { + organizationId: ctx.organizationId, + createdBy: ctx.authorUserId, + externalWorkspaceId: "guild-2", + }) + const inactiveConnection = await insertConnection(harness, { + organizationId: ctx.organizationId, + createdBy: ctx.authorUserId, + status: "paused", + externalWorkspaceId: "guild-3", + }) + + await insertLink(harness, { + syncConnectionId: activeBothConnection, + hazelChannelId: ctx.channelId, + externalChannelId: "11111111111111111" as ExternalChannelId, + direction: "both", + }) + await insertLink(harness, { + syncConnectionId: activeInboundOnlyConnection, + hazelChannelId: ctx.channelId, + externalChannelId: "11111111111111112" as ExternalChannelId, + direction: "external_to_hazel", + }) + await insertLink(harness, { + syncConnectionId: inactiveConnection, + hazelChannelId: ctx.channelId, + externalChannelId: "11111111111111113" as ExternalChannelId, + direction: "both", + }) + + const outboundMessageId = await insertMessage(harness, { + channelId: ctx.channelId, + authorId: ctx.authorUserId, + content: "direction-check", + }) + + const outboundResult = await runEffect( + ChatSyncCoreWorker.syncHazelMessageCreateToAllConnections("discord", outboundMessageId).pipe( + Effect.provide(workerLayer), + ), + ) + recordChatSyncDiagnostic({ + suite: "chat-sync-core-worker.integration", + testCase: "direction-inactive-webhook-origin-guards", + workerMethod: "syncHazelMessageCreateToAllConnections", + action: "direction_gate", + expected: "1 synced", + actual: `${outboundResult.synced} synced`, + }) + expect(outboundResult.synced).toBe(1) + expect(recorder.calls.createMessage).toHaveLength(1) + + const webhookSettings = { + outboundIdentity: { + enabled: true, + strategy: "webhook", + providers: { + discord: { + kind: "discord.webhook", + webhookId: "99999999999999999", + webhookToken: "token", + }, + }, + }, + } + const guardedLinkConnection = await insertConnection(harness, { + organizationId: ctx.organizationId, + createdBy: ctx.authorUserId, + externalWorkspaceId: "guild-4", + }) + await insertLink(harness, { + syncConnectionId: guardedLinkConnection, + hazelChannelId: ctx.channelId, + externalChannelId: "11111111111111114" as ExternalChannelId, + direction: "both", + settings: webhookSettings, + }) + + const webhookIgnored = await runEffect( + ChatSyncCoreWorker.ingestMessageCreate({ + syncConnectionId: guardedLinkConnection, + externalChannelId: "11111111111111114" as ExternalChannelId, + externalMessageId: "22222222222222224" as ExternalMessageId, + externalWebhookId: "99999999999999999" as ExternalWebhookId, + content: "from webhook", + dedupeKey: "ext:webhook-origin", + }).pipe(Effect.provide(workerLayer)), + ) + expect(webhookIgnored.status).toBe("ignored_webhook_origin") + + const inactiveIgnored = await runEffect( + ChatSyncCoreWorker.ingestMessageCreate({ + syncConnectionId: inactiveConnection, + externalChannelId: "11111111111111113" as ExternalChannelId, + externalMessageId: "22222222222222223" as ExternalMessageId, + content: "ignored", + dedupeKey: "ext:inactive", + }).pipe(Effect.provide(workerLayer)), + ) + expect(inactiveIgnored.status).toBe("ignored_connection_inactive") + + await harness.run( + Effect.gen(function* () { + const db = yield* Database.Database + const rows = yield* db.execute((client) => + client + .select() + .from(schema.messagesTable) + .where( + and( + eq(schema.messagesTable.channelId, ctx.channelId), + isNull(schema.messagesTable.deletedAt), + ), + ), + ) + const contents = rows.map((row) => row.content) + expect(contents).toContain("direction-check") + expect(contents).not.toContain("ignored") + }), + ) + }) +}) diff --git a/apps/backend/src/services/chat-sync/chat-sync-core-worker.ts b/apps/backend/src/services/chat-sync/chat-sync-core-worker.ts index df93f9633..b451cbc9d 100644 --- a/apps/backend/src/services/chat-sync/chat-sync-core-worker.ts +++ b/apps/backend/src/services/chat-sync/chat-sync-core-worker.ts @@ -163,6 +163,7 @@ export class ChatSyncCoreWorker extends Effect.Service()("Ch const integrationBotService = yield* IntegrationBotService const channelAccessSyncService = yield* ChannelAccessSyncService const providerRegistry = yield* ChatSyncProviderRegistry + const discordApiClient = yield* Discord.DiscordApiClient const payloadHash = (value: unknown): string => createHash("sha256").update(JSON.stringify(value)).digest("hex") @@ -504,10 +505,10 @@ export class ChatSyncCoreWorker extends Effect.Service()("Ch } const botToken = Redacted.value(botTokenOption.value) - const created = yield* Discord.DiscordApiClient.createWebhook({ + const created = yield* discordApiClient.createWebhook({ channelId: params.link.externalChannelId, botToken, - }).pipe(Effect.provide(Discord.DiscordApiClient.Default)) + }) const nextConfig: ChatSyncChannelLink.DiscordWebhookOutboundIdentityConfig = { kind: "discord.webhook", @@ -623,14 +624,14 @@ export class ChatSyncCoreWorker extends Effect.Service()("Ch attachments: params.attachments, }) : params.content - const outboundMessageId = yield* Discord.DiscordApiClient.executeWebhookMessage({ + const outboundMessageId = yield* discordApiClient.executeWebhookMessage({ webhookId: config.value.webhookId, webhookToken: config.value.webhookToken, content: outboundContent, replyToExternalMessageId: params.replyToExternalMessageId, username: metadata.username, avatarUrl: metadata.avatarUrl ?? config.value.defaultAvatarUrl, - }).pipe(Effect.provide(Discord.DiscordApiClient.Default)) + }) return Option.some(outboundMessageId as ExternalMessageId) }).pipe( @@ -667,12 +668,12 @@ export class ChatSyncCoreWorker extends Effect.Service()("Ch return false } - yield* Discord.DiscordApiClient.updateWebhookMessage({ + yield* discordApiClient.updateWebhookMessage({ webhookId: config.value.webhookId, webhookToken: config.value.webhookToken, webhookMessageId: params.externalMessageId, content: params.content, - }).pipe(Effect.provide(Discord.DiscordApiClient.Default)) + }) return true }).pipe( @@ -708,11 +709,11 @@ export class ChatSyncCoreWorker extends Effect.Service()("Ch return false } - yield* Discord.DiscordApiClient.deleteWebhookMessage({ + yield* discordApiClient.deleteWebhookMessage({ webhookId: config.value.webhookId, webhookToken: config.value.webhookToken, webhookMessageId: params.externalMessageId, - }).pipe(Effect.provide(Discord.DiscordApiClient.Default)) + }) return true }).pipe( @@ -2590,6 +2591,7 @@ export class ChatSyncCoreWorker extends Effect.Service()("Ch IntegrationBotService.Default, ChannelAccessSyncService.Default, ChatSyncProviderRegistry.Default, + Discord.DiscordApiClient.Default, ], }, ) {} diff --git a/apps/backend/src/services/chat-sync/chat-sync-live-discord.e2e.test.ts b/apps/backend/src/services/chat-sync/chat-sync-live-discord.e2e.test.ts new file mode 100644 index 000000000..6b936ee53 --- /dev/null +++ b/apps/backend/src/services/chat-sync/chat-sync-live-discord.e2e.test.ts @@ -0,0 +1,514 @@ +import { randomUUID } from "node:crypto" +import { + ChannelRepo, + ChatSyncChannelLinkRepo, + ChatSyncConnectionRepo, + ChatSyncEventReceiptRepo, + ChatSyncMessageLinkRepo, + IntegrationConnectionRepo, + MessageReactionRepo, + MessageRepo, + OrganizationMemberRepo, + UserRepo, +} from "@hazel/backend-core" +import { and, Database, eq, isNull, schema } from "@hazel/db" +import { + type ChannelId, + type ExternalChannelId, + type ExternalMessageId, + type ExternalThreadId, + type ExternalUserId, + type MessageId, + type OrganizationId, + type SyncChannelLinkId, + type SyncConnectionId, + type UserId, +} from "@hazel/schema" +import { Discord } from "@hazel/integrations" +import { Effect, Layer } from "effect" +import { afterAll, beforeAll, beforeEach, describe, expect, it } from "vitest" +import { createChatSyncDbHarness, type ChatSyncDbHarness } from "../../test/chat-sync-db-harness" +import { recordChatSyncDiagnostic } from "../../test/chat-sync-test-diagnostics" +import { ChannelAccessSyncService } from "../channel-access-sync" +import { IntegrationBotService } from "../integrations/integration-bot-service" +import { loadChatSyncLiveDiscordTestConfig } from "./chat-sync-live-test-config" +import { ChatSyncCoreWorker } from "./chat-sync-core-worker" +import { ChatSyncProviderRegistry } from "./chat-sync-provider-registry" + +const liveConfig = loadChatSyncLiveDiscordTestConfig() +const describeLive = liveConfig.isConfigured ? describe : describe.skip + +const runEffect = (effect: Effect.Effect) => + Effect.runPromise((effect as Effect.Effect).pipe(Effect.scoped)) + +const uuid = () => randomUUID() as T + +const insertBaseContext = (harness: ChatSyncDbHarness) => + harness.run( + Effect.gen(function* () { + const db = yield* Database.Database + const organizationId = uuid() + const authorUserId = uuid() + const botUserId = uuid() + const channelId = uuid() + + yield* db.execute((client) => + client.insert(schema.organizationsTable).values({ + id: organizationId, + name: "Discord Live Sync Org", + slug: `discord-live-${organizationId.slice(0, 8)}`, + logoUrl: null, + settings: null, + isPublic: false, + deletedAt: null, + }), + ) + + yield* db.execute((client) => + client.insert(schema.usersTable).values([ + { + id: authorUserId, + externalId: `user-${authorUserId}`, + email: `author-${authorUserId}@example.com`, + firstName: "Live", + lastName: "Author", + avatarUrl: null, + userType: "user", + settings: null, + isOnboarded: true, + timezone: "UTC", + deletedAt: null, + }, + { + id: botUserId, + externalId: `bot-${botUserId}`, + email: `bot-${botUserId}@example.com`, + firstName: "Live", + lastName: "Bot", + avatarUrl: null, + userType: "machine", + settings: null, + isOnboarded: true, + timezone: "UTC", + deletedAt: null, + }, + ]), + ) + + yield* db.execute((client) => + client.insert(schema.channelsTable).values({ + id: channelId, + name: "live-sync", + icon: null, + type: "public", + organizationId, + parentChannelId: null, + sectionId: null, + deletedAt: null, + }), + ) + + return { + organizationId, + authorUserId, + botUserId, + channelId, + } as const + }), + ) + +const insertConnection = ( + harness: ChatSyncDbHarness, + params: { + organizationId: OrganizationId + createdBy: UserId + externalWorkspaceId: string + }, +) => + harness.run( + Effect.gen(function* () { + const db = yield* Database.Database + const syncConnectionId = uuid() + yield* db.execute((client) => + client.insert(schema.chatSyncConnectionsTable).values({ + id: syncConnectionId, + organizationId: params.organizationId, + integrationConnectionId: null, + provider: "discord", + externalWorkspaceId: params.externalWorkspaceId, + externalWorkspaceName: "Discord Sandbox", + status: "active", + settings: null, + metadata: null, + errorMessage: null, + lastSyncedAt: null, + createdBy: params.createdBy, + deletedAt: null, + }), + ) + return syncConnectionId + }), + ) + +const insertLink = ( + harness: ChatSyncDbHarness, + params: { + syncConnectionId: SyncConnectionId + hazelChannelId: ChannelId + externalChannelId: ExternalChannelId + direction?: "both" | "hazel_to_external" | "external_to_hazel" + }, +) => + harness.run( + Effect.gen(function* () { + const db = yield* Database.Database + yield* db.execute((client) => + client.insert(schema.chatSyncChannelLinksTable).values({ + id: uuid(), + syncConnectionId: params.syncConnectionId, + hazelChannelId: params.hazelChannelId, + externalChannelId: params.externalChannelId, + externalChannelName: "sandbox-channel", + direction: params.direction ?? "both", + isActive: true, + settings: null, + lastSyncedAt: null, + deletedAt: null, + }), + ) + }), + ) + +const insertMessage = ( + harness: ChatSyncDbHarness, + params: { + channelId: ChannelId + authorId: UserId + content: string + }, +) => + harness.run( + Effect.gen(function* () { + const db = yield* Database.Database + const messageId = uuid() + yield* db.execute((client) => + client.insert(schema.messagesTable).values({ + id: messageId, + channelId: params.channelId, + authorId: params.authorId, + content: params.content, + embeds: null, + replyToMessageId: null, + threadChannelId: null, + deletedAt: null, + }), + ) + return messageId + }), + ) + +const makeWorkerLayer = ( + harness: ChatSyncDbHarness, + params: { + botUserId: UserId + }, +) => { + const repoLayer = Layer.mergeAll( + ChatSyncConnectionRepo.Default, + ChatSyncChannelLinkRepo.Default, + ChatSyncMessageLinkRepo.Default, + ChatSyncEventReceiptRepo.Default, + MessageRepo.Default, + MessageReactionRepo.Default, + ChannelRepo.Default, + IntegrationConnectionRepo.Default, + UserRepo.Default, + OrganizationMemberRepo.Default, + ).pipe(Layer.provide(harness.dbLayer)) + + const deps = Layer.mergeAll( + harness.dbLayer, + repoLayer, + ChatSyncProviderRegistry.Default, + Discord.DiscordApiClient.Default, + Layer.succeed(IntegrationBotService, { + getOrCreateBotUser: () => Effect.succeed({ id: params.botUserId }), + } as unknown as IntegrationBotService), + Layer.succeed(ChannelAccessSyncService, { + syncChannel: () => Effect.void, + } as unknown as ChannelAccessSyncService), + ) + + return ChatSyncCoreWorker.DefaultWithoutDependencies.pipe(Layer.provide(deps)) +} + +describeLive("Chat Sync live Discord nightly e2e", () => { + let harness: ChatSyncDbHarness + let outboundMessageId: ExternalMessageId | undefined + let inboundMessageId: ExternalMessageId | undefined + const activeChannelId = (liveConfig.channelId2 ?? liveConfig.channelId) as ExternalChannelId + + beforeAll(async () => { + process.env.DISCORD_BOT_TOKEN = liveConfig.botToken + harness = await createChatSyncDbHarness() + }, 180_000) + + afterAll(async () => { + if (inboundMessageId) { + await runEffect( + Discord.DiscordApiClient.deleteMessage({ + channelId: activeChannelId, + messageId: inboundMessageId, + botToken: liveConfig.botToken!, + }).pipe( + Effect.provide(Discord.DiscordApiClient.Default), + Effect.catchAll(() => Effect.void), + ), + ) + } + if (outboundMessageId) { + await runEffect( + Discord.DiscordApiClient.deleteMessage({ + channelId: liveConfig.channelId!, + messageId: outboundMessageId, + botToken: liveConfig.botToken!, + }).pipe( + Effect.provide(Discord.DiscordApiClient.Default), + Effect.catchAll(() => Effect.void), + ), + ) + } + await harness.stop() + }, 60_000) + + beforeEach(async () => { + await harness.reset() + }) + + it("validates Hazel -> Discord create/update/delete against live Discord API", async () => { + const ctx = await insertBaseContext(harness) + const workerLayer = makeWorkerLayer(harness, { + botUserId: ctx.botUserId, + }) + const connectionId = await insertConnection(harness, { + organizationId: ctx.organizationId, + createdBy: ctx.authorUserId, + externalWorkspaceId: liveConfig.guildId!, + }) + await insertLink(harness, { + syncConnectionId: connectionId, + hazelChannelId: ctx.channelId, + externalChannelId: liveConfig.channelId! as ExternalChannelId, + }) + const hazelMessageId = await insertMessage(harness, { + channelId: ctx.channelId, + authorId: ctx.authorUserId, + content: `live outbound ${Date.now()}`, + }) + + const createResult = await runEffect( + ChatSyncCoreWorker.syncHazelMessageToProvider(connectionId, hazelMessageId).pipe( + Effect.provide(workerLayer), + ), + ) + expect(createResult.status).toBe("synced") + if (!createResult.externalMessageId) { + throw new Error("syncHazelMessageToProvider did not return externalMessageId") + } + outboundMessageId = createResult.externalMessageId + + await harness.run( + Effect.gen(function* () { + const db = yield* Database.Database + yield* db.execute((client) => + client + .update(schema.messagesTable) + .set({ + content: `live outbound updated ${Date.now()}`, + updatedAt: new Date(), + }) + .where(eq(schema.messagesTable.id, hazelMessageId)), + ) + }), + ) + + const updateResult = await runEffect( + ChatSyncCoreWorker.syncHazelMessageUpdateToProvider(connectionId, hazelMessageId).pipe( + Effect.provide(workerLayer), + ), + ) + expect(updateResult.status).toBe("updated") + + const deleteResult = await runEffect( + ChatSyncCoreWorker.syncHazelMessageDeleteToProvider(connectionId, hazelMessageId).pipe( + Effect.provide(workerLayer), + ), + ) + expect(deleteResult.status).toBe("deleted") + + recordChatSyncDiagnostic({ + suite: "chat-sync-live-discord", + testCase: "hazel-to-discord", + workerMethod: "syncHazelMessageToProvider", + action: "create_update_delete", + syncConnectionId: connectionId, + expected: "synced/updated/deleted", + actual: `${createResult.status}/${updateResult.status}/${deleteResult.status}`, + }) + }, 180_000) + + it("validates Discord -> Hazel ingest for message/reaction/thread lifecycle with live ids", async () => { + const ctx = await insertBaseContext(harness) + const workerLayer = makeWorkerLayer(harness, { + botUserId: ctx.botUserId, + }) + const connectionId = await insertConnection(harness, { + organizationId: ctx.organizationId, + createdBy: ctx.authorUserId, + externalWorkspaceId: liveConfig.guildId!, + }) + await insertLink(harness, { + syncConnectionId: connectionId, + hazelChannelId: ctx.channelId, + externalChannelId: activeChannelId, + direction: "both", + }) + + const createdInboundMessageId = (await runEffect( + Discord.DiscordApiClient.createMessage({ + channelId: activeChannelId, + content: `live inbound ${Date.now()}`, + botToken: liveConfig.botToken!, + }).pipe(Effect.provide(Discord.DiscordApiClient.Default)), + )) as ExternalMessageId + inboundMessageId = createdInboundMessageId + + const createResult = await runEffect( + ChatSyncCoreWorker.ingestMessageCreate({ + syncConnectionId: connectionId, + externalChannelId: activeChannelId, + externalMessageId: createdInboundMessageId, + externalAuthorId: "live-external-author" as ExternalUserId, + externalAuthorDisplayName: "Live External", + content: `ingest create ${Date.now()}`, + dedupeKey: `live:ingest:create:${createdInboundMessageId}`, + }).pipe(Effect.provide(workerLayer)), + ) + expect(createResult.status).toBe("created") + + const updateResult = await runEffect( + ChatSyncCoreWorker.ingestMessageUpdate({ + syncConnectionId: connectionId, + externalChannelId: activeChannelId, + externalMessageId: createdInboundMessageId, + content: `ingest update ${Date.now()}`, + dedupeKey: `live:ingest:update:${createdInboundMessageId}`, + }).pipe(Effect.provide(workerLayer)), + ) + expect(updateResult.status).toBe("updated") + + const reactionAdd = await runEffect( + ChatSyncCoreWorker.ingestReactionAdd({ + syncConnectionId: connectionId, + externalChannelId: activeChannelId, + externalMessageId: createdInboundMessageId, + externalUserId: "live-external-reactor" as ExternalUserId, + externalAuthorDisplayName: "Live Reactor", + emoji: "🔥", + dedupeKey: `live:ingest:reaction:add:${createdInboundMessageId}`, + }).pipe(Effect.provide(workerLayer)), + ) + expect(reactionAdd.status).toBe("created") + + const reactionDelete = await runEffect( + ChatSyncCoreWorker.ingestReactionRemove({ + syncConnectionId: connectionId, + externalChannelId: activeChannelId, + externalMessageId: createdInboundMessageId, + externalUserId: "live-external-reactor" as ExternalUserId, + externalAuthorDisplayName: "Live Reactor", + emoji: "🔥", + dedupeKey: `live:ingest:reaction:remove:${createdInboundMessageId}`, + }).pipe(Effect.provide(workerLayer)), + ) + expect(reactionDelete.status).toBe("deleted") + + const externalThreadId = await runEffect( + Discord.DiscordApiClient.createThread({ + channelId: activeChannelId, + messageId: createdInboundMessageId, + name: `Hazel Live Thread ${Date.now()}`, + botToken: liveConfig.botToken!, + }).pipe(Effect.provide(Discord.DiscordApiClient.Default)), + ) + + const threadCreate = await runEffect( + ChatSyncCoreWorker.ingestThreadCreate({ + syncConnectionId: connectionId, + externalParentChannelId: activeChannelId, + externalThreadId: externalThreadId as ExternalThreadId, + externalRootMessageId: createdInboundMessageId, + name: "Live Thread", + dedupeKey: `live:ingest:thread:${externalThreadId}`, + }).pipe(Effect.provide(workerLayer)), + ) + expect(threadCreate.status).toBe("created") + + const deleteResult = await runEffect( + ChatSyncCoreWorker.ingestMessageDelete({ + syncConnectionId: connectionId, + externalChannelId: activeChannelId, + externalMessageId: createdInboundMessageId, + dedupeKey: `live:ingest:delete:${createdInboundMessageId}`, + }).pipe(Effect.provide(workerLayer)), + ) + expect(deleteResult.status).toBe("deleted") + + await harness.run( + Effect.gen(function* () { + const db = yield* Database.Database + const messageRows = yield* db.execute((client) => + client + .select() + .from(schema.messagesTable) + .where( + and( + eq(schema.messagesTable.channelId, ctx.channelId), + isNull(schema.messagesTable.deletedAt), + ), + ), + ) + expect(messageRows.some((row) => row.content.includes("ingest update"))).toBe(false) + + const messageLinkRows = yield* db.execute((client) => + client + .select() + .from(schema.chatSyncMessageLinksTable) + .where( + eq( + schema.chatSyncMessageLinksTable.externalMessageId, + createdInboundMessageId, + ), + ), + ) + expect(messageLinkRows.length).toBeGreaterThan(0) + }) + ) + + recordChatSyncDiagnostic({ + suite: "chat-sync-live-discord", + testCase: "discord-to-hazel", + workerMethod: "ingestMessageCreate/Update/Delete", + action: "message_reaction_thread", + syncConnectionId: connectionId, + expected: "created/updated/created/deleted/created/deleted", + actual: `${createResult.status}/${updateResult.status}/${reactionAdd.status}/${reactionDelete.status}/${threadCreate.status}/${deleteResult.status}`, + }) + }, 240_000) +}) + +if (!liveConfig.isConfigured) { + console.warn( + `Skipping chat-sync live Discord e2e tests. Missing env vars: ${liveConfig.missing.join(", ")}`, + ) +} diff --git a/apps/backend/src/services/chat-sync/chat-sync-live-test-config.ts b/apps/backend/src/services/chat-sync/chat-sync-live-test-config.ts new file mode 100644 index 000000000..a0dc1a31d --- /dev/null +++ b/apps/backend/src/services/chat-sync/chat-sync-live-test-config.ts @@ -0,0 +1,44 @@ +export interface ChatSyncLiveDiscordTestConfig { + guildId?: string + channelId?: string + channelId2?: string + botToken?: string + isConfigured: boolean + missing: ReadonlyArray +} + +const readNonEmpty = (name: string): string | undefined => { + const value = process.env[name] + if (!value) return undefined + const trimmed = value.trim() + return trimmed.length > 0 ? trimmed : undefined +} + +export const loadChatSyncLiveDiscordTestConfig = (): ChatSyncLiveDiscordTestConfig => { + const guildId = readNonEmpty("DISCORD_SYNC_TEST_GUILD_ID") + const channelId = readNonEmpty("DISCORD_SYNC_TEST_CHANNEL_ID") + const channelId2 = readNonEmpty("DISCORD_SYNC_TEST_CHANNEL_ID_2") + const botToken = + readNonEmpty("DISCORD_SYNC_TEST_BOT_TOKEN") ?? + readNonEmpty("DISCORD_BOT_TOKEN") + + const missing: string[] = [] + if (!guildId) { + missing.push("DISCORD_SYNC_TEST_GUILD_ID") + } + if (!channelId) { + missing.push("DISCORD_SYNC_TEST_CHANNEL_ID") + } + if (!botToken) { + missing.push("DISCORD_SYNC_TEST_BOT_TOKEN|DISCORD_BOT_TOKEN") + } + + return { + guildId, + channelId, + channelId2, + botToken, + isConfigured: missing.length === 0, + missing, + } +} diff --git a/apps/backend/src/services/chat-sync/chat-sync-provider-registry.ts b/apps/backend/src/services/chat-sync/chat-sync-provider-registry.ts index 9261eee2e..8630a79cf 100644 --- a/apps/backend/src/services/chat-sync/chat-sync-provider-registry.ts +++ b/apps/backend/src/services/chat-sync/chat-sync-provider-registry.ts @@ -90,6 +90,8 @@ export class ChatSyncProviderRegistry extends Effect.Service(effect: Effect.Effect) => + Effect.runPromise(effect as Effect.Effect) + +type GatewayLink = { + syncConnectionId: SyncConnectionId + direction: "both" | "hazel_to_external" | "external_to_hazel" +} + +const makeDispatchHarness = () => { + const linksByChannel = new Map>() + const calls = { + create: [] as Array>, + update: [] as Array>, + delete: [] as Array>, + reactionAdd: [] as Array>, + reactionRemove: [] as Array>, + threadCreate: [] as Array>, + } + + const handlers = createDiscordGatewayDispatchHandlers({ + discordSyncWorker: { + ingestMessageCreate: (payload: any) => + Effect.sync(() => { + calls.create.push(payload as Record) + }), + ingestMessageUpdate: (payload: any) => + Effect.sync(() => { + calls.update.push(payload as Record) + }), + ingestMessageDelete: (payload: any) => + Effect.sync(() => { + calls.delete.push(payload as Record) + }), + ingestReactionAdd: (payload: any) => + Effect.sync(() => { + calls.reactionAdd.push(payload as Record) + }), + ingestReactionRemove: (payload: any) => + Effect.sync(() => { + calls.reactionRemove.push(payload as Record) + }), + ingestThreadCreate: (payload: any) => + Effect.sync(() => { + calls.threadCreate.push(payload as Record) + }), + } as any, + findActiveLinksByExternalChannel: (externalChannelId) => + Effect.succeed(linksByChannel.get(externalChannelId) ?? []), + isCurrentBotAuthor: (authorId) => Effect.succeed(authorId === "bot-self"), + }) + + return { + handlers, + calls, + setLinks: (channelId: ExternalChannelId, links: ReadonlyArray) => { + linksByChannel.set(channelId, links) + }, + } +} + +describe("DiscordGatewayService dispatch handlers", () => { + it("routes MESSAGE_CREATE for inbound-capable links and keeps dedupe key stable", async () => { + const harness = makeDispatchHarness() + const channelId = "123456789012345678" as ExternalChannelId + harness.setLinks(channelId, [ + { + syncConnectionId: "00000000-0000-0000-0000-000000000001" as SyncConnectionId, + direction: "both", + }, + { + syncConnectionId: "00000000-0000-0000-0000-000000000002" as SyncConnectionId, + direction: "hazel_to_external", + }, + ]) + + await run( + harness.handlers.ingestMessageCreateEvent({ + id: "223456789012345678", + channel_id: channelId, + content: "hello", + author: { + id: "778899", + username: "alice", + }, + } satisfies DiscordMessageCreateEvent), + ) + + expect(harness.calls.create).toHaveLength(1) + recordChatSyncDiagnostic({ + suite: "discord-gateway-dispatch", + testCase: "message-create-routing", + workerMethod: "ingestMessageCreate", + action: "dispatch", + dedupeKey: String(harness.calls.create[0]?.dedupeKey), + syncConnectionId: String(harness.calls.create[0]?.syncConnectionId), + expected: "one inbound dispatch for both-direction link", + actual: `${harness.calls.create.length} dispatches`, + }) + expect(harness.calls.create[0]?.syncConnectionId).toBe( + "00000000-0000-0000-0000-000000000001", + ) + expect(harness.calls.create[0]?.dedupeKey).toBe("discord:gateway:create:223456789012345678") + }) + + it("drops malformed MESSAGE_DELETE ids and does not dispatch", async () => { + const harness = makeDispatchHarness() + + await run( + harness.handlers.ingestMessageDeleteEvent({ + id: 123 as unknown as string, + channel_id: "123456789012345678", + } satisfies DiscordMessageDeleteEvent), + ) + + expect(harness.calls.delete).toHaveLength(0) + }) + + it("suppresses bot-authored and self-authored message events", async () => { + const harness = makeDispatchHarness() + const channelId = "123456789012345678" as ExternalChannelId + harness.setLinks(channelId, [ + { + syncConnectionId: "00000000-0000-0000-0000-000000000011" as SyncConnectionId, + direction: "both", + }, + ]) + + await run( + harness.handlers.ingestMessageCreateEvent({ + id: "223456789012345678", + channel_id: channelId, + content: "ignored", + author: { id: "bot-1", bot: true }, + }), + ) + await run( + harness.handlers.ingestMessageCreateEvent({ + id: "223456789012345679", + channel_id: channelId, + content: "ignored-2", + author: { id: "bot-self", username: "hazel-bot" }, + }), + ) + + expect(harness.calls.create).toHaveLength(0) + }) + + it("routes MESSAGE_UPDATE and derives deterministic hash-based dedupe when edited timestamp is missing", async () => { + const harness = makeDispatchHarness() + const channelId = "123456789012345678" as ExternalChannelId + harness.setLinks(channelId, [ + { + syncConnectionId: "00000000-0000-0000-0000-000000000021" as SyncConnectionId, + direction: "external_to_hazel", + }, + ]) + + await run( + harness.handlers.ingestMessageUpdateEvent({ + id: "223456789012345678", + channel_id: channelId, + content: "changed", + webhook_id: 999 as unknown as string, + } satisfies DiscordMessageUpdateEvent), + ) + + expect(harness.calls.update).toHaveLength(1) + const dedupe = String(harness.calls.update[0]?.dedupeKey) + expect(dedupe.startsWith("discord:gateway:update:223456789012345678:")).toBe(true) + expect(dedupe.split(":")[4]?.length).toBe(16) + expect(harness.calls.update[0]?.externalWebhookId).toBeUndefined() + }) + + it("routes reaction add/remove with direction filter and custom emoji formatting", async () => { + const harness = makeDispatchHarness() + const channelId = "123456789012345678" as ExternalChannelId + harness.setLinks(channelId, [ + { + syncConnectionId: "00000000-0000-0000-0000-000000000031" as SyncConnectionId, + direction: "both", + }, + { + syncConnectionId: "00000000-0000-0000-0000-000000000032" as SyncConnectionId, + direction: "hazel_to_external", + }, + ]) + + const reactionEvent: DiscordMessageReactionAddEvent = { + channel_id: channelId, + message_id: "223456789012345678", + user_id: "998877665544332211", + emoji: { + name: "party_blob", + id: "445566778899001122", + }, + } + + await run(harness.handlers.ingestMessageReactionAddEvent(reactionEvent)) + await run( + harness.handlers.ingestMessageReactionRemoveEvent({ + ...reactionEvent, + } as any), + ) + + expect(harness.calls.reactionAdd).toHaveLength(1) + expect(harness.calls.reactionRemove).toHaveLength(1) + expect(harness.calls.reactionAdd[0]?.emoji).toBe("party_blob:445566778899001122") + expect(harness.calls.reactionAdd[0]?.dedupeKey).toBe( + "discord:gateway:reaction:add:123456789012345678:223456789012345678:998877665544332211:party_blob:445566778899001122", + ) + }) + + it("routes THREAD_CREATE only for thread types and inbound-capable links", async () => { + const harness = makeDispatchHarness() + const parentChannelId = "123456789012345678" as ExternalChannelId + harness.setLinks(parentChannelId, [ + { + syncConnectionId: "00000000-0000-0000-0000-000000000041" as SyncConnectionId, + direction: "both", + }, + ]) + + await run( + harness.handlers.ingestThreadCreateEvent({ + id: "323456789012345678", + parent_id: parentChannelId, + type: 10, + name: "ignore-non-thread", + } satisfies DiscordThreadCreateEvent), + ) + await run( + harness.handlers.ingestThreadCreateEvent({ + id: "323456789012345679", + parent_id: parentChannelId, + type: 11, + name: "new-thread", + } satisfies DiscordThreadCreateEvent), + ) + + expect(harness.calls.threadCreate).toHaveLength(1) + expect(harness.calls.threadCreate[0]?.dedupeKey).toBe( + "discord:gateway:thread:create:323456789012345679", + ) + }) +}) diff --git a/apps/backend/src/services/chat-sync/discord-gateway-service.ts b/apps/backend/src/services/chat-sync/discord-gateway-service.ts index 24b230f7f..7e61a3ed6 100644 --- a/apps/backend/src/services/chat-sync/discord-gateway-service.ts +++ b/apps/backend/src/services/chat-sync/discord-gateway-service.ts @@ -7,6 +7,7 @@ import { ExternalChannelId, ExternalMessageId, ExternalThreadId, + SyncConnectionId, ExternalUserId, ExternalWebhookId, } from "@hazel/schema" @@ -16,7 +17,7 @@ import { Config, Effect, Layer, Option, Redacted, Ref, Schema } from "effect" import { DiscordSyncWorker } from "./discord-sync-worker" import type { ChatSyncIngressMessageAttachment } from "./chat-sync-core-worker" -interface DiscordMessageAuthor { +export interface DiscordMessageAuthor { id?: string username?: string global_name?: string | null @@ -25,11 +26,11 @@ interface DiscordMessageAuthor { bot?: boolean } -interface DiscordReadyEvent { +export interface DiscordReadyEvent { user?: { id?: string } } -interface DiscordMessageCreateEvent { +export interface DiscordMessageCreateEvent { id?: string channel_id?: string content?: string @@ -49,7 +50,7 @@ interface DiscordMessageAttachment { url?: string } -interface DiscordMessageUpdateEvent { +export interface DiscordMessageUpdateEvent { id?: string channel_id?: string content?: string @@ -58,7 +59,7 @@ interface DiscordMessageUpdateEvent { edited_timestamp?: string | null } -interface DiscordMessageDeleteEvent { +export interface DiscordMessageDeleteEvent { id?: string channel_id?: string webhook_id?: string @@ -69,7 +70,7 @@ interface DiscordReactionEmoji { name?: string | null } -interface DiscordMessageReactionAddEvent { +export interface DiscordMessageReactionAddEvent { channel_id?: string message_id?: string user_id?: string @@ -80,7 +81,7 @@ interface DiscordMessageReactionAddEvent { emoji?: DiscordReactionEmoji } -interface DiscordMessageReactionRemoveEvent { +export interface DiscordMessageReactionRemoveEvent { channel_id?: string message_id?: string user_id?: string @@ -91,7 +92,7 @@ interface DiscordMessageReactionRemoveEvent { emoji?: DiscordReactionEmoji } -interface DiscordThreadCreateEvent { +export interface DiscordThreadCreateEvent { id?: string parent_id?: string name?: string @@ -189,6 +190,389 @@ export const decodeOptionalExternalId = ( const getValueType = (value: unknown): string => (value === null ? "null" : typeof value) +type GatewayDirection = "both" | "hazel_to_external" | "external_to_hazel" + +type DiscordGatewayChannelLink = { + readonly syncConnectionId: SyncConnectionId + readonly direction: GatewayDirection +} + +type DiscordGatewayDispatchWorker = Pick< + DiscordSyncWorker, + | "ingestMessageCreate" + | "ingestMessageUpdate" + | "ingestMessageDelete" + | "ingestReactionAdd" + | "ingestReactionRemove" + | "ingestThreadCreate" +> + +const decodeRequiredExternalIdOrWarn = (params: { + eventType: string + field: string + value: unknown + decode: ExternalIdDecoder +}) => + Effect.gen(function* () { + const decoded = decodeRequiredExternalId(params.value, params.decode) + if (Option.isNone(decoded)) { + yield* Effect.logWarning("Discord gateway dropped event: invalid external id", { + eventType: params.eventType, + field: params.field, + valueType: getValueType(params.value), + }) + } + return decoded + }) + +const decodeOptionalExternalIdOrWarn = (params: { + eventType: string + field: string + value: unknown + decode: ExternalIdDecoder +}) => + Effect.gen(function* () { + const decoded = decodeOptionalExternalId(params.value, params.decode) + if (params.value !== undefined && decoded === undefined) { + yield* Effect.logWarning("Discord gateway ignored optional invalid external id", { + eventType: params.eventType, + field: params.field, + valueType: getValueType(params.value), + }) + } + return decoded + }) + +export const createDiscordGatewayDispatchHandlers = (deps: { + discordSyncWorker: DiscordGatewayDispatchWorker + findActiveLinksByExternalChannel: ( + externalChannelId: ExternalChannelId, + ) => Effect.Effect, unknown, never> + isCurrentBotAuthor: (authorId?: string) => Effect.Effect +}) => { + const ingestMessageCreateEvent = Effect.fn("DiscordGatewayService.ingestMessageCreateEvent")( + function* (event: DiscordMessageCreateEvent) { + if (!event.id || !event.channel_id || typeof event.content !== "string") return + if (event.author?.bot) return + if (yield* deps.isCurrentBotAuthor(event.author?.id)) return + + const externalChannelIdOption = yield* decodeRequiredExternalIdOrWarn({ + eventType: "MESSAGE_CREATE", + field: "channel_id", + value: event.channel_id, + decode: decodeExternalChannelId, + }) + if (Option.isNone(externalChannelIdOption)) return + + const externalMessageIdOption = yield* decodeRequiredExternalIdOrWarn({ + eventType: "MESSAGE_CREATE", + field: "id", + value: event.id, + decode: decodeExternalMessageId, + }) + if (Option.isNone(externalMessageIdOption)) return + + const externalChannelId = externalChannelIdOption.value + const externalMessageId = externalMessageIdOption.value + const externalAuthorId = yield* decodeOptionalExternalIdOrWarn({ + eventType: "MESSAGE_CREATE", + field: "author.id", + value: event.author?.id, + decode: decodeExternalUserId, + }) + const externalAuthorDisplayName = formatDiscordDisplayName(event.author) + const externalAuthorAvatarUrl = buildAuthorAvatarUrl(event.author) + const externalReplyToMessageId = yield* decodeOptionalExternalIdOrWarn({ + eventType: "MESSAGE_CREATE", + field: "message_reference.message_id", + value: event.message_reference?.message_id, + decode: decodeExternalMessageId, + }) + const externalWebhookId = yield* decodeOptionalExternalIdOrWarn({ + eventType: "MESSAGE_CREATE", + field: "webhook_id", + value: event.webhook_id, + decode: decodeExternalWebhookId, + }) + const externalAttachments = normalizeDiscordMessageAttachments(event.attachments) + + const links = yield* deps.findActiveLinksByExternalChannel(externalChannelId) + + for (const link of links) { + if (link.direction === "hazel_to_external") continue + yield* deps.discordSyncWorker.ingestMessageCreate({ + syncConnectionId: link.syncConnectionId, + externalChannelId, + externalMessageId, + content: event.content, + externalAuthorId, + externalAuthorDisplayName, + externalAuthorAvatarUrl, + externalReplyToMessageId: externalReplyToMessageId ?? null, + externalThreadId: null, + externalAttachments, + externalWebhookId, + dedupeKey: `discord:gateway:create:${externalMessageId}`, + }) + } + }, + ) + + const ingestMessageUpdateEvent = Effect.fn("DiscordGatewayService.ingestMessageUpdateEvent")( + function* (event: DiscordMessageUpdateEvent) { + if (!event.id || !event.channel_id || typeof event.content !== "string") return + if (event.author?.bot) return + if (yield* deps.isCurrentBotAuthor(event.author?.id)) return + + const externalChannelIdOption = yield* decodeRequiredExternalIdOrWarn({ + eventType: "MESSAGE_UPDATE", + field: "channel_id", + value: event.channel_id, + decode: decodeExternalChannelId, + }) + if (Option.isNone(externalChannelIdOption)) return + + const externalMessageIdOption = yield* decodeRequiredExternalIdOrWarn({ + eventType: "MESSAGE_UPDATE", + field: "id", + value: event.id, + decode: decodeExternalMessageId, + }) + if (Option.isNone(externalMessageIdOption)) return + + const externalChannelId = externalChannelIdOption.value + const externalMessageId = externalMessageIdOption.value + const externalWebhookId = yield* decodeOptionalExternalIdOrWarn({ + eventType: "MESSAGE_UPDATE", + field: "webhook_id", + value: event.webhook_id, + decode: decodeExternalWebhookId, + }) + + const links = yield* deps.findActiveLinksByExternalChannel(externalChannelId) + + for (const link of links) { + if (link.direction === "hazel_to_external") continue + const updateVersion = + event.edited_timestamp ?? + createHash("sha256") + .update(`${externalMessageId}:${event.content ?? ""}`) + .digest("hex") + .slice(0, 16) + yield* deps.discordSyncWorker.ingestMessageUpdate({ + syncConnectionId: link.syncConnectionId, + externalChannelId, + externalMessageId, + externalWebhookId, + content: event.content, + dedupeKey: `discord:gateway:update:${externalMessageId}:${updateVersion}`, + }) + } + }, + ) + + const ingestMessageDeleteEvent = Effect.fn("DiscordGatewayService.ingestMessageDeleteEvent")( + function* (event: DiscordMessageDeleteEvent) { + if (!event.id || !event.channel_id) return + + const externalChannelIdOption = yield* decodeRequiredExternalIdOrWarn({ + eventType: "MESSAGE_DELETE", + field: "channel_id", + value: event.channel_id, + decode: decodeExternalChannelId, + }) + if (Option.isNone(externalChannelIdOption)) return + + const externalMessageIdOption = yield* decodeRequiredExternalIdOrWarn({ + eventType: "MESSAGE_DELETE", + field: "id", + value: event.id, + decode: decodeExternalMessageId, + }) + if (Option.isNone(externalMessageIdOption)) return + + const externalChannelId = externalChannelIdOption.value + const externalMessageId = externalMessageIdOption.value + const externalWebhookId = yield* decodeOptionalExternalIdOrWarn({ + eventType: "MESSAGE_DELETE", + field: "webhook_id", + value: event.webhook_id, + decode: decodeExternalWebhookId, + }) + + const links = yield* deps.findActiveLinksByExternalChannel(externalChannelId) + + for (const link of links) { + if (link.direction === "hazel_to_external") continue + yield* deps.discordSyncWorker.ingestMessageDelete({ + syncConnectionId: link.syncConnectionId, + externalChannelId, + externalMessageId, + externalWebhookId, + dedupeKey: `discord:gateway:delete:${externalMessageId}`, + }) + } + }, + ) + + const ingestMessageReactionAddEvent = Effect.fn( + "DiscordGatewayService.ingestMessageReactionAddEvent", + )(function* (event: DiscordMessageReactionAddEvent) { + if (!event.channel_id || !event.message_id || !event.user_id) return + if (yield* deps.isCurrentBotAuthor(event.user_id)) return + + const emoji = formatDiscordEmoji(event.emoji) + if (!emoji) return + const { externalAuthorDisplayName, externalAuthorAvatarUrl } = extractReactionAuthor(event) + const externalChannelIdOption = yield* decodeRequiredExternalIdOrWarn({ + eventType: "MESSAGE_REACTION_ADD", + field: "channel_id", + value: event.channel_id, + decode: decodeExternalChannelId, + }) + if (Option.isNone(externalChannelIdOption)) return + + const externalMessageIdOption = yield* decodeRequiredExternalIdOrWarn({ + eventType: "MESSAGE_REACTION_ADD", + field: "message_id", + value: event.message_id, + decode: decodeExternalMessageId, + }) + if (Option.isNone(externalMessageIdOption)) return + + const externalUserIdOption = yield* decodeRequiredExternalIdOrWarn({ + eventType: "MESSAGE_REACTION_ADD", + field: "user_id", + value: event.user_id, + decode: decodeExternalUserId, + }) + if (Option.isNone(externalUserIdOption)) return + + const externalChannelId = externalChannelIdOption.value + const externalMessageId = externalMessageIdOption.value + const externalUserId = externalUserIdOption.value + + const links = yield* deps.findActiveLinksByExternalChannel(externalChannelId) + + for (const link of links) { + if (link.direction === "hazel_to_external") continue + yield* deps.discordSyncWorker.ingestReactionAdd({ + syncConnectionId: link.syncConnectionId, + externalChannelId, + externalMessageId, + externalUserId, + externalAuthorDisplayName, + externalAuthorAvatarUrl, + emoji, + dedupeKey: `discord:gateway:reaction:add:${externalChannelId}:${externalMessageId}:${externalUserId}:${emoji}`, + }) + } + }) + + const ingestMessageReactionRemoveEvent = Effect.fn( + "DiscordGatewayService.ingestMessageReactionRemoveEvent", + )(function* (event: DiscordMessageReactionRemoveEvent) { + if (!event.channel_id || !event.message_id || !event.user_id) return + if (yield* deps.isCurrentBotAuthor(event.user_id)) return + + const emoji = formatDiscordEmoji(event.emoji) + if (!emoji) return + const { externalAuthorDisplayName, externalAuthorAvatarUrl } = extractReactionAuthor(event) + const externalChannelIdOption = yield* decodeRequiredExternalIdOrWarn({ + eventType: "MESSAGE_REACTION_REMOVE", + field: "channel_id", + value: event.channel_id, + decode: decodeExternalChannelId, + }) + if (Option.isNone(externalChannelIdOption)) return + + const externalMessageIdOption = yield* decodeRequiredExternalIdOrWarn({ + eventType: "MESSAGE_REACTION_REMOVE", + field: "message_id", + value: event.message_id, + decode: decodeExternalMessageId, + }) + if (Option.isNone(externalMessageIdOption)) return + + const externalUserIdOption = yield* decodeRequiredExternalIdOrWarn({ + eventType: "MESSAGE_REACTION_REMOVE", + field: "user_id", + value: event.user_id, + decode: decodeExternalUserId, + }) + if (Option.isNone(externalUserIdOption)) return + + const externalChannelId = externalChannelIdOption.value + const externalMessageId = externalMessageIdOption.value + const externalUserId = externalUserIdOption.value + + const links = yield* deps.findActiveLinksByExternalChannel(externalChannelId) + + for (const link of links) { + if (link.direction === "hazel_to_external") continue + yield* deps.discordSyncWorker.ingestReactionRemove({ + syncConnectionId: link.syncConnectionId, + externalChannelId, + externalMessageId, + externalUserId, + externalAuthorDisplayName, + externalAuthorAvatarUrl, + emoji, + dedupeKey: `discord:gateway:reaction:remove:${externalChannelId}:${externalMessageId}:${externalUserId}:${emoji}`, + }) + } + }) + + const ingestThreadCreateEvent = Effect.fn("DiscordGatewayService.ingestThreadCreateEvent")( + function* (event: DiscordThreadCreateEvent) { + if (!event.id || !event.parent_id) return + if (event.type !== undefined && event.type !== 11 && event.type !== 12) return + + const externalParentChannelIdOption = yield* decodeRequiredExternalIdOrWarn({ + eventType: "THREAD_CREATE", + field: "parent_id", + value: event.parent_id, + decode: decodeExternalChannelId, + }) + if (Option.isNone(externalParentChannelIdOption)) return + + const externalThreadIdOption = yield* decodeRequiredExternalIdOrWarn({ + eventType: "THREAD_CREATE", + field: "id", + value: event.id, + decode: decodeExternalThreadId, + }) + if (Option.isNone(externalThreadIdOption)) return + + const externalParentChannelId = externalParentChannelIdOption.value + const externalThreadId = externalThreadIdOption.value + + const links = yield* deps.findActiveLinksByExternalChannel(externalParentChannelId) + + for (const link of links) { + if (link.direction === "hazel_to_external") continue + yield* deps.discordSyncWorker.ingestThreadCreate({ + syncConnectionId: link.syncConnectionId, + externalParentChannelId, + externalThreadId, + externalRootMessageId: null, + name: event.name ?? "Thread", + dedupeKey: `discord:gateway:thread:create:${externalThreadId}`, + }) + } + }, + ) + + return { + ingestMessageCreateEvent, + ingestMessageUpdateEvent, + ingestMessageDeleteEvent, + ingestMessageReactionAddEvent, + ingestMessageReactionRemoveEvent, + ingestThreadCreateEvent, + } +} + export class DiscordGatewayService extends Effect.Service()("DiscordGatewayService", { accessors: true, effect: Effect.gen(function* () { @@ -248,365 +632,12 @@ export class DiscordGatewayService extends Effect.Service return Option.isSome(botUserId) && botUserId.value === authorId }) - const decodeRequiredExternalIdOrWarn = (params: { - eventType: string - field: string - value: unknown - decode: ExternalIdDecoder - }) => - Effect.gen(function* () { - const decoded = decodeRequiredExternalId(params.value, params.decode) - if (Option.isNone(decoded)) { - yield* Effect.logWarning("Discord gateway dropped event: invalid external id", { - eventType: params.eventType, - field: params.field, - valueType: getValueType(params.value), - }) - } - return decoded - }) - - const decodeOptionalExternalIdOrWarn = (params: { - eventType: string - field: string - value: unknown - decode: ExternalIdDecoder - }) => - Effect.gen(function* () { - const decoded = decodeOptionalExternalId(params.value, params.decode) - if (params.value !== undefined && decoded === undefined) { - yield* Effect.logWarning("Discord gateway ignored optional invalid external id", { - eventType: params.eventType, - field: params.field, - valueType: getValueType(params.value), - }) - } - return decoded - }) - - const ingestMessageCreateEvent = Effect.fn("DiscordGatewayService.ingestMessageCreateEvent")( - function* (event: DiscordMessageCreateEvent) { - if (!event.id || !event.channel_id || typeof event.content !== "string") return - if (event.author?.bot) return - if (yield* isCurrentBotAuthor(event.author?.id)) return - - const externalChannelIdOption = yield* decodeRequiredExternalIdOrWarn({ - eventType: "MESSAGE_CREATE", - field: "channel_id", - value: event.channel_id, - decode: decodeExternalChannelId, - }) - if (Option.isNone(externalChannelIdOption)) return - - const externalMessageIdOption = yield* decodeRequiredExternalIdOrWarn({ - eventType: "MESSAGE_CREATE", - field: "id", - value: event.id, - decode: decodeExternalMessageId, - }) - if (Option.isNone(externalMessageIdOption)) return - - const externalChannelId = externalChannelIdOption.value - const externalMessageId = externalMessageIdOption.value - const externalAuthorId = yield* decodeOptionalExternalIdOrWarn({ - eventType: "MESSAGE_CREATE", - field: "author.id", - value: event.author?.id, - decode: decodeExternalUserId, - }) - const externalAuthorDisplayName = formatDiscordDisplayName(event.author) - const externalAuthorAvatarUrl = buildAuthorAvatarUrl(event.author) - const externalReplyToMessageId = yield* decodeOptionalExternalIdOrWarn({ - eventType: "MESSAGE_CREATE", - field: "message_reference.message_id", - value: event.message_reference?.message_id, - decode: decodeExternalMessageId, - }) - const externalWebhookId = yield* decodeOptionalExternalIdOrWarn({ - eventType: "MESSAGE_CREATE", - field: "webhook_id", - value: event.webhook_id, - decode: decodeExternalWebhookId, - }) - const externalAttachments = normalizeDiscordMessageAttachments(event.attachments) - - const links = yield* channelLinkRepo - .findActiveByExternalChannel(externalChannelId) - .pipe(withSystemActor) - - for (const link of links) { - if (link.direction === "hazel_to_external") continue - yield* discordSyncWorker.ingestMessageCreate({ - syncConnectionId: link.syncConnectionId, - externalChannelId, - externalMessageId, - content: event.content, - externalAuthorId, - externalAuthorDisplayName, - externalAuthorAvatarUrl, - externalReplyToMessageId: externalReplyToMessageId ?? null, - externalThreadId: null, - externalAttachments, - externalWebhookId, - dedupeKey: `discord:gateway:create:${externalMessageId}`, - }) - } - }, - ) - - const ingestMessageUpdateEvent = Effect.fn("DiscordGatewayService.ingestMessageUpdateEvent")( - function* (event: DiscordMessageUpdateEvent) { - if (!event.id || !event.channel_id || typeof event.content !== "string") return - if (event.author?.bot) return - if (yield* isCurrentBotAuthor(event.author?.id)) return - - const externalChannelIdOption = yield* decodeRequiredExternalIdOrWarn({ - eventType: "MESSAGE_UPDATE", - field: "channel_id", - value: event.channel_id, - decode: decodeExternalChannelId, - }) - if (Option.isNone(externalChannelIdOption)) return - - const externalMessageIdOption = yield* decodeRequiredExternalIdOrWarn({ - eventType: "MESSAGE_UPDATE", - field: "id", - value: event.id, - decode: decodeExternalMessageId, - }) - if (Option.isNone(externalMessageIdOption)) return - - const externalChannelId = externalChannelIdOption.value - const externalMessageId = externalMessageIdOption.value - const externalWebhookId = yield* decodeOptionalExternalIdOrWarn({ - eventType: "MESSAGE_UPDATE", - field: "webhook_id", - value: event.webhook_id, - decode: decodeExternalWebhookId, - }) - - const links = yield* channelLinkRepo - .findActiveByExternalChannel(externalChannelId) - .pipe(withSystemActor) - - for (const link of links) { - if (link.direction === "hazel_to_external") continue - const updateVersion = - event.edited_timestamp ?? - createHash("sha256") - .update(`${externalMessageId}:${event.content ?? ""}`) - .digest("hex") - .slice(0, 16) - yield* discordSyncWorker.ingestMessageUpdate({ - syncConnectionId: link.syncConnectionId, - externalChannelId, - externalMessageId, - externalWebhookId, - content: event.content, - dedupeKey: `discord:gateway:update:${externalMessageId}:${updateVersion}`, - }) - } - }, - ) - - const ingestMessageDeleteEvent = Effect.fn("DiscordGatewayService.ingestMessageDeleteEvent")( - function* (event: DiscordMessageDeleteEvent) { - if (!event.id || !event.channel_id) return - - const externalChannelIdOption = yield* decodeRequiredExternalIdOrWarn({ - eventType: "MESSAGE_DELETE", - field: "channel_id", - value: event.channel_id, - decode: decodeExternalChannelId, - }) - if (Option.isNone(externalChannelIdOption)) return - - const externalMessageIdOption = yield* decodeRequiredExternalIdOrWarn({ - eventType: "MESSAGE_DELETE", - field: "id", - value: event.id, - decode: decodeExternalMessageId, - }) - if (Option.isNone(externalMessageIdOption)) return - - const externalChannelId = externalChannelIdOption.value - const externalMessageId = externalMessageIdOption.value - const externalWebhookId = yield* decodeOptionalExternalIdOrWarn({ - eventType: "MESSAGE_DELETE", - field: "webhook_id", - value: event.webhook_id, - decode: decodeExternalWebhookId, - }) - - const links = yield* channelLinkRepo - .findActiveByExternalChannel(externalChannelId) - .pipe(withSystemActor) - - for (const link of links) { - if (link.direction === "hazel_to_external") continue - yield* discordSyncWorker.ingestMessageDelete({ - syncConnectionId: link.syncConnectionId, - externalChannelId, - externalMessageId, - externalWebhookId, - dedupeKey: `discord:gateway:delete:${externalMessageId}`, - }) - } - }, - ) - const ingestMessageReactionAddEvent = Effect.fn( - "DiscordGatewayService.ingestMessageReactionAddEvent", - )(function* (event: DiscordMessageReactionAddEvent) { - if (!event.channel_id || !event.message_id || !event.user_id) return - if (yield* isCurrentBotAuthor(event.user_id)) return - - const emoji = formatDiscordEmoji(event.emoji) - if (!emoji) return - const { externalAuthorDisplayName, externalAuthorAvatarUrl } = extractReactionAuthor(event) - const externalChannelIdOption = yield* decodeRequiredExternalIdOrWarn({ - eventType: "MESSAGE_REACTION_ADD", - field: "channel_id", - value: event.channel_id, - decode: decodeExternalChannelId, - }) - if (Option.isNone(externalChannelIdOption)) return - - const externalMessageIdOption = yield* decodeRequiredExternalIdOrWarn({ - eventType: "MESSAGE_REACTION_ADD", - field: "message_id", - value: event.message_id, - decode: decodeExternalMessageId, - }) - if (Option.isNone(externalMessageIdOption)) return - - const externalUserIdOption = yield* decodeRequiredExternalIdOrWarn({ - eventType: "MESSAGE_REACTION_ADD", - field: "user_id", - value: event.user_id, - decode: decodeExternalUserId, - }) - if (Option.isNone(externalUserIdOption)) return - - const externalChannelId = externalChannelIdOption.value - const externalMessageId = externalMessageIdOption.value - const externalUserId = externalUserIdOption.value - - const links = yield* channelLinkRepo - .findActiveByExternalChannel(externalChannelId) - .pipe(withSystemActor) - - for (const link of links) { - if (link.direction === "hazel_to_external") continue - yield* discordSyncWorker.ingestReactionAdd({ - syncConnectionId: link.syncConnectionId, - externalChannelId, - externalMessageId, - externalUserId, - externalAuthorDisplayName, - externalAuthorAvatarUrl, - emoji, - dedupeKey: `discord:gateway:reaction:add:${externalChannelId}:${externalMessageId}:${externalUserId}:${emoji}`, - }) - } - }) - - const ingestMessageReactionRemoveEvent = Effect.fn( - "DiscordGatewayService.ingestMessageReactionRemoveEvent", - )(function* (event: DiscordMessageReactionRemoveEvent) { - if (!event.channel_id || !event.message_id || !event.user_id) return - if (yield* isCurrentBotAuthor(event.user_id)) return - - const emoji = formatDiscordEmoji(event.emoji) - if (!emoji) return - const { externalAuthorDisplayName, externalAuthorAvatarUrl } = extractReactionAuthor(event) - const externalChannelIdOption = yield* decodeRequiredExternalIdOrWarn({ - eventType: "MESSAGE_REACTION_REMOVE", - field: "channel_id", - value: event.channel_id, - decode: decodeExternalChannelId, - }) - if (Option.isNone(externalChannelIdOption)) return - - const externalMessageIdOption = yield* decodeRequiredExternalIdOrWarn({ - eventType: "MESSAGE_REACTION_REMOVE", - field: "message_id", - value: event.message_id, - decode: decodeExternalMessageId, - }) - if (Option.isNone(externalMessageIdOption)) return - - const externalUserIdOption = yield* decodeRequiredExternalIdOrWarn({ - eventType: "MESSAGE_REACTION_REMOVE", - field: "user_id", - value: event.user_id, - decode: decodeExternalUserId, - }) - if (Option.isNone(externalUserIdOption)) return - - const externalChannelId = externalChannelIdOption.value - const externalMessageId = externalMessageIdOption.value - const externalUserId = externalUserIdOption.value - - const links = yield* channelLinkRepo - .findActiveByExternalChannel(externalChannelId) - .pipe(withSystemActor) - - for (const link of links) { - if (link.direction === "hazel_to_external") continue - yield* discordSyncWorker.ingestReactionRemove({ - syncConnectionId: link.syncConnectionId, - externalChannelId, - externalMessageId, - externalUserId, - externalAuthorDisplayName, - externalAuthorAvatarUrl, - emoji, - dedupeKey: `discord:gateway:reaction:remove:${externalChannelId}:${externalMessageId}:${externalUserId}:${emoji}`, - }) - } - }) - - const ingestThreadCreateEvent = Effect.fn("DiscordGatewayService.ingestThreadCreateEvent")( - function* (event: DiscordThreadCreateEvent) { - if (!event.id || !event.parent_id) return - if (event.type !== undefined && event.type !== 11 && event.type !== 12) return - - const externalParentChannelIdOption = yield* decodeRequiredExternalIdOrWarn({ - eventType: "THREAD_CREATE", - field: "parent_id", - value: event.parent_id, - decode: decodeExternalChannelId, - }) - if (Option.isNone(externalParentChannelIdOption)) return - - const externalThreadIdOption = yield* decodeRequiredExternalIdOrWarn({ - eventType: "THREAD_CREATE", - field: "id", - value: event.id, - decode: decodeExternalThreadId, - }) - if (Option.isNone(externalThreadIdOption)) return - - const externalParentChannelId = externalParentChannelIdOption.value - const externalThreadId = externalThreadIdOption.value - - const links = yield* channelLinkRepo - .findActiveByExternalChannel(externalParentChannelId) - .pipe(withSystemActor) - - for (const link of links) { - if (link.direction === "hazel_to_external") continue - yield* discordSyncWorker.ingestThreadCreate({ - syncConnectionId: link.syncConnectionId, - externalParentChannelId, - externalThreadId, - externalRootMessageId: null, - name: event.name ?? "Thread", - dedupeKey: `discord:gateway:thread:create:${externalThreadId}`, - }) - } - }, - ) + const dispatchHandlers = createDiscordGatewayDispatchHandlers({ + discordSyncWorker, + findActiveLinksByExternalChannel: (externalChannelId) => + channelLinkRepo.findActiveByExternalChannel(externalChannelId).pipe(withSystemActor), + isCurrentBotAuthor, + }) const onReady = Effect.fn("DiscordGatewayService.onReady")(function* (event: DiscordReadyEvent) { if (!event.user?.id) { @@ -651,36 +682,40 @@ export class DiscordGatewayService extends Effect.Service ), ), gateway.handleDispatch("MESSAGE_CREATE", (event) => - ingestMessageCreateEvent(event as DiscordMessageCreateEvent).pipe( + dispatchHandlers.ingestMessageCreateEvent(event as DiscordMessageCreateEvent).pipe( Effect.catchAll((error) => onDispatchError("MESSAGE_CREATE", error)), ), ), gateway.handleDispatch("MESSAGE_UPDATE", (event) => - ingestMessageUpdateEvent(event as DiscordMessageUpdateEvent).pipe( + dispatchHandlers.ingestMessageUpdateEvent(event as DiscordMessageUpdateEvent).pipe( Effect.catchAll((error) => onDispatchError("MESSAGE_UPDATE", error)), ), ), gateway.handleDispatch("MESSAGE_DELETE", (event) => - ingestMessageDeleteEvent(event as DiscordMessageDeleteEvent).pipe( + dispatchHandlers.ingestMessageDeleteEvent(event as DiscordMessageDeleteEvent).pipe( Effect.catchAll((error) => onDispatchError("MESSAGE_DELETE", error)), ), ), gateway.handleDispatch("MESSAGE_REACTION_ADD", (event) => - ingestMessageReactionAddEvent(event as DiscordMessageReactionAddEvent).pipe( + dispatchHandlers + .ingestMessageReactionAddEvent(event as DiscordMessageReactionAddEvent) + .pipe( Effect.catchAll((error) => onDispatchError("MESSAGE_REACTION_ADD", error), ), ), ), gateway.handleDispatch("MESSAGE_REACTION_REMOVE", (event) => - ingestMessageReactionRemoveEvent(event as DiscordMessageReactionRemoveEvent).pipe( + dispatchHandlers + .ingestMessageReactionRemoveEvent(event as DiscordMessageReactionRemoveEvent) + .pipe( Effect.catchAll((error) => onDispatchError("MESSAGE_REACTION_REMOVE", error), ), ), ), gateway.handleDispatch("THREAD_CREATE", (event) => - ingestThreadCreateEvent(event as DiscordThreadCreateEvent).pipe( + dispatchHandlers.ingestThreadCreateEvent(event as DiscordThreadCreateEvent).pipe( Effect.catchAll((error) => onDispatchError("THREAD_CREATE", error)), ), ), diff --git a/apps/backend/src/test/chat-sync-db-harness.ts b/apps/backend/src/test/chat-sync-db-harness.ts new file mode 100644 index 000000000..02605eeed --- /dev/null +++ b/apps/backend/src/test/chat-sync-db-harness.ts @@ -0,0 +1,79 @@ +import { execSync } from "node:child_process" +import { fileURLToPath } from "node:url" +import { PostgreSqlContainer, type StartedPostgreSqlContainer } from "@testcontainers/postgresql" +import { Database } from "@hazel/db" +import { Effect, Layer, Redacted } from "effect" + +const DB_PACKAGE_DIR = fileURLToPath(new URL("../../../../packages/db", import.meta.url)) + +const TRUNCATE_SQL = ` +TRUNCATE TABLE + chat_sync_event_receipts, + chat_sync_message_links, + chat_sync_channel_links, + chat_sync_connections, + message_reactions, + messages, + channels, + organization_members, + integration_connections, + users, + organizations +RESTART IDENTITY CASCADE; +` + +export interface ChatSyncDbHarness { + readonly container: StartedPostgreSqlContainer + readonly dbLayer: Layer.Layer + run: (effect: Effect.Effect) => Promise + reset: () => Promise + stop: () => Promise +} + +const runDbPush = (databaseUrl: string) => { + execSync("bun run db:push", { + cwd: DB_PACKAGE_DIR, + stdio: "pipe", + env: { + ...process.env, + DATABASE_URL: databaseUrl, + }, + }) +} + +export const createChatSyncDbHarness = async (): Promise => { + const container = await new PostgreSqlContainer("postgres:alpine").start() + const databaseUrl = container.getConnectionUri() + + runDbPush(databaseUrl) + + const dbLayer = Database.layer({ + url: Redacted.make(databaseUrl), + ssl: false, + }) + + const run = (effect: Effect.Effect) => + Effect.runPromise( + (effect as Effect.Effect).pipe(Effect.provide(dbLayer), Effect.scoped), + ) + + const reset = () => + run( + Effect.gen(function* () { + const db = yield* Database.Database + yield* db.execute((client) => client.$client.unsafe(TRUNCATE_SQL)) + }), + ) + + const stop = async () => { + await container.stop() + } + + return { + container, + dbLayer, + run, + reset, + stop, + } +} diff --git a/apps/backend/src/test/chat-sync-test-diagnostics.ts b/apps/backend/src/test/chat-sync-test-diagnostics.ts new file mode 100644 index 000000000..40afe59ed --- /dev/null +++ b/apps/backend/src/test/chat-sync-test-diagnostics.ts @@ -0,0 +1,43 @@ +import { appendFileSync, mkdirSync, rmSync } from "node:fs" +import path from "node:path" + +type Primitive = string | number | boolean | null + +export type ChatSyncDiagnosticRecord = { + suite: string + testCase: string + workerMethod: string + action: string + dedupeKey?: string + syncConnectionId?: string + channelLinkId?: string + expected?: Primitive | Record + actual?: Primitive | Record + metadata?: Record +} + +const resolveDiagnosticsDir = (): string => { + const configured = process.env.CHAT_SYNC_TEST_DIAGNOSTICS_DIR + if (configured && configured.trim().length > 0) { + return path.resolve(configured) + } + return path.resolve(process.cwd(), ".artifacts/chat-sync") +} + +const diagnosticsFilePath = (): string => + path.join(resolveDiagnosticsDir(), "chat-sync-diagnostics.jsonl") + +export const resetChatSyncDiagnostics = (): void => { + const dir = resolveDiagnosticsDir() + rmSync(dir, { recursive: true, force: true }) +} + +export const recordChatSyncDiagnostic = (record: ChatSyncDiagnosticRecord): void => { + const dir = resolveDiagnosticsDir() + mkdirSync(dir, { recursive: true }) + appendFileSync( + diagnosticsFilePath(), + `${JSON.stringify({ timestamp: new Date().toISOString(), ...record })}\n`, + "utf8", + ) +} diff --git a/bun.lock b/bun.lock index 488206712..aeeb82c06 100644 --- a/bun.lock +++ b/bun.lock @@ -57,6 +57,7 @@ }, "devDependencies": { "@effect/language-service": "catalog:effect", + "@testcontainers/postgresql": "^10.18.0", "@types/bun": "1.3.8", "drizzle-kit": "^0.31.8", "typescript": "^5.9.3", @@ -3459,7 +3460,7 @@ "siginfo": ["siginfo@2.0.0", "", {}, "sha512-ybx0WO1/8bSBLEWXZvEd7gMW3Sn3JFlW3TvX1nREbDLRNQNaeNN8WK0meBwPdAaOI7TtRRRJn/Es1zhrrCHu7g=="], - "signal-exit": ["signal-exit@4.1.0", "", {}, "sha512-bzyZ1e88w9O1iNJbKnOlvYTrWPDl46O1bG0D3XInv+9tkPrxrN8jUUTiFlDkkmKWgn1M6CfIA13SuGqOa9Korw=="], + "signal-exit": ["signal-exit@3.0.7", "", {}, "sha512-wnD2ZE+l+SPC/uoS0vXeE9L1+0wuaMqKlfz9AMUo38JsyLSBWSFcHR1Rri62LZc12vLr1gb3jl7iwQhgwpAbGQ=="], "simple-swizzle": ["simple-swizzle@0.2.4", "", { "dependencies": { "is-arrayish": "^0.3.1" } }, "sha512-nAu1WFPQSMNr2Zn9PGSZK9AGn4t/y97lEm+MXTtUDwfP0ksAIX4nO+6ruD9Jwut4C49SB1Ws+fbXsm/yScWOHw=="], @@ -3915,6 +3916,8 @@ "@hazel/effect-bun/@types/bun": ["@types/bun@1.3.9", "", { "dependencies": { "bun-types": "1.3.9" } }, "sha512-KQ571yULOdWJiMH+RIWIOZ7B2RXQGpL1YQrBtLIV3FqDcCu6FsbFUBwhdKUlCKUpS3PJDsHlJ1QKlpxoVR+xtw=="], + "@inquirer/core/signal-exit": ["signal-exit@4.1.0", "", {}, "sha512-bzyZ1e88w9O1iNJbKnOlvYTrWPDl46O1bG0D3XInv+9tkPrxrN8jUUTiFlDkkmKWgn1M6CfIA13SuGqOa9Korw=="], + "@inquirer/core/wrap-ansi": ["wrap-ansi@6.2.0", "", { "dependencies": { "ansi-styles": "^4.0.0", "string-width": "^4.1.0", "strip-ansi": "^6.0.0" } }, "sha512-r6lPcBGxZXlIcymEu7InxDMhdW0KDxpLgoFLcguasxCaJ/SOIZwINatK9KY/tf+ZrlywOKU0UDj3ATXUBfxJXA=="], "@isaacs/cliui/string-width": ["string-width@5.1.2", "", { "dependencies": { "eastasianwidth": "^0.2.0", "emoji-regex": "^9.2.2", "strip-ansi": "^7.0.1" } }, "sha512-HnLOCR3vjcY8beoNLtcjZ5/nxn2afmME6lhrDrebokqMap+XbeW8n9TXpPDOqdGK5qcI3oT0GKTW6wC7EMiVqA=="], @@ -4123,6 +4126,8 @@ "filelist/minimatch": ["minimatch@5.1.6", "", { "dependencies": { "brace-expansion": "^2.0.1" } }, "sha512-lKwV/1brpG6mBUFHtb7NUmtABCb2WZZmm2wNiOA5hAb8VdCS4B3dtMWyvcoViccwAW/COERjXLt0zP1zXUN26g=="], + "foreground-child/signal-exit": ["signal-exit@4.1.0", "", {}, "sha512-bzyZ1e88w9O1iNJbKnOlvYTrWPDl46O1bG0D3XInv+9tkPrxrN8jUUTiFlDkkmKWgn1M6CfIA13SuGqOa9Korw=="], + "fumadocs-core/path-to-regexp": ["path-to-regexp@8.3.0", "", {}, "sha512-7jdwVIRtsP8MYpdXSwOS0YdD0Du+qOoF/AEPIt88PcCFrZCzx41oxku1jD88hZBwbNUIEfpqvuhjFaMAqMTWnA=="], "fumadocs-mdx/esbuild": ["esbuild@0.27.2", "", { "optionalDependencies": { "@esbuild/aix-ppc64": "0.27.2", "@esbuild/android-arm": "0.27.2", "@esbuild/android-arm64": "0.27.2", "@esbuild/android-x64": "0.27.2", "@esbuild/darwin-arm64": "0.27.2", "@esbuild/darwin-x64": "0.27.2", "@esbuild/freebsd-arm64": "0.27.2", "@esbuild/freebsd-x64": "0.27.2", "@esbuild/linux-arm": "0.27.2", "@esbuild/linux-arm64": "0.27.2", "@esbuild/linux-ia32": "0.27.2", "@esbuild/linux-loong64": "0.27.2", "@esbuild/linux-mips64el": "0.27.2", "@esbuild/linux-ppc64": "0.27.2", "@esbuild/linux-riscv64": "0.27.2", "@esbuild/linux-s390x": "0.27.2", "@esbuild/linux-x64": "0.27.2", "@esbuild/netbsd-arm64": "0.27.2", "@esbuild/netbsd-x64": "0.27.2", "@esbuild/openbsd-arm64": "0.27.2", "@esbuild/openbsd-x64": "0.27.2", "@esbuild/openharmony-arm64": "0.27.2", "@esbuild/sunos-x64": "0.27.2", "@esbuild/win32-arm64": "0.27.2", "@esbuild/win32-ia32": "0.27.2", "@esbuild/win32-x64": "0.27.2" }, "bin": { "esbuild": "bin/esbuild" } }, "sha512-HyNQImnsOC7X9PMNaCIeAm4ISCQXs5a5YasTXVliKv4uuBo1dKrG0A+uQS8M5eXjVMnLg3WgXaKvprHlFJQffw=="], @@ -4179,8 +4184,6 @@ "proper-lockfile/retry": ["retry@0.12.0", "", {}, "sha512-9LkiTwjUh6rT555DtE9rTX+BKByPfrMzEAtnlEtdEwr3Nkffwiihqe2bWADg+OQRjt9gl6ICdmB/ZFDCGAtSow=="], - "proper-lockfile/signal-exit": ["signal-exit@3.0.7", "", {}, "sha512-wnD2ZE+l+SPC/uoS0vXeE9L1+0wuaMqKlfz9AMUo38JsyLSBWSFcHR1Rri62LZc12vLr1gb3jl7iwQhgwpAbGQ=="], - "protobufjs/@types/node": ["@types/node@24.10.1", "", { "dependencies": { "undici-types": "~7.16.0" } }, "sha512-GNWcUTRBgIRJD5zj+Tq0fKOJ5XZajIiBroOF0yvj2bSU1WvNdYS/dn9UxwsujGW4JX06dnHyjV2y9rRaybH0iQ=="], "readdir-glob/minimatch": ["minimatch@5.1.6", "", { "dependencies": { "brace-expansion": "^2.0.1" } }, "sha512-lKwV/1brpG6mBUFHtb7NUmtABCb2WZZmm2wNiOA5hAb8VdCS4B3dtMWyvcoViccwAW/COERjXLt0zP1zXUN26g=="],