From ff2dce813e2ac750537963d7d059431a2d4e5cc6 Mon Sep 17 00:00:00 2001
From: aliamerj
Date: Sun, 18 May 2025 16:52:18 +0300
Subject: [PATCH 1/2] Remove Google Drive connector from the cloud build

---
 components/Connectors/Connectors.tsx | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/components/Connectors/Connectors.tsx b/components/Connectors/Connectors.tsx
index efa7912..97cbca3 100644
--- a/components/Connectors/Connectors.tsx
+++ b/components/Connectors/Connectors.tsx
@@ -3,14 +3,18 @@ import { FaGoogleDrive, FaDropbox } from "react-icons/fa";
 import { ConnectionBtn } from './ConnectionBtn';
 import { SiAwslambda } from 'react-icons/si';
 
+const isCloud = process.env.DCUP_ENV === 'CLOUD';
+
 export const Connectors = async () => {
   const connectors = [
-    {
-      id: 'google-drive',
-      name: 'Google Drive',
-      icon: <FaGoogleDrive />,
-      description: 'Connect your Google Drive to access documents and files',
-    },
+    ...(!isCloud ? [
+      {
+        id: 'google-drive',
+        name: 'Google Drive',
+        icon: <FaGoogleDrive />,
+        description: 'Connect your Google Drive to access documents and files',
+      },
+    ] : []),
     {
       id: "dropbox",
       name: "Dropbox",

From 6d8d937a2e2dbb4ecbfba254c9a9cbf485680210 Mon Sep 17 00:00:00 2001
From: aliamerj
Date: Sat, 24 May 2025 23:19:33 +0300
Subject: [PATCH 2/2] Let the user stop processing files whenever they want

---
 .dockerignore                                |  3 +
 DataSource/index.tsx                         | 16 ++--
 actions/connctions/set/index.ts              |  8 ++-
 actions/connctions/stop/index.ts             | 31 ++++++++
 actions/connctions/sync/index.ts             | 11 ++-
 app/api/connections/[id]/files/route.ts      | 12 +++
 app/api/connections/[id]/route.ts            | 26 +++++--
 app/api/connections/[id]/stop/route.ts       | 77 ++++++++++++++++++++
 components/Connections/Connections.tsx       |  8 ++
 components/StopConnection/StopConnection.tsx | 46 ++++++++++++
 cypress/e2e/directUpload.cy.ts               | 47 +++++++++++-
 cypress/e2e/dropbox.cy.ts                    | 52 +++++++++++++
 db/schemas/connections.ts                    |  1 +
 drizzle/meta/_journal.json                   | 14 ++++
 fileProcessors/connectors/index.ts           |  1 -
 fileProcessors/index.ts                      | 30 +++++---
 next.config.js                               |  5 ++
 workers/queues/jobs/processFiles.job.ts      | 39 +++++++---
 18 files changed, 385 insertions(+), 42 deletions(-)
 create mode 100644 actions/connctions/stop/index.ts
 create mode 100644 app/api/connections/[id]/stop/route.ts
 create mode 100644 components/StopConnection/StopConnection.tsx

diff --git a/.dockerignore b/.dockerignore
index 6f4fb3d..c1ba168 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -5,3 +5,6 @@ npm-debug.log
 *.md
 .next
 .git
+cypress.config.*
+.cypress/*
+cypress/

diff --git a/DataSource/index.tsx b/DataSource/index.tsx
index 8949011..28b8786 100644
--- a/DataSource/index.tsx
+++ b/DataSource/index.tsx
@@ -7,6 +7,7 @@ import { ConfigDropbox } from './Dropbox/ConfigDropbox';
 import { DropboxPickerProvider } from './Dropbox/DropboxPicker/dropbox-picker.context';
 import { ConfigGoogleDrive } from './GoogleDrive/ConfigGoogleDrive';
 import { ConfigAws } from './Aws/ConfigAws';
+import { StopConnection } from '@/components/StopConnection/StopConnection';
 
 export const DataSource = ({ connection, token, status }: { connection: ConnectionQuery, token: string | undefined | null, status: "PROCESSING" | "FINISHED" | undefined }) => {
   switch (connection.service) {
@@ -15,11 +16,13 @@ export const DataSource = ({ connection, token, status }: { connection: Connecti
       {connection.isConfigSet && }
+        <StopConnection connection={connection} status={status} />
 
     case "DIRECT_UPLOAD":
       return <>
+        <StopConnection connection={connection} status={status} />
 
     case 'DROPBOX':
       return <>
@@ -28,17 +31,20 @@ export const DataSource = ({ connection, token, status }: { connection: Connecti
+        <StopConnection connection={connection} status={status} />
-
     case 'AWS':
       return <>
-        {connection.isConfigSet && }
-
-
-
+        {connection.isConfigSet && }
+
+
+
+        <StopConnection connection={connection} status={status} />
 
     default:
       return <>
         {connection.isConfigSet && }
+        <StopConnection connection={connection} status={status} />
   }
 }

diff --git a/actions/connctions/set/index.ts b/actions/connctions/set/index.ts
index b82b968..2a46990 100644
--- a/actions/connctions/set/index.ts
+++ b/actions/connctions/set/index.ts
@@ -1,5 +1,8 @@
 "use server"
 import { authOptions } from "@/auth";
+import { databaseDrizzle } from "@/db";
+import { connections } from "@/db/schemas/connections";
+import { eq } from "drizzle-orm";
 import { setConnectionToProcess } from "@/fileProcessors/connectors";
 import { fromErrorToFormState, toFormState } from "@/lib/zodErrorHandle";
 import { addToProcessFilesQueue } from "@/workers/queues/jobs/processFiles.job";
@@ -17,7 +20,10 @@ export async function setConnectionConfig(_: FormState, formData: FormData) {
     formData.set("userId", session.user.id)
 
     const config = await setConnectionToProcess(formData)
-    await addToProcessFilesQueue(config)
+    const jobId = await addToProcessFilesQueue(config)
+    await databaseDrizzle.update(connections).set({
+      jobId: jobId
+    }).where(eq(connections.id, config.connectionId))
     revalidatePath("/connections");
     return toFormState("SUCCESS", "start processing");
   } catch (e) {

diff --git a/actions/connctions/stop/index.ts b/actions/connctions/stop/index.ts
new file mode 100644
index 0000000..331e4c6
--- /dev/null
+++ b/actions/connctions/stop/index.ts
@@ -0,0 +1,31 @@
+"use server"
+import { authOptions } from "@/auth";
+import { databaseDrizzle } from "@/db";
+import { fromErrorToFormState, toFormState } from "@/lib/zodErrorHandle";
+import { redisConnection } from "@/workers/redis";
+import { getServerSession } from "next-auth";
+
+type FormState = {
+  message: string;
+};
+
+export async function stopProcessing(_: FormState, formData: FormData) {
+  const session = await getServerSession(authOptions);
+  try {
+    if (!session?.user?.id) throw new Error("forbidden");
+    const connectionId = formData.get("connectionId");
+    if (!connectionId) throw new Error("Missing connection id")
+
+    const conn = await databaseDrizzle.query.connections.findFirst({
+      where: (c, u) => u.and(u.eq(c.id, connectionId.toString()), u.eq(c.userId, session.user.id)),
+      columns: {
+        jobId: true,
+      }
+    })
+    if (!conn || !conn.jobId) throw new Error("Connection not found or not currently processing")
+    await redisConnection.set(`cancel-job:${conn.jobId}`, '1');
+    return toFormState("SUCCESS", "stop processing");
+  } catch (e) {
+    return fromErrorToFormState(e);
+  }
+}

diff --git a/actions/connctions/sync/index.ts b/actions/connctions/sync/index.ts
index 60d355d..f59f397 100644
--- a/actions/connctions/sync/index.ts
+++ b/actions/connctions/sync/index.ts
@@ -57,10 +57,6 @@ export const syncConnectionConfig = async (_: FormState, formData: FormData) =>
     })
     if (!user) throw new Error("no such account")
 
-    await databaseDrizzle.update(connections).set({
-      isSyncing: true,
-    }).where(eq(connections.id, connectionId));
-
     const { currentConn, othersConn } = user.connections.reduce((acc, c) => {
       if (c.id === connectionId) {
         acc.currentConn = c;
@@ -71,8 +67,7 @@ export const syncConnectionConfig = async (_: FormState, formData: FormData) =>
     }, { currentConn: null as Conn | null, othersConn: [] as Conn[] });
 
     if (!currentConn) throw new Error("no such connection")
-
-    await addToProcessFilesQueue({
+    const jobId = await addToProcessFilesQueue({
       connectionId: connectionId,
       service: currentConn.service,
       metadata: currentConn.metadata || null,
@@ -81,6 +76,10 @@ export const syncConnectionConfig = async (_: FormState, formData: FormData) =>
       files: [],
       links: []
     })
+    await databaseDrizzle.update(connections).set({
+      isSyncing: true,
+      jobId: jobId
+    }).where(eq(connections.id, connectionId));
 
     revalidatePath("/connections");
     return toFormState("SUCCESS", "start processing");

diff --git a/app/api/connections/[id]/files/route.ts b/app/api/connections/[id]/files/route.ts
index 097ebe9..0654e33 100644
--- a/app/api/connections/[id]/files/route.ts
+++ b/app/api/connections/[id]/files/route.ts
@@ -85,6 +85,8 @@ export async function DELETE(request: NextRequest, { params }: Params) {
       where: (c, ops) => ops.and(ops.eq(c.userId, userId), ops.eq(c.id, id)),
       columns: {
         id: true,
+        isSyncing: true,
+        jobId: true,
       },
     })
 
     if (!connection) {
       return NextResponse.json(
         {
           message: "Connection not found or access denied",
         }, { status: 403 })
     }
+
+    if (connection.isSyncing || connection.jobId) {
+      return NextResponse.json(
+        {
+          code: 'processing_in_progress',
+          message: 'Cannot delete while files are being processed. Please cancel the active processing operation first.'
+        },
+        { status: 409 }
+      )
+    }
 
     const filesToDelete = validation.data.file
       ? [validation.data.file]
       : validation.data.files ?? [];

diff --git a/app/api/connections/[id]/route.ts b/app/api/connections/[id]/route.ts
index 7394b83..ac1954e 100644
--- a/app/api/connections/[id]/route.ts
+++ b/app/api/connections/[id]/route.ts
@@ -107,7 +107,10 @@ export async function DELETE(request: NextRequest, { params }: Params) {
   const { id } = await params
   const { data: conn, error: deleteError } = await tryAndCatch(databaseDrizzle.query.connections.findFirst({
     where: (conn, ops) => ops.and(ops.eq(conn.id, id), ops.eq(conn.userId, userId)),
-    columns: {},
+    columns: {
+      jobId: true,
+      isSyncing: true,
+    },
     with: {
       files: {
         columns: {
@@ -136,16 +139,25 @@ export async function DELETE(request: NextRequest, { params }: Params) {
     )
   }
 
+  if (conn.isSyncing || conn.jobId) {
+    return NextResponse.json(
+      {
+        code: 'processing_in_progress',
+        message: 'Cannot delete while files are being processed. Please cancel the active processing operation first.'
+      },
+      { status: 409 }
+    )
+  }
 
   for (const { chunksIds } of conn.files) {
-    await qdrantCLient.delete(qdrant_collection_name, {
+    await tryAndCatch(qdrantCLient.delete(qdrant_collection_name, {
       points: chunksIds,
       wait: wait === "true",
-    })
+    }))
   }
 
-  await databaseDrizzle
+  await tryAndCatch(databaseDrizzle
     .delete(connections)
-    .where(eq(connections.id, id))
+    .where(eq(connections.id, id)))
 
   return NextResponse.json({
     code: "ok",
@@ -246,10 +258,10 @@ export async function PUT(request: NextRequest, { params }: Params) {
 
     let error: Error | null = null;
     if (links.length > 0 || files.length > 0) {
-      const {error:err} = await tryAndCatch(directProcessFiles(filesConfig))
+      const { error: err } = await tryAndCatch(directProcessFiles(filesConfig))
       error = err;
     } else {
-      const {error:err} = await tryAndCatch(connectionProcessFiles(filesConfig))
+      const { error: err } = await tryAndCatch(connectionProcessFiles(filesConfig))
       error = err
     }
 
diff --git a/app/api/connections/[id]/stop/route.ts b/app/api/connections/[id]/stop/route.ts
new file mode 100644
index 0000000..7ae26de
--- /dev/null
+++ b/app/api/connections/[id]/stop/route.ts
@@ -0,0 +1,77 @@
+import { databaseDrizzle } from "@/db"
+import { checkAuth } from "@/lib/api_key"
+import { tryAndCatch } from "@/lib/try-catch"
+import { redisConnection } from "@/workers/redis"
+import { NextRequest, NextResponse } from "next/server"
+
+
+type Params = {
+  params: Promise<{
+    id: string
+  }>
+}
+
+export async function POST(request: NextRequest, { params }: Params) {
+  try {
+    const {
+      data: userId,
+      error: authError,
+    } = await tryAndCatch(checkAuth(request))
+    if (authError) {
+      return NextResponse.json({
+        code: authError.code,
+        message: authError.message,
+      }, { status: authError.status })
+    }
+
+    const { id } = await params
+    const { data: conn, error: queryError } = await tryAndCatch(databaseDrizzle.query.connections.findFirst({
+      where: (conn, ops) => ops.and(ops.eq(conn.userId, userId!), ops.eq(conn.id, id)),
+      columns: {
+        jobId: true,
+      }
+    }))
+    if (queryError) {
+      return NextResponse.json(
+        {
+          code: "internal_server_error",
+          message: "Failed to load connection status",
+        },
+        { status: 500 },
+      )
+    }
+    if (!conn) {
+      return NextResponse.json(
+        { code: 'not_found', message: 'Connection not found' },
+        { status: 404 }
+      )
+    }
+    if (!conn.jobId) {
+      return NextResponse.json(
+        { code: 'not_processing', message: 'No active processing job' },
+        { status: 400 }
+      )
+    }
+    const { error: redisError } = await tryAndCatch(
+      redisConnection.set(`cancel-job:${conn.jobId}`, '1')
+    )
+
+    if (redisError) {
+      return NextResponse.json(
+        { code: 'internal_error', message: 'Failed to cancel processing' },
+        { status: 500 }
+      )
+    }
+
+    return NextResponse.json(
+      { code: 'ok', message: 'Processing cancellation requested' },
+      { status: 200 }
+    )
+
+  } catch (error: any) {
+    return NextResponse.json(
+      { code: "internal_server_error", message: error.message },
+      { status: 500 },
+    );
+  }
+}

diff --git a/components/Connections/Connections.tsx b/components/Connections/Connections.tsx
index 1ce1518..239f8b0 100644
--- a/components/Connections/Connections.tsx
+++ b/components/Connections/Connections.tsx
@@ -19,9 +19,11 @@ import {
   TooltipProvider,
   TooltipTrigger,
 } from "@/components/ui/tooltip"
+import { useRouter } from "next/navigation";
 
 export default function Connections({ connections, tokens }: { connections: ConnectionQuery[], tokens: ConnectionToken }) {
   const [isMounted, setIsMounted] = useState(false)
+  const route = useRouter()
   const [connProgress, setConnProgress] = useState(null);
 
   useEffect(() => {
@@ -41,6 +43,12 @@ export default function Connections({ connections, tokens }: { connections: Conn
 
   }, [])
 
+  useEffect(() => {
+    if (connProgress?.status === 'FINISHED') {
+      route.refresh();
+    }
+  }, [connProgress]);
+
   return connections.map(connection => {
     const progress = connection.id === connProgress?.connectionId ? connProgress : null;

diff --git a/components/StopConnection/StopConnection.tsx b/components/StopConnection/StopConnection.tsx
new file mode 100644
index 0000000..e54f8ab
--- /dev/null
+++ b/components/StopConnection/StopConnection.tsx
@@ -0,0 +1,46 @@
+"use client"
+import { Button } from "../ui/button"
+import { Pause } from "lucide-react"
+import { useTransition } from "react"
+import { toast } from "@/hooks/use-toast"
+import { EMPTY_FORM_STATE } from "@/lib/zodErrorHandle"
+import { ConnectionQuery } from "@/app/(protected)/connections/page"
+import { stopProcessing } from "@/actions/connctions/stop"
+
+
+export const StopConnection = ({ connection, status }: {
+  connection: ConnectionQuery,
+  status: "PROCESSING" | "FINISHED" | undefined
+}) => {
+  const [isPending, startTransition] = useTransition();
+
+  const handleStopConnection = () => {
+    startTransition(async () => {
+      try {
+        const formData = new FormData();
+        formData.set("connectionId", connection.id)
+        const res = await stopProcessing(EMPTY_FORM_STATE, formData)
+        if (res.status !== 'SUCCESS') {
+          throw new Error(res.message)
+        }
+        toast({
+          title: res.message,
+        });
+
+      } catch (error: any) {
+        toast({
+          variant: "destructive",
+          title: "Uh oh! Something went wrong.",
+          description: error.message || "An unexpected error occurred.",
+        });
+      }
+    })
+  }
+
+  return (
+    <Button
+      data-test="stop-connection"
+      onClick={handleStopConnection}
+      disabled={isPending || status !== "PROCESSING"}
+    >
+      <Pause />
+      Stop processing
+    </Button>
+  )
+}

diff --git a/cypress/e2e/directUpload.cy.ts b/cypress/e2e/directUpload.cy.ts
index 8e8413c..1de2ecb 100644
--- a/cypress/e2e/directUpload.cy.ts
+++ b/cypress/e2e/directUpload.cy.ts
@@ -250,7 +250,7 @@ describe("Direct Upload UI", () => {
       cy.get('[data-test="folderName"]').should('contain.text', "*")
       cy.get('[data-test="processedFile"]').should('contain.text', 1)
       cy.get('[data-test="processedPage"]').should('contain.text', 2)
-
+
       // remove the only stored pdf file
       cy.get('[data-test="btn-config"]')
         .click()
@@ -444,6 +444,50 @@ describe("Direct Upload UI", () => {
         expect(points).eq(0)
       })
   })
+
+  it('should handle processing cancellation with progress preservation', () => {
+
+    cy.uploadFiles({ files: ['invo.pdf', "sample.pdf"] })
+    cy.wait(1000)
+
+    const targetState = { file: 1, page: 2 }
+    let found = false
+
+    const checkProgress = (retries = 0) => {
+      cy.get('[data-test="processedFile"]').invoke('text').then(fileText => {
+        cy.get('[data-test="processedPage"]').invoke('text').then(pageText => {
+          const currentFile = parseInt(fileText)
+          const currentPage = parseInt(pageText)
+
+          if (currentFile >= targetState.file && currentPage >= targetState.page) {
+            found = true
+            cy.get('[data-test="stop-connection"]').click()
+            return
+          }
+
+          if (!found) {
+            cy.wait(1000) // poll once per second
+            checkProgress(retries + 1)
+          }
+        })
+      })
+    }
+    checkProgress()
+
+    // UI assertions
+    cy.get('[data-test="processedFile"]').should('contain', targetState.file)
+    cy.get('[data-test="processedPage"]').should('contain', targetState.page)
+
+    cy.task("getConnection", { email: fakeUser.email })
+      .then(({ conns }: any) => {
+        const conn = (conns as FileConnectionQuery[])[0]
+        expect(conn.service).eq("DIRECT_UPLOAD")
+        expect(conn.metadata).eq("{}")
+        expect(conn.limitPages).to.be.null
+        expect(conn.limitFiles).to.be.null
+        cy.checkIndexedFiles({ conn, files: [{ name: "invo.pdf", totalPages: 2 }, { name: "sample.pdf", totalPages: 0 }] })
+      })
+  })
 })
 
 describe("Direct Upload API", () => {
@@ -607,6 +651,7 @@ describe("Direct Upload API", () => {
       })
     })
   })
+
   it('should enforce page limits during file operations and maintain constraints', () => {
     // Upload 1 pdf with 3 pages, it should process only 2
     cy.task('addNewUser', fakeUser).then(user => {

diff --git a/cypress/e2e/dropbox.cy.ts b/cypress/e2e/dropbox.cy.ts
index 69387cd..dc4db06 100644
--- a/cypress/e2e/dropbox.cy.ts
+++ b/cypress/e2e/dropbox.cy.ts
@@ -509,4 +509,56 @@ describe("Dropbox connection UI Testing", () => {
         expect(points).eq(0)
       })
   })
+
+  it('should handle processing cancellation with progress preservation', () => {
+
+    cy.task('getConnections', { email: fakeUser.email })
+      .then(res => {
+        const { conns } = res as { conns: ConnectionTable[] }
+        cy.get(`[data-test="btn-config-${conns[0].identifier}"]`)
+          .click()
+          .get('input[name="folderName"]')
+          .clear()
+          .type('_TEST_/invo.pdf/sample.pdf')
+          .get(`[data-test="btn-config-connection"]`)
+          .click()
+      })
+    cy.wait(1000)
+
+    const targetState = { file: 1, page: 2 }
+    let found = false
+
+    const checkProgress = (retries = 0) => {
+      cy.get('[data-test="processedFile"]').invoke('text').then(fileText => {
+        cy.get('[data-test="processedPage"]').invoke('text').then(pageText => {
+          const currentFile = parseInt(fileText)
+          const currentPage = parseInt(pageText)
+
+          if (currentFile >= targetState.file && currentPage >= targetState.page) {
+            found = true
+            cy.get('[data-test="stop-connection"]').click()
+            return
+          }
+
+          if (!found) {
+            cy.wait(1000) // poll once per second
+            checkProgress(retries + 1)
+          }
+        })
+      })
+    }
+    checkProgress()
+
+    // UI assertions
+    cy.get('[data-test="processedFile"]').should('contain', targetState.file)
+    cy.get('[data-test="processedPage"]').should('contain', targetState.page)
+
+    cy.task("getConnection", { email: fakeUser.email })
+      .then(({ conns }: any) => {
+        const conn = (conns as FileConnectionQuery[])[0]
+        expect(conn.limitPages).to.be.null
+        expect(conn.limitFiles).to.be.null
+        cy.checkIndexedFiles({ conn, source: "DROPBOX", files: [{ name: "invo.pdf", totalPages: 2 }, { name: "sample.pdf", totalPages: 0 }] })
+      })
+  })
 })

diff --git a/db/schemas/connections.ts b/db/schemas/connections.ts
index 6247850..cf54d45 100644
--- a/db/schemas/connections.ts
+++ b/db/schemas/connections.ts
@@ -34,6 +34,7 @@ export const connections = pgTable("connection", {
   limitFiles: integer("limit_files"),
   lastSynced: timestamp("last_synced", { withTimezone: true }),
   isSyncing: boolean("is_syncing").default(false).notNull(),
+  jobId: text("job_id"),
   isConfigSet: boolean("is_config_set").default(false).notNull(),
   createdAt: timestamp("createdAt", { withTimezone: true }).notNull().defaultNow(),
 })

diff --git a/drizzle/meta/_journal.json b/drizzle/meta/_journal.json
index 8467fc4..c5704fa 100644
--- a/drizzle/meta/_journal.json
+++ b/drizzle/meta/_journal.json
@@ -71,6 +71,20 @@
       "when": 1747198120772,
       "tag": "0009_regular_hydra",
       "breakpoints": true
+    },
+    {
+      "idx": 10,
+      "version": "7",
+      "when": 1748097253159,
+      "tag": "0010_red_reptil",
+      "breakpoints": true
+    },
+    {
+      "idx": 11,
+      "version": "7",
+      "when": 1748097469946,
+      "tag": "0011_keen_night_nurse",
+      "breakpoints": true
     }
   ]
 }
\ No newline at end of file

diff --git a/fileProcessors/connectors/index.ts b/fileProcessors/connectors/index.ts
index 01b0ab8..23d40ec 100644
--- a/fileProcessors/connectors/index.ts
+++ b/fileProcessors/connectors/index.ts
@@ -84,7 +84,6 @@ export const setConnectionToProcess = async (formData: FormData): Promise<TQueue>
   conn.id === connectionId)
   if (connection && connection.limitPages) formData.set("pageLimit", connection.limitPages.toString())

diff --git a/fileProcessors/index.ts b/fileProcessors/index.ts
index 3a3f726..d44b661 100644
--- a/fileProcessors/index.ts
+++ b/fileProcessors/index.ts
@@ -23,7 +23,7 @@ export type PageContent = {
   tables: unknown[]
 }
 
-export const directProcessFiles = async ({ files, metadata, service, connectionId, links, pageLimit, fileLimit }: TQueue) => {
+export const directProcessFiles = async ({ files, metadata, service, connectionId, links, pageLimit, fileLimit }: TQueue, checkCancel?: () => Promise<boolean>) => {
   // Create promises for processing file URLs
   const filePromises = files.map(async (file) => {
     const arrayBuffer = Buffer.from(file.content, 'base64').buffer;
@@ -64,7 +64,7 @@ export const directProcessFiles = async ({ files, metadata, service, connectionI
   if (pageLimit && pageLimit < currentPagesCount) {
     await databaseDrizzle
       .update(connections)
-      .set({ isSyncing: false })
+      .set({ isSyncing: false, jobId: null })
       .where(eq(connections.id, connectionId))
 
     await publishProgress({
@@ -76,10 +76,10 @@ export const directProcessFiles = async ({ files, metadata, service, connectionI
     })
     return;
   }
-  return await processFiles(filesContent, service, connectionId, pageLimit, fileLimit, currentPagesCount, connection.files.length)
+  return await processFiles(filesContent, service, connectionId, pageLimit, fileLimit, currentPagesCount, connection.files.length, checkCancel)
 }
 
-export const connectionProcessFiles = async ({ connectionId, service, pageLimit, fileLimit }: TQueue) => {
+export const connectionProcessFiles = async ({ connectionId, service, pageLimit, fileLimit }: TQueue, checkCancel?: () => Promise<boolean>) => {
   const connection = await databaseDrizzle.query.connections.findFirst({
     where: (c, ops) => ops.eq(c.id, connectionId),
     with: {
@@ -109,7 +109,7 @@ export const connectionProcessFiles = async ({ connectionId, service, pageLimit,
   if (pageLimit && pageLimit < currentPagesCount) {
     await databaseDrizzle
       .update(connections)
-      .set({ isSyncing: false })
+      .set({ isSyncing: false, jobId: null })
       .where(eq(connections.id, connectionId))
 
     await publishProgress({
@@ -121,15 +121,18 @@ export const connectionProcessFiles = async ({ connectionId, service, pageLimit,
     })
     return;
   }
-  return processFiles(filesContent, service, connectionId, pageLimit, fileLimit, 0, 0)
+  return processFiles(filesContent, service, connectionId, pageLimit, fileLimit, 0, 0, checkCancel)
 }
 
-const processFiles = async (filesContent: FileContent[], service: string, connectionId: string, pageLimit: number | null, fileLimit: number | null, currentPagesCount: number, currentFileCount: number) => {
+const delay = (ms: number) => new Promise(res => setTimeout(res, ms))
+
+const processFiles = async (filesContent: FileContent[], service: string, connectionId: string, pageLimit: number | null, fileLimit: number | null, currentPagesCount: number, currentFileCount: number, checkCancel?: () => Promise<boolean>) => {
   const completedFiles: typeof processedFiles.$inferInsert[] = []
   const allPoints = [];
   let processedPage = 0;
   let processedAllPages = currentPagesCount;
   let limits = pageLimit ? pageLimit - currentPagesCount : Infinity;
+  let shouldCancel = false;
   const now = new Date()
   try {
     const splitter = new RecursiveCharacterTextSplitter({
       chunkSize: 1024,
       chunkOverlap: 128,
       keepSeparator: true,
       separators: ["\n\n## ", "\n\n# ", "\n\n", "\n", ". ", "! ", "? ", " "],
     });
-
-    for (let fileIndex = 0; fileIndex < filesContent.length && limits > 0; fileIndex++) {
+    fileLoop: for (let fileIndex = 0; fileIndex < filesContent.length && limits > 0; fileIndex++) {
       const file = filesContent[fileIndex]
       const chunksId = [];
       if (fileLimit !== null && fileLimit > 0 && fileIndex >= fileLimit) break;
         },
       };
       for (let pageIndex = 0; pageIndex < file.pages.length && limits > 0; pageIndex++) {
+        if (checkCancel && await checkCancel()) {
+          shouldCancel = true;
+          break;
+        }
+        await delay(1000)
+
         const page = file.pages[pageIndex]
-        if (limits <= 0) break;
+        if (limits <= 0 || shouldCancel) break;
         const textPoints = await processingTextPage(page.text, pageIndex, baseMetadata, splitter)
         if (textPoints) {
           allPoints.push(textPoints);
@@ -185,6 +193,7 @@
         chunksIds: chunksId as string[],
       })
       processedPage = 0
+      if (limits <= 0 || shouldCancel) break fileLoop;
     }
 
     if (allPoints.length > 0) {
@@ -209,6 +218,7 @@
       .set({
         lastSynced: now,
         isSyncing: false,
+        jobId: null,
       })
       .where(eq(connections.id, connectionId))

diff --git a/next.config.js b/next.config.js
index 82544e2..5d165ae 100644
--- a/next.config.js
+++ b/next.config.js
@@ -1,6 +1,11 @@
 /** @type {import('next').NextConfig} */
 const nextConfig = {
   output: 'standalone',
+  experimental: {
+    serverActions: {
+      bodySizeLimit: '10mb'
+    }
+  }
 };
 
 module.exports = nextConfig

diff --git a/workers/queues/jobs/processFiles.job.ts b/workers/queues/jobs/processFiles.job.ts
index 5d62cc7..38dd38d 100644
--- a/workers/queues/jobs/processFiles.job.ts
+++ b/workers/queues/jobs/processFiles.job.ts
@@ -1,4 +1,4 @@
-import { Queue, Worker } from "bullmq";
+import { Queue, Worker, Job } from "bullmq";
 import { redisConnection } from "../../redis";
 import { defaultQueueConfig } from "../config";
 import { connectionProcessFiles, directProcessFiles } from "@/fileProcessors";
@@ -14,7 +14,7 @@ export type SerializedFile = {
 };
 export type TQueue = {
   connectionId: string;
-  pageLimit: number| null,
+  pageLimit: number | null,
   fileLimit: number | null,
   files: SerializedFile[],
   links: string[],
@@ -22,11 +22,6 @@ export type TQueue = {
   metadata: string | null,
 };
 
-
-export const addToProcessFilesQueue = (data: TQueue) => {
-  return processfilesQueue.add(processFilesJobName, data)
-};
-
 const processfilesQueue = new Queue(processFilesJobName, {
   connection: redisConnection,
   defaultJobOptions: {
     ...defaultQueueConfig,
   }
 });
 
-new Worker(processFilesJobName, async ({ data }) => {
-  const { service }: TQueue = data
+
+new Worker(processFilesJobName, async (job: Job) => {
+  const isCancelled = async () =>
+    (await redisConnection.get(`cancel-job:${job.id}`)) === '1';
+
+  const { service }: TQueue = job.data
   if (service === "DIRECT_UPLOAD") {
-    await directProcessFiles(data)
+    await processWithCancellation(directProcessFiles, job, isCancelled)
   } else {
-    await connectionProcessFiles(data)
+    await processWithCancellation(connectionProcessFiles, job, isCancelled);
   }
+
+  await redisConnection.del(`cancel-job:${job.id}`);
 }, {
   connection: redisConnection
 });
+
+export const addToProcessFilesQueue = async (data: TQueue) => {
+  const newJob = await processfilesQueue.add(processFilesJobName, data)
+  return newJob.id
+};
+
+/**
+ * Wraps a processing function to inject cancellation checks.
+ */
+async function processWithCancellation(
+  f: (data: any, checkCancel: () => Promise<boolean>) => Promise<void>,
+  job: Job,
+  checkCancel: () => Promise<boolean>
+) {
+  await f(job.data, checkCancel);
+}
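
The cancellation handshake in this patch reduces to three moves: the producer persists the BullMQ job id (stored on the connection row as jobId), anyone authorized writes a cancel-job:<id> flag in Redis, and the worker polls that flag between units of work. A minimal, self-contained sketch of the same pattern follows; the queue name and the page loop are illustrative, not taken from this patch, and it assumes ioredis as the Redis client (BullMQ workers require maxRetriesPerRequest: null on the shared connection):

// cancel-flag.ts — cooperative-cancellation sketch (illustrative names)
import { Queue, Worker, Job } from "bullmq";
import Redis from "ioredis";

const connection = new Redis({ maxRetriesPerRequest: null });
const queue = new Queue("demo", { connection });

// Producer: enqueue and hand back the job id so a later "stop"
// request can target it (the patch saves it on the connection row).
export async function enqueue(pages: number): Promise<string | undefined> {
  const job = await queue.add("demo", { pages });
  return job.id;
}

// Cancellation is a single flag write; no direct worker round-trip.
export async function requestCancel(jobId: string): Promise<void> {
  await connection.set(`cancel-job:${jobId}`, "1");
}

// Worker: poll the flag between units of work and stop early,
// keeping whatever progress was already committed.
new Worker(
  "demo",
  async (job: Job) => {
    for (let page = 0; page < job.data.pages; page++) {
      if ((await connection.get(`cancel-job:${job.id}`)) === "1") break;
      // ... process one page here ...
    }
    await connection.del(`cancel-job:${job.id}`); // always clear the flag
  },
  { connection },
);

The flag lives outside the job payload on purpose: the job is already running, so mutating its data would not be seen by the worker, whereas a Redis key can be checked cheaply at every page boundary.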
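
From an API client's point of view, the new endpoints compose with the existing DELETE route: deletion now returns 409 processing_in_progress while a job is active, so the client is expected to call the stop endpoint first and retry. A sketch of that flow — the base URL and the Bearer auth header are assumptions (checkAuth's exact scheme is not shown in this patch); only the paths and the status codes (400/404/409) come from the diff:

// stop-then-delete.ts — client-side sketch for the new endpoints
const BASE_URL = "http://localhost:3000"; // assumed local deployment

async function api(path: string, method: string, apiKey: string) {
  const res = await fetch(`${BASE_URL}${path}`, {
    method,
    headers: { Authorization: `Bearer ${apiKey}` }, // assumed scheme
  });
  return { status: res.status, body: await res.json() };
}

export async function stopProcessing(connectionId: string, apiKey: string) {
  const { status, body } = await api(`/api/connections/${connectionId}/stop`, "POST", apiKey);
  // 400 not_processing and 404 not_found are expected failure modes
  if (status !== 200) throw new Error(`${body.code}: ${body.message}`);
}

export async function deleteConnection(connectionId: string, apiKey: string) {
  let res = await api(`/api/connections/${connectionId}`, "DELETE", apiKey);
  if (res.status === 409) {
    // processing_in_progress: ask the worker to stop, give it a moment
    // (it cancels between pages), then retry the delete once.
    await stopProcessing(connectionId, apiKey);
    await new Promise((r) => setTimeout(r, 3000));
    res = await api(`/api/connections/${connectionId}`, "DELETE", apiKey);
  }
  if (res.status !== 200) throw new Error(`${res.body.code}: ${res.body.message}`);
}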
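
One note on the specs: the checkProgress helper that both cancellation tests define inline accepts a retries argument but never bounds it, so a stalled run only ends when Cypress itself times out. A bounded variant could live in a shared support file — this is a hypothetical helper (the selectors are the real ones from the tests, the file path and cap are assumptions):

// cypress/support/progress.ts — hypothetical shared, bounded poller
export const waitForProgressAndStop = (
  target: { file: number; page: number },
  retries = 0,
  maxRetries = 30,
): void => {
  if (retries >= maxRetries) {
    throw new Error(`processing never reached ${target.file} files / ${target.page} pages`);
  }
  cy.get('[data-test="processedFile"]').invoke("text").then((fileText) => {
    cy.get('[data-test="processedPage"]').invoke("text").then((pageText) => {
      if (parseInt(fileText) >= target.file && parseInt(pageText) >= target.page) {
        cy.get('[data-test="stop-connection"]').click();
        return;
      }
      cy.wait(1000); // poll once per second
      waitForProgressAndStop(target, retries + 1, maxRetries);
    });
  });
};

Both specs could then call waitForProgressAndStop(targetState) in place of the duplicated inline recursion.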