Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions downstreamadapter/dispatchermanager/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,31 +315,23 @@ func (h *HeartBeatResponseHandler) Handle(dispatcherManager *DispatcherManager,
}
case heartbeatpb.InfluenceType_DB:
schemaID := dispatcherStatus.InfluencedDispatchers.SchemaID
excludeDispatcherID := common.NewDispatcherIDFromPB(dispatcherStatus.InfluencedDispatchers.ExcludeDispatcherId)
var dispatcherIds []common.DispatcherID
if common.IsRedoMode(heartbeatResponse.Mode) {
dispatcherIds = dispatcherManager.GetAllRedoDispatchers(schemaID)
} else {
dispatcherIds = dispatcherManager.GetAllDispatchers(schemaID)
}
for _, id := range dispatcherIds {
if id != excludeDispatcherID {
h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id))
}
h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id))
}
case heartbeatpb.InfluenceType_All:
excludeDispatcherID := common.NewDispatcherIDFromPB(dispatcherStatus.InfluencedDispatchers.ExcludeDispatcherId)
if common.IsRedoMode(heartbeatResponse.Mode) {
dispatcherManager.GetRedoDispatcherMap().ForEach(func(id common.DispatcherID, _ *dispatcher.RedoDispatcher) {
if id != excludeDispatcherID {
h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id))
}
h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id))
})
} else {
dispatcherManager.GetDispatcherMap().ForEach(func(id common.DispatcherID, _ *dispatcher.EventDispatcher) {
if id != excludeDispatcherID {
h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id))
}
h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id))
})
}
}
Expand Down
27 changes: 18 additions & 9 deletions maintainer/barrier_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ func (be *BarrierEvent) sendPassAction(mode int64) []*messaging.TargetMessage {
be.rangeChecker.MarkCovered()
return nil
} else {
// writerDispatcher for DB Type is always table trigger dispatcher, so we need to add it too
writerDispatcherTask := be.spanController.GetTaskByID(be.writerDispatcher)
if writerDispatcherTask != nil {
spans = append(spans, writerDispatcherTask)
}

for _, stm := range spans {
nodeID := stm.GetNodeID()
if nodeID == "" {
Expand All @@ -435,9 +441,6 @@ func (be *BarrierEvent) sendPassAction(mode int64) []*messaging.TargetMessage {
for _, stm := range spans {
nodeID := stm.GetNodeID()
dispatcherID := stm.ID
if dispatcherID == be.writerDispatcher {
continue
}
msg, ok := msgMap[nodeID]
if !ok {
msg = be.newPassActionMessage(nodeID, mode)
Expand Down Expand Up @@ -670,17 +673,23 @@ func (be *BarrierEvent) newWriterActionMessage(capture node.ID, mode int64) *mes
}

func (be *BarrierEvent) newPassActionMessage(capture node.ID, mode int64) *messaging.TargetMessage {
influenced := &heartbeatpb.InfluencedDispatchers{
InfluenceType: be.blockedDispatchers.InfluenceType,
SchemaID: be.blockedDispatchers.SchemaID,
}
if be.blockedDispatchers.InfluenceType != heartbeatpb.InfluenceType_Normal {
// ExcludeDispatcherId is deprecated. It is kept only for rolling upgrade compatibility:
// older dispatcher managers unconditionally dereference this field for DB/All types.
// New dispatcher managers should ignore it.
influenced.ExcludeDispatcherId = &heartbeatpb.DispatcherID{}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen if the older dispatcher manager received the message?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the old server receives the exclueDispatcherID in {}, it will find that no dispatcher matches this ID, so it will not perform any additional operations on this dispatcher. Therefore, there is no difference in behavior between the old and new versions when receiving this message.
Uploading image.png…

}
return messaging.NewSingleTargetMessage(capture, messaging.HeartbeatCollectorTopic,
&heartbeatpb.HeartBeatResponse{
ChangefeedID: be.cfID.ToPB(),
DispatcherStatuses: []*heartbeatpb.DispatcherStatus{
{
Action: be.action(heartbeatpb.Action_Pass),
InfluencedDispatchers: &heartbeatpb.InfluencedDispatchers{
InfluenceType: be.blockedDispatchers.InfluenceType,
SchemaID: be.blockedDispatchers.SchemaID,
ExcludeDispatcherId: be.writerDispatcher.ToPB(),
},
Action: be.action(heartbeatpb.Action_Pass),
InfluencedDispatchers: influenced,
},
},
Mode: mode,
Expand Down
47 changes: 47 additions & 0 deletions maintainer/barrier_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,53 @@ func TestResendAction(t *testing.T) {
require.Equal(t, resp.DispatcherStatuses[0].Action.CommitTs, uint64(10))
}

func TestSendPassActionTypeDBIncludesWriterNode(t *testing.T) {
testutil.SetUpTestServices()
nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"}
nodeManager.GetAliveNodes()["node2"] = &node.Info{ID: "node2"}

tableTriggerEventDispatcherID := common.NewDispatcherID()
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID,
common.DDLSpanSchemaID,
common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{
ID: tableTriggerEventDispatcherID.ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Working,
CheckpointTs: 1,
}, "node2", false)
spanController := span.NewController(cfID, ddlSpan, nil, nil, nil, common.DefaultKeyspaceID, common.DefaultMode)
operatorController := operator.NewOperatorController(cfID, spanController, 1000, common.DefaultMode)

spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1)
spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 2}, 1)
absents := spanController.GetAbsentForTest(100)
for _, stm := range absents {
spanController.BindSpanToNode("", "node1", stm)
spanController.MarkSpanReplicating(stm)
}

event := NewBlockEvent(cfID, tableTriggerEventDispatcherID, spanController, operatorController, &heartbeatpb.State{
IsBlocked: true,
BlockTs: 10,
BlockTables: &heartbeatpb.InfluencedTables{
InfluenceType: heartbeatpb.InfluenceType_DB,
SchemaID: 1,
},
}, false)
event.selected.Store(true)
event.writerDispatcher = tableTriggerEventDispatcherID
event.writerDispatcherAdvanced = true

msgs := event.sendPassAction(common.DefaultMode)
require.Len(t, msgs, 2)
targetNodes := make([]node.ID, 0, len(msgs))
for _, msg := range msgs {
targetNodes = append(targetNodes, msg.To)
}
require.ElementsMatch(t, []node.ID{"node1", "node2"}, targetNodes)
}

func TestUpdateSchemaID(t *testing.T) {
testutil.SetUpTestServices()
tableTriggerEventDispatcherID := common.NewDispatcherID()
Expand Down