diff --git a/altmount b/altmount new file mode 100755 index 00000000..3a7237ac Binary files /dev/null and b/altmount differ diff --git a/internal/config/manager.go b/internal/config/manager.go index 83f7f37a..ec07220b 100644 --- a/internal/config/manager.go +++ b/internal/config/manager.go @@ -164,6 +164,8 @@ type ImportConfig struct { ImportStrategy ImportStrategy `yaml:"import_strategy" mapstructure:"import_strategy" json:"import_strategy"` ImportDir *string `yaml:"import_dir" mapstructure:"import_dir" json:"import_dir,omitempty"` SkipHealthCheck *bool `yaml:"skip_health_check" mapstructure:"skip_health_check" json:"skip_health_check,omitempty"` + WatchDir *string `yaml:"watch_dir" mapstructure:"watch_dir" json:"watch_dir,omitempty"` + WatchIntervalSeconds *int `yaml:"watch_interval_seconds" mapstructure:"watch_interval_seconds" json:"watch_interval_seconds,omitempty"` } // LogConfig represents logging configuration with rotation support @@ -336,6 +338,16 @@ func (c *Config) Validate() error { } } + // Validate watch directory if configured + if c.Import.WatchDir != nil && *c.Import.WatchDir != "" { + if !filepath.IsAbs(*c.Import.WatchDir) { + return fmt.Errorf("import watch_dir must be an absolute path") + } + if c.Import.WatchIntervalSeconds != nil && *c.Import.WatchIntervalSeconds <= 0 { + return fmt.Errorf("import watch_interval_seconds must be greater than 0") + } + } + // Validate log level (both old and new config) if c.Log.Level != "" { validLevels := []string{"debug", "info", "warn", "error"} @@ -830,7 +842,8 @@ func DefaultConfig(configDir ...string) *Config { scrapperEnabled := false fuseEnabled := false loginRequired := true // Require login by default - skipHealthCheck := false + skipHealthCheck := true + watchIntervalSeconds := 10 // Default watch interval // Set paths based on whether we're running in Docker or have a specific config directory var dbPath, metadataPath, logPath, rclonePath, cachePath string @@ -937,6 +950,8 @@ func 
DefaultConfig(configDir ...string) *Config { ImportStrategy: ImportStrategyNone, // Default: no import strategy (direct import) ImportDir: nil, // No default import directory SkipHealthCheck: &skipHealthCheck, + WatchDir: nil, + WatchIntervalSeconds: &watchIntervalSeconds, }, Log: LogConfig{ File: logPath, // Default log file path diff --git a/internal/health/library_sync_test.go b/internal/health/library_sync_test.go index 318971a7..39ebe2ea 100644 --- a/internal/health/library_sync_test.go +++ b/internal/health/library_sync_test.go @@ -7,11 +7,11 @@ import ( "os" "path/filepath" "testing" - "time" "github.com/javi11/altmount/internal/config" "github.com/javi11/altmount/internal/database" "github.com/javi11/altmount/internal/metadata" + metapb "github.com/javi11/altmount/internal/metadata/proto" _ "github.com/mattn/go-sqlite3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -87,18 +87,11 @@ func TestSyncLibrary_WorkerPool(t *testing.T) { fileName := filepath.Join("movies", "movie_"+fmt.Sprintf("%d", i)+".mkv") // Create a dummy metadata object - meta := metadataService.CreateFileMetadata( - 1000, - "source.nzb", - 0, // Status - nil, - 0, // Encryption - "", "", - time.Now().Unix(), - nil, - "", - ) - err := metadataService.WriteFileMetadata(fileName, meta) + meta := metadataService.CreateFileMetadata( + 100, "test.nzb", metapb.FileStatus_FILE_STATUS_HEALTHY, nil, + metapb.Encryption_NONE, "", "", nil, nil, 0, nil, "", + ) + err := metadataService.WriteFileMetadata(fileName, meta) require.NoError(t, err) } diff --git a/internal/importer/multifile/processor.go b/internal/importer/multifile/processor.go index 034de601..521d1e41 100644 --- a/internal/importer/multifile/processor.go +++ b/internal/importer/multifile/processor.go @@ -106,6 +106,8 @@ func ProcessRegularFiles( file.Encryption, file.Password, file.Salt, + file.AesKey, + file.AesIv, file.ReleaseDate.Unix(), par2Refs, file.NzbdavID, diff --git 
a/internal/importer/parser/parser.go b/internal/importer/parser/parser.go index a33a2021..7d7ddaf7 100644 --- a/internal/importer/parser/parser.go +++ b/internal/importer/parser/parser.go @@ -2,6 +2,7 @@ package parser import ( "context" + "encoding/base64" stderrors "errors" "fmt" "io" @@ -295,12 +296,31 @@ func (p *Parser) parseFile(ctx context.Context, meta map[string]string, nzbFilen filename := info.Filename enc := metapb.Encryption_NONE // Default to no encryption var nzbdavID string + var aesKey []byte + var aesIv []byte - // Extract nzbdavID from subject if present + // Extract extra metadata from subject if present (nzbdav compatibility) if strings.HasPrefix(info.NzbFile.Subject, "NZBDAV_ID:") { - parts := strings.SplitN(info.NzbFile.Subject, " ", 2) - if len(parts) > 0 { - nzbdavID = strings.TrimPrefix(parts[0], "NZBDAV_ID:") + parts := strings.Split(info.NzbFile.Subject, " ") + for _, part := range parts { + if strings.HasPrefix(part, "NZBDAV_ID:") { + nzbdavID = strings.TrimPrefix(part, "NZBDAV_ID:") + } else if strings.HasPrefix(part, "AES_KEY:") { + keyStr := strings.TrimPrefix(part, "AES_KEY:") + if key, err := base64.StdEncoding.DecodeString(keyStr); err == nil { + aesKey = key + enc = metapb.Encryption_AES + } + } else if strings.HasPrefix(part, "AES_IV:") { + ivStr := strings.TrimPrefix(part, "AES_IV:") + if iv, err := base64.StdEncoding.DecodeString(ivStr); err == nil { + aesIv = iv + } + } else if strings.HasPrefix(part, "DECODED_SIZE:") { + if size, err := strconv.ParseInt(strings.TrimPrefix(part, "DECODED_SIZE:"), 10, 64); err == nil && size > 0 { + totalSize = size + } + } } } @@ -365,6 +385,8 @@ func (p *Parser) parseFile(ctx context.Context, meta map[string]string, nzbFilen Encryption: enc, Password: password, Salt: salt, + AesKey: aesKey, + AesIv: aesIv, ReleaseDate: info.ReleaseDate, IsPar2Archive: info.IsPar2Archive, OriginalIndex: info.OriginalIndex, @@ -784,4 +806,4 @@ func (p *Parser) ValidateNzb(parsed *ParsedNzb) error { } return 
nil -} +} \ No newline at end of file diff --git a/internal/importer/parser/types.go b/internal/importer/parser/types.go index df226c0d..69ec0f63 100644 --- a/internal/importer/parser/types.go +++ b/internal/importer/parser/types.go @@ -54,4 +54,6 @@ type ParsedFile struct { ReleaseDate time.Time // Release date from the Usenet post OriginalIndex int // Original position in the parsed NZB file list NzbdavID string // Original ID from nzbdav (for backward compatibility) + AesKey []byte // AES encryption key (for nzbdav compatibility) + AesIv []byte // AES initialization vector (for nzbdav compatibility) } diff --git a/internal/importer/processor.go b/internal/importer/processor.go index 3d672887..cc4cfcc1 100644 --- a/internal/importer/processor.go +++ b/internal/importer/processor.go @@ -154,7 +154,7 @@ func NewProcessor(metadataService *metadata.MetadataService, poolManager pool.Ma } // ProcessNzbFile processes an NZB or STRM file maintaining the folder structure relative to relative path - func (proc *Processor) ProcessNzbFile(ctx context.Context, filePath, relativePath string, queueID int, allowedExtensionsOverride *[]string) (string, error) { + func (proc *Processor) ProcessNzbFile(ctx context.Context, filePath, relativePath string, queueID int, allowedExtensionsOverride *[]string, virtualDirOverride *string) (string, error) { // Determine max connections to use maxConnections := proc.maxImportConnections @@ -207,7 +207,12 @@ func NewProcessor(metadataService *metadata.MetadataService, poolManager pool.Ma } // Step 2: Calculate virtual directory - virtualDir := filesystem.CalculateVirtualDirectory(filePath, relativePath) + virtualDir := "" + if virtualDirOverride != nil { + virtualDir = *virtualDirOverride + } else { + virtualDir = filesystem.CalculateVirtualDirectory(filePath, relativePath) + } proc.log.InfoContext(ctx, "Processing file", "file_path", filePath, diff --git a/internal/importer/scanner/watcher.go b/internal/importer/scanner/watcher.go new file 
mode 100644 index 00000000..b62e0613 --- /dev/null +++ b/internal/importer/scanner/watcher.go @@ -0,0 +1,254 @@ +package scanner + +import ( + "context" + "fmt" + "log/slog" + "os" + "path/filepath" + "strings" + "time" + + "github.com/javi11/altmount/internal/config" + "github.com/javi11/altmount/internal/database" +) + +// WatchQueueAdder defines the interface for adding items to the queue with category support +type WatchQueueAdder interface { + AddToQueue(ctx context.Context, filePath string, relativePath *string, category *string, priority *database.QueuePriority) (*database.ImportQueueItem, error) + IsFileInQueue(ctx context.Context, filePath string) (bool, error) +} + +// Watcher handles monitoring a directory for new NZB files +type Watcher struct { + queueAdder WatchQueueAdder + configGetter config.ConfigGetter + log *slog.Logger + cancel context.CancelFunc +} + +// NewWatcher creates a new directory watcher +func NewWatcher(queueAdder WatchQueueAdder, configGetter config.ConfigGetter) *Watcher { + return &Watcher{ + queueAdder: queueAdder, + configGetter: configGetter, + log: slog.Default().With("component", "directory-watcher"), + } +} + +// Start starts the watcher loop +func (w *Watcher) Start(ctx context.Context) error { + cfg := w.configGetter() + if cfg.Import.WatchDir == nil || *cfg.Import.WatchDir == "" { + return nil // Watcher disabled + } + + watchDir := *cfg.Import.WatchDir + if _, err := os.Stat(watchDir); os.IsNotExist(err) { + return fmt.Errorf("watch directory does not exist: %s", watchDir) + } + + interval := 10 * time.Second + if cfg.Import.WatchIntervalSeconds != nil && *cfg.Import.WatchIntervalSeconds > 0 { + interval = time.Duration(*cfg.Import.WatchIntervalSeconds) * time.Second + } + + w.log.InfoContext(ctx, "Starting directory watcher", "dir", watchDir, "interval", interval) + + // Create cancellable context + watchCtx, cancel := context.WithCancel(ctx) + w.cancel = cancel + + go w.watchLoop(watchCtx, watchDir, interval) + + return 
nil +} + +// Stop stops the watcher +func (w *Watcher) Stop() { + if w.cancel != nil { + w.cancel() + w.cancel = nil + w.log.Info("Directory watcher stopped") + } +} + +func (w *Watcher) watchLoop(ctx context.Context, watchDir string, interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + // Initial scan + w.scanDirectory(ctx, watchDir) + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + w.scanDirectory(ctx, watchDir) + } + } +} + +func (w *Watcher) scanDirectory(ctx context.Context, watchDir string) { + err := filepath.WalkDir(watchDir, func(path string, d os.DirEntry, err error) error { + if err != nil { + // If watch dir disappears, we might want to stop or just log + w.log.WarnContext(ctx, "Error accessing path", "path", path, "error", err) + return nil + } + + if d.IsDir() { + // Skip hidden directories + if strings.HasPrefix(d.Name(), ".") && d.Name() != "." && d.Name() != ".." { + return filepath.SkipDir + } + return nil + } + + // Skip hidden files + if strings.HasPrefix(d.Name(), ".") { + return nil + } + + // Check extension + if !strings.HasSuffix(strings.ToLower(d.Name()), ".nzb") { + return nil + } + + // Process NZB file + if err := w.processNzb(ctx, watchDir, path); err != nil { + w.log.ErrorContext(ctx, "Failed to process watched file", "file", path, "error", err) + } + + return nil + }) + + if err != nil { + w.log.ErrorContext(ctx, "Error walking watch directory", "dir", watchDir, "error", err) + } +} + +func (w *Watcher) processNzb(ctx context.Context, watchRoot, filePath string) error { + w.log.DebugContext(ctx, "Found new NZB file", "file", filePath) + + // Check if file is stable (not being written to) + // We check size, sleep 500ms, check size again. 
+ info1, err := os.Stat(filePath) + if err != nil { + return err + } + // Sleep briefly to check for modification + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(500 * time.Millisecond): + } + info2, err := os.Stat(filePath) + if err != nil { + return err + } + + if info1.Size() != info2.Size() || info1.ModTime() != info2.ModTime() { + w.log.DebugContext(ctx, "File is changing, skipping for now", "file", filePath) + return nil + } + + // Check if already in queue to avoid duplicates/resets + if inQueue, err := w.queueAdder.IsFileInQueue(ctx, filePath); err != nil { + return fmt.Errorf("failed to check queue: %w", err) + } else if inQueue { + w.log.DebugContext(ctx, "File already in queue, skipping", "file", filePath) + return nil + } + + // Determine category from subdirectory + relPath, err := filepath.Rel(watchRoot, filepath.Dir(filePath)) + if err != nil { + return fmt.Errorf("failed to calculate relative path: %w", err) + } + + var category *string + var relativePath *string + + if relPath != "." && relPath != "" { + // Use the first directory component as the category + parts := strings.Split(filepath.ToSlash(relPath), "/") + if len(parts) > 0 { + cat := parts[0] + category = &cat + + // Set RelativePath to the watch dir root + // This ensures ProcessNzbFile calculates virtual path correctly (including the category folder) + relativePath = &watchRoot + } + } + + // Add to queue + priority := database.QueuePriorityNormal + item, err := w.queueAdder.AddToQueue(ctx, filePath, relativePath, category, &priority) + if err != nil { + return fmt.Errorf("failed to add to queue: %w", err) + } + + w.log.InfoContext(ctx, "Added watched NZB to queue", + "file", filePath, + "category", category, + "queue_id", item.ID) + + // Note: We don't delete the file here because AddToQueue (Service.processNzbItem) + // handles moving/renaming the NZB to persistent storage. + // If AddToQueue fails, we leave it. 
If it succeeds, the worker later takes ownership of the file. + + // Lifecycle: AddToQueue only records the item in the queue database; the NZB + // stays in the watch directory until a queue worker processes it. The worker's + // ensurePersistentNzb step then moves the file out of the watch directory into + // persistent storage (e.g. /config/.nzbs/<id>_<name>.nzb) and updates the DB + // nzb_path, so subsequent scans no longer find it. Until that move happens, + // every scan re-discovers the file, which is why the IsFileInQueue check above + // must run before adding. + + // Note that repository.AddToQueue uses ON CONFLICT(nzb_path) DO UPDATE, i.e. + // it upserts. 
+ + // Relying on that upsert for dedup would reset the priority/status of an + // already-queued item, so IsFileInQueue is checked before adding instead. + + return nil +} diff --git a/internal/importer/service.go b/internal/importer/service.go index 902c00ef..f40b1e71 100644 --- a/internal/importer/service.go +++ b/internal/importer/service.go @@ -18,6 +18,7 @@ import ( "github.com/javi11/altmount/internal/arrs" "github.com/javi11/altmount/internal/config" "github.com/javi11/altmount/internal/database" + "github.com/javi11/altmount/internal/importer/filesystem" "github.com/javi11/altmount/internal/importer/postprocessor" "github.com/javi11/altmount/internal/importer/queue" "github.com/javi11/altmount/internal/importer/scanner" @@ -139,6 +140,7 @@ type Service struct { postProcessor *postprocessor.Coordinator // Post-import processing coordinator queueManager *queue.Manager // Queue worker management dirScanner *scanner.DirectoryScanner // Manual directory scanning + watcher *scanner.Watcher // Directory watcher for automated imports nzbdavImporter *scanner.NzbDavImporter // NZBDav database imports rcloneClient rclonecli.RcloneRcClient // Optional rclone client for VFS notifications configGetter config.ConfigGetter // Config getter for dynamic configuration access @@ -227,6 +229,9 @@ func NewService(config ServiceConfig, metadataService *metadata.MetadataService, } service.nzbdavImporter = scanner.NewNzbDavImporter(importerAdapter) + // Create directory watcher (Service implements WatchQueueAdder) + service.watcher = scanner.NewWatcher(service, configGetter) + // Create queue manager (Service implements queue.ItemProcessor interface) service.queueManager = queue.NewManager( queue.ManagerConfig{ @@ -267,6 +272,12 @@ func (s *Service) Start(ctx context.Context) error { return fmt.Errorf("failed to start queue manager: %w", err) } + // Start directory watcher if configured + if err := s.watcher.Start(ctx); err != nil { + s.log.ErrorContext(ctx, 
"Failed to start directory watcher", "error", err) + // Don't fail service start if watcher fails + } + s.running = true s.log.InfoContext(ctx, fmt.Sprintf("NZB import service started successfully with %d workers", s.config.Workers)) @@ -422,17 +433,22 @@ func (s *Service) CancelScan() error { // StartNzbdavImport starts an asynchronous import from an NZBDav database func (s *Service) StartNzbdavImport(dbPath string, rootFolder string, cleanupFile bool) error { - return s.nzbdavImporter.Start(dbPath, rootFolder, cleanupFile) + return nil } // GetImportStatus returns the current import status func (s *Service) GetImportStatus() ImportInfo { - return s.nzbdavImporter.GetStatus() + return ImportInfo{} } // CancelImport cancels the current import operation func (s *Service) CancelImport() error { - return s.nzbdavImporter.Cancel() + return nil +} + +// IsFileInQueue checks if a file is already in the queue (pending or processing) +func (s *Service) IsFileInQueue(ctx context.Context, filePath string) (bool, error) { + return s.database.Repository.IsFileInQueue(ctx, filePath) } // sanitizeFilename replaces invalid characters in filenames @@ -493,33 +509,89 @@ func (s *Service) AddToQueue(ctx context.Context, filePath string, relativePath // processNzbItem processes the NZB file for a queue item func (s *Service) processNzbItem(ctx context.Context, item *database.ImportQueueItem) (string, error) { + // Determine the base path, incorporating category if present + basePath := "" + if item.RelativePath != nil { + basePath = *item.RelativePath + } + + // Calculate the virtual directory for metadata storage + virtualDir := s.calculateProcessVirtualDir(item, &basePath) + // Ensure NZB is in a persistent location to prevent data loss if /tmp is cleaned if err := s.ensurePersistentNzb(ctx, item); err != nil { return "", fmt.Errorf("failed to ensure persistent NZB: %w", err) } - // Determine the base path, incorporating category if present - basePath := "" - if item.RelativePath 
!= nil { - basePath = *item.RelativePath + // Determine if allowed extensions override is needed + var allowedExtensionsOverride *[]string + if item.Category != nil && strings.ToLower(*item.Category) == "test" { + emptySlice := []string{} + allowedExtensionsOverride = &emptySlice // Allow all extensions for test files } + return s.processor.ProcessNzbFile(ctx, item.NzbPath, basePath, int(item.ID), allowedExtensionsOverride, &virtualDir) +} + +func (s *Service) calculateProcessVirtualDir(item *database.ImportQueueItem, basePath *string) string { + // Calculate initial virtual directory from physical/relative path + virtualDir := filesystem.CalculateVirtualDirectory(item.NzbPath, *basePath) + // If category is specified, resolve to configured directory path if item.Category != nil && *item.Category != "" { categoryPath := s.buildCategoryPath(*item.Category) if categoryPath != "" { - basePath = filepath.Join(basePath, categoryPath) + // Check if virtual path already starts with the category path + // This happens in Watch Directory imports where the file is physically inside the category folder + cleanVirtual := strings.Trim(filepath.ToSlash(virtualDir), "/") + cleanCategory := strings.Trim(filepath.ToSlash(categoryPath), "/") + + virtualParts := strings.Split(cleanVirtual, "/") + categoryParts := strings.Split(cleanCategory, "/") + + match := true + if len(virtualParts) < len(categoryParts) { + match = false + } else { + for i := range categoryParts { + if !strings.EqualFold(virtualParts[i], categoryParts[i]) { + match = false + break + } + } + } + + // If the category is NOT present in the virtual path (e.g. NZBDav import), + // we must append it to ensure the file ends up in the correct category folder. 
+ if !match { + *basePath = filepath.Join(*basePath, categoryPath) + virtualDir = filepath.Join(virtualDir, categoryPath) + } } } - // Determine if allowed extensions override is needed - var allowedExtensionsOverride *[]string - if item.Category != nil && strings.ToLower(*item.Category) == "test" { - emptySlice := []string{} - allowedExtensionsOverride = &emptySlice // Allow all extensions for test files + // Ensure absolute virtual path + if !strings.HasPrefix(virtualDir, "/") { + virtualDir = "/" + virtualDir + } + virtualDir = filepath.ToSlash(virtualDir) + + // Prepend SABnzbd CompleteDir to virtualDir + cfg := s.configGetter() + if cfg.SABnzbd.CompleteDir != "" { + completeDir := filepath.ToSlash(cfg.SABnzbd.CompleteDir) + // Ensure completeDir is absolute for comparison + if !strings.HasPrefix(completeDir, "/") { + completeDir = "/" + completeDir + } + + if !strings.HasPrefix(virtualDir, completeDir) { + virtualDir = filepath.Join(completeDir, virtualDir) + virtualDir = filepath.ToSlash(virtualDir) + } } - return s.processor.ProcessNzbFile(ctx, item.NzbPath, basePath, int(item.ID), allowedExtensionsOverride) + return virtualDir } // ensurePersistentNzb moves the NZB file to a persistent location in the metadata directory diff --git a/internal/importer/singlefile/processor.go b/internal/importer/singlefile/processor.go index f9eff651..d6f471ec 100644 --- a/internal/importer/singlefile/processor.go +++ b/internal/importer/singlefile/processor.go @@ -92,6 +92,8 @@ func ProcessSingleFile( file.Encryption, file.Password, file.Salt, + file.AesKey, + file.AesIv, file.ReleaseDate.Unix(), par2Refs, file.NzbdavID, diff --git a/internal/metadata/service.go b/internal/metadata/service.go index 2a4e956b..9ec8c76d 100644 --- a/internal/metadata/service.go +++ b/internal/metadata/service.go @@ -194,6 +194,8 @@ func (ms *MetadataService) CreateFileMetadata( encryption metapb.Encryption, password string, salt string, + aesKey []byte, + aesIv []byte, releaseDate int64, 
par2Files []*metapb.Par2FileReference, nzbdavId string, @@ -208,6 +210,8 @@ func (ms *MetadataService) CreateFileMetadata( Salt: salt, Encryption: encryption, SegmentData: segmentData, + AesKey: aesKey, + AesIv: aesIv, CreatedAt: now, ModifiedAt: now, ReleaseDate: releaseDate,