diff --git a/frac/fraction_concurrency_test.go b/frac/fraction_concurrency_test.go index 0155a682..a1d36957 100644 --- a/frac/fraction_concurrency_test.go +++ b/frac/fraction_concurrency_test.go @@ -334,7 +334,7 @@ func seal(active *Active) (*Sealed, error) { } indexCache := &IndexCache{ MIDs: cache.NewCache[[]byte](nil, nil), - RIDs: cache.NewCache[[]byte](nil, nil), + RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil), Params: cache.NewCache[seqids.BlockParams](nil, nil), LIDs: cache.NewCache[*lids.Block](nil, nil), Tokens: cache.NewCache[*token.Block](nil, nil), diff --git a/frac/fraction_test.go b/frac/fraction_test.go index d43b30ae..ff4ac602 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1617,7 +1617,7 @@ func (s *FractionTestSuite) newSealed(bulks ...[]string) *Sealed { indexCache := &IndexCache{ MIDs: cache.NewCache[[]byte](nil, nil), - RIDs: cache.NewCache[[]byte](nil, nil), + RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil), Params: cache.NewCache[seqids.BlockParams](nil, nil), LIDs: cache.NewCache[*lids.Block](nil, nil), Tokens: cache.NewCache[*token.Block](nil, nil), @@ -1805,7 +1805,7 @@ func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *Seal indexCache := &IndexCache{ MIDs: cache.NewCache[[]byte](nil, nil), - RIDs: cache.NewCache[[]byte](nil, nil), + RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil), Params: cache.NewCache[seqids.BlockParams](nil, nil), LIDs: cache.NewCache[*lids.Block](nil, nil), Tokens: cache.NewCache[*token.Block](nil, nil), @@ -1872,7 +1872,7 @@ func (s *RemoteFractionTestSuite) SetupTest() { indexCache := &IndexCache{ MIDs: cache.NewCache[[]byte](nil, nil), - RIDs: cache.NewCache[[]byte](nil, nil), + RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil), Params: cache.NewCache[seqids.BlockParams](nil, nil), LIDs: cache.NewCache[*lids.Block](nil, nil), Tokens: cache.NewCache[*token.Block](nil, nil), diff --git a/frac/index_cache.go b/frac/index_cache.go index 3dc5b3a4..4536fa22 100644 --- a/frac/index_cache.go +++ b/frac/index_cache.go @@ -10,7 +10,7 @@ import ( type IndexCache struct { Registry *cache.Cache[[]byte] MIDs *cache.Cache[[]byte] - RIDs *cache.Cache[[]byte] + RIDs *cache.Cache[seqids.BlockRIDs] Params *cache.Cache[seqids.BlockParams] Tokens *cache.Cache[*token.Block] TokenTable *cache.Cache[token.Table] diff --git a/frac/sealed/seqids/loader.go b/frac/sealed/seqids/loader.go index 1928e182..a4c9ecdb 100644 --- a/frac/sealed/seqids/loader.go +++ b/frac/sealed/seqids/loader.go @@ -2,6 +2,7 @@ package seqids import ( "errors" + "unsafe" "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/config" @@ -29,7 +30,7 @@ type Loader struct { reader *storage.IndexReader table *Table cacheMIDs *cache.Cache[[]byte] - cacheRIDs *cache.Cache[[]byte] + cacheRIDs *cache.Cache[BlockRIDs] cacheParams *cache.Cache[BlockParams] fracVersion config.BinaryDataVersion } @@ -55,52 +56,51 @@ func (l *Loader) GetMIDsBlock(index uint32, buf []uint64) (BlockMIDs, error) { return block, nil } -func (l *Loader) GetRIDsBlock(index uint32, buf []uint64) (BlockRIDs, error) { - // load binary from index - data, err := l.cacheRIDs.GetWithError(index, func() ([]byte, int, error) { +func (l *Loader) GetRIDsBlock(index uint32) (BlockRIDs, error) { + return l.cacheRIDs.GetWithError(index, func() (BlockRIDs, int, error) { data, _, err := l.reader.ReadIndexBlock(l.ridBlockIndex(index), nil) - return data, cap(data), err + if err != nil { + return BlockRIDs{}, 0, err + } + + block := BlockRIDs{ + fracVersion: l.fracVersion, + Values: make([]uint64, 0, consts.IDsPerBlock), + } + + err = block.Unpack(data) + if err != nil { + return BlockRIDs{}, 0, err + } + + if len(block.Values) == 0 { + return BlockRIDs{}, 0, errors.New("empty block") + } + + const ui64 = int(unsafe.Sizeof(uint64(0))) + return block, cap(block.Values) * ui64, err }) - // check errors - if err == nil && len(data) == 0 { - err = errors.New("empty block") - } - if err != nil { - return BlockRIDs{}, err - } - // unpack - block := BlockRIDs{ - fracVersion: l.fracVersion, - Values: buf, - } - if err := block.Unpack(data); err != nil { - return BlockRIDs{}, err - } - return block, nil } func (l *Loader) GetParamsBlock(index uint32) (BlockParams, error) { - // load binary from index - block, err := l.cacheParams.GetWithError(index, func() (BlockParams, int, error) { + return l.cacheParams.GetWithError(index, func() (BlockParams, int, error) { data, _, err := l.reader.ReadIndexBlock(l.paramsBlockIndex(index), nil) if err != nil { return BlockParams{}, 0, err } - // unpack + block := BlockParams{Values: make([]uint64, 0, consts.IDsPerBlock)} if err := block.Unpack(data); err != nil { return BlockParams{}, 0, err } + if len(block.Values) == 0 { return BlockParams{}, 0, errors.New("empty block") } - return block, cap(block.Values) * 8, nil + + const ui64 = int(unsafe.Sizeof(uint64(0))) + return block, cap(block.Values) * ui64, nil }) - // check errors - if err != nil { - return BlockParams{}, err - } - return block, nil } // blocks are stored as triplets on disk, (MID + RID + Pos), check docs/format-index-file.go diff --git a/frac/sealed/seqids/provider.go b/frac/sealed/seqids/provider.go index dff698c9..18a1c57c 100644 --- a/frac/sealed/seqids/provider.go +++ b/frac/sealed/seqids/provider.go @@ -18,7 +18,7 @@ type Provider struct { func NewProvider( indexReader *storage.IndexReader, cacheMIDs *cache.Cache[[]byte], - cacheRIDs *cache.Cache[[]byte], + cacheRIDs *cache.Cache[BlockRIDs], cacheParams *cache.Cache[BlockParams], table *Table, fracVersion config.BinaryDataVersion, @@ -76,13 +76,16 @@ func (p *Provider) RID(lid seq.LID) (seq.RID, error) { func (p *Provider) fillRIDs(blockIndex uint32, dst *unpackCache) error { if dst.blockIndex != int(blockIndex) { - block, err := p.loader.GetRIDsBlock(blockIndex, dst.values[:0]) + block, err := p.loader.GetRIDsBlock(blockIndex) if err != nil { return err } dst.blockIndex = int(blockIndex) dst.startLID = p.loader.table.BlockStartLID(blockIndex) - dst.values = block.Values + // we have to copy `block.Values` because we store them in `cache.Cache[BlockRIDs]`, + // but `dst *unpackCache` might put its `values` in sync.Pool on `release()`, and they + // will be reused and corrupted + dst.values = append(dst.values[:0], block.Values...) } return nil } diff --git a/fracmanager/cache_maintainer.go b/fracmanager/cache_maintainer.go index 5139b4ca..70e5f956 100644 --- a/fracmanager/cache_maintainer.go +++ b/fracmanager/cache_maintainer.go @@ -144,7 +144,7 @@ func (cm *CacheMaintainer) CreateSortDocsCache() *cache.Cache[[]byte] { func (cm *CacheMaintainer) CreateIndexCache() *frac.IndexCache { return &frac.IndexCache{ MIDs: newCache[[]byte](cm, midsName), - RIDs: newCache[[]byte](cm, ridsName), + RIDs: newCache[seqids.BlockRIDs](cm, ridsName), Params: newCache[seqids.BlockParams](cm, paramsName), LIDs: newCache[*lids.Block](cm, lidsName), Tokens: newCache[*token.Block](cm, tokensName),