Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion frac/fraction_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func seal(active *Active) (*Sealed, error) {
}
indexCache := &IndexCache{
MIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil),
Params: cache.NewCache[seqids.BlockParams](nil, nil),
LIDs: cache.NewCache[*lids.Block](nil, nil),
Tokens: cache.NewCache[*token.Block](nil, nil),
Expand Down
6 changes: 3 additions & 3 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1617,7 +1617,7 @@ func (s *FractionTestSuite) newSealed(bulks ...[]string) *Sealed {

indexCache := &IndexCache{
MIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil),
Params: cache.NewCache[seqids.BlockParams](nil, nil),
LIDs: cache.NewCache[*lids.Block](nil, nil),
Tokens: cache.NewCache[*token.Block](nil, nil),
Expand Down Expand Up @@ -1805,7 +1805,7 @@ func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *Seal

indexCache := &IndexCache{
MIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil),
Params: cache.NewCache[seqids.BlockParams](nil, nil),
LIDs: cache.NewCache[*lids.Block](nil, nil),
Tokens: cache.NewCache[*token.Block](nil, nil),
Expand Down Expand Up @@ -1872,7 +1872,7 @@ func (s *RemoteFractionTestSuite) SetupTest() {

indexCache := &IndexCache{
MIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil),
Params: cache.NewCache[seqids.BlockParams](nil, nil),
LIDs: cache.NewCache[*lids.Block](nil, nil),
Tokens: cache.NewCache[*token.Block](nil, nil),
Expand Down
2 changes: 1 addition & 1 deletion frac/index_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
type IndexCache struct {
Registry *cache.Cache[[]byte]
MIDs *cache.Cache[[]byte]
RIDs *cache.Cache[[]byte]
RIDs *cache.Cache[seqids.BlockRIDs]
Params *cache.Cache[seqids.BlockParams]
Tokens *cache.Cache[*token.Block]
TokenTable *cache.Cache[token.Table]
Expand Down
60 changes: 30 additions & 30 deletions frac/sealed/seqids/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package seqids

import (
"errors"
"unsafe"

"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/config"
Expand Down Expand Up @@ -29,7 +30,7 @@ type Loader struct {
reader *storage.IndexReader
table *Table
cacheMIDs *cache.Cache[[]byte]
cacheRIDs *cache.Cache[[]byte]
cacheRIDs *cache.Cache[BlockRIDs]
cacheParams *cache.Cache[BlockParams]
fracVersion config.BinaryDataVersion
}
Expand All @@ -55,52 +56,51 @@ func (l *Loader) GetMIDsBlock(index uint32, buf []uint64) (BlockMIDs, error) {
return block, nil
}

func (l *Loader) GetRIDsBlock(index uint32, buf []uint64) (BlockRIDs, error) {
// load binary from index
data, err := l.cacheRIDs.GetWithError(index, func() ([]byte, int, error) {
func (l *Loader) GetRIDsBlock(index uint32) (BlockRIDs, error) {
return l.cacheRIDs.GetWithError(index, func() (BlockRIDs, int, error) {
data, _, err := l.reader.ReadIndexBlock(l.ridBlockIndex(index), nil)
return data, cap(data), err
if err != nil {
return BlockRIDs{}, 0, err
}

block := BlockRIDs{
fracVersion: l.fracVersion,
Values: make([]uint64, 0, consts.IDsPerBlock),
}

err = block.Unpack(data)
if err != nil {
return BlockRIDs{}, 0, err
}

if len(block.Values) == 0 {
return BlockRIDs{}, 0, errors.New("empty block")
}

const ui64 = int(unsafe.Sizeof(uint64(0)))
return block, cap(block.Values) * ui64, err
})
// check errors
if err == nil && len(data) == 0 {
err = errors.New("empty block")
}
if err != nil {
return BlockRIDs{}, err
}
// unpack
block := BlockRIDs{
fracVersion: l.fracVersion,
Values: buf,
}
if err := block.Unpack(data); err != nil {
return BlockRIDs{}, err
}
return block, nil
}

func (l *Loader) GetParamsBlock(index uint32) (BlockParams, error) {
// load binary from index
block, err := l.cacheParams.GetWithError(index, func() (BlockParams, int, error) {
return l.cacheParams.GetWithError(index, func() (BlockParams, int, error) {
data, _, err := l.reader.ReadIndexBlock(l.paramsBlockIndex(index), nil)
if err != nil {
return BlockParams{}, 0, err
}
// unpack

block := BlockParams{Values: make([]uint64, 0, consts.IDsPerBlock)}
if err := block.Unpack(data); err != nil {
return BlockParams{}, 0, err
}

if len(block.Values) == 0 {
return BlockParams{}, 0, errors.New("empty block")
}
return block, cap(block.Values) * 8, nil

const ui64 = int(unsafe.Sizeof(uint64(0)))
return block, cap(block.Values) * ui64, nil
})
// check errors
if err != nil {
return BlockParams{}, err
}
return block, nil
}

// blocks are stored as triplets on disk, (MID + RID + Pos), check docs/format-index-file.go
Expand Down
9 changes: 6 additions & 3 deletions frac/sealed/seqids/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Provider struct {
func NewProvider(
indexReader *storage.IndexReader,
cacheMIDs *cache.Cache[[]byte],
cacheRIDs *cache.Cache[[]byte],
cacheRIDs *cache.Cache[BlockRIDs],
cacheParams *cache.Cache[BlockParams],
table *Table,
fracVersion config.BinaryDataVersion,
Expand Down Expand Up @@ -76,13 +76,16 @@ func (p *Provider) RID(lid seq.LID) (seq.RID, error) {

func (p *Provider) fillRIDs(blockIndex uint32, dst *unpackCache) error {
if dst.blockIndex != int(blockIndex) {
block, err := p.loader.GetRIDsBlock(blockIndex, dst.values[:0])
block, err := p.loader.GetRIDsBlock(blockIndex)
if err != nil {
return err
}
dst.blockIndex = int(blockIndex)
dst.startLID = p.loader.table.BlockStartLID(blockIndex)
dst.values = block.Values
// we have to copy `block.Values` because we store them in `cache.Cache[BlockRIDs]`,
// but `dst *unpackCache` might put its `values` in sync.Pool on `release()`, and they
// will be reused and corrupted
dst.values = append(dst.values[:0], block.Values...)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion fracmanager/cache_maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (cm *CacheMaintainer) CreateSortDocsCache() *cache.Cache[[]byte] {
func (cm *CacheMaintainer) CreateIndexCache() *frac.IndexCache {
return &frac.IndexCache{
MIDs: newCache[[]byte](cm, midsName),
RIDs: newCache[[]byte](cm, ridsName),
RIDs: newCache[seqids.BlockRIDs](cm, ridsName),
Params: newCache[seqids.BlockParams](cm, paramsName),
LIDs: newCache[*lids.Block](cm, lidsName),
Tokens: newCache[*token.Block](cm, tokensName),
Expand Down
Loading