From d73e08a67285bbf2cf575df8e8f207bc8aada6e8 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Mon, 26 Jan 2026 09:41:32 +0400 Subject: [PATCH] bitpack encoding for LID and MID blocks --- config/frac_version.go | 4 +- consts/consts.go | 2 +- frac/sealed/lids/block.go | 123 +++++--- frac/sealed/lids/block2.go | 90 ++++++ frac/sealed/lids/block_test.go | 480 +++++++++++++++++++++++++++++ frac/sealed/lids/loader.go | 25 +- frac/sealed/seqids/blocks.go | 45 ++- frac/sealed/seqids/blocks_test.go | 78 +++++ frac/sealed/seqids/loader.go | 6 +- frac/sealed/seqids/provider.go | 2 +- frac/sealed/seqids/unpack_cache.go | 3 + go.mod | 1 + go.sum | 2 + packer/bitpack_packer.go | 89 ++++++ packer/bitpack_unpacker.go | 185 +++++++++++ packer/bytes_unpacker.go | 9 + 16 files changed, 1090 insertions(+), 54 deletions(-) create mode 100644 frac/sealed/lids/block2.go create mode 100644 frac/sealed/lids/block_test.go create mode 100644 frac/sealed/seqids/blocks_test.go create mode 100644 packer/bitpack_packer.go create mode 100644 packer/bitpack_unpacker.go diff --git a/config/frac_version.go b/config/frac_version.go index d3ff1b14..012eb304 100644 --- a/config/frac_version.go +++ b/config/frac_version.go @@ -9,6 +9,8 @@ const ( BinaryDataV1 // BinaryDataV2 - MIDs stored in nanoseconds BinaryDataV2 + // BinaryDataV3 - MIDs and LIDs encoded in bitpack, variable LID block size + BinaryDataV3 ) -const CurrentFracVersion = BinaryDataV2 +const CurrentFracVersion = BinaryDataV3 diff --git a/consts/consts.go b/consts/consts.go index 46a81d55..7e6111e8 100644 --- a/consts/consts.go +++ b/consts/consts.go @@ -12,7 +12,7 @@ const ( DummyMID = 0 IDsPerBlock = int(4 * units.KiB) - LIDBlockCap = int(64 * units.KiB) + LIDBlockCap = int(4 * units.KiB) RegularBlockSize = int(16 * units.KiB) DefaultMaintenanceDelay = time.Second diff --git a/frac/sealed/lids/block.go b/frac/sealed/lids/block.go index 08884c5c..fe50d031 100644 --- a/frac/sealed/lids/block.go +++ b/frac/sealed/lids/block.go @@ -2,9 +2,9 @@ package lids import ( "encoding/binary" - "math" "unsafe" + "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/packer" ) @@ -24,19 +24,38 @@ func (b *Block) getLIDs(i int) []uint32 { } func (b *Block) Pack(dst []byte) []byte { - lastLID := int64(0) - last := b.getCount() - 1 - for i := 0; i <= last; i++ { - for _, lid := range b.getLIDs(i) { - dst = binary.AppendVarint(dst, int64(lid)-lastLID) - lastLID = int64(lid) - } + // TODO store next flags into a single byte + // write b.IsLastLID as a dedicated uint32 in the header of block + switch b.IsLastLID { + case true: + dst = binary.LittleEndian.AppendUint32(dst, 1) + case false: + dst = binary.LittleEndian.AppendUint32(dst, 0) + } + + fullBlock := len(b.LIDs) == consts.LIDBlockCap + switch fullBlock { + case true: + dst = binary.LittleEndian.AppendUint32(dst, 1) + case false: + dst = binary.LittleEndian.AppendUint32(dst, 0) + } - if i < last || b.IsLastLID { - // when we add this value to prev we must get -1 (or math.MaxUint32 for uint32) - // it is the end-marker; see `Block.Unpack()` - dst = binary.AppendVarint(dst, -1-lastLID) + if len(b.LIDs) == consts.LIDBlockCap { + offsetPacker := packer.NewBitpacker(dst, 128) + offsetPacker.Append(b.Offsets) + dst = offsetPacker.Close() + lidPacker := packer.NewBitpacker(dst, 128) + dst = lidPacker.Append4kBlock(b.LIDs) + } else { + lidPacker := packer.NewBitpacker(dst, 128) + sep := []uint32{0} + last := b.getCount() - 1 + for i := 0; i <= last; i++ { + lidPacker.Append(b.getLIDs(i)) + lidPacker.Append(sep) } + dst = lidPacker.Close() } return dst } @@ -49,41 +68,63 @@ func (b *Block) GetSizeBytes() int { return blockSize + uint32Size*cap(b.LIDs) + uint32Size*cap(b.Offsets) } +// TODO add support of the previous versions func (b *Block) Unpack(data []byte, buf *UnpackBuffer) error { - var lid, offset uint32 - - b.IsLastLID = true - - buf.lids = buf.lids[:0] - buf.offsets = buf.offsets[:0] - buf.offsets = append(buf.offsets, 0) // first offset is always zero - unpacker := packer.NewBytesUnpacker(data) - for unpacker.Len() > 0 { - delta, err := unpacker.GetVarint() - if err != nil { - return err - } - lid += uint32(delta) + buf.Reset() + + // read IsLastLID from a dedicated uint32 + isLastLIDValue := unpacker.GetUint32() + switch isLastLIDValue { + case 1: + b.IsLastLID = true + case 0: + b.IsLastLID = false + } - if lid == math.MaxUint32 { // end of LIDs of current TID, see `Block.Pack()` method - offset = uint32(len(buf.lids)) - buf.offsets = append(buf.offsets, offset) - lid -= uint32(delta) - continue + fullBlock := unpacker.GetUint32() + switch fullBlock { + case 1: + // block has exactly consts.LIDBlockCap LIDs + decompressedChunk := buf.decompressed + compressedChunk := buf.compressed + offsetUnpacker := packer.NewBitpackUnpacker(unpacker, decompressedChunk, compressedChunk) + for { + offsetChunk, ok := offsetUnpacker.NextChunk() + if !ok { + break + } + b.Offsets = append(b.Offsets, offsetChunk...) } - buf.lids = append(buf.lids, lid) - } + lidUnpacker := packer.NewBitpackUnpacker(unpacker, decompressedChunk, compressedChunk) + b.LIDs = lidUnpacker.AllocateAndRead4kChunk() + case 0: + decompressedChunk := buf.decompressed + compressedChunk := buf.compressed + buf.offsets = append(buf.offsets, 0) + + bitpackUnpacker := packer.NewBitpackUnpacker(unpacker, decompressedChunk, compressedChunk) + pos := 0 + for { + chunk, ok := bitpackUnpacker.NextChunk() + if !ok { + break + } + + for _, lid := range chunk { + if pos > 0 && lid == 0 { + b.LIDs = append(b.LIDs, buf.lids...) + buf.lids = buf.lids[:0] + buf.offsets = append(buf.offsets, uint32(pos)) + } else { + buf.lids = append(buf.lids, lid) + pos++ + } + } + } - if int(offset) < len(buf.lids) { - b.IsLastLID = false - buf.offsets = append(buf.offsets, uint32(len(buf.lids))) + b.Offsets = append([]uint32{}, buf.offsets...) } - - // copy from buffer - b.LIDs = append([]uint32{}, buf.lids...) - b.Offsets = append([]uint32{}, buf.offsets...) - return nil } diff --git a/frac/sealed/lids/block2.go b/frac/sealed/lids/block2.go new file mode 100644 index 00000000..77fb26a9 --- /dev/null +++ b/frac/sealed/lids/block2.go @@ -0,0 +1,90 @@ +package lids + +import ( + "encoding/binary" + "math" + "unsafe" + + "github.com/ozontech/seq-db/packer" +) + +// TODO remove legacy block +type Block2 struct { + LIDs []uint32 + Offsets []uint32 + // todo remove this legacy field + IsLastLID bool +} + +func (b *Block2) getCount() int { + return len(b.Offsets) - 1 +} + +func (b *Block2) getLIDs(i int) []uint32 { + return b.LIDs[b.Offsets[i]:b.Offsets[i+1]] +} + +func (b *Block2) Pack(dst []byte) []byte { + lastLID := int64(0) + last := b.getCount() - 1 + for i := 0; i <= last; i++ { + for _, lid := range b.getLIDs(i) { + dst = binary.AppendVarint(dst, int64(lid)-lastLID) + lastLID = int64(lid) + } + + if i < last || b.IsLastLID { + // when we add this value to prev we must get -1 (or math.MaxUint32 for uint32) + // it is the end-marker; see `Block.Unpack()` + dst = binary.AppendVarint(dst, -1-lastLID) + } + } + return dst +} + +func (b *Block2) GetSizeBytes() int { + const ( + uint32Size = int(unsafe.Sizeof(uint32(0))) + blockSize = int(unsafe.Sizeof(*b)) + ) + return blockSize + uint32Size*cap(b.LIDs) + uint32Size*cap(b.Offsets) +} + +func (b *Block2) Unpack(data []byte, buf *UnpackBuffer) error { + var lid, offset uint32 + + b.IsLastLID = true + + buf.lids = buf.lids[:0] + buf.offsets = buf.offsets[:0] + buf.offsets = append(buf.offsets, 0) // first offset is always zero + + unpacker := packer.NewBytesUnpacker(data) + for unpacker.Len() > 0 { + delta, err := unpacker.GetVarint() + if err != nil { + return err + } + lid += uint32(delta) + + if lid == math.MaxUint32 { // end of LIDs of current TID, see `Block.Pack()` method + offset = uint32(len(buf.lids)) + buf.offsets = append(buf.offsets, offset) + lid -= uint32(delta) + continue + } + + buf.lids = append(buf.lids, lid) + } + + if int(offset) < len(buf.lids) { + b.IsLastLID = false + buf.offsets = append(buf.offsets, uint32(len(buf.lids))) + } + + // copy from buffer + b.LIDs = append([]uint32{}, buf.lids...) + b.Offsets = append([]uint32{}, buf.offsets...) + + return nil +} diff --git a/frac/sealed/lids/block_test.go b/frac/sealed/lids/block_test.go new file mode 100644 index 00000000..27d49a70 --- /dev/null +++ b/frac/sealed/lids/block_test.go @@ -0,0 +1,480 @@ +package lids + +import ( + "fmt" + "math" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBlock_Pack_Unpack(t *testing.T) { + testCases := []struct { + name string + lids []uint32 + offsets []uint32 + isLastLID bool + setup func() ([]uint32, []uint32) + }{ + { + name: "SingleToken", + lids: []uint32{100, 150, 200, 250}, + offsets: []uint32{0, 4}, + isLastLID: true, + }, + { + name: "MultipleTokens", + lids: []uint32{100, 150, 200, 250, 300, 350}, + offsets: []uint32{0, 3, 6}, + isLastLID: true, + }, + { + name: "NotLastLID", + lids: []uint32{100, 150, 200}, + offsets: []uint32{0, 3}, + isLastLID: false, + }, + { + name: "SingleLID", + lids: []uint32{100}, + offsets: []uint32{0, 1}, + isLastLID: true, + }, + { + name: "ConsecutiveLIDs", + lids: func() []uint32 { + lids := make([]uint32, 50) + for i := range lids { + lids[i] = uint32(1000 + i) + } + return lids + }(), + offsets: []uint32{0, 50}, + isLastLID: true, + }, + { + name: "LargeLIDs", + lids: []uint32{math.MaxUint32 - 100, math.MaxUint32 - 50, math.MaxUint32 - 10}, + offsets: []uint32{0, 3}, + isLastLID: true, + }, + { + name: "MultipleTokens_IsLastLID_False", + lids: []uint32{100, 150, 200, 250, 300, 350, 400, 450}, + offsets: []uint32{0, 3, 6, 8}, + isLastLID: false, + }, + { + name: "ManyTokens", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 0) + offsets := []uint32{0} + startLID := uint32(100) + for i := 0; i < 10; i++ { + for j := 0; j < 3; j++ { + lids = append(lids, startLID+uint32(i*10+j)) + } + offsets = append(offsets, uint32(len(lids))) + startLID += 30 + } + return lids, offsets + }, + isLastLID: true, + }, + { + name: "LargeBlock", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 0, 200) + startLID := uint32(1000) + for i := 0; i < 200; i++ { + lids = append(lids, startLID+uint32(i*10)) + } + return lids, []uint32{0, uint32(len(lids))} + }, + isLastLID: true, + }, + { + name: "LargeBlock_IsLastLID_False", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 0, 200) + startLID := uint32(1000) + for i := 0; i < 200; i++ { + lids = append(lids, startLID+uint32(i*10)) + } + return lids, []uint32{0, uint32(len(lids))} + }, + isLastLID: false, + }, + { + name: "LargeBlockWithMultipleTokens", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 0, 150) + offsets := []uint32{0} + startLID := uint32(1000) + groupSize := 30 + for group := 0; group < 5; group++ { + for i := 0; i < groupSize; i++ { + lids = append(lids, startLID+uint32(group*groupSize*10+i*10)) + } + offsets = append(offsets, uint32(len(lids))) + startLID += uint32(groupSize * 10) + } + return lids, offsets + }, + isLastLID: true, + }, + { + name: "LargeBlockWithMultipleTokens_IsLastLID_False", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 0, 150) + offsets := []uint32{0} + startLID := uint32(1000) + groupSize := 30 + for group := 0; group < 5; group++ { + for i := 0; i < groupSize; i++ { + lids = append(lids, startLID+uint32(group*groupSize*10+i*10)) + } + offsets = append(offsets, uint32(len(lids))) + startLID += uint32(groupSize * 10) + } + return lids, offsets + }, + isLastLID: false, + }, + { + name: "Exactly128LIDs", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 128) + startLID := uint32(1000) + for i := 0; i < 128; i++ { + lids[i] = startLID + uint32(i*5) + } + return lids, []uint32{0, 128} + }, + isLastLID: true, + }, + { + name: "Exactly128LIDs_IsLastLID_False", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 128) + startLID := uint32(1000) + for i := 0; i < 128; i++ { + lids[i] = startLID + uint32(i*5) + } + return lids, []uint32{0, 128} + }, + isLastLID: false, + }, + { + name: "127LIDs", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 127) + startLID := uint32(1000) + for i := 0; i < 127; i++ { + lids[i] = startLID + uint32(i*5) + } + return lids, []uint32{0, 127} + }, + isLastLID: true, + }, + { + name: "129LIDs", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 129) + startLID := uint32(1000) + for i := 0; i < 129; i++ { + lids[i] = startLID + uint32(i*5) + } + return lids, []uint32{0, 129} + }, + isLastLID: true, + }, + { + name: "129LIDs_IsLastLID_False", + setup: func() ([]uint32, []uint32) { + lids := make([]uint32, 129) + startLID := uint32(1000) + for i := 0; i < 129; i++ { + lids[i] = startLID + uint32(i*5) + } + return lids, []uint32{0, 129} + }, + isLastLID: false, + }, + { + name: "64k_65536_LIDs", + setup: func() ([]uint32, []uint32) { + size := 65536 + lids := make([]uint32, size) + startLID := uint32(1000) + for i := 0; i < size; i++ { + lids[i] = startLID + uint32(i) + } + return lids, []uint32{0, uint32(size)} + }, + isLastLID: false, + }, + { + name: "64k_65539_LIDs", + setup: func() ([]uint32, []uint32) { + size := 65539 + lids := make([]uint32, size) + startLID := uint32(1000) + for i := 0; i < size; i++ { + lids[i] = startLID + uint32(i) + } + return lids, []uint32{0, uint32(size)} + }, + isLastLID: false, + }, + { + name: "64k_65533_LIDs", + setup: func() ([]uint32, []uint32) { + size := 65533 + lids := make([]uint32, size) + startLID := uint32(1000) + for i := 0; i < size; i++ { + lids[i] = startLID + uint32(i) + } + return lids, []uint32{0, uint32(size)} + }, + isLastLID: false, + }, + { + name: "IsLastLID_True", + lids: []uint32{100, 150, 200, 250, 300}, + offsets: []uint32{0, 5}, + isLastLID: true, + }, + { + name: "IsLastLID_False", + lids: []uint32{100, 150, 200, 250, 300}, + offsets: []uint32{0, 5}, + isLastLID: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var lids []uint32 + var offsets []uint32 + + if tc.setup != nil { + lids, offsets = tc.setup() + } else { + lids = tc.lids + offsets = tc.offsets + } + + block := &Block{ + LIDs: lids, + Offsets: offsets, + IsLastLID: tc.isLastLID, + } + + packed := block.Pack(nil) + require.NotEmpty(t, packed, "packed data should not be empty") + + unpacked := &Block{} + buf := &UnpackBuffer{} + err := unpacked.Unpack(packed, buf) + require.NoError(t, err, "unpack should succeed") + + assert.Equal(t, block.LIDs, unpacked.LIDs, "LIDs should match") + assert.Equal(t, block.Offsets, unpacked.Offsets, "Offsets should match") + assert.Equal(t, block.IsLastLID, unpacked.IsLastLID, "IsLastLID should match") + }) + } +} + +func TestBlock_Pack_4k(t *testing.T) { + lids := make([]uint32, 4*1024) + startLID := uint32(1000) + for i := 0; i < 4*1024; i++ { + lids[i] = startLID + uint32(i) + } + + block := &Block{ + LIDs: lids, + Offsets: []uint32{0, 10, 50, 100, 1000, 1500, 2000, 2500, 3000, 4 * 1024}, + IsLastLID: true, + } + + packed := block.Pack(nil) + fmt.Println("packed len: ", len(packed)) + require.NotEmpty(t, packed, "packed data should not be empty") + + unpacked := &Block{} + buf := &UnpackBuffer{} + err := unpacked.Unpack(packed, buf) + require.NoError(t, err, "unpack should succeed") + + assert.Equal(t, block.LIDs, unpacked.LIDs, "LIDs should match") + assert.Equal(t, block.Offsets, unpacked.Offsets, "Offsets should match") + assert.Equal(t, block.IsLastLID, unpacked.IsLastLID, "IsLastLID should match") +} + +func TestBlock_Pack_4k_Dense(t *testing.T) { + lids := make([]uint32, 4*1024) + startLID := uint32(1000) + for i := 0; i < 4*1024; i++ { + lids[i] = startLID + uint32(i) + } + offsets := make([]uint32, 2*1024) + for i := 0; i < 2*1024; i++ { + offsets[i] = uint32(i) + } + offsets = append(offsets, 4*1024) + + block := &Block{ + LIDs: lids, + Offsets: offsets, + IsLastLID: true, + } + + packed := block.Pack(nil) + fmt.Println("packed len: ", len(packed)) + require.NotEmpty(t, packed, "packed data should not be empty") + + unpacked := &Block{} + buf := &UnpackBuffer{} + err := unpacked.Unpack(packed, buf) + require.NoError(t, err, "unpack should succeed") + + assert.Equal(t, block.LIDs, unpacked.LIDs, "LIDs should match") + assert.Equal(t, block.Offsets, unpacked.Offsets, "Offsets should match") + assert.Equal(t, block.IsLastLID, unpacked.IsLastLID, "IsLastLID should match") +} + +func TestBlock_Pack_64k(t *testing.T) { + lids := make([]uint32, 64*1024) + startLID := uint32(1000) + for i := 0; i < 64*1024; i++ { + lids[i] = startLID + uint32(i) + } + + block := &Block{ + LIDs: lids, + Offsets: []uint32{0, 10, 50, 100, 1000, 1500, 2000, 2500, 3000, 64 * 1024}, + IsLastLID: true, + } + + packed := block.Pack(nil) + fmt.Println("packed len: ", len(packed)) + require.NotEmpty(t, packed, "packed data should not be empty") + + unpacked := &Block{} + buf := &UnpackBuffer{} + err := unpacked.Unpack(packed, buf) + require.NoError(t, err, "unpack should succeed") + + assert.Equal(t, block.LIDs, unpacked.LIDs, "LIDs should match") + assert.Equal(t, block.Offsets, unpacked.Offsets, "Offsets should match") + assert.Equal(t, block.IsLastLID, unpacked.IsLastLID, "IsLastLID should match") +} + +func TestBlock_Pack_64k_Dense(t *testing.T) { + lids := make([]uint32, 64*1024) + startLID := uint32(1000) + for i := 0; i < 64*1024; i++ { + lids[i] = startLID + uint32(i) + } + offsets := make([]uint32, 32*1024) + for i := 0; i < 32*1024; i++ { + offsets[i] = uint32(i) + } + offsets = append(offsets, 64*1024) + + block := &Block{ + LIDs: lids, + Offsets: offsets, + IsLastLID: true, + } + + packed := block.Pack(nil) + fmt.Println("packed len: ", len(packed)) + require.NotEmpty(t, packed, "packed data should not be empty") + + unpacked := &Block{} + buf := &UnpackBuffer{} + err := unpacked.Unpack(packed, buf) + require.NoError(t, err, "unpack should succeed") + + assert.Equal(t, block.LIDs, unpacked.LIDs, "LIDs should match") + assert.Equal(t, block.Offsets, unpacked.Offsets, "Offsets should match") + assert.Equal(t, block.IsLastLID, unpacked.IsLastLID, "IsLastLID should match") +} + +func TestBlock_Pack_Unpack_ReuseBuffer(t *testing.T) { + // Test that UnpackBuffer can be reused + block1 := &Block{ + LIDs: []uint32{100, 150, 200}, + Offsets: []uint32{0, 3}, + IsLastLID: true, + } + + block2 := &Block{ + LIDs: []uint32{300, 350, 400, 450}, + Offsets: []uint32{0, 4}, + IsLastLID: true, + } + + packed1 := block1.Pack(nil) + packed2 := block2.Pack(nil) + + buf := &UnpackBuffer{} + + unpacked1 := &Block{} + err := unpacked1.Unpack(packed1, buf) + require.NoError(t, err) + assert.Equal(t, block1.LIDs, unpacked1.LIDs) + + // Reuse the same buffer + unpacked2 := &Block{} + err = unpacked2.Unpack(packed2, buf) + require.NoError(t, err) + assert.Equal(t, block2.LIDs, unpacked2.LIDs) +} + +func BenchmarkBlock_Pack(b *testing.B) { + lids := make([]uint32, 200) + startLID := uint32(1000) + for i := 0; i < 200; i++ { + lids[i] = startLID + uint32(i*10) + } + + block := &Block{ + LIDs: lids, + Offsets: []uint32{0, uint32(len(lids))}, + IsLastLID: true, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + block.Pack(nil) + } +} + +func BenchmarkBlock_Unpack(b *testing.B) { + lids := make([]uint32, 4*1024) + startLID := uint32(1000) + for i := 0; i < 4*1024; i++ { + lids[i] = startLID + uint32(i*10) + } + + block := &Block{ + LIDs: lids, + Offsets: []uint32{0, 100, 500, 600, 800, 1000, 1100, 1250, 1500, 2000, 2500, 3000, 3500, 4 * 1024}, + IsLastLID: true, + } + + packed := block.Pack(nil) + buf := &UnpackBuffer{} + unpacked := &Block{} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + unpacked.Unpack(packed, buf) + } +} diff --git a/frac/sealed/lids/loader.go b/frac/sealed/lids/loader.go index e5ee8db1..435974a3 100644 --- a/frac/sealed/lids/loader.go +++ b/frac/sealed/lids/loader.go @@ -6,8 +6,29 @@ import ( ) type UnpackBuffer struct { - lids []uint32 - offsets []uint32 + lids []uint32 + offsets []uint32 + decompressed []uint32 + compressed []uint32 +} + +func (b *UnpackBuffer) Reset() { + if b.lids == nil { + b.lids = make([]uint32, 0, 128) + } else { + b.lids = b.lids[:0] + } + if b.offsets == nil { + b.offsets = make([]uint32, 0, 8) + } else { + b.offsets = b.offsets[:0] + } + if b.decompressed == nil { + b.decompressed = make([]uint32, 0, 1024) + } + if b.compressed == nil { + b.compressed = make([]uint32, 0, 256) + } } // Loader is responsible for reading from disk, unpacking and caching LID. diff --git a/frac/sealed/seqids/blocks.go b/frac/sealed/seqids/blocks.go index f17f1be5..8a38bbca 100644 --- a/frac/sealed/seqids/blocks.go +++ b/frac/sealed/seqids/blocks.go @@ -3,8 +3,12 @@ package seqids import ( "encoding/binary" "errors" + "unsafe" + + "github.com/ronanh/intcomp" "github.com/ozontech/seq-db/config" + "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/seq" ) @@ -13,15 +17,46 @@ type BlockMIDs struct { } func (b BlockMIDs) Pack(dst []byte) []byte { - var prev uint64 - for _, mid := range b.Values { - dst = binary.AppendVarint(dst, int64(mid-prev)) - prev = mid + if len(b.Values) == consts.IDsPerBlock { + dst = binary.LittleEndian.AppendUint32(dst, 1) + + _, compressed := intcomp.CompressDeltaBinPackUint64(b.Values, nil) + dst = binary.LittleEndian.AppendUint32(dst, uint32(len(compressed))) + for _, val := range compressed { + dst = binary.LittleEndian.AppendUint64(dst, val) + } + } else { + dst = binary.LittleEndian.AppendUint32(dst, 0) + + var prev uint64 + for _, mid := range b.Values { + dst = binary.AppendVarint(dst, int64(mid-prev)) + prev = mid + } } return dst } -func (b *BlockMIDs) Unpack(data []byte, fracVersion config.BinaryDataVersion) error { +func (b *BlockMIDs) Unpack(data []byte, fracVersion config.BinaryDataVersion, cache *unpackCache) error { + if fracVersion >= config.BinaryDataV3 { + fastPath := binary.LittleEndian.Uint32(data) + data = data[4:] + + if fastPath == 1 { + valuesCount := binary.LittleEndian.Uint32(data) + data = data[4:] + + // TODO this is unsafe, we rely that we running on little-endian host + cache.compressed = cache.compressed[:valuesCount] + byteLen := int(valuesCount) * 8 + src := unsafe.Slice((*uint64)(unsafe.Pointer(unsafe.SliceData(data[:byteLen]))), valuesCount) + copy(cache.compressed, src) + + _, b.Values = intcomp.UncompressDeltaBinPackUint64(cache.compressed, b.Values) + return nil + } + } + values, err := unpackRawMIDsVarint(data, b.Values, fracVersion) if err != nil { return err diff --git a/frac/sealed/seqids/blocks_test.go b/frac/sealed/seqids/blocks_test.go new file mode 100644 index 00000000..98f1a668 --- /dev/null +++ b/frac/sealed/seqids/blocks_test.go @@ -0,0 +1,78 @@ +package seqids + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/config" + "github.com/ozontech/seq-db/consts" +) + +func TestBlockMIDs_Pack_Unpack(t *testing.T) { + tests := []struct { + name string + values []uint64 + fracVersion config.BinaryDataVersion + }{ + { + name: "SlowPath_SmallBlock", + values: []uint64{100, 200, 300, 400, 500}, + fracVersion: config.BinaryDataV3, + }, + { + name: "SlowPath_EmptyBlock", + values: []uint64{}, + fracVersion: config.BinaryDataV3, + }, + { + name: "SlowPath_SingleValue", + values: []uint64{12345678901234}, + fracVersion: config.BinaryDataV3, + }, + { + name: "FastPath_4kBlock", + values: generate4kMIDs(1000000000000, 1000000), + fracVersion: config.BinaryDataV3, + }, + { + name: "FastPath_4kBlock_LargeValues", + values: generate4kMIDs(0xFFFFFFFF00000000, 1), + fracVersion: config.BinaryDataV3, + }, + { + name: "FastPath_4kBlock_Sequential", + values: generate4kMIDs(0, 1), + fracVersion: config.BinaryDataV3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + original := BlockMIDs{Values: tt.values} + + packed := original.Pack(nil) + t.Logf("packed len: %d", len(packed)) + + cache := NewCache() + defer cache.Release() + + unpacked := BlockMIDs{} + err := unpacked.Unpack(packed, tt.fracVersion, cache) + require.NoError(t, err) + + require.Equal(t, len(original.Values), len(unpacked.Values), "length mismatch") + for i := range original.Values { + require.Equal(t, original.Values[i], unpacked.Values[i], "value mismatch at index %d", i) + } + }) + } +} + +func generate4kMIDs(base uint64, increment uint64) []uint64 { + values := make([]uint64, consts.IDsPerBlock) + for i := range values { + values[i] = base + uint64(i)*increment + } + return values +} diff --git a/frac/sealed/seqids/loader.go b/frac/sealed/seqids/loader.go index 1928e182..e6c79b98 100644 --- a/frac/sealed/seqids/loader.go +++ b/frac/sealed/seqids/loader.go @@ -34,7 +34,7 @@ type Loader struct { fracVersion config.BinaryDataVersion } -func (l *Loader) GetMIDsBlock(index uint32, buf []uint64) (BlockMIDs, error) { +func (l *Loader) GetMIDsBlock(index uint32, cache *unpackCache) (BlockMIDs, error) { // load binary from index data, err := l.cacheMIDs.GetWithError(index, func() ([]byte, int, error) { data, _, err := l.reader.ReadIndexBlock(l.midBlockIndex(index), nil) @@ -48,8 +48,8 @@ func (l *Loader) GetMIDsBlock(index uint32, buf []uint64) (BlockMIDs, error) { return BlockMIDs{}, err } // unpack - block := BlockMIDs{Values: buf} - if err := block.Unpack(data, l.fracVersion); err != nil { + block := BlockMIDs{Values: cache.values[:0]} + if err := block.Unpack(data, l.fracVersion, cache); err != nil { return BlockMIDs{}, err } return block, nil diff --git a/frac/sealed/seqids/provider.go b/frac/sealed/seqids/provider.go index dff698c9..8c32760f 100644 --- a/frac/sealed/seqids/provider.go +++ b/frac/sealed/seqids/provider.go @@ -55,7 +55,7 @@ func (p *Provider) MID(lid seq.LID) (seq.MID, error) { func (p *Provider) fillMIDs(blockIndex uint32, dst *unpackCache) error { if dst.blockIndex != int(blockIndex) { - block, err := p.loader.GetMIDsBlock(blockIndex, dst.values[:0]) + block, err := p.loader.GetMIDsBlock(blockIndex, dst) if err != nil { return err } diff --git a/frac/sealed/seqids/unpack_cache.go b/frac/sealed/seqids/unpack_cache.go index f621dd57..443d8886 100644 --- a/frac/sealed/seqids/unpack_cache.go +++ b/frac/sealed/seqids/unpack_cache.go @@ -10,6 +10,7 @@ type unpackCache struct { blockIndex int startLID uint32 values []uint64 + compressed []uint64 // buffer for decompressing MID blocks } var cachePool = sync.Pool{} @@ -26,6 +27,7 @@ func NewCache() *unpackCache { blockIndex: -1, startLID: 0, values: make([]uint64, 0, defaultValsCapacity), + compressed: make([]uint64, 0, defaultValsCapacity), } } @@ -33,6 +35,7 @@ func (c *unpackCache) reset() *unpackCache { c.blockIndex = -1 c.startLID = 0 c.values = c.values[:0] + c.compressed = c.compressed[:0] return c } diff --git a/go.mod b/go.mod index 6ba7a1d8..6c0d07df 100644 --- a/go.mod +++ b/go.mod @@ -74,6 +74,7 @@ require ( github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.16.1 // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect + github.com/ronanh/intcomp v1.1.1 // indirect github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 // indirect github.com/uber/jaeger-client-go v2.25.0+incompatible // indirect go.shabbyrobe.org/gocovmerge v0.0.0-20230507111327-fa4f82cfbf4d // indirect diff --git a/go.sum b/go.sum index 92b59e95..b30da514 100644 --- a/go.sum +++ b/go.sum @@ -231,6 +231,8 @@ github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlT github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/ronanh/intcomp v1.1.1 h1:+1bGV/wEBiHI0FvzS7RHgzqOpfbBJzLIxkqMJ9e6yxY= +github.com/ronanh/intcomp v1.1.1/go.mod h1:7FOLy3P3Zj3er/kVrU/pl+Ql7JFZj7bwliMGketo0IU= github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 h1:GHRpF1pTW19a8tTFrMLUcfWwyC0pnifVo2ClaLq+hP8= github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46/go.mod h1:uAQ5PCi+MFsC7HjREoAz1BU+Mq60+05gifQSsHSDG/8= github.com/spf13/afero v1.2.1 h1:qgMbHoJbPbw579P+1zVY+6n4nIFuIchaIjzZ/I/Yq8M= diff --git a/packer/bitpack_packer.go b/packer/bitpack_packer.go new file mode 100644 index 00000000..b99e6ed9 --- /dev/null +++ b/packer/bitpack_packer.go @@ -0,0 +1,89 @@ +package packer + +import ( + "encoding/binary" + + "github.com/ronanh/intcomp" +) + +// TODO the whole file looks pretty unreadable + +type Bitpacker struct { + dst []byte // output buffer + // TODO need to remove this chunk, use slice from input slice instead + chunk []uint32 // temporary chunk to feed to intcomp library + compressedChunk []uint32 // temporary chunk to which intcomp will write compressed data + chunkSize int +} + +func NewBitpacker(dst []byte, chunkSize int) *Bitpacker { + if chunkSize <= 0 { + chunkSize = 1024 // default chunk size + } + return &Bitpacker{ + dst: dst, + chunk: make([]uint32, 0, chunkSize), + compressedChunk: make([]uint32, 0, chunkSize/4), + chunkSize: chunkSize, + } +} + +// Append adds values to the current chunk. When the chunk is full, it compresses +// and writes it to dst. +func (b *Bitpacker) Append(values []uint32) { + // TODO this is slow af + // TODO copy values to chunk fully if values slice is large + for _, val := range values { + b.chunk = append(b.chunk, val) + + if len(b.chunk) == b.chunkSize { + b.compressChunk() + } + } +} + +// TODO use intcomp directly for that +func (b *Bitpacker) Append4kBlock(values []uint32) []byte { + _, compressed := intcomp.CompressDeltaBinPackUint32(values, nil) + b.dst = binary.AppendVarint(b.dst, int64(len(compressed))) + + // TODO memcpy (arrow?) + for _, val := range compressed { + b.dst = binary.LittleEndian.AppendUint32(b.dst, val) + } + return b.dst +} + +func (b *Bitpacker) compressChunk() { + if len(b.chunk) == 0 { + return + } + + _, b.compressedChunk = intcomp.CompressDeltaBinPackUint32(b.chunk, b.compressedChunk) + b.dst = binary.AppendVarint(b.dst, int64(len(b.compressedChunk))) + + for _, val := range b.compressedChunk { + b.dst = binary.LittleEndian.AppendUint32(b.dst, val) + } + + b.chunk = b.chunk[:0] + b.compressedChunk = b.compressedChunk[:0] +} + +// Close writes a residual (less than 128 number) as varints +func (b *Bitpacker) Close() []byte { + // append 0 - an indicator of the last chunk with varints + b.dst = binary.AppendVarint(b.dst, 0) + + // append number of varints first + b.dst = binary.AppendVarint(b.dst, int64(len(b.chunk))) + + for _, lid := range b.chunk { + b.dst = binary.AppendVarint(b.dst, int64(lid)) + } + + // TODO remove? + // append the trailer + b.dst = binary.AppendVarint(b.dst, -1) + return b.dst +} diff --git a/packer/bitpack_unpacker.go b/packer/bitpack_unpacker.go new file mode 100644 index 00000000..a2f41175 --- /dev/null +++ b/packer/bitpack_unpacker.go @@ -0,0 +1,185 @@ +package packer + +import ( + "unsafe" + + "github.com/ronanh/intcomp" +) + +// TODO the whole file looks pretty unreadable + +// TODO use golang iter? +// ChunkIterator allows to iterate of chunks of fixed number (usually 128). The last chunk might be less +// than 128 numbers +type ChunkIterator interface { + NextChunk() ([]uint32, bool) +} + +type BitpackUnpacker struct { + unpacker *BytesUnpacker + + // TODO bad design: a caller must provided dst buffer on Next() call, we shall not own it + decompressedBuf []uint32 // descompressed buf, used to return in iterator + compressedBuf []uint32 // temporary buf, intcomp works on top of []uint32 slices + done bool +} + +var _ ChunkIterator = (*BitpackUnpacker)(nil) + +func isLittleEndian() bool { + // TODO check binary.NativeEndian == littleEndian? + return true +} + +func NewBitpackUnpacker(unpacker *BytesUnpacker, decompressedBuf []uint32, compressedBuf []uint32) *BitpackUnpacker { + return &BitpackUnpacker{ + unpacker: unpacker, + decompressedBuf: decompressedBuf, + compressedBuf: compressedBuf, + } +} + +// SkipChunks allows to skip chunks to navigate directly to chunk needed +func (b *BitpackUnpacker) SkipChunks(chunks int) bool { + for i := 0; i < chunks; i++ { + if !b.SkipChunk() { + return false + } + } + return true +} + +func (b *BitpackUnpacker) SkipChunk() bool { + if b.done || b.unpacker.Len() == 0 { + return false + } + + blockLen, err := b.unpacker.GetVarint() + if err != nil { + return false + } + + if blockLen == 0 { + b.readVarintChunk() + // we return false, since varint chunk is always the last, so there is no chunks left + return false + } + + // -1 indicates end of all data + if blockLen < 0 { + return false + } + b.unpacker.SkipUints32(int(blockLen)) + return true +} + +// TODO use directly intcomp for that +func (b *BitpackUnpacker) AllocateAndRead4kChunk() []uint32 { + blockLen, err := b.unpacker.GetVarint() + if err != nil { + return nil + } + + b.compressedBuf = b.compressedBuf[:0] + b.readUint32Block(blockLen) + + _, decompressed := intcomp.UncompressDeltaBinPackUint32(b.compressedBuf, nil) + return decompressed +} + +// NextChunk returns the next decompressed chunk of values. +// Returns nil, false when all chunks have been read. +func (b *BitpackUnpacker) NextChunk() ([]uint32, bool) { + if b.done || b.unpacker.Len() == 0 { + return nil, false + } + + // read the start of the chunk. + // TODO it's a space overhead + blockLen, err := b.unpacker.GetVarint() + if err != nil { + return nil, false + } + + if blockLen == 0 { + return b.readVarintChunk() + } + if blockLen < 0 { + return nil, false + } + + b.compressedBuf = b.compressedBuf[:0] + b.readUint32Block(blockLen) + + b.decompressedBuf = b.decompressedBuf[:0] + _, b.decompressedBuf = intcomp.UncompressDeltaBinPackUint32(b.compressedBuf, b.decompressedBuf) + return b.decompressedBuf, true +} + +// readVarintChunk reads the varint chunk (there is at most one at the end) +func (b *BitpackUnpacker) readVarintChunk() ([]uint32, bool) { + if b.done { + return nil, false + } + + varintCount, err := b.unpacker.GetVarint() + if err != nil { + return nil, false + } + + if varintCount == 0 { + sentinel, err := b.unpacker.GetVarint() + if err != nil || sentinel != -1 { + return nil, false + } + b.done = true + return nil, false + } + + chunk := b.decompressedBuf[0:int(varintCount)] + for i := int64(0); i < varintCount; i++ { + val, err := b.unpacker.GetVarint() + if err != nil { + return nil, false + } + chunk[i] = uint32(val) + } + + sentinel, err := b.unpacker.GetVarint() + if err != nil || sentinel != -1 { + return nil, false + } + + b.done = true + return chunk, true +} + +// readUint32Block reads the next blockLen bytes of data into compressedBuf - a []uint32 buffer +func (b *BitpackUnpacker) readUint32Block(blockLen int64) { + if isLittleEndian() { + // TODO use apache arrow or some other lib to avoid this shuit? + if cap(b.compressedBuf) < int(blockLen) { + b.compressedBuf = make([]uint32, blockLen) + } else { + b.compressedBuf = b.compressedBuf[:blockLen] + } + + byteCount := int(blockLen) * 4 + buf := b.unpacker.GetBuffer() + if len(buf) < byteCount { + b.compressedBuf = b.compressedBuf[:0] + for i := int64(0); i < blockLen; i++ { + b.compressedBuf = append(b.compressedBuf, b.unpacker.GetUint32()) + } + } else { + src := unsafe.Slice((*uint32)(unsafe.Pointer(unsafe.SliceData(buf[:byteCount]))), blockLen) + copy(b.compressedBuf, src) + b.unpacker.SkipUints32(int(blockLen)) + } + } else { + // slow path, unpack with binary.LittleEndian.Uint32 + for i := int64(0); i < blockLen; i++ { + b.compressedBuf = append(b.compressedBuf, b.unpacker.GetUint32()) + } + } +} diff --git a/packer/bytes_unpacker.go b/packer/bytes_unpacker.go index a5e3fd59..1f728001 100644 --- a/packer/bytes_unpacker.go +++ b/packer/bytes_unpacker.go @@ -33,6 +33,15 @@ func (u *BytesUnpacker) GetUint32() uint32 { return val } +func (u *BytesUnpacker) SkipUints32(skipCount int) { + skipBytes := 4 * skipCount + u.buf = u.buf[skipBytes:] +} + +func (u *BytesUnpacker) GetBuffer() []byte { + return u.buf +} + func (u *BytesUnpacker) GetBinary() []byte { l := u.GetUint32() val := u.buf[:l]