Draft

Commits (23)
6ed1e2f
feat(config): allow multiple filtering requests
forshev Dec 2, 2025
ce2f9d7
feat(config): allow multiple filtering requests
forshev Dec 2, 2025
5f996e7
feat(config): allow multiple filtering requests
forshev Dec 2, 2025
67eb078
feat: remove imitation PoC - write path
forshev Dec 12, 2025
050a1f7
feat(docsfilter): store lids in binary format
forshev Dec 16, 2025
0de713d
feat(docsfilter): store lids in binary format
forshev Dec 16, 2025
24a48eb
feat(docsfilter): store lids in binary format
forshev Dec 16, 2025
e944da4
feat(docsfilter): method to read filtered lids
forshev Dec 19, 2025
2443add
feat(docsfilter): filter ids on search
forshev Dec 22, 2025
b6dd334
feat(docsfilter): filter ids on fetch for sealed fracs
forshev Dec 26, 2025
97c10ff
feat(docsfilter): support filtering on active fraction
forshev Jan 16, 2026
dec9333
feat(docsfilter): refresh frac tombstones after sealing
forshev Jan 20, 2026
574653c
fix: fix rebase
forshev Jan 20, 2026
c38162b
fix: fix tests
forshev Jan 20, 2026
470caf5
chore(docsfilter): organize tombstone files in blocks
forshev Jan 23, 2026
dc4862f
chore(docsfilter): compress lids blocks in tombstone files
forshev Jan 23, 2026
756c5fe
chore(docsfilter): handle errors
forshev Jan 23, 2026
6340906
chore(docsfilter): add iterator to lazy load tombstones
forshev Feb 4, 2026
a545350
chore(docsfilter): merge multiple iterators into one
forshev Feb 5, 2026
5098d47
chore(docsfilter): use iterators in search and fetch
forshev Feb 5, 2026
ef49f29
chore(docsfilter): remove tombstones when frac is evicted
forshev Feb 6, 2026
d455917
refactor(docsfilter): move common parts from iterators
forshev Feb 6, 2026
e39db32
chore(docsfilter): write metrics
forshev Feb 6, 2026
80 changes: 9 additions & 71 deletions asyncsearcher/async_searcher.go
@@ -281,7 +281,7 @@ func (as *AsyncSearcher) storeSearchInfoLocked(id string, info asyncSearchInfo)
panic(err)
}
fpath := path.Join(as.config.DataDir, id+asyncSearchExtInfo)
-mustWriteFileAtomic(fpath, b)
+util.MustWriteFileAtomic(fpath, b, asyncSearchTmpFile)
info.infoSize.Store(int64(len(b)))
as.requests[id] = info
}
@@ -416,7 +416,7 @@ func (as *AsyncSearcher) processFrac(f frac.Fraction, info asyncSearchInfo) (err

name := getQPRFilename(info.Request.ID, f.Info().Name())
fpath := path.Join(as.config.DataDir, name)
-mustWriteFileAtomic(fpath, rawQPR)
+util.MustWriteFileAtomic(fpath, rawQPR, asyncSearchTmpFile)

info.qprsSize.Add(int64(len(rawQPR)))
return nil
@@ -461,7 +461,7 @@ func (as *AsyncSearcher) findQPRs(id string) ([]string, error) {
files = append(files, path.Join(as.config.DataDir, name))
return nil
}
-if err := visitFilesWithExt(des, asyncSearchExtQPR, appendQPRInfoFile); err != nil {
+if err := util.VisitFilesWithExt(des, asyncSearchExtQPR, appendQPRInfoFile); err != nil {
return nil, err
}
return files, nil
@@ -486,7 +486,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) {
areQPRsMerged[requestID] = true
return nil
}
-if err := visitFilesWithExt(des, asyncSearchExtMergedQPR, loadMergedQPRsInfo); err != nil {
+if err := util.VisitFilesWithExt(des, asyncSearchExtMergedQPR, loadMergedQPRsInfo); err != nil {
return nil, err
}

@@ -506,7 +506,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) {
anyRemove = true
return nil
}
-if err := visitFilesWithExt(des, asyncSearchExtQPR, removeMergedQPRs); err != nil {
+if err := util.VisitFilesWithExt(des, asyncSearchExtQPR, removeMergedQPRs); err != nil {
return nil, err
}

@@ -518,11 +518,11 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) {
anyRemove = true
return nil
}
-if err := visitFilesWithExt(des, asyncSearchTmpFile, removeTmpFiles); err != nil {
+if err := util.VisitFilesWithExt(des, asyncSearchTmpFile, removeTmpFiles); err != nil {
return nil, err
}
if anyRemove {
-mustFsyncFile(dataDir)
+util.MustFsyncFile(dataDir)
}

qprsDuByID := make(map[string]int)
@@ -588,7 +588,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) {
requests[requestID] = info
return nil
}
-if err := visitFilesWithExt(des, asyncSearchExtInfo, loadInfos); err != nil {
+if err := util.VisitFilesWithExt(des, asyncSearchExtInfo, loadInfos); err != nil {
return nil, err
}
return requests, nil
@@ -803,7 +803,7 @@ func (as *AsyncSearcher) mergeQPRs(job mergeJob) {
storeMQPR := func(compressed []byte) error {
sizeAfter = len(compressed)
mqprPath := path.Join(as.config.DataDir, job.ID+asyncSearchExtMergedQPR)
-mustWriteFileAtomic(mqprPath, compressed)
+util.MustWriteFileAtomic(mqprPath, compressed, asyncSearchTmpFile)
return nil
}
if err := compressQPR(&qpr, storeMQPR); err != nil {
@@ -1022,65 +1022,3 @@ func (as *AsyncSearcher) GetAsyncSearchesList(r GetAsyncSearchesListRequest) []*

return items
}

-func mustWriteFileAtomic(fpath string, data []byte) {
-	fpathTmp := fpath + asyncSearchTmpFile
-
-	f, err := os.Create(fpathTmp)
-	if err != nil {
-		logger.Fatal("can't create file", zap.Error(err))
-	}
-	defer func() {
-		if err := f.Close(); err != nil {
-			logger.Fatal("can't close file", zap.Error(err))
-		}
-	}()
-
-	if _, err := f.Write(data); err != nil {
-		logger.Fatal("can't write to file", zap.Error(err))
-	}
-
-	if err := f.Sync(); err != nil {
-		logger.Fatal("can't sync file", zap.Error(err))
-	}
-
-	if err := os.Rename(fpathTmp, fpath); err != nil {
-		logger.Fatal("can't rename file", zap.Error(err))
-	}
-
-	absFpath, err := filepath.Abs(fpath)
-	if err != nil {
-		logger.Fatal("can't get absolute path", zap.String("path", fpath), zap.Error(err))
-	}
-	dir := path.Dir(absFpath)
-	mustFsyncFile(dir)
-}
-
-func mustFsyncFile(fpath string) {
-	dirFile, err := os.Open(fpath)
-	if err != nil {
-		logger.Fatal("can't open dir", zap.Error(err))
-	}
-	if err := dirFile.Sync(); err != nil {
-		logger.Fatal("can't sync dir", zap.Error(err))
-	}
-	if err := dirFile.Close(); err != nil {
-		logger.Fatal("can't close dir", zap.Error(err))
-	}
-}
-
-func visitFilesWithExt(des []os.DirEntry, ext string, cb func(name string) error) error {
-	for _, de := range des {
-		if de.IsDir() {
-			continue
-		}
-		name := de.Name()
-		if path.Ext(name) != ext {
-			continue
-		}
-		if err := cb(name); err != nil {
-			return err
-		}
-	}
-	return nil
-}
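
All three helpers deleted above move to a shared util package, and MustWriteFileAtomic gains a third parameter so each caller picks its own temp-file suffix (visible in the new call sites: util.MustWriteFileAtomic(fpath, b, asyncSearchTmpFile)). The util implementations are not part of this diff; assuming they mirror the deleted code, the write helper would be roughly as follows (stdlib log stands in for the project logger here):

package util

import (
	"log"
	"os"
	"path/filepath"
)

// MustWriteFileAtomic is a sketch of the shared helper: write to a temp
// file, fsync it, rename over the target, then fsync the directory so the
// rename itself survives a crash. tmpExt is the caller-chosen suffix.
func MustWriteFileAtomic(fpath string, data []byte, tmpExt string) {
	tmp := fpath + tmpExt
	f, err := os.Create(tmp)
	if err != nil {
		log.Fatalf("can't create file: %v", err)
	}
	if _, err := f.Write(data); err != nil {
		log.Fatalf("can't write to file: %v", err)
	}
	if err := f.Sync(); err != nil {
		log.Fatalf("can't sync file: %v", err)
	}
	if err := f.Close(); err != nil {
		log.Fatalf("can't close file: %v", err)
	}
	if err := os.Rename(tmp, fpath); err != nil {
		log.Fatalf("can't rename file: %v", err)
	}
	abs, err := filepath.Abs(fpath)
	if err != nil {
		log.Fatalf("can't get absolute path: %v", err)
	}
	MustFsyncFile(filepath.Dir(abs))
}

// MustFsyncFile opens a path (typically a directory) and fsyncs it.
func MustFsyncFile(fpath string) {
	f, err := os.Open(fpath)
	if err != nil {
		log.Fatalf("can't open dir: %v", err)
	}
	if err := f.Sync(); err != nil {
		log.Fatalf("can't sync dir: %v", err)
	}
	if err := f.Close(); err != nil {
		log.Fatalf("can't close dir: %v", err)
	}
}

The final directory fsync is what makes the rename durable: without it, a crash after the rename can lose the new directory entry even though the file data itself was synced.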
19 changes: 18 additions & 1 deletion cmd/seq-db/seq-db.go
@@ -21,6 +21,7 @@ import (
	"github.com/ozontech/seq-db/buildinfo"
	"github.com/ozontech/seq-db/config"
	"github.com/ozontech/seq-db/consts"
+	"github.com/ozontech/seq-db/docsfilter"
	"github.com/ozontech/seq-db/frac"
	"github.com/ozontech/seq-db/frac/common"
	"github.com/ozontech/seq-db/fracmanager"
@@ -309,10 +310,14 @@ func startStore(
From: cfg.Filtering.From,
},
},
+	Filters: docsfilter.Config{
+		DataDir: cfg.DocsFilter.DataDir,
+		Workers: cfg.DocsFilter.Concurrency,
+	},
}

s3cli := initS3Client(cfg)
-store, err := storeapi.NewStore(ctx, sconfig, s3cli, mp)
+store, err := storeapi.NewStore(ctx, sconfig, s3cli, mp, docFilterParamsFromCfg(cfg.DocsFilter.Filters))
if err != nil {
logger.Fatal("initializing store", zap.Error(err))
}
@@ -355,3 +360,15 @@ func initS3Client(cfg config.Config) *s3.Client {
func enableIndexingForAllFields(mappingPath string) bool {
return mappingPath == "auto"
}

+func docFilterParamsFromCfg(in []config.Filter) []docsfilter.Params {
+	out := make([]docsfilter.Params, 0, len(in))
+	for _, f := range in {
+		out = append(out, docsfilter.Params{
+			Query: f.Query,
+			From:  f.From.UnixNano(),
+			To:    f.To.UnixNano(),
+		})
+	}
+	return out
+}
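
One detail worth noting about docFilterParamsFromCfg (an observation, not something this diff changes): time.Time's zero value does not convert to a zero UnixNano, so a filter whose from or to is omitted in config carries an overflowed, far-out-of-range nanosecond value unless docsfilter checks IsZero before converting. A quick demonstration:

package main

import (
	"fmt"
	"time"
)

func main() {
	var unset time.Time
	// The zero time (year 1) is outside int64 nanosecond range, so
	// UnixNano returns an undefined (in practice large negative) value.
	fmt.Println(unset.UnixNano())
	fmt.Println(unset.IsZero()) // true: this is the reliable "unset" check
}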
19 changes: 12 additions & 7 deletions config/config.go
@@ -258,13 +258,12 @@ type Config struct {
} `config:"tracing"`

// Additional filtering options
-	Filtering struct {
-		// If a search query time range overlaps with the [from; to] range
-		// the search query will be `AND`-ed with an additional predicate with the provided query expression
-		Query string    `config:"query"`
-		From  time.Time `config:"from"`
-		To    time.Time `config:"to"`
-	} `config:"filtering"`
+	Filtering Filter `config:"filtering"`
+	DocsFilter struct {
+		DataDir     string   `config:"data_dir"`
+		Concurrency int      `config:"concurrency"`
+		Filters     []Filter `config:"filters"`
+	} `config:"docs_filter"`
}

type Bytes units.Base2Bytes
@@ -277,3 +276,9 @@ func (b *Bytes) UnmarshalString(s string) error {
*b = Bytes(bytes)
return nil
}

+type Filter struct {
+	Query string    `config:"query"`
+	From  time.Time `config:"from"`
+	To    time.Time `config:"to"`
+}