From 577fdc7ed25d7943fecf427b33bc9b78d7983859 Mon Sep 17 00:00:00 2001 From: Ganesh Kumar Date: Sun, 8 Feb 2026 22:14:52 +0530 Subject: [PATCH 1/5] Implement XXHash Caching and Refactor Core Rclone Utilities --- b2-manager/core/download.go | 152 +------------- b2-manager/core/logger.go | 4 +- b2-manager/core/metadata.go | 169 ++++++++++++---- b2-manager/core/rclone.go | 229 +++++++++++++--------- b2-manager/core/status.go | 156 ++++++--------- b2-manager/docs/concurrent/parallelism.md | 13 +- b2-manager/docs/core.md | 6 +- b2-manager/docs/workflow/download.md | 2 +- b2-manager/docs/workflow/hashing.md | 59 ++++++ b2-manager/docs/workflow/status.md | 15 +- b2-manager/docs/workflow/upload.md | 5 +- b2-manager/go.mod | 2 + b2-manager/go.sum | 6 + b2-manager/main.go | 10 + b2-manager/model/types.go | 13 +- b2-manager/ui/operations.go | 2 +- b2-manager/ui/ui.go | 33 +++- 17 files changed, 465 insertions(+), 411 deletions(-) create mode 100644 b2-manager/docs/workflow/hashing.md diff --git a/b2-manager/core/download.go b/b2-manager/core/download.go index 93889a0f75..bf727fbd06 100644 --- a/b2-manager/core/download.go +++ b/b2-manager/core/download.go @@ -21,7 +21,7 @@ import ( // 1. Lock Check: Verify that no one else is currently uploading this database. // 2. Download: Execute `rclone copy` to pull the file from B2. // 3. Anchor: Construct a local "Verified Anchor" (LocalVersion) to mark this state as synced. -func DownloadDatabase(ctx context.Context, dbName string, onProgress func(model.RcloneProgress)) error { +func DownloadDatabase(ctx context.Context, dbName string, quiet bool, onProgress func(model.RcloneProgress)) error { LogInfo("Downloading database %s", dbName) // ------------------------------------------------------------------------- @@ -50,59 +50,17 @@ func DownloadDatabase(ctx context.Context, dbName string, onProgress func(model. 
// Use directory as destination for 'copy' localDir := config.AppConfig.LocalDBDir - // Changed from copyto to copy for safety/data loss prevention - rcloneArgs := []string{"copy", - remotePath, - localDir, - "--checksum", - "--retries", "20", - "--low-level-retries", "30", - "--retries-sleep", "10s", - } - - if onProgress != nil { - // Removed --verbose to avoid polluting JSON output - // User reported stats missing without verbose. restoring -v. - rcloneArgs = append(rcloneArgs, "-v", "--use-json-log", "--stats", "0.5s") - } else { - rcloneArgs = append(rcloneArgs, "--progress") - } - - cmdSync := exec.CommandContext(ctx, "rclone", rcloneArgs...) - // ------------------------------------------------------------------------- // PHASE 2: EXECUTE DOWNLOAD // Perform the actual network transfer using `rclone copy`. // ------------------------------------------------------------------------- - if onProgress != nil { - stderr, err := cmdSync.StderrPipe() - if err != nil { - LogError("Failed to get stderr pipe: %v", err) - return fmt.Errorf("failed to get stderr pipe: %w", err) - } - if err := cmdSync.Start(); err != nil { - LogError("Download start failed: %v", err) - return fmt.Errorf("download start failed: %w", err) - } - go ParseRcloneOutput(stderr, onProgress) + description := "Downloading " + dbName + // Use the passed quiet parameter + // The new RcloneCopy uses !quiet for verbose. If onProgress is set, it adds json flags. 
- if err := cmdSync.Wait(); err != nil { - if ctx.Err() != nil { - return fmt.Errorf("download cancelled") - } - LogError("Download of %s failed: %v", dbName, err) - return fmt.Errorf("download of %s failed: %w", dbName, err) - } - } else { - cmdSync.Stdout = os.Stdout - cmdSync.Stderr = os.Stderr - if err := cmdSync.Run(); err != nil { - if ctx.Err() != nil { - return fmt.Errorf("download cancelled") - } - LogError("DownloadDatabase rclone copy failed for %s: %v", dbName, err) - return fmt.Errorf("download of %s failed: %w", dbName, err) - } + if err := RcloneCopy(ctx, remotePath, localDir, description, quiet, onProgress); err != nil { + LogError("DownloadDatabase RcloneCopy failed for %s: %v", dbName, err) + return fmt.Errorf("download of %s failed: %w", dbName, err) } // ------------------------------------------------------------------------- @@ -120,7 +78,7 @@ func DownloadDatabase(ctx context.Context, dbName string, onProgress func(model. // 3.1. Calculate Local Hash of the newly downloaded file localDBPath := filepath.Join(config.AppConfig.LocalDBDir, dbName) - localHash, err := CalculateSHA256(localDBPath) + localHash, err := CalculateXXHash(localDBPath) if err != nil { LogError("DownloadDatabase: Failed to calculate hash of downloaded file %s: %v", dbName, err) return fmt.Errorf("failed to calculate hash of downloaded database: %w", err) @@ -181,97 +139,3 @@ func DownloadDatabase(ctx context.Context, dbName string, onProgress func(model. 
return nil } - -// DownloadAllDatabases syncs all databases from remote to local -func DownloadAllDatabases(onProgress func(model.RcloneProgress)) error { - ctx := GetContext() - - LogInfo("Starting batch download of all databases") - - if err := os.MkdirAll(config.AppConfig.LocalDBDir, 0755); err != nil { - LogError("Failed to create local directory in DownloadAllDatabases: %v", err) - return fmt.Errorf("failed to create local directory: %w", err) - } - - rcloneArgs := []string{"copy", - config.AppConfig.RootBucket, - config.AppConfig.LocalDBDir, - "--checksum", - "--retries", "20", - "--low-level-retries", "30", - "--retries-sleep", "10s", - } - - if onProgress != nil { - rcloneArgs = append(rcloneArgs, "--use-json-log", "--stats", "0.5s") - } else { - rcloneArgs = append(rcloneArgs, "--progress") - } - - cmdSync := exec.CommandContext(ctx, "rclone", rcloneArgs...) - - if onProgress != nil { - stderr, err := cmdSync.StderrPipe() - if err != nil { - LogError("Failed to get stderr pipe: %v", err) - return fmt.Errorf("failed to get stderr pipe: %w", err) - } - if err := cmdSync.Start(); err != nil { - LogError("Batch download start failed: %v", err) - return fmt.Errorf("batch download start failed: %w", err) - } - go ParseRcloneOutput(stderr, onProgress) - - if err := cmdSync.Wait(); err != nil { - if ctx.Err() != nil { - LogInfo("DownloadAllDatabases cancelled") - return fmt.Errorf("batch download cancelled") - } - LogError("Batch download failed: %v", err) - return fmt.Errorf("batch download failed: %w", err) - } - } else { - cmdSync.Stdout = os.Stdout - cmdSync.Stderr = os.Stderr - if err := cmdSync.Run(); err != nil { - if ctx.Err() != nil { - LogInfo("DownloadAllDatabases cancelled") - return fmt.Errorf("batch download cancelled") - } - LogError("DownloadAllDatabases batch rclone copy failed: %v", err) - return fmt.Errorf("batch download failed: %w", err) - } - } - - // 1. 
Sync Remote Metadata -> Local Mirror (version/) - LogInfo("DownloadAllDatabases: Updating metadata mirror...") - - // 1. Remote:VersionDir -> Local:VersionDir (Mirror) - cmdMirror := exec.CommandContext(ctx, "rclone", "sync", config.AppConfig.VersionDir, config.AppConfig.LocalVersionDir) - if err := cmdMirror.Run(); err != nil { - LogError("DownloadAllDatabases: Failed to update metadata mirror: %v", err) - } else { - // 2. Iterate over all downloaded DBs and construct Verified Anchors - // We use the same strict logic as DownloadDatabase - LogInfo("DownloadAllDatabases: Constructing verified anchors...") - - // Get list of local DBs we just downloaded - entries, err := os.ReadDir(config.AppConfig.LocalDBDir) - if err == nil { - for _, entry := range entries { - if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".db") { - dbName := entry.Name() - - // Use shared helper - if err := ConstructVerifiedAnchor(dbName); err != nil { - LogError("DownloadAllDatabases: Failed to anchor %s: %v", dbName, err) - } - } - } - } else { - LogError("DownloadAllDatabases: Failed to read local db dir: %v", err) - } - } - - return nil -} diff --git a/b2-manager/core/logger.go b/b2-manager/core/logger.go index 88d74342ba..d205532362 100644 --- a/b2-manager/core/logger.go +++ b/b2-manager/core/logger.go @@ -24,9 +24,7 @@ func InitLogger() error { return fmt.Errorf("failed to create app config dir: %v", err) } - logPath := filepath.Join(appConfigDir, "b2m.log") - // User requested logs in current directory - // logPath := "b2m.log" + logPath := "b2m.log" file, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return fmt.Errorf("failed to open log file: %v", err) diff --git a/b2-manager/core/metadata.go b/b2-manager/core/metadata.go index 188d5b8518..3267fda371 100644 --- a/b2-manager/core/metadata.go +++ b/b2-manager/core/metadata.go @@ -2,7 +2,6 @@ package core import ( "context" - "crypto/sha256" "encoding/json" "fmt" "io" @@ -11,6 +10,9 @@ import 
( "path/filepath" "runtime" "strings" + "sync" + + "github.com/zeebo/xxh3" "b2m/config" "b2m/model" @@ -21,9 +23,9 @@ func GenerateLocalMetadata(dbName string, uploadDuration float64, status string) localPath := filepath.Join(config.AppConfig.LocalDBDir, dbName) // Calculate hash - hash, err := CalculateSHA256(localPath) + hash, err := CalculateXXHash(localPath) if err != nil { - LogError("GenerateLocalMetadata: calculateSHA256 failed for %s: %v", dbName, err) + LogError("GenerateLocalMetadata: CalculateXXHash failed for %s: %v", dbName, err) return nil, fmt.Errorf("failed to calculate hash: %w", err) } @@ -59,22 +61,127 @@ func GenerateLocalMetadata(dbName string, uploadDuration float64, status string) return meta, nil } -// CalculateSHA256 calculates the SHA256 hash of a file -func CalculateSHA256(filePath string) (string, error) { +// cachedHash stores the hash and file stat info to avoid re-hashing unchanged files +type cachedHash struct { + Hash string + ModTime int64 + Size int64 +} + +var ( + fileHashCache = make(map[string]cachedHash) + fileHashCacheMu sync.RWMutex +) + +// CalculateXXHash calculates the xxHash (as hex string) of a file with caching +func CalculateXXHash(filePath string) (string, error) { + info, err := os.Stat(filePath) + if err != nil { + LogError("CalculateXXHash: Failed to stat file %s: %v", filePath, err) + return "", err + } + + // Check cache + fileHashCacheMu.RLock() + cached, ok := fileHashCache[filePath] + fileHashCacheMu.RUnlock() + + if ok && cached.ModTime == info.ModTime().UnixNano() && cached.Size == info.Size() { + return cached.Hash, nil + } else { + LogInfo("Cache miss for %s. 
Cached: %v, Current: ModTime=%d, Size=%d", filepath.Base(filePath), ok, info.ModTime().UnixNano(), info.Size()) + } + + // Calculate hash f, err := os.Open(filePath) if err != nil { - LogError("calculateSHA256: Failed to open file %s: %v", filePath, err) + LogError("CalculateXXHash: Failed to open file %s: %v", filePath, err) return "", err } defer f.Close() - h := sha256.New() + // Use streaming digest + h := xxh3.New() if _, err := io.Copy(h, f); err != nil { - LogError("calculateSHA256: io.Copy failed for %s: %v", filePath, err) + LogError("CalculateXXHash: io.Copy failed for %s: %v", filePath, err) return "", err } - return fmt.Sprintf("%x", h.Sum(nil)), nil + // Sum64 returns uint64, format as hex string for compatibility + hash := fmt.Sprintf("%016x", h.Sum64()) + + // Update cache + fileHashCacheMu.Lock() + fileHashCache[filePath] = cachedHash{ + Hash: hash, + ModTime: info.ModTime().UnixNano(), + Size: info.Size(), + } + fileHashCacheMu.Unlock() + + return hash, nil +} + +// LoadHashCache loads the hash cache from disk +func LoadHashCache() error { + cachePath := filepath.Join(config.AppConfig.LocalAnchorDir, "hash.json") + if _, err := os.Stat(cachePath); os.IsNotExist(err) { + return nil // No cache exists yet + } + + data, err := os.ReadFile(cachePath) + if err != nil { + LogError("LoadHashCache: Failed to read cache file: %v", err) + return err + } + + fileHashCacheMu.Lock() + defer fileHashCacheMu.Unlock() + + if err := json.Unmarshal(data, &fileHashCache); err != nil { + LogError("LoadHashCache: Failed to unmarshal cache: %v", err) + // Don't fail hard, just start with empty cache + return nil + } + + LogInfo("Loaded %d entries from hash cache", len(fileHashCache)) + // LogInfo("Loaded %d entries from hash cache at %s", len(fileHashCache), cachePath) + // for k, v := range fileHashCache { + // LogInfo(" - Loaded: %s -> %s (Mod: %d, Size: %d)", filepath.Base(k), v.Hash, v.ModTime, v.Size) + // } + return nil +} + +// SaveHashCache saves the hash cache 
to disk +func SaveHashCache() error { + cachePath := filepath.Join(config.AppConfig.LocalAnchorDir, "hash.json") + + // Ensure directory exists + if err := os.MkdirAll(config.AppConfig.LocalAnchorDir, 0755); err != nil { + LogError("SaveHashCache: Failed to create directory: %v", err) + return err + } + + fileHashCacheMu.RLock() + data, err := json.MarshalIndent(fileHashCache, "", " ") + fileHashCacheMu.RUnlock() + + if err != nil { + LogError("SaveHashCache: Failed to marshal cache: %v", err) + return err + } + + if err := os.WriteFile(cachePath, data, 0644); err != nil { + LogError("SaveHashCache: Failed to write file: %v", err) + return err + } + + LogInfo("Saved %d entries to hash cache", len(fileHashCache)) + // LogInfo("Saving %d entries to hash cache:", len(fileHashCache)) + // for k := range fileHashCache { + // LogInfo(" - Saving: %s", filepath.Base(k)) + // } + return nil } // DownloadAndLoadMetadata syncs metadata from remote to local cache and loads it @@ -89,22 +196,10 @@ func DownloadAndLoadMetadata() (map[string]*model.Metadata, error) { // 2. Sync remote metadata to local LogInfo("Syncing metadata from %s to %s", config.AppConfig.VersionDir, config.AppConfig.LocalVersionDir) - // Safety Check: Verify remote directory is accessible - checkCmd := exec.CommandContext(GetContext(), "rclone", "lsf", config.AppConfig.VersionDir, "--max-depth", "1") - if err := checkCmd.Run(); err != nil { - // Exit status 3 means directory not found (common for new buckets) - if exitError, ok := err.(*exec.ExitError); ok && exitError.ExitCode() == 3 { - LogInfo("DownloadAndLoadMetadata: Remote metadata directory not found (new bucket?). 
initializing empty.") - return make(map[string]*model.Metadata), nil - } - LogError("DownloadAndLoadMetadata: Remote metadata directory inaccessible: %v", err) - return nil, fmt.Errorf("remote metadata inaccessible (safety check failed): %w", err) - } - - // rclone sync remote:dir local:dir - cmd := exec.CommandContext(GetContext(), "rclone", "sync", config.AppConfig.VersionDir, config.AppConfig.LocalVersionDir) - if err := cmd.Run(); err != nil { - LogError("DownloadAndLoadMetadata: rclone sync failed: %v", err) + // Use RcloneSync helper + if err := RcloneSync(config.AppConfig.VersionDir, config.AppConfig.LocalVersionDir); err != nil { + // Log and fail as sync is critical for accurate status + LogError("DownloadAndLoadMetadata: RcloneSync failed: %v", err) return nil, fmt.Errorf("failed to sync metadata: %w", err) } @@ -124,7 +219,6 @@ func DownloadAndLoadMetadata() (map[string]*model.Metadata, error) { // Extract DB name dbName := strings.TrimSuffix(info.Name(), ".metadata.json") + ".db" - LogInfo("DownloadAndLoadMetadata: Found metadata file %s -> DB %s", info.Name(), dbName) // Read file content, err := os.ReadFile(path) @@ -250,20 +344,12 @@ func HandleBatchMetadataGeneration() { local, err := getLocalDBs() if err != nil { // fmt.Printf("❌ Failed to list local databases: %v\n", err) - LogError("❌ Failed to list local databases: %v", err) LogError("BatchMetadata: Failed to list local databases: %v", err) return } - // We only care about local for generation, but AggregateDBs expects both. - // We can pass empty remote if we don't care about remote-only ones. - // Or we can just iterate local list directly? - // The original code used getAllDBs which returns DBInfo. - // Let's just use local list directly, it's simpler. 
- if len(local) == 0 { // fmt.Println("⚠️ No local databases found.") - LogInfo("⚠️ No local databases found.") LogInfo("BatchMetadata: No local databases found") return } @@ -280,7 +366,6 @@ func HandleBatchMetadataGeneration() { successCount := 0 for _, dbName := range local { - // padding := strings.Repeat(" ", maxLen-len(dbName)) // fmt.Printf("Processing %s... %s", dbName, padding) LogInfo("Processing %s...", dbName) @@ -288,7 +373,6 @@ func HandleBatchMetadataGeneration() { meta, err := GenerateLocalMetadata(dbName, 0, "success") if err != nil { // fmt.Printf("❌ Failed to generate: %v\n", err) - LogError("❌ Failed to generate: %v", err) LogError("BatchMetadata: Failed to generate metadata for %s: %v", dbName, err) continue } @@ -297,18 +381,25 @@ func HandleBatchMetadataGeneration() { // Upload metadata if err := UploadMetadata(GetContext(), dbName, meta); err != nil { // fmt.Printf("❌ Failed to upload: %v\n", err) - LogError("❌ Failed to upload: %v", err) LogError("BatchMetadata: Failed to upload metadata for %s: %v", dbName, err) continue } + // Update local anchor (local-version) to match the new metadata + // This ensures we don't show "DB Outdated" immediately after generation + if err := UpdateLocalVersion(dbName, *meta); err != nil { + LogError("BatchMetadata: Failed to update local anchor for %s: %v", dbName, err) + // Non-critical but annoying, continue + } else { + LogInfo("BatchMetadata: Local anchor updated for %s", dbName) + } + // fmt.Println("✅ Done") LogInfo("✅ Done for %s", dbName) successCount++ } // fmt.Printf("\n✨ Completed! Successfully generated metadata for %d mixed databases.\n", successCount) - LogInfo("✨ Completed! Successfully generated metadata for %d mixed databases.", successCount) LogInfo("Batch metadata generation completed. Success: %d", successCount) } @@ -320,7 +411,7 @@ func ConstructVerifiedAnchor(dbName string) error { // 1. 
Calculate Local Hash localDBPath := filepath.Join(config.AppConfig.LocalDBDir, dbName) - localHash, err := CalculateSHA256(localDBPath) + localHash, err := CalculateXXHash(localDBPath) if err != nil { LogError("ConstructVerifiedAnchor: Failed to calculate local hash for %s: %v", dbName, err) return fmt.Errorf("failed to calculate local hash: %w", err) diff --git a/b2-manager/core/rclone.go b/b2-manager/core/rclone.go index 1501b02541..cb519b885b 100644 --- a/b2-manager/core/rclone.go +++ b/b2-manager/core/rclone.go @@ -44,9 +44,9 @@ func checkDBDiscoveryAndSync() error { LogInfo("No local databases found.") LogInfo("No local databases found.") - remoteDBs, err := getRemoteDBs() + remoteDBs, _, err := LsfRclone(context.Background()) if err != nil { - LogError("getRemoteDBs failed: %v", err) + LogError("LsfRclone failed: %v", err) return nil } @@ -66,27 +66,6 @@ func checkDBDiscoveryAndSync() error { } return nil } - -func getRemoteDBs() ([]string, error) { - cmd := exec.CommandContext(GetContext(), "rclone", "lsf", config.AppConfig.RootBucket, "--files-only", "--include", "*.db") - LogInfo("getRemoteDBs: Running command: %v", cmd.Args) - out, err := cmd.Output() - if err != nil { - LogError("rclone lsf failed in getRemoteDBs: %v", err) - return nil, err - } - LogInfo("getRemoteDBs: Output (len %d): %s", len(out), strings.TrimSpace(string(out))) - - lines := strings.Split(string(out), "\n") - var names []string - for _, line := range lines { - trimmed := strings.TrimSpace(line) - if trimmed != "" { - names = append(names, trimmed) - } - } - return names, nil -} func checkFileChanged(dbName string) (bool, error) { localPath := filepath.Join(config.AppConfig.LocalDBDir, dbName) remotePath := config.AppConfig.RootBucket + dbName @@ -104,41 +83,11 @@ func checkFileChanged(dbName string) (bool, error) { return false, nil // No change } -// UploadDatabase uploads a single database to remote -// Returns the uploaded metadata on success, or nil/error -func UploadDatabase(ctx 
context.Context, dbName string, quiet bool, onProgress func(model.RcloneProgress)) (*model.Metadata, error) { - // Check for changes before uploading - changed, err := checkFileChanged(dbName) - if err != nil { - if !quiet { - // fmt.Printf("⚠️ Could not verify changes: %v. Proceeding with upload.\n", err) - LogError("⚠️ Could not verify changes: %v. Proceeding with upload.", err) - } - LogError("Could not verify changes for %s: %v", dbName, err) - changed = true // Fallback to upload - } - - if !changed { - if !quiet { - // fmt.Println("No change found in this db skipping Upload") - LogInfo("No change found in this db skipping Upload") - } - LogInfo("Skipping upload for %s (no changes)", dbName) - return nil, nil - } - - if !quiet && onProgress == nil { - // fmt.Printf("⬆ Uploading %s to Backblaze B2...\n", dbName) - LogInfo("⬆ Uploading %s to Backblaze B2...", dbName) - } - LogInfo("Uploading %s to Backblaze B2...", dbName) - localPath := filepath.Join(config.AppConfig.LocalDBDir, dbName) - - startTime := time.Now() - +// RcloneCopy copies source to destination using rclone copy with options +func RcloneCopy(ctx context.Context, src, dst, description string, quiet bool, onProgress func(model.RcloneProgress)) error { rcloneArgs := []string{"copy", - localPath, - config.AppConfig.RootBucket, + src, + dst, "--checksum", "--retries", "20", "--low-level-retries", "30", @@ -154,55 +103,96 @@ func UploadDatabase(ctx context.Context, dbName string, quiet bool, onProgress f if !quiet || onProgress != nil { stderr, err := cmd.StderrPipe() if err != nil { - LogError("Failed to get stderr pipe in UploadDatabase: %v", err) - return nil, fmt.Errorf("failed to get stderr pipe: %w", err) + LogError("RcloneCopy: Failed to get stderr pipe: %v", err) + return fmt.Errorf("failed to get stderr pipe: %w", err) } if err := cmd.Start(); err != nil { - LogError("Upload start failed in UploadDatabase: %v", err) - return nil, fmt.Errorf("upload start failed: %w", err) + 
LogError("RcloneCopy: Start failed: %v", err) + return fmt.Errorf("rclone start failed: %w", err) } - info, err := os.Stat(localPath) + // Calculate total size if possible for default tracker var totalSize int64 - if err == nil { + if info, err := os.Stat(src); err == nil && !info.IsDir() { totalSize = info.Size() } if onProgress != nil { go ParseRcloneOutput(stderr, onProgress) } else { - TrackProgress(stderr, totalSize, "Uploading "+dbName) + // Default tracker + desc := description + if desc == "" { + desc = "Copying..." + } + TrackProgress(stderr, totalSize, desc) } if err := cmd.Wait(); err != nil { if ctx.Err() != nil { - return nil, fmt.Errorf("upload cancelled") + return fmt.Errorf("cancelled") } - LogError("Upload failed in UploadDatabase (wait): %v", err) - return nil, fmt.Errorf("upload failed: %w", err) + LogError("RcloneCopy: Wait failed: %v", err) + return fmt.Errorf("rclone copy failed: %w", err) } } else { if err := cmd.Run(); err != nil { if ctx.Err() != nil { - return nil, fmt.Errorf("upload cancelled") + return fmt.Errorf("cancelled") } - LogError("Upload failed in UploadDatabase (run): %v", err) - return nil, fmt.Errorf("upload failed: %w", err) + LogError("RcloneCopy: Run failed: %v", err) + return fmt.Errorf("rclone copy failed: %w", err) + } + } + return nil +} + +// UploadDatabase uploads a single database to remote +// Returns the uploaded metadata on success, or nil/error +func UploadDatabase(ctx context.Context, dbName string, quiet bool, onProgress func(model.RcloneProgress)) (*model.Metadata, error) { + // Check for changes before uploading + changed, err := checkFileChanged(dbName) + if err != nil { + if !quiet { + LogError("⚠️ Could not verify changes: %v. 
Proceeding with upload.", err) + } + LogError("Could not verify changes for %s: %v", dbName, err) + changed = true // Fallback to upload + } + + if !changed { + if !quiet { + LogInfo("No change found in this db skipping Upload") } + LogInfo("Skipping upload for %s (no changes)", dbName) + return nil, nil + } + + if !quiet && onProgress == nil { + LogInfo("⬆ Uploading %s to Backblaze B2...", dbName) + } + LogInfo("Uploading %s to Backblaze B2...", dbName) + localPath := filepath.Join(config.AppConfig.LocalDBDir, dbName) + + startTime := time.Now() + + // Use RcloneCopy with flat arguments + description := "Uploading " + dbName + if err := RcloneCopy(ctx, localPath, config.AppConfig.RootBucket, description, quiet, onProgress); err != nil { + LogError("UploadDatabase: RcloneCopy failed: %v", err) + return nil, err } uploadDuration := time.Since(startTime).Seconds() if !quiet { - // fmt.Println("📝 Generating metadata...") LogInfo("📝 Generating metadata...") } LogInfo("Generating metadata for %s", dbName) meta, err := GenerateLocalMetadata(dbName, uploadDuration, "success") if err != nil { if !quiet { - // fmt.Printf("⚠️ Failed to generate metadata: %v\n", err) LogError("⚠️ Failed to generate metadata: %v", err) } LogError("Failed to generate metadata for %s: %v", dbName, err) @@ -212,7 +202,6 @@ func UploadDatabase(ctx context.Context, dbName string, quiet bool, onProgress f meta, err = AppendEventToMetadata(dbName, meta) if err != nil { if !quiet { - // fmt.Printf("⚠️ Failed to append event: %v\n", err) LogError("⚠️ Failed to append event: %v", err) } LogError("Failed to append event to metadata for %s: %v", dbName, err) @@ -221,29 +210,15 @@ func UploadDatabase(ctx context.Context, dbName string, quiet bool, onProgress f if err := UploadMetadata(ctx, dbName, meta); err != nil { if !quiet { - // fmt.Printf("⚠️ Failed to upload metadata: %v\n", err) LogError("⚠️ Failed to upload metadata: %v", err) } LogError("Failed to upload metadata for %s: %v", dbName, err) return 
nil, err } else if !quiet { - // fmt.Println("✅ Metadata uploaded") LogInfo("✅ Metadata uploaded") } - // USER REQUIREMENT: Use common logic for both upload and download to ensure verified anchor. - // We just uploaded, so we are the source of truth. - // UploadMetadata (above) already updated the VERSION directory (Mirror) via rclone copyto. - // Now we construct the anchor from that Mirror + Local Hash to be consistent. - if err := ConstructVerifiedAnchor(dbName); err != nil { - LogError("UploadDatabase: Failed to construct verified anchor: %v", err) - // Non-fatal for upload itself, but bad for subsequent status checks. - } else { - LogInfo("UploadDatabase: Verified anchor created locally.") - } - if !quiet { - // fmt.Println("📢 Notifying Discord...") LogInfo("📢 Notifying Discord...") sendDiscord(fmt.Sprintf("✅ Database updated to B2: **%s**", dbName)) } else { @@ -350,6 +325,67 @@ func UnlockDatabase(ctx context.Context, dbName, owner string, force bool) error return nil } +// LsfRclone lists all files recursively from RootBucket to get DBs and Locks in one go +func LsfRclone(ctx context.Context) ([]string, map[string]model.LockEntry, error) { + // recursive list of root bucket + cmd := exec.CommandContext(ctx, "rclone", "lsf", "-R", config.AppConfig.RootBucket) + out, err := cmd.Output() + if err != nil { + LogError("LsfRclone input failed: %v", err) + return nil, nil, fmt.Errorf("failed to list remote files: %w", err) + } + + remoteDBs := []string{} + locks := make(map[string]model.LockEntry) + + lines := strings.Split(string(out), "\n") + for _, line := range lines { + line = strings.TrimSpace(line) + if line == "" { + continue + } + + // 1. Check for DBs in root (no slashes) + if strings.HasSuffix(line, ".db") && !strings.Contains(line, "/") { + remoteDBs = append(remoteDBs, line) + continue + } + + // 2. 
Check for Locks in lock/ dir + if strings.HasPrefix(line, "lock/") && strings.HasSuffix(line, ".lock") { + // Extract filename from path "lock/filename" + filename := strings.TrimPrefix(line, "lock/") + + // Parse lock filename: dbname.owner.hostname.type.lock + // We can reuse logic from FetchLocks but adapted + parts := strings.Split(filename, ".") + if len(parts) < 4 { + continue + } + + lockType := parts[len(parts)-1] //lock + + // We only care about .lock files now + if lockType != "lock" { + continue + } + + hostname := parts[len(parts)-2] + owner := parts[len(parts)-3] + dbName := strings.Join(parts[:len(parts)-3], ".") + + locks[dbName] = model.LockEntry{ + DBName: dbName, + Owner: owner, + Hostname: hostname, + Type: lockType, + } + } + } + + return remoteDBs, locks, nil +} + // FetchLocks lists all files in LockDir and parses them func FetchLocks(ctx context.Context) (map[string]model.LockEntry, error) { cmd := exec.CommandContext(ctx, "rclone", "lsf", config.AppConfig.LockDir) @@ -382,12 +418,21 @@ func FetchLocks(ctx context.Context) (map[string]model.LockEntry, error) { } locks[dbName] = model.LockEntry{ - DBName: dbName, - Owner: owner, - Hostname: hostname, - Type: lockType, - ExpiresAt: time.Now().Add(24 * time.Hour), + DBName: dbName, + Owner: owner, + Hostname: hostname, + Type: lockType, } } return locks, nil } + +// RcloneSync syncs source to destination using rclone sync +func RcloneSync(src, dst string) error { + cmd := exec.CommandContext(GetContext(), "rclone", "sync", src, dst) + if err := cmd.Run(); err != nil { + LogError("RcloneSync failed (src=%s, dst=%s): %v", src, dst, err) + return fmt.Errorf("rclone sync failed: %w", err) + } + return nil +} diff --git a/b2-manager/core/status.go b/b2-manager/core/status.go index 1f518e8764..faed106745 100644 --- a/b2-manager/core/status.go +++ b/b2-manager/core/status.go @@ -6,7 +6,6 @@ import ( "os" "path/filepath" "strings" - "sync" "b2m/config" "b2m/model" @@ -123,9 +122,10 @@ func 
CalculateDBStatus(db model.DBInfo, locks map[string]model.LockEntry, remote // PHASE 4: CONSISTENCY CHECK (CONTENT) // We compare the actual Local File Hash vs Remote Metadata Hash. // ------------------------------------------------------------------------- + // ------------------------------------------------------------------------- if db.ExistsLocal && hasRemoteMeta { localPath := filepath.Join(config.AppConfig.LocalDBDir, db.Name) - localHash, err := CalculateSHA256(localPath) + localHash, err := CalculateXXHash(localPath) if err != nil { LogError("Status Check: Failed to verify %s: %v", db.Name, err) return model.StatusCodeErrorReadLocal, model.DBStatuses.ErrorReadLocal.Text, text.Colors{model.DBStatuses.ErrorReadLocal.Color} @@ -168,8 +168,8 @@ func CalculateDBStatus(db model.DBInfo, locks map[string]model.LockEntry, remote return model.StatusCodeUnknown, model.DBStatuses.Unknown.Text, text.Colors{model.DBStatuses.Unknown.Color} } -// FetchDBStatusData fetches all databases, locks, and metadata in parallel, then calculates status for each -func FetchDBStatusData(ctx context.Context) ([]model.DBStatusInfo, error) { +// FetchDBStatusData fetches all databases, locks, and metadata sequentially, then calculates status for each +func FetchDBStatusData(ctx context.Context, onProgress func(string)) ([]model.DBStatusInfo, error) { // Check for cancellation select { case <-ctx.Done(): @@ -177,60 +177,67 @@ func FetchDBStatusData(ctx context.Context) ([]model.DBStatusInfo, error) { default: } - var ( - wg sync.WaitGroup - localDBs []string - remoteDBs []string - locks map[string]model.LockEntry - remoteMetas map[string]*model.Metadata - errLocal error - errRemote error - errLocks error - errMetas error - ) + // 1. 
Get Local DBs (Fast) + if onProgress != nil { + onProgress("Scanning local databases...") + } + localDBs, errLocal := getLocalDBs() + if errLocal != nil { + LogError("Failed to get local DBs: %v", errLocal) + return nil, errLocal + } + LogInfo("FetchDBStatusData: Found %d local DBs", len(localDBs)) - wg.Add(4) + select { + case <-ctx.Done(): + return nil, fmt.Errorf("cancelled") + default: + } - // 1. Get Local DBs (Fast) - go func() { - defer wg.Done() - localDBs, errLocal = getLocalDBs() - }() - - // 2. Get Remote DBs (Network) - go func() { - defer wg.Done() - remoteDBs, errRemote = getRemoteDBs() - }() - - // 3. Fetch Locks (Network) - go func() { - defer wg.Done() - locks, errLocks = FetchLocks(ctx) - }() + // 2. Fetch Remote State (DBs + Locks) - Optimized + if onProgress != nil { + onProgress("Fetching remote state...") + } + remoteDBs, locks, errRemote := LsfRclone(ctx) + if errRemote != nil { + LogError("Failed to fetch remote state: %v", errRemote) + return nil, fmt.Errorf("failed to fetch remote state: %w", errRemote) + } + LogInfo("FetchDBStatusData: Found %d remote DBs and %d active locks", len(remoteDBs), len(locks)) + + select { + case <-ctx.Done(): + return nil, fmt.Errorf("cancelled") + default: + } + + // 3. (Skipped - Combined above) // 4. Download Metadata (Network) - go func() { - defer wg.Done() - remoteMetas, errMetas = DownloadAndLoadMetadata() - }() + if onProgress != nil { + onProgress("Syncing metadata...") + } + remoteMetas, errMetas := DownloadAndLoadMetadata() + if errMetas != nil { + LogError("Failed to sync/load metadata: %v", errMetas) + return nil, fmt.Errorf("failed to download metadata: %w", errMetas) + } + LogInfo("FetchDBStatusData: Loaded metadata for %d databases", len(remoteMetas)) // 5. 
Load Local-Version Metadata (Local IO) - var localVersions map[string]*model.Metadata + if onProgress != nil { + onProgress("Reading local history...") + } + localVersions := make(map[string]*model.Metadata) var errLocalVersions error - wg.Add(1) - go func() { - defer wg.Done() - localVersions = make(map[string]*model.Metadata) - // Iterate over local DBs list (wait, we don't have it here yet, it runs in parallel) - // We can scan the local-version directory directly. - entries, err := os.ReadDir(config.AppConfig.LocalAnchorDir) - if err != nil { - if !os.IsNotExist(err) { - errLocalVersions = err - } - return + + // We scan the local-version directory directly. + entries, err := os.ReadDir(config.AppConfig.LocalAnchorDir) + if err != nil { + if !os.IsNotExist(err) { + errLocalVersions = err } + } else { for _, entry := range entries { if !entry.IsDir() && filepath.Ext(entry.Name()) == ".json" { // Construct DB name from metadata filename (helper does logic, but we need dbname for map key) @@ -248,49 +255,17 @@ func FetchDBStatusData(ctx context.Context) ([]model.DBStatusInfo, error) { } } } - }() - - // Wait for all - doneCh := make(chan struct{}) - go func() { - wg.Wait() - close(doneCh) - }() - - select { - case <-ctx.Done(): - return nil, fmt.Errorf("cancelled") - case <-doneCh: - // Completed - } - - // Check errors - if errLocal != nil { - LogError("Failed to get local DBs: %v", errLocal) - return nil, errLocal - } - LogInfo("FetchDBStatusData: Found %d local DBs", len(localDBs)) - - if errRemote != nil { - LogError("Failed to get remote DBs: %v", errRemote) - // Critical failure: Without remote list, we cannot determine sync status accurately. 
- return nil, fmt.Errorf("failed to list remote databases: %w", errRemote) - } - LogInfo("FetchDBStatusData: Found %d remote DBs", len(remoteDBs)) - - if errLocks != nil { - LogError("Failed to fetch locks: %v", errLocks) - return nil, fmt.Errorf("failed to fetch locks: %w", errLocks) } - LogInfo("FetchDBStatusData: Found %d active locks", len(locks)) - if errMetas != nil { - LogError("Failed to sync/load metadata: %v", errMetas) - return nil, fmt.Errorf("failed to download metadata: %w", errMetas) + if errLocalVersions != nil { + LogInfo("FetchDBStatusData: Failed to read local versions: %v", errLocalVersions) + // Non-critical, just means we can't do smart status } - LogInfo("FetchDBStatusData: Loaded metadata for %d databases", len(remoteMetas)) // Aggregate + if onProgress != nil { + onProgress("Calculating status...") + } allDBs, err := AggregateDBs(localDBs, remoteDBs) if err != nil { LogError("Aggregation failed: %v", err) @@ -298,11 +273,6 @@ func FetchDBStatusData(ctx context.Context) ([]model.DBStatusInfo, error) { } LogInfo("FetchDBStatusData: Aggregated total %d databases", len(allDBs)) - if errLocalVersions != nil { - LogInfo("FetchDBStatusData: Failed to read local versions: %v", errLocalVersions) - // Non-critical, just means we can't do smart status - } - // Calculate status for each database var statusData []model.DBStatusInfo for _, db := range allDBs { diff --git a/b2-manager/docs/concurrent/parallelism.md b/b2-manager/docs/concurrent/parallelism.md index a75b8b682a..c7f8b34185 100644 --- a/b2-manager/docs/concurrent/parallelism.md +++ b/b2-manager/docs/concurrent/parallelism.md @@ -6,7 +6,7 @@ This document details the usage of parallelism and concurrency within the `b2m-c | Component | File | Mechanism | Purpose | | :-------------------- | :----------------- | :----------------- | :------------------------------------------------------------------------------------------------------ | -| **Status Collection** | `core/status.go` | 
`sync.WaitGroup` | Fetching local files, remote files, locks, and metadata in parallel to reduce wait times. | +| **Status Collection** | `core/status.go` | Sequential | Fetching local files, remote files, locks, and metadata sequentially to reduce system load. | | **UI Main Loop** | `ui/ui.go` | `gocui` (Internal) | Handles terminal drawing and user input events. | | **UI Updates** | `ui/ui.go` | `sync.Mutex` | Protects shared state (`dbs`, `activeOps`, `dbStatus`) from concurrent access by background operations. | | **Operations** | `ui/ui.go` | `go func` | Offloads long-running tasks (Upload/Download) to background threads to keep the UI responsive. | @@ -19,13 +19,12 @@ This document details the usage of parallelism and concurrency within the `b2m-c - **Function**: `FetchDBStatusData` - **Implementation**: - - Uses `sync.WaitGroup` to wait for 4 concurrent operations: + - Executes operations **sequentially** to reduce "thundering herd" effect on CPU and network: 1. `getLocalDBs()` - 2. `getRemoteDBs()` - 3. `FetchLocks()` - 4. `DownloadAndLoadMetadata()` - - **Safety**: Each goroutine writes to independent variables. Errors are captured independently. - - **Verdict**: **Safe**. No race conditions detected. + 2. `LsfRclone()` (Fetches DBs and Locks in one go) + 3. `DownloadAndLoadMetadata()` + - **Reason**: Parallel execution previously caused high system load and rclone instability. Sequential execution relies on optimized single-call fetching (`LsfRclone`) to maintain performance. + - **Verdict**: **Safe**. No concurrency issues. ### 2. UI State Management (`ui/ui.go`) diff --git a/b2-manager/docs/core.md b/b2-manager/docs/core.md index ce8f0839f9..be9aad92aa 100644 --- a/b2-manager/docs/core.md +++ b/b2-manager/docs/core.md @@ -16,13 +16,13 @@ There are **15** functions defined in this file: 4. **BootstrapSystem**: Performs initial checks for database discovery and synchronization. 5. 
**checkDBDiscoveryAndSync**: (Internal) Orchestrates local and remote database discovery. 6. **getLocalDBs**: Lists local database files. -7. **getRemoteDBs**: Lists remote database files using `rclone lsf`. +7. **LsfRclone**: Lists all files recursively using `rclone lsf -R` to retrieve both databases and locks efficiently. 8. **checkSyncStatus**: (Internal) Checks synchronization status for all files using `rclone check`. 9. **checkFileChanged**: Checks if a specific file has changed between local and remote. 10. **SyncDatabase** / **DownloadDatabase**: (Moved to `core/download.go`) Downloads a single database. 11. **DownloadAllDatabases**: (Moved to `core/download.go`) Downloads all databases. 12. **UploadDatabase**: Uploads a single database from local to remote. **Returns** the uploaded `model.Metadata` object for anchor persistence. -13. **LockDatabase**: Creates a lock file (`.lock` or `.reserve`) on the remote. +13. **LockDatabase**: Creates a lock file (`.lock`) on the remote. 14. **UnlockDatabase**: Removes a lock file from the remote. 15. **FetchLocks**: Retrieves and parses all active lock files from the remote. @@ -33,7 +33,7 @@ The code executes the following **6** distinct `rclone` commands: | Command | Usage Context | Description | | :----------- | :------------------------------------------------------- | :--------------------------------------------------------------------------------------- | | `version` | `RunInit` | Checks the installed version of rclone. | -| `lsf` | `getRemoteDBs`, `FetchLocks` | Lists files in the remote bucket (databases or locks). | +| `lsf` | `LsfRclone`, `FetchLocks` | Lists files in the remote bucket (databases or locks). | | `check` | `checkSyncStatus`, `checkFileChanged` | Compares local files with remote files to detect changes. | | `copyto` | `LockDatabase` | Copies a specific file to a specific destination (used for naming lock files). 
| | `copy` | `SyncDatabase`, `DownloadAllDatabases`, `UploadDatabase` | Copies files from source to destination (used for sync, bulk downloads, and DB uploads). | diff --git a/b2-manager/docs/workflow/download.md b/b2-manager/docs/workflow/download.md index 2697f99695..952d61204d 100644 --- a/b2-manager/docs/workflow/download.md +++ b/b2-manager/docs/workflow/download.md @@ -28,7 +28,7 @@ Before starting, `core.ValidateAction` enforces safeguards. Once the download is successful, we anchor the local state to the remote state. -1. **Calculate Local Hash**: Computes SHA256 of the newly downloaded file. +1. **ConstructVerifiedAnchor**: This function calculates the SHA256 of the newly downloaded file. 2. **Fetch Remote Context**: Reads the latest metadata from the local mirror (`db/all_dbs/version/`) to get the `Timestamp` and `Uploader`. 3. **Construct Anchor**: Creates a new metadata object combining the **Local Hash** + **Remote Timestamp**. 4. **Save**: Writes to `local-versions/`. diff --git a/b2-manager/docs/workflow/hashing.md b/b2-manager/docs/workflow/hashing.md new file mode 100644 index 0000000000..c4fbde52fb --- /dev/null +++ b/b2-manager/docs/workflow/hashing.md @@ -0,0 +1,59 @@ +# Hashing Workflow & Cache Structure + +The application uses xxHash (specifically xxh3) to calculate checksums of local database files. To improve performance, especially for large files that haven't changed, a persistent hash cache is used. + +## Cache Logic + +1. **Startup**: The application loads the existing cache from `hash.json` located in the local anchor directory (`db/all_dbs/local-version/`). +2. **Hashing Request**: When a hash is requested for a file: + - The system checks if the file path exists in the in-memory cache. + - It compares the file's current modification time (`ModTime`) and size (`Size`) against the cached values. + - **Hit**: If both match, the cached hash is returned immediately (no I/O is performed to read the file content). 
+ - **Miss**: If there is no entry or the metadata doesn't match, the file is re-hashed, and the cache is updated.
+3. **Shutdown**: The in-memory cache is saved back to `hash.json` when the application exits.
+
+## hash.json Structure
+
+The `hash.json` file is a JSON object where keys are absolute file paths and values are objects containing the hash and file metadata.
+
+### Schema
+
+```json
+{
+  "/absolute/path/to/file.db": {
+    "Hash": "string (hex encoded xxh3 checksum)",
+    "ModTime": int64 (nanoseconds),
+    "Size": int64 (bytes)
+  }
+}
+```
+
+### Fields
+
+- **Hash**: The computed XXH3 hash of the file content, represented as a hex string.
+- **ModTime**: The modification time of the file in nanoseconds (Unix timestamp). This is used to detect if the file has been modified since the last hash calculation.
+- **Size**: The size of the file in bytes. This serves as a secondary check for file changes.
+
+## Usage in Code
+
+### Loading
+
+The cache is loaded at the start of the application in `main.go`:
+
+```go
+if err := core.LoadHashCache(); err != nil {
+	core.LogInfo("Warning: Failed to load hash cache: %v", err)
+}
+```
+
+### Saving
+
+Saving the cache is deferred until the application exits:
+
+```go
+defer func() {
+	if err := core.SaveHashCache(); err != nil {
+		core.LogError("Failed to save hash cache: %v", err)
+	}
+}()
+```
diff --git a/b2-manager/docs/workflow/status.md b/b2-manager/docs/workflow/status.md
index 2aee81bba7..43ca4d0cdc 100644
--- a/b2-manager/docs/workflow/status.md
+++ b/b2-manager/docs/workflow/status.md
@@ -2,24 +2,19 @@
 
 This document outlines the structural logic used by the CLI to determine the status of each database. The process is divided into two main phases: **Data Collection** and **Status Calculation**.
 
-## 1. Parallel Data Collection
+## 1. Sequential Data Collection
 
-The `FetchDBStatusData` function (in `core/status.go`) orchestrates the gathering of all necessary information.
It executes the following 5 operations in **parallel** using goroutines to minimize latency. +The `FetchDBStatusData` function (in `core/status.go`) orchestrates the gathering of all necessary information. It executes the following operations **sequentially** to ensure stability and reduce resource usage. ### A. List Local Databases - **Source**: `core/helpers.go` (`getLocalDBs`) - **Action**: Scans `config.LocalDBDir` for `*.db` files. -### B. List Remote Databases +### B. Fetch Remote State (DBs + Locks) -- **Source**: `core/helpers.go` (`getRemoteDBs`) -- **Action**: Executes `rclone lsf` on the B2 bucket to list `*.db` files. - -### C. Fetch Locks - -- **Source**: `core/rclone.go` -- **Action**: Lists files in the lock directory (`locks/`) on B2 and parses entries. +- **Source**: `core/rclone.go` (`LsfRclone`) +- **Action**: Executes `rclone lsf -R` on the B2 bucket to list `*.db` files and lock files recursively in a single call. ### D. Download Metadata diff --git a/b2-manager/docs/workflow/upload.md b/b2-manager/docs/workflow/upload.md index 08cb632def..8e4dc90ec2 100644 --- a/b2-manager/docs/workflow/upload.md +++ b/b2-manager/docs/workflow/upload.md @@ -27,8 +27,9 @@ The upload process is orchestrated by `PerformUpload` in `core/upload.go`. ### Phase 4: Anchor Update -1. **Sync Local State**: Updates the `local-versions/` anchor file with the metadata just uploaded. -2. **Result**: Ensures the system knows the local file is now identical to the remote (Status becomes "Synced"). +1. **Sync Local State**: `PerformUpload` receives the generated metadata from `UploadDatabase` and calls `UpdateLocalVersion` to update the `local-versions/` anchor file. +2. **Efficiency**: This avoids re-calculating the hash locally, as the metadata object already contains the verified hash used for the upload. +3. **Result**: Ensures the system knows the local file is now identical to the remote (Status becomes "Synced"). 
### Phase 5: Finalization diff --git a/b2-manager/go.mod b/b2-manager/go.mod index a7602e2367..fe36552c3b 100644 --- a/b2-manager/go.mod +++ b/b2-manager/go.mod @@ -6,9 +6,11 @@ require ( github.com/BurntSushi/toml v1.6.0 github.com/jedib0t/go-pretty/v6 v6.7.8 github.com/jroimartin/gocui v0.5.0 + github.com/zeebo/xxh3 v1.1.0 ) require ( + github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect github.com/nsf/termbox-go v1.1.1 // indirect github.com/rivo/uniseg v0.4.7 // indirect diff --git a/b2-manager/go.sum b/b2-manager/go.sum index 2d4e7d89f8..058cf3c850 100644 --- a/b2-manager/go.sum +++ b/b2-manager/go.sum @@ -6,6 +6,8 @@ github.com/jedib0t/go-pretty/v6 v6.7.8 h1:BVYrDy5DPBA3Qn9ICT+PokP9cvCv1KaHv2i+Hc github.com/jedib0t/go-pretty/v6 v6.7.8/go.mod h1:YwC5CE4fJ1HFUDeivSV1r//AmANFHyqczZk+U6BDALU= github.com/jroimartin/gocui v0.5.0 h1:DCZc97zY9dMnHXJSJLLmx9VqiEnAj0yh0eTNpuEtG/4= github.com/jroimartin/gocui v0.5.0/go.mod h1:l7Hz8DoYoL6NoYnlnaX6XCNR62G7J5FfSW5jEogzaxE= +github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= +github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= @@ -18,6 +20,10 @@ github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod 
h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs= +github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s= golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.29.0 h1:L6pJp37ocefwRRtYPKSWOWzOtWSxVajvz2ldH/xi3iU= diff --git a/b2-manager/main.go b/b2-manager/main.go index 0d35239a36..4475d99b27 100644 --- a/b2-manager/main.go +++ b/b2-manager/main.go @@ -23,6 +23,16 @@ func main() { os.Exit(1) } + // Load hash cache + if err := core.LoadHashCache(); err != nil { + core.LogInfo("Warning: Failed to load hash cache: %v", err) + } + defer func() { + if err := core.SaveHashCache(); err != nil { + core.LogError("Failed to save hash cache: %v", err) + } + }() + core.LogInfo("Configuration loaded successfully") core.LogInfo("RootBucket: %s", config.AppConfig.RootBucket) core.LogInfo("DiscordWebhookURL: %s", config.AppConfig.DiscordWebhookURL) diff --git a/b2-manager/model/types.go b/b2-manager/model/types.go index 603eb50ec8..d08ca86a81 100644 --- a/b2-manager/model/types.go +++ b/b2-manager/model/types.go @@ -49,11 +49,10 @@ type RcloneProgress struct { // LockEntry represents a lock file on B2 type LockEntry struct { - DBName string - Owner string - Hostname string - Type string // "reserve" or "lock" - ExpiresAt time.Time + DBName string + Owner string + Hostname string + Type string // "lock" } // Metadata represents the synchronization state of a database @@ -121,11 +120,11 @@ var ( // Local-only Logic DBStatusNewLocal = DBStatusDefinition{StatusCodeNewLocal, "Ready To Upload ⬆️", text.FgCyan} // Exists locally only. Needs upload. 
- DBStatusUploadCancelled = DBStatusDefinition{StatusCodeUploadCancelled, "Ready To Upload ⬆️", text.FgCyan} + DBStatusUploadCancelled = DBStatusDefinition{StatusCodeUploadCancelled, "Ready To Upload ⬆️", text.FgCyan} // Remote Logic DBStatusRecievedStaleMeta = DBStatusDefinition{StatusCodeRecievedStaleMeta, "Ready To Upload ⬆️", text.FgCyan} // There is no metadata present in new remote db. - DBStatusRemoteOnly = DBStatusDefinition{StatusCodeRemoteOnly, "Download DB ⬇️", text.FgYellow} // Exists remotely only. Needs download. + DBStatusRemoteOnly = DBStatusDefinition{StatusCodeRemoteOnly, "Download DB ⬇️", text.FgYellow} // Exists remotely only. Needs download. // Consistency Checks DBStatusNoMetadata = DBStatusDefinition{StatusCodeNoMetadata, "Ready To Upload ⬆️", text.FgCyan} diff --git a/b2-manager/ui/operations.go b/b2-manager/ui/operations.go index 2258b23a79..e6e44a902a 100644 --- a/b2-manager/ui/operations.go +++ b/b2-manager/ui/operations.go @@ -138,7 +138,7 @@ func (lc *ListController) onDownload(g *gocui.Gui, v *gocui.View) error { // var bar *progressbar.ProgressBar - err := core.DownloadDatabase(ctx, dbName, func(p model.RcloneProgress) { + err := core.DownloadDatabase(ctx, dbName, false, func(p model.RcloneProgress) { // Remove unused progressbar library usage var percent int var speedMB float64 diff --git a/b2-manager/ui/ui.go b/b2-manager/ui/ui.go index 5f3eff07f3..442aed4471 100644 --- a/b2-manager/ui/ui.go +++ b/b2-manager/ui/ui.go @@ -21,13 +21,14 @@ const ( // AppUI is the main application UI struct type AppUI struct { - g *gocui.Gui - dbs []model.DBStatusInfo - selected int - loading bool - ctx context.Context - cancel context.CancelFunc - mu sync.Mutex + g *gocui.Gui + dbs []model.DBStatusInfo + selected int + loading bool + statusMsg string + ctx context.Context + cancel context.CancelFunc + mu sync.Mutex activeOps map[string]context.CancelFunc dbStatus map[string]UIStatus @@ -361,6 +362,7 @@ func (app *AppUI) startOperation(opName string, op 
func(context.Context, string) func (app *AppUI) refreshStatus() { app.mu.Lock() app.loading = true + app.statusMsg = "Initializing..." app.mu.Unlock() // Update immediately to show spinner @@ -370,13 +372,22 @@ func (app *AppUI) refreshStatus() { }) go func() { - dbs, err := core.FetchDBStatusData(app.ctx) + dbs, err := core.FetchDBStatusData(app.ctx, func(msg string) { + app.mu.Lock() + app.statusMsg = msg + app.mu.Unlock() + app.g.Update(func(g *gocui.Gui) error { + app.renderStatusLine(g) + return nil + }) + }) app.mu.Lock() if err == nil { app.dbs = dbs } app.loading = false + app.statusMsg = "" app.mu.Unlock() app.g.Update(func(g *gocui.Gui) error { @@ -441,7 +452,11 @@ func (app *AppUI) renderStatusLine(g *gocui.Gui) { // Calculate frame idx := (time.Now().UnixMilli() / 50) % int64(len(spinnerFrames)) spinner := spinnerFrames[idx] - fmt.Fprintf(v, " %s Fetching status...", spinner) + msg := app.statusMsg + if msg == "" { + msg = "Fetching status..." + } + fmt.Fprintf(v, " %s %s", spinner, msg) } else { // Show nothing or 'Ready' // fmt.Fprint(v, " Ready") From 067ad2ec27d8c0ec3037320e6ddf55ad2f0bf1f1 Mon Sep 17 00:00:00 2001 From: Ganesh Kumar Date: Mon, 9 Feb 2026 18:47:21 +0530 Subject: [PATCH 2/5] Restructure Local B2M Data Paths --- b2-manager/config/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/b2-manager/config/config.go b/b2-manager/config/config.go index 5557d7cc93..14152c254e 100644 --- a/b2-manager/config/config.go +++ b/b2-manager/config/config.go @@ -103,8 +103,8 @@ func InitializeConfig() error { } AppConfig.LocalDBDir = filepath.Join(AppConfig.ProjectRoot, "db", "all_dbs") - AppConfig.LocalVersionDir = filepath.Join(AppConfig.ProjectRoot, "db", "all_dbs", "version") - AppConfig.LocalAnchorDir = filepath.Join(AppConfig.ProjectRoot, "db", "all_dbs", "local-version") + AppConfig.LocalVersionDir = filepath.Join(AppConfig.LocalDBDir, ".b2m", "version") + AppConfig.LocalAnchorDir = filepath.Join(AppConfig.LocalDBDir, 
".b2m", "local-version") // Initialize logging if needed, or other startup tasks return nil From 508e52dec3325b45fd146895cb75997c70a41018 Mon Sep 17 00:00:00 2001 From: Ganesh Kumar Date: Mon, 9 Feb 2026 19:38:02 +0530 Subject: [PATCH 3/5] Refactor Configuration Initialization --- b2-manager/config/config.go | 90 ++++++++++++++++++++----------------- 1 file changed, 50 insertions(+), 40 deletions(-) diff --git a/b2-manager/config/config.go b/b2-manager/config/config.go index 14152c254e..f9d2c3c14f 100644 --- a/b2-manager/config/config.go +++ b/b2-manager/config/config.go @@ -36,13 +36,6 @@ var AppConfig = Config{ ToolVersion: "v1.0", } -// Sync Status Constants -const ( - SyncStatusLocalOnly = "+" - SyncStatusRemoteOnly = "-" - SyncStatusDifferent = "*" -) - // InitializeConfig sets up global configuration variables func InitializeConfig() error { var err error @@ -54,6 +47,47 @@ func InitializeConfig() error { } // Load config from b2m.toml + // Load config from b2m.toml + if err := loadTOMLConfig(); err != nil { + return err + } + + // Validate and set derived paths + if err := validateAndSetPaths(); err != nil { + return err + } + + // Fetch user details + fetchUserDetails() + + AppConfig.LocalDBDir = filepath.Join(AppConfig.ProjectRoot, "db", "all_dbs") + AppConfig.LocalVersionDir = filepath.Join(AppConfig.LocalDBDir, ".b2m", "version") + AppConfig.LocalAnchorDir = filepath.Join(AppConfig.LocalDBDir, ".b2m", "local-version") + + return nil +} + +func findProjectRoot() (string, error) { + dir, err := os.Getwd() + if err != nil { + return "", err + } + for { + if info, err := os.Stat(filepath.Join(dir, "db")); err == nil && info.IsDir() { + return dir, nil + } + if _, err := os.Stat(filepath.Join(dir, "go.mod")); err == nil { + return dir, nil + } + parent := filepath.Dir(dir) + if parent == dir { + return "", fmt.Errorf("root not found (searched for 'db' dir or 'go.mod')") + } + dir = parent + } +} + +func loadTOMLConfig() error { tomlPath := 
filepath.Join(AppConfig.ProjectRoot, "b2m.toml") if _, err := os.Stat(tomlPath); os.IsNotExist(err) { return fmt.Errorf("couldn't find b2m.toml file at %s: %w", tomlPath, err) @@ -70,6 +104,10 @@ func InitializeConfig() error { AppConfig.RootBucket = tomlConf.RootBucket AppConfig.DiscordWebhookURL = tomlConf.Discord + return nil +} + +func validateAndSetPaths() error { if AppConfig.RootBucket == "" { return fmt.Errorf("rootbucket not defined in b2m.toml file") } @@ -77,55 +115,27 @@ func InitializeConfig() error { return fmt.Errorf("discord not defined in b2m.toml file") } - // Derived paths - // Ensure RootBucket ends with / if !strings.HasSuffix(AppConfig.RootBucket, "/") { AppConfig.RootBucket += "/" } AppConfig.LockDir = AppConfig.RootBucket + "lock/" AppConfig.VersionDir = AppConfig.RootBucket + "version/" + return nil +} - var u *user.User - u, err = user.Current() +func fetchUserDetails() { + u, err := user.Current() if err != nil { AppConfig.CurrentUser = "unknown" } else { AppConfig.CurrentUser = u.Username } - var h string - h, err = os.Hostname() + h, err := os.Hostname() if err != nil { AppConfig.Hostname = "unknown" } else { AppConfig.Hostname = h } - - AppConfig.LocalDBDir = filepath.Join(AppConfig.ProjectRoot, "db", "all_dbs") - AppConfig.LocalVersionDir = filepath.Join(AppConfig.LocalDBDir, ".b2m", "version") - AppConfig.LocalAnchorDir = filepath.Join(AppConfig.LocalDBDir, ".b2m", "local-version") - - // Initialize logging if needed, or other startup tasks - return nil -} - -func findProjectRoot() (string, error) { - dir, err := os.Getwd() - if err != nil { - return "", err - } - for { - if info, err := os.Stat(filepath.Join(dir, "db")); err == nil && info.IsDir() { - return dir, nil - } - if _, err := os.Stat(filepath.Join(dir, "go.mod")); err == nil { - return dir, nil - } - parent := filepath.Dir(dir) - if parent == dir { - return "", fmt.Errorf("root not found (searched for 'db' dir or 'go.mod')") - } - dir = parent - } } From 
a3fc0f43eef8c7a58fbf9ef466a0a526e017286e Mon Sep 17 00:00:00 2001 From: Ganesh Kumar Date: Mon, 9 Feb 2026 22:10:56 +0530 Subject: [PATCH 4/5] Add Distributed Database Locking and Hashing Cache --- .gitignore | 1 + b2-manager/core/download.go | 7 +- b2-manager/core/handleFiles.go | 111 +++++++++++++++++ b2-manager/core/hash.go | 136 +++++++++++++++++++++ b2-manager/core/metadata.go | 174 ++++++++------------------- b2-manager/core/rclone.go | 106 ++-------------- b2-manager/core/status.go | 14 ++- b2-manager/core/updating.go | 63 ++++++++++ b2-manager/core/upload.go | 86 +++++++++++++ b2-manager/core/validation.go | 29 +++++ b2-manager/docs/workflow/download.md | 2 +- b2-manager/docs/workflow/hashing.md | 2 +- b2-manager/model/types.go | 9 +- b2-manager/ui/keybindings.go | 5 + b2-manager/ui/operations.go | 59 +++++++++ 15 files changed, 572 insertions(+), 232 deletions(-) create mode 100644 b2-manager/core/handleFiles.go create mode 100644 b2-manager/core/hash.go create mode 100644 b2-manager/core/updating.go diff --git a/.gitignore b/.gitignore index f6e52e0e74..1e769a1449 100644 --- a/.gitignore +++ b/.gitignore @@ -167,3 +167,4 @@ frontend/b2m b2m b2-manager/b2m.toml b2-manager/db/all_dbs/version/ +b2-manager/.b2m/ diff --git a/b2-manager/core/download.go b/b2-manager/core/download.go index bf727fbd06..2a7b26b5af 100644 --- a/b2-manager/core/download.go +++ b/b2-manager/core/download.go @@ -57,8 +57,7 @@ func DownloadDatabase(ctx context.Context, dbName string, quiet bool, onProgress description := "Downloading " + dbName // Use the passed quiet parameter // The new RcloneCopy uses !quiet for verbose. If onProgress is set, it adds json flags. 
- - if err := RcloneCopy(ctx, remotePath, localDir, description, quiet, onProgress); err != nil { + if err := RcloneCopy(ctx, "copy", remotePath, localDir, description, quiet, onProgress); err != nil { LogError("DownloadDatabase RcloneCopy failed for %s: %v", dbName, err) return fmt.Errorf("download of %s failed: %w", dbName, err) } @@ -135,6 +134,10 @@ func DownloadDatabase(ctx context.Context, dbName string, quiet bool, onProgress return fmt.Errorf("failed to update local anchor for %s: %w", dbName, err) } else { LogInfo("DownloadDatabase: Successfully anchored %s (Hash: %s, Ts: %d)", dbName, localHash, remoteTimestamp) + // Update hash cache on disk as we just calculated it and it is fresh + if err := SaveHashCache(); err != nil { + LogInfo("DownloadDatabase: Warning: Failed to save hash cache: %v", err) + } } return nil diff --git a/b2-manager/core/handleFiles.go b/b2-manager/core/handleFiles.go new file mode 100644 index 0000000000..0a886aadcd --- /dev/null +++ b/b2-manager/core/handleFiles.go @@ -0,0 +1,111 @@ +package core + +import ( + "b2m/config" + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" +) + +// LockDatabase creates a .lock file +func LockDatabase(ctx context.Context, dbName, owner, host, intent string, force bool) error { + locks, err := FetchLocks(ctx) + if err != nil { + LogError("fetchLocks failed in LockDatabase: %v", err) + return err + } + if l, ok := locks[dbName]; ok { + // If force is true, we ignore existing locks (we will overwrite) + // If force is false, we check ownership + if !force { + if l.Owner != owner { + LogError("Database %s already locked by %s", dbName, l.Owner) + return fmt.Errorf("%w: already locked by %s", ErrDatabaseLocked, l.Owner) + } + } + } + + filename := fmt.Sprintf("%s.%s.%s.lock", dbName, owner, host) + + // If forcing, we first clean up ALL existing locks for this DB to ensure we start fresh. 
+ if force { + LogInfo("Force locking: Cleaning up old locks for %s", dbName) + if err := UnlockDatabase(ctx, dbName, "", true); err != nil { + LogInfo("Warning: Failed to cleanup old locks during force lock: %v", err) + } + } + + tmpFile := filepath.Join(os.TempDir(), filename) + if err := os.WriteFile(tmpFile, []byte(intent), 0644); err != nil { + LogError("Failed to write temp lock file: %v", err) + return err + } + defer os.Remove(tmpFile) + + // Use RcloneCopy to upload the lock file + // We use "copyto" because we want to rename the temp file to the target lock filename + // quiet=true because we don't need progress for a small lock file + // onProgress=nil + if err := RcloneCopy(ctx, "copyto", tmpFile, config.AppConfig.LockDir+filename, "Acquiring lock...", true, nil); err != nil { + // If cancelled + if ctx.Err() != nil { + return fmt.Errorf("lock cancelled") + } + LogError("LockDatabase: RcloneCopy failed: %v", err) + return fmt.Errorf("failed to acquire lock: %w", err) + } + return nil +} + +// UnlockDatabase removes the .lock file +func UnlockDatabase(ctx context.Context, dbName, owner string, force bool) error { + // If force is true, we delete ALL lock files for this DB to ensure a clean slate. 
+ if force { + // Use rclone delete with include pattern + // Pattern: dbName.*.lock + pattern := fmt.Sprintf("%s.*.lock", dbName) + LogInfo("Force unlocking %s: deleting all files matching %s", dbName, pattern) + + cmd := exec.CommandContext(ctx, "rclone", "delete", config.AppConfig.LockDir, "--include", pattern) + if err := cmd.Run(); err != nil { + LogError("Failed to force delete lock files on B2: %v", err) + return fmt.Errorf("failed to force delete lock files: %w", err) + } + return nil + } + + // Normal graceful unlock + locks, err := FetchLocks(ctx) + if err != nil { + LogError("fetchLocks failed in UnlockDatabase: %v", err) + return err + } + + entry, ok := locks[dbName] + if !ok { + return nil // Already unlocked + } + + if entry.Owner != owner { + LogError("Cannot unlock %s: owned by %s", dbName, entry.Owner) + return fmt.Errorf("cannot unlock: owned by %s", entry.Owner) + } + + filename := fmt.Sprintf("%s.%s.%s.%s", dbName, entry.Owner, entry.Hostname, entry.Type) + + // Safety check: ensure we are only deleting a .lock file + if !strings.HasSuffix(filename, ".lock") { + LogError("Safety check failed: attempted to delete non-lock file %s", filename) + return fmt.Errorf("safety check failed: attempted to delete non-lock file %s", filename) + } + + // Use RcloneDeleteFile + if err := RcloneDeleteFile(ctx, config.AppConfig.LockDir+filename); err != nil { + LogError("UnlockDatabase: RcloneDeleteFile failed: %v", err) + return fmt.Errorf("failed to delete lock file: %w", err) + } + return nil +} diff --git a/b2-manager/core/hash.go b/b2-manager/core/hash.go new file mode 100644 index 0000000000..4d7cb2dd01 --- /dev/null +++ b/b2-manager/core/hash.go @@ -0,0 +1,136 @@ +package core + +import ( + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "sync" + + "github.com/zeebo/xxh3" + + "b2m/config" +) + +// cachedHash stores the hash and file stat info to avoid re-hashing unchanged files +type cachedHash struct { + Hash string + ModTime int64 + Size 
int64 +} + +var ( + fileHashCache = make(map[string]cachedHash) + fileHashCacheMu sync.RWMutex +) + +// CalculateXXHash calculates the xxHash (as hex string) of a file with caching +func CalculateXXHash(filePath string) (string, error) { + info, err := os.Stat(filePath) + if err != nil { + LogError("CalculateXXHash: Failed to stat file %s: %v", filePath, err) + return "", err + } + + // Check cache + fileHashCacheMu.RLock() + cached, ok := fileHashCache[filePath] + fileHashCacheMu.RUnlock() + + if ok && cached.ModTime == info.ModTime().UnixNano() && cached.Size == info.Size() { + return cached.Hash, nil + } else { + LogInfo("Cache miss for %s. Cached: %v, Current: ModTime=%d, Size=%d", filepath.Base(filePath), ok, info.ModTime().UnixNano(), info.Size()) + } + + // Calculate hash + f, err := os.Open(filePath) + if err != nil { + LogError("CalculateXXHash: Failed to open file %s: %v", filePath, err) + return "", err + } + defer f.Close() + + // Use streaming digest + h := xxh3.New() + if _, err := io.Copy(h, f); err != nil { + LogError("CalculateXXHash: io.Copy failed for %s: %v", filePath, err) + return "", err + } + + // Sum64 returns uint64, format as hex string for compatibility + hash := fmt.Sprintf("%016x", h.Sum64()) + + // Update cache + fileHashCacheMu.Lock() + fileHashCache[filePath] = cachedHash{ + Hash: hash, + ModTime: info.ModTime().UnixNano(), + Size: info.Size(), + } + fileHashCacheMu.Unlock() + + return hash, nil +} + +// LoadHashCache loads the hash cache from disk +func LoadHashCache() error { + cachePath := filepath.Join(config.AppConfig.LocalAnchorDir, "hash.json") + if _, err := os.Stat(cachePath); os.IsNotExist(err) { + return nil // No cache exists yet + } + + data, err := os.ReadFile(cachePath) + if err != nil { + LogError("LoadHashCache: Failed to read cache file: %v", err) + return err + } + + fileHashCacheMu.Lock() + defer fileHashCacheMu.Unlock() + + if err := json.Unmarshal(data, &fileHashCache); err != nil { + LogError("LoadHashCache: 
Failed to unmarshal cache: %v", err) + return fmt.Errorf("failed to unmarshal cache: %w", err) + } + + LogInfo("Loaded %d entries from hash cache", len(fileHashCache)) + // LogInfo("Loaded %d entries from hash cache at %s", len(fileHashCache), cachePath) + // for k, v := range fileHashCache { + // LogInfo(" - Loaded: %s -> %s (Mod: %d, Size: %d)", filepath.Base(k), v.Hash, v.ModTime, v.Size) + // } + return nil +} + +// SaveHashCache saves the hash cache to disk +func SaveHashCache() error { + cachePath := filepath.Join(config.AppConfig.LocalAnchorDir, "hash.json") + + // Ensure directory exists + if err := os.MkdirAll(config.AppConfig.LocalAnchorDir, 0755); err != nil { + LogError("SaveHashCache: Failed to create directory: %v", err) + return err + } + + fileHashCacheMu.RLock() + data, err := json.MarshalIndent(fileHashCache, "", " ") + fileHashCacheMu.RUnlock() + + if err != nil { + LogError("SaveHashCache: Failed to marshal cache: %v", err) + return err + } + + if err := os.WriteFile(cachePath, data, 0644); err != nil { + LogError("SaveHashCache: Failed to write file: %v", err) + return err + } + + LogInfo("Saved %d entries to hash cache", len(fileHashCache)) + // LogInfo("Saving %d entries to hash cache:", len(fileHashCache)) + // for k := range fileHashCache { + // LogInfo(" - Saving: %s", filepath.Base(k)) + // } + return nil +} diff --git a/b2-manager/core/metadata.go b/b2-manager/core/metadata.go index 3267fda371..0ed432f3ae 100644 --- a/b2-manager/core/metadata.go +++ b/b2-manager/core/metadata.go @@ -4,15 +4,11 @@ import ( "context" "encoding/json" "fmt" - "io" "os" "os/exec" "path/filepath" "runtime" "strings" - "sync" - - "github.com/zeebo/xxh3" "b2m/config" "b2m/model" @@ -61,129 +57,6 @@ func GenerateLocalMetadata(dbName string, uploadDuration float64, status string) return meta, nil } -// cachedHash stores the hash and file stat info to avoid re-hashing unchanged files -type cachedHash struct { - Hash string - ModTime int64 - Size int64 -} - -var ( - 
fileHashCache = make(map[string]cachedHash) - fileHashCacheMu sync.RWMutex -) - -// CalculateXXHash calculates the xxHash (as hex string) of a file with caching -func CalculateXXHash(filePath string) (string, error) { - info, err := os.Stat(filePath) - if err != nil { - LogError("CalculateXXHash: Failed to stat file %s: %v", filePath, err) - return "", err - } - - // Check cache - fileHashCacheMu.RLock() - cached, ok := fileHashCache[filePath] - fileHashCacheMu.RUnlock() - - if ok && cached.ModTime == info.ModTime().UnixNano() && cached.Size == info.Size() { - return cached.Hash, nil - } else { - LogInfo("Cache miss for %s. Cached: %v, Current: ModTime=%d, Size=%d", filepath.Base(filePath), ok, info.ModTime().UnixNano(), info.Size()) - } - - // Calculate hash - f, err := os.Open(filePath) - if err != nil { - LogError("CalculateXXHash: Failed to open file %s: %v", filePath, err) - return "", err - } - defer f.Close() - - // Use streaming digest - h := xxh3.New() - if _, err := io.Copy(h, f); err != nil { - LogError("CalculateXXHash: io.Copy failed for %s: %v", filePath, err) - return "", err - } - - // Sum64 returns uint64, format as hex string for compatibility - hash := fmt.Sprintf("%016x", h.Sum64()) - - // Update cache - fileHashCacheMu.Lock() - fileHashCache[filePath] = cachedHash{ - Hash: hash, - ModTime: info.ModTime().UnixNano(), - Size: info.Size(), - } - fileHashCacheMu.Unlock() - - return hash, nil -} - -// LoadHashCache loads the hash cache from disk -func LoadHashCache() error { - cachePath := filepath.Join(config.AppConfig.LocalAnchorDir, "hash.json") - if _, err := os.Stat(cachePath); os.IsNotExist(err) { - return nil // No cache exists yet - } - - data, err := os.ReadFile(cachePath) - if err != nil { - LogError("LoadHashCache: Failed to read cache file: %v", err) - return err - } - - fileHashCacheMu.Lock() - defer fileHashCacheMu.Unlock() - - if err := json.Unmarshal(data, &fileHashCache); err != nil { - LogError("LoadHashCache: Failed to unmarshal 
cache: %v", err) - // Don't fail hard, just start with empty cache - return nil - } - - LogInfo("Loaded %d entries from hash cache", len(fileHashCache)) - // LogInfo("Loaded %d entries from hash cache at %s", len(fileHashCache), cachePath) - // for k, v := range fileHashCache { - // LogInfo(" - Loaded: %s -> %s (Mod: %d, Size: %d)", filepath.Base(k), v.Hash, v.ModTime, v.Size) - // } - return nil -} - -// SaveHashCache saves the hash cache to disk -func SaveHashCache() error { - cachePath := filepath.Join(config.AppConfig.LocalAnchorDir, "hash.json") - - // Ensure directory exists - if err := os.MkdirAll(config.AppConfig.LocalAnchorDir, 0755); err != nil { - LogError("SaveHashCache: Failed to create directory: %v", err) - return err - } - - fileHashCacheMu.RLock() - data, err := json.MarshalIndent(fileHashCache, "", " ") - fileHashCacheMu.RUnlock() - - if err != nil { - LogError("SaveHashCache: Failed to marshal cache: %v", err) - return err - } - - if err := os.WriteFile(cachePath, data, 0644); err != nil { - LogError("SaveHashCache: Failed to write file: %v", err) - return err - } - - LogInfo("Saved %d entries to hash cache", len(fileHashCache)) - // LogInfo("Saving %d entries to hash cache:", len(fileHashCache)) - // for k := range fileHashCache { - // LogInfo(" - Saving: %s", filepath.Base(k)) - // } - return nil -} - // DownloadAndLoadMetadata syncs metadata from remote to local cache and loads it func DownloadAndLoadMetadata() (map[string]*model.Metadata, error) { LogInfo("Downloading and loading metadata...") @@ -246,6 +119,53 @@ func DownloadAndLoadMetadata() (map[string]*model.Metadata, error) { return result, nil } +// FetchSingleRemoteMetadata downloads and parses the metadata for a specific DB from the remote version dir +func FetchSingleRemoteMetadata(ctx context.Context, dbName string) (*model.Metadata, error) { + fileID := strings.TrimSuffix(dbName, ".db") + metadataFilename := fileID + ".metadata.json" + + // Paths + remotePath := 
filepath.Join(config.AppConfig.VersionDir, metadataFilename) + localDir := config.AppConfig.LocalVersionDir // Destination is the directory + localFile := filepath.Join(localDir, metadataFilename) + + // Ensure local dir exists + if err := os.MkdirAll(localDir, 0755); err != nil { + return nil, fmt.Errorf("failed to create local version dir: %w", err) + } + + // Use RcloneCopy (it takes a file source and a directory destination usually, but let's check RcloneCopy impl) + // RcloneCopy signature: func RcloneCopy(ctx context.Context, src, dst, description string, quiet bool, onProgress func(model.RcloneProgress)) error + // If src is a file, dst can be a directory or a file? rclone copy usually expects dest to be a directory if source is a file? + // Actually rclone copy src dest. If dest is existing dir, it puts it there. + // Our RcloneCopy wraps `rclone copy`. + + // To be safe and specific, we can use `copyto` via direct exec or trust `RcloneCopy` if we pass the full remote path. + // RcloneCopy uses `rclone copy`. + // "Copy the source to the destination. Doesn't transfer unchanged files." + + // Let's use RcloneCopy with quiet=true. 
+ if err := RcloneCopy(ctx, "copy", remotePath, localDir, "Fetching metadata", true, nil); err != nil { + return nil, fmt.Errorf("failed to fetch remote metadata: %w", err) + } + + // Read the file + data, err := os.ReadFile(localFile) + if err != nil { + if os.IsNotExist(err) { + return nil, nil // Metadata doesn't exist remotely yet (New DB) + } + return nil, fmt.Errorf("failed to read fetched metadata: %w", err) + } + + var meta model.Metadata + if err := json.Unmarshal(data, &meta); err != nil { + return nil, fmt.Errorf("failed to unmarshal metadata: %w", err) + } + + return &meta, nil +} + // UploadMetadata uploads the metadata file for a database func UploadMetadata(ctx context.Context, dbName string, meta *model.Metadata) error { LogInfo("Uploading metadata for %s", dbName) diff --git a/b2-manager/core/rclone.go b/b2-manager/core/rclone.go index cb519b885b..882ef3357f 100644 --- a/b2-manager/core/rclone.go +++ b/b2-manager/core/rclone.go @@ -83,9 +83,9 @@ func checkFileChanged(dbName string) (bool, error) { return false, nil // No change } -// RcloneCopy copies source to destination using rclone copy with options -func RcloneCopy(ctx context.Context, src, dst, description string, quiet bool, onProgress func(model.RcloneProgress)) error { - rcloneArgs := []string{"copy", +// RcloneCopy copies source to destination using rclone copy/copyto with options +func RcloneCopy(ctx context.Context, cmdName, src, dst, description string, quiet bool, onProgress func(model.RcloneProgress)) error { + rcloneArgs := []string{cmdName, src, dst, "--checksum", @@ -179,7 +179,7 @@ func UploadDatabase(ctx context.Context, dbName string, quiet bool, onProgress f // Use RcloneCopy with flat arguments description := "Uploading " + dbName - if err := RcloneCopy(ctx, localPath, config.AppConfig.RootBucket, description, quiet, onProgress); err != nil { + if err := RcloneCopy(ctx, "copy", localPath, config.AppConfig.RootBucket, description, quiet, onProgress); err != nil { 
LogError("UploadDatabase: RcloneCopy failed: %v", err) return nil, err } @@ -208,6 +208,11 @@ func UploadDatabase(ctx context.Context, dbName string, quiet bool, onProgress f return nil, err } + // Update hash cache on disk as GenerateLocalMetadata updated memory cache + if err := SaveHashCache(); err != nil { + LogInfo("UploadDatabase: Warning: Failed to save hash cache: %v", err) + } + if err := UploadMetadata(ctx, dbName, meta); err != nil { if !quiet { LogError("⚠️ Failed to upload metadata: %v", err) @@ -229,99 +234,14 @@ func UploadDatabase(ctx context.Context, dbName string, quiet bool, onProgress f return meta, nil } -// LockDatabase creates a .lock file -func LockDatabase(ctx context.Context, dbName, owner, host, intent string, force bool) error { - locks, err := FetchLocks(ctx) - if err != nil { - LogError("fetchLocks failed in LockDatabase: %v", err) - return err - } - if l, ok := locks[dbName]; ok { - // If force is true, we ignore existing locks (we will overwrite) - // If force is false, we check ownership - if !force { - if l.Owner != owner { - LogError("Database %s already locked by %s", dbName, l.Owner) - return fmt.Errorf("%w: already locked by %s", ErrDatabaseLocked, l.Owner) - } - } - } - - filename := fmt.Sprintf("%s.%s.%s.lock", dbName, owner, host) - // If forcing, we first clean up ALL existing locks for this DB to ensure we start fresh. 
- if force { - LogInfo("Force locking: Cleaning up old locks for %s", dbName) - if err := UnlockDatabase(ctx, dbName, "", true); err != nil { - LogInfo("Warning: Failed to cleanup old locks during force lock: %v", err) - } - } - - tmpFile := filepath.Join(os.TempDir(), filename) - if err := os.WriteFile(tmpFile, []byte(intent), 0644); err != nil { - LogError("Failed to write temp lock file: %v", err) - return err - } - defer os.Remove(tmpFile) - - cmd := exec.CommandContext(ctx, "rclone", "copyto", tmpFile, config.AppConfig.LockDir+filename) +// RcloneDeleteFile deletes a single file using rclone deletefile +func RcloneDeleteFile(ctx context.Context, filePath string) error { + cmd := exec.CommandContext(ctx, "rclone", "deletefile", filePath) if err := cmd.Run(); err != nil { - if ctx.Err() != nil { - return fmt.Errorf("lock cancelled") - } - LogError("Failed to upload lock file to B2: %v", err) - return fmt.Errorf("failed to acquire lock: %w", err) - } - return nil -} - -// UnlockDatabase removes the .lock file -func UnlockDatabase(ctx context.Context, dbName, owner string, force bool) error { - // If force is true, we delete ALL lock files for this DB to ensure a clean slate. 
- if force { - // Use rclone delete with include pattern - // Pattern: dbName.*.lock - pattern := fmt.Sprintf("%s.*.lock", dbName) - LogInfo("Force unlocking %s: deleting all files matching %s", dbName, pattern) - - cmd := exec.CommandContext(ctx, "rclone", "delete", config.AppConfig.LockDir, "--include", pattern) - if err := cmd.Run(); err != nil { - LogError("Failed to force delete lock files on B2: %v", err) - return fmt.Errorf("failed to force delete lock files: %w", err) - } - return nil - } - - // Normal graceful unlock - locks, err := FetchLocks(ctx) - if err != nil { - LogError("fetchLocks failed in UnlockDatabase: %v", err) + LogError("RcloneDeleteFile: Failed to delete %s: %v", filePath, err) return err } - - entry, ok := locks[dbName] - if !ok { - return nil // Already unlocked - } - - if entry.Owner != owner { - LogError("Cannot unlock %s: owned by %s", dbName, entry.Owner) - return fmt.Errorf("cannot unlock: owned by %s", entry.Owner) - } - - filename := fmt.Sprintf("%s.%s.%s.%s", dbName, entry.Owner, entry.Hostname, entry.Type) - - // Safety check: ensure we are only deleting a .lock file - if !strings.HasSuffix(filename, ".lock") { - LogError("Safety check failed: attempted to delete non-lock file %s", filename) - return fmt.Errorf("safety check failed: attempted to delete non-lock file %s", filename) - } - - cmd := exec.CommandContext(ctx, "rclone", "deletefile", config.AppConfig.LockDir+filename) - if err := cmd.Run(); err != nil { - LogError("Failed to delete lock file on B2: %v", err) - return fmt.Errorf("failed to delete lock file: %w", err) - } return nil } diff --git a/b2-manager/core/status.go b/b2-manager/core/status.go index faed106745..62d5898191 100644 --- a/b2-manager/core/status.go +++ b/b2-manager/core/status.go @@ -283,11 +283,17 @@ func FetchDBStatusData(ctx context.Context, onProgress func(string)) ([]model.DB colorVal = statusColor[0] } + var rawMetaStatus string + if meta, ok := remoteMetas[db.Name]; ok { + rawMetaStatus = 
meta.Status + } + statusData = append(statusData, model.DBStatusInfo{ - DB: db, - Status: statusText, - StatusCode: statusCode, - Color: colorVal, + DB: db, + Status: statusText, + StatusCode: statusCode, + RemoteMetaStatus: rawMetaStatus, + Color: colorVal, }) } diff --git a/b2-manager/core/updating.go b/b2-manager/core/updating.go new file mode 100644 index 0000000000..22d058e730 --- /dev/null +++ b/b2-manager/core/updating.go @@ -0,0 +1,63 @@ +package core + +import ( + "context" + + "b2m/config" +) + +// AcquireCustomLock acquires a lock for a database and sets its status to "updating" +// This is used for manual locking operations where the user intends to perform maintenance +func AcquireCustomLock(ctx context.Context, dbName string) error { + LogInfo("AcquireCustomLock: Locking %s for manual update", dbName) + + // 1. Acquire Lock + // We use "manual_update" as the intent, but the standard Lock mechanism just uses .lock extension + // The intent ends up in the file content if we look at LockDatabase implementation + // User requirement: "User can lock the db... This 'l' command should only create lock and update metadata with status: 'updating'" + // We use Force=false here to respect other locks. The LockDatabase function checks validity. + if err := LockDatabase(ctx, dbName, config.AppConfig.CurrentUser, config.AppConfig.Hostname, "manual_update", false); err != nil { + return err + } + + // 2. Generate and Upload Metadata with Status "updating" + // We need to create a metadata entry that signifies "updating". + // We can base it on the current local state or just create a minimal update. + // "This 'l' commad should only create lock and update metadata with status: 'updating'" + + // Let's get current local metadata or generate new if missing + meta, err := GenerateLocalMetadata(dbName, 0, "updating") + if err != nil { + // If gen fails (e.g. no local file), what should we do? + // If custom lock is allowed on remote-only DB? Probably not. 
+ // Assume local DB exists. + LogError("AcquireCustomLock: Failed to generate metadata: %v", err) + // Undo lock? + UnlockDatabase(ctx, dbName, config.AppConfig.CurrentUser, true) + return err + } + + // Explicitly set status to updating + meta.Status = "updating" + + // Appending event? + // The user request says "update metadata with status: 'updating'". + // Typically we track history. Let's append an event. + meta, err = AppendEventToMetadata(dbName, meta) + if err != nil { + LogError("AcquireCustomLock: Failed to append event: %v", err) + UnlockDatabase(ctx, dbName, config.AppConfig.CurrentUser, true) + return err + } + + // Upload Metadata + if err := UploadMetadata(ctx, dbName, meta); err != nil { + LogError("AcquireCustomLock: Failed to upload metadata: %v", err) + UnlockDatabase(ctx, dbName, config.AppConfig.CurrentUser, true) + return err + } + + LogInfo("AcquireCustomLock: Successfully locked %s and set status to 'updating'", dbName) + + return nil +} diff --git a/b2-manager/core/upload.go b/b2-manager/core/upload.go index e6744d9f66..b8864811cc 100644 --- a/b2-manager/core/upload.go +++ b/b2-manager/core/upload.go @@ -3,6 +3,7 @@ package core import ( "context" "fmt" + "path/filepath" "time" "b2m/config" @@ -19,6 +20,21 @@ import ( // 4. Anchor: Upon success, update the local execution anchor (LocalVersion) to match the new state. // 5. Finalize: Remove the lock file. func PerformUpload(ctx context.Context, dbName string, force bool, onProgress func(model.RcloneProgress), onStatusUpdate func(string)) error { + if onStatusUpdate != nil { + onStatusUpdate("Safety Check...") + } + + // ------------------------------------------------------------------------- + // PHASE 0: PRE-UPLOAD SAFETY CHECK + // Check if the remote file is newer than our local version to prevent data loss. 
+ // ------------------------------------------------------------------------- + if !force { + if err := CheckUploadSafety(ctx, dbName); err != nil { + LogError("PerformUpload: Safety check failed for %s: %v", dbName, err) + return fmt.Errorf("safety check failed: %w", err) + } + } + if onStatusUpdate != nil { onStatusUpdate("Locking...") } @@ -100,6 +116,18 @@ func PerformUpload(ctx context.Context, dbName string, force bool, onProgress fu // Non-fatal, but meaningful warning } else { LogInfo("Successfully updated local-version anchor for %s", dbName) + + // Update hash cache on disk as we just calculated it and it is fresh + // We recalculate the hash of the LOCAL file. CalculateXXHash updates the in-memory cache + // with the new ModTime and Size. Then SaveHashCache persists it. + localPath := filepath.Join(config.AppConfig.LocalDBDir, dbName) + if _, err := CalculateXXHash(localPath); err != nil { + LogError("PerformUpload: Failed to recalculate hash for cache update: %v", err) + } else { + if err := SaveHashCache(); err != nil { + LogInfo("PerformUpload: Warning: Failed to save hash cache: %v", err) + } + } } } @@ -122,5 +150,63 @@ func PerformUpload(ctx context.Context, dbName string, force bool, onProgress fu } LogInfo("Upload complete for %s", dbName) + LogInfo("Upload complete for %s", dbName) + + return nil +} + +// CheckUploadSafety verifies that the remote database is not newer than the local one. +// It fetches the specific remote metadata and compares it with the local anchor and file. +func CheckUploadSafety(ctx context.Context, dbName string) error { + LogInfo("CheckUploadSafety: Verifying status for %s...", dbName) + + // 1. Fetch Remote Metadata (Specific file only) + remoteMeta, err := FetchSingleRemoteMetadata(ctx, dbName) + if err != nil { + // If fetch failed, it might be net issue or config. + // If it's just "not found", FetchSingleRemoteMetadata returns nil, nil. 
+ return fmt.Errorf("failed to fetch remote metadata: %w", err) + } + + if remoteMeta == nil { + LogInfo("CheckUploadSafety: No remote metadata found. Safe to upload (New DB).") + return nil + } + + // 2. Get Local Anchor + localAnchor, err := GetLocalVersion(dbName) + if err != nil { + // If non-critical error (like permission), we might fail. + // If not found, localAnchor is nil. + LogInfo("CheckUploadSafety: Error reading local anchor: %v (Assuming no anchor)", err) + } + + // 3. Compare + // Logic matches CalculateDBStatus Phase 3 & 4 somewhat, but strict for upload. + + // Case A: Remote Exists, but No Local Anchor. + // This implies we pulled a repo or deleted local metadata, but remote has history. + // We risk overwriting something we don't know about. + if localAnchor == nil { + // Exception: If hashes match, we are coincidentally in sync (autofixed elsewhere, but here we proceed). + localPath := filepath.Join(config.AppConfig.LocalDBDir, dbName) + if hash, err := CalculateXXHash(localPath); err == nil && hash == remoteMeta.Hash { + LogInfo("CheckUploadSafety: No anchor, but hashes match. Safe to upload (Update).") + return nil + } + + return fmt.Errorf("remote database exists but no local history found. Please download first to sync") + } + + // Case B: Remote Hash != Anchor Hash + // This means Remote has changed since we last downloaded/uploaded. + if remoteMeta.Hash != localAnchor.Hash { + // The remote version is different from what we based our work on. + return fmt.Errorf("remote database is newer (Remote Hash %s != Anchor Hash %s). Please download to merge/sync", remoteMeta.Hash[:8], localAnchor.Hash[:8]) + } + + // Case C: Remote Hash == Anchor Hash + // We are based on the latest remote. Safe to overwrite. + LogInfo("CheckUploadSafety: Local anchor matches remote. 
Safe to upload.") return nil } diff --git a/b2-manager/core/validation.go b/b2-manager/core/validation.go index 1ecb4e1f93..c3a2c717c3 100644 --- a/b2-manager/core/validation.go +++ b/b2-manager/core/validation.go @@ -9,6 +9,9 @@ import ( // ErrWarningLocalChanges is a special error indicating a warning validation state var ErrWarningLocalChanges = fmt.Errorf("WARNING_LOCAL_CHANGES") +// ErrWarningDatabaseUpdating indicates the database is being updated by another user +var ErrWarningDatabaseUpdating = fmt.Errorf("WARNING_DATABASE_UPDATING") + const ( ActionUpload = "upload" ActionDownload = "download" @@ -33,6 +36,32 @@ func ValidateAction(dbInfo model.DBStatusInfo, action string) error { // If we have local changes (or are new local), downloading will overwrite them. return ErrWarningLocalChanges } + + // Check if locked by other and updating + if dbInfo.StatusCode == model.StatusCodeLockedByOther { + // We can inspect the Status text or we might need access to more details. + // Currently dbInfo.Status is the formatted string. + // But wait, ValidateAction only takes dbInfo. + // core.CalculateDBStatus returns formatted string. + // "User 1 is Updating 🔄" + // This relies on string parsing which is brittle. + // BETTER: ValidateAction should maybe take the raw metadata or lock info? + // OR validation check in UI? + // The user request says: "Also should show small warning notification suggesting to downloader that db is updating" + // In `CalculateDBStatus`, we return `LockedByOther` and text "... is Updating ...". + + // Let's rely on string check for now as we don't want to change ValidateAction signature too much yet + // unless we really need to. + // Actually, let's look at `ui/operations.go`: `core.ValidateAction(selectedDB, "download")` + // `selectedDB` is `model.DBStatusInfo`. + + // If status contains "Updating", return warning. 
+ // We now check dbInfo.RemoteMetaStatus which is populated from raw metadata + // If status is "updating" or "uploading", return warning. + if dbInfo.StatusCode == model.StatusCodeLockedByOther && (dbInfo.RemoteMetaStatus == "updating" || dbInfo.RemoteMetaStatus == "uploading") { + return ErrWarningDatabaseUpdating + } + } } return nil } diff --git a/b2-manager/docs/workflow/download.md b/b2-manager/docs/workflow/download.md index 952d61204d..059ce42406 100644 --- a/b2-manager/docs/workflow/download.md +++ b/b2-manager/docs/workflow/download.md @@ -29,7 +29,7 @@ Before starting, `core.ValidateAction` enforces safeguards. Once the download is successful, we anchor the local state to the remote state. 1. **ConstructVerifiedAnchor**: This function calculates the SHA256 of the newly downloaded file. -2. **Fetch Remote Context**: Reads the latest metadata from the local mirror (`db/all_dbs/version/`) to get the `Timestamp` and `Uploader`. +2. **Fetch Remote Context**: Reads the latest metadata from the local mirror (`db/all_dbs/.b2m/version/`) to get the `Timestamp` and `Uploader`. 3. **Construct Anchor**: Creates a new metadata object combining the **Local Hash** + **Remote Timestamp**. 4. **Save**: Writes to `local-versions/`. 5. **Result**: Status becomes **"Up to Date ✅"**. diff --git a/b2-manager/docs/workflow/hashing.md b/b2-manager/docs/workflow/hashing.md index c4fbde52fb..c0fbababde 100644 --- a/b2-manager/docs/workflow/hashing.md +++ b/b2-manager/docs/workflow/hashing.md @@ -4,7 +4,7 @@ The application uses xxHash (specifically xxh3) to calculate checksums of local ## Cache Logic -1. **Startup**: The application loads the existing cache from `hash.json` located in the local anchor directory (`db/all_dbs/local-version/`). +1. **Startup**: The application loads the existing cache from `hash.json` located in the local anchor directory (`db/all_dbs/.b2m/local-version/`). 2. 
**Hashing Request**: When a hash is requested for a file: - The system checks if the file path exists in the in-memory cache. - It compares the file's current modification time (`ModTime`) and size (`Size`) against the cached values. diff --git a/b2-manager/model/types.go b/b2-manager/model/types.go index d08ca86a81..1ad0694d71 100644 --- a/b2-manager/model/types.go +++ b/b2-manager/model/types.go @@ -17,10 +17,11 @@ type DBInfo struct { // DBStatusInfo represents a database with its calculated status type DBStatusInfo struct { - DB DBInfo - Status string - StatusCode string // Stable identifier for logic (e.g. "remote_newer") - Color text.Color + DB DBInfo + Status string + StatusCode string // Stable identifier for logic (e.g. "remote_newer") + RemoteMetaStatus string // Raw status from remote metadata (e.g. "updating", "uploading") + Color text.Color } // RcloneProgress represents the structure of rclone's JSON stats output diff --git a/b2-manager/ui/keybindings.go b/b2-manager/ui/keybindings.go index ec05b261ce..b7bf4d5a8f 100644 --- a/b2-manager/ui/keybindings.go +++ b/b2-manager/ui/keybindings.go @@ -36,6 +36,11 @@ func (lc *ListController) GetKeybindings() []*Keybinding { Handler: lc.onDownload, Description: "Download", }, + { + Key: 'l', + Handler: lc.onLock, + Description: "Lock (Update)", + }, { Key: 'c', Handler: lc.onCancel, diff --git a/b2-manager/ui/operations.go b/b2-manager/ui/operations.go index e6e44a902a..e1e1d8290b 100644 --- a/b2-manager/ui/operations.go +++ b/b2-manager/ui/operations.go @@ -181,6 +181,13 @@ func (lc *ListController) onDownload(g *gocui.Gui, v *gocui.View) error { }, nil) return nil } + if err == core.ErrWarningDatabaseUpdating { + // Warning: Database is being updated by another user + lc.app.confirm("Warning: DB Updating", "This database is currently being updated by another user.\nDownloading now might give you incomplete data.\n\nAre you sure?", func() { + startDownload() + }, nil) + return nil + } // Block error 
lc.app.confirm("Error: Cannot Download", err.Error()+"\n\n(Press y/n to close)", nil, nil) return nil @@ -206,3 +213,55 @@ func (lc *ListController) onCancel(g *gocui.Gui, v *gocui.View) error { } return nil } + +func (lc *ListController) onLock(g *gocui.Gui, v *gocui.View) error { + lc.app.mu.Lock() + if lc.app.selected < 0 || lc.app.selected >= len(lc.app.dbs) { + lc.app.mu.Unlock() + return nil + } + selectedDB := lc.app.dbs[lc.app.selected] + lc.app.mu.Unlock() + + // Validation: Check if already locked by other + if selectedDB.StatusCode == model.StatusCodeLockedByOther { + lc.app.confirm("Error: Cannot Lock", "Database is already locked by another user.\n\n(Press y/n to close)", nil, nil) + return nil + } + + // Check if already locked by you (status LockedByYou or Uploading) + // If it is locked by us, we might want to either re-lock or just warn? + // User request: "User can lock the db for update by pressing 'l' key." + // If we already lock it, maybe we just want to update status to "updating"? + // Use case: I locked it potentially automatically or manually before, now I want to explicitly say "updating". + // So we should allow it even if locked by us. + + confirmMsg := fmt.Sprintf("Lock %s for manual update?\n\nThis will prevent others from uploading\nand set status to 'updating'.", selectedDB.DB.Name) + + lc.app.confirm("Lock Database?", confirmMsg, func() { + // on Yes + lc.app.startOperation("Locking", func(ctx context.Context, dbName string) error { + lc.app.updateDBStatus(dbName, "Locking...", 0, 0, "lock", "") + + err := core.AcquireCustomLock(ctx, dbName) + if err != nil { + lc.app.updateDBStatus(dbName, fmt.Sprintf("Error: %v", err), -1, -1, "error", "") + return err + } + + // Success + lc.app.updateDBStatus(dbName, "Locked (Updating)...", 100, 0, "lock", "") + // Refresh to show new status + go func() { + // We sleep briefly to allow the remote change to propagate and be visible to the next list/status fetch. 
+ // This is a heuristic; ideally we should poll or have immediate local state reflection. + time.Sleep(1 * time.Second) + lc.app.refreshStatus() + }() + + return nil + }) + }, nil) + + return nil +} From 652602baabd4cdbf27507f72afc4e1630d364d4c Mon Sep 17 00:00:00 2001 From: Ganesh Kumar Date: Mon, 9 Feb 2026 22:15:02 +0530 Subject: [PATCH 5/5] Refactor Metadata Handling, Strengthen Validation, and Detail Workflow Operations --- b2-manager/core/metadata.go | 64 +--------------------------- b2-manager/core/upload.go | 2 - b2-manager/core/validation.go | 12 ------ b2-manager/docs/workflow/cancel.md | 2 + b2-manager/docs/workflow/download.md | 9 ++-- b2-manager/docs/workflow/hashing.md | 15 ++++--- b2-manager/docs/workflow/status.md | 4 +- b2-manager/docs/workflow/upload.md | 22 ++++++++-- b2-manager/ui/operations.go | 29 +++++++++++-- 9 files changed, 64 insertions(+), 95 deletions(-) diff --git a/b2-manager/core/metadata.go b/b2-manager/core/metadata.go index 0ed432f3ae..d5d1396b2a 100644 --- a/b2-manager/core/metadata.go +++ b/b2-manager/core/metadata.go @@ -121,6 +121,8 @@ func DownloadAndLoadMetadata() (map[string]*model.Metadata, error) { // FetchSingleRemoteMetadata downloads and parses the metadata for a specific DB from the remote version dir func FetchSingleRemoteMetadata(ctx context.Context, dbName string) (*model.Metadata, error) { + // Get file ID (db name without .db extension). Use simple string trimming; + // if dbName doesn't end in .db, fileID will equal dbName. fileID := strings.TrimSuffix(dbName, ".db") metadataFilename := fileID + ".metadata.json" @@ -323,68 +325,6 @@ func HandleBatchMetadataGeneration() { LogInfo("Batch metadata generation completed. Success: %d", successCount) } -// ConstructVerifiedAnchor creates a local anchor by combining the local file's hash -// with the identity (timestamp, etc.) from the remote metadata mirror. -// This ensures that the anchor truthfully represents the local state while linking it to the remote version. 
-func ConstructVerifiedAnchor(dbName string) error { - LogInfo("ConstructVerifiedAnchor: Building anchor for %s...", dbName) - - // 1. Calculate Local Hash - localDBPath := filepath.Join(config.AppConfig.LocalDBDir, dbName) - localHash, err := CalculateXXHash(localDBPath) - if err != nil { - LogError("ConstructVerifiedAnchor: Failed to calculate local hash for %s: %v", dbName, err) - return fmt.Errorf("failed to calculate local hash: %w", err) - } - - // 2. Read Remote Mirror (Source of Truth for Identity) - fileID := strings.TrimSuffix(dbName, ".db") - metadataFilename := fileID + ".metadata.json" - mirrorPath := filepath.Join(config.AppConfig.LocalVersionDir, metadataFilename) // LocalVersionDir = Mirror (db/all_dbs/version) - - input, err := os.ReadFile(mirrorPath) - if err != nil { - LogError("ConstructVerifiedAnchor: Failed to read mirror metadata at %s: %v", mirrorPath, err) - return fmt.Errorf("failed to read mirror metadata: %w", err) - } - - var meta model.Metadata - if err := json.Unmarshal(input, &meta); err != nil { - LogError("ConstructVerifiedAnchor: Failed to unmarshal mirror metadata: %v", err) - return fmt.Errorf("failed to unmarshal mirror metadata: %w", err) - } - - // 3. Update Hash to match Local File - // User Requirement: "fetching local db hash and cpy same time from remote metada json" - // We preserve all other fields (Timestamp, Events, Uploader, etc.) from the Mirror. - meta.Hash = localHash - meta.Status = "success" // Ensure status is success - - // FIX: Update SizeBytes from local file as well, to match the Hash we just calculated. - // If the file changed locally, its size might have changed too. - info, err := os.Stat(localDBPath) - if err == nil { - meta.SizeBytes = info.Size() - } else { - // Just log warning, Hash is more critical, but SizeBytes mismatch is confusing. 
- LogInfo("ConstructVerifiedAnchor: Warning: Failed to stat %s for size update: %v", dbName, err) - } - - // FIX: Spec at docs/b2m.md:L658 shows local-version WITHOUT events. - // It serves as a lightweight anchor. History is in the 'version/' mirror. - // meta.Events = nil // Handled globally by UpdateLocalVersion now. - - // 4. Save to Anchor Directory (LocalAnchorDir) - // UpdateLocalVersion handles writing to config.AppConfig.LocalAnchorDir - if err := UpdateLocalVersion(dbName, meta); err != nil { - LogError("ConstructVerifiedAnchor: Failed to save anchor: %v", err) - return fmt.Errorf("failed to save anchor: %w", err) - } - - LogInfo("ConstructVerifiedAnchor: Successfully anchored %s. Hash: %s, TS: %d", dbName, localHash, meta.Timestamp) - return nil -} - // UpdateLocalVersion writes the metadata to db/all_dbs/local-version/.metadata.json // UpdateLocalVersion writes the metadata to db/all_dbs/local-version/.metadata.json func UpdateLocalVersion(dbName string, meta model.Metadata) error { diff --git a/b2-manager/core/upload.go b/b2-manager/core/upload.go index b8864811cc..b7ea12914b 100644 --- a/b2-manager/core/upload.go +++ b/b2-manager/core/upload.go @@ -150,8 +150,6 @@ func PerformUpload(ctx context.Context, dbName string, force bool, onProgress fu } LogInfo("Upload complete for %s", dbName) - LogInfo("Upload complete for %s", dbName) - return nil } diff --git a/b2-manager/core/validation.go b/b2-manager/core/validation.go index c3a2c717c3..76e5587350 100644 --- a/b2-manager/core/validation.go +++ b/b2-manager/core/validation.go @@ -44,18 +44,6 @@ func ValidateAction(dbInfo model.DBStatusInfo, action string) error { // But wait, ValidateAction only takes dbInfo. // core.CalculateDBStatus returns formatted string. // "User 1 is Updating 🔄" - // This relies on string parsing which is brittle. - // BETTER: ValidateAction should maybe take the raw metadata or lock info? - // OR validation check in UI? 
- // The user request says: "Also should show small warning notification suggesting to downloader that db is updating" - // In `CalculateDBStatus`, we return `LockedByOther` and text "... is Updating ...". - - // Let's rely on string check for now as we don't want to change ValidateAction signature too much yet - // unless we really need to. - // Actually, let's look at `ui/operations.go`: `core.ValidateAction(selectedDB, "download")` - // `selectedDB` is `model.DBStatusInfo`. - - // If status contains "Updating", return warning. // We now check dbInfo.RemoteMetaStatus which is populated from raw metadata // If status is "updating" or "uploading", return warning. if dbInfo.StatusCode == model.StatusCodeLockedByOther && (dbInfo.RemoteMetaStatus == "updating" || dbInfo.RemoteMetaStatus == "uploading") { diff --git a/b2-manager/docs/workflow/cancel.md b/b2-manager/docs/workflow/cancel.md index 2117bc98d6..9ec27ed516 100644 --- a/b2-manager/docs/workflow/cancel.md +++ b/b2-manager/docs/workflow/cancel.md @@ -30,6 +30,8 @@ The cleanup logic is defined in `CleanupOnCancel` (`core/context.go`) and invoke - **Unlock**: - Function calls `UnlockDatabase` with `force=true`. + - **Retry Logic**: Attempts to release the lock up to **3 times** (with 1s delay) to ensure the lock is cleared even if transient network errors occur. + - **Safety Check**: Before deletion, the filename is strictly validated to ensure it ends with `.lock`. This prevents accidental deletion of `.db` files or other critical data if the path construction was malformed. - Deletes the `.lock` file from B2, freeing the database for future operations. ## Diagram diff --git a/b2-manager/docs/workflow/download.md b/b2-manager/docs/workflow/download.md index 059ce42406..831b69ecd2 100644 --- a/b2-manager/docs/workflow/download.md +++ b/b2-manager/docs/workflow/download.md @@ -13,6 +13,7 @@ The download process is executed by `DownloadDatabase` in `core/download.go`. 
Before starting, `core.ValidateAction` enforces safeguards.
 
 - **Local Changes Warning**: If status is "Ready To Upload" (Local Ahead / New Local), the UI warns the user: **"Overwrite local changes?"**. The user must confirm to proceed.
+- **Concurrent Update Warning**: If the database is locked by another user and marked as **"Updating"** or **"Uploading"**, the system warns: **"This database is currently being updated... Are you sure?"**.
 
 ### Phase 1: Lock Check & Safety
 
@@ -28,11 +29,12 @@ Before starting, `core.ValidateAction` enforces safeguards.
 
 Once the download is successful, we anchor the local state to the remote state.
 
-1. **ConstructVerifiedAnchor**: This function calculates the SHA256 of the newly downloaded file.
+1. **Calculate Hash**: The system calculates the XXHash of the newly downloaded file.
 2. **Fetch Remote Context**: Reads the latest metadata from the local mirror (`db/all_dbs/.b2m/version/`) to get the `Timestamp` and `Uploader`.
 3. **Construct Anchor**: Creates a new metadata object combining the **Local Hash** + **Remote Timestamp**.
 4. **Save**: Writes to `local-versions/`.
-5. **Result**: Status becomes **"Up to Date ✅"**.
+5. **Update Cache**: The persistent `hash.json` cache is updated with the new file's hash and statistics.
+6. **Result**: Status becomes **"Up to Date ✅"**.
 
 ## Diagram
 
@@ -52,5 +54,6 @@ graph TD
     CalcHash --> ReadMirror[3. Read Remote Mirror]
     ReadMirror -- Get Timestamp --> Construct[4. Construct Anchor]
     Construct --> Save[5. Save Local Version]
-    Save --> Done[Status: Up to Date]
+    Save --> Cache[6. 
Update Cache] + Cache --> Done[Status: Up to Date] ``` diff --git a/b2-manager/docs/workflow/hashing.md b/b2-manager/docs/workflow/hashing.md index c0fbababde..a941ec1177 100644 --- a/b2-manager/docs/workflow/hashing.md +++ b/b2-manager/docs/workflow/hashing.md @@ -48,12 +48,15 @@ if err := core.LoadHashCache(); err != nil { ### Saving -The cache is saved deferred until the application exits: +The cache is saved to disk (`hash.json`) in the following scenarios to avoid recalculating hashes unnecessarily: + +1. **Application Shutdown**: Deferred save in `main.go`. +2. **Post-Operation**: Immediately after a successful **Upload** or **Download** operation (to capture state of the new file). +3. **Hash Calculation**: Whenever a new hash is calculated (cache miss). ```go -defer func() { - if err := core.SaveHashCache(); err != nil { - core.LogError("Failed to save hash cache: %v", err) - } -}() +// Example from core/upload.go +if err := SaveHashCache(); err != nil { + LogInfo("PerformUpload: Warning: Failed to save hash cache: %v", err) +} ``` diff --git a/b2-manager/docs/workflow/status.md b/b2-manager/docs/workflow/status.md index 43ca4d0cdc..647f05c1ca 100644 --- a/b2-manager/docs/workflow/status.md +++ b/b2-manager/docs/workflow/status.md @@ -8,7 +8,7 @@ The `FetchDBStatusData` function (in `core/status.go`) orchestrates the gatherin ### A. List Local Databases -- **Source**: `core/helpers.go` (`getLocalDBs`) +- **Source**: `core/local.go` (`getLocalDBs`) - **Action**: Scans `config.LocalDBDir` for `*.db` files. ### B. Fetch Remote State (DBs + Locks) @@ -23,7 +23,7 @@ The `FetchDBStatusData` function (in `core/status.go`) orchestrates the gatherin ### E. Load Local-Version Anchors -- **Source**: `core/status.go` / `core/helpers.go` +- **Source**: `core/status.go` / `core/metadata.go` - **Action**: Scans `local-version/` directory for metadata files representing the state of the database _at the time of last sync_. This is crucial for 3-way comparison. 
### Aggregation diff --git a/b2-manager/docs/workflow/upload.md b/b2-manager/docs/workflow/upload.md index 8e4dc90ec2..e353fd01a6 100644 --- a/b2-manager/docs/workflow/upload.md +++ b/b2-manager/docs/workflow/upload.md @@ -8,6 +8,16 @@ The upload process is orchestrated by `PerformUpload` in `core/upload.go`. > **UI Action**: This workflow is triggered when the user presses **'U'** (or selects "Upload") in the main dashboard. +### Phase 0: Pre-Upload Safety Check + +Before any action, `core.CheckUploadSafety` performs a critical validation to prevent data loss. + +1. **Remote Comparison**: Fetches specific remote metadata for the database. +2. **Safety Logic**: + - **Safe**: If remote hash matches local anchor (we are up-to-date) OR if remote doesn't exist (new DB). + - **Unsafe**: If remote hash differs from local anchor, it means **someone else updated the DB** since we last synced. +3. **Action**: The upload is **aborted** immediately if the safety check fails, preventing accidental overwrite of newer remote data. + ### Phase 1: Locking (Signal Intent) 1. **Acquire Lock**: Creates a `.lock` file on B2 (`dbname.user.host.lock`). @@ -28,7 +38,7 @@ The upload process is orchestrated by `PerformUpload` in `core/upload.go`. ### Phase 4: Anchor Update 1. **Sync Local State**: `PerformUpload` receives the generated metadata from `UploadDatabase` and calls `UpdateLocalVersion` to update the `local-versions/` anchor file. -2. **Efficiency**: This avoids re-calculating the hash locally, as the metadata object already contains the verified hash used for the upload. +2. **Update Cache**: The system recalculates the local file hash and updates the persistent `hash.json` cache. This ensures that subsequent status checks can quickly verify the file state without expensive re-hashing. 3. **Result**: Ensures the system knows the local file is now identical to the remote (Status becomes "Synced"). 
### Phase 5: Finalization @@ -40,14 +50,18 @@ The upload process is orchestrated by `PerformUpload` in `core/upload.go`. ```mermaid graph TD - Start[User Presses 'U'] --> Lock[1. Acquire Lock] + Start[User Presses 'U'] --> SafetyCheck{0. Safety Check} + SafetyCheck -- Fail --> Abort[Abort: Remote Newer] + SafetyCheck -- Pass --> Lock[1. Acquire Lock] + Lock --> SetMeta[2. Set Meta: 'uploading'] SetMeta --> Upload[3. Upload File] Upload -- Success --> GenMeta[Generate 'Success' Meta] GenMeta --> UpMeta[Upload Meta] - UpMeta --> Anchor[4. Update Local Anchor] - Anchor --> Unlock[5. Release Lock] + UpMeta --> Anchor[4. Update Anchor] + Anchor --> Cache[Update Cache] + Cache --> Unlock[5. Release Lock] Unlock --> Done[Done] Upload -- Fail --> Cleanup[Cleanup & Unlock] diff --git a/b2-manager/ui/operations.go b/b2-manager/ui/operations.go index e1e1d8290b..28128a63ae 100644 --- a/b2-manager/ui/operations.go +++ b/b2-manager/ui/operations.go @@ -253,10 +253,31 @@ func (lc *ListController) onLock(g *gocui.Gui, v *gocui.View) error { lc.app.updateDBStatus(dbName, "Locked (Updating)...", 100, 0, "lock", "") // Refresh to show new status go func() { - // We sleep briefly to allow the remote change to propagate and be visible to the next list/status fetch. - // This is a heuristic; ideally we should poll or have immediate local state reflection. 
- time.Sleep(1 * time.Second) - lc.app.refreshStatus() + // Poll for lock propagation to avoid stale state + // We verify that the lock is visible remotely before triggering a full refresh + pollCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-pollCtx.Done(): + // Timeout, status update might be stale but we proceed + lc.app.refreshStatus() + return + case <-ticker.C: + locks, err := core.FetchLocks(pollCtx) + if err == nil { + if _, ok := locks[dbName]; ok { + // Lock is visible, safe to refresh + lc.app.refreshStatus() + return + } + } + } + } }() return nil