From d67f276f61062f59c00aad31bcbbe7c638636278 Mon Sep 17 00:00:00 2001
From: Roger Ng
Date: Wed, 18 Feb 2026 11:47:29 +0000
Subject: [PATCH] Handle the sequenced entries with a dynamic size in GCP's
 `assignEntries()`
---
 storage/gcp/gcp.go      | 83 +++++++++++++++++++++++++++++++----------
 storage/gcp/gcp_test.go | 82 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 145 insertions(+), 20 deletions(-)

diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go
index 15ca7238..b992e204 100644
--- a/storage/gcp/gcp.go
+++ b/storage/gcp/gcp.go
@@ -81,6 +81,11 @@ const (
 
 	DefaultIntegrationSizeLimit = 5 * 4096
 
+	// defaultSeqTableMaxBatchByteSize is the default maximum byte size of a batch of entries to be written to the
+	// "V" field in the "Seq" table. This is set to just under 10 MiB, the maximum size of the Spanner
+	// BYTES column, with some headroom for the gob encoding.
+	defaultSeqTableMaxBatchByteSize = 9 << 20 // 9 MiB
+
 	// SchemaCompatibilityVersion represents the expected version (e.g. layout & serialisation) of stored data.
 	//
 	// A binary built with a given version of the Tessera library is compatible with stored data created by a different version
@@ -707,14 +712,18 @@ func (a *Appender) updateEntryBundles(ctx context.Context, fromSeq uint64, entri
 type spannerCoordinator struct {
 	dbPool         *spanner.Client
 	maxOutstanding uint64
+
+	// seqTableMaxBatchByteSize is the maximum byte size of a batch of entries to be written to the "V" field in the "Seq" table.
+	seqTableMaxBatchByteSize int
 }
 
 // newSpannerCoordinator returns a new spannerSequencer struct which uses the provided
 // spanner resource name for its spanner connection.
 func newSpannerCoordinator(ctx context.Context, dbPool *spanner.Client, maxOutstanding uint64) (*spannerCoordinator, error) {
 	r := &spannerCoordinator{
-		dbPool:         dbPool,
-		maxOutstanding: maxOutstanding,
+		dbPool:                   dbPool,
+		maxOutstanding:           maxOutstanding,
+		seqTableMaxBatchByteSize: defaultSeqTableMaxBatchByteSize,
 	}
 	if err := r.checkDataCompatibility(ctx); err != nil {
 		return nil, fmt.Errorf("schema is not compatible with this version of the Tessera library: %v", err)
@@ -823,34 +832,53 @@ func (s *spannerCoordinator) assignEntries(ctx context.Context, entries []*tesse
 		return tessera.ErrPushbackIntegration
 	}
 
+	var mutations []*spanner.Mutation
 	next := uint64(next) // Shadow next with a uint64 version of the same value to save on casts.
-	sequencedEntries := make([]storage.SequencedEntry, len(entries))
+	startFrom := next
+	var sequencedEntries []storage.SequencedEntry
+	currentBatchByteSize := 0
 	// Assign provisional sequence numbers to entries.
 	// We need to do this here in order to support serialisations which include the log position.
 	for i, e := range entries {
-		sequencedEntries[i] = storage.SequencedEntry{
-			BundleData: e.MarshalBundleData(next + uint64(i)),
+		sequencedEntry := storage.SequencedEntry{
+			BundleData: e.MarshalBundleData(startFrom + uint64(i)),
 			LeafHash:   e.LeafHash(),
 		}
-	}
-
-	// Flatten the entries into a single slice of bytes which we can store in the Seq.v column.
-	b := &bytes.Buffer{}
-	e := gob.NewEncoder(b)
-	if err := e.Encode(sequencedEntries); err != nil {
-		return fmt.Errorf("failed to serialise batch: %v", err)
+
+		// If adding this entry would make the batch too big, we need to flush the original batch.
+		if len(sequencedEntries) > 0 && currentBatchByteSize+len(sequencedEntry.BundleData) > s.seqTableMaxBatchByteSize {
+			// Gob-encode the batch of entries and add it to the mutation.
+			m, err := s.addSeqMutation(next, sequencedEntries)
+			if err != nil {
+				return fmt.Errorf("failed to addSeqMutation: %v", err)
+			}
+			mutations = append(mutations, m)
+			next += uint64(len(sequencedEntries))
+
+			// Reset our batch variables, and clear the batch slice now that it's been added to the
+			// mutation.
+			sequencedEntries = nil
+			currentBatchByteSize = 0
+		}
+
+		sequencedEntries = append(sequencedEntries, sequencedEntry)
+		currentBatchByteSize += len(sequencedEntry.BundleData)
 	}
-	data := b.Bytes()
-	num := len(entries)
 
-	// TODO(al): think about whether aligning bundles to tile boundaries would be a good idea or not.
-	m := []*spanner.Mutation{
-		// Insert our newly sequenced batch of entries into Seq,
-		spanner.Insert("Seq", []string{"id", "seq", "v"}, []any{0, int64(next), data}),
-		// and update the next-available sequence number row in SeqCoord.
-		spanner.Update("SeqCoord", []string{"id", "next"}, []any{0, int64(next) + int64(num)}),
+	// Insert the last batch of entries if there are any.
+	if len(sequencedEntries) > 0 {
+		m, err := s.addSeqMutation(next, sequencedEntries)
+		if err != nil {
+			return fmt.Errorf("failed to addSeqMutation: %v", err)
+		}
+		mutations = append(mutations, m)
+		next += uint64(len(sequencedEntries))
 	}
-	if err := txn.BufferWrite(m); err != nil {
+
+	// and update the next-available sequence number row in SeqCoord.
+	mutations = append(mutations, spanner.Update("SeqCoord", []string{"id", "next"}, []any{0, int64(next)}))
+
+	if err := txn.BufferWrite(mutations); err != nil {
 		return fmt.Errorf("failed to apply TX: %v", err)
 	}
 
@@ -864,6 +892,21 @@ func (s *spannerCoordinator) assignEntries(ctx context.Context, entries []*tesse
 	return nil
 }
 
+// addSeqMutation returns a mutation to the Seq table for the given sequence number and entries.
+//
+// The entries are gob-encoded and stored in the V column.
+//
+// The mutation is not written to the database; it is intended to be passed to a spanner.Transaction
+// which will write it to the database.
+func (s *spannerCoordinator) addSeqMutation(seq uint64, entries []storage.SequencedEntry) (*spanner.Mutation, error) {
+	b := &bytes.Buffer{}
+	if err := gob.NewEncoder(b).Encode(entries); err != nil {
+		return nil, fmt.Errorf("failed to serialise batch: %v", err)
+	}
+	// Insert our newly sequenced batch of entries into Seq.
+	return spanner.Insert("Seq", []string{"id", "seq", "v"}, []any{0, int64(seq), b.Bytes()}), nil
+}
+
 // consumeEntries calls f with previously sequenced entries.
 //
 // Once f returns without error, the entries it was called with are considered to have been consumed and are
diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go
index 491ff77d..18b1f0de 100644
--- a/storage/gcp/gcp_test.go
+++ b/storage/gcp/gcp_test.go
@@ -39,6 +39,7 @@ import (
 	"github.com/transparency-dev/tessera/fsck"
 	storage "github.com/transparency-dev/tessera/storage/internal"
 	"golang.org/x/mod/sumdb/note"
+	"google.golang.org/api/iterator"
 )
 
 func newSpannerDB(t *testing.T) (*spanner.Client, func()) {
@@ -360,6 +361,87 @@ func TestBundleRoundtrip(t *testing.T) {
 	}
 }
 
+func TestSpannerSequencerAssignEntriesBatchSplitting(t *testing.T) {
+	ctx := t.Context()
+
+	db, closeDB := newSpannerDB(t)
+	defer closeDB()
+
+	// We want to create enough entries to exceed this limit.
+	// Each entry will be ~10KB, so 200 entries will be ~2MiB.
+	const (
+		numEntries = 200
+		entrySize  = 10 * 1024
+	)
+	data := bytes.Repeat([]byte("a"), entrySize)
+
+	seq, err := newSpannerCoordinator(ctx, db, uint64(numEntries+1))
+	if err != nil {
+		t.Fatalf("newSpannerCoordinator: %v", err)
+	}
+
+	seq.seqTableMaxBatchByteSize = 1 << 20 // 1 MiB, set low for testing due to grpc message size limits in the Spanner emulator.
+
+	var entries []*tessera.Entry
+	for i := range numEntries {
+		entries = append(entries, tessera.NewEntry(data))
+		_ = i
+	}
+
+	if err := seq.assignEntries(ctx, entries); err != nil {
+		t.Fatalf("assignEntries: %v", err)
+	}
+
+	t.Log("assignEntries done")
+
+	// Verify that multiple rows were created in the Seq table.
+	var rowCount int
+	iter := db.Single().Read(ctx, "Seq", spanner.AllKeys(), []string{"seq"})
+	defer iter.Stop()
+	for {
+		_, err := iter.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			t.Fatalf("Failed to count rows in Seq table: %v", err)
+		}
+		rowCount++
+	}
+
+	t.Logf("rowCount is %d", rowCount)
+
+	if rowCount <= 1 {
+		t.Errorf("expected more than 1 row in Seq table, got %d", rowCount)
+	}
+
+	// Verify that all entries are present and contiguous by consuming them.
+	seenEntries := 0
+	f := func(_ context.Context, fromSeq uint64, batch []storage.SequencedEntry) ([]byte, error) {
+		t.Logf("consuming from %d, saw %d entries", fromSeq, len(batch))
+		if fromSeq != uint64(seenEntries) {
+			return nil, fmt.Errorf("f called with fromSeq %d, want %d", fromSeq, seenEntries)
+		}
+		seenEntries += len(batch)
+		return fmt.Appendf(nil, "root<%d>", seenEntries), nil
+	}
+
+	for seenEntries < numEntries {
+		t.Logf("calling consumeEntries, seen: %d", seenEntries)
+		more, err := seq.consumeEntries(ctx, 100, f, false)
+		if err != nil {
+			t.Fatalf("consumeEntries: %v", err)
+		}
+		if !more && seenEntries < numEntries {
+			t.Fatalf("consumeEntries: more = false but only seen %d/%d entries", seenEntries, numEntries)
+		}
+	}
+
+	if seenEntries != numEntries {
+		t.Errorf("saw %d entries, want %d", seenEntries, numEntries)
+	}
+}
+
 func TestPublishTree(t *testing.T) {
 	ctx := t.Context()
 	for _, test := range []struct {