Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions blocksync/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func EncodeMsg(pb proto.Message) ([]byte, error) {
msg.Sum = &bcproto.Message_BlockRequest{BlockRequest: pb}
case *bcproto.BlockResponse:
msg.Sum = &bcproto.Message_BlockResponse{BlockResponse: pb}
case *bcproto.BlockResponseV2:
msg.Sum = &bcproto.Message_BlockResponseV2{BlockResponseV2: pb}
case *bcproto.NoBlockResponse:
msg.Sum = &bcproto.Message_NoBlockResponse{NoBlockResponse: pb}
case *bcproto.StatusRequest:
Expand Down Expand Up @@ -60,6 +62,8 @@ func DecodeMsg(bz []byte) (proto.Message, error) {
return msg.BlockRequest, nil
case *bcproto.Message_BlockResponse:
return msg.BlockResponse, nil
case *bcproto.Message_BlockResponseV2:
return msg.BlockResponseV2, nil
case *bcproto.Message_NoBlockResponse:
return msg.NoBlockResponse, nil
case *bcproto.Message_StatusRequest:
Expand All @@ -83,10 +87,17 @@ func ValidateMsg(pb proto.Message) error {
return errors.New("negative Height")
}
case *bcproto.BlockResponse:
// V1 block format
_, err := types.BlockFromProto(msg.Block)
if err != nil {
return err
}
case *bcproto.BlockResponseV2:
// V2 block format (sequencer mode)
_, err := types.BlockV2FromProto(msg.Block)
if err != nil {
return err
}
case *bcproto.NoBlockResponse:
if msg.Height < 0 {
return errors.New("negative Height")
Expand Down
49 changes: 37 additions & 12 deletions blocksync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,29 @@ func (pool *BlockPool) IsCaughtUp() bool {
return isCaughtUp
}

// MaxPeerHeight returns the highest height reported by any peer.
func (pool *BlockPool) MaxPeerHeight() int64 {
pool.mtx.Lock()
defer pool.mtx.Unlock()
return pool.maxPeerHeight
}

// GetPeerHeight returns the height of a specific peer.
// Returns 0 if peer is not found.
func (pool *BlockPool) GetPeerHeight(peerID p2p.ID) int64 {
pool.mtx.Lock()
defer pool.mtx.Unlock()
if peer, ok := pool.peers[peerID]; ok {
return peer.height
}
return 0
}

// PeekTwoBlocks returns blocks at pool.height and pool.height+1.
// We need to see the second block's Commit to validate the first block.
// So we peek two blocks at a time.
// The caller will verify the commit.
func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
func (pool *BlockPool) PeekTwoBlocks() (first, second types.SyncableBlock) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

Expand Down Expand Up @@ -242,12 +260,15 @@ func (pool *BlockPool) RedoRequest(height int64) p2p.ID {
}

// AddBlock validates that the block comes from the peer it was expected from and calls the requester to store it.
// AddBlock adds a block (either Block or BlockV2) to the pool.
// Uses SyncableBlock interface to handle both types uniformly.
// TODO: ensure that blocks come in order for each peer.
func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) {
func (pool *BlockPool) AddBlock(peerID p2p.ID, block types.SyncableBlock, blockSize int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

requester := pool.requesters[block.Height]
height := block.GetHeight()
requester := pool.requesters[height]
if requester == nil {
pool.Logger.Info(
"peer sent us a block we didn't expect",
Expand All @@ -256,8 +277,8 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
"curHeight",
pool.height,
"blockHeight",
block.Height)
diff := pool.height - block.Height
height)
diff := pool.height - height
if diff < 0 {
diff *= -1
}
Expand All @@ -274,16 +295,20 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
peer.decrPending(blockSize)
}
} else {
pool.Logger.Info("invalid peer", "peer", peerID, "blockHeight", block.Height)
pool.Logger.Info("invalid peer", "peer", peerID, "blockHeight", height)
pool.sendError(errors.New("invalid peer"), peerID)
}
}

// MaxPeerHeight returns the highest reported height.
func (pool *BlockPool) MaxPeerHeight() int64 {
// GetPeerIDs returns all peer IDs in the pool.
func (pool *BlockPool) GetPeerIDs() []p2p.ID {
pool.mtx.Lock()
defer pool.mtx.Unlock()
return pool.maxPeerHeight
ids := make([]p2p.ID, 0, len(pool.peers))
for id := range pool.peers {
ids = append(ids, id)
}
return ids
}

// SetPeerRange sets the peer's alleged blockchain base and height.
Expand Down Expand Up @@ -515,7 +540,7 @@ type bpRequester struct {

mtx tmsync.Mutex
peerID p2p.ID
block *types.Block
block types.SyncableBlock // Supports both Block and BlockV2
}

func newBPRequester(pool *BlockPool, height int64) *bpRequester {
Expand All @@ -538,7 +563,7 @@ func (bpr *bpRequester) OnStart() error {
}

// Returns true if the peer matches and block doesn't already exist.
func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
func (bpr *bpRequester) setBlock(block types.SyncableBlock, peerID p2p.ID) bool {
bpr.mtx.Lock()
if bpr.block != nil || bpr.peerID != peerID {
bpr.mtx.Unlock()
Expand All @@ -554,7 +579,7 @@ func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
return true
}

func (bpr *bpRequester) getBlock() *types.Block {
func (bpr *bpRequester) getBlock() types.SyncableBlock {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
return bpr.block
Expand Down
Loading
Loading