Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 75 additions & 201 deletions src/commands/rooms/messages/reactions/subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {
ChatClient,
RoomStatus,
Subscription,
MessageReactionRawEvent,
MessageReactionSummaryEvent,
MessageReactionSummary,
Expand All @@ -10,6 +9,7 @@ import { Args, Flags } from "@oclif/core";
import chalk from "chalk";

import { ChatBaseCommand } from "../../../../chat-base-command.js";
import { waitUntilInterruptedOrTimeout } from "../../../../utils/long-running.js";

export default class MessagesReactionsSubscribe extends ChatBaseCommand {
static override args = {
Expand All @@ -35,12 +35,15 @@ export default class MessagesReactionsSubscribe extends ChatBaseCommand {
"Subscribe to raw individual reaction events instead of summaries",
default: false,
}),
duration: Flags.integer({
description:
"Automatically exit after the given number of seconds (0 = run indefinitely)",
char: "D",
required: false,
}),
};

private chatClient: ChatClient | null = null;
private unsubscribeReactionsFn: Subscription | null = null;
private unsubscribeRawReactionsFn: Subscription | null = null;
private unsubscribeStatusFn: (() => void) | null = null;

async run(): Promise<void> {
const { args, flags } = await this.parse(MessagesReactionsSubscribe);
Expand Down Expand Up @@ -100,56 +103,53 @@ export default class MessagesReactionsSubscribe extends ChatBaseCommand {
"subscribingToStatus",
"Subscribing to room status changes",
);
const { off: unsubscribeStatus } = chatRoom.onStatusChange(
(statusChange) => {
let reason: Error | null | string | undefined;
if (statusChange.current === RoomStatus.Failed) {
reason = chatRoom.error; // Get reason from chatRoom.error on failure
}
chatRoom.onStatusChange((statusChange) => {
let reason: Error | null | string | undefined;
if (statusChange.current === RoomStatus.Failed) {
reason = chatRoom.error; // Get reason from chatRoom.error on failure
}

const reasonMsg = reason instanceof Error ? reason.message : reason;
this.logCliEvent(
flags,
"room",
`status-${statusChange.current}`,
`Room status changed to ${statusChange.current}`,
{ reason: reasonMsg },
);

switch (statusChange.current) {
case RoomStatus.Attached: {
if (!this.shouldOutputJson(flags)) {
this.log(chalk.green("Successfully connected to Ably"));
this.log(
`Listening for message reactions in room ${chalk.cyan(room)}. Press Ctrl+C to exit.`,
);
}
const reasonMsg = reason instanceof Error ? reason.message : reason;
this.logCliEvent(
flags,
"room",
`status-${statusChange.current}`,
`Room status changed to ${statusChange.current}`,
{ reason: reasonMsg },
);

break;
switch (statusChange.current) {
case RoomStatus.Attached: {
if (!this.shouldOutputJson(flags)) {
this.log(chalk.green("Successfully connected to Ably"));
this.log(
`Listening for message reactions in room ${chalk.cyan(room)}. Press Ctrl+C to exit.`,
);
}

case RoomStatus.Detached: {
if (!this.shouldOutputJson(flags)) {
this.log(chalk.yellow("Disconnected from Ably"));
}
break;
}

break;
case RoomStatus.Detached: {
if (!this.shouldOutputJson(flags)) {
this.log(chalk.yellow("Disconnected from Ably"));
}

case RoomStatus.Failed: {
if (!this.shouldOutputJson(flags)) {
this.error(
`${chalk.red("Connection failed:")} ${reasonMsg || "Unknown error"}`,
);
}
break;
}

break;
case RoomStatus.Failed: {
if (!this.shouldOutputJson(flags)) {
this.error(
`${chalk.red("Connection failed:")} ${reasonMsg || "Unknown error"}`,
);
}
// No default

break;
}
},
);
this.unsubscribeStatusFn = unsubscribeStatus;
// No default
}
});
this.logCliEvent(
flags,
"room",
Expand All @@ -171,36 +171,35 @@ export default class MessagesReactionsSubscribe extends ChatBaseCommand {
"subscribingRaw",
"Subscribing to raw reaction events",
);
this.unsubscribeRawReactionsFn =
chatRoom.messages.reactions.subscribeRaw(
(event: MessageReactionRawEvent) => {
const timestamp = new Date().toISOString();
const eventData = {
type: event.type,
serial: event.reaction.messageSerial,
reaction: event.reaction,
room,
timestamp,
};
this.logCliEvent(
flags,
"reactions",
"rawReceived",
"Raw reaction event received",
eventData,
);
chatRoom.messages.reactions.subscribeRaw(
(event: MessageReactionRawEvent) => {
const timestamp = new Date().toISOString();
const eventData = {
type: event.type,
serial: event.reaction.messageSerial,
reaction: event.reaction,
room,
timestamp,
};
this.logCliEvent(
flags,
"reactions",
"rawReceived",
"Raw reaction event received",
eventData,
);

if (this.shouldOutputJson(flags)) {
this.log(
this.formatJsonOutput({ success: true, ...eventData }, flags),
);
} else {
this.log(
`[${chalk.dim(timestamp)}] ${chalk.green("⚡")} ${chalk.blue(event.reaction.clientId || "Unknown")} [${event.reaction.type}] ${event.type}: ${chalk.yellow(event.reaction.name || "unknown")} to message ${chalk.cyan(event.reaction.messageSerial)}`,
);
}
},
);
if (this.shouldOutputJson(flags)) {
this.log(
this.formatJsonOutput({ success: true, ...eventData }, flags),
);
} else {
this.log(
`[${chalk.dim(timestamp)}] ${chalk.green("⚡")} ${chalk.blue(event.reaction.clientId || "Unknown")} [${event.reaction.type}] ${event.type}: ${chalk.yellow(event.reaction.name || "unknown")} to message ${chalk.cyan(event.reaction.messageSerial)}`,
);
}
},
);
this.logCliEvent(
flags,
"reactions",
Expand All @@ -215,7 +214,7 @@ export default class MessagesReactionsSubscribe extends ChatBaseCommand {
"subscribing",
"Subscribing to reaction summaries",
);
this.unsubscribeReactionsFn = chatRoom.messages.reactions.subscribe(
chatRoom.messages.reactions.subscribe(
(event: MessageReactionSummaryEvent) => {
const timestamp = new Date().toISOString();

Expand Down Expand Up @@ -296,133 +295,8 @@ export default class MessagesReactionsSubscribe extends ChatBaseCommand {
"Listening for message reactions...",
);

// Keep the process running until interrupted
await new Promise<void>((resolve) => {
let cleanupInProgress = false;
const cleanup = async () => {
if (cleanupInProgress) return;
cleanupInProgress = true;
this.logCliEvent(
flags,
"reactions",
"cleanupInitiated",
"Cleanup initiated (Ctrl+C pressed)",
);
if (!this.shouldOutputJson(flags)) {
this.log(
`\n${chalk.yellow("Unsubscribing and closing connection...")}`,
);
}

// Set a force exit timeout
const forceExitTimeout = setTimeout(() => {
const errorMsg = "Force exiting after timeout during cleanup";
this.logCliEvent(flags, "reactions", "forceExit", errorMsg, {
room,
});
if (!this.shouldOutputJson(flags)) {
this.log(chalk.red("Force exiting after timeout..."));
}
}, 5000);

// Unsubscribe from reactions
if (this.unsubscribeReactionsFn) {
try {
this.logCliEvent(
flags,
"reactions",
"unsubscribing",
"Unsubscribing from reaction summaries",
);
this.unsubscribeReactionsFn.unsubscribe();
this.logCliEvent(
flags,
"reactions",
"unsubscribed",
"Unsubscribed from reaction summaries",
);
} catch (error) {
const errorMsg =
error instanceof Error ? error.message : String(error);
this.logCliEvent(
flags,
"reactions",
"unsubscribeError",
`Error unsubscribing from reactions: ${errorMsg}`,
{ error: errorMsg },
);
}
}

// Unsubscribe from raw reactions
if (this.unsubscribeRawReactionsFn) {
try {
this.logCliEvent(
flags,
"reactions",
"unsubscribingRaw",
"Unsubscribing from raw reaction events",
);
this.unsubscribeRawReactionsFn.unsubscribe();
this.logCliEvent(
flags,
"reactions",
"unsubscribedRaw",
"Unsubscribed from raw reaction events",
);
} catch (error) {
const errorMsg =
error instanceof Error ? error.message : String(error);
this.logCliEvent(
flags,
"reactions",
"unsubscribeRawError",
`Error unsubscribing from raw reactions: ${errorMsg}`,
{ error: errorMsg },
);
}
}

// Unsubscribe from status changes
if (this.unsubscribeStatusFn) {
try {
this.logCliEvent(
flags,
"room",
"unsubscribingStatus",
"Unsubscribing from room status",
);
this.unsubscribeStatusFn();
this.logCliEvent(
flags,
"room",
"unsubscribedStatus",
"Unsubscribed from room status",
);
} catch (error) {
const errorMsg =
error instanceof Error ? error.message : String(error);
this.logCliEvent(
flags,
"room",
"unsubscribeStatusError",
`Error unsubscribing from status: ${errorMsg}`,
{ error: errorMsg },
);
}
}

if (!this.shouldOutputJson(flags)) {
this.log(chalk.green("Successfully disconnected."));
}

clearTimeout(forceExitTimeout);
resolve();
};

process.on("SIGINT", () => void cleanup());
process.on("SIGTERM", () => void cleanup());
});
// Wait until the user interrupts or the optional duration elapses
await waitUntilInterruptedOrTimeout(flags.duration);
} catch (error) {
const errorMsg = error instanceof Error ? error.message : String(error);
this.logCliEvent(flags, "reactions", "fatalError", `Error: ${errorMsg}`, {
Expand Down
Loading
Loading