Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/worker/BBBLiveStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,13 @@ export class BBBLiveStream{
logger.debug("Browser console: "+msg);
};

this.bbbStream = await getStream(this.page, bbbStreamOptions, consoleLog);
try {
this.bbbStream = await getStream(this.page, bbbStreamOptions, consoleLog);
logger.info("Successfully initialized stream");
} catch (error) {
logger.error({ error, options: bbbStreamOptions }, 'Failed to get stream');
throw error;
}

this.waitForMeetingEnded();

Expand Down
33 changes: 26 additions & 7 deletions src/worker/PuppeteerStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,18 @@ export async function getStream(page: Page, opts: getStreamOptions, consoleLog:
if (!opts.frameSize) opts.frameSize = 20;
const retryPolicy = Object.assign({}, { each: 20, times: 3 }, opts.retry);

consoleLog("Getting extension page...");
const extension = await getExtensionPage(page.browser());

extension.on('console', (message: any) => {
consoleLog(message.text());
});

consoleLog("Acquiring lock for tab query...");
await lock();

await page.bringToFront();
consoleLog("Querying for tab...");
const [tab] = await extension.evaluate(
async (x) => {
// @ts-ignore
Expand All @@ -291,15 +294,21 @@ export async function getStream(page: Page, opts: getStreamOptions, consoleLog:
);

unlock();
if (!tab) throw new Error("Cannot find tab, try providing your own tabQuery to getStream options");
if (!tab) {
consoleLog("ERROR: Cannot find tab");
throw new Error("Cannot find tab, try providing your own tabQuery to getStream options");
}
consoleLog(`Found tab with id: ${tab.id}`);

const stream = new PassThrough();


function onConnection(ws: WebSocket, req: IncomingMessage) {
const url = new URL(`http://localhost:${port}${req.url}`);
consoleLog("WebSocket connection established");

async function close() {
consoleLog("Closing stream connection");
if (!stream.readableEnded && !stream.writableEnded) stream.end();
if (!extension.isClosed() && extension.browser().isConnected()) {
// @ts-ignore
Expand Down Expand Up @@ -327,13 +336,23 @@ export async function getStream(page: Page, opts: getStreamOptions, consoleLog:
(await wss).on("connection", onConnection);

await page.bringToFront();
consoleLog("Asserting extension is loaded...");
await assertExtensionLoaded(extension, retryPolicy);
consoleLog("Extension loaded successfully");

await extension.evaluate(
// @ts-ignore
(settings) => START_RECORDING(settings),
{ ...opts, tabId: tab.id }
);
try {
consoleLog("Starting recording in browser...");
await extension.evaluate(
// @ts-ignore
(settings) => START_RECORDING(settings),
{ ...opts, tabId: tab.id }
);
consoleLog("Recording started successfully");
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
consoleLog(`[ERROR] Failed to start recording: ${errorMessage}`);
throw error;
}

const mute = () => {
if (!extension.isClosed() && extension.browser().isConnected()) {
Expand Down Expand Up @@ -364,4 +383,4 @@ async function assertExtensionLoaded(ext: Page, opt: getStreamOptions["retry"])
await wait(Math.pow(opt.each, currentTick));
}
throw new Error("Could not find START_RECORDING function in the browser context");
}
}
147 changes: 88 additions & 59 deletions src/worker/extension/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,77 +49,106 @@ async function START_RECORDING(opts: recordingOptions) {
})
);

const client = new WebSocket(`ws://localhost:${window.location.hash.substring(1)}`, []);

await new Promise<void>((resolve) => {
if (client.readyState === WebSocket.OPEN) resolve();
client.addEventListener("open", () => resolve());
});


stream = await new Promise<MediaStream>((resolve, reject) => {
chrome.tabCapture.capture(
{
audio: opts.audio,
video: opts.video,
audioConstraints: opts.audioConstraints,
videoConstraints: opts.videoConstraints,
},
(stream) => {
if (chrome.runtime.lastError || !stream) {
console.error(chrome.runtime.lastError?.message);
reject(chrome.runtime.lastError?.message);
} else {
resolve(stream);
try {
const client = new WebSocket(`ws://localhost:${window.location.hash.substring(1)}`, []);

await new Promise<void>((resolve, reject) => {
if (client.readyState === WebSocket.OPEN) resolve();
client.addEventListener("open", () => {
console.log("[PUPPETEER_STREAM] WebSocket connected");
resolve();
});
client.addEventListener("error", (error) => {
console.error("[PUPPETEER_STREAM] WebSocket connection error:", error);
reject(new Error("WebSocket connection failed"));
});
// Add timeout
setTimeout(() => reject(new Error("WebSocket connection timeout")), 10000);
});

console.log("[PUPPETEER_STREAM] Capturing tab with id:", opts.tabId);

stream = await new Promise<MediaStream>((resolve, reject) => {
chrome.tabCapture.capture(
{
audio: opts.audio,
video: opts.video,
audioConstraints: opts.audioConstraints,
videoConstraints: opts.videoConstraints,
},
(stream) => {
if (chrome.runtime.lastError || !stream) {
console.error("[PUPPETEER_STREAM] Tab capture error:", chrome.runtime.lastError?.message);
reject(chrome.runtime.lastError?.message);
} else {
console.log("[PUPPETEER_STREAM] Tab captured successfully");
resolve(stream);
}
}
}
);
});
);
});

/*var constraints = { frameRate: 30 };
stream.getVideoTracks()[0].applyConstraints(constraints).catch((e: any) => console.log(e));
*/
/*var constraints = { frameRate: 30 };
stream.getVideoTracks()[0].applyConstraints(constraints).catch((e: any) => console.log(e));
*/

// somtimes needed to sync audio and video
if (opts.delay) await new Promise((resolve) => setTimeout(resolve, opts.delay));
// somtimes needed to sync audio and video
if (opts.delay) {
console.log(`[PUPPETEER_STREAM] Applying delay: ${opts.delay}ms`);
await new Promise((resolve) => setTimeout(resolve, opts.delay));
}

recorder = new MediaRecorder(stream, {
audioBitsPerSecond: opts.audioBitsPerSecond,
videoBitsPerSecond: opts.videoBitsPerSecond,
bitsPerSecond: opts.bitsPerSecond,
mimeType: opts.mimeType,
});
console.log("[PUPPETEER_STREAM] Creating MediaRecorder with mimeType:", opts.mimeType);

recorder.ondataavailable = async (e) => {
if (!e.data.size) return;
recorder = new MediaRecorder(stream, {
audioBitsPerSecond: opts.audioBitsPerSecond,
videoBitsPerSecond: opts.videoBitsPerSecond,
bitsPerSecond: opts.bitsPerSecond,
mimeType: opts.mimeType,
});

const buffer = await e.data.arrayBuffer();
recorder.ondataavailable = async (e) => {
if (!e.data.size) return;

client.send(buffer);
};
const buffer = await e.data.arrayBuffer();

// TODO: recorder onerror
client.send(buffer);
};

recorder.onerror = () => recorder.stop();
recorder.onerror = (event: any) => {
console.error("[PUPPETEER_STREAM] MediaRecorder error:", event);
recorder.stop();
};

recorder.onstop = function () {
try {
const tracks = stream.getTracks();
recorder.onstop = function () {
console.log("[PUPPETEER_STREAM] MediaRecorder stopped");
try {
const tracks = stream.getTracks();

tracks.forEach(function (track) {
track.stop();
});
tracks.forEach(function (track) {
track.stop();
});

if (client.readyState === WebSocket.OPEN) client.close();
} catch (error) {}
};
stream.onremovetrack = () => {
try {
recorder.stop();
} catch (error) {}
};
if (client.readyState === WebSocket.OPEN) client.close();
} catch (error) {
console.error("[PUPPETEER_STREAM] Error in recorder.onstop:", error);
}
};
stream.onremovetrack = () => {
console.log("[PUPPETEER_STREAM] Stream track removed");
try {
recorder.stop();
} catch (error) {
console.error("[PUPPETEER_STREAM] Error stopping recorder on track removal:", error);
}
};

recorder.start(opts.frameSize);
recorder.start(opts.frameSize);
console.log("[PUPPETEER_STREAM] Recording started with frameSize:", opts.frameSize);
} catch (error) {
console.error("[PUPPETEER_STREAM] START_RECORDING failed:", error);
throw error;
}
}

function MUTE() {
Expand All @@ -140,4 +169,4 @@ function STOP_RECORDING() {
if (recorder.state === "inactive") return;

recorder.stop();
}
}
18 changes: 14 additions & 4 deletions src/worker/sandbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,17 @@ import { SandboxedJob } from 'bullmq';
import { BBBLiveStream } from "./BBBLiveStream";

module.exports = async (job: SandboxedJob) => {
console.log("start job "+job.id);
const livestream = new BBBLiveStream(job);
return await livestream.startStream();
};
console.log("Starting job "+job.id);
try {
const livestream = new BBBLiveStream(job);
const result = await livestream.startStream();
console.log("Job "+job.id+" completed successfully with result: "+result);
return result;
} catch (error) {
console.error("Job "+job.id+" failed with error:", error);
if (error instanceof Error) {
console.error("Error stack:", error.stack);
}
throw error;
}
};