Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
245b875
refactor(fracmanager): using fifo queues of fractions
eguguchkin Nov 6, 2025
2e2c673
fix: return data provider release
eguguchkin Nov 21, 2025
2730b2a
feat(frac): new active fraction implementation based on mini-indexes
eguguchkin Dec 6, 2025
113b9c3
reuse slices
eguguchkin Dec 7, 2025
d84e7e5
sealing and merging
eguguchkin Dec 10, 2025
92e0f45
merging optimizations 1 step
eguguchkin Dec 12, 2025
72958fe
merging optimizations 2 step
eguguchkin Dec 16, 2025
d2dd334
full frac write bench
eguguchkin Dec 16, 2025
e015add
enable background merging
eguguchkin Dec 18, 2025
070a63f
tune merge strategy
eguguchkin Dec 19, 2025
84a0879
tune merge strategy: remove tiers and use generations
eguguchkin Dec 21, 2025
b32f934
naming + comments
eguguchkin Dec 21, 2025
b858ddb
replace old active with a new one
eguguchkin Dec 24, 2025
f7f3305
add deduplication, fix mid.time overflow, replace old active with ne…
eguguchkin Dec 26, 2025
06da534
remake clone token table
eguguchkin Dec 26, 2025
c17dfd3
remake clone token table 2
eguguchkin Dec 26, 2025
7860811
disable alloc pools
eguguchkin Dec 26, 2025
210b19b
enable alloc pools
eguguchkin Dec 26, 2025
e62e18f
res pool experiments
eguguchkin Dec 29, 2025
63674a6
fix panic
eguguchkin Dec 29, 2025
26b2845
resources test
eguguchkin Dec 29, 2025
ecf9d59
res pool experiments 2
eguguchkin Dec 29, 2025
fcb45ce
res pool experiments 3
eguguchkin Dec 29, 2025
bff6bcf
res pool experiments 4
eguguchkin Dec 29, 2025
fcc0501
res pool experiments 5
eguguchkin Dec 29, 2025
e73ee17
res pool experiments 6
eguguchkin Dec 29, 2025
e81e9bf
res pool experiments 7
eguguchkin Dec 29, 2025
067dc8a
deduplication tests
eguguchkin Jan 14, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions asyncsearcher/async_searcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,18 @@ import (
"github.com/stretchr/testify/require"

"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/processor"
"github.com/ozontech/seq-db/mappingprovider"
"github.com/ozontech/seq-db/seq"
)

type fakeFrac struct {
frac.Fraction
info common.Info
info frac.Info
dp fakeDP
}

func (f *fakeFrac) Info() *common.Info {
func (f *fakeFrac) Info() *frac.Info {
return &f.info
}

Expand Down Expand Up @@ -51,7 +50,7 @@ func TestAsyncSearcherMaintain(t *testing.T) {
Retention: time.Hour,
}
fracs := []frac.Fraction{
&fakeFrac{info: common.Info{Path: "1"}},
&fakeFrac{info: frac.Info{Path: "1"}},
}
r.NoError(as.StartSearch(req, fracs))

Expand Down
2 changes: 1 addition & 1 deletion benchmarks/docker-compose-seqdb.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ services:
limits:
cpus: "4"
memory: "8GB"
image: ghcr.io/ozontech/seq-db:v0.61.0
image: 'gitlab-registry.ozon.ru/sre/images/seq-db:che@sha256:82d0dd34cb5d6db9e0450bc8d2cd1d9e29414ec2ba81dc8c4ae643dea6eb1bd0'
ports:
- '9002:9002'
volumes:
Expand Down
6 changes: 3 additions & 3 deletions cmd/distribution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/sealed"
"github.com/ozontech/seq-db/fracmanager"
"github.com/ozontech/seq-db/logger"
Expand Down Expand Up @@ -59,7 +59,7 @@ func readBlock(reader storage.IndexReader, blockIndex uint32) ([]byte, error) {
return data, nil
}

func loadInfo(path string) *common.Info {
func loadInfo(path string) *frac.Info {
indexReader, f := getReader(path)
defer f.Close()

Expand Down Expand Up @@ -87,7 +87,7 @@ func loadInfo(path string) *common.Info {
return b.Info
}

func buildDist(dist *seq.MIDsDistribution, path string, _ *common.Info) {
func buildDist(dist *seq.MIDsDistribution, path string, _ *frac.Info) {
blocksReader, f := getReader(path)
defer f.Close()

Expand Down
4 changes: 2 additions & 2 deletions cmd/seq-db/seq-db.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/ozontech/seq-db/config"
"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/fracmanager"
"github.com/ozontech/seq-db/logger"
"github.com/ozontech/seq-db/mappingprovider"
Expand Down Expand Up @@ -259,7 +258,8 @@ func startStore(
MaintenanceDelay: 0,
CacheGCDelay: 0,
CacheCleanupDelay: 0,
SealParams: common.SealParams{
MinSealFracSize: uint64(cfg.Storage.TotalSize) * consts.DefaultMinSealPercent / 100,
SealParams: frac.SealParams{
IDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
LIDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
TokenListZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
Expand Down
2 changes: 1 addition & 1 deletion config/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ var (
ReaderWorkers int

CaseSensitive = false
SkipFsync = false
SkipFsync = true

MaxFetchSizeBytes = 4 * units.MiB

Expand Down
294 changes: 294 additions & 0 deletions frac/active/active.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
package active

import (
"context"
"io"
"os"
"sync"
"time"

"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/config"
"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/active_old"
"github.com/ozontech/seq-db/frac/processor"
"github.com/ozontech/seq-db/logger"
"github.com/ozontech/seq-db/metric"
"github.com/ozontech/seq-db/metric/stopwatch"
"github.com/ozontech/seq-db/seq"
"github.com/ozontech/seq-db/storage"
"github.com/ozontech/seq-db/util"
"go.uber.org/zap"
)

// Active is a mutable fraction: it accepts appended doc/meta batches,
// builds an in-memory mini-index per batch, and serves search/fetch
// requests from a pool of those indexes which are merged in background.
type Active struct {
	Config *frac.Config

	// BaseFileName is the path prefix shared by all on-disk files of the fraction.
	BaseFileName string

	// indexer builds mem-indexes from meta blocks asynchronously.
	indexer *Indexer

	indexes *memIndexPool // pool of per-batch mini-indexes and the fraction Info
	merger  *mergeManager // schedules background merges of the mini-indexes

	docsFile   *os.File
	docsReader storage.DocsReader // reads docsFile through docsCache
	sortReader storage.DocsReader // reads the same docsFile through sortCache
	docsCache  *cache.Cache[[]byte]
	sortCache  *cache.Cache[[]byte]

	metaFile   *os.File
	metaReader storage.DocBlocksReader

	// writer appends docs/meta blocks to the files above.
	writer *active_old.Writer
}

// New opens (or creates) the docs and meta files for the fraction, wires up
// readers, the writer, the async indexer and the background merge manager,
// and returns the assembled Active fraction.
func New(
	baseFileName string,
	cfg *frac.Config,
	workers int,
	readLimiter *storage.ReadLimiter,
	docsCache *cache.Cache[[]byte],
	sortCache *cache.Cache[[]byte],
) *Active {
	docs, docsInfo := util.MustOpenFile(baseFileName+consts.DocsFileSuffix, config.SkipFsync)
	meta, metaInfo := util.MustOpenFile(baseFileName+consts.MetaFileSuffix, config.SkipFsync)

	// Fraction Info starts from the current on-disk sizes (non-zero after restart).
	pool := NewIndexPool(frac.NewInfo(baseFileName, uint64(docsInfo.Size()), uint64(metaInfo.Size())))

	a := &Active{
		BaseFileName: baseFileName,
		Config:       cfg,

		indexer: NewIndexer(util.NewSemaphore(workers)),
		merger:  newMergeManager(pool, util.NewSemaphore(workers)),
		indexes: pool,

		docsFile:   docs,
		docsCache:  docsCache,
		sortCache:  sortCache,
		docsReader: storage.NewDocsReader(readLimiter, docs, docsCache),
		sortReader: storage.NewDocsReader(readLimiter, docs, sortCache),

		metaFile:   meta,
		metaReader: storage.NewDocBlocksReader(readLimiter, meta),

		writer: active_old.NewWriter(docs, meta, docsInfo.Size(), metaInfo.Size(), config.SkipFsync),
	}

	logger.Info("active fraction created", zap.String("fraction", baseFileName))

	return a
}

// Replay rebuilds the in-memory indexes of the fraction by re-reading every
// meta block persisted in the meta file. It is used on startup to restore an
// active fraction's state after a restart.
//
// Replay returns the context error if ctx is canceled. A partially written
// trailing meta block (e.g. after a crash) is skipped with a warning.
func (f *Active) Replay(ctx context.Context) error {
	info := f.indexes.info

	logger.Info("start replaying...", zap.String("name", info.Name()))

	start := time.Now()

	var (
		wg     sync.WaitGroup
		offset uint64
		step   = info.MetaOnDisk / 10 // log progress roughly every 10%
		next   = step
	)

loop:
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			meta, metaSize, err := f.metaReader.ReadDocBlock(int64(offset))
			if err == io.EOF {
				if metaSize != 0 {
					logger.Warn("last meta block is partially written, skipping it")
				}
				break loop
			}
			if err != nil {
				return err
			}

			// step == 0 means the meta file is smaller than 10 bytes; skip
			// progress logging entirely instead of logging on every block
			// (with step == 0, `next` would never advance).
			if step > 0 && offset > next {
				next += step
				progress := float64(offset) / float64(info.MetaOnDisk) * 100
				logger.Info("replaying batch, meta",
					zap.String("name", info.Name()),
					zap.Uint64("from", offset),
					zap.Uint64("to", offset+metaSize),
					zap.Uint64("target", info.MetaOnDisk),
					util.ZapFloat64WithPrec("progress_percentage", progress, 2),
				)
			}
			offset += metaSize

			wg.Add(1)
			f.indexer.Index(meta, func(idx *memIndex, err error) {
				// Zero sizes: on-disk sizes were presumably already accounted
				// when Info was built from file stats in New — confirm.
				f.AddIndex(idx, 0, 0, err)
				wg.Done()
			})
		}
	}

	wg.Wait()

	tookSeconds := util.DurationToUnit(time.Since(start), "s")
	throughputRaw := util.SizeToUnit(info.DocsRaw, "mb") / tookSeconds
	throughputMeta := util.SizeToUnit(info.MetaOnDisk, "mb") / tookSeconds
	logger.Info("active fraction replayed",
		zap.String("name", info.Name()),
		zap.Uint32("docs_total", info.DocsTotal),
		util.ZapUint64AsSizeStr("docs_size", info.DocsOnDisk),
		util.ZapFloat64WithPrec("took_s", tookSeconds, 1),
		util.ZapFloat64WithPrec("throughput_raw_mb_sec", throughputRaw, 1),
		util.ZapFloat64WithPrec("throughput_meta_mb_sec", throughputMeta, 1),
	)
	return nil
}

// Append writes a docs/meta pair to disk and schedules asynchronous indexing
// of the meta block; wg.Done fires once indexing of this batch completes.
// The write error, if any, is returned before indexing is scheduled.
func (f *Active) Append(docs, meta []byte, wg *sync.WaitGroup) error {
	sw := stopwatch.New()

	appendWatch := sw.Start("append")
	if err := f.writer.Write(docs, meta, sw); err != nil {
		appendWatch.Stop()
		return err
	}

	sendWatch := sw.Start("send_to_indexer")
	f.indexer.Index(meta, func(idx *memIndex, err error) {
		f.AddIndex(idx, uint64(len(docs)), uint64(len(meta)), err)
		wg.Done()
	})
	sendWatch.Stop()

	appendWatch.Stop()
	sw.Export(bulkStagesSeconds)
	return nil
}

// AddIndex registers a freshly built mem-index in the pool, accounts the
// on-disk sizes of its source docs/meta, and asks the merger to run.
// A non-nil err aborts the process: an indexing failure here is unrecoverable.
func (f *Active) AddIndex(idx *memIndex, docsLen, metaLen uint64, err error) {
	if err != nil {
		logger.Fatal("bulk indexing error", zap.Error(err))
	}

	f.indexes.Add(idx, docsLen, metaLen)
	f.merger.requestMerge()
}

// String implements fmt.Stringer via the shared fraction formatter.
func (f *Active) String() string {
	return frac.FracToString(f, "active")
}

// Fetch returns the raw documents for the given ids, collected across all
// mini-indexes in the current snapshot; res entries stay nil for ids not
// found. Returns (nil, nil) when the fraction holds no documents yet.
func (f *Active) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) {
	sw := stopwatch.New()
	defer sw.Export(fetcherStagesSec)

	t := sw.Start("total")

	ss, release := f.indexes.Snapshot()
	defer release()

	if ss.info.DocsTotal == 0 { // it is empty active fraction state
		return nil, nil
	}

	res := make([][]byte, len(ids))
	for _, index := range ss.indexes {
		// Honor cancellation between indexes: previously ctx was accepted
		// but never checked anywhere in this method.
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		fetchIndex := fetchIndex{index: index, docsReader: &f.docsReader}
		if err := processor.IndexFetch(ids, sw, &fetchIndex, res); err != nil {
			return nil, err
		}
	}
	t.Stop()

	return res, nil
}

// Search runs the query against every mini-index of the current snapshot and
// merges the partial results into a single QPR. For an empty fraction it
// returns an empty QPR with one aggregation slot per requested aggregation.
func (f *Active) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error) {
	snapshot, release := f.indexes.Snapshot()
	defer release()

	if snapshot.info.DocsTotal == 0 { // it is empty active fraction state
		metric.CountersTotal.WithLabelValues("empty_data_provider").Inc()
		return &seq.QPR{Aggs: make([]seq.AggregatableSamples, len(params.AggQ))}, nil
	}

	limits := processor.AggLimits(f.Config.Search.AggLimits)

	// Limit the parameter range to data boundaries to prevent histogram overflow
	params.From = max(params.From, snapshot.info.From)
	params.To = min(params.To, snapshot.info.To)

	sw := stopwatch.New()
	defer sw.Export(getActiveSearchMetric(params))

	total := sw.Start("total")
	partial := make([]*seq.QPR, 0, len(snapshot.indexes))
	for _, idx := range snapshot.indexes {
		si := searchIndex{ctx: ctx, index: idx}
		qpr, err := processor.IndexSearch(ctx, params, &si, limits, sw)
		if err != nil {
			return nil, err
		}
		partial = append(partial, qpr)
	}
	merged := processor.MergeQPRs(partial, params)
	merged.IDs.ApplyHint(snapshot.info.Name())
	total.Stop()

	return merged, nil
}

// Info returns the current fraction metadata from the index pool.
func (f *Active) Info() *frac.Info {
	return f.indexes.Info()
}

// Contains reports whether the given MID falls inside the fraction's time
// range (a point query via IsIntersecting with from == to == id).
func (f *Active) Contains(id seq.MID) bool {
	return f.Info().IsIntersecting(id, id)
}

// IsIntersecting reports whether the [from, to] MID range intersects the
// fraction's time range.
func (f *Active) IsIntersecting(from, to seq.MID) bool {
	return f.Info().IsIntersecting(from, to)
}

// Release frees in-memory resources and deletes on-disk files that are no
// longer needed once the fraction's data lives elsewhere (e.g. sealed form).
func (f *Active) Release() {
	f.releaseMem()

	if !f.Config.KeepMetaFile {
		util.RemoveFile(f.metaFile.Name())
	}

	if !f.Config.SkipSortDocs {
		// we use sorted docs in sealed fraction so we can remove original docs of active fraction
		util.RemoveFile(f.docsFile.Name())
	}
}

// Suicide frees in-memory resources and deletes every on-disk artifact of
// the fraction: the meta, docs and sorted-docs files.
func (f *Active) Suicide() {
	f.releaseMem()

	for _, path := range []string{
		f.metaFile.Name(),
		f.docsFile.Name(),
		f.BaseFileName + consts.SdocsFileSuffix,
	} {
		util.RemoveFile(path)
	}
}

// releaseMem stops background activity (writer, merger), releases the index
// pool and caches, then closes the fraction's file handles. The writer and
// merger are stopped before the files they operate on are closed.
func (f *Active) releaseMem() {
	f.writer.Stop()
	f.merger.Stop()
	f.indexes.Release()

	f.docsCache.Release()
	f.sortCache.Release()

	closeFile := func(file *os.File, msg string) {
		if err := file.Close(); err != nil {
			logger.Error(msg, zap.String("frac", f.BaseFileName), zap.Error(err))
		}
	}
	closeFile(f.metaFile, "can't close meta file")
	closeFile(f.docsFile, "can't close docs file")
}
Loading