diff --git a/consts/consts.go b/consts/consts.go index 46a81d55..1055d5d1 100644 --- a/consts/consts.go +++ b/consts/consts.go @@ -50,6 +50,7 @@ const ( // known extensions MetaFileSuffix = ".meta" + WalFileSuffix = ".wal" DocsFileSuffix = ".docs" DocsDelFileSuffix = ".docs.del" diff --git a/frac/active.go b/frac/active.go index e16b48a1..3cd418e5 100644 --- a/frac/active.go +++ b/frac/active.go @@ -2,6 +2,7 @@ package frac import ( "context" + "fmt" "io" "math" "os" @@ -55,6 +56,7 @@ type Active struct { metaFile *os.File metaReader storage.DocBlocksReader + walReader *storage.WalReader writer *ActiveWriter indexer *ActiveIndexer @@ -79,7 +81,34 @@ func NewActive( cfg *Config, ) *Active { docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, config.SkipFsync) - metaFile, metaStats := mustOpenFile(baseFileName+consts.MetaFileSuffix, config.SkipFsync) + + var metaFile *os.File + var metaStats os.FileInfo + var writer *ActiveWriter + var metaReader storage.DocBlocksReader + var walReader *storage.WalReader + var metaSize uint64 + + legacyMetaFileName := baseFileName + consts.MetaFileSuffix + if _, err := os.Stat(legacyMetaFileName); err == nil { + // .meta file exists + metaFile, metaStats = mustOpenFile(legacyMetaFileName, config.SkipFsync) + metaSize = uint64(metaStats.Size()) + metaReader = storage.NewDocBlocksReader(readLimiter, metaFile) + writer = NewActiveWriterLegacy(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync) + logger.Info("using legacy meta file format", zap.String("fraction", baseFileName)) + } else { + logger.Info("using new WAL format", zap.String("fraction", baseFileName)) + walFileName := baseFileName + consts.WalFileSuffix + metaFile, metaStats = mustOpenFile(walFileName, config.SkipFsync) + metaSize = uint64(metaStats.Size()) + writer = NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync) + var err error + walReader, err = storage.NewWalReader(readLimiter, metaFile, baseFileName) + if err != nil { + logger.Fatal("failed to initialize WAL reader", zap.String("fraction", baseFileName), zap.Error(err)) + } + } f := &Active{ TokenList: NewActiveTokenList(config.IndexWorkers), @@ -95,13 +124,14 @@ func NewActive( sortReader: storage.NewDocsReader(readLimiter, docsFile, sortCache), metaFile: metaFile, - metaReader: storage.NewDocBlocksReader(readLimiter, metaFile), + metaReader: metaReader, + walReader: walReader, indexer: activeIndexer, - writer: NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync), + writer: writer, BaseFileName: baseFileName, - info: common.NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())), + info: common.NewInfo(baseFileName, uint64(docsStats.Size()), metaSize), Config: cfg, } @@ -133,6 +163,81 @@ func mustOpenFile(name string, skipFsync bool) (*os.File, os.FileInfo) { } func (f *Active) Replay(ctx context.Context) error { + walFileName := f.BaseFileName + consts.WalFileSuffix + if _, err := os.Stat(walFileName); err == nil { + return f.replayWalFile(ctx) + } + + metaFileName := f.BaseFileName + consts.MetaFileSuffix + if _, err := os.Stat(metaFileName); err == nil { + return f.replayMetaFileLegacy(ctx) + } + + logger.Info("neither wal nor legacy meta file was found, skipping replay", zap.String("fraction", f.BaseFileName)) + return nil +} + +func (f *Active) replayWalFile(ctx context.Context) error { + if f.walReader == nil { + return fmt.Errorf("WAL reader not initialized") + } + + logger.Info("start replaying WAL 
file...", zap.String("name", f.info.Name())) + + t := time.Now() + + step := f.info.MetaOnDisk / 10 + next := step + + sw := stopwatch.New() + wg := sync.WaitGroup{} + + for entry := range f.walReader.Iter() { + // Check for context cancellation + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if entry.Err != nil { + return entry.Err + } + + if uint64(entry.Offset) > next { + next += step + progress := float64(uint64(entry.Offset)) / float64(f.info.MetaOnDisk) * 100 + logger.Info("replaying batch, meta", + zap.String("name", f.info.Name()), + zap.Int64("from", entry.Offset), + zap.Int64("to", entry.Offset+entry.Size), + zap.Uint64("target", f.info.MetaOnDisk), + util.ZapFloat64WithPrec("progress_percentage", progress, 2), + ) + } + + wg.Add(1) + f.indexer.Index(f, entry.Data, &wg, sw) + } + + wg.Wait() + + tookSeconds := util.DurationToUnit(time.Since(t), "s") + throughputRaw := util.SizeToUnit(f.info.DocsRaw, "mb") / tookSeconds + throughputMeta := util.SizeToUnit(f.info.MetaOnDisk, "mb") / tookSeconds + logger.Info("active fraction replayed", + zap.String("name", f.info.Name()), + zap.Uint32("docs_total", f.info.DocsTotal), + util.ZapUint64AsSizeStr("docs_size", f.info.DocsOnDisk), + util.ZapFloat64WithPrec("took_s", tookSeconds, 1), + util.ZapFloat64WithPrec("throughput_raw_mb_sec", throughputRaw, 1), + util.ZapFloat64WithPrec("throughput_meta_mb_sec", throughputMeta, 1), + ) + return nil +} + +// replayMetaFileLegacy replays legacy *.meta files. Only basic corruption detection support is implemented +func (f *Active) replayMetaFileLegacy(ctx context.Context) error { logger.Info("start replaying...", zap.String("name", f.info.Name())) t := time.Now() @@ -175,7 +280,9 @@ out: offset += metaSize wg.Add(1) - f.indexer.Index(f, meta, &wg, sw) + + metaBlock := storage.PackDocBlockToMetaBlock(meta) + f.indexer.Index(f, metaBlock, &wg, sw) } } @@ -204,7 +311,7 @@ var bulkStagesSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{ }, []string{"stage"}) // Append causes data to be written on disk and sends metas to index workers -func (f *Active) Append(docs, metas []byte, wg *sync.WaitGroup) (err error) { +func (f *Active) Append(docs storage.DocBlock, metas storage.MetaBlock, wg *sync.WaitGroup) (err error) { sw := stopwatch.New() m := sw.Start("append") if err = f.writer.Write(docs, metas, sw); err != nil { diff --git a/frac/active_indexer.go b/frac/active_indexer.go index 5cda8f9f..deeb1619 100644 --- a/frac/active_indexer.go +++ b/frac/active_indexer.go @@ -24,7 +24,7 @@ type ActiveIndexer struct { type indexTask struct { Frac *Active - Metas storage.DocBlock + Metas storage.MetaBlock Pos uint64 Wg *sync.WaitGroup } @@ -45,10 +45,10 @@ func NewActiveIndexer(workerCount, chLen int) (*ActiveIndexer, func()) { return &idx, stopIdx } -func (ai *ActiveIndexer) Index(frac *Active, metas []byte, wg *sync.WaitGroup, sw *stopwatch.Stopwatch) { +func (ai *ActiveIndexer) Index(frac *Active, metas storage.MetaBlock, wg *sync.WaitGroup, sw *stopwatch.Stopwatch) { m := sw.Start("send_index_chan") ai.ch <- &indexTask{ - Pos: storage.DocBlock(metas).GetExt2(), + Pos: metas.DocsOffset(), Metas: metas, Frac: frac, Wg: wg, diff --git a/frac/active_indexer_test.go b/frac/active_indexer_test.go index fc2585c6..8a6862d3 100644 --- a/frac/active_indexer_test.go +++ b/frac/active_indexer_test.go @@ -99,7 +99,7 @@ func BenchmarkIndexer(b *testing.B) { bulks := make([][]byte, 0, len(readers)) for _, readNext := range readers { _, _, meta, _ := processor.ProcessBulk(time.Now(), nil, nil, 
readNext) - bulks = append(bulks, storage.CompressDocBlock(meta, nil, 3)) + bulks = append(bulks, storage.CompressMetaBlock(meta, nil, 3)) } b.StartTimer() diff --git a/frac/active_writer.go b/frac/active_writer.go index 95cf2fdd..52a11428 100644 --- a/frac/active_writer.go +++ b/frac/active_writer.go @@ -9,17 +9,29 @@ import ( type ActiveWriter struct { docs *FileWriter - meta *FileWriter + meta MetaWriter } -func NewActiveWriter(docsFile, metaFile *os.File, docsOffset, metaOffset int64, skipFsync bool) *ActiveWriter { +type MetaWriter interface { + Write(data []byte, sw *stopwatch.Stopwatch) (int64, error) + Stop() +} + +func NewActiveWriter(docsFile, walFile *os.File, docsOffset, walOffset int64, skipFsync bool) *ActiveWriter { + return &ActiveWriter{ + docs: NewFileWriter(docsFile, docsOffset, skipFsync), + meta: storage.NewWalWriter(walFile, walOffset, skipFsync), + } +} + +func NewActiveWriterLegacy(docsFile, metaFile *os.File, docsOffset, metaOffset int64, skipFsync bool) *ActiveWriter { return &ActiveWriter{ docs: NewFileWriter(docsFile, docsOffset, skipFsync), meta: NewFileWriter(metaFile, metaOffset, skipFsync), } } -func (a *ActiveWriter) Write(docs, meta []byte, sw *stopwatch.Stopwatch) error { +func (a *ActiveWriter) Write(docs storage.DocBlock, meta storage.MetaBlock, sw *stopwatch.Stopwatch) error { m := sw.Start("write_docs") offset, err := a.docs.Write(docs, sw) m.Stop() @@ -28,8 +40,7 @@ func (a *ActiveWriter) Write(docs, meta []byte, sw *stopwatch.Stopwatch) error { return err } - storage.DocBlock(meta).SetExt1(uint64(len(docs))) - storage.DocBlock(meta).SetExt2(uint64(offset)) + meta.SetDocsOffset(uint64(offset)) m = sw.Start("write_meta") _, err = a.meta.Write(meta, sw) diff --git a/frac/file_writer.go b/frac/file_writer.go index 9ac5ab9c..683401b7 100644 --- a/frac/file_writer.go +++ b/frac/file_writer.go @@ -6,6 +6,7 @@ import ( "sync/atomic" "github.com/ozontech/seq-db/metric/stopwatch" + "github.com/ozontech/seq-db/storage" ) type writeSyncer interface { @@ -21,6 +22,9 @@ type writeSyncer interface { // is performed, after which all requests receive a response about the successful (or unsuccessful) fsync. // // This results in one fsync system call for several writers performing a write at approximately the same time. +// +// FileWriter always stores data in DocBlock format. If a MetaBlock is passed to Write, it is converted to +// DocBlock. type FileWriter struct { ws writeSyncer offset atomic.Int64 @@ -69,6 +73,12 @@ func (fs *FileWriter) syncLoop() { func (fs *FileWriter) Write(data []byte, sw *stopwatch.Stopwatch) (int64, error) { m := sw.Start("write_duration") + if storage.IsMetaBlock(data) { + // MetaBlock must be converted to DocBlock if it is written to a legacy meta file (with *.meta suffix). + // This may happen if a new version of the store has been deployed while a legacy active fraction with a *.meta file exists.
+ data = storage.PackMetaBlockToDocBlock(data, nil) + } + dataLen := int64(len(data)) offset := fs.offset.Add(dataLen) - dataLen _, err := fs.ws.WriteAt(data, offset) diff --git a/frac/file_writer_test.go b/frac/file_writer_test.go index b72c011b..17680408 100644 --- a/frac/file_writer_test.go +++ b/frac/file_writer_test.go @@ -13,8 +13,10 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/ozontech/seq-db/metric/stopwatch" + "github.com/ozontech/seq-db/storage" ) type testWriterSyncer struct { @@ -262,3 +264,40 @@ func TestSparseWrite(t *testing.T) { e = os.Remove(rf.Name()) assert.NoError(t, e) } + +func TestFileWriterConvertsMetaBlockToDocBlock(t *testing.T) { + f, err := os.Create(t.TempDir() + "/test_metablock.txt") + require.NoError(t, err) + defer f.Close() + + fw := NewFileWriter(f, 0, false) + + originalPayload := []byte("test payload for MetaBlock to DocBlock conversion") + metaBlock := storage.CompressMetaBlock(originalPayload, nil, 3) + metaBlock.SetDocsOffset(12345) + metaBlock.SetVersion(1) + + sw := stopwatch.New() + offset, err := fw.Write(metaBlock, sw) + require.NoError(t, err) + + fw.Stop() + + docBlockSize := storage.DocBlockHeaderLen + metaBlock.Len() + + readBuf := make([]byte, docBlockSize) + bytesRead, err := f.ReadAt(readBuf, offset) + require.NoError(t, err) + require.Equal(t, int(docBlockSize), bytesRead) + readBuf = readBuf[:bytesRead] + + docBlock := storage.DocBlock(readBuf) + + assert.Equal(t, storage.CodecZSTD, docBlock.Codec()) + assert.Equal(t, uint64(len(originalPayload)), docBlock.RawLen()) + assert.Equal(t, uint64(12345), docBlock.GetExt2()) + + decompressed, err := docBlock.DecompressTo(nil) + require.NoError(t, err) + assert.Equal(t, originalPayload, decompressed) +} diff --git a/frac/fraction_test.go b/frac/fraction_test.go index ff4ac602..6ba9dd0b 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1347,7 +1347,7 @@ func (s *FractionTestSuite) TestFractionInfo() { // but if compression/marshalling has changed, expected values can be updated accordingly s.Require().Equal(uint32(5), info.DocsTotal, "doc total doesn't match") // it varies depending on params and docs shuffled - s.Require().True(info.DocsOnDisk > uint64(200) && info.DocsOnDisk < uint64(300), + s.Require().True(info.DocsOnDisk > uint64(200) && info.DocsOnDisk < uint64(350), "doc on disk doesn't match. actual value: %d", info.DocsOnDisk) s.Require().Equal(uint64(583), info.DocsRaw, "doc raw doesn't match") s.Require().Equal(seq.MID(946731625000000000), info.From, "from doesn't match") @@ -1355,7 +1355,7 @@ func (s *FractionTestSuite) TestFractionInfo() { switch s.fraction.(type) { case *Active: - s.Require().True(info.MetaOnDisk >= uint64(250) && info.MetaOnDisk <= uint64(350), + s.Require().True(info.MetaOnDisk >= uint64(250) && info.MetaOnDisk <= uint64(400), "meta on disk doesn't match. 
actual value: %d", info.MetaOnDisk) s.Require().Equal(uint64(0), info.IndexOnDisk, "index on disk doesn't match") case *Sealed: diff --git a/fracmanager/frac_manifest.go b/fracmanager/frac_manifest.go index eb0eae4a..2a258bda 100644 --- a/fracmanager/frac_manifest.go +++ b/fracmanager/frac_manifest.go @@ -20,7 +20,8 @@ type fracManifest struct { basePath string // base path to fraction files (without extension) hasDocs bool // presence of main documents file hasIndex bool // presence of index file - hasMeta bool // presence of meta-information + hasMeta bool // presence of meta-information (legacy WAL format) + hasWal bool // presence of WAL with meta (new WAL format) hasSdocs bool // presence of sorted documents hasRemote bool // presence of remote fraction @@ -42,6 +43,8 @@ func (m *fracManifest) AddExtension(ext string) error { m.hasDocs = true case consts.MetaFileSuffix: m.hasMeta = true + case consts.WalFileSuffix: + m.hasWal = true case consts.SdocsFileSuffix: m.hasSdocs = true case consts.IndexFileSuffix: @@ -88,7 +91,7 @@ func (m *fracManifest) Stage() fracStage { if m.hasIndex && (m.hasSdocs || m.hasDocs) { return fracStageSealed } - if m.hasMeta && m.hasDocs { + if (m.hasMeta || m.hasWal) && m.hasDocs { return fracStageActive } if m.hasDocsDel || m.hasIndexDel || m.hasSdocsDel { @@ -116,6 +119,10 @@ func removeMeta(m *fracManifest) { util.RemoveFile(m.basePath + consts.MetaFileSuffix) m.hasMeta = false } + if m.hasWal { + util.RemoveFile(m.basePath + consts.WalFileSuffix) + m.hasWal = false + } } func removeIndex(m *fracManifest) { diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index 50d7e1f3..92b2981e 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -109,7 +109,7 @@ func (fm *FracManager) Flags() *StateManager { // Append writes documents and metadata to the active fraction // Implements retry logic in case of fraction sealing during write -func (fm *FracManager) Append(ctx context.Context, docs, metas storage.DocBlock) error { +func (fm *FracManager) Append(ctx context.Context, docs storage.DocBlock, metas storage.MetaBlock) error { for { select { case <-ctx.Done(): diff --git a/indexer/compress.go b/indexer/compress.go index 9ce4f88a..eff18712 100644 --- a/indexer/compress.go +++ b/indexer/compress.go @@ -14,7 +14,7 @@ type DocsMetasCompressor struct { metaCompressLevel int docsBuf storage.DocBlock - metaBuf storage.DocBlock + metaBuf storage.MetaBlock } var compressorPool = sync.Pool{ @@ -42,7 +42,7 @@ func (c *DocsMetasCompressor) CompressDocsAndMetas(docs, meta []byte) { // Compress docs block. c.docsBuf = storage.CompressDocBlock(docs, c.docsBuf, c.docsCompressLevel) // Compress metas block. 
- c.metaBuf = storage.CompressDocBlock(meta, c.metaBuf, c.metaCompressLevel) + c.metaBuf = storage.CompressMetaBlock(meta, c.metaBuf, c.metaCompressLevel) bulkSizeAfterCompression.Observe(float64(len(c.docsBuf) + len(c.metaBuf))) } diff --git a/indexer/test_doc_provider.go b/indexer/test_doc_provider.go index 0af90dde..257ed115 100644 --- a/indexer/test_doc_provider.go +++ b/indexer/test_doc_provider.go @@ -53,7 +53,7 @@ func (dp *TestDocProvider) TryReset() { } -func (dp *TestDocProvider) Provide() (storage.DocBlock, storage.DocBlock) { +func (dp *TestDocProvider) Provide() (storage.DocBlock, storage.MetaBlock) { c := GetDocsMetasCompressor(-1, -1) c.CompressDocsAndMetas(dp.Docs, dp.Metas) return c.DocsMetas() diff --git a/proxy/bulk/ingestor_test.go b/proxy/bulk/ingestor_test.go index e38a9180..bdd1d8db 100644 --- a/proxy/bulk/ingestor_test.go +++ b/proxy/bulk/ingestor_test.go @@ -430,7 +430,7 @@ func TestProcessDocuments(t *testing.T) { gotDocs = append(gotDocs, docsUnpacker.GetBinary()) } - binaryMetas, err := storage.DocBlock(c.metas).DecompressTo(nil) + binaryMetas, err := storage.MetaBlock(c.metas).DecompressTo(nil) require.NoError(t, err) metasUnpacker := packer.NewBytesUnpacker(binaryMetas) var gotMetas []indexer.MetaData diff --git a/storage/meta_block.go b/storage/meta_block.go new file mode 100644 index 00000000..eba57353 --- /dev/null +++ b/storage/meta_block.go @@ -0,0 +1,215 @@ +package storage + +import ( + "encoding/binary" + "hash/crc32" + + "github.com/ozontech/seq-db/util" + "github.com/ozontech/seq-db/zstd" +) + +const ( + MetaBlockMagic byte = 101 + + offsetMetaBlockMagic = 0 // 1 byte (M) Magic byte (always 101) + offsetMetaBlockVersion = 1 // 1 byte (V) Version + offsetMetaBlockCodec = 2 // 1 byte (C) Codec + offsetMetaBlockLength = 3 // 4 bytes (L) Length of payload + offsetMetaBlockRawLength = 7 // 4 bytes (U) Raw length (after decompression) + offsetMetaBlockPayloadChecksum = 11 // 4 bytes (P) Payload checksum - covers payload only + offsetMetaBlockDocsOffset = 15 // 8 bytes (D) Docs offset + offsetMetaBlockHeaderChecksum = 23 // 4 bytes (H) Header checksum - covers bytes 0-22 + + MetaBlockHeaderLen = 27 + MetaBlockCurrentVersion = uint8(1) +) + +// MetaBlock format: M : V : C : LLLL : UUUU : PPPP : DDDD-DDDD : HHHH +// M = Magic (101), V = Version, C = Codec, L = Length, U = Raw Length, P = Payload Checksum, D = Docs Offset, H = Header Checksum + +type MetaBlock []byte + +func (b MetaBlock) Magic() byte { + return b[offsetMetaBlockMagic] +} + +func (b MetaBlock) Version() uint8 { + return b[offsetMetaBlockVersion] +} + +func (b MetaBlock) SetVersion(version uint8) { + b[offsetMetaBlockVersion] = version +} + +func (b MetaBlock) Codec() Codec { + return Codec(b[offsetMetaBlockCodec]) +} + +func (b MetaBlock) SetCodec(codecVal Codec) { + b[offsetMetaBlockCodec] = byte(codecVal) +} + +func (b MetaBlock) Len() uint32 { + return binary.LittleEndian.Uint32(b[offsetMetaBlockLength:]) +} + +func (b MetaBlock) SetLen(val uint32) { + binary.LittleEndian.PutUint32(b[offsetMetaBlockLength:], val) +} + +func (b MetaBlock) FullLen() uint32 { + return b.Len() + MetaBlockHeaderLen +} + +func (b MetaBlock) CalcLen() { + b.SetLen(uint32(len(b) - MetaBlockHeaderLen)) +} + +func (b MetaBlock) RawLen() uint32 { + return binary.LittleEndian.Uint32(b[offsetMetaBlockRawLength:]) +} + +func (b MetaBlock) SetRawLen(x uint32) { + binary.LittleEndian.PutUint32(b[offsetMetaBlockRawLength:], x) +} + +func (b MetaBlock) PayloadChecksum() uint32 { + return 
binary.LittleEndian.Uint32(b[offsetMetaBlockPayloadChecksum:]) +} + +func (b MetaBlock) SetPayloadChecksum(x uint32) { + binary.LittleEndian.PutUint32(b[offsetMetaBlockPayloadChecksum:], x) +} + +func (b MetaBlock) CalcPayloadChecksum() { + b.SetPayloadChecksum(crc32.ChecksumIEEE(b.Payload())) +} + +func (b MetaBlock) HeaderChecksum() uint32 { + return binary.LittleEndian.Uint32(b[offsetMetaBlockHeaderChecksum:]) +} + +func (b MetaBlock) SetHeaderChecksum(x uint32) { + binary.LittleEndian.PutUint32(b[offsetMetaBlockHeaderChecksum:], x) +} + +func (b MetaBlock) CalcHeaderChecksum() { + b.SetHeaderChecksum(crc32.ChecksumIEEE(b[:offsetMetaBlockHeaderChecksum])) +} + +func (b MetaBlock) DocsOffset() uint64 { + return binary.LittleEndian.Uint64(b[offsetMetaBlockDocsOffset:]) +} + +// SetDocsOffset updates docs offset. It will also recalc header checksum (cheap). +func (b MetaBlock) SetDocsOffset(x uint64) { + binary.LittleEndian.PutUint64(b[offsetMetaBlockDocsOffset:], x) + b.CalcHeaderChecksum() +} + +func (b MetaBlock) Payload() []byte { + return b[MetaBlockHeaderLen:] +} + +// IsCorrect checks if this is a correct meta block by checking header and payload checksums +func (b MetaBlock) IsCorrect() bool { + return b.IsHeaderCorrect() && b.IsPayloadCorrect() +} + +// IsHeaderCorrect checks if header checksum is correct +func (b MetaBlock) IsHeaderCorrect() bool { + return crc32.ChecksumIEEE(b[:offsetMetaBlockHeaderChecksum]) == b.HeaderChecksum() +} + +// IsPayloadCorrect checks if payload checksum is valid +func (b MetaBlock) IsPayloadCorrect() bool { + return crc32.ChecksumIEEE(b.Payload()) == b.PayloadChecksum() +} + +// IsMetaBlock checks if this data is possibly a meta block. +// Returns true if the data has at least MetaBlockHeaderLen bytes and starts with magic byte. +// This doesn't check for corruption, use IsCorrect() for checksum validation. +func IsMetaBlock(data []byte) bool { + return len(data) >= MetaBlockHeaderLen && data[0] == MetaBlockMagic +} + +func CompressMetaBlock(src []byte, dst MetaBlock, zstdLevel int) MetaBlock { + dst = append(dst[:0], make([]byte, MetaBlockHeaderLen)...) // fill header with zeros for cleanup + dst = zstd.CompressLevel(src, dst, zstdLevel) + + dst[offsetMetaBlockMagic] = MetaBlockMagic + dst.SetVersion(MetaBlockCurrentVersion) + dst.CalcLen() + dst.SetRawLen(uint32(len(src))) + dst.SetCodec(CodecZSTD) + dst.CalcPayloadChecksum() + dst.CalcHeaderChecksum() + + return dst +} + +func PackMetaBlock(payload []byte, dst MetaBlock) MetaBlock { + dst = append(dst[:0], make([]byte, MetaBlockHeaderLen)...) // fill header with zeros for cleanup + dst = append(dst, payload...) + + dst[offsetMetaBlockMagic] = MetaBlockMagic + dst.SetVersion(MetaBlockCurrentVersion) + dst.CalcLen() + dst.SetRawLen(uint32(len(payload))) + dst.SetCodec(CodecNo) + dst.CalcPayloadChecksum() + dst.CalcHeaderChecksum() + + return dst +} + +// PackMetaBlockToDocBlock converts MetaBlock to legacy DocBlock. +func PackMetaBlockToDocBlock(metaBlock MetaBlock, dst DocBlock) DocBlock { + dst = append(dst[:0], make([]byte, DocBlockHeaderLen)...) + dst = append(dst, metaBlock.Payload()...) + + dst.CalcLen() + dst.SetRawLen(uint64(metaBlock.RawLen())) + dst.SetCodec(metaBlock.Codec()) + dst.SetExt2(metaBlock.DocsOffset()) + + return dst +} + +// PackDocBlockToMetaBlock converts DocBlock to MetaBlock in place without copying payload. 
+// The docBlock buffer is reused, so docBlock becomes invalid after packing. +func PackDocBlockToMetaBlock(docBlock DocBlock) MetaBlock { + rawLen := uint32(docBlock.RawLen()) + codec := docBlock.Codec() + docsOffset := docBlock.GetExt2() + payloadLen := uint32(len(docBlock) - DocBlockHeaderLen) + + const headerDiff = DocBlockHeaderLen - MetaBlockHeaderLen + mb := MetaBlock(docBlock[headerDiff:]) + + mb[offsetMetaBlockMagic] = MetaBlockMagic + mb.SetVersion(MetaBlockCurrentVersion) + mb.SetLen(payloadLen) + mb.SetRawLen(rawLen) + mb.SetCodec(codec) + // write docs offset directly since SetDocsOffset recalculates header checksum + binary.LittleEndian.PutUint64(mb[offsetMetaBlockDocsOffset:], docsOffset) + mb.CalcPayloadChecksum() + mb.CalcHeaderChecksum() + + return mb +} + +// DecompressTo always puts the result into `dst`, regardless of whether decompression is required +// or the MetaBlock payload could be returned as-is. +// +// As a result, `dst` never shares memory with the MetaBlock and can be used safely +func (b MetaBlock) DecompressTo(dst []byte) ([]byte, error) { + payload := b.Payload() + if b.Codec() == CodecNo { + dst = util.EnsureSliceSize(dst, int(b.RawLen())) + copy(dst, payload) + return dst, nil + } + return b.Codec().decompressBlock(int(b.RawLen()), payload, dst) +} diff --git a/storage/meta_block_test.go b/storage/meta_block_test.go new file mode 100644 index 00000000..06bf2a7d --- /dev/null +++ b/storage/meta_block_test.go @@ -0,0 +1,74 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCorruptionDetection(t *testing.T) { + payload := []byte("test payload for test of checking corrupted data") + block := CompressMetaBlock(payload, nil, 1) + + assert.True(t, block.IsCorrect()) + + // change 1 byte of data + prev := block[35] + block[35] = 3 + assert.False(t, block.IsCorrect()) + block[35] = prev + assert.True(t, block.IsCorrect()) + + // change magic (the first byte) + block[0] = 7 + assert.False(t, block.IsCorrect()) + block[0] = MetaBlockMagic + + // change version (second byte) + block[1] = 233 + assert.False(t, block.IsCorrect()) + block[1] = 1 // TODO use meta block version const + + // change length of block + truncated := block[0 : len(block)-3] + assert.False(t, truncated.IsCorrect()) + + block = append(block, 1, 2, 3) + assert.False(t, block.IsCorrect()) +} + +func TestConvertDocToMetaBlock(t *testing.T) { + payload := []byte("test test payload") + + docBlock := CompressDocBlock(payload, nil, 1) + docBlock.SetExt2(11111) + + metaBlock := PackDocBlockToMetaBlock(docBlock) + + assert.Equal(t, CodecZSTD, metaBlock.Codec()) + assert.Equal(t, uint32(len(payload)), metaBlock.RawLen()) + assert.Equal(t, uint64(11111), metaBlock.DocsOffset()) + assert.True(t, metaBlock.IsCorrect()) + + decompressed, err := metaBlock.DecompressTo(nil) + require.NoError(t, err) + assert.Equal(t, payload, decompressed) +} + +func TestConvertMetaToDocBlock(t *testing.T) { + payload := []byte("test payload data") + + metaBlock := CompressMetaBlock(payload, nil, 1) + metaBlock.SetDocsOffset(22222) + + docBlock := PackMetaBlockToDocBlock(metaBlock, nil) + + assert.Equal(t, CodecZSTD, docBlock.Codec()) + assert.Equal(t, uint64(len(payload)), docBlock.RawLen()) + assert.Equal(t, uint64(22222), docBlock.GetExt2()) + + decompressed, err := docBlock.DecompressTo(nil) + require.NoError(t, err) + assert.Equal(t, payload, decompressed) +} diff --git a/storage/wal_reader.go b/storage/wal_reader.go new file mode 100644 index 00000000..63cf93e8 --- /dev/null +++ 
b/storage/wal_reader.go @@ -0,0 +1,154 @@ +package storage + +import ( + "encoding/binary" + "errors" + "fmt" + "io" + "iter" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/logger" +) + +type WalRecord struct { + Data MetaBlock + Offset int64 + Size int64 + Err error +} + +type WalReader struct { + limiter *ReadLimiter + reader io.ReaderAt + headerOffset int64 // offset where actual data starts (WALHeaderSize for new format) + baseFileName string +} + +func NewWalReader(limiter *ReadLimiter, reader io.ReaderAt, baseFileName string) (*WalReader, error) { + header := make([]byte, WALHeaderSize) + n, err := limiter.ReadAt(reader, header, 0) + if err != nil && !errors.Is(err, io.EOF) { + return nil, fmt.Errorf("failed to read WAL header: %w", err) + } + if n < WALHeaderSize { + return nil, fmt.Errorf("WAL file too short: expected at least %d bytes, got %d", WALHeaderSize, n) + } + magic := binary.LittleEndian.Uint32(header[0:4]) + if magic != WALMagic { + return nil, fmt.Errorf("invalid WAL magic: expected 0x%X, got 0x%X", WALMagic, magic) + } + version := header[4] + if version != WALVersion1 { + return nil, fmt.Errorf("unknown WAL version: %d (supported: %d)", version, WALVersion1) + } + + return &WalReader{ + limiter: limiter, + reader: reader, + headerOffset: WALHeaderSize, + baseFileName: baseFileName, + }, nil +} + +// Iter iterates through WAL file. Corrupted entries are skipped and never propagated to a client. +// Corruption ranges are logged with "from" and "to" offsets. +func (r *WalReader) Iter() iter.Seq[WalRecord] { + return func(yield func(WalRecord) bool) { + offset := nextBlockOffset(r.headerOffset) + + var corruptionStart int64 = -1 + logCorruptionEnd := func(offset int64) { + if corruptionStart >= 0 { + logger.Error("WAL file corrupted", + zap.String("fraction", r.baseFileName), + zap.Int64("from", corruptionStart), + zap.Int64("to", offset)) + corruptionStart = -1 + } + } + startCorruptionTracking := func(offset int64) { + if corruptionStart < 0 { + corruptionStart = offset + } + } + + for { + headerBuf := make([]byte, MetaBlockHeaderLen) + n, err := r.limiter.ReadAt(r.reader, headerBuf, offset) + + if err != nil && !errors.Is(err, io.EOF) { + logCorruptionEnd(offset) + yield(WalRecord{Offset: offset, Err: err}) + return + } + + if errors.Is(err, io.EOF) || n < MetaBlockHeaderLen { + logCorruptionEnd(offset) + return + } + + if !IsMetaBlock(headerBuf) { + startCorruptionTracking(offset) + offset += BlockAlignment + continue + } + + mb := MetaBlock(headerBuf) + + if !mb.IsHeaderCorrect() { + startCorruptionTracking(offset) + offset += BlockAlignment + continue + } + + // header is correct, try to read the payload + blockLen := int64(mb.FullLen()) + blockBuf := make([]byte, blockLen) + n, err = r.limiter.ReadAt(r.reader, blockBuf, offset) + + if err != nil && !errors.Is(err, io.EOF) { + // this is the last WAL record + // start corruption tracking if not started already and print + startCorruptionTracking(offset) + logCorruptionEnd(offset) + yield(WalRecord{Offset: offset, Err: err}) + return + } + + if errors.Is(err, io.EOF) || int64(n) < blockLen { + startCorruptionTracking(offset) + logCorruptionEnd(offset) + return + } + + mb = blockBuf + + if !mb.IsPayloadCorrect() { + startCorruptionTracking(offset) + offset = nextBlockOffset(offset + blockLen) + continue + } + + logCorruptionEnd(offset) + + entry := WalRecord{ + Data: mb, + Offset: offset, + Size: blockLen, + } + + if !yield(entry) { + return + } + + offset = nextBlockOffset(offset + blockLen) + } + } +} + +// 
nextBlockOffset rounds the provided offset up to a multiple of BlockAlignment +func nextBlockOffset(offset int64) int64 { + return (offset + BlockAlignment - 1) &^ (BlockAlignment - 1) +} diff --git a/storage/wal_writer.go b/storage/wal_writer.go new file mode 100644 index 00000000..f9f0940b --- /dev/null +++ b/storage/wal_writer.go @@ -0,0 +1,165 @@ +package storage + +import ( + "encoding/binary" + "io" + "sync" + "sync/atomic" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/metric/stopwatch" +) + +const ( + // WALMagic is the magic number at the start of WAL files + WALMagic uint32 = 0xFFFFFFFF + // WALVersion1 is the first version of the WAL format, with CRC32 checksums and 64-byte alignment for blocks. + WALVersion1 uint8 = 1 + // WALCurrentVersion is the current WAL format version. + WALCurrentVersion = WALVersion1 + // WALHeaderSize is the size of the WAL header in bytes (4 bytes magic + 1 byte version). The remaining 59 bytes + // before the first 64-byte boundary are reserved due to alignment. + WALHeaderSize = 5 + // BlockAlignment is the alignment boundary for blocks in the new WAL format. Must be greater than the + // MetaBlock header (27 bytes) to prevent torn header writes and to allow faster navigation during replay + // of a corrupted WAL file. + BlockAlignment int64 = 64 +) + +type WriteSyncer interface { + io.ReaderAt + io.WriterAt + Sync() error +} + +// WalWriter writes MetaBlocks to a WAL file with a header and 64-byte block alignment. +// Format: [Header 5B] [... -> align to 64] [MetaBlock] [... -> align to 64] [MetaBlock] ... +type WalWriter struct { + ws WriteSyncer + offset atomic.Int64 + skipSync bool + + mu sync.Mutex + queue []chan error + notify chan struct{} + + wg sync.WaitGroup +} + +func NewWalWriter(ws WriteSyncer, offset int64, skipSync bool) *WalWriter { + w := &WalWriter{ + ws: ws, + skipSync: skipSync, + notify: make(chan struct{}, 1), + } + + // write the header at the beginning if it's a new file + if offset == 0 { + if err := writeWALHeader(ws); err != nil { + logger.Panic("failed to write WAL header", zap.Error(err)) + } + + if !skipSync { + _ = ws.Sync() + } + w.offset.Store(WALHeaderSize) + } else { + w.offset.Store(nextBlockOffset(offset)) + } + + w.wg.Add(1) + go func() { + w.syncLoop() + w.wg.Done() + }() + + return w +} + +func (w *WalWriter) syncLoop() { + for range w.notify { + w.mu.Lock() + queue := w.queue + w.queue = make([]chan error, 0, len(queue)) + w.mu.Unlock() + + err := w.ws.Sync() + + for _, syncRes := range queue { + syncRes <- err + } + } +} + +// Write writes a MetaBlock to the WAL file. The data must already be a MetaBlock. +// Returns the offset where the MetaBlock starts. +func (w *WalWriter) Write(data []byte, sw *stopwatch.Stopwatch) (int64, error) { + m := sw.Start("write_duration") + + offset := w.reserveSpace(int64(len(data))) + + if _, err := w.ws.WriteAt(data, offset); err != nil { + m.Stop() + return 0, err + } + m.Stop() + + err := w.sync(m, sw) + + return offset, err +} + +// reserveSpace atomically reserves the necessary space and returns the position at which the block may be written.
The position +// is aligned to BlockAlignment +func (w *WalWriter) reserveSpace(blockSize int64) int64 { + var result int64 + for { + curr := w.offset.Load() + nextSlotOffset := nextBlockOffset(curr) + + if w.offset.CompareAndSwap(curr, nextSlotOffset+blockSize) { + result = nextSlotOffset + break + } + } + return result +} + +func (w *WalWriter) sync(m stopwatch.Metric, sw *stopwatch.Stopwatch) error { + if w.skipSync { + return nil + } + + m = sw.Start("fsync") + + syncRes := make(chan error) + + w.mu.Lock() + w.queue = append(w.queue, syncRes) + size := len(w.queue) + w.mu.Unlock() + + if size == 1 { + w.notify <- struct{}{} + } + + err := <-syncRes + + m.Stop() + return err +} + +func (w *WalWriter) Stop() { + close(w.notify) + w.wg.Wait() +} + +func writeWALHeader(w io.WriterAt) error { + header := make([]byte, WALHeaderSize) + binary.LittleEndian.PutUint32(header[0:4], WALMagic) + header[4] = WALCurrentVersion + _, err := w.WriteAt(header, 0) + return err +} diff --git a/storage/wal_writer_test.go b/storage/wal_writer_test.go new file mode 100644 index 00000000..7a3cb80e --- /dev/null +++ b/storage/wal_writer_test.go @@ -0,0 +1,775 @@ +package storage + +import ( + "errors" + "fmt" + "io" + "math/rand/v2" + "os" + "slices" + "strconv" + "sync" + "testing" + "time" + + "github.com/alecthomas/units" + "github.com/stretchr/testify/assert" + + "github.com/ozontech/seq-db/metric/stopwatch" +) + +type testWriterSyncer struct { + mu sync.RWMutex + in [][]byte + out map[string]struct{} + pause time.Duration + err bool + bytes []byte +} + +func TestFileWriter(t *testing.T) { + ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond} + fw := NewWalWriter(ws, 0, false) + + wg := sync.WaitGroup{} + for range 100 { + wg.Add(1) + go func() { + for range 100 { + sw := stopwatch.New() + k := []byte(strconv.FormatUint(rand.Uint64(), 16)) + _, err := fw.Write(k, sw) + assert.NoError(t, err) + assert.True(t, ws.Check(k)) + } + wg.Done() + }() + } + + wg.Wait() + fw.Stop() +} + +func TestFileWriterNoSync(t *testing.T) { + ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond} + fw := NewWalWriter(ws, 0, true) + + wg := sync.WaitGroup{} + for range 100 { + wg.Add(1) + go func() { + for range 100 { + sw := stopwatch.New() + k := []byte(strconv.FormatUint(rand.Uint64(), 16)) + _, err := fw.Write(k, sw) + assert.NoError(t, err) + assert.False(t, ws.Check(k)) + } + wg.Done() + }() + } + + wg.Wait() + fw.Stop() +} + +func TestFileWriterError(t *testing.T) { + ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond, err: true} + fw := NewWalWriter(ws, 0, false) + + wg := sync.WaitGroup{} + for range 100 { + wg.Add(1) + go func() { + for range 100 { + sw := stopwatch.New() + k := []byte(strconv.FormatUint(rand.Uint64(), 16)) + _, err := fw.Write(k, sw) + assert.Error(t, err) + assert.False(t, ws.Check(k)) + } + wg.Done() + }() + } + + wg.Wait() + fw.Stop() +} + +func (ws *testWriterSyncer) WriteAt(p []byte, off int64) (n int, err error) { + ws.mu.Lock() + defer ws.mu.Unlock() + + ws.in = append(ws.in, p) + + // Extend storage if needed and write data + end := int(off) + len(p) + if end > len(ws.bytes) { + newStorage := make([]byte, end) + copy(newStorage, ws.bytes) + ws.bytes = newStorage + } + copy(ws.bytes[off:], p) + + return len(p), nil +} + +func (ws *testWriterSyncer) ReadAt(p []byte, off int64) (n int, err error) { + ws.mu.RLock() + defer ws.mu.RUnlock() + + if int(off) >= len(ws.bytes) { + return 0, io.EOF + } + + n = copy(p, ws.bytes[off:]) 
+ if n < len(p) { + return n, io.EOF + } + return n, nil +} + +func (ws *testWriterSyncer) Sync() error { + time.Sleep(ws.pause) + + ws.mu.Lock() + defer ws.mu.Unlock() + + if ws.err { + ws.in = nil + return errors.New("test") + } + + for _, val := range ws.in { + ws.out[string(val)] = struct{}{} + } + ws.in = nil + + return nil +} + +func (ws *testWriterSyncer) Check(val []byte) bool { + ws.mu.RLock() + defer ws.mu.RUnlock() + _, ok := ws.out[string(val)] + return ok +} + +type testRandPauseWriterAt struct { + f *os.File +} + +func (w *testRandPauseWriterAt) WriteAt(p []byte, off int64) (n int, err error) { + // random pause + time.Sleep(time.Microsecond * time.Duration(rand.IntN(20))) + return w.f.WriteAt(p, off) +} + +func (w *testRandPauseWriterAt) ReadAt(p []byte, off int64) (n int, err error) { + return w.f.ReadAt(p, off) +} + +func (w *testRandPauseWriterAt) Sync() error { + return w.f.Sync() +} + +func TestConcurrentFileWriting(t *testing.T) { + f, e := os.Create(t.TempDir() + "/test.txt") + assert.NoError(t, e) + + defer f.Close() + + fw := NewWalWriter(&testRandPauseWriterAt{f: f}, 0, true) + + const ( + writersCount = 100 + writesCount = 100 + ) + + type writeSample struct { + offset int64 + payload []byte + } + + wg := sync.WaitGroup{} + samplesQueues := [writersCount][]writeSample{} + + // run writers - write MetaBlocks + for i := range writersCount { + wg.Add(1) + go func() { + defer wg.Done() + + sw := stopwatch.New() + workerName := strconv.Itoa(i) + + for j := range writesCount { + payload := []byte("<" + workerName + "-" + strconv.Itoa(j) + ">") + metaBlock := PackMetaBlock(payload, nil) + offset, e := fw.Write(metaBlock, sw) + assert.NoError(t, e) + + samplesQueues[i] = append(samplesQueues[i], writeSample{payload: payload, offset: offset}) + } + }() + } + + wg.Wait() + + // join and sort all samples by offset + all := make([]writeSample, 0, writersCount*writesCount) + for _, c := range samplesQueues { + all = append(all, c...) 
+ } + slices.SortFunc(all, func(a, b writeSample) int { + if a.offset < b.offset { + return -1 + } + if a.offset > b.offset { + return 1 + } + return 0 + }) + + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) + idx := 0 + for entry := range reader.Iter() { + assert.Equal(t, all[idx].offset, entry.Offset, "block %d offset mismatch", idx) + assert.Equal(t, all[idx].payload, entry.Data.Payload(), "block %d payload mismatch", idx) + idx++ + } + assert.Equal(t, len(all), idx, "should read all blocks") + + s, e := f.Stat() + assert.NoError(t, e) + fmt.Printf("File size: %d bytes, %d blocks written\n", s.Size(), len(all)) + + e = os.Remove(f.Name()) + assert.NoError(t, e) +} + +func TestSparseWrite(t *testing.T) { + wf, e := os.Create(t.TempDir() + "/test.txt") + assert.NoError(t, e) + + _, e = wf.WriteAt([]byte("333"), 30) + assert.NoError(t, e) + + _, e = wf.WriteAt([]byte("222"), 20) + assert.NoError(t, e) + + _, e = wf.WriteAt([]byte("111"), 10) + assert.NoError(t, e) + + e = wf.Close() + assert.NoError(t, e) + + rf, e := os.Open(wf.Name()) + buf := make([]byte, 33) + assert.NoError(t, e) + + n, e := rf.Read(buf) + assert.NoError(t, e) + assert.Equal(t, len(buf), n) + + expected := []byte("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00111\x00\x00\x00\x00\x00\x00\x00222\x00\x00\x00\x00\x00\x00\x00333") + assert.Equal(t, expected, buf) + + n, e = rf.Read(buf) + assert.Error(t, e) + assert.Equal(t, 0, n) + assert.ErrorIs(t, e, io.EOF) + + e = rf.Close() + assert.NoError(t, e) + + e = os.Remove(rf.Name()) + assert.NoError(t, e) +} + +func TestWalWriterWriteAndRead(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "wal-test-*.bin") + assert.NoError(t, err) + defer f.Close() + + fw := NewWalWriter(f, 0, false) + + payloads := [][]byte{ + []byte("block one"), + []byte("block two with more data"), + []byte("block three"), + []byte("fourth block here"), + []byte("and the fifth block"), + } + + offsets := make([]int64, len(payloads)) + sw := stopwatch.New() + + for i, payload := range payloads { + metaBlock := PackMetaBlock(payload, nil) + offset, err := fw.Write(metaBlock, sw) + assert.NoError(t, err) + offsets[i] = offset + } + + fw.Stop() + + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) + count := 0 + for entry := range reader.Iter() { + assert.Equal(t, offsets[count], entry.Offset, "block %d offset mismatch", count) + assert.Equal(t, payloads[count], entry.Data.Payload(), "block %d payload mismatch", count) + assert.Equal(t, MetaBlockMagic, entry.Data.Magic(), "block %d should have MetaBlock magic", count) + count++ + } + assert.Equal(t, len(payloads), count, "should read all blocks") +} + +func TestWalReaderIteratorEmptyFile(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "wal-iter-*.bin") + assert.NoError(t, err) + defer f.Close() + + fw := NewWalWriter(f, 0, false) + fw.Stop() + + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) + + count := 0 + for range reader.Iter() { + count++ + } + assert.Equal(t, 0, count) +} + +func TestWalReaderInvalidMagic(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "wal-invalid-magic-*.bin") + assert.NoError(t, err) + defer f.Close() + + // write invalid magic (not 0xFFFFFFFF) + header := make([]byte, WALHeaderSize) + header[0] = 0x00 + header[1] = 0x00 + header[2] = 0x00 + header[3] = 0x00 + header[4] = WALVersion1 + _, err = f.WriteAt(header, 0) + assert.NoError(t, err) + + _, err = NewWalReader(NewReadLimiter(1, nil), f, "") + 
assert.Error(t, err) + assert.Contains(t, err.Error(), "invalid WAL magic") +} + +func TestWalReaderUnknownVersion(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "wal-unknown-version-*.bin") + assert.NoError(t, err) + defer f.Close() + + // write correct magic but unknown version + header := make([]byte, WALHeaderSize) + header[0] = 0xFF + header[1] = 0xFF + header[2] = 0xFF + header[3] = 0xFF + header[4] = 99 // unknown version + _, err = f.WriteAt(header, 0) + assert.NoError(t, err) + + _, err = NewWalReader(NewReadLimiter(1, nil), f, "") + assert.Error(t, err) + assert.Contains(t, err.Error(), "unknown WAL version") +} + +func TestWalReaderFileTooShort(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "wal-too-short-*.bin") + assert.NoError(t, err) + defer f.Close() + + // write only 3 bytes (less than WALHeaderSize which is 5) + _, err = f.WriteAt([]byte{0xFF, 0xFF, 0xFF}, 0) + assert.NoError(t, err) + + _, err = NewWalReader(NewReadLimiter(1, nil), f, "") + assert.Error(t, err) + assert.Contains(t, err.Error(), "WAL file too short") +} + +func TestWalReaderIterator(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "wal-iter-*.bin") + assert.NoError(t, err) + defer f.Close() + + fw := NewWalWriter(f, 0, false) + + payloads := [][]byte{ + []byte("first payload data"), + []byte("second payload with more content here"), + []byte("third"), + []byte("fourth payload block"), + []byte("fifth and final payload"), + } + + sw := stopwatch.New() + var expectedOffsets []int64 + + for _, payload := range payloads { + metaBlock := PackMetaBlock(payload, nil) + offset, err := fw.Write(metaBlock, sw) + assert.NoError(t, err) + expectedOffsets = append(expectedOffsets, offset) + } + fw.Stop() + + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) + + var readPayloads [][]byte + var readOffsets []int64 + for entry := range reader.Iter() { + readPayloads = append(readPayloads, entry.Data.Payload()) + readOffsets = append(readOffsets, entry.Offset) + } + + assert.Equal(t, len(payloads), len(readPayloads)) + for i, expected := range payloads { + assert.Equal(t, expected, readPayloads[i], "block %d payload doesn't match", i) + assert.Equal(t, expectedOffsets[i], readOffsets[i], "block %d offset doesn't match", i) + } +} + +// TestWalReaderSkipsCorruptedBlocks tests very simple single byte corruption in block header +func TestWalReaderSkipsCorruptedBlocks(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "wal-corrupted-*.bin") + assert.NoError(t, err) + defer f.Close() + + fw := NewWalWriter(f, 0, false) + + payloads := [][]byte{ + []byte("block one"), + []byte("block two - will be corrupted"), + []byte("block three"), + []byte("block four"), + } + + sw := stopwatch.New() + var offsets []int64 + + for _, payload := range payloads { + metaBlock := PackMetaBlock(payload, nil) + offset, err := fw.Write(metaBlock, sw) + assert.NoError(t, err) + offsets = append(offsets, offset) + } + fw.Stop() + + // corrupt block 2 (index 1) by flipping a byte in the header checksum + corruptOffset := offsets[1] + offsetMetaBlockHeaderChecksum + _, err = f.WriteAt([]byte{0xFF}, corruptOffset) + assert.NoError(t, err) + t.Logf("corrupted header checksum at offset %d", corruptOffset) + + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) + + var readPayloads [][]byte + for entry := range reader.Iter() { + readPayloads = append(readPayloads, entry.Data.Payload()) + t.Logf("read block at offset %d: %q", entry.Offset, entry.Data.Payload()) + } + 
+ assert.Equal(t, 3, len(readPayloads), "should recover 3 out of 4 blocks") + assert.Equal(t, payloads[0], readPayloads[0], "first block should match") + assert.Equal(t, payloads[2], readPayloads[1], "third block should match") + assert.Equal(t, payloads[3], readPayloads[2], "fourth block should match") +} + +// TestWalReaderSkipsCorruptedPayload tests very simple single byte corruption +func TestWalReaderSkipsCorruptedPayload(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "wal-payload-corrupt-*.bin") + assert.NoError(t, err) + defer f.Close() + + fw := NewWalWriter(f, 0, false) + + payloads := [][]byte{ + []byte("first block"), + []byte("second block - payload will be corrupted"), + []byte("third block"), + } + + sw := stopwatch.New() + var offsets []int64 + + for _, payload := range payloads { + metaBlock := PackMetaBlock(payload, nil) + offset, err := fw.Write(metaBlock, sw) + assert.NoError(t, err) + offsets = append(offsets, offset) + } + fw.Stop() + + payloadOffset := offsets[1] + MetaBlockHeaderLen + 5 // corrupt somewhere in payload + _, err = f.WriteAt([]byte{0xFF, 0xFF}, payloadOffset) + assert.NoError(t, err) + + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) + + var readPayloads [][]byte + for entry := range reader.Iter() { + readPayloads = append(readPayloads, entry.Data.Payload()) + } + + assert.Equal(t, 2, len(readPayloads), "should recover 2 out of 3 blocks") + assert.Equal(t, payloads[0], readPayloads[0], "first block should match") + assert.Equal(t, payloads[2], readPayloads[1], "third block should match") +} + +// TestWalReaderSingleByteCorruption tests flipping a random byte in WAL file and verifying that we never +// lose more than a single block on replay +func TestWalReaderSingleByteCorruption(t *testing.T) { + const ( + numRuns = 100 + numBlocks = 100 + minPayloadLen = 10 + maxPayloadLen = int(10 * units.KiB) + ) + + totalLostBlocks := 0 + + for run := range numRuns { + f, err := os.CreateTemp(t.TempDir(), fmt.Sprintf("wal-corruption-%d-*.bin", run)) + assert.NoError(t, err) + + fw := NewWalWriter(f, 0, false) + sw := stopwatch.New() + + blocks := make([]MetaBlock, 0) + + // write blocks to WAL + for i := range numBlocks { + payloadLen := minPayloadLen + rand.IntN(maxPayloadLen-minPayloadLen+1) + payload := make([]byte, payloadLen) + + for j := range payload { + payload[j] = byte(rand.IntN(256)) + } + // store the first byte as index of block + payload[0] = byte(i) + + metaBlock := PackMetaBlock(payload, nil) + blocks = append(blocks, metaBlock) + _, err = fw.Write(metaBlock, sw) + assert.NoError(t, err) + } + fw.Stop() + + stat, err := f.Stat() + assert.NoError(t, err) + fileSize := stat.Size() + + // flip a random byte at random offset + // we do not corrupt the first 5 bytes - WAL header, other bytes might be corrupted including block headers + corruptOffset := int64(WALHeaderSize) + rand.Int64N(fileSize-WALHeaderSize) + + originalByte := make([]byte, 1) + _, err = f.ReadAt(originalByte, corruptOffset) + assert.NoError(t, err) + corruptedByte := originalByte[0] ^ 0xFF + _, err = f.WriteAt([]byte{corruptedByte}, corruptOffset) + assert.NoError(t, err) + + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) + + readBlocks := 0 + + for entry := range reader.Iter() { + assert.NoError(t, entry.Err) + assert.True(t, entry.Data.IsCorrect()) + + expected := blocks[int(entry.Data.Payload()[0])] + assert.Equal(t, expected, entry.Data, "meta block content doesn't match") + readBlocks++ + } + + 
lostCount := numBlocks - readBlocks + totalLostBlocks += lostCount + + if lostCount > 1 { + assert.Fail(t, "lost %d blocks", lostCount) + } + + err = f.Close() + assert.NoError(t, err) + } + + assert.NotZero(t, totalLostBlocks, "have not missed a single block across 100 runs") +} + +// TestWalReaderTruncation tests that we can iterate through truncated WAL file. +func TestWalReaderTruncation(t *testing.T) { + const ( + numRuns = 100 + numBlocks = 100 + minPayloadLen = 512 + maxPayloadLen = int(10 * units.KiB) + ) + + for run := range numRuns { + f, err := os.CreateTemp(t.TempDir(), fmt.Sprintf("wal-truncate-%d-*.bin", run)) + assert.NoError(t, err) + + fw := NewWalWriter(f, 0, false) + sw := stopwatch.New() + + blocks := make([]MetaBlock, 0) + offsets := make([]int64, 0) + + // write blocks to WAL + for i := range numBlocks { + payloadLen := minPayloadLen + rand.IntN(maxPayloadLen-minPayloadLen+1) + payload := make([]byte, payloadLen) + + for j := range payload { + payload[j] = byte(rand.IntN(256)) + } + // store the first byte as index of block + payload[0] = byte(i) + + metaBlock := PackMetaBlock(payload, nil) + blocks = append(blocks, metaBlock) + offset, err := fw.Write(metaBlock, sw) + assert.NoError(t, err) + offsets = append(offsets, offset) + } + fw.Stop() + + // choose random block index at which truncation happens + truncateIndex := rand.IntN(numBlocks) + + // this ensures truncation happens somewhere within the chosen block either at the header or the payload + truncateOffset := offsets[truncateIndex] + rand.Int64N(50) + + err = f.Truncate(truncateOffset) + assert.NoError(t, err) + + // validate we can read all blocks from 0 to truncateIndex (exclusive) + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) + + readBlocks := 0 + + for entry := range reader.Iter() { + assert.NoError(t, entry.Err) + assert.True(t, entry.Data.IsCorrect()) + + // verify block index matches expected sequence + blockIndex := int(entry.Data.Payload()[0]) + assert.Equal(t, readBlocks, blockIndex) + + expected := blocks[blockIndex] + assert.Equal(t, expected, entry.Data) + readBlocks++ + } + + // we should have read exactly truncateIndex blocks + assert.Equal(t, truncateIndex, readBlocks) + + err = f.Close() + assert.NoError(t, err) + } +} + +// TestWalReaderSectorLoss tests losing whole disk sectors (512-byte each) +func TestWalReaderSectorLoss(t *testing.T) { + const ( + numRuns = 100 + numBlocks = 100 + minPayloadLen = 128 + maxPayloadLen = int(4 * units.KiB) + sectorSize = 512 + sectorsToCorrupt = 10 + // each disk sector spans 4 blocks at most (each block is at least 128 bytes long) + maxLostBlocks = sectorsToCorrupt * 4 + ) + + for run := range numRuns { + f, err := os.CreateTemp(t.TempDir(), fmt.Sprintf("wal-sector-loss-%d-*.bin", run)) + assert.NoError(t, err) + + fw := NewWalWriter(f, 0, false) + sw := stopwatch.New() + + blocks := make([]MetaBlock, 0) + + // write blocks to WAL + for i := range numBlocks { + payloadLen := minPayloadLen + rand.IntN(maxPayloadLen-minPayloadLen+1) + payload := make([]byte, payloadLen) + + for j := range payload { + payload[j] = byte(rand.IntN(256)) + } + // store the first byte as index of block + payload[0] = byte(i) + + metaBlock := PackMetaBlock(payload, nil) + blocks = append(blocks, metaBlock) + _, err = fw.Write(metaBlock, sw) + assert.NoError(t, err) + } + fw.Stop() + + stat, err := f.Stat() + assert.NoError(t, err) + fileSize := stat.Size() + + numSectors := int(fileSize / sectorSize) + zeroes := make([]byte, sectorSize) + 
corruptedSectors := make(map[int]bool) + + // zero out 10 random sectors (not the first one, which contains the WAL header) + for len(corruptedSectors) < sectorsToCorrupt && len(corruptedSectors) < numSectors-1 { + // choose sector index from 1 to numSectors-1 (skip first sector) + idx := 1 + rand.IntN(numSectors-1) + if corruptedSectors[idx] { + continue + } + corruptedSectors[idx] = true + + sectorOffset := int64(idx * sectorSize) + _, err = f.WriteAt(zeroes, sectorOffset) + assert.NoError(t, err) + } + + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) + readBlocks := 0 + + for entry := range reader.Iter() { + assert.NoError(t, entry.Err) + assert.True(t, entry.Data.IsCorrect()) + + // validate payload content + blockIndex := int(entry.Data.Payload()[0]) + expected := blocks[blockIndex] + assert.Equal(t, expected, entry.Data) + readBlocks++ + } + + lostCount := numBlocks - readBlocks + if lostCount > maxLostBlocks { + assert.Fail(t, "lost too many blocks") + } + + err = f.Close() + assert.NoError(t, err) + } +} diff --git a/storeapi/grpc_bulk.go b/storeapi/grpc_bulk.go index 67239edd..1f6ecce9 100644 --- a/storeapi/grpc_bulk.go +++ b/storeapi/grpc_bulk.go @@ -14,6 +14,7 @@ import ( "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/metric" "github.com/ozontech/seq-db/pkg/storeapi" + "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/tracing" ) @@ -64,6 +65,11 @@ func (g *GrpcV1) doBulk(ctx context.Context, req *storeapi.BulkRequest) error { start := time.Now() + if !storage.IsMetaBlock(req.Metas) { + // Metas are now MetaBlock everywhere; a DocBlock might still be sent by a legacy proxy service. + req.Metas = storage.PackDocBlockToMetaBlock(req.Metas) + } + err := g.fracManager.Append(ctx, req.Docs, req.Metas) if err != nil {