-
Notifications
You must be signed in to change notification settings - Fork 23
Implement XXHash Caching and Refactor Core Rclone Utilities #121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
577fdc7
067ad2e
508e52d
a3fc0f4
652602b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -167,3 +167,4 @@ frontend/b2m | |
| b2m | ||
| b2-manager/b2m.toml | ||
| b2-manager/db/all_dbs/version/ | ||
| b2-manager/.b2m/ | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,13 +36,6 @@ var AppConfig = Config{ | |
| ToolVersion: "v1.0", | ||
| } | ||
|
|
||
| // Sync Status Constants | ||
| const ( | ||
| SyncStatusLocalOnly = "+" | ||
| SyncStatusRemoteOnly = "-" | ||
| SyncStatusDifferent = "*" | ||
| ) | ||
|
|
||
| // InitializeConfig sets up global configuration variables | ||
| func InitializeConfig() error { | ||
| var err error | ||
|
|
@@ -54,6 +47,47 @@ func InitializeConfig() error { | |
| } | ||
|
|
||
| // Load config from b2m.toml | ||
| // Load config from b2m.toml | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment is duplicated. |
||
| if err := loadTOMLConfig(); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Validate and set derived paths | ||
| if err := validateAndSetPaths(); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Fetch user details | ||
| fetchUserDetails() | ||
|
|
||
| AppConfig.LocalDBDir = filepath.Join(AppConfig.ProjectRoot, "db", "all_dbs") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The path
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will use in toml file There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This issue has been logged and will be investigated. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That sounds like a good plan. Moving There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the update. Using the TOML file for |
||
| AppConfig.LocalVersionDir = filepath.Join(AppConfig.LocalDBDir, ".b2m", "version") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new |
||
| AppConfig.LocalAnchorDir = filepath.Join(AppConfig.LocalDBDir, ".b2m", "local-version") | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func findProjectRoot() (string, error) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
| dir, err := os.Getwd() | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
| for { | ||
| if info, err := os.Stat(filepath.Join(dir, "db")); err == nil && info.IsDir() { | ||
| return dir, nil | ||
| } | ||
| if _, err := os.Stat(filepath.Join(dir, "go.mod")); err == nil { | ||
| return dir, nil | ||
| } | ||
| parent := filepath.Dir(dir) | ||
| if parent == dir { | ||
| return "", fmt.Errorf("root not found (searched for 'db' dir or 'go.mod')") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error message |
||
| } | ||
| dir = parent | ||
| } | ||
| } | ||
|
|
||
| func loadTOMLConfig() error { | ||
| tomlPath := filepath.Join(AppConfig.ProjectRoot, "b2m.toml") | ||
| if _, err := os.Stat(tomlPath); os.IsNotExist(err) { | ||
| return fmt.Errorf("couldn't find b2m.toml file at %s: %w", tomlPath, err) | ||
|
|
@@ -70,62 +104,38 @@ func InitializeConfig() error { | |
| AppConfig.RootBucket = tomlConf.RootBucket | ||
| AppConfig.DiscordWebhookURL = tomlConf.Discord | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func validateAndSetPaths() error { | ||
| if AppConfig.RootBucket == "" { | ||
| return fmt.Errorf("rootbucket not defined in b2m.toml file") | ||
| } | ||
| if AppConfig.DiscordWebhookURL == "" { | ||
| return fmt.Errorf("discord not defined in b2m.toml file") | ||
| } | ||
|
|
||
| // Derived paths | ||
| // Ensure RootBucket ends with / | ||
| if !strings.HasSuffix(AppConfig.RootBucket, "/") { | ||
| AppConfig.RootBucket += "/" | ||
| } | ||
|
|
||
| AppConfig.LockDir = AppConfig.RootBucket + "lock/" | ||
| AppConfig.VersionDir = AppConfig.RootBucket + "version/" | ||
| return nil | ||
| } | ||
|
|
||
| var u *user.User | ||
| u, err = user.Current() | ||
| func fetchUserDetails() { | ||
| u, err := user.Current() | ||
| if err != nil { | ||
| AppConfig.CurrentUser = "unknown" | ||
| } else { | ||
| AppConfig.CurrentUser = u.Username | ||
| } | ||
|
|
||
| var h string | ||
| h, err = os.Hostname() | ||
| h, err := os.Hostname() | ||
| if err != nil { | ||
| AppConfig.Hostname = "unknown" | ||
| } else { | ||
| AppConfig.Hostname = h | ||
| } | ||
|
|
||
| AppConfig.LocalDBDir = filepath.Join(AppConfig.ProjectRoot, "db", "all_dbs") | ||
| AppConfig.LocalVersionDir = filepath.Join(AppConfig.ProjectRoot, "db", "all_dbs", "version") | ||
| AppConfig.LocalAnchorDir = filepath.Join(AppConfig.ProjectRoot, "db", "all_dbs", "local-version") | ||
|
|
||
| // Initialize logging if needed, or other startup tasks | ||
| return nil | ||
| } | ||
|
|
||
| func findProjectRoot() (string, error) { | ||
| dir, err := os.Getwd() | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
| for { | ||
| if info, err := os.Stat(filepath.Join(dir, "db")); err == nil && info.IsDir() { | ||
| return dir, nil | ||
| } | ||
| if _, err := os.Stat(filepath.Join(dir, "go.mod")); err == nil { | ||
| return dir, nil | ||
| } | ||
| parent := filepath.Dir(dir) | ||
| if parent == dir { | ||
| return "", fmt.Errorf("root not found (searched for 'db' dir or 'go.mod')") | ||
| } | ||
| dir = parent | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,7 +21,7 @@ import ( | |
| // 1. Lock Check: Verify that no one else is currently uploading this database. | ||
| // 2. Download: Execute `rclone copy` to pull the file from B2. | ||
| // 3. Anchor: Construct a local "Verified Anchor" (LocalVersion) to mark this state as synced. | ||
| func DownloadDatabase(ctx context.Context, dbName string, onProgress func(model.RcloneProgress)) error { | ||
| func DownloadDatabase(ctx context.Context, dbName string, quiet bool, onProgress func(model.RcloneProgress)) error { | ||
| LogInfo("Downloading database %s", dbName) | ||
|
|
||
| // ------------------------------------------------------------------------- | ||
|
|
@@ -50,59 +50,16 @@ func DownloadDatabase(ctx context.Context, dbName string, onProgress func(model. | |
| // Use directory as destination for 'copy' | ||
| localDir := config.AppConfig.LocalDBDir | ||
|
|
||
| // Changed from copyto to copy for safety/data loss prevention | ||
| rcloneArgs := []string{"copy", | ||
| remotePath, | ||
| localDir, | ||
| "--checksum", | ||
| "--retries", "20", | ||
| "--low-level-retries", "30", | ||
| "--retries-sleep", "10s", | ||
| } | ||
|
|
||
| if onProgress != nil { | ||
| // Removed --verbose to avoid polluting JSON output | ||
| // User reported stats missing without verbose. restoring -v. | ||
| rcloneArgs = append(rcloneArgs, "-v", "--use-json-log", "--stats", "0.5s") | ||
| } else { | ||
| rcloneArgs = append(rcloneArgs, "--progress") | ||
| } | ||
|
|
||
| cmdSync := exec.CommandContext(ctx, "rclone", rcloneArgs...) | ||
|
|
||
| // ------------------------------------------------------------------------- | ||
| // PHASE 2: EXECUTE DOWNLOAD | ||
| // Perform the actual network transfer using `rclone copy`. | ||
| // ------------------------------------------------------------------------- | ||
| if onProgress != nil { | ||
| stderr, err := cmdSync.StderrPipe() | ||
| if err != nil { | ||
| LogError("Failed to get stderr pipe: %v", err) | ||
| return fmt.Errorf("failed to get stderr pipe: %w", err) | ||
| } | ||
| if err := cmdSync.Start(); err != nil { | ||
| LogError("Download start failed: %v", err) | ||
| return fmt.Errorf("download start failed: %w", err) | ||
| } | ||
| go ParseRcloneOutput(stderr, onProgress) | ||
|
|
||
| if err := cmdSync.Wait(); err != nil { | ||
| if ctx.Err() != nil { | ||
| return fmt.Errorf("download cancelled") | ||
| } | ||
| LogError("Download of %s failed: %v", dbName, err) | ||
| return fmt.Errorf("download of %s failed: %w", dbName, err) | ||
| } | ||
| } else { | ||
| cmdSync.Stdout = os.Stdout | ||
| cmdSync.Stderr = os.Stderr | ||
| if err := cmdSync.Run(); err != nil { | ||
| if ctx.Err() != nil { | ||
| return fmt.Errorf("download cancelled") | ||
| } | ||
| LogError("DownloadDatabase rclone copy failed for %s: %v", dbName, err) | ||
| return fmt.Errorf("download of %s failed: %w", dbName, err) | ||
| } | ||
| description := "Downloading " + dbName | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
| // Use the passed quiet parameter | ||
| // The new RcloneCopy uses !quiet for verbose. If onProgress is set, it adds json flags. | ||
| if err := RcloneCopy(ctx, "copy", remotePath, localDir, description, quiet, onProgress); err != nil { | ||
| LogError("DownloadDatabase RcloneCopy failed for %s: %v", dbName, err) | ||
| return fmt.Errorf("download of %s failed: %w", dbName, err) | ||
| } | ||
|
|
||
| // ------------------------------------------------------------------------- | ||
|
|
@@ -120,7 +77,7 @@ func DownloadDatabase(ctx context.Context, dbName string, onProgress func(model. | |
|
|
||
| // 3.1. Calculate Local Hash of the newly downloaded file | ||
| localDBPath := filepath.Join(config.AppConfig.LocalDBDir, dbName) | ||
| localHash, err := CalculateSHA256(localDBPath) | ||
| localHash, err := CalculateXXHash(localDBPath) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed from |
||
| if err != nil { | ||
| LogError("DownloadDatabase: Failed to calculate hash of downloaded file %s: %v", dbName, err) | ||
| return fmt.Errorf("failed to calculate hash of downloaded database: %w", err) | ||
|
|
@@ -177,99 +134,9 @@ func DownloadDatabase(ctx context.Context, dbName string, onProgress func(model. | |
| return fmt.Errorf("failed to update local anchor for %s: %w", dbName, err) | ||
| } else { | ||
| LogInfo("DownloadDatabase: Successfully anchored %s (Hash: %s, Ts: %d)", dbName, localHash, remoteTimestamp) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // DownloadAllDatabases syncs all databases from remote to local | ||
| func DownloadAllDatabases(onProgress func(model.RcloneProgress)) error { | ||
| ctx := GetContext() | ||
|
|
||
| LogInfo("Starting batch download of all databases") | ||
|
|
||
| if err := os.MkdirAll(config.AppConfig.LocalDBDir, 0755); err != nil { | ||
| LogError("Failed to create local directory in DownloadAllDatabases: %v", err) | ||
| return fmt.Errorf("failed to create local directory: %w", err) | ||
| } | ||
|
|
||
| rcloneArgs := []string{"copy", | ||
| config.AppConfig.RootBucket, | ||
| config.AppConfig.LocalDBDir, | ||
| "--checksum", | ||
| "--retries", "20", | ||
| "--low-level-retries", "30", | ||
| "--retries-sleep", "10s", | ||
| } | ||
|
|
||
| if onProgress != nil { | ||
| rcloneArgs = append(rcloneArgs, "--use-json-log", "--stats", "0.5s") | ||
| } else { | ||
| rcloneArgs = append(rcloneArgs, "--progress") | ||
| } | ||
|
|
||
| cmdSync := exec.CommandContext(ctx, "rclone", rcloneArgs...) | ||
|
|
||
| if onProgress != nil { | ||
| stderr, err := cmdSync.StderrPipe() | ||
| if err != nil { | ||
| LogError("Failed to get stderr pipe: %v", err) | ||
| return fmt.Errorf("failed to get stderr pipe: %w", err) | ||
| } | ||
| if err := cmdSync.Start(); err != nil { | ||
| LogError("Batch download start failed: %v", err) | ||
| return fmt.Errorf("batch download start failed: %w", err) | ||
| } | ||
| go ParseRcloneOutput(stderr, onProgress) | ||
|
|
||
| if err := cmdSync.Wait(); err != nil { | ||
| if ctx.Err() != nil { | ||
| LogInfo("DownloadAllDatabases cancelled") | ||
| return fmt.Errorf("batch download cancelled") | ||
| } | ||
| LogError("Batch download failed: %v", err) | ||
| return fmt.Errorf("batch download failed: %w", err) | ||
| } | ||
| } else { | ||
| cmdSync.Stdout = os.Stdout | ||
| cmdSync.Stderr = os.Stderr | ||
| if err := cmdSync.Run(); err != nil { | ||
| if ctx.Err() != nil { | ||
| LogInfo("DownloadAllDatabases cancelled") | ||
| return fmt.Errorf("batch download cancelled") | ||
| } | ||
| LogError("DownloadAllDatabases batch rclone copy failed: %v", err) | ||
| return fmt.Errorf("batch download failed: %w", err) | ||
| } | ||
| } | ||
|
|
||
| // 1. Sync Remote Metadata -> Local Mirror (version/) | ||
| LogInfo("DownloadAllDatabases: Updating metadata mirror...") | ||
|
|
||
| // 1. Remote:VersionDir -> Local:VersionDir (Mirror) | ||
| cmdMirror := exec.CommandContext(ctx, "rclone", "sync", config.AppConfig.VersionDir, config.AppConfig.LocalVersionDir) | ||
| if err := cmdMirror.Run(); err != nil { | ||
| LogError("DownloadAllDatabases: Failed to update metadata mirror: %v", err) | ||
| } else { | ||
| // 2. Iterate over all downloaded DBs and construct Verified Anchors | ||
| // We use the same strict logic as DownloadDatabase | ||
| LogInfo("DownloadAllDatabases: Constructing verified anchors...") | ||
|
|
||
| // Get list of local DBs we just downloaded | ||
| entries, err := os.ReadDir(config.AppConfig.LocalDBDir) | ||
| if err == nil { | ||
| for _, entry := range entries { | ||
| if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".db") { | ||
| dbName := entry.Name() | ||
|
|
||
| // Use shared helper | ||
| if err := ConstructVerifiedAnchor(dbName); err != nil { | ||
| LogError("DownloadAllDatabases: Failed to anchor %s: %v", dbName, err) | ||
| } | ||
| } | ||
| } | ||
| } else { | ||
| LogError("DownloadAllDatabases: Failed to read local db dir: %v", err) | ||
| // Update hash cache on disk as we just calculated it and it is fresh | ||
| if err := SaveHashCache(); err != nil { | ||
| LogInfo("DownloadDatabase: Warning: Failed to save hash cache: %v", err) | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed sync status constants. Clarify if these constants are replaced or if the sync status logic has been removed entirely.