diff --git a/downstreamadapter/dispatchermanager/helper.go b/downstreamadapter/dispatchermanager/helper.go index 50e6f3dd9..3b77279b1 100644 --- a/downstreamadapter/dispatchermanager/helper.go +++ b/downstreamadapter/dispatchermanager/helper.go @@ -315,7 +315,6 @@ func (h *HeartBeatResponseHandler) Handle(dispatcherManager *DispatcherManager, } case heartbeatpb.InfluenceType_DB: schemaID := dispatcherStatus.InfluencedDispatchers.SchemaID - excludeDispatcherID := common.NewDispatcherIDFromPB(dispatcherStatus.InfluencedDispatchers.ExcludeDispatcherId) var dispatcherIds []common.DispatcherID if common.IsRedoMode(heartbeatResponse.Mode) { dispatcherIds = dispatcherManager.GetAllRedoDispatchers(schemaID) @@ -323,23 +322,16 @@ func (h *HeartBeatResponseHandler) Handle(dispatcherManager *DispatcherManager, dispatcherIds = dispatcherManager.GetAllDispatchers(schemaID) } for _, id := range dispatcherIds { - if id != excludeDispatcherID { - h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id)) - } + h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id)) } case heartbeatpb.InfluenceType_All: - excludeDispatcherID := common.NewDispatcherIDFromPB(dispatcherStatus.InfluencedDispatchers.ExcludeDispatcherId) if common.IsRedoMode(heartbeatResponse.Mode) { dispatcherManager.GetRedoDispatcherMap().ForEach(func(id common.DispatcherID, _ *dispatcher.RedoDispatcher) { - if id != excludeDispatcherID { - h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id)) - } + h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id)) }) } else { dispatcherManager.GetDispatcherMap().ForEach(func(id common.DispatcherID, _ *dispatcher.EventDispatcher) { - if id != excludeDispatcherID { - h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id)) - } + h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id)) }) } } diff --git a/maintainer/barrier_event.go b/maintainer/barrier_event.go index 853772e7f..e5bf55464 100644 --- a/maintainer/barrier_event.go +++ b/maintainer/barrier_event.go @@ -410,6 +410,12 @@ func (be *BarrierEvent) sendPassAction(mode int64) []*messaging.TargetMessage { be.rangeChecker.MarkCovered() return nil } else { + // writerDispatcher for DB Type is always table trigger dispatcher, so we need to add it too + writerDispatcherTask := be.spanController.GetTaskByID(be.writerDispatcher) + if writerDispatcherTask != nil { + spans = append(spans, writerDispatcherTask) + } + for _, stm := range spans { nodeID := stm.GetNodeID() if nodeID == "" { @@ -435,9 +441,6 @@ func (be *BarrierEvent) sendPassAction(mode int64) []*messaging.TargetMessage { for _, stm := range spans { nodeID := stm.GetNodeID() dispatcherID := stm.ID - if dispatcherID == be.writerDispatcher { - continue - } msg, ok := msgMap[nodeID] if !ok { msg = be.newPassActionMessage(nodeID, mode) @@ -670,17 +673,23 @@ func (be *BarrierEvent) newWriterActionMessage(capture node.ID, mode int64) *mes } func (be *BarrierEvent) newPassActionMessage(capture node.ID, mode int64) *messaging.TargetMessage { + influenced := &heartbeatpb.InfluencedDispatchers{ + InfluenceType: be.blockedDispatchers.InfluenceType, + SchemaID: be.blockedDispatchers.SchemaID, + } + if be.blockedDispatchers.InfluenceType != heartbeatpb.InfluenceType_Normal { + // ExcludeDispatcherId is deprecated. It is kept only for rolling upgrade compatibility: + // older dispatcher managers unconditionally dereference this field for DB/All types. + // New dispatcher managers should ignore it. + influenced.ExcludeDispatcherId = &heartbeatpb.DispatcherID{} + } return messaging.NewSingleTargetMessage(capture, messaging.HeartbeatCollectorTopic, &heartbeatpb.HeartBeatResponse{ ChangefeedID: be.cfID.ToPB(), DispatcherStatuses: []*heartbeatpb.DispatcherStatus{ { - Action: be.action(heartbeatpb.Action_Pass), - InfluencedDispatchers: &heartbeatpb.InfluencedDispatchers{ - InfluenceType: be.blockedDispatchers.InfluenceType, - SchemaID: be.blockedDispatchers.SchemaID, - ExcludeDispatcherId: be.writerDispatcher.ToPB(), - }, + Action: be.action(heartbeatpb.Action_Pass), + InfluencedDispatchers: influenced, }, }, Mode: mode, diff --git a/maintainer/barrier_event_test.go b/maintainer/barrier_event_test.go index a5ff597ec..1401c72d7 100644 --- a/maintainer/barrier_event_test.go +++ b/maintainer/barrier_event_test.go @@ -188,6 +188,53 @@ func TestResendAction(t *testing.T) { require.Equal(t, resp.DispatcherStatuses[0].Action.CommitTs, uint64(10)) } +func TestSendPassActionTypeDBIncludesWriterNode(t *testing.T) { + testutil.SetUpTestServices() + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"} + nodeManager.GetAliveNodes()["node2"] = &node.Info{ID: "node2"} + + tableTriggerEventDispatcherID := common.NewDispatcherID() + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme) + ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID, + common.DDLSpanSchemaID, + common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{ + ID: tableTriggerEventDispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 1, + }, "node2", false) + spanController := span.NewController(cfID, ddlSpan, nil, nil, nil, common.DefaultKeyspaceID, common.DefaultMode) + operatorController := operator.NewOperatorController(cfID, spanController, 1000, common.DefaultMode) + + spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1) + spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 2}, 1) + absents := spanController.GetAbsentForTest(100) + for _, stm := range absents { + spanController.BindSpanToNode("", "node1", stm) + spanController.MarkSpanReplicating(stm) + } + + event := NewBlockEvent(cfID, tableTriggerEventDispatcherID, spanController, operatorController, &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockTables: &heartbeatpb.InfluencedTables{ + InfluenceType: heartbeatpb.InfluenceType_DB, + SchemaID: 1, + }, + }, false) + event.selected.Store(true) + event.writerDispatcher = tableTriggerEventDispatcherID + event.writerDispatcherAdvanced = true + + msgs := event.sendPassAction(common.DefaultMode) + require.Len(t, msgs, 2) + targetNodes := make([]node.ID, 0, len(msgs)) + for _, msg := range msgs { + targetNodes = append(targetNodes, msg.To) + } + require.ElementsMatch(t, []node.ID{"node1", "node2"}, targetNodes) +} + func TestUpdateSchemaID(t *testing.T) { testutil.SetUpTestServices() tableTriggerEventDispatcherID := common.NewDispatcherID()