83 changes: 63 additions & 20 deletions storage/gcp/gcp.go
@@ -81,6 +81,11 @@ const (
 
 	DefaultIntegrationSizeLimit = 5 * 4096
 
+	// defaultSeqTableMaxBatchByteSize is the default maximum byte size of a batch of entries to be written to the
+	// "V" field in the "Seq" table. This is set to just under 10 MiB, the maximum size of the Spanner
+	// BYTES column, with some headroom for the gob encoding.
+	defaultSeqTableMaxBatchByteSize = 9 << 20 // 9 MiB
+
 	// SchemaCompatibilityVersion represents the expected version (e.g. layout & serialisation) of stored data.
 	//
 	// A binary built with a given version of the Tessera library is compatible with stored data created by a different version
@@ -707,14 +712,18 @@ func (a *Appender) updateEntryBundles(ctx context.Context, fromSeq uint64, entri
 type spannerCoordinator struct {
 	dbPool         *spanner.Client
 	maxOutstanding uint64
+
+	// seqTableMaxBatchByteSize is the maximum byte size of a batch of entries to be written to the "V" field in the "Seq" table.
+	seqTableMaxBatchByteSize int
 }
 
 // newSpannerCoordinator returns a new spannerSequencer struct which uses the provided
 // spanner resource name for its spanner connection.
 func newSpannerCoordinator(ctx context.Context, dbPool *spanner.Client, maxOutstanding uint64) (*spannerCoordinator, error) {
 	r := &spannerCoordinator{
-		dbPool:         dbPool,
-		maxOutstanding: maxOutstanding,
+		dbPool:                   dbPool,
+		maxOutstanding:           maxOutstanding,
+		seqTableMaxBatchByteSize: defaultSeqTableMaxBatchByteSize,
 	}
 	if err := r.checkDataCompatibility(ctx); err != nil {
 		return nil, fmt.Errorf("schema is not compatible with this version of the Tessera library: %v", err)
@@ -823,34 +832,53 @@ func (s *spannerCoordinator) assignEntries(ctx context.Context, entries []*tesse
 		return tessera.ErrPushbackIntegration
 	}
 
+	var mutations []*spanner.Mutation
 	next := uint64(next) // Shadow next with a uint64 version of the same value to save on casts.
-	sequencedEntries := make([]storage.SequencedEntry, len(entries))
+	startFrom := next
+	var sequencedEntries []storage.SequencedEntry
+	currentBatchByteSize := 0
 	// Assign provisional sequence numbers to entries.
 	// We need to do this here in order to support serialisations which include the log position.
 	for i, e := range entries {
-		sequencedEntries[i] = storage.SequencedEntry{
-			BundleData: e.MarshalBundleData(next + uint64(i)),
+		sequencedEntry := storage.SequencedEntry{
+			BundleData: e.MarshalBundleData(startFrom + uint64(i)),
 			LeafHash:   e.LeafHash(),
 		}
-	}
 
-	// Flatten the entries into a single slice of bytes which we can store in the Seq.v column.
-	b := &bytes.Buffer{}
-	e := gob.NewEncoder(b)
-	if err := e.Encode(sequencedEntries); err != nil {
-		return fmt.Errorf("failed to serialise batch: %v", err)
+		// If adding this entry would make the batch too big, we need to flush the original batch.
+		if len(sequencedEntries) > 0 && currentBatchByteSize+len(sequencedEntry.BundleData) > s.seqTableMaxBatchByteSize {
+			// Gob-encode the batch of entries and add it to the mutation.
+			m, err := s.addSeqMutation(next, sequencedEntries)
+			if err != nil {
+				return fmt.Errorf("failed to addSeqMutation: %v", err)
+			}
+			mutations = append(mutations, m)
+			next += uint64(len(sequencedEntries))
+
+			// Reset our batch variables, and clear the batch slice now that it's been added to the
+			// mutation.
+			sequencedEntries = nil
+			currentBatchByteSize = 0
+		}
+
+		sequencedEntries = append(sequencedEntries, sequencedEntry)
+		currentBatchByteSize += len(sequencedEntry.BundleData)
 	}
-	data := b.Bytes()
-	num := len(entries)
 
-	// TODO(al): think about whether aligning bundles to tile boundaries would be a good idea or not.
-	m := []*spanner.Mutation{
-		// Insert our newly sequenced batch of entries into Seq,
-		spanner.Insert("Seq", []string{"id", "seq", "v"}, []any{0, int64(next), data}),
-		// and update the next-available sequence number row in SeqCoord.
-		spanner.Update("SeqCoord", []string{"id", "next"}, []any{0, int64(next) + int64(num)}),
+	// Insert the last batch of entries if there are any.
+	if len(sequencedEntries) > 0 {
+		m, err := s.addSeqMutation(next, sequencedEntries)
+		if err != nil {
+			return fmt.Errorf("failed to addSeqMutation: %v", err)
+		}
+		mutations = append(mutations, m)
+		next += uint64(len(sequencedEntries))
 	}
-	if err := txn.BufferWrite(m); err != nil {
+
+	// and update the next-available sequence number row in SeqCoord.
+	mutations = append(mutations, spanner.Update("SeqCoord", []string{"id", "next"}, []any{0, int64(next)}))
+
+	if err := txn.BufferWrite(mutations); err != nil {
 		return fmt.Errorf("failed to apply TX: %v", err)
 	}
 
@@ -864,6 +892,21 @@ func (s *spannerCoordinator) assignEntries(ctx context.Context, entries []*tesse
 	return nil
 }
 
+// addSeqMutation returns a mutation to the Seq table for the given sequence number and entries.
+//
+// The entries are gob-encoded and stored in the V column.
+//
+// The mutation is not written to the database; it is intended to be passed to a spanner.Transaction
+// which will write it to the database.
+func (s *spannerCoordinator) addSeqMutation(seq uint64, entries []storage.SequencedEntry) (*spanner.Mutation, error) {
+	b := &bytes.Buffer{}
+	if err := gob.NewEncoder(b).Encode(entries); err != nil {
+		return nil, fmt.Errorf("failed to serialise batch: %v", err)
+	}
+	// Insert our newly sequenced batch of entries into Seq.
+	return spanner.Insert("Seq", []string{"id", "seq", "v"}, []any{0, int64(seq), b.Bytes()}), nil
+}
+
 // consumeEntries calls f with previously sequenced entries.
 //
 // Once f returns without error, the entries it was called with are considered to have been consumed and are
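Note (not part of the diff): addSeqMutation above gob-encodes a whole batch of entries into the single BYTES value stored in the Seq table's V column. The standalone sketch below illustrates that round trip; sequencedEntry is a simplified stand-in for storage.SequencedEntry (just the two fields used in this change), and the decode side only illustrates what a consumer such as consumeEntries would do, not the library's actual implementation.

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// sequencedEntry is a simplified stand-in for storage.SequencedEntry,
// carrying just the two fields populated in the diff above.
type sequencedEntry struct {
	BundleData []byte
	LeafHash   []byte
}

func main() {
	batch := []sequencedEntry{
		{BundleData: []byte("bundle-0"), LeafHash: []byte("hash-0")},
		{BundleData: []byte("bundle-1"), LeafHash: []byte("hash-1")},
	}

	// Encode the batch into a single byte slice, as addSeqMutation does before
	// handing it to spanner.Insert for the Seq table's V column.
	buf := &bytes.Buffer{}
	if err := gob.NewEncoder(buf).Encode(batch); err != nil {
		panic(err)
	}

	// Decode it back, as a reader of a Seq row would.
	var got []sequencedEntry
	if err := gob.NewDecoder(bytes.NewReader(buf.Bytes())).Decode(&got); err != nil {
		panic(err)
	}
	fmt.Printf("decoded %d entries; first bundle: %s\n", len(got), got[0].BundleData)
}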
82 changes: 82 additions & 0 deletions storage/gcp/gcp_test.go
@@ -39,6 +39,7 @@ import (
 	"github.com/transparency-dev/tessera/fsck"
 	storage "github.com/transparency-dev/tessera/storage/internal"
 	"golang.org/x/mod/sumdb/note"
+	"google.golang.org/api/iterator"
 )
 
 func newSpannerDB(t *testing.T) (*spanner.Client, func()) {
@@ -360,6 +361,87 @@ func TestBundleRoundtrip(t *testing.T) {
 	}
 }
 
+func TestSpannerSequencerAssignEntriesBatchSplitting(t *testing.T) {
+	ctx := t.Context()
+
+	db, closeDB := newSpannerDB(t)
+	defer closeDB()
+
+	// We want to create enough entries to exceed this limit.
+	// Each entry will be ~10KB, so 200 entries will be ~2MiB.
+	const (
+		numEntries = 200
+		entrySize  = 10 * 1024
+	)
+	data := bytes.Repeat([]byte("a"), entrySize)
+
+	seq, err := newSpannerCoordinator(ctx, db, uint64(numEntries+1))
+	if err != nil {
+		t.Fatalf("newSpannerCoordinator: %v", err)
+	}
+
+	seq.seqTableMaxBatchByteSize = 1 << 20 // 1 MiB, set low for testing due to grpc message size limits in the Spanner emulator.
+
+	var entries []*tessera.Entry
+	for i := range numEntries {
+		entries = append(entries, tessera.NewEntry(data))
+		_ = i
+	}
+
+	if err := seq.assignEntries(ctx, entries); err != nil {
+		t.Fatalf("assignEntries: %v", err)
+	}
+
+	t.Log("assignEntries done")
+
+	// Verify that multiple rows were created in the Seq table.
+	var rowCount int
+	iter := db.Single().Read(ctx, "Seq", spanner.AllKeys(), []string{"seq"})
+	defer iter.Stop()
+	for {
+		_, err := iter.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			t.Fatalf("Failed to count rows in Seq table: %v", err)
+		}
+		rowCount++
+	}
+
+	t.Logf("rowCount is %d", rowCount)
+
+	if rowCount <= 1 {
+		t.Errorf("expected more than 1 row in Seq table, got %d", rowCount)
+	}
+
+	// Verify that all entries are present and contiguous by consuming them.
+	seenEntries := 0
+	f := func(_ context.Context, fromSeq uint64, batch []storage.SequencedEntry) ([]byte, error) {
+		t.Logf("consuming from %d, saw %d entries", fromSeq, len(batch))
+		if fromSeq != uint64(seenEntries) {
+			return nil, fmt.Errorf("f called with fromSeq %d, want %d", fromSeq, seenEntries)
+		}
+		seenEntries += len(batch)
+		return fmt.Appendf(nil, "root<%d>", seenEntries), nil
+	}
+
+	for seenEntries < numEntries {
+		t.Logf("calling consumeEntries, seen: %d", seenEntries)
+		more, err := seq.consumeEntries(ctx, 100, f, false)
+		if err != nil {
+			t.Fatalf("consumeEntries: %v", err)
+		}
+		if !more && seenEntries < numEntries {
+			t.Fatalf("consumeEntries: more = false but only seen %d/%d entries", seenEntries, numEntries)
+		}
+	}
+
+	if seenEntries != numEntries {
+		t.Errorf("saw %d entries, want %d", seenEntries, numEntries)
+	}
+}
+
 func TestPublishTree(t *testing.T) {
 	ctx := t.Context()
 	for _, test := range []struct {
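Note (not part of the diff): the test above expects more than one Seq row because 200 entries of roughly 10 KiB each total about 2 MiB, which cannot fit under the 1 MiB per-batch threshold it configures. The hypothetical helper below mirrors the accumulate-and-flush strategy that assignEntries now uses, reduced to plain byte slices; splitByBytes and its parameters are illustrative names only, not part of the change.

package main

import (
	"bytes"
	"fmt"
)

// splitByBytes greedily packs byte slices into batches whose combined size stays
// at or below maxBytes, starting a new batch whenever adding the next item would
// push the running total over the limit. A single oversized item still gets its
// own batch, mirroring the len(sequencedEntries) > 0 guard in assignEntries.
func splitByBytes(items [][]byte, maxBytes int) [][][]byte {
	var batches [][][]byte
	var current [][]byte
	size := 0
	for _, it := range items {
		if len(current) > 0 && size+len(it) > maxBytes {
			batches = append(batches, current)
			current, size = nil, 0
		}
		current = append(current, it)
		size += len(it)
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches
}

func main() {
	// Mirror the test's shape: 200 items of ~10 KiB against a 1 MiB threshold.
	items := make([][]byte, 200)
	for i := range items {
		items[i] = bytes.Repeat([]byte("a"), 10*1024)
	}
	fmt.Println("batches:", len(splitByBytes(items, 1<<20))) // prints: batches: 2
}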