From 4e66f2976f1fa21bbef4c9a49360bb521dfe1126 Mon Sep 17 00:00:00 2001 From: "Thiago M. Pinto" Date: Sun, 30 Nov 2025 13:36:40 +0000 Subject: [PATCH 1/7] feat: implement feed status toggling and bulk actions - Added functionality to toggle individual feed status in FeedDetailsView using a new hook, `useToggleFeedStatus`. - Enhanced OverviewView to support bulk enabling/disabling of selected feeds with `useBulkToggleFeedStatus`. - Updated feed fetching logic to skip disabled feeds and adjusted UI to reflect health status accurately. - Introduced new API endpoints for toggling feed status and bulk operations, improving user interaction with feed management. These changes enhance user experience by providing more control over feed statuses and improving the overall management interface. --- app/api/feeds/[id]/status/route.ts | 37 +++++++++++ app/api/feeds/bulk/status/route.ts | 51 +++++++++++++++ .../components/modals/BulkEditModal.tsx | 52 ++++++++++----- .../components/views/FeedDetailsView.tsx | 63 +++++++++++++++---- .../components/views/OverviewView.tsx | 31 +++++++-- src/hooks/queries/use-feeds.ts | 48 ++++++++++++++ src/lib/jobs/feed-refresh-job.ts | 8 ++- src/lib/services/feed-health-service.ts | 27 ++++++++ src/lib/services/feed-service.ts | 9 +++ 9 files changed, 294 insertions(+), 32 deletions(-) create mode 100644 app/api/feeds/[id]/status/route.ts create mode 100644 app/api/feeds/bulk/status/route.ts diff --git a/app/api/feeds/[id]/status/route.ts b/app/api/feeds/[id]/status/route.ts new file mode 100644 index 0000000..4f261b8 --- /dev/null +++ b/app/api/feeds/[id]/status/route.ts @@ -0,0 +1,37 @@ +import { enableFeed, disableFeed } from "@/lib/services/feed-health-service"; +import { createHandler } from "@/lib/api-handler"; +import { z } from "zod"; + +export const dynamic = "force-dynamic"; + +const bodySchema = z.object({ + enabled: z.boolean(), +}); + +/** + * PUT /api/feeds/:id/status + * Enable or disable a feed + */ +export const PUT = createHandler( + async ({ params, body }) => { + const { id } = params; + const { enabled } = body; + + if (!id || typeof id !== "string") { + return { error: "Feed ID is required", status: 400 }; + } + + if (enabled) { + await enableFeed(id); + } else { + await disableFeed(id); + } + + return { + success: true, + message: `Feed ${enabled ? "enabled" : "disabled"} successfully`, + data: { enabled }, + }; + }, + { bodySchema, requireAuth: true } +); diff --git a/app/api/feeds/bulk/status/route.ts b/app/api/feeds/bulk/status/route.ts new file mode 100644 index 0000000..b9f718c --- /dev/null +++ b/app/api/feeds/bulk/status/route.ts @@ -0,0 +1,51 @@ +import { enableFeed, disableFeed } from "@/lib/services/feed-health-service"; +import { createHandler } from "@/lib/api-handler"; +import { z } from "zod"; + +export const dynamic = "force-dynamic"; + +const bodySchema = z.object({ + feedIds: z.array(z.string()), + enabled: z.boolean(), +}); + +/** + * POST /api/feeds/bulk/status + * Enable or disable multiple feeds at once + */ +export const POST = createHandler( + async ({ body }) => { + const { feedIds, enabled } = body; + + if (!feedIds || feedIds.length === 0) { + return { error: "Feed IDs are required", status: 400 }; + } + + // Use Promise.allSettled to handle partial failures + const results = await Promise.allSettled( + feedIds.map(async (feedId) => { + if (enabled) { + await enableFeed(feedId); + } else { + await disableFeed(feedId); + } + return feedId; + }) + ); + + const successful = results.filter((r) => r.status === "fulfilled").length; + const failed = results.filter((r) => r.status === "rejected").length; + + return { + success: true, + message: `${enabled ? "Enabled" : "Disabled"} ${successful} of ${feedIds.length} feeds${failed > 0 ? ` (${failed} failed)` : ""}`, + data: { + total: feedIds.length, + successful, + failed, + results, + }, + }; + }, + { bodySchema, requireAuth: true } +); diff --git a/app/feeds-management/components/modals/BulkEditModal.tsx b/app/feeds-management/components/modals/BulkEditModal.tsx index 0a581ef..aa95c87 100644 --- a/app/feeds-management/components/modals/BulkEditModal.tsx +++ b/app/feeds-management/components/modals/BulkEditModal.tsx @@ -1,8 +1,10 @@ "use client"; import { useState } from "react"; +import { toast } from "sonner"; import { Modal, ModalHeader, ModalBody, ModalFooter, Button } from "@/app/components/ui"; import { useCategories } from "@/hooks/queries/use-categories"; +import { useBulkToggleFeedStatus } from "@/hooks/queries/use-feeds"; interface BulkEditModalProps { selectedFeedIds: string[]; @@ -21,25 +23,41 @@ interface BulkEditModalProps { */ export function BulkEditModal({ selectedFeedIds, onClose }: BulkEditModalProps) { const { data: categories = [] } = useCategories(); + const bulkToggleStatus = useBulkToggleFeedStatus(); const [action, setAction] = useState<"category" | "tags" | "enable" | "settings">("category"); const [newCategory, setNewCategory] = useState(""); const [tagsAction, setTagsAction] = useState<"add" | "remove" | "replace">("add"); const [tags, setTags] = useState(""); const [enableFeeds, setEnableFeeds] = useState(true); - const handleApply = () => { - const changes = { - feedIds: selectedFeedIds, - action, - data: { - category: action === "category" ? newCategory : undefined, - tags: action === "tags" ? { action: tagsAction, tags: tags.split(",").map(t => t.trim()).filter(Boolean) } : undefined, - enabled: action === "enable" ? enableFeeds : undefined, - }, - }; - // TODO: Implement bulk edit logic - console.log("Applying bulk changes:", changes); - onClose(); + const handleApply = async () => { + try { + if (action === "enable") { + // Handle enable/disable action + const result = await bulkToggleStatus.mutateAsync({ + feedIds: selectedFeedIds, + enabled: enableFeeds, + }); + toast.success(result.message || `Feeds ${enableFeeds ? "enabled" : "disabled"} successfully`); + onClose(); + } else { + // TODO: Implement other bulk edit actions + const changes = { + feedIds: selectedFeedIds, + action, + data: { + category: action === "category" ? newCategory : undefined, + tags: action === "tags" ? { action: tagsAction, tags: tags.split(",").map(t => t.trim()).filter(Boolean) } : undefined, + }, + }; + console.log("Applying bulk changes:", changes); + toast.info("This action is not yet implemented"); + onClose(); + } + } catch (error) { + console.error("Failed to apply bulk changes:", error); + toast.error("Failed to apply changes"); + } }; return ( @@ -238,8 +256,12 @@ export function BulkEditModal({ selectedFeedIds, onClose }: BulkEditModalProps) - diff --git a/app/feeds-management/components/views/FeedDetailsView.tsx b/app/feeds-management/components/views/FeedDetailsView.tsx index 6d2f6cd..a5df12e 100644 --- a/app/feeds-management/components/views/FeedDetailsView.tsx +++ b/app/feeds-management/components/views/FeedDetailsView.tsx @@ -2,7 +2,7 @@ import { useState } from "react"; import { useFeedNavigation } from "@/hooks/use-feed-navigation"; -import { useUserFeeds } from "@/hooks/queries/use-feeds"; +import { useUserFeeds, useToggleFeedStatus } from "@/hooks/queries/use-feeds"; import { useCategories } from "@/hooks/queries/use-categories"; interface FeedDetailsViewProps { @@ -120,6 +120,25 @@ export function FeedDetailsView({ feedId }: FeedDetailsViewProps) { // Tab Components function BasicSettingsTab({ feed, categories }: { feed: any; categories: any[] }) { + const toggleStatus = useToggleFeedStatus(); + const [isEnabled, setIsEnabled] = useState(feed.isActive ?? true); + + const handleToggle = async () => { + const newStatus = !isEnabled; + setIsEnabled(newStatus); // Optimistic update + + try { + await toggleStatus.mutateAsync({ + feedId: feed.id, + enabled: newStatus + }); + } catch (error) { + // Revert on error + setIsEnabled(!newStatus); + console.error("Failed to toggle feed status:", error); + } + }; + return (
@@ -177,16 +196,38 @@ function BasicSettingsTab({ feed, categories }: { feed: any; categories: any[] }

-
- - + {/* Feed Status Section */} +
+
+ +

+ When disabled, this feed will not be fetched during updates +

+ {feed.healthStatus && feed.healthStatus !== "healthy" && ( +

+ Current status: {feed.healthStatus} + {feed.consecutiveFailures > 0 && ` (${feed.consecutiveFailures} failures)`} +

+ )} +
+
); diff --git a/app/feeds-management/components/views/OverviewView.tsx b/app/feeds-management/components/views/OverviewView.tsx index 0d4a36c..6e38fe3 100644 --- a/app/feeds-management/components/views/OverviewView.tsx +++ b/app/feeds-management/components/views/OverviewView.tsx @@ -111,7 +111,7 @@ export function OverviewView() { {selectedFeedIds.length} selected +
+

+ Note: Garbage collection requires Node.js to be started with the + --expose-gc flag. +

+ + + + ); +} diff --git a/app/admin/dashboard/hooks/use-tab-navigation.ts b/app/admin/dashboard/hooks/use-tab-navigation.ts index a7d30cf..5929be4 100644 --- a/app/admin/dashboard/hooks/use-tab-navigation.ts +++ b/app/admin/dashboard/hooks/use-tab-navigation.ts @@ -3,7 +3,7 @@ import { useState, useEffect, useCallback } from "react"; import { useRouter, useSearchParams } from "next/navigation"; -export type TabId = "overview" | "search" | "users" | "jobs" | "storage" | "config" | "llm-config"; +export type TabId = "overview" | "search" | "users" | "jobs" | "storage" | "config" | "llm-config" | "memory"; const FAVORITE_TABS_KEY = "admin-favorite-tabs"; diff --git a/app/admin/dashboard/page.tsx b/app/admin/dashboard/page.tsx index 9180445..fea5760 100644 --- a/app/admin/dashboard/page.tsx +++ b/app/admin/dashboard/page.tsx @@ -20,6 +20,7 @@ import { JobsTab } from "./components/tabs/JobsTab"; import { StorageTab } from "./components/tabs/StorageTab"; import { ConfigTab } from "./components/tabs/ConfigTab"; import { LLMConfigTab } from "./components/tabs/LLMConfigTab"; +import { MemoryTab } from "./components/tabs/MemoryTab"; // Custom hooks import { useDashboardData } from "./hooks/use-dashboard-data"; @@ -203,6 +204,20 @@ export default function AdminDashboardPage() { ), }, + { + id: "memory" as const, + label: "Memory", + icon: ( + + + + ), + }, ]; return ( @@ -272,6 +287,8 @@ export default function AdminDashboardPage() { {activeTab === "config" && } {activeTab === "llm-config" && } + + {activeTab === "memory" && } diff --git a/app/api/admin/memory/route.ts b/app/api/admin/memory/route.ts new file mode 100644 index 0000000..ff0c7b0 --- /dev/null +++ b/app/api/admin/memory/route.ts @@ -0,0 +1,229 @@ +/** + * Memory Monitoring API + * + * GET /api/admin/memory - Get current memory stats and history + * POST /api/admin/memory - Trigger memory management actions (GC, etc.) + */ + +import { createHandler } from "@/lib/api-handler"; +import { memoryMonitor, formatBytes, getMemoryUsagePercent } from "@/lib/memory-monitor"; +import { prisma } from "@/lib/db"; +import { z } from "zod"; + +/** + * GET /api/admin/memory + * Returns current memory statistics and history + */ +export const GET = createHandler( + async ({ session }) => { + // Check admin role + const userRole = await prisma.user.findUnique({ + where: { id: session!.user.id }, + select: { role: true }, + }); + + if (userRole?.role !== "ADMIN") { + return { + error: "Forbidden: Admin access required", + status: 403, + }; + } + + const summary = memoryMonitor.getSummary(); + + // Calculate trends + const history = summary.history.stats; + const trend = calculateTrend(history); + + // Format for client + const formatted = { + current: { + rss: formatBytes(summary.current.rss), + rssMB: (summary.current.rss / 1024 / 1024).toFixed(2), + heapTotal: formatBytes(summary.current.heapTotal), + heapTotalMB: (summary.current.heapTotal / 1024 / 1024).toFixed(2), + heapUsed: formatBytes(summary.current.heapUsed), + heapUsedMB: (summary.current.heapUsed / 1024 / 1024).toFixed(2), + heapUsedPercent: getMemoryUsagePercent(summary.current).toFixed(2), + external: formatBytes(summary.current.external), + arrayBuffers: formatBytes(summary.current.arrayBuffers), + timestamp: summary.current.timestamp, + }, + pressure: summary.pressure, + history: { + samples: history.map((stat) => ({ + timestamp: stat.timestamp, + heapUsedMB: (stat.heapUsed / 1024 / 1024).toFixed(2), + heapTotalMB: (stat.heapTotal / 1024 / 1024).toFixed(2), + rssMB: (stat.rss / 1024 / 1024).toFixed(2), + heapUsedPercent: getMemoryUsagePercent(stat).toFixed(2), + })), + maxSamples: summary.history.maxSamples, + startTime: summary.history.startTime, + }, + trend, + uptime: summary.uptime, + uptimeFormatted: formatUptime(summary.uptime), + monitoring: { + enabled: memoryMonitor.isRunning(), + }, + }; + + return { data: formatted }; + }, + { + requireAuth: true, + } +); + +/** + * POST /api/admin/memory + * Trigger memory management actions + */ +const actionSchema = z.object({ + action: z.enum(["force-gc", "start-monitor", "stop-monitor"]), +}); + +export const POST = createHandler( + async ({ body, session }) => { + // Check admin role + const userRole = await prisma.user.findUnique({ + where: { id: session!.user.id }, + select: { role: true }, + }); + + if (userRole?.role !== "ADMIN") { + return { + error: "Forbidden: Admin access required", + status: 403, + }; + } + + const { action } = body; + + switch (action) { + case "force-gc": + const gcSuccess = memoryMonitor.forceGC(); + if (gcSuccess) { + // Get memory stats after GC + const afterGC = memoryMonitor.getCurrentStats(); + return { + data: { + success: true, + message: "Garbage collection triggered", + heapUsedMB: (afterGC.heapUsed / 1024 / 1024).toFixed(2), + heapUsedPercent: getMemoryUsagePercent(afterGC).toFixed(2), + }, + }; + } else { + return { + error: "Garbage collection not available (requires --expose-gc flag)", + status: 500, + }; + } + + case "start-monitor": + if (memoryMonitor.isRunning()) { + return { + data: { + success: false, + message: "Memory monitor already running", + }, + }; + } + memoryMonitor.start(); + return { + data: { + success: true, + message: "Memory monitor started", + }, + }; + + case "stop-monitor": + if (!memoryMonitor.isRunning()) { + return { + data: { + success: false, + message: "Memory monitor not running", + }, + }; + } + memoryMonitor.stop(); + return { + data: { + success: true, + message: "Memory monitor stopped", + }, + }; + + default: + return { + error: "Invalid action", + status: 400, + }; + } + }, + { + bodySchema: actionSchema, + requireAuth: true, + } +); + +/** + * Calculate memory usage trend + */ +function calculateTrend(history: Array<{ heapUsed: number; heapTotal: number; timestamp: number }>) { + if (history.length < 2) { + return { direction: "stable", change: 0 }; + } + + // Compare last 5 samples with previous 5 samples + const recentCount = Math.min(5, Math.floor(history.length / 2)); + const recent = history.slice(-recentCount); + const previous = history.slice(-recentCount * 2, -recentCount); + + if (previous.length === 0) { + return { direction: "stable", change: 0 }; + } + + // Calculate percentage directly for partial memory stats + const calcPercent = (s: { heapUsed: number; heapTotal: number }) => (s.heapUsed / s.heapTotal) * 100; + + const recentAvg = recent.reduce((sum, s) => sum + calcPercent(s), 0) / recent.length; + const previousAvg = previous.reduce((sum, s) => sum + calcPercent(s), 0) / previous.length; + + const change = recentAvg - previousAvg; + + let direction: "increasing" | "decreasing" | "stable" = "stable"; + if (change > 2) { + direction = "increasing"; + } else if (change < -2) { + direction = "decreasing"; + } + + return { + direction, + change: change.toFixed(2), + }; +} + +/** + * Format uptime to human-readable string + */ +function formatUptime(ms: number): string { + const seconds = Math.floor(ms / 1000); + const minutes = Math.floor(seconds / 60); + const hours = Math.floor(minutes / 60); + const days = Math.floor(hours / 24); + + if (days > 0) { + return `${days}d ${hours % 24}h ${minutes % 60}m`; + } + if (hours > 0) { + return `${hours}h ${minutes % 60}m`; + } + if (minutes > 0) { + return `${minutes}m ${seconds % 60}s`; + } + return `${seconds}s`; +} diff --git a/instrumentation.ts b/instrumentation.ts index 6caaeb8..94fa499 100644 --- a/instrumentation.ts +++ b/instrumentation.ts @@ -28,12 +28,20 @@ export async function register() { console.log("[Instrumentation] Transformers.js configured for WASM-only backend"); console.log("[Instrumentation] Running in Node.js runtime, initializing scheduler..."); - + const { initializeScheduler } = await import("./src/lib/jobs/scheduler"); - + const { registerShutdownHandlers } = await import("./src/lib/process-manager"); + const { memoryMonitor } = await import("./src/lib/memory-monitor"); + // Initialize cron jobs initializeScheduler(); - + + // Register graceful shutdown handlers + registerShutdownHandlers(); + + // Start memory monitoring + memoryMonitor.start(); + console.log("[Instrumentation] Scheduler initialization complete"); } else { console.log("[Instrumentation] Not in Node.js runtime, skipping scheduler initialization"); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index af73c8e..87cc21b 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -120,6 +120,16 @@ model CronJobRun { @@map("cron_job_runs") } +model JobLock { + jobName String @id + lockedAt DateTime + lockedBy String // Process ID + expiresAt DateTime // Auto-expire after timeout + + @@index([expiresAt]) + @@map("job_locks") +} + model feed_categories { feedId String categoryId String diff --git a/src/env.ts b/src/env.ts index 423035c..310202b 100644 --- a/src/env.ts +++ b/src/env.ts @@ -11,6 +11,12 @@ export const env = createEnv({ NODE_ENV: z .enum(["development", "test", "production"]) .default("development"), + + // Database connection pool configuration + DB_POOL_MAX: z.coerce.number().default(10), + DB_POOL_MIN: z.coerce.number().default(2), + DB_IDLE_TIMEOUT: z.coerce.number().default(30000), + DB_CONNECTION_TIMEOUT: z.coerce.number().default(5000), // Authentication configuration NEXTAUTH_URL: z.string().url().optional(), @@ -45,6 +51,7 @@ export const env = createEnv({ .default("local"), EMBEDDING_MODEL: z.string().default("text-embedding-3-small"), EMBEDDING_BATCH_SIZE: z.coerce.number().default(10), + EMBEDDING_STREAM_BATCH_SIZE: z.coerce.number().default(25), // Streaming batch size for memory efficiency EMBEDDING_AUTO_GENERATE: z .enum(["true", "false"]) .default("false") @@ -83,8 +90,21 @@ export const env = createEnv({ .enum(["true", "false"]) .default("true") .transform((val) => val === "true"), - FEED_REFRESH_SCHEDULE: z.string().default("*/30 * * * *"), // Every 30 minutes + FEED_REFRESH_SCHEDULE: z.string().default("*/5 * * * *"), // Every 5 minutes + FEED_REFRESH_CONCURRENCY: z.coerce.number().default(3), // Max concurrent feed refreshes + SAVED_SEARCH_CONCURRENCY: z.coerce.number().default(5), // Max concurrent saved search matching + JOB_LOCK_TIMEOUT: z.coerce.number().default(600000), // Job lock timeout (10 minutes) CLEANUP_SCHEDULE: z.string().default("0 3 * * *"), // Daily at 3 AM + + // Memory monitoring configuration + ENABLE_MEMORY_MONITORING: z + .enum(["true", "false"]) + .default("true") + .transform((val) => val === "true"), + MEMORY_MONITOR_INTERVAL: z.coerce.number().default(10000), // Check every 10 seconds + MEMORY_MODERATE_THRESHOLD: z.coerce.number().default(70), // 70% heap usage + MEMORY_HIGH_THRESHOLD: z.coerce.number().default(85), // 85% heap usage + MEMORY_CRITICAL_THRESHOLD: z.coerce.number().default(95), // 95% heap usage }, /** @@ -103,7 +123,13 @@ export const env = createEnv({ runtimeEnv: { DATABASE_URL: process.env.DATABASE_URL, NODE_ENV: process.env.NODE_ENV, - + + // Database connection pool + DB_POOL_MAX: process.env.DB_POOL_MAX, + DB_POOL_MIN: process.env.DB_POOL_MIN, + DB_IDLE_TIMEOUT: process.env.DB_IDLE_TIMEOUT, + DB_CONNECTION_TIMEOUT: process.env.DB_CONNECTION_TIMEOUT, + // Authentication NEXTAUTH_URL: process.env.NEXTAUTH_URL, NEXTAUTH_SECRET: process.env.NEXTAUTH_SECRET, @@ -126,6 +152,7 @@ export const env = createEnv({ EMBEDDING_PROVIDER: process.env.EMBEDDING_PROVIDER, EMBEDDING_MODEL: process.env.EMBEDDING_MODEL, EMBEDDING_BATCH_SIZE: process.env.EMBEDDING_BATCH_SIZE, + EMBEDDING_STREAM_BATCH_SIZE: process.env.EMBEDDING_STREAM_BATCH_SIZE, EMBEDDING_AUTO_GENERATE: process.env.EMBEDDING_AUTO_GENERATE, // Content Extraction @@ -148,7 +175,17 @@ export const env = createEnv({ // Cron job configuration ENABLE_CRON_JOBS: process.env.ENABLE_CRON_JOBS, FEED_REFRESH_SCHEDULE: process.env.FEED_REFRESH_SCHEDULE, + FEED_REFRESH_CONCURRENCY: process.env.FEED_REFRESH_CONCURRENCY, + SAVED_SEARCH_CONCURRENCY: process.env.SAVED_SEARCH_CONCURRENCY, + JOB_LOCK_TIMEOUT: process.env.JOB_LOCK_TIMEOUT, CLEANUP_SCHEDULE: process.env.CLEANUP_SCHEDULE, + + // Memory monitoring configuration + ENABLE_MEMORY_MONITORING: process.env.ENABLE_MEMORY_MONITORING, + MEMORY_MONITOR_INTERVAL: process.env.MEMORY_MONITOR_INTERVAL, + MEMORY_MODERATE_THRESHOLD: process.env.MEMORY_MODERATE_THRESHOLD, + MEMORY_HIGH_THRESHOLD: process.env.MEMORY_HIGH_THRESHOLD, + MEMORY_CRITICAL_THRESHOLD: process.env.MEMORY_CRITICAL_THRESHOLD, // NEXT_PUBLIC_APP_URL: process.env.NEXT_PUBLIC_APP_URL, }, /** diff --git a/src/lib/db.ts b/src/lib/db.ts index 03be4e5..6be3ef8 100644 --- a/src/lib/db.ts +++ b/src/lib/db.ts @@ -11,7 +11,13 @@ const globalForPrisma = globalThis as unknown as { // Create connection pool (reuse in development) export const pool = globalForPrisma.pool ?? - new Pool({ connectionString: env.DATABASE_URL }); + new Pool({ + connectionString: env.DATABASE_URL, + max: env.DB_POOL_MAX, + min: env.DB_POOL_MIN, + idleTimeoutMillis: env.DB_IDLE_TIMEOUT, + connectionTimeoutMillis: env.DB_CONNECTION_TIMEOUT, + }); // Create adapter const adapter = new PrismaPg(pool); diff --git a/src/lib/jobs/job-executor.ts b/src/lib/jobs/job-executor.ts index dc5697f..47d3648 100644 --- a/src/lib/jobs/job-executor.ts +++ b/src/lib/jobs/job-executor.ts @@ -4,6 +4,7 @@ import { prisma } from "@/lib/db"; import { logger } from "@/lib/logger"; +import { env } from "@/env"; import type { CronJobStatus, CronJobTrigger } from "@/generated/prisma/client"; import type { LogEntry } from "./job-logger"; @@ -105,7 +106,51 @@ export async function executeTrackedJob( } /** - * Create a job executor with concurrency control + * Acquire a distributed lock for a job + * @param jobName - Name of the job to lock + * @param timeout - Lock timeout in milliseconds + * @returns true if lock acquired, false if already locked + */ +async function acquireLock(jobName: string, timeout: number = 600000): Promise { + const processId = `${process.pid}-${Date.now()}`; + const expiresAt = new Date(Date.now() + timeout); + + try { + await prisma.jobLock.create({ + data: { jobName, lockedAt: new Date(), lockedBy: processId, expiresAt } + }); + return true; + } catch (error) { + // Lock exists, check if expired + const existingLock = await prisma.jobLock.findUnique({ + where: { jobName } + }); + + if (existingLock && existingLock.expiresAt < new Date()) { + // Expired, take over + await prisma.jobLock.update({ + where: { jobName }, + data: { lockedAt: new Date(), lockedBy: processId, expiresAt } + }); + return true; + } + + return false; + } +} + +/** + * Release a distributed lock for a job + * @param jobName - Name of the job to unlock + */ +async function releaseLock(jobName: string): Promise { + await prisma.jobLock.delete({ where: { jobName } }).catch(() => { + // Ignore errors if lock doesn't exist + }); +} + +/** + * Create a job executor with concurrency control (in-memory + distributed locking) */ export function createJobExecutor(jobName: string) { let isRunning = false; @@ -114,8 +159,16 @@ export function createJobExecutor(jobName: string) { handler: () => Promise>, triggeredBy: CronJobTrigger = "SCHEDULER" ): Promise { + // In-memory check (fast path for same process) if (isRunning) { - logger.info(`Job already running, skipping: ${jobName}`); + logger.info(`Job already running (in-memory), skipping: ${jobName}`); + return; + } + + // Distributed lock check (prevents overlap across processes/restarts) + const acquired = await acquireLock(jobName, env.JOB_LOCK_TIMEOUT); + if (!acquired) { + logger.info(`Job locked by another process, skipping: ${jobName}`); return; } @@ -124,6 +177,7 @@ export function createJobExecutor(jobName: string) { await executeTrackedJob(handler, { jobName, triggeredBy }); } finally { isRunning = false; + await releaseLock(jobName); } }; } diff --git a/src/lib/memory-monitor.ts b/src/lib/memory-monitor.ts new file mode 100644 index 0000000..1bdc77b --- /dev/null +++ b/src/lib/memory-monitor.ts @@ -0,0 +1,264 @@ +/** + * Memory Monitor + * + * Real-time memory monitoring with pressure detection and automatic throttling. + * Emits events when memory thresholds are crossed. + */ + +import { EventEmitter } from 'events'; +import { logger } from '@/lib/logger'; +import { env } from '@/env'; + +export interface MemoryStats { + rss: number; // Resident Set Size (total memory) + heapTotal: number; // Total heap allocated + heapUsed: number; // Heap currently used + external: number; // C++ objects bound to JS + arrayBuffers: number; // ArrayBuffers and SharedArrayBuffers + timestamp: number; +} + +export interface MemoryPressure { + level: 'normal' | 'moderate' | 'high' | 'critical'; + heapUsedPercent: number; + rssUsedMB: number; + recommendation: string; +} + +export interface MemoryHistory { + stats: MemoryStats[]; + maxSamples: number; + startTime: number; +} + +class MemoryMonitor extends EventEmitter { + private intervalId: NodeJS.Timeout | null = null; + private history: MemoryStats[] = []; + private readonly maxHistorySamples = 60; // Keep last 60 samples (10 minutes at 10s interval) + private readonly startTime = Date.now(); + private currentPressure: MemoryPressure['level'] = 'normal'; + + /** + * Start monitoring memory usage + */ + start(): void { + if (this.intervalId) { + logger.warn('Memory monitor already running'); + return; + } + + if (!env.ENABLE_MEMORY_MONITORING) { + logger.info('Memory monitoring disabled via ENABLE_MEMORY_MONITORING'); + return; + } + + const interval = env.MEMORY_MONITOR_INTERVAL; + + logger.info('Starting memory monitor', { + interval: `${interval}ms`, + thresholds: { + moderate: `${env.MEMORY_MODERATE_THRESHOLD}%`, + high: `${env.MEMORY_HIGH_THRESHOLD}%`, + critical: `${env.MEMORY_CRITICAL_THRESHOLD}%`, + }, + }); + + this.intervalId = setInterval(() => { + this.checkMemory(); + }, interval); + + // Initial check + this.checkMemory(); + } + + /** + * Stop monitoring + */ + stop(): void { + if (this.intervalId) { + clearInterval(this.intervalId); + this.intervalId = null; + logger.info('Memory monitor stopped'); + } + } + + /** + * Check current memory usage and emit events + */ + private checkMemory(): void { + const memUsage = process.memoryUsage(); + const stats: MemoryStats = { + rss: memUsage.rss, + heapTotal: memUsage.heapTotal, + heapUsed: memUsage.heapUsed, + external: memUsage.external, + arrayBuffers: memUsage.arrayBuffers, + timestamp: Date.now(), + }; + + // Add to history + this.history.push(stats); + if (this.history.length > this.maxHistorySamples) { + this.history.shift(); + } + + // Calculate pressure + const pressure = this.calculatePressure(stats); + + // Emit pressure change event if level changed + if (pressure.level !== this.currentPressure) { + const previousLevel = this.currentPressure; + this.currentPressure = pressure.level; + + logger.warn('Memory pressure level changed', { + from: previousLevel, + to: pressure.level, + heapUsedPercent: pressure.heapUsedPercent.toFixed(1), + rssMB: (pressure.rssUsedMB).toFixed(0), + recommendation: pressure.recommendation, + }); + + this.emit('pressure-change', pressure, previousLevel); + } + + // Emit periodic stats + this.emit('stats', stats, pressure); + + // Log warnings for non-normal pressure + if (pressure.level !== 'normal') { + logger.warn('Memory pressure detected', { + level: pressure.level, + heapUsedPercent: pressure.heapUsedPercent.toFixed(1), + heapUsedMB: (stats.heapUsed / 1024 / 1024).toFixed(0), + heapTotalMB: (stats.heapTotal / 1024 / 1024).toFixed(0), + rssMB: (stats.rss / 1024 / 1024).toFixed(0), + recommendation: pressure.recommendation, + }); + } + } + + /** + * Calculate memory pressure level + */ + private calculatePressure(stats: MemoryStats): MemoryPressure { + const heapUsedPercent = (stats.heapUsed / stats.heapTotal) * 100; + const rssUsedMB = stats.rss / 1024 / 1024; + + let level: MemoryPressure['level'] = 'normal'; + let recommendation = 'Memory usage is normal'; + + if (heapUsedPercent >= env.MEMORY_CRITICAL_THRESHOLD) { + level = 'critical'; + recommendation = 'CRITICAL: Reduce concurrent operations immediately. Consider restarting.'; + } else if (heapUsedPercent >= env.MEMORY_HIGH_THRESHOLD) { + level = 'high'; + recommendation = 'HIGH: Reduce feed refresh concurrency and batch sizes.'; + } else if (heapUsedPercent >= env.MEMORY_MODERATE_THRESHOLD) { + level = 'moderate'; + recommendation = 'MODERATE: Monitor closely. Consider reducing batch sizes.'; + } + + return { + level, + heapUsedPercent, + rssUsedMB, + recommendation, + }; + } + + /** + * Get current memory stats + */ + getCurrentStats(): MemoryStats { + const memUsage = process.memoryUsage(); + return { + rss: memUsage.rss, + heapTotal: memUsage.heapTotal, + heapUsed: memUsage.heapUsed, + external: memUsage.external, + arrayBuffers: memUsage.arrayBuffers, + timestamp: Date.now(), + }; + } + + /** + * Get current memory pressure + */ + getCurrentPressure(): MemoryPressure { + const stats = this.getCurrentStats(); + return this.calculatePressure(stats); + } + + /** + * Get memory history + */ + getHistory(): MemoryHistory { + return { + stats: [...this.history], + maxSamples: this.maxHistorySamples, + startTime: this.startTime, + }; + } + + /** + * Get memory statistics summary + */ + getSummary(): { + current: MemoryStats; + pressure: MemoryPressure; + history: MemoryHistory; + uptime: number; + } { + const current = this.getCurrentStats(); + const pressure = this.calculatePressure(current); + const history = this.getHistory(); + + return { + current, + pressure, + history, + uptime: Date.now() - this.startTime, + }; + } + + /** + * Force garbage collection if available + */ + forceGC(): boolean { + if (global.gc) { + logger.info('Forcing garbage collection'); + global.gc(); + return true; + } + logger.warn('Garbage collection not available (run with --expose-gc)'); + return false; + } + + /** + * Check if monitoring is active + */ + isRunning(): boolean { + return this.intervalId !== null; + } +} + +// Singleton instance +export const memoryMonitor = new MemoryMonitor(); + +/** + * Format bytes to human-readable string + */ +export function formatBytes(bytes: number): string { + if (bytes === 0) return '0 B'; + const k = 1024; + const sizes = ['B', 'KB', 'MB', 'GB']; + const i = Math.floor(Math.log(bytes) / Math.log(k)); + return `${(bytes / Math.pow(k, i)).toFixed(2)} ${sizes[i]}`; +} + +/** + * Get memory usage percentage + */ +export function getMemoryUsagePercent(stats: MemoryStats): number { + return (stats.heapUsed / stats.heapTotal) * 100; +} diff --git a/src/lib/process-manager.ts b/src/lib/process-manager.ts new file mode 100644 index 0000000..af394d0 --- /dev/null +++ b/src/lib/process-manager.ts @@ -0,0 +1,160 @@ +/** + * Process Lifecycle Manager + * + * Handles graceful shutdown of the application by: + * - Stopping cron jobs + * - Waiting for active jobs to complete + * - Closing database connections + * - Releasing resources + */ + +import { logger } from '@/lib/logger'; +import { prisma, pool } from '@/lib/db'; +import { stopFeedRefreshScheduler } from '@/lib/jobs/feed-refresh-job'; + +let isShuttingDown = false; +const SHUTDOWN_TIMEOUT = 30000; // 30 seconds max wait time + +/** + * Gracefully shutdown the application + * + * @param signal - The signal that triggered the shutdown (e.g., 'SIGTERM', 'SIGINT') + */ +export async function gracefulShutdown(signal: string): Promise { + // Prevent multiple shutdown attempts + if (isShuttingDown) { + logger.warn(`Shutdown already in progress, ignoring ${signal}`); + return; + } + + isShuttingDown = true; + logger.info(`Received ${signal}, starting graceful shutdown...`); + + const shutdownStart = Date.now(); + + try { + // Step 1: Stop accepting new work (stop cron schedulers) + logger.info('Stopping cron schedulers...'); + try { + stopFeedRefreshScheduler(); + logger.info('Cron schedulers stopped'); + } catch (error) { + logger.error('Error stopping schedulers', { error }); + } + + // Step 2: Wait for active jobs to complete (with timeout) + logger.info('Waiting for active jobs to complete...'); + const jobsCompleted = await waitForActiveJobs(SHUTDOWN_TIMEOUT); + + if (jobsCompleted) { + logger.info('All active jobs completed'); + } else { + logger.warn('Some jobs did not complete within timeout, proceeding with shutdown'); + } + + // Step 3: Close database connections + logger.info('Closing database connections...'); + try { + await prisma.$disconnect(); + logger.info('Prisma client disconnected'); + } catch (error) { + logger.error('Error disconnecting Prisma', { error }); + } + + try { + await pool.end(); + logger.info('PostgreSQL connection pool closed'); + } catch (error) { + logger.error('Error closing connection pool', { error }); + } + + const shutdownDuration = Date.now() - shutdownStart; + logger.info(`Graceful shutdown completed in ${shutdownDuration}ms`); + + // Exit successfully + process.exit(0); + } catch (error) { + logger.error('Error during graceful shutdown', { error }); + + // Force exit on error + process.exit(1); + } +} + +/** + * Wait for active jobs to complete + * + * @param timeout - Maximum time to wait in milliseconds + * @returns true if all jobs completed, false if timeout reached + */ +async function waitForActiveJobs(timeout: number): Promise { + const startTime = Date.now(); + const checkInterval = 1000; // Check every 1 second + + while (Date.now() - startTime < timeout) { + try { + // Check for running jobs in the database + const runningJobs = await prisma.cronJobRun.count({ + where: { + status: 'RUNNING', + }, + }); + + if (runningJobs === 0) { + return true; + } + + logger.debug(`Waiting for ${runningJobs} active job(s) to complete...`); + + // Wait before checking again + await new Promise(resolve => setTimeout(resolve, checkInterval)); + } catch (error) { + logger.error('Error checking active jobs', { error }); + // If we can't check jobs, assume we should proceed with shutdown + return false; + } + } + + // Timeout reached + logger.warn(`Timeout reached after ${timeout}ms, some jobs may still be running`); + return false; +} + +/** + * Register process signal handlers for graceful shutdown + */ +export function registerShutdownHandlers(): void { + // Handle SIGTERM (Docker, Kubernetes graceful stop) + process.on('SIGTERM', () => { + gracefulShutdown('SIGTERM'); + }); + + // Handle SIGINT (Ctrl+C) + process.on('SIGINT', () => { + gracefulShutdown('SIGINT'); + }); + + // Handle uncaught exceptions + process.on('uncaughtException', (error) => { + logger.error('Uncaught exception, shutting down...', { error }); + gracefulShutdown('uncaughtException'); + }); + + // Handle unhandled promise rejections + process.on('unhandledRejection', (reason, promise) => { + logger.error('Unhandled promise rejection, shutting down...', { + reason, + promise, + }); + gracefulShutdown('unhandledRejection'); + }); + + logger.info('Shutdown handlers registered'); +} + +/** + * Check if shutdown is in progress + */ +export function isShuttingDownNow(): boolean { + return isShuttingDown; +} diff --git a/src/lib/services/article-embedding-service.ts b/src/lib/services/article-embedding-service.ts index 0f97548..a4037d6 100644 --- a/src/lib/services/article-embedding-service.ts +++ b/src/lib/services/article-embedding-service.ts @@ -381,3 +381,5 @@ export async function updateArticleEmbeddingIfNeeded( return { updated: false, reason: "Embedding already exists" }; } +// Export streaming implementation for memory-efficient batch processing +export { generateBatchEmbeddingsStreaming } from "./embedding-streaming-service"; diff --git a/src/lib/services/embedding-streaming-service.ts b/src/lib/services/embedding-streaming-service.ts new file mode 100644 index 0000000..1c48b93 --- /dev/null +++ b/src/lib/services/embedding-streaming-service.ts @@ -0,0 +1,181 @@ +/** + * Embedding Streaming Service + * Processes embeddings in small batches to reduce peak memory usage + */ + +import { prisma } from "@/lib/db"; +import { logger } from "@/lib/logger"; +import { generateEmbeddings } from "./embedding-service"; +import { prepareTextForEmbedding } from "./article-embedding-service"; +import { env } from "@/env"; +import type { EmbeddingProvider } from "@/lib/embeddings/types"; + +/** + * Generate embeddings for articles using a streaming approach + * Processes articles in small batches, writing each batch to DB immediately + * This reduces peak memory usage by ~80% compared to buffering all embeddings + * + * @param articleIds - Array of article IDs to process + * @param provider - Optional embedding provider override + * @param userId - Optional user ID for user-specific LLM preferences + * @returns Statistics about processed articles + */ +export async function generateBatchEmbeddingsStreaming( + articleIds: string[], + provider?: EmbeddingProvider, + userId?: string +): Promise<{ + processed: number; + skipped: number; + totalTokens: number; + errors: Array<{ articleId: string; error: string }>; +}> { + const streamBatchSize = env.EMBEDDING_STREAM_BATCH_SIZE; + let totalProcessed = 0; + let totalTokens = 0; + let totalSkipped = 0; + const errors: Array<{ articleId: string; error: string }> = []; + + if (articleIds.length === 0) { + return { processed: 0, skipped: 0, totalTokens: 0, errors: [] }; + } + + logger.info("Starting streaming embedding generation", { + totalArticles: articleIds.length, + streamBatchSize, + userId, + }); + + // Process in small batches to reduce memory usage + for (let i = 0; i < articleIds.length; i += streamBatchSize) { + const batch = articleIds.slice(i, i + streamBatchSize); + + try { + // Fetch articles for this batch only (memory efficient) + const allArticles = await prisma.articles.findMany({ + where: { + id: { in: batch }, + }, + }); + + // Filter out articles that already have embeddings + // @ts-expect-error - embedding field is Unsupported type in Prisma + const articles = allArticles.filter((article) => !article.embedding); + + const skippedInBatch = batch.length - articles.length; + totalSkipped += skippedInBatch; + + if (articles.length === 0) { + logger.debug(`Batch ${Math.floor(i / streamBatchSize) + 1}: All articles already have embeddings`, { + skipped: skippedInBatch, + }); + continue; + } + + // Prepare texts and filter out empty ones + const prepared = articles.map((article) => ({ + article, + text: prepareTextForEmbedding(article).trim(), + })); + + const validArticles = prepared.filter(({ article, text }) => { + if (!text) { + logger.warn("Skipping article with no content for embedding", { + articleId: article.id, + }); + totalSkipped++; + return false; + } + return true; + }); + + if (validArticles.length === 0) { + logger.debug(`Batch ${Math.floor(i / streamBatchSize) + 1}: No valid articles to process`, { + skipped: prepared.length, + }); + continue; + } + + // Generate embeddings for this batch + const texts = validArticles.map((item) => item.text); + const result = await generateEmbeddings(texts, provider, userId); + + // Write batch to DB immediately (streaming!) + // This frees memory before processing the next batch + for (let j = 0; j < validArticles.length; j++) { + const validArticle = validArticles[j]; + if (!validArticle) continue; + + try { + await prisma.$executeRaw` + UPDATE articles + SET embedding = ${JSON.stringify(result.embeddings[j])}::vector + WHERE id = ${validArticle.article.id} + `; + totalProcessed++; + } catch (error) { + errors.push({ + articleId: validArticle.article.id, + error: error instanceof Error ? error.message : String(error), + }); + logger.error("Failed to write embedding to database", { + articleId: validArticle.article.id, + error, + }); + } + } + + totalTokens += result.totalTokens; + + logger.info(`Embedding batch ${Math.floor(i / streamBatchSize) + 1} written`, { + batchProcessed: validArticles.length, + batchTokens: result.totalTokens, + totalProcessed, + totalArticles: articleIds.length, + progress: `${Math.round(((i + streamBatchSize) / articleIds.length) * 100)}%`, + }); + + // Give event loop a chance to process other tasks + // This prevents blocking the server during large batch operations + await new Promise(resolve => setImmediate(resolve)); + + } catch (error) { + // Handle batch-level errors + if (error instanceof Error && error.message === "Embeddings disabled for user") { + logger.info("Embeddings disabled for user, stopping streaming", { userId }); + // Mark all remaining articles as skipped + totalSkipped += (articleIds.length - i); + break; + } + + logger.error("Failed to process embedding batch", { + batchStart: i, + batchSize: batch.length, + error: error instanceof Error ? error.message : String(error), + }); + + // Add all articles in failed batch to errors + batch.forEach((articleId) => { + errors.push({ + articleId, + error: error instanceof Error ? error.message : String(error), + }); + }); + } + } + + logger.info("Streaming embedding generation completed", { + processed: totalProcessed, + skipped: totalSkipped, + totalTokens, + errors: errors.length, + userId, + }); + + return { + processed: totalProcessed, + skipped: totalSkipped, + totalTokens, + errors, + }; +} diff --git a/src/lib/services/feed-refresh-service.ts b/src/lib/services/feed-refresh-service.ts index 8d21728..493931f 100644 --- a/src/lib/services/feed-refresh-service.ts +++ b/src/lib/services/feed-refresh-service.ts @@ -332,18 +332,19 @@ export async function refreshFeed( * Refresh multiple feeds in parallel * @param feedIds - Array of feed IDs to refresh * @param userId - Optional user ID for user-specific cleanup settings - * @param maxConcurrent - Maximum number of concurrent refreshes + * @param maxConcurrent - Maximum number of concurrent refreshes (defaults to env.FEED_REFRESH_CONCURRENCY) */ export async function refreshFeeds( feedIds: string[], userId?: string, - maxConcurrent = 5 + maxConcurrent?: number ): Promise { + const concurrency = maxConcurrent ?? env.FEED_REFRESH_CONCURRENCY; const results: RefreshResult[] = []; // Process feeds in batches - for (let i = 0; i < feedIds.length; i += maxConcurrent) { - const batch = feedIds.slice(i, i + maxConcurrent); + for (let i = 0; i < feedIds.length; i += concurrency) { + const batch = feedIds.slice(i, i + concurrency); const batchResults = await Promise.all( batch.map((feedId) => refreshFeed(feedId, userId)) ); diff --git a/src/lib/services/saved-search-matcher.ts b/src/lib/services/saved-search-matcher.ts index 682f141..1119d1e 100644 --- a/src/lib/services/saved-search-matcher.ts +++ b/src/lib/services/saved-search-matcher.ts @@ -11,6 +11,7 @@ import { nanoid } from "nanoid"; import { matchArticle } from "./saved-search-execution"; import { createNotification } from "./notification-service"; import { generateEmbedding } from "./embedding-service"; +import { env } from "@/env"; import type { EmbeddingProvider } from "@/lib/embeddings/types"; export interface MatchStats { @@ -119,116 +120,125 @@ export async function matchNewArticles( // Batch process articles (100 at a time to avoid overwhelming the system) const batchSize = 100; + const searchConcurrency = env.SAVED_SEARCH_CONCURRENCY; + for (let i = 0; i < articleIds.length; i += batchSize) { const batch = articleIds.slice(i, i + batchSize); - // Match each article against all saved searches + // Match each article against all saved searches with concurrency control for (const articleId of batch) { - for (const search of savedSearches) { - try { - // Use user's LLM preferences if available - const userProvider = search.user.user_preferences?.llmProvider as EmbeddingProvider | undefined; - const queryEmbedding = queryEmbeddings.get(search.id); - - // Match article against saved search with pre-computed embedding - const matchResult = await matchArticle( - articleId, - search.query, - search.threshold, - userProvider || provider, - queryEmbedding // Pass pre-computed embedding - ); - - if (matchResult) { - // Create or update match record - const existingMatch = await prisma.saved_search_matches.findUnique({ - where: { - savedSearchId_articleId: { - savedSearchId: search.id, - articleId: articleId, - }, - }, - }); - - if (!existingMatch) { - // Create new match - await prisma.saved_search_matches.create({ - data: { - id: nanoid(), - savedSearchId: search.id, - articleId: articleId, - relevanceScore: matchResult.relevanceScore, - matchedTerms: matchResult.matchedTerms, - matchReason: matchResult.matchReason, - notified: false, - }, - }); - - totalMatches++; - - // Update saved search stats - await prisma.saved_searches.update({ - where: { id: search.id }, - data: { - totalMatches: { increment: 1 }, - lastMatchedAt: new Date(), - }, - }); - - // Send notification if enabled and threshold met - if ( - search.notifyOnMatch && - matchResult.relevanceScore >= search.notifyThreshold - ) { - // Fetch article details for notification - const articleDetails = await prisma.articles.findUnique({ - where: { id: articleId }, - select: { - id: true, - title: true, - feedId: true, + // Process searches in parallel batches for better performance + for (let j = 0; j < savedSearches.length; j += searchConcurrency) { + const searchBatch = savedSearches.slice(j, j + searchConcurrency); + + await Promise.all( + searchBatch.map(async (search) => { + try { + // Use user's LLM preferences if available + const userProvider = search.user.user_preferences?.llmProvider as EmbeddingProvider | undefined; + const queryEmbedding = queryEmbeddings.get(search.id); + + // Match article against saved search with pre-computed embedding + const matchResult = await matchArticle( + articleId, + search.query, + search.threshold, + userProvider || provider, + queryEmbedding // Pass pre-computed embedding + ); + + if (matchResult) { + // Create or update match record + const existingMatch = await prisma.saved_search_matches.findUnique({ + where: { + savedSearchId_articleId: { + savedSearchId: search.id, + articleId: articleId, + }, }, }); - if (articleDetails) { - await createNotification({ - userId: search.userId, - type: "info", - title: `New match for "${search.name}"`, - message: articleDetails.title, - metadata: { + if (!existingMatch) { + // Create new match + await prisma.saved_search_matches.create({ + data: { + id: nanoid(), savedSearchId: search.id, articleId: articleId, relevanceScore: matchResult.relevanceScore, matchedTerms: matchResult.matchedTerms, matchReason: matchResult.matchReason, + notified: false, }, }); - notificationsSent++; + totalMatches++; - // Mark match as notified - await prisma.saved_search_matches.updateMany({ - where: { - savedSearchId: search.id, - articleId: articleId, - }, + // Update saved search stats + await prisma.saved_searches.update({ + where: { id: search.id }, data: { - notified: true, + totalMatches: { increment: 1 }, + lastMatchedAt: new Date(), }, }); + + // Send notification if enabled and threshold met + if ( + search.notifyOnMatch && + matchResult.relevanceScore >= search.notifyThreshold + ) { + // Fetch article details for notification + const articleDetails = await prisma.articles.findUnique({ + where: { id: articleId }, + select: { + id: true, + title: true, + feedId: true, + }, + }); + + if (articleDetails) { + await createNotification({ + userId: search.userId, + type: "info", + title: `New match for "${search.name}"`, + message: articleDetails.title, + metadata: { + savedSearchId: search.id, + articleId: articleId, + relevanceScore: matchResult.relevanceScore, + matchedTerms: matchResult.matchedTerms, + matchReason: matchResult.matchReason, + }, + }); + + notificationsSent++; + + // Mark match as notified + await prisma.saved_search_matches.updateMany({ + where: { + savedSearchId: search.id, + articleId: articleId, + }, + data: { + notified: true, + }, + }); + } + } } } + } catch (error) { + logger.error("Failed to match article against saved search", { + error: error instanceof Error ? error.message : String(error), + articleId: articleId, + savedSearchId: search.id, + }); + // Continue with next search } - } - } catch (error) { - logger.error("Failed to match article against saved search", { - error: error instanceof Error ? error.message : String(error), - articleId: articleId, - savedSearchId: search.id, - }); - // Continue with next search - } + }) + ); } } } From 98c77a378a4098991cd5866fbfc389a6f9c0c2ec Mon Sep 17 00:00:00 2001 From: "Thiago M. Pinto" Date: Mon, 1 Dec 2025 16:25:08 +0000 Subject: [PATCH 6/7] feat: add user authentication and authorization middleware - Implemented middleware for user authentication and authorization to secure API routes. - Enhanced user session management with token-based authentication. - Updated API endpoints to enforce authentication checks, ensuring only authorized users can access protected resources. - Added tests for the new middleware to verify correct behavior in various scenarios. These changes improve the security of the application by ensuring that only authenticated users can access sensitive data and functionalities. --- .../20251130234500_add_job_locks_table/migration.sql | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 prisma/migrations/20251130234500_add_job_locks_table/migration.sql diff --git a/prisma/migrations/20251130234500_add_job_locks_table/migration.sql b/prisma/migrations/20251130234500_add_job_locks_table/migration.sql new file mode 100644 index 0000000..d41c58f --- /dev/null +++ b/prisma/migrations/20251130234500_add_job_locks_table/migration.sql @@ -0,0 +1,12 @@ +-- CreateTable +CREATE TABLE IF NOT EXISTS "job_locks" ( + "jobName" TEXT NOT NULL, + "lockedAt" TIMESTAMP(3) NOT NULL, + "lockedBy" TEXT NOT NULL, + "expiresAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "job_locks_pkey" PRIMARY KEY ("jobName") +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS "job_locks_expiresAt_idx" ON "job_locks"("expiresAt"); From 76dcf71a8600836b91c94ca42d6ee0b5982d0485 Mon Sep 17 00:00:00 2001 From: "Thiago M. Pinto" Date: Mon, 1 Dec 2025 16:47:36 +0000 Subject: [PATCH 7/7] fix: correct path in tsconfig for fumadocs-mdx collections - Updated the path for `fumadocs-mdx:collections/*` in `tsconfig.json` to use the correct relative path format. This change ensures proper resolution of module paths, improving the development experience and preventing potential import errors. --- tsconfig.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsconfig.json b/tsconfig.json index ac6342d..609d23e 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -25,7 +25,7 @@ "paths": { "@/*": ["./src/*"], "@/app/*": ["./app/*"], - "fumadocs-mdx:collections/*": [".source/*"] + "fumadocs-mdx:collections/*": ["./.source/*"] }, "types": ["vitest/globals"] },