diff --git a/asyncsearcher/async_searcher.go b/asyncsearcher/async_searcher.go index 155ae4f3..d51e8850 100644 --- a/asyncsearcher/async_searcher.go +++ b/asyncsearcher/async_searcher.go @@ -281,7 +281,7 @@ func (as *AsyncSearcher) storeSearchInfoLocked(id string, info asyncSearchInfo) panic(err) } fpath := path.Join(as.config.DataDir, id+asyncSearchExtInfo) - mustWriteFileAtomic(fpath, b) + util.MustWriteFileAtomic(fpath, b, asyncSearchTmpFile) info.infoSize.Store(int64(len(b))) as.requests[id] = info } @@ -416,7 +416,7 @@ func (as *AsyncSearcher) processFrac(f frac.Fraction, info asyncSearchInfo) (err name := getQPRFilename(info.Request.ID, f.Info().Name()) fpath := path.Join(as.config.DataDir, name) - mustWriteFileAtomic(fpath, rawQPR) + util.MustWriteFileAtomic(fpath, rawQPR, asyncSearchTmpFile) info.qprsSize.Add(int64(len(rawQPR))) return nil @@ -461,7 +461,7 @@ func (as *AsyncSearcher) findQPRs(id string) ([]string, error) { files = append(files, path.Join(as.config.DataDir, name)) return nil } - if err := visitFilesWithExt(des, asyncSearchExtQPR, appendQPRInfoFile); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchExtQPR, appendQPRInfoFile); err != nil { return nil, err } return files, nil @@ -486,7 +486,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) { areQPRsMerged[requestID] = true return nil } - if err := visitFilesWithExt(des, asyncSearchExtMergedQPR, loadMergedQPRsInfo); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchExtMergedQPR, loadMergedQPRsInfo); err != nil { return nil, err } @@ -506,7 +506,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) { anyRemove = true return nil } - if err := visitFilesWithExt(des, asyncSearchExtQPR, removeMergedQPRs); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchExtQPR, removeMergedQPRs); err != nil { return nil, err } @@ -518,11 +518,11 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, 
error) { anyRemove = true return nil } - if err := visitFilesWithExt(des, asyncSearchTmpFile, removeTmpFiles); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchTmpFile, removeTmpFiles); err != nil { return nil, err } if anyRemove { - mustFsyncFile(dataDir) + util.MustFsyncFile(dataDir) } qprsDuByID := make(map[string]int) @@ -588,7 +588,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) { requests[requestID] = info return nil } - if err := visitFilesWithExt(des, asyncSearchExtInfo, loadInfos); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchExtInfo, loadInfos); err != nil { return nil, err } return requests, nil @@ -803,7 +803,7 @@ func (as *AsyncSearcher) mergeQPRs(job mergeJob) { storeMQPR := func(compressed []byte) error { sizeAfter = len(compressed) mqprPath := path.Join(as.config.DataDir, job.ID+asyncSearchExtMergedQPR) - mustWriteFileAtomic(mqprPath, compressed) + util.MustWriteFileAtomic(mqprPath, compressed, asyncSearchTmpFile) return nil } if err := compressQPR(&qpr, storeMQPR); err != nil { @@ -1022,65 +1022,3 @@ func (as *AsyncSearcher) GetAsyncSearchesList(r GetAsyncSearchesListRequest) []* return items } - -func mustWriteFileAtomic(fpath string, data []byte) { - fpathTmp := fpath + asyncSearchTmpFile - - f, err := os.Create(fpathTmp) - if err != nil { - logger.Fatal("can't create file", zap.Error(err)) - } - defer func() { - if err := f.Close(); err != nil { - logger.Fatal("can't close file", zap.Error(err)) - } - }() - - if _, err := f.Write(data); err != nil { - logger.Fatal("can't write to file", zap.Error(err)) - } - - if err := f.Sync(); err != nil { - logger.Fatal("can't sync file", zap.Error(err)) - } - - if err := os.Rename(fpathTmp, fpath); err != nil { - logger.Fatal("can't rename file", zap.Error(err)) - } - - absFpath, err := filepath.Abs(fpath) - if err != nil { - logger.Fatal("can't get absolute path", zap.String("path", fpath), zap.Error(err)) - } - dir := path.Dir(absFpath) - 
mustFsyncFile(dir) -} - -func mustFsyncFile(fpath string) { - dirFile, err := os.Open(fpath) - if err != nil { - logger.Fatal("can't open dir", zap.Error(err)) - } - if err := dirFile.Sync(); err != nil { - logger.Fatal("can't sync dir", zap.Error(err)) - } - if err := dirFile.Close(); err != nil { - logger.Fatal("can't close dir", zap.Error(err)) - } -} - -func visitFilesWithExt(des []os.DirEntry, ext string, cb func(name string) error) error { - for _, de := range des { - if de.IsDir() { - continue - } - name := de.Name() - if path.Ext(name) != ext { - continue - } - if err := cb(name); err != nil { - return err - } - } - return nil -} diff --git a/cmd/seq-db/seq-db.go b/cmd/seq-db/seq-db.go index b963875b..27d97402 100644 --- a/cmd/seq-db/seq-db.go +++ b/cmd/seq-db/seq-db.go @@ -21,6 +21,7 @@ import ( "github.com/ozontech/seq-db/buildinfo" "github.com/ozontech/seq-db/config" "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/docsfilter" "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/fracmanager" @@ -309,10 +310,14 @@ func startStore( From: cfg.Filtering.From, }, }, + Filters: docsfilter.Config{ + DataDir: cfg.DocsFilter.DataDir, + Workers: cfg.DocsFilter.Concurrency, + }, } s3cli := initS3Client(cfg) - store, err := storeapi.NewStore(ctx, sconfig, s3cli, mp) + store, err := storeapi.NewStore(ctx, sconfig, s3cli, mp, docFilterParamsFromCfg(cfg.DocsFilter.Filters)) if err != nil { logger.Fatal("initializing store", zap.Error(err)) } @@ -355,3 +360,15 @@ func initS3Client(cfg config.Config) *s3.Client { func enableIndexingForAllFields(mappingPath string) bool { return mappingPath == "auto" } + +func docFilterParamsFromCfg(in []config.Filter) []docsfilter.Params { + out := make([]docsfilter.Params, 0, len(in)) + for _, f := range in { + out = append(out, docsfilter.Params{ + Query: f.Query, + From: f.From.UnixNano(), + To: f.To.UnixNano(), + }) + } + return out +} diff --git a/config/config.go 
b/config/config.go index 29c285d9..fbf5ed7d 100644 --- a/config/config.go +++ b/config/config.go @@ -258,13 +258,12 @@ type Config struct { } `config:"tracing"` // Additional filtering options - Filtering struct { - // If a search query time range overlaps with the [from; to] range - // the search query will be `AND`-ed with an additional predicate with the provided query expression - Query string `config:"query"` - From time.Time `config:"from"` - To time.Time `config:"to"` - } `config:"filtering"` + Filtering Filter `config:"filtering"` + DocsFilter struct { + DataDir string `config:"data_dir"` + Concurrency int `config:"concurrency"` + Filters []Filter `config:"filters"` + } `config:"docs_filter"` } type Bytes units.Base2Bytes @@ -277,3 +276,9 @@ func (b *Bytes) UnmarshalString(s string) error { *b = Bytes(bytes) return nil } + +type Filter struct { + Query string `config:"query"` + From time.Time `config:"from"` + To time.Time `config:"to"` +} diff --git a/docsfilter/docs_filter.go b/docsfilter/docs_filter.go new file mode 100644 index 00000000..b3b0c1fa --- /dev/null +++ b/docsfilter/docs_filter.go @@ -0,0 +1,384 @@ +package docsfilter + +import ( + "context" + "fmt" + "math" + "os" + "path" + "runtime" + "strings" + "sync" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/processor" + "github.com/ozontech/seq-db/fracmanager" + "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/node" + "github.com/ozontech/seq-db/parser" + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/util" +) + +const ( + fracInQueueExt = ".queue" + fracDoneExt = ".filter" + tmpExt = ".tmp" +) + +type MappingProvider interface { + GetMapping() seq.Mapping +} + +type Config struct { + DataDir string + Workers int +} + +type DocsFilter struct { + ctx context.Context + + config Config + filters map[string]*Filter + + fracs map[string][]string + fracsMu *sync.RWMutex + + mp MappingProvider + + rateLimit chan struct{} + 
createDirOnce *sync.Once +} + +func New( + ctx context.Context, + cfg Config, + params []Params, + mp MappingProvider, +) *DocsFilter { + workers := cfg.Workers + if workers <= 0 { + workers = runtime.GOMAXPROCS(0) + } + + filtersMap := make(map[string]*Filter, len(params)) + + for _, p := range params { + f := NewFilter(p) + filtersMap[string(f.Hash())] = f + } + + return &DocsFilter{ + ctx: ctx, + config: cfg, + filters: filtersMap, + fracs: make(map[string][]string), + fracsMu: &sync.RWMutex{}, + mp: mp, + rateLimit: make(chan struct{}, workers), + createDirOnce: &sync.Once{}, + } +} + +func (df *DocsFilter) Start(fracs fracmanager.List) { + df.createDataDir() + + err := df.loadFilters() + if err != nil { + logger.Fatal("failed to load previous docs filters", zap.Error(err)) + } + + err = df.buildQueue(fracs) + if err != nil { + logger.Fatal("failed to build docs filters queue", zap.Error(err)) + } + + mapping := df.mp.GetMapping() + + for _, f := range df.filters { + ast, err := parser.ParseSeqQL(f.params.Query, mapping) + if err != nil { + panic(fmt.Errorf("BUG: search query must be valid: %s", err)) + } + f.ast = ast + + df.processFilter(f, fracs.FilterInRange(seq.MID(f.params.From), seq.MID(f.params.To))) + } +} + +func (df *DocsFilter) GetTombstonesIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, error) { + df.fracsMu.RLock() + defer df.fracsMu.RUnlock() + + fracFiles, has := df.fracs[fracName] + if !has { + return &EmptyIterator{}, nil + } + + iterators := make([]node.Node, 0, len(fracFiles)) + for _, f := range fracFiles { + loader, err := newLoader(f) + if err != nil { + logger.Error("can't open filtered lids file", zap.String("path", f), zap.Error(err)) + return nil, err + } + if reverse { + iterators = append(iterators, (*IteratorAsc)(NewIterator(loader, minLID, maxLID))) + } else { + iterators = append(iterators, (*IteratorDesc)(NewIterator(loader, minLID, maxLID))) + } + } + + return NewNMergedIterators(iterators, 
reverse), nil
+}
+
+// RefreshFrac replaces frac's tombstone files with newly found results. Used after active frac is sealed.
+func (df *DocsFilter) RefreshFrac(fraction frac.Fraction) {
+	df.fracsMu.RLock()
+	fracsFiles, has := df.fracs[fraction.Info().Name()]
+	df.fracsMu.RUnlock()
+
+	if !has {
+		return
+	}
+
+	for _, fileName := range fracsFiles {
+		filter := df.filters[filterNameFromTombstonesPath(fileName)]
+
+		queueFilePath := path.Join(filter.dirPath, makeFileName(fraction.Info().Name(), fracInQueueExt))
+		util.MustWriteFileAtomic(queueFilePath, []byte{}, tmpExt)
+
+		filter.processWg.Add(1)
+		go func() {
+			// refresh=true: this frac's done-file path is already registered in
+			// df.fracs; passing false would append a duplicate entry via addDoneFrac.
+			if err := df.processFrac(fraction, filter, true); err != nil {
+				panic(fmt.Errorf("docs filter refresh frac err: %s", err))
+			}
+		}()
+	}
+}
+
+// RemoveFrac removes fraction's tombstones. Used after frac is deleted
+func (df *DocsFilter) RemoveFrac(fracName string) {
+	// Write lock: we mutate df.fracs. (Taking RLock and calling delete()
+	// after releasing it races with concurrent readers/writers.)
+	df.fracsMu.Lock()
+	fracsFiles, has := df.fracs[fracName]
+	delete(df.fracs, fracName)
+	df.fracsMu.Unlock()
+
+	if !has {
+		return
+	}
+
+	for _, fileName := range fracsFiles {
+		util.RemoveFile(fileName)
+	}
+}
+
+func filterNameFromTombstonesPath(p string) string {
+	return path.Base(path.Dir(p))
+}
+
+func (df *DocsFilter) addDoneFrac(fracName, fracPath string) {
+	df.fracsMu.Lock()
+	defer df.fracsMu.Unlock()
+
+	df.fracs[fracName] = append(df.fracs[fracName], fracPath)
+}
+
+// loadFilters loads existing filters
+func (df *DocsFilter) loadFilters() error {
+	des, err := os.ReadDir(df.config.DataDir)
+	if err != nil {
+		return err
+	}
+
+	var anyRemove bool
+
+	for _, de := range des {
+		if !de.IsDir() {
+			continue
+		}
+
+		if _, ok := df.filters[de.Name()]; !ok {
+			logger.Info("there is filter folder on disk, but not in config.
need to delete it.") + err := os.RemoveAll(path.Join(df.config.DataDir, de.Name())) + if err != nil && !os.IsNotExist(err) { + return err + } + anyRemove = true + continue + } + + f := df.filters[de.Name()] + f.status = StatusInProgress + f.dirPath = path.Join(df.config.DataDir, de.Name()) + + filterDes, err := os.ReadDir(f.dirPath) + if err != nil { + return fmt.Errorf("reading directory: %s", err) + } + + var hasFracsInQueue bool + + for _, fde := range filterDes { + if fde.IsDir() { + continue + } + name := fde.Name() + + switch path.Ext(name) { + case fracInQueueExt: + hasFracsInQueue = true + case fracDoneExt: + df.addDoneFrac(fracNameFromFilePath(name), path.Join(f.dirPath, name)) + } + } + + if !hasFracsInQueue { + f.status = StatusDone + } + } + + if anyRemove { + util.MustFsyncFile(df.config.DataDir) + } + + return nil +} + +// buildQueue creates a directory for each of unprocessed filters and creates .queue files +func (df *DocsFilter) buildQueue(fracs fracmanager.List) error { + for _, filter := range df.filters { + if filter.status != StatusCreated { + continue + } + filter.dirPath = path.Join(df.config.DataDir, filter.Hash()) + util.MustCreateDir(filter.dirPath) + + filterFracs := fracs.FilterInRange(seq.MID(filter.params.From), seq.MID(filter.params.To)) + for _, f := range filterFracs { + queueFilePath := path.Join(filter.dirPath, makeFileName(f.Info().Name(), fracInQueueExt)) + util.MustWriteFileAtomic(queueFilePath, []byte{}, tmpExt) + } + } + + return nil +} + +// handleFilter finds docs and writes to fs +func (df *DocsFilter) processFilter(filter *Filter, fracs fracmanager.List) { + if len(fracs) == 0 { + return + } + + fracsByName := make(map[string]frac.Fraction) + for _, f := range fracs { + fracsByName[f.Info().Name()] = f + } + + filterDes, err := os.ReadDir(filter.dirPath) + if err != nil { + panic(fmt.Errorf("BUG: reading directory must be successful: %s", err)) + } + + processFracInQueue := func(name string) error { + f := 
fracsByName[fracNameFromFilePath(name)]
+		// Guard against stale .queue files: queue files are persisted and
+		// reloaded across restarts, so a fraction referenced by a queue file
+		// may no longer be in the current fracs list. Indexing the map then
+		// yields a nil frac.Fraction and processFrac would panic on f.Search.
+		if f == nil {
+			return nil
+		}
+		filter.processWg.Add(1)
+		go func() {
+			if err := df.processFrac(f, filter, false); err != nil {
+				panic(fmt.Errorf("docs filter process frac err: %s", err))
+			}
+		}()
+
+		return nil
+	}
+	_ = util.VisitFilesWithExt(filterDes, fracInQueueExt, processFracInQueue)
+
+	go func() {
+		filter.processWg.Wait()
+		filter.markAsDone()
+	}()
+}
+
+func (df *DocsFilter) processFrac(f frac.Fraction, filter *Filter, refresh bool) error {
+	defer filter.processWg.Done()
+
+	df.rateLimit <- struct{}{}
+	defer func() { <-df.rateLimit }()
+
+	qpr, err := f.Search(df.ctx, processor.SearchParams{
+		AST:   filter.ast.Root,
+		From:  seq.MID(filter.params.From),
+		To:    seq.MID(filter.params.To),
+		Limit: math.MaxInt64,
+	})
+	if err != nil {
+		return err
+	}
+
+	queueFilePath := path.Join(filter.dirPath, makeFileName(f.Info().Name(), fracInQueueExt))
+	doneFilePath := path.Join(filter.dirPath, makeFileName(f.Info().Name(), fracDoneExt))
+
+	if len(qpr.IDs) == 0 {
+		util.RemoveFile(queueFilePath)
+		return nil
+	}
+
+	storeDocsFilter := func(rawDocsFilter []byte) error {
+		util.MustWriteFileAtomic(doneFilePath, rawDocsFilter, tmpExt)
+		util.RemoveFile(queueFilePath)
+		return nil
+	}
+
+	// TODO: here we doing part of the work twice:
+	// first time we find LIDs inside f.Search() and then find IDs by these LIDs.
+	// Then we again find LIDs by earlier found IDs in f.FindLIDs().
+	// We did it like this because otherwise we had to do serious f.Search() rewrite.
+	// For now we're ok with some performance penalty.
+ lids, err := f.FindLIDs(df.ctx, qpr.IDs.IDs()) + if err != nil { + return err + } + + docsFilterBin := DocsFilterBinIn{LIDs: lids} + if err := writeDocsFilter(&docsFilterBin, storeDocsFilter); err != nil { + return err + } + + if !refresh { + df.addDoneFrac(f.Info().Name(), doneFilePath) + } + + return nil +} + +func makeFileName(name, ext string) string { + return name + ext +} + +func fracNameFromFilePath(filterFilePath string) string { + return strings.Split(path.Base(filterFilePath), ".")[0] +} + +var marshalBufferPool util.BufferPool + +func writeDocsFilter(df *DocsFilterBinIn, cb func(compressed []byte) error) error { + rawDocsFilter := marshalBufferPool.Get() + defer marshalBufferPool.Put(rawDocsFilter) + + rawDocsFilter.B = marshalDocsFilter(rawDocsFilter.B, df) + if err := cb(rawDocsFilter.B); err != nil { + return err + } + return nil +} + +// createDataDir creates dir data lazily to avoid creating extra folders. +func (df *DocsFilter) createDataDir() { + df.createDirOnce.Do(func() { + if err := os.MkdirAll(df.config.DataDir, 0o777); err != nil { + panic(err) + } + }) +} diff --git a/docsfilter/encoding.go b/docsfilter/encoding.go new file mode 100644 index 00000000..dccfc62f --- /dev/null +++ b/docsfilter/encoding.go @@ -0,0 +1,285 @@ +package docsfilter + +import ( + "encoding/binary" + "errors" + "fmt" + "math" + "unsafe" + + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/util" + "github.com/ozontech/seq-db/zstd" +) + +type DocsFilterBinIn struct { + LIDs []seq.LID +} + +type DocsFilterBinOut struct { + LIDs []uint32 +} + +type docsFilterBinVersion uint8 + +const ( + docsFilterBinVersion1 docsFilterBinVersion = iota + 1 +) + +var availableVersions = map[docsFilterBinVersion]struct{}{ + docsFilterBinVersion1: {}, +} + +type lidsCodec byte + +const ( + lidsCodecDelta = 1 + lidsCodecDeltaZstd = 2 +) + +type lidsBlockHeader struct { + Codec lidsCodec + Length uint32 // Number of LIDs in block + MinLID uint32 + MaxLID uint32 + Size uint32 
// Size of ids block in bytes. + Offset uint64 // block's offset in file +} + +func (h *lidsBlockHeader) marshal(dst []byte) { + if len(dst) < int(lidsBlockHeaderSizeBytes) { + panic("BUG: marshal lidsBlockHeader: len(dst) is less than header size") + } + + dst[0] = byte(h.Codec) + dst = dst[1:] + binary.BigEndian.PutUint32(dst, h.Length) + dst = dst[sizeOfUint32:] + binary.BigEndian.PutUint32(dst, h.MinLID) + dst = dst[sizeOfUint32:] + binary.BigEndian.PutUint32(dst, h.MaxLID) + dst = dst[sizeOfUint32:] + binary.BigEndian.PutUint32(dst, h.Size) + dst = dst[sizeOfUint32:] + binary.BigEndian.PutUint64(dst, h.Offset) + dst = dst[sizeOfUint64:] +} + +func (h *lidsBlockHeader) unmarshal(src []byte) ([]byte, error) { + if len(src) < int(lidsBlockHeaderSizeBytes) { + return src, errors.New("too few bytes") + } + + h.Codec = lidsCodec(src[0]) + src = src[1:] + h.Length = binary.BigEndian.Uint32(src) + src = src[sizeOfUint32:] + h.MinLID = binary.BigEndian.Uint32(src) + src = src[sizeOfUint32:] + h.MaxLID = binary.BigEndian.Uint32(src) + src = src[sizeOfUint32:] + h.Size = binary.BigEndian.Uint32(src) + src = src[sizeOfUint32:] + h.Offset = binary.BigEndian.Uint64(src) + src = src[sizeOfUint64:] + + return src, nil +} + +func marshalDocsFilter(dst []byte, in *DocsFilterBinIn) []byte { + dst = append(dst, uint8(docsFilterBinVersion1)) + dst = marshalLIDsBlocks(dst, in.LIDs) + return dst +} + +const ( + sizeOfUint32 = unsafe.Sizeof(uint32(0)) + sizeOfUint64 = unsafe.Sizeof(uint64(0)) +) + +const ( + lidsBlockHeaderSizeBytes = 1 + (4 * sizeOfUint32) + sizeOfUint64 + maxLIDsBlockLen = 1024 +) + +var lidsBlockBufPool util.BufferPool + +func marshalLIDsBlocks(dst []byte, in []seq.LID) []byte { + b := lidsBlockBufPool.Get() + defer lidsBlockBufPool.Put(b) + + numberOfBlocks := (len(in) + maxLIDsBlockLen - 1) / maxLIDsBlockLen + dst = binary.BigEndian.AppendUint32(dst, uint32(numberOfBlocks)) + + // reserve space for headers + curHeaderOffset := len(dst) + dst = append(dst, 
make([]byte, numberOfBlocks*int(lidsBlockHeaderSizeBytes))...)
+
+	var start int
+	for range numberOfBlocks {
+		end := min(maxLIDsBlockLen, len(in[start:]))
+		chunk := in[start : start+end]
+
+		var codec lidsCodec
+		b.B, codec = marshalLIDsBlock(b.B[:0], chunk)
+		if len(b.B) > math.MaxUint32 {
+			panic(fmt.Errorf("unexpected block length %d; want up to %d", len(b.B), math.MaxUint32))
+		}
+
+		header := lidsBlockHeader{
+			Codec:  codec,
+			Length: uint32(len(chunk)),
+			MinLID: uint32(chunk[0]),
+			MaxLID: uint32(chunk[len(chunk)-1]),
+			Size:   uint32(len(b.B)),
+			Offset: uint64(len(dst)),
+		}
+		header.marshal(dst[curHeaderOffset:])
+		curHeaderOffset += int(lidsBlockHeaderSizeBytes)
+
+		dst = append(dst, b.B...)
+		start += end
+	}
+
+	return dst
+}
+
+func marshalLIDsBlock(dst []byte, in []seq.LID) ([]byte, lidsCodec) {
+	b := lidsBlockBufPool.Get()
+	defer lidsBlockBufPool.Put(b)
+
+	prev := seq.LID(0)
+	for i := range len(in) {
+		lid := in[i]
+		deltaLID := lid - prev
+		prev = lid
+		b.B = binary.AppendVarint(b.B, int64(deltaLID))
+	}
+
+	orig := dst
+	dst = zstd.CompressLevel(b.B, dst, getCompressLevel(len(b.B)))
+
+	// compressRatio = compressedSize / uncompressedSize; values near or above
+	// 1.0 mean zstd did not help. Keep the plain delta codec only when
+	// compression saves less than ~5%. (The previous `< 1.05` check was
+	// inverted: it rejected zstd exactly when compression worked.)
+	compressRatio := float64(len(dst)-len(orig)) / float64(len(b.B))
+	if compressRatio > 0.95 {
+		orig = append(orig, b.B...)
+ return orig, lidsCodecDelta + } + + return dst, lidsCodecDeltaZstd +} + +const minLIDsFIlterBytesLen = 10 // 1 byte lidsBinVersion + 8 byte number of LIDs + N (min 1) bytes varint + delta encoded LIDs + +func unmarshalDocsFilter(dst *DocsFilterBinOut, src []byte) (_ []byte, err error) { + if len(src) < minLIDsFIlterBytesLen { + return nil, fmt.Errorf("invalid LIDs filter format; want %d bytes, got %d", minLIDsFIlterBytesLen, len(src)) + } + + version := docsFilterBinVersion(src[0]) + src = src[1:] + if _, ok := availableVersions[version]; !ok { + return nil, fmt.Errorf("invalid LIDs binary version: %d", version) + } + + dst.LIDs, src, err = unmarshalLIDsBlocks(dst.LIDs, src) + if err != nil { + return src, err + } + + return src, nil +} + +func unmarshalLIDsBlocks(dst []uint32, src []byte) ([]uint32, []byte, error) { + numberOfBlocks := binary.BigEndian.Uint32(src) + src = src[sizeOfUint32:] + + var err error + + headers := make([]lidsBlockHeader, 0, numberOfBlocks) + for range numberOfBlocks { + header := lidsBlockHeader{} + src, err = header.unmarshal(src) + if err != nil { + return dst, src, fmt.Errorf("can't unmarshal lids header: %s", err) + } + headers = append(headers, header) + } + + for i := range numberOfBlocks { + dst, src, err = unmarshalLIDsBlock(dst, src, headers[i]) + if err != nil { + return dst, src, err + } + } + + if len(src) > 0 { + return dst, src, fmt.Errorf("unexpected tail when unmarshaling LIDs blocks") + } + + return dst, src, nil +} + +func unmarshalLIDsBlock(dst []uint32, src []byte, header lidsBlockHeader) ([]uint32, []byte, error) { + if len(src) == 0 { + return dst, src, fmt.Errorf("empty LIDs block") + } + + if header.Size == 0 || int(header.Size) > len(src) { + return nil, src, fmt.Errorf("invalid LIDs block length %d; want %d", len(src), header.Size) + } + + block := src[:header.Size] + src = src[header.Size:] + + var err error + + switch header.Codec { + case lidsCodecDeltaZstd: + b := lidsBlockBufPool.Get() + defer 
lidsBlockBufPool.Put(b) + b.B, err = zstd.Decompress(block, b.B) + if err != nil { + return dst, src, fmt.Errorf("can't decompress ids block: %s", err) + } + dst, err = unmarshalLIDsDelta(dst, b.B, header) + if err != nil { + return dst, src, err + } + return dst, src, nil + case lidsCodecDelta: + dst, err = unmarshalLIDsDelta(dst, block, header) + if err != nil { + return dst, src, err + } + return dst, src, nil + default: + return dst, src, fmt.Errorf("unknown ids codec: %d", header.Codec) + } +} + +func unmarshalLIDsDelta(dst []uint32, block []byte, header lidsBlockHeader) ([]uint32, error) { + prevLID := uint32(0) + for range header.Length { + v, n := binary.Varint(block) + block = block[n:] + lid := prevLID + uint32(v) + prevLID = lid + dst = append(dst, lid) + } + + if len(block) > 0 { + return dst, fmt.Errorf("unexpected tail when unmarshaling LIDs block") + } + + return dst, nil +} + +func getCompressLevel(size int) int { + level := 3 + if size <= 512 { + level = 1 + } else if size <= 4*1024 { + level = 2 + } + return level +} diff --git a/docsfilter/encoding_test.go b/docsfilter/encoding_test.go new file mode 100644 index 00000000..777285fb --- /dev/null +++ b/docsfilter/encoding_test.go @@ -0,0 +1,42 @@ +package docsfilter + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/seq" +) + +func TestMarshalUnmarshalLIDsFilter(t *testing.T) { + test := func(df DocsFilterBinIn) { + t.Helper() + + rawDocsFilter := marshalDocsFilter(nil, &df) + var out DocsFilterBinOut + tail, err := unmarshalDocsFilter(&out, rawDocsFilter) + require.NoError(t, err) + require.Equal(t, 0, len(tail)) + assert.Equal(t, lidsToUint32s(df.LIDs), out.LIDs) + } + + test(DocsFilterBinIn{LIDs: []seq.LID{0, 1, 2, 3}}) + test(DocsFilterBinIn{LIDs: []seq.LID{10, 15, 22, 18, 105, 1010}}) + test(DocsFilterBinIn{LIDs: []seq.LID{11}}) + + multipleBlocksSize := maxLIDsBlockLen*3 + 15 + multipleBlocksLIDs := 
make([]seq.LID, 0, multipleBlocksSize) + for i := range multipleBlocksSize { + multipleBlocksLIDs = append(multipleBlocksLIDs, seq.LID(i)) + } + test(DocsFilterBinIn{LIDs: multipleBlocksLIDs}) +} + +func lidsToUint32s(in []seq.LID) []uint32 { + out := make([]uint32, 0, len(in)) + for _, i := range in { + out = append(out, uint32(i)) + } + return out +} diff --git a/docsfilter/filter.go b/docsfilter/filter.go new file mode 100644 index 00000000..a5bc7770 --- /dev/null +++ b/docsfilter/filter.go @@ -0,0 +1,64 @@ +package docsfilter + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "sync" + + "github.com/ozontech/seq-db/parser" +) + +type FilterStatus byte + +const ( + StatusCreated FilterStatus = iota + StatusInProgress + StatusDone + StatusError +) + +type Params struct { + Query string + From int64 + To int64 +} + +type Filter struct { + params Params + + status FilterStatus + + ast parser.SeqQLQuery + + hash string + dirPath string + + processWg *sync.WaitGroup +} + +func NewFilter(params Params) *Filter { + return &Filter{ + params: params, + status: StatusCreated, + processWg: &sync.WaitGroup{}, + } +} + +func (f *Filter) String() string { + return fmt.Sprintf("%s_%d_%d", f.params.Query, f.params.From, f.params.To) +} + +func (f *Filter) Hash() string { + if f.hash == "" { + h := sha256.New() + h.Write([]byte(f.String())) + bs := h.Sum(nil) + f.hash = hex.EncodeToString(bs) + } + return f.hash +} + +func (f *Filter) markAsDone() { + f.status = StatusDone +} diff --git a/docsfilter/iterator.go b/docsfilter/iterator.go new file mode 100644 index 00000000..1aa2845f --- /dev/null +++ b/docsfilter/iterator.go @@ -0,0 +1,26 @@ +package docsfilter + +type Iterator struct { + loader *loader + + minLID uint32 + maxLID uint32 + + blockIndex int + tryNextBlock bool + + lids []uint32 +} + +func NewIterator( + loader *loader, + minLID uint32, + maxLID uint32, +) *Iterator { + return &Iterator{ + loader: loader, + minLID: minLID, + maxLID: maxLID, + tryNextBlock: true, 
+ } +} diff --git a/docsfilter/iterator_asc.go b/docsfilter/iterator_asc.go new file mode 100644 index 00000000..679b6a65 --- /dev/null +++ b/docsfilter/iterator_asc.go @@ -0,0 +1,96 @@ +package docsfilter + +import ( + "sort" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/logger" +) + +type IteratorAsc Iterator + +func (it *IteratorAsc) String() string { + return "TOMBSTONES_ITERATOR_ASC" +} + +func (it *IteratorAsc) Next() (uint32, bool) { + if it.loader.headers == nil { + err := it.loader.loadHeaders() + if err != nil { + logger.Panic("can't load tombstones headers", zap.Error(err)) + } + it.blockIndex = len(it.loader.headers) - 1 + } + + for len(it.lids) == 0 { + if !it.tryNextBlock { + if err := it.loader.release(); err != nil { + logger.Panic("error closing loader", zap.Error(err)) + } + return 0, false + } + + it.loadNextLIDsBlock() + it.lids = it.narrowLIDsRange(it.lids) + } + + i := len(it.lids) - 1 + lid := it.lids[i] + it.lids = it.lids[:i] + return lid, true +} + +func (it *IteratorAsc) loadNextLIDsBlock() { + hasLIDsInRange := it.hasLIDsInRange() + if !hasLIDsInRange { + it.needTryNextBlock() + return + } + + block, err := it.loader.loadBlock(it.blockIndex) + if err != nil { + logger.Panic("error loading LIDs block", zap.Error(err)) + } + + it.lids = block + it.needTryNextBlock() +} + +func (it *IteratorAsc) hasLIDsInRange() bool { + if it.loader.headers[it.blockIndex].MinLID > it.maxLID { + return false + } + if it.loader.headers[it.blockIndex].MaxLID < it.minLID { + return false + } + + return true +} + +func (it *IteratorAsc) needTryNextBlock() { + it.tryNextBlock = it.blockIndex > 0 + it.blockIndex-- +} + +// narrowLIDsRange cuts LIDs between from and to. 
Returns new lids +func (it *IteratorAsc) narrowLIDsRange(lids []uint32) []uint32 { + if len(lids) == 0 { + return lids + } + + first := lids[0] + last := lids[len(lids)-1] + + if it.minLID > first { + left := sort.Search(len(lids), func(i int) bool { return lids[i] >= it.minLID }) + lids = lids[left:] + } + + if it.maxLID <= last { + right := sort.Search(len(lids), func(i int) bool { return lids[i] > it.maxLID }) + lids = lids[:right] + } + + return lids +} diff --git a/docsfilter/iterator_asc_test.go b/docsfilter/iterator_asc_test.go new file mode 100644 index 00000000..17ab86ea --- /dev/null +++ b/docsfilter/iterator_asc_test.go @@ -0,0 +1,71 @@ +package docsfilter + +import ( + "math" + "os" + "path/filepath" + "slices" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/seq" +) + +func TestIteratorAsc(t *testing.T) { + multipleBlocksSize := maxLIDsBlockLen*3 + 15 + multipleBlocksLIDs := make([]seq.LID, 0, multipleBlocksSize) + for i := range multipleBlocksSize { + multipleBlocksLIDs = append(multipleBlocksLIDs, seq.LID(i)) + } + + reversed := make([]uint32, len(multipleBlocksLIDs)) + copy(reversed, lidsToUint32s(multipleBlocksLIDs)) + slices.Reverse(reversed) + + type testCase struct { + title string + minLID, maxLID uint32 + expected []uint32 + } + + tests := []testCase{ + { + title: "ok_without_borders", + minLID: 0, + maxLID: math.MaxUint32, + expected: reversed, + }, + { + title: "ok_with_borders", + minLID: maxLIDsBlockLen + 10, + maxLID: uint32(len(multipleBlocksLIDs) - (maxLIDsBlockLen + 10)), + expected: reversed[maxLIDsBlockLen+9 : len(multipleBlocksLIDs)-(maxLIDsBlockLen+10)], + }, + { + title: "ok_out_of_borders", + minLID: uint32(len(multipleBlocksLIDs) + 100), + maxLID: uint32(len(multipleBlocksLIDs) + 200), + expected: []uint32{}, + }, + } + + for _, tc := range tests { + t.Run(tc.title, func(t *testing.T) { + rawDocsFilter := marshalDocsFilter(nil, &DocsFilterBinIn{LIDs: multipleBlocksLIDs}) + filePath := 
filepath.Join(t.TempDir(), "some.filter") + err := os.WriteFile(filePath, rawDocsFilter, 0o644) + require.NoError(t, err) + + loader, err := newLoader(filePath) + require.NoError(t, err) + + iterator := (*IteratorAsc)(NewIterator(loader, tc.minLID, tc.maxLID)) + resLIDs := make([]uint32, 0, len(tc.expected)) + for lid, has := iterator.Next(); has; lid, has = iterator.Next() { + resLIDs = append(resLIDs, lid) + } + require.Equal(t, tc.expected, resLIDs) + }) + } +} diff --git a/docsfilter/iterator_desc.go b/docsfilter/iterator_desc.go new file mode 100644 index 00000000..ec7806e9 --- /dev/null +++ b/docsfilter/iterator_desc.go @@ -0,0 +1,94 @@ +package docsfilter + +import ( + "sort" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/logger" +) + +type IteratorDesc Iterator + +func (it *IteratorDesc) String() string { + return "TOMBSTONES_ITERATOR_DESC" +} + +func (it *IteratorDesc) Next() (uint32, bool) { + if it.loader.headers == nil { + err := it.loader.loadHeaders() + if err != nil { + logger.Panic("can't load tombstones headers", zap.Error(err)) + } + } + + for len(it.lids) == 0 { + if !it.tryNextBlock { + if err := it.loader.release(); err != nil { + logger.Panic("error closing loader", zap.Error(err)) + } + return 0, false + } + + it.loadNextLIDsBlock() + it.lids = it.narrowLIDsRange(it.lids) + } + + lid := it.lids[0] + it.lids = it.lids[1:] + return lid, true +} + +func (it *IteratorDesc) loadNextLIDsBlock() { + hasLIDsInRange := it.hasLIDsInRange() + if !hasLIDsInRange { + it.needTryNextBlock() + return + } + + block, err := it.loader.loadBlock(it.blockIndex) + if err != nil { + logger.Panic("error loading LIDs block", zap.Error(err)) + } + + it.lids = block + it.needTryNextBlock() +} + +func (it *IteratorDesc) hasLIDsInRange() bool { + if it.loader.headers[it.blockIndex].MinLID > it.maxLID { + return false + } + if it.loader.headers[it.blockIndex].MaxLID < it.minLID { + return false + } + + return true +} + +func (it *IteratorDesc) needTryNextBlock() { 
+ it.tryNextBlock = it.blockIndex < len(it.loader.headers)-1 + it.blockIndex++ +} + +// narrowLIDsRange cuts LIDs between from and to. Returns new lids +func (it *IteratorDesc) narrowLIDsRange(lids []uint32) []uint32 { + if len(lids) == 0 { + return lids + } + + first := lids[0] + last := lids[len(lids)-1] + + if it.minLID > first { + left := sort.Search(len(lids), func(i int) bool { return lids[i] >= it.minLID }) + lids = lids[left:] + } + + if it.maxLID <= last { + right := sort.Search(len(lids), func(i int) bool { return lids[i] > it.maxLID }) + lids = lids[:right] + } + + return lids +} diff --git a/docsfilter/iterator_desc_test.go b/docsfilter/iterator_desc_test.go new file mode 100644 index 00000000..f4fe1930 --- /dev/null +++ b/docsfilter/iterator_desc_test.go @@ -0,0 +1,66 @@ +package docsfilter + +import ( + "math" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/seq" +) + +func TestIteratorDesc(t *testing.T) { + multipleBlocksSize := maxLIDsBlockLen*3 + 15 + multipleBlocksLIDs := make([]seq.LID, 0, multipleBlocksSize) + for i := range multipleBlocksSize { + multipleBlocksLIDs = append(multipleBlocksLIDs, seq.LID(i)) + } + + type testCase struct { + title string + minLID, maxLID uint32 + expected []uint32 + } + + tests := []testCase{ + { + title: "ok_without_borders", + minLID: 0, + maxLID: math.MaxUint32, + expected: lidsToUint32s(multipleBlocksLIDs), + }, + { + title: "ok_with_borders", + minLID: maxLIDsBlockLen + 10, + maxLID: uint32(len(multipleBlocksLIDs) - (maxLIDsBlockLen + 10)), + expected: lidsToUint32s(multipleBlocksLIDs[maxLIDsBlockLen+10 : len(multipleBlocksLIDs)-(maxLIDsBlockLen+9)]), + }, + { + title: "ok_out_of_borders", + minLID: uint32(len(multipleBlocksLIDs) + 100), + maxLID: uint32(len(multipleBlocksLIDs) + 200), + expected: []uint32{}, + }, + } + + for _, tc := range tests { + t.Run(tc.title, func(t *testing.T) { + rawDocsFilter := marshalDocsFilter(nil, 
&DocsFilterBinIn{LIDs: multipleBlocksLIDs}) + filePath := filepath.Join(t.TempDir(), "some.filter") + err := os.WriteFile(filePath, rawDocsFilter, 0o644) + require.NoError(t, err) + + loader, err := newLoader(filePath) + require.NoError(t, err) + + iterator := (*IteratorDesc)(NewIterator(loader, tc.minLID, tc.maxLID)) + resLIDs := make([]uint32, 0, len(tc.expected)) + for lid, has := iterator.Next(); has; lid, has = iterator.Next() { + resLIDs = append(resLIDs, lid) + } + require.Equal(t, tc.expected, resLIDs) + }) + } +} diff --git a/docsfilter/loader.go b/docsfilter/loader.go new file mode 100644 index 00000000..ca2908ef --- /dev/null +++ b/docsfilter/loader.go @@ -0,0 +1,121 @@ +package docsfilter + +import ( + "encoding/binary" + "fmt" + "io" + "os" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/logger" +) + +type loader struct { + headers []lidsBlockHeader + file *os.File + // TODO: seems like cache needs to be populated somewhere outside of this struct and passed here + // cache *cache.Cache[[]lidsBlockHeader] +} + +func newLoader(filePath string) (*loader, error) { + f, err := os.Open(filePath) + if err != nil { + return nil, err + } + + return &loader{ + file: f, + }, nil +} + +func (l *loader) loadHeaders() error { + numBuf := make([]byte, 1+4) // block version 1 byte + number of blocks 4 bytes + n, err := l.file.ReadAt(numBuf, 0) + if err != nil { + return fmt.Errorf("can't read headers from disk: %s", err.Error()) + } + if n == 0 { + return fmt.Errorf("can't read headers from disk: n=0") + } + + version := docsFilterBinVersion(numBuf[0]) + if _, ok := availableVersions[version]; !ok { + return fmt.Errorf("invalid LIDs binary version: %d", version) + } + + headersPos := n + numberOfBlocks := binary.BigEndian.Uint32(numBuf[1:]) + headersBuf := make([]byte, numberOfBlocks*uint32(lidsBlockHeaderSizeBytes)) + + n, err = l.file.ReadAt(headersBuf, int64(headersPos)) + if err != nil && err != io.EOF { + return fmt.Errorf("can't read headers, %s", 
err.Error()) + } + if n != len(headersBuf) { + return fmt.Errorf("can't read headers, read=%d, requested=%d", n, len(headersBuf)) + } + if len(headersBuf)%int(lidsBlockHeaderSizeBytes) != 0 { + return fmt.Errorf("wrong headers format") + } + + l.headers = make([]lidsBlockHeader, 0, numberOfBlocks) + for range numberOfBlocks { + header := lidsBlockHeader{} + headersBuf, err = header.unmarshal(headersBuf) + if err != nil { + return fmt.Errorf("can't unmarshal lids header: %s", err) + } + l.headers = append(l.headers, header) + } + + if len(headersBuf) > 0 { + return fmt.Errorf("unexpected tail when unmarshaling LIDs headers") + } + + return nil +} + +func (l *loader) loadBlock(index int) ([]uint32, error) { + if l.headers == nil { + err := l.loadHeaders() + if err != nil { + return nil, err + } + } + + if len(l.headers) < index+1 { + return nil, fmt.Errorf("can't load block: headers len=%d, index=%d", len(l.headers), index) + } + + header := l.headers[index] + + blockBuf := make([]byte, header.Size) // TODO: buffer pool (???) 
+ n, err := l.file.ReadAt(blockBuf, int64(header.Offset)) + if err != nil { + return nil, err + } + if n != len(blockBuf) { + return nil, fmt.Errorf("can't read lids block, read=%d, requested=%d", n, len(blockBuf)) + } + + lids := make([]uint32, 0, header.Length) + lids, blockBuf, err = unmarshalLIDsBlock(lids, blockBuf, header) + if err != nil { + return nil, err + } + + if len(blockBuf) > 0 { + return nil, fmt.Errorf("unexpected tail when unmarshaling LIDs block") + } + + return lids, nil +} + +func (l *loader) release() error { + if err := l.file.Close(); err != nil { + logger.Error("can't close tombstones file", zap.Error(err)) + return err + } + return nil +} diff --git a/docsfilter/loader_test.go b/docsfilter/loader_test.go new file mode 100644 index 00000000..5ed946bc --- /dev/null +++ b/docsfilter/loader_test.go @@ -0,0 +1,42 @@ +package docsfilter + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/seq" +) + +func TestLoader(t *testing.T) { + multipleBlocksSize := maxLIDsBlockLen*3 + 15 + multipleBlocksLIDs := make([]seq.LID, 0, multipleBlocksSize) + for i := range multipleBlocksSize { + multipleBlocksLIDs = append(multipleBlocksLIDs, seq.LID(i)) + } + + rawDocsFilter := marshalDocsFilter(nil, &DocsFilterBinIn{LIDs: multipleBlocksLIDs}) + filePath := filepath.Join(t.TempDir(), "some.filter") + err := os.WriteFile(filePath, rawDocsFilter, 0o644) + require.NoError(t, err) + + loader, err := newLoader(filePath) + require.NoError(t, err) + + err = loader.loadHeaders() + require.NoError(t, err) + require.Len(t, loader.headers, 4) + + resLIDs := make([]uint32, 0, len(multipleBlocksLIDs)) + const numberOfBlocks = 4 + for i := range numberOfBlocks { + block, err := loader.loadBlock(i) + require.NoError(t, err) + resLIDs = append(resLIDs, block...) 
+ } + require.Equal(t, lidsToUint32s(multipleBlocksLIDs), resLIDs) + + require.NoError(t, loader.release()) +} diff --git a/docsfilter/merged_iterator.go b/docsfilter/merged_iterator.go new file mode 100644 index 00000000..742e80c6 --- /dev/null +++ b/docsfilter/merged_iterator.go @@ -0,0 +1,108 @@ +package docsfilter + +import "github.com/ozontech/seq-db/node" + +func NewNMergedIterators(iterators []node.Node, reverse bool) node.Node { + if len(iterators) == 0 { + return &EmptyIterator{} + } + + if len(iterators) == 1 { + return iterators[0] + } + + merged := NewMergedIterator(iterators[0], iterators[1], reverse) + for _, s := range iterators[2:] { + merged = NewMergedIterator(merged, s, reverse) + } + return merged +} + +type MergedIterator struct { + a, b node.Node + curA, curB uint32 + init bool + lessFn func(a, b uint32) bool +} + +func NewMergedIterator( + a, b node.Node, + reverse bool, +) node.Node { + lessFn := func(a, b uint32) bool { return a < b } + if reverse { + lessFn = func(a, b uint32) bool { return a > b } + } + + return &MergedIterator{ + a: a, + b: b, + init: false, + lessFn: lessFn, + } +} + +func (it *MergedIterator) String() string { + return "MERGED_TOMBSTONES_ITERATOR" +} + +func (it *MergedIterator) Next() (uint32, bool) { + if !it.init { + it.readA() + it.readB() + it.init = true + } + + if it.a == nil && it.b == nil { + return 0, false + } + if it.a == nil { + return it.readB(), true + } + if it.b == nil { + return it.readA(), true + } + + if it.curA == it.curB { + it.readA() // skip duplicate + if it.a == nil { + return it.readB(), true + } + } + if it.lessFn(it.curB, it.curA) { + return it.readB(), true + } + return it.readA(), true +} + +func (it *MergedIterator) readA() uint32 { + var has bool + current := it.curA + + if it.curA, has = it.a.Next(); !has { + it.a = nil // stop reading a + } + + return current +} + +func (it *MergedIterator) readB() uint32 { + var has bool + current := it.curB + + if it.curB, has = it.b.Next(); !has { 
+ it.b = nil // stop reading b + } + + return current +} + +type EmptyIterator struct{} + +func (it *EmptyIterator) String() string { + return "EMPTY_TOMBSTONES_ITERATOR" +} + +func (it *EmptyIterator) Next() (uint32, bool) { + return 0, false +} diff --git a/docsfilter/merged_iterator_test.go b/docsfilter/merged_iterator_test.go new file mode 100644 index 00000000..1a5b23c3 --- /dev/null +++ b/docsfilter/merged_iterator_test.go @@ -0,0 +1,67 @@ +package docsfilter + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/node" +) + +func TestMergedIterator(t *testing.T) { + iterators := []node.Node{ + &testIterator{lids: []uint32{1, 2, 5, 22, 45}}, + &testIterator{lids: []uint32{2, 3, 9, 15, 33, 45}}, + &testIterator{lids: []uint32{1, 7, 8, 45}}, + } + + mergedIterator := NewNMergedIterators(iterators, false) + resLIDs := make([]uint32, 0) + for { + lid, has := mergedIterator.Next() + if !has { + break + } + resLIDs = append(resLIDs, lid) + + } + require.Equal(t, []uint32{1, 2, 3, 5, 7, 8, 9, 15, 22, 33, 45}, resLIDs) +} + +func TestMergedIteratorReverse(t *testing.T) { + iterators := []node.Node{ + &testIterator{lids: []uint32{45, 22, 5, 2, 1}}, + &testIterator{lids: []uint32{45, 33, 15, 9, 3, 2}}, + &testIterator{lids: []uint32{45, 8, 7, 1}}, + } + + mergedIterator := NewNMergedIterators(iterators, true) + resLIDs := make([]uint32, 0) + for { + lid, has := mergedIterator.Next() + if !has { + break + } + resLIDs = append(resLIDs, lid) + + } + require.Equal(t, []uint32{45, 33, 22, 15, 9, 8, 7, 5, 3, 2, 1}, resLIDs) +} + +type testIterator struct { + lids []uint32 +} + +func (it *testIterator) String() string { + return "TEST_TOMBSTONES_ITERATOR" +} + +func (it *testIterator) Next() (uint32, bool) { + if len(it.lids) == 0 { + return 0, false + } + + lid := it.lids[0] + it.lids = it.lids[1:] + return lid, true +} diff --git a/docsfilter/metrics.go b/docsfilter/metrics.go new file mode 100644 index 00000000..77ecdbe7 --- 
/dev/null +++ b/docsfilter/metrics.go @@ -0,0 +1,27 @@ +package docsfilter + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// nolint:unused // in progress +var ( + activeFilters = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "seq_db_store", + Subsystem: "filters", + Name: "in_progress", + Help: "Number of doc filters in progress", + }) + diskUsage = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "seq_db_store", + Subsystem: "filters", + Name: "disk_usage_bytes", + }, []string{"file_type"}) + storedFilters = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "seq_db_store", + Subsystem: "filters", + Name: "stored", + Help: "Number of active doc filters", + }) +) diff --git a/frac/active.go b/frac/active.go index e16b48a1..9594d527 100644 --- a/frac/active.go +++ b/frac/active.go @@ -46,6 +46,7 @@ type Active struct { TokenList *TokenList DocsPositions *DocsPositions + IDsToLIDs *ActiveLIDs // TODO: (???) 
docsFile *os.File docsReader storage.DocsReader @@ -58,6 +59,8 @@ type Active struct { writer *ActiveWriter indexer *ActiveIndexer + + docsFilter DocsFilter } const ( @@ -77,6 +80,7 @@ func NewActive( docsCache *cache.Cache[[]byte], sortCache *cache.Cache[[]byte], cfg *Config, + docsFilter DocsFilter, ) *Active { docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, config.SkipFsync) metaFile, metaStats := mustOpenFile(baseFileName+consts.MetaFileSuffix, config.SkipFsync) @@ -84,6 +88,7 @@ func NewActive( f := &Active{ TokenList: NewActiveTokenList(config.IndexWorkers), DocsPositions: NewSyncDocsPositions(), + IDsToLIDs: NewActiveLIDs(), MIDs: NewIDs(), RIDs: NewIDs(), DocBlocks: NewIDs(), @@ -103,6 +108,8 @@ func NewActive( BaseFileName: baseFileName, info: common.NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())), Config: cfg, + + docsFilter: docsFilter, } // use of 0 as keys in maps is prohibited – it's system key, so add first element @@ -288,6 +295,17 @@ func (f *Active) Search(ctx context.Context, params processor.SearchParams) (*se return dp.Search(params) } +func (f *Active) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) { + if f.Info().DocsTotal == 0 { // it is empty active fraction state + return nil, nil + } + + dp := f.createDataProvider(ctx) + defer dp.release() + + return dp.FindLIDs(ids) +} + func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider { return &activeDataProvider{ ctx: ctx, @@ -300,7 +318,10 @@ func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider { blocksOffsets: f.DocBlocks.GetVals(), docsPositions: f.DocsPositions, + idsToLids: f.IDsToLIDs, docsReader: &f.docsReader, + + docsFilter: f.docsFilter, } } diff --git a/frac/active_index.go b/frac/active_index.go index 350a8e0d..a131c49b 100644 --- a/frac/active_index.go +++ b/frac/active_index.go @@ -2,6 +2,8 @@ package frac import ( "context" + "math" + "slices" 
"github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" @@ -25,9 +27,12 @@ type activeDataProvider struct { blocksOffsets []uint64 docsPositions *DocsPositions + idsToLids *ActiveLIDs docsReader *storage.DocsReader idsIndex *activeIDsIndex + + docsFilter DocsFilter } func (dp *activeDataProvider) release() { @@ -76,7 +81,10 @@ func (dp *activeDataProvider) Fetch(ids []seq.ID) ([][]byte, error) { indexes := []activeFetchIndex{{ blocksOffsets: dp.blocksOffsets, docsPositions: dp.docsPositions, + idsToLids: dp.idsToLids, docsReader: dp.docsReader, + docsFilter: dp.docsFilter, + fracName: dp.info.Name(), }} for _, fi := range indexes { @@ -116,6 +124,8 @@ func (dp *activeDataProvider) Search(params processor.SearchParams) (*seq.QPR, e indexes := []activeSearchIndex{{ activeIDsIndex: dp.getIDsIndex(), activeTokenIndex: dp.getTokenIndex(), + docsFilter: dp.docsFilter, + fracName: dp.info.Name(), }} m.Stop() @@ -136,6 +146,16 @@ func (dp *activeDataProvider) Search(params processor.SearchParams) (*seq.QPR, e return res, nil } +func (dp *activeDataProvider) FindLIDs(ids []seq.ID) ([]seq.LID, error) { + res := make([]seq.LID, 0, len(ids)) + for _, id := range ids { + if lid, ok := dp.idsToLids.Get(id); ok { + res = append(res, lid) + } + } + return res, nil +} + type activeIDsIndex struct { mids []uint64 rids []uint64 @@ -167,6 +187,35 @@ func (p *activeIDsIndex) LessOrEqual(lid seq.LID, id seq.ID) bool { type activeSearchIndex struct { *activeIDsIndex *activeTokenIndex + docsFilter DocsFilter + fracName string +} + +func (si *activeSearchIndex) GetTombstones(minLID, maxLID uint32, reverse bool) (node.Node, error) { + // active fraction doesn't meet min and max lid + minLID, maxLID = uint32(0), uint32(math.MaxUint32) + + iterator, err := si.docsFilter.GetTombstonesIteratorByFrac(si.fracName, minLID, maxLID, reverse) + if err != nil { + return nil, err + } + + res := make([]uint32, 0) + for { + // traverse iterator to inverse and sort lids + lid, 
has := iterator.Next() + if !has { + break + } + if inversed, ok := si.activeIDsIndex.inverser.Inverse(lid); ok { + res = append(res, uint32(inversed)) + } + } + + // we need to sort inversed values since they may be out of order after replay of active fraction + slices.Sort(res) + + return node.NewStatic(res, reverse), nil } type activeTokenIndex struct { @@ -212,19 +261,55 @@ func inverseLIDs(unmapped []uint32, inv *inverser, minLID, maxLID uint32) []uint type activeFetchIndex struct { blocksOffsets []uint64 docsPositions *DocsPositions + idsToLids *ActiveLIDs docsReader *storage.DocsReader + docsFilter DocsFilter + fracName string } func (di *activeFetchIndex) GetBlocksOffsets(num uint32) uint64 { return di.blocksOffsets[num] } -func (di *activeFetchIndex) GetDocPos(ids []seq.ID) []seq.DocPos { +func (di *activeFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) { + allLids := make([]uint32, len(ids)) + for i, id := range ids { + if lid, ok := di.idsToLids.Get(id); ok { + allLids[i] = uint32(lid) + } + } + + minLID, maxLID := uint32(0), uint32(math.MaxUint32) + tombstonesIterator, err := di.docsFilter.GetTombstonesIteratorByFrac(di.fracName, minLID, maxLID, false) + if err != nil { + return nil, err + } + + filteredLIDs := make(map[uint32]struct{}) + for { + lid, has := tombstonesIterator.Next() + if !has { + break + } + filteredLIDs[lid] = struct{}{} + } + docsPos := make([]seq.DocPos, len(ids)) for i, id := range ids { docsPos[i] = di.docsPositions.GetSync(id) } - return docsPos + + if len(filteredLIDs) == 0 { + return docsPos, nil + } + + for i, lid := range allLids { + if _, ok := filteredLIDs[lid]; ok { + docsPos[i] = seq.DocPosNotFound + } + } + + return docsPos, nil } func (di *activeFetchIndex) ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error) { diff --git a/frac/active_indexer.go b/frac/active_indexer.go index 5cda8f9f..3821cf03 100644 --- a/frac/active_indexer.go +++ b/frac/active_indexer.go @@ -161,6 +161,10 @@ func (ai 
*ActiveIndexer) appendWorker(index int) { lids := active.AppendIDs(collector.IDs) m.Stop() + m = sw.Start("active_lids_map_set") + active.IDsToLIDs.SetMultiple(collector.IDs, lids) + m.Stop() + m = sw.Start("token_list_append") tokenLIDsPlaces := collector.PrepareTokenLIDsPlaces() active.TokenList.Append(collector.TokensValues, collector.FieldsLengths, tokenLIDsPlaces) diff --git a/frac/active_indexer_test.go b/frac/active_indexer_test.go index fc2585c6..f85d3d5e 100644 --- a/frac/active_indexer_test.go +++ b/frac/active_indexer_test.go @@ -90,6 +90,7 @@ func BenchmarkIndexer(b *testing.B) { cache.NewCache[[]byte](nil, nil), cache.NewCache[[]byte](nil, nil), &Config{}, + testDocsFilter{}, ) processor := getTestProcessor() diff --git a/frac/active_lids_map.go b/frac/active_lids_map.go new file mode 100644 index 00000000..bae64854 --- /dev/null +++ b/frac/active_lids_map.go @@ -0,0 +1,37 @@ +package frac + +import ( + "sync" + + "github.com/ozontech/seq-db/seq" +) + +type ActiveLIDs struct { + mu *sync.RWMutex + idToLid map[seq.ID]seq.LID +} + +func NewActiveLIDs() *ActiveLIDs { + al := ActiveLIDs{ + mu: &sync.RWMutex{}, + idToLid: make(map[seq.ID]seq.LID), + } + return &al +} + +func (al *ActiveLIDs) Get(id seq.ID) (seq.LID, bool) { + al.mu.RLock() + defer al.mu.RUnlock() + + val, ok := al.idToLid[id] + return val, ok +} + +func (al *ActiveLIDs) SetMultiple(ids []seq.ID, lids []uint32) { + al.mu.Lock() + defer al.mu.Unlock() + + for i, id := range ids { + al.idToLid[id] = seq.LID(lids[i]) + } +} diff --git a/frac/fraction.go b/frac/fraction.go index 59995639..a3027c97 100644 --- a/frac/fraction.go +++ b/frac/fraction.go @@ -21,6 +21,7 @@ type Fraction interface { Contains(mid seq.MID) bool Fetch(context.Context, []seq.ID) ([][]byte, error) Search(context.Context, processor.SearchParams) (*seq.QPR, error) + FindLIDs(context.Context, []seq.ID) ([]seq.LID, error) } var ( diff --git a/frac/fraction_concurrency_test.go b/frac/fraction_concurrency_test.go index 
a1d36957..94231c4b 100644 --- a/frac/fraction_concurrency_test.go +++ b/frac/fraction_concurrency_test.go @@ -52,6 +52,7 @@ func TestConcurrentAppendAndQuery(t *testing.T) { cache.NewCache[[]byte](nil, nil), cache.NewCache[[]byte](nil, nil), &Config{}, + testDocsFilter{}, ) mapping := seq.Mapping{ @@ -348,6 +349,7 @@ func seal(active *Active) (*Sealed, error) { indexCache, cache.NewCache[[]byte](nil, nil), &Config{}, + testDocsFilter{}, ) active.Release() return sealed, nil diff --git a/frac/fraction_test.go b/frac/fraction_test.go index b136104e..e88233dd 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -27,6 +27,7 @@ import ( "github.com/ozontech/seq-db/frac/sealed/seqids" "github.com/ozontech/seq-db/frac/sealed/token" "github.com/ozontech/seq-db/indexer" + "github.com/ozontech/seq-db/node" "github.com/ozontech/seq-db/parser" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/storage" @@ -34,6 +35,16 @@ import ( "github.com/ozontech/seq-db/tokenizer" ) +type testDocsFilter struct{} + +func (testDocsFilter) GetFilteredLIDsByFrac(_ string) ([]uint32, error) { + return []uint32{}, nil +} +func (testDocsFilter) GetTombstonesIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, error) { + return node.NewStatic([]uint32{}, false), nil +} +func (testDocsFilter) RemoveFrac(_ string) {} + type FractionTestSuite struct { suite.Suite tmpDir string @@ -1686,6 +1697,7 @@ func (s *FractionTestSuite) newActive(bulks ...[]string) *Active { cache.NewCache[[]byte](nil, nil), cache.NewCache[[]byte](nil, nil), s.config, + testDocsFilter{}, ) var wg sync.WaitGroup @@ -1748,6 +1760,7 @@ func (s *FractionTestSuite) newSealed(bulks ...[]string) *Sealed { indexCache, cache.NewCache[[]byte](nil, nil), s.config, + testDocsFilter{}, ) active.Release() return sealed @@ -1824,7 +1837,9 @@ func (s *ActiveReplayedFractionTestSuite) Replay(frac *Active) Fraction { storage.NewReadLimiter(1, nil), cache.NewCache[[]byte](nil, nil), 
cache.NewCache[[]byte](nil, nil), - &Config{}) + &Config{}, + testDocsFilter{}, + ) err := replayedFrac.Replay(context.Background()) s.Require().NoError(err, "replay failed") return replayedFrac @@ -1935,7 +1950,9 @@ func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *Seal indexCache, cache.NewCache[[]byte](nil, nil), nil, - s.config) + s.config, + testDocsFilter{}, + ) s.fraction = sealed return sealed } @@ -2004,7 +2021,9 @@ func (s *RemoteFractionTestSuite) SetupTest() { cache.NewCache[[]byte](nil, nil), sealed.info, s.config, - s3cli) + s3cli, + testDocsFilter{}, + ) s.fraction = remoteFrac } } diff --git a/frac/processor/eval_tree.go b/frac/processor/eval_tree.go index 547afcbd..e2e0835f 100644 --- a/frac/processor/eval_tree.go +++ b/frac/processor/eval_tree.go @@ -77,6 +77,11 @@ func evalLeaf( return node.BuildORTree(lidsTids, order.IsReverse()), nil } +func evalTombstones(root, tombstonesIterator node.Node, reverse bool, stats *searchStats) node.Node { + stats.NodesTotal++ + return node.NewNAnd(tombstonesIterator, root, reverse) +} + type Aggregator interface { // Next iterates to count the next lid. 
Next(lid uint32) error diff --git a/frac/processor/fetch.go b/frac/processor/fetch.go index 41d46152..e2267780 100644 --- a/frac/processor/fetch.go +++ b/frac/processor/fetch.go @@ -7,13 +7,16 @@ import ( type fetchIndex interface { GetBlocksOffsets(uint32) uint64 - GetDocPos([]seq.ID) []seq.DocPos + GetDocPos([]seq.ID) ([]seq.DocPos, error) ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error) } func IndexFetch(ids []seq.ID, sw *stopwatch.Stopwatch, fetchIndex fetchIndex, res [][]byte) error { m := sw.Start("get_docs_pos") - docsPos := fetchIndex.GetDocPos(ids) + docsPos, err := fetchIndex.GetDocPos(ids) + if err != nil { + return err + } blocks, offsets, index := seq.GroupDocsOffsets(docsPos) m.Stop() diff --git a/frac/processor/search.go b/frac/processor/search.go index 21737712..6534bc59 100644 --- a/frac/processor/search.go +++ b/frac/processor/search.go @@ -38,6 +38,7 @@ type tokenIndex interface { type searchIndex interface { tokenIndex idsIndex + GetTombstones(minLID, maxLID uint32, reverse bool) (node.Node, error) } func IndexSearch( @@ -87,6 +88,17 @@ func IndexSearch( m.Stop() } + m = sw.Start("get_tombstones") + tombstones, err := index.GetTombstones(minLID, maxLID, params.Order.IsReverse()) + m.Stop() + if err != nil { + return nil, err + } + + m = sw.Start("eval_tombstones") + evalTree = evalTombstones(evalTree, tombstones, params.Order.IsReverse(), stats) + m.Stop() + m = sw.Start("iterate_eval_tree") total, ids, histogram, err := iterateEvalTree(ctx, params, index, evalTree, aggs, sw) m.Stop() diff --git a/frac/remote.go b/frac/remote.go index c2088caa..6f9cdb62 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -55,6 +55,8 @@ type Remote struct { s3cli *s3.Client readLimiter *storage.ReadLimiter + + docsFilter DocsFilter } func NewRemote( @@ -66,6 +68,7 @@ func NewRemote( info *common.Info, config *Config, s3cli *s3.Client, + docsFilter DocsFilter, ) *Remote { f := &Remote{ ctx: ctx, @@ -81,6 +84,8 @@ func NewRemote( Config: config, 
s3cli: s3cli, + + docsFilter: docsFilter, } // Fast path if fraction-info cache exists AND it has valid index size. @@ -131,6 +136,16 @@ func (f *Remote) Search(ctx context.Context, params processor.SearchParams) (*se return dp.Search(params) } +func (f *Remote) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) { + dp, err := f.createDataProvider(ctx) + if err != nil { + return nil, err + } + defer dp.release() + + return dp.FindLIDs(ids) +} + func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, error) { if err := f.load(); err != nil { logger.Error( @@ -160,6 +175,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e &f.blocksData.IDsTable, f.info.BinaryDataVer, ), + docsFilter: f.docsFilter, }, nil } @@ -191,6 +207,8 @@ func (f *Remote) Suicide() { zap.Error(err), ) } + + go f.docsFilter.RemoveFrac(f.info.Name()) } func (f *Remote) String() string { diff --git a/frac/sealed.go b/frac/sealed.go index c4c033d8..8161e061 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -51,6 +51,8 @@ type Sealed struct { // shit for testing PartialSuicideMode PSD + + docsFilter DocsFilter } type PSD int // emulates hard shutdown on different stages of fraction deletion, used for tests @@ -68,6 +70,7 @@ func NewSealed( docsCache *cache.Cache[[]byte], info *common.Info, config *Config, + docsFilter DocsFilter, ) *Sealed { f := &Sealed{ loadMu: &sync.RWMutex{}, @@ -81,6 +84,8 @@ func NewSealed( Config: config, PartialSuicideMode: Off, + + docsFilter: docsFilter, } // fast path if fraction-info cache exists AND it has valid index size @@ -130,6 +135,7 @@ func NewSealedPreloaded( indexCache *IndexCache, docsCache *cache.Cache[[]byte], config *Config, + docsFilter DocsFilter, ) *Sealed { f := &Sealed{ blocksData: preloaded.BlocksData, @@ -144,6 +150,8 @@ func NewSealedPreloaded( info: preloaded.Info, BaseFileName: baseFile, Config: config, + + docsFilter: docsFilter, } // put the token table built during sealing 
into the cache of the sealed fraction @@ -292,6 +300,8 @@ func (f *Sealed) Suicide() { zap.Error(err), ) } + + go f.docsFilter.RemoveFrac(f.info.Name()) } func (f *Sealed) String() string { @@ -312,6 +322,13 @@ func (f *Sealed) Search(ctx context.Context, params processor.SearchParams) (*se return dp.Search(params) } +func (f *Sealed) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) { + dp := f.createDataProvider(ctx) + defer dp.release() + + return dp.FindLIDs(ids) +} + func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { f.load() return &sealedDataProvider{ @@ -336,6 +353,8 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { &f.blocksData.IDsTable, f.info.BinaryDataVer, ), + + docsFilter: f.docsFilter, } } diff --git a/frac/sealed_index.go b/frac/sealed_index.go index f97c6e84..53b8e7f7 100644 --- a/frac/sealed_index.go +++ b/frac/sealed_index.go @@ -22,6 +22,11 @@ import ( "github.com/ozontech/seq-db/util" ) +type DocsFilter interface { + GetTombstonesIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, error) + RemoveFrac(fracName string) +} + type sealedDataProvider struct { ctx context.Context info *common.Info @@ -42,6 +47,8 @@ type sealedDataProvider struct { // fractionTypeLabel can be either 'sealed' or 'remote'. // This value is used in metrics to distinguish between operations over local and remote fractions. 
fractionTypeLabel string + + docsFilter DocsFilter } func (dp *sealedDataProvider) getIDsIndex() *sealedIDsIndex { @@ -54,9 +61,11 @@ func (dp *sealedDataProvider) getIDsIndex() *sealedIDsIndex { func (dp *sealedDataProvider) getFetchIndex() *sealedFetchIndex { return &sealedFetchIndex{ + fracName: dp.info.Name(), idsIndex: dp.getIDsIndex(), docsReader: dp.docsReader, blocksOffsets: dp.blocksOffsets, + docsFilter: dp.docsFilter, } } @@ -74,6 +83,7 @@ func (dp *sealedDataProvider) getSearchIndex() *sealedSearchIndex { return &sealedSearchIndex{ sealedIDsIndex: dp.getIDsIndex(), sealedTokenIndex: dp.getTokenIndex(), + docsFilter: dp.docsFilter, } } @@ -122,6 +132,10 @@ func (dp *sealedDataProvider) Search(params processor.SearchParams) (*seq.QPR, e return qpr, nil } +func (dp *sealedDataProvider) FindLIDs(ids []seq.ID) ([]seq.LID, error) { + return dp.getFetchIndex().findLIDs(ids), nil +} + type sealedIDsIndex struct { fracName string table *seqids.Table @@ -255,17 +269,56 @@ func (ti *sealedTokenIndex) GetLIDsFromTIDs(tids []uint32, stats lids.Counter, m } type sealedFetchIndex struct { + fracName string idsIndex *sealedIDsIndex docsReader *storage.DocsReader blocksOffsets []uint64 + docsFilter DocsFilter } func (fi *sealedFetchIndex) GetBlocksOffsets(num uint32) uint64 { return fi.blocksOffsets[num] } -func (fi *sealedFetchIndex) GetDocPos(ids []seq.ID) []seq.DocPos { - return fi.getDocPosByLIDs(fi.findLIDs(ids)) +func (fi *sealedFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) { + allLids := fi.findLIDs(ids) + + minLID, maxLID := uint32(0), uint32(math.MaxUint32) + if len(allLids) > 0 { + // allLids can be not sorted + minVal, maxVal := allLids[0], allLids[0] + for i := 1; i < len(allLids); i++ { + minVal = min(minVal, allLids[i]) + maxVal = max(maxVal, allLids[i]) + } + minLID, maxLID = uint32(minVal), uint32(maxVal) + } + + tombstonesIterator, err := fi.docsFilter.GetTombstonesIteratorByFrac(fi.fracName, minLID, maxLID, false) + if err != nil { + return 
nil, err + } + + filteredLIDs := make(map[uint32]struct{}) + for { + lid, has := tombstonesIterator.Next() + if !has { + break + } + filteredLIDs[lid] = struct{}{} + } + + if len(filteredLIDs) == 0 { + return fi.getDocPosByLIDs(allLids), nil + } + + for i, lid := range allLids { + if _, ok := filteredLIDs[uint32(lid)]; ok { + allLids[i] = 0 + } + } + + return fi.getDocPosByLIDs(allLids), nil } func (fi *sealedFetchIndex) ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error) { @@ -320,4 +373,9 @@ func (fi *sealedFetchIndex) getDocPosByLIDs(localIDs []seq.LID) []seq.DocPos { type sealedSearchIndex struct { *sealedIDsIndex *sealedTokenIndex + docsFilter DocsFilter +} + +func (si *sealedSearchIndex) GetTombstones(minLID, maxLID uint32, reverse bool) (node.Node, error) { + return si.docsFilter.GetTombstonesIteratorByFrac(si.fracName, minLID, maxLID, reverse) } diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index 50d7e1f3..46d4da1e 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -35,13 +35,13 @@ var defaultStorageState = StorageState{ // - stats updating // // Returns the manager instance and a stop function to gracefully shutdown -func New(ctx context.Context, cfg *Config, s3cli *s3.Client) (*FracManager, func(), error) { +func New(ctx context.Context, cfg *Config, s3cli *s3.Client, docsFilter DocsFilter) (*FracManager, func(), error) { FillConfigWithDefault(cfg) readLimiter := storage.NewReadLimiter(config.ReaderWorkers, storeBytesRead) idx, stopIdx := frac.NewActiveIndexer(config.IndexWorkers, config.IndexWorkers) cache := NewCacheMaintainer(cfg.CacheSize, cfg.SortCacheSize, newDefaultCacheMetrics()) - provider := newFractionProvider(cfg, s3cli, cache, readLimiter, idx) + provider := newFractionProvider(cfg, s3cli, cache, readLimiter, idx, docsFilter) infoCache := NewFracInfoCache(filepath.Join(cfg.DataDir, consts.FracCacheFileSuffix)) // Load existing fractions into registry diff --git 
a/fracmanager/fracmanager_test.go b/fracmanager/fracmanager_test.go index 89437904..a2c2b8ca 100644 --- a/fracmanager/fracmanager_test.go +++ b/fracmanager/fracmanager_test.go @@ -8,9 +8,21 @@ import ( "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/indexer" + "github.com/ozontech/seq-db/node" "github.com/ozontech/seq-db/seq" ) +type testDocsFilter struct{} + +func (testDocsFilter) GetFilteredLIDsByFrac(_ string) ([]uint32, error) { + return nil, nil +} +func (testDocsFilter) GetTombstonesIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, error) { + return node.NewStatic([]uint32{}, reverse), nil +} +func (testDocsFilter) RefreshFrac(_ frac.Fraction) {} +func (testDocsFilter) RemoveFrac(_ string) {} + func setupDataDir(t testing.TB, cfg *Config) *Config { if cfg == nil { cfg = &Config{ @@ -25,7 +37,7 @@ func setupDataDir(t testing.TB, cfg *Config) *Config { func setupFracManager(t testing.TB, cfg *Config) (*Config, *FracManager, func()) { cfg = setupDataDir(t, cfg) - fm, stop, err := New(t.Context(), cfg, nil) + fm, stop, err := New(t.Context(), cfg, nil, testDocsFilter{}) assert.NoError(t, err) return cfg, fm, stop } diff --git a/fracmanager/fraction_provider.go b/fracmanager/fraction_provider.go index e2915598..f970912f 100644 --- a/fracmanager/fraction_provider.go +++ b/fracmanager/fraction_provider.go @@ -13,12 +13,19 @@ import ( "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/frac/sealed/sealing" + "github.com/ozontech/seq-db/node" "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/storage/s3" ) const fileBasePattern = "seq-db-" +type DocsFilter interface { + GetTombstonesIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, error) + RefreshFrac(frac frac.Fraction) + RemoveFrac(fracName string) +} + // fractionProvider is a factory for creating different types of fractions // Contains all necessary dependencies for 
creating and managing fractions type fractionProvider struct { @@ -28,11 +35,13 @@ type fractionProvider struct { activeIndexer *frac.ActiveIndexer // Indexer for active fractions readLimiter *storage.ReadLimiter // Read rate limiter ulidEntropy io.Reader // Entropy source for ULID generation + docsFilter DocsFilter } func newFractionProvider( cfg *Config, s3cli *s3.Client, cp *CacheMaintainer, readLimiter *storage.ReadLimiter, indexer *frac.ActiveIndexer, + docsFilter DocsFilter, ) *fractionProvider { return &fractionProvider{ s3cli: s3cli, @@ -41,6 +50,7 @@ func newFractionProvider( activeIndexer: indexer, readLimiter: readLimiter, ulidEntropy: ulid.Monotonic(rand.New(rand.NewSource(time.Now().UnixNano())), 0), + docsFilter: docsFilter, } } @@ -52,6 +62,7 @@ func (fp *fractionProvider) NewActive(name string) *frac.Active { fp.cacheProvider.CreateDocBlockCache(), fp.cacheProvider.CreateSortDocsCache(), &fp.config.Fraction, + fp.docsFilter, ) } @@ -63,6 +74,7 @@ func (fp *fractionProvider) NewSealed(name string, cachedInfo *common.Info) *fra fp.cacheProvider.CreateDocBlockCache(), cachedInfo, // Preloaded meta information &fp.config.Fraction, + fp.docsFilter, ) } @@ -74,6 +86,7 @@ func (fp *fractionProvider) NewSealedPreloaded(name string, preloadedData *seale fp.cacheProvider.CreateIndexCache(), fp.cacheProvider.CreateDocBlockCache(), &fp.config.Fraction, + fp.docsFilter, ) } @@ -87,6 +100,7 @@ func (fp *fractionProvider) NewRemote(ctx context.Context, name string, cachedIn cachedInfo, &fp.config.Fraction, fp.s3cli, + fp.docsFilter, ) } @@ -117,7 +131,9 @@ func (fp *fractionProvider) Seal(active *frac.Active) (*frac.Sealed, error) { return nil, err } - return fp.NewSealedPreloaded(active.BaseFileName, preloaded), nil + sealedFrac := fp.NewSealedPreloaded(active.BaseFileName, preloaded) + fp.docsFilter.RefreshFrac(sealedFrac) + return sealedFrac, nil } // Offload uploads fraction to S3 storage and returns a remote fraction diff --git 
a/fracmanager/fraction_provider_test.go b/fracmanager/fraction_provider_test.go index f315b615..4bbc1951 100644 --- a/fracmanager/fraction_provider_test.go +++ b/fracmanager/fraction_provider_test.go @@ -38,7 +38,7 @@ func setupFractionProvider(t testing.TB, cfg *Config) (*fractionProvider, func() s3cli, stopS3 := setupS3Client(t) idx, stopIdx := frac.NewActiveIndexer(1, 1) cache := NewCacheMaintainer(uint64(units.MB), uint64(units.MB), nil) - provider := newFractionProvider(cfg, s3cli, cache, rl, idx) + provider := newFractionProvider(cfg, s3cli, cache, rl, idx, testDocsFilter{}) return provider, func() { stopIdx() stopS3() @@ -46,7 +46,7 @@ func setupFractionProvider(t testing.TB, cfg *Config) (*fractionProvider, func() } func TestFractionID(t *testing.T) { - fp := newFractionProvider(nil, nil, nil, nil, nil) + fp := newFractionProvider(nil, nil, nil, nil, nil, nil) ulid1 := fp.nextFractionID() ulid2 := fp.nextFractionID() assert.NotEqual(t, ulid1, ulid2, "ULIDs should be different") diff --git a/fracmanager/proxy_frac.go b/fracmanager/proxy_frac.go index 949e2412..f18bb99c 100644 --- a/fracmanager/proxy_frac.go +++ b/fracmanager/proxy_frac.go @@ -69,6 +69,10 @@ func (p *fractionProxy) Search(ctx context.Context, params processor.SearchParam return p.impl.Search(ctx, params) } +func (p *fractionProxy) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) { + return p.impl.FindLIDs(ctx, ids) +} + // activeProxy manages an active (writable) fraction // Tracks pending write operations and provides freeze capability. // Lifecycle: Created when fraction becomes active, destroyed after sealing. 
@@ -172,3 +176,7 @@ func (emptyFraction) Search(_ context.Context, params processor.SearchParams) (* metric.CountersTotal.WithLabelValues("empty_data_provider").Inc() return &seq.QPR{Aggs: make([]seq.AggregatableSamples, len(params.AggQ))}, nil } + +func (emptyFraction) FindLIDs(_ context.Context, _ []seq.ID) ([]seq.LID, error) { + return nil, nil +} diff --git a/fracmanager/searcher_test.go b/fracmanager/searcher_test.go index eb70a2d2..bdb07ec6 100644 --- a/fracmanager/searcher_test.go +++ b/fracmanager/searcher_test.go @@ -62,6 +62,10 @@ func (f *testFakeFrac) Search(context.Context, processor.SearchParams) (*seq.QPR return f.qpr, nil } +func (f *testFakeFrac) FindLIDs(context.Context, []seq.ID) ([]seq.LID, error) { + return []seq.LID{}, nil +} + func newFakeFrac(from, to seq.MID, qpr *seq.QPR) *testFakeFrac { return &testFakeFrac{ info: &common.Info{From: from, To: to, DocsTotal: 1}, diff --git a/storeapi/grpc_v1_test.go b/storeapi/grpc_v1_test.go index ced60053..9b4a5453 100644 --- a/storeapi/grpc_v1_test.go +++ b/storeapi/grpc_v1_test.go @@ -12,6 +12,7 @@ import ( "github.com/ozontech/seq-db/asyncsearcher" "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/docsfilter" "github.com/ozontech/seq-db/fracmanager" "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/mappingprovider" @@ -67,11 +68,16 @@ func getTestGrpc(t *testing.T) (*GrpcV1, func(), func()) { dataDir := common.GetTestTmpDir(t) common.RecreateDir(dataDir) + mappingProvider, err := mappingprovider.New("", mappingprovider.WithMapping(seq.TestMapping)) + assert.NoError(t, err) + + df := docsfilter.New(t.Context(), docsfilter.Config{}, nil, mappingProvider) + fm, stop, err := fracmanager.New(t.Context(), &fracmanager.Config{ FracSize: 500, TotalSize: 5000, DataDir: dataDir, - }, nil) + }, nil, df) assert.NoError(t, err) config := APIConfig{ @@ -92,9 +98,6 @@ func getTestGrpc(t *testing.T) (*GrpcV1, func(), func()) { }, } - mappingProvider, err := mappingprovider.New("", 
mappingprovider.WithMapping(seq.TestMapping)) - assert.NoError(t, err) - g := NewGrpcV1(config, fm, mappingProvider) release := func() { diff --git a/storeapi/store.go b/storeapi/store.go index 857be4e2..c41cdf21 100644 --- a/storeapi/store.go +++ b/storeapi/store.go @@ -9,6 +9,7 @@ import ( "go.uber.org/atomic" "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/docsfilter" "github.com/ozontech/seq-db/fracmanager" "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/metric" @@ -35,6 +36,7 @@ type Store struct { type StoreConfig struct { API APIConfig FracManager fracmanager.Config + Filters docsfilter.Config } func (c *StoreConfig) setDefaults() error { @@ -44,19 +46,32 @@ func (c *StoreConfig) setDefaults() error { if c.API.Search.Async.DataDir == "" { c.API.Search.Async.DataDir = path.Join(c.FracManager.DataDir, "async_searches") } + if c.Filters.DataDir == "" { + c.Filters.DataDir = path.Join(c.FracManager.DataDir, "filters") + } return nil } -func NewStore(ctx context.Context, c StoreConfig, s3cli *s3.Client, mappingProvider MappingProvider) (*Store, error) { +func NewStore( + ctx context.Context, + c StoreConfig, + s3cli *s3.Client, + mappingProvider MappingProvider, + docFilterParams []docsfilter.Params, +) (*Store, error) { if err := c.setDefaults(); err != nil { return nil, err } - fracManager, stop, err := fracmanager.New(ctx, &c.FracManager, s3cli) + df := docsfilter.New(ctx, c.Filters, docFilterParams, mappingProvider) + + fracManager, stop, err := fracmanager.New(ctx, &c.FracManager, s3cli, df) if err != nil { return nil, fmt.Errorf("loading fractions error: %w", err) } + df.Start(fracManager.Fractions()) + return &Store{ Config: c, // We will set grpcAddr later in Start() diff --git a/tests/setup/env.go b/tests/setup/env.go index cafbbc02..15072152 100644 --- a/tests/setup/env.go +++ b/tests/setup/env.go @@ -22,6 +22,7 @@ import ( "github.com/ozontech/seq-db/buildinfo" "github.com/ozontech/seq-db/consts" + 
"github.com/ozontech/seq-db/docsfilter" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/fracmanager" "github.com/ozontech/seq-db/logger" @@ -275,7 +276,7 @@ func (cfg *TestingEnvConfig) MakeStores( logger.Fatal("can't create mapping", zap.Error(err)) } - store, err := storeapi.NewStore(context.Background(), confs[i], s3cli, mappingProvider) + store, err := storeapi.NewStore(context.Background(), confs[i], s3cli, mappingProvider, []docsfilter.Params{}) if err != nil { panic(err) } diff --git a/util/fs.go b/util/fs.go index 57fd7b17..b6e9eaca 100644 --- a/util/fs.go +++ b/util/fs.go @@ -6,30 +6,32 @@ package util import ( "errors" "os" + "path" + "path/filepath" "go.uber.org/zap" "github.com/ozontech/seq-db/logger" ) -func MustSyncPath(path string) { - if err := SyncPath(path); err != nil { - logger.Panic("cannot sync path", zap.String("path", path), zap.Error(err)) +func MustSyncPath(dirPath string) { + if err := SyncPath(dirPath); err != nil { + logger.Panic("cannot sync path", zap.String("path", dirPath), zap.Error(err)) } } -func MustRemoveFileByPath(path string) { - if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) { +func MustRemoveFileByPath(fpath string) { + if err := os.Remove(fpath); err != nil && !errors.Is(err, os.ErrNotExist) { logger.Panic( "cannot remove file by path", - zap.String("path", path), + zap.String("path", fpath), zap.Error(err), ) } } -func SyncPath(path string) error { - d, err := os.Open(path) +func SyncPath(dirPath string) error { + d, err := os.Open(dirPath) if err != nil { return err } @@ -53,3 +55,75 @@ func RemoveFile(file string) { logger.Error("file removing error", zap.Error(err)) } } + +func MustWriteFileAtomic(fpath string, data []byte, tmpFileExt string) { + fpathTmp := fpath + tmpFileExt + + f, err := os.Create(fpathTmp) + if err != nil { + logger.Panic("can't create file", zap.Error(err)) + } + defer func() { + if err := f.Close(); err != nil { + logger.Panic("can't close file", 
zap.Error(err)) + } + }() + + if _, err := f.Write(data); err != nil { + logger.Panic("can't write to file", zap.Error(err)) + } + + if err := f.Sync(); err != nil { + logger.Panic("can't sync file", zap.Error(err)) + } + + if err := os.Rename(fpathTmp, fpath); err != nil { + logger.Panic("can't rename file", zap.Error(err)) + } + + absFpath, err := filepath.Abs(fpath) + if err != nil { + logger.Panic("can't get absolute path", zap.String("path", fpath), zap.Error(err)) + } + dir := path.Dir(absFpath) + MustFsyncFile(dir) +} + +func MustFsyncFile(fpath string) { + dirFile, err := os.Open(fpath) + if err != nil { + logger.Panic("can't open dir", zap.Error(err)) + } + if err := dirFile.Sync(); err != nil { + logger.Panic("can't sync dir", zap.Error(err)) + } + if err := dirFile.Close(); err != nil { + logger.Panic("can't close dir", zap.Error(err)) + } +} + +// MustCreateDir creates directory at dirPath. +// Handles the case when directory already exists. +func MustCreateDir(dirPath string) { + err := os.MkdirAll(dirPath, 0o777) + if err != nil && !os.IsExist(err) { + logger.Panic("can't create file", zap.Error(err)) + } +} + +// VisitFilesWithExt traverses all the files with `ext` extension in `des` directory and calls a `cb` func for each of files. +func VisitFilesWithExt(des []os.DirEntry, ext string, cb func(name string) error) error { + for _, de := range des { + if de.IsDir() { + continue + } + name := de.Name() + if path.Ext(name) != ext { + continue + } + if err := cb(name); err != nil { + return err + } + } + return nil +}