7 changes: 5 additions & 2 deletions cmd/seq-db/seq-db.go
@@ -253,6 +253,7 @@ func startStore(
DataDir: cfg.Storage.DataDir,
FracSize: uint64(cfg.Storage.FracSize),
TotalSize: uint64(cfg.Storage.TotalSize),
SealingQueueLen: uint64(cfg.Storage.SealingQueueLen),
CacheSize: uint64(cfg.Resources.CacheSize),
SortCacheSize: uint64(cfg.Resources.SortDocsCacheSize),
ReplayWorkers: cfg.Resources.ReplayWorkers,
@@ -280,8 +281,10 @@ func startStore(
SkipSortDocs: !cfg.DocsSorting.Enabled,
KeepMetaFile: false,
},
OffloadingEnabled: cfg.Offloading.Enabled,
OffloadingRetention: cfg.Offloading.Retention,
OffloadingEnabled: cfg.Offloading.Enabled,
OffloadingRetention: cfg.Offloading.Retention,
OffloadingRetryDelay: cfg.Offloading.RetryDelay,
OffloadingQueueSize: uint64(float64(cfg.Storage.TotalSize) * cfg.Offloading.QueueSizePercent / 100),
},
API: storeapi.APIConfig{
StoreMode: configMode,
12 changes: 11 additions & 1 deletion config/config.go
@dkharms (Member) commented on Jan 28, 2026:
Please add corresponding validations (of the new config options) into config/validation.go.
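
A minimal sketch of what such checks might look like, assuming the existing validation style in config/validation.go takes a *Config and returns an error (the function name and error wording are illustrative, and fmt is assumed to be imported there):

// Hypothetical checks for the options added in this PR.
func validateStorageAndOffloading(cfg *Config) error {
	if cfg.Storage.SealingQueueLen < 0 {
		return fmt.Errorf("storage.sealing_queue_len must be non-negative, got %d", cfg.Storage.SealingQueueLen)
	}
	if cfg.Offloading.QueueSizePercent < 0 || cfg.Offloading.QueueSizePercent > 100 {
		return fmt.Errorf("offloading.queue_size_percent must be in [0, 100], got %g", cfg.Offloading.QueueSizePercent)
	}
	if cfg.Offloading.RetryDelay < 0 {
		return fmt.Errorf("offloading.retry_delay must be non-negative, got %s", cfg.Offloading.RetryDelay)
	}
	return nil
}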

@@ -63,6 +63,11 @@ type Config struct {
// TotalSize specifies upper bound of how much disk space can be occupied
// by sealed fractions before they get deleted (or offloaded).
TotalSize Bytes `config:"total_size" default:"1GiB"`
// SealingQueueLen defines the maximum length of the sealing queue.
// If the queue size exceeds this limit, writing to the store will be paused,
// and bulk requests will start returning errors.
// A value of zero disables this limit, allowing writes to proceed unconditionally.
SealingQueueLen int `config:"sealing_queue_len" default:"10"`
} `config:"storage"`

Cluster struct {
@@ -234,8 +239,13 @@ type Config struct {
// You can learn more about secret keys [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html).
SecretKey string `config:"secret_key"`
// RetryCount sets [RetryMaxAttempts] for S3 client which is applied for all API calls.
// Be aware that the fraction is suicided when offloading attempts exceed [RetryCount].
RetryCount int `config:"retry_count" default:"5"`
// QueueSizePercent specifies the percentage of the total local dataset size allocated to the offloading queue.
// Note: when the queue overflows, the oldest fractions are automatically removed.
// A value of zero disables this automatic removal.
QueueSizePercent float64 `config:"queue_size_percent" default:"5"`
// RetryDelay sets the delay between consecutive offloading retries.
RetryDelay time.Duration `config:"retry_delay" default:"2s"`
} `config:"offloading"`

AsyncSearch struct {
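For a rough sense of how QueueSizePercent feeds into the queue cap computed in cmd/seq-db/seq-db.go, a worked example using the defaults shown above (values and variable names are illustrative):

// total_size = 1GiB, queue_size_percent = 5 (the defaults above)
totalSize := uint64(1 << 30)
queueSize := uint64(float64(totalSize) * 5.0 / 100)
// queueSize == 53687091 bytes, i.e. roughly 51 MiB reserved for the offloading queue
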
9 changes: 7 additions & 2 deletions fracmanager/config.go
@@ -19,6 +19,9 @@ type Config struct {
TotalSize uint64
CacheSize uint64

SuspendThreshold uint64
SealingQueueLen uint64

ReplayWorkers int
MaintenanceDelay time.Duration
CacheCleanupDelay time.Duration
@@ -28,8 +31,10 @@
Fraction frac.Config
MinSealFracSize uint64

OffloadingEnabled bool
OffloadingRetention time.Duration
OffloadingEnabled bool
OffloadingQueueSize uint64
OffloadingRetention time.Duration
OffloadingRetryDelay time.Duration
}

func FillConfigWithDefault(config *Config) *Config {
72 changes: 72 additions & 0 deletions fracmanager/fraction_registry.go
@@ -129,6 +129,8 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active
curInfo := old.instance.Info()
r.stats.sealing.Add(curInfo)

r.active.Suspend(old.Suspended())

wg := sync.WaitGroup{}
wg.Add(1)
// since old.WaitWriteIdle() can take some time, we don't want to do it under the lock
@@ -151,6 +153,31 @@
return old, wg.Wait, nil
}

func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) bool {
Contributor commented:

nit: seems like return value is never used

r.mu.Lock()
defer r.mu.Unlock()

if maxQueue > 0 && r.stats.sealing.count >= int(maxQueue) {
r.active.Suspend(true)
return true
}

if maxSize > 0 && r.diskUsage() > maxSize {
r.active.Suspend(true)
return true
}

r.active.Suspend(false)
return false
}

func (r *fractionRegistry) diskUsage() uint64 {
return r.active.instance.Info().FullSize() +
r.stats.sealed.totalSizeOnDisk +
r.stats.sealing.totalSizeOnDisk +
r.stats.offloading.totalSizeOnDisk
}

// addActive sets a new active fraction and updates the complete fractions list.
func (r *fractionRegistry) addActive(a *activeProxy) {
r.muAll.Lock()
@@ -227,6 +254,10 @@ func (r *fractionRegistry) EvictLocal(shouldOffload bool, sizeLimit uint64) ([]*
// Fractions older than retention period are permanently deleted.
// Returns removed fractions or empty slice if nothing to remove.
func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy {
if retention == 0 {
return nil
}

r.mu.Lock()
defer r.mu.Unlock()

@@ -248,6 +279,42 @@ func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy {
return evicted
}

// EvictOverflowed removes the oldest fractions from the offloading queue when it exceeds the size limit.
// Selects fractions that haven't finished offloading yet to minimize data loss.
// Used when offloading queue grows too large due to slow remote storage performance.
func (r *fractionRegistry) EvictOverflowed(sizeLimit uint64) []*sealedProxy {
if sizeLimit == 0 {
return nil
}

r.mu.Lock()
defer r.mu.Unlock()

// Fast path: skip processing if within size limits
if r.stats.offloading.totalSizeOnDisk <= sizeLimit {
return nil
}

count := 0
evicted := []*sealedProxy{}
// filter fractions
for _, item := range r.offloading {
// keep items that are within limits or already offloaded
if r.stats.offloading.totalSizeOnDisk <= sizeLimit || item.remote != nil {
r.offloading[count] = item
count++
continue
}
evicted = append(evicted, item)
r.stats.offloading.Sub(item.instance.Info())
}

r.offloading = r.offloading[:count]
r.rebuildAllFractions()

return evicted
}

// PromoteToSealed moves fractions from sealing to local queue when sealing completes.
// Maintains strict ordering - younger fractions wait for older ones to seal first.
func (r *fractionRegistry) PromoteToSealed(active *activeProxy, sealed *frac.Sealed) {
@@ -322,6 +389,11 @@ func (r *fractionRegistry) removeFromOffloading(sealed *sealedProxy) {
count++
}
}

if count == len(r.offloading) { // nothing to remove (it may have been removed earlier by EvictOverflowed)
return
}

r.offloading = r.offloading[:count]
r.stats.offloading.Sub(sealed.instance.Info())

82 changes: 69 additions & 13 deletions fracmanager/lifecycle_manager.go
@@ -21,6 +21,7 @@ type lifecycleManager struct {
provider *fractionProvider // provider for fraction operations
flags *StateManager // storage state flags
registry *fractionRegistry // fraction state registry
tasks *TaskManager // Background offloading tasks

sealingWg sync.WaitGroup
}
@@ -36,18 +37,26 @@ func newLifecycleManager(
provider: provider,
flags: flags,
registry: registry,
tasks: NewTaskManager(),
}
}

// Maintain performs periodic lifecycle management tasks.
// It coordinates rotation, offloading, cleanup based on configuration.
func (lc *lifecycleManager) Maintain(ctx context.Context, config *Config, wg *sync.WaitGroup) {
lc.rotate(config.FracSize, wg)
if config.OffloadingEnabled {
lc.offloadLocal(ctx, config.TotalSize, wg)
lc.cleanRemote(config.OffloadingRetention, wg)
func (lc *lifecycleManager) Maintain(ctx context.Context, cfg *Config, wg *sync.WaitGroup) {
@dkharms (Member) commented on Jan 29, 2026:
nit: Maybe you can add a Maintain method to FracManager as well? It would grab the mutexes and call lifecycleManager.Maintain.

Now it's done like this in startMaintWorker:

fm.mu.Lock()
// Perform fraction maintenance (rotation, truncating, offloading, etc.)
fm.lc.Maintain(ctx, cfg, wg)
fm.mu.Unlock()

I do not like that we grab mutexes somewhere in the related code, so I suggest doing just:

// Perform fraction maintenance (rotation, truncating, offloading, etc.)
fm.Maintain(ctx, cfg, wg)
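
A minimal sketch of the wrapper this comment describes, assuming FracManager keeps its mutex in fm.mu and the lifecycle manager in fm.lc as the snippet above suggests:

// Maintain grabs the manager's mutex and delegates to the lifecycle manager,
// so callers such as startMaintWorker never touch fm.mu directly (illustrative sketch).
func (fm *FracManager) Maintain(ctx context.Context, cfg *Config, wg *sync.WaitGroup) {
	fm.mu.Lock()
	defer fm.mu.Unlock()
	fm.lc.Maintain(ctx, cfg, wg)
}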


Member commented:

nit: Empty line.

suspendThreshold := cfg.TotalSize + cfg.TotalSize/100 + cfg.OffloadingQueueSize
lc.registry.SuspendIfOverCapacity(cfg.SealingQueueLen, suspendThreshold)

lc.rotate(cfg.FracSize, wg)
if cfg.OffloadingEnabled {
lc.offloadLocal(ctx, cfg.TotalSize, cfg.OffloadingRetryDelay, wg)
if cfg.OffloadingQueueSize > 0 {
lc.removeOverflowed(cfg.OffloadingQueueSize, wg)
}
Member commented on lines +54 to +56:

Shouldn't we check for overflow before making an attempt to offload sealed fractions?
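
If the order were swapped as the question suggests, the offloading branch of Maintain might look roughly like this (a sketch only; whether eviction should precede offloading is exactly what is being asked):

if cfg.OffloadingEnabled {
	// Hypothetical reordering: drop overflowed fractions first so no time is
	// spent offloading data that is about to be evicted anyway.
	if cfg.OffloadingQueueSize > 0 {
		lc.removeOverflowed(cfg.OffloadingQueueSize, wg)
	}
	lc.offloadLocal(ctx, cfg.TotalSize, cfg.OffloadingRetryDelay, wg)
	lc.cleanRemote(cfg.OffloadingRetention, wg)
}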

lc.cleanRemote(cfg.OffloadingRetention, wg)
} else {
lc.cleanLocal(config.TotalSize, wg)
lc.cleanLocal(cfg.TotalSize, wg)
}
lc.updateOldestMetric()
lc.SyncInfoCache()
@@ -113,17 +122,18 @@ func (lc *lifecycleManager) rotate(maxSize uint64, wg *sync.WaitGroup) {

// offloadLocal starts offloading of local fractions to remote storage.
// Selects fractions based on disk space usage and retention policy.
func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, wg *sync.WaitGroup) {
func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, retryDelay time.Duration, wg *sync.WaitGroup) {
toOffload, err := lc.registry.EvictLocal(true, sizeLimit)
if err != nil {
logger.Fatal("error releasing old fractions:", zap.Error(err))
}
for _, sealed := range toOffload {
wg.Add(1)
go func() {
lc.tasks.Run(sealed.instance.BaseFileName, ctx, func(ctx context.Context) {
defer wg.Done()

remote, _ := lc.tryOffload(ctx, sealed.instance)
remote := lc.offloadWithRetry(ctx, sealed.instance, retryDelay)

lc.registry.PromoteToRemote(sealed, remote)

if remote == nil {
Expand All @@ -136,7 +146,41 @@ func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64,
// free up local resources
sealed.instance.Suicide()
maintenanceTruncateTotal.Add(1)
}()
})
}
}

// offloadWithRetry attempts to offload a fraction, retrying until it succeeds or the context is canceled.
// Returns the remote fraction instance, or nil if offloading was canceled.
func (lc *lifecycleManager) offloadWithRetry(ctx context.Context, sealed *frac.Sealed, retryDelay time.Duration) *frac.Remote {
Member commented:

Previously we attempted to offload a fraction Offloading.RetryCount times, and if there was no success we deleted the fraction.

Now there might be more than Offloading.RetryCount attempts.

Also, Offloading.RetryDelay is ambiguous. When I saw this option I thought it was the duration between S3 client retries, not the duration between retry groups (let's call it that).
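
One way to restore the bounded-attempts behavior described above would be to pass a cap into the retry loop; a rough sketch, where maxAttempts is a hypothetical parameter wired from Offloading.RetryCount:

// Hypothetical variant: give up after maxAttempts failed offloading attempts
// (letting the caller suicide the fraction) instead of retrying indefinitely.
func (lc *lifecycleManager) offloadWithRetry(ctx context.Context, sealed *frac.Sealed, retryDelay time.Duration, maxAttempts int) *frac.Remote {
	for i := 1; ; i++ {
		remote, err := lc.tryOffload(ctx, sealed)
		if err == nil {
			return remote
		}
		if maxAttempts > 0 && i >= maxAttempts {
			return nil
		}
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(retryDelay):
			// wait before the next attempt
		}
	}
}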

start := time.Now()
for i := 0; ; i++ {
remote, err := lc.tryOffload(ctx, sealed)
if err == nil {
return remote
}

logger.Warn(
"fail to offload fraction",
zap.String("name", sealed.BaseFileName),
zap.Duration("offloading_time", time.Since(start)),
zap.Int("attempts", i),
zap.Error(err),
)

select {
case <-ctx.Done():
logger.Info(
"fraction offloading was stopped",
zap.String("name", sealed.BaseFileName),
zap.Duration("offloading_time", time.Since(start)),
zap.Int("attempts", i),
zap.Error(ctx.Err()),
)
return nil
case <-time.After(retryDelay):
// Wait before next retry attempt
}
}
}

@@ -163,9 +207,6 @@ func (lc *lifecycleManager) tryOffload(ctx context.Context, sealed *frac.Sealed)

// cleanRemote deletes outdated remote fractions based on retention policy.
func (lc *lifecycleManager) cleanRemote(retention time.Duration, wg *sync.WaitGroup) {
if retention == 0 {
return
}
toDelete := lc.registry.EvictRemote(retention)
wg.Add(1)
go func() {
@@ -207,3 +248,18 @@ func (lc *lifecycleManager) updateOldestMetric() {
oldestFracTime.WithLabelValues("remote").Set((time.Duration(lc.registry.OldestTotal()) * time.Millisecond).Seconds())
oldestFracTime.WithLabelValues("local").Set((time.Duration(lc.registry.OldestLocal()) * time.Millisecond).Seconds())
}

// removeOverflowed removes fractions from the offloading queue when it exceeds the size limit.
// Stops ongoing offloading tasks and cleans up both local and remote resources.
func (lc *lifecycleManager) removeOverflowed(sizeLimit uint64, wg *sync.WaitGroup) {
evicted := lc.registry.EvictOverflowed(sizeLimit)
for _, item := range evicted {
wg.Add(1)
go func() {
defer wg.Done()
// Cancel the offloading task - this operation may take significant time
// hence executed in a separate goroutine to avoid blocking
lc.tasks.Cancel(item.instance.BaseFileName)
}()
}
}
2 changes: 1 addition & 1 deletion fracmanager/lifecycle_manager_test.go
@@ -150,7 +150,7 @@ func TestOldestMetrics(t *testing.T) {
}

wg := sync.WaitGroup{}
lc.offloadLocal(t.Context(), total-halfSize, &wg)
lc.offloadLocal(t.Context(), total-halfSize, 0, &wg)
wg.Wait()

// Check state after offloading
19 changes: 19 additions & 0 deletions fracmanager/proxy_frac.go
@@ -23,6 +23,7 @@ var (
_ frac.Fraction = (*emptyFraction)(nil)

ErrFractionNotWritable = errors.New("fraction is not writable")
ErrFractionSuspended = errors.New("write operations temporarily suspended - database capacity exceeded")
)

// fractionProxy provides thread-safe access to a fraction with atomic replacement
@@ -81,6 +82,7 @@ type activeProxy struct {
wg sync.WaitGroup // Tracks pending write operations

finalized bool // Whether fraction is frozen for writes
suspended bool // Temporarily suspended for writes
}

func newActiveProxy(active *frac.Active) *activeProxy {
Expand All @@ -97,6 +99,10 @@ func (p *activeProxy) Append(docs, meta []byte) error {
p.mu.RUnlock()
return ErrFractionNotWritable
}
if p.suspended {
p.mu.RUnlock()
return ErrFractionSuspended
}
p.wg.Add(1) // Important: wg.Add() inside lock to prevent race with WaitWriteIdle()
p.mu.RUnlock()

@@ -115,6 +121,19 @@ func (p *activeProxy) WaitWriteIdle() {
zap.Float64("time_wait_s", waitTime))
}

func (p *activeProxy) Suspended() bool {
p.mu.Lock()
defer p.mu.Unlock()

return p.suspended
}

func (p *activeProxy) Suspend(value bool) {
p.mu.Lock()
p.suspended = value
p.mu.Unlock()
}

// Finalize marks the fraction as read-only and prevents new writes from starting after finalize.
func (p *activeProxy) Finalize() error {
p.mu.Lock()