From 606618ce3c5fbbafef34d5634d2f555898ba4c05 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Fri, 9 Jan 2026 16:42:06 +0800 Subject: [PATCH 1/4] update --- downstreamadapter/dispatchermanager/helper.go | 14 ++---- maintainer/barrier_event.go | 26 ++++++---- maintainer/barrier_event_test.go | 47 +++++++++++++++++++ 3 files changed, 67 insertions(+), 20 deletions(-) diff --git a/downstreamadapter/dispatchermanager/helper.go b/downstreamadapter/dispatchermanager/helper.go index 50e6f3dd9e..3b77279b17 100644 --- a/downstreamadapter/dispatchermanager/helper.go +++ b/downstreamadapter/dispatchermanager/helper.go @@ -315,7 +315,6 @@ func (h *HeartBeatResponseHandler) Handle(dispatcherManager *DispatcherManager, } case heartbeatpb.InfluenceType_DB: schemaID := dispatcherStatus.InfluencedDispatchers.SchemaID - excludeDispatcherID := common.NewDispatcherIDFromPB(dispatcherStatus.InfluencedDispatchers.ExcludeDispatcherId) var dispatcherIds []common.DispatcherID if common.IsRedoMode(heartbeatResponse.Mode) { dispatcherIds = dispatcherManager.GetAllRedoDispatchers(schemaID) @@ -323,23 +322,16 @@ func (h *HeartBeatResponseHandler) Handle(dispatcherManager *DispatcherManager, dispatcherIds = dispatcherManager.GetAllDispatchers(schemaID) } for _, id := range dispatcherIds { - if id != excludeDispatcherID { - h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id)) - } + h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id)) } case heartbeatpb.InfluenceType_All: - excludeDispatcherID := common.NewDispatcherIDFromPB(dispatcherStatus.InfluencedDispatchers.ExcludeDispatcherId) if common.IsRedoMode(heartbeatResponse.Mode) { dispatcherManager.GetRedoDispatcherMap().ForEach(func(id common.DispatcherID, _ *dispatcher.RedoDispatcher) { - if id != excludeDispatcherID { - h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id)) - } + h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id)) }) } else { dispatcherManager.GetDispatcherMap().ForEach(func(id common.DispatcherID, _ *dispatcher.EventDispatcher) { - if id != excludeDispatcherID { - h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id)) - } + h.dispatcherStatusDynamicStream.Push(id, dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id)) }) } } diff --git a/maintainer/barrier_event.go b/maintainer/barrier_event.go index 853772e7fc..a871f8060e 100644 --- a/maintainer/barrier_event.go +++ b/maintainer/barrier_event.go @@ -14,6 +14,7 @@ package maintainer import ( + "sync" "time" "github.com/pingcap/log" @@ -32,6 +33,8 @@ import ( "go.uber.org/zap" ) +var logExcludeDispatcherIDDeprecatedOnce sync.Once + // BarrierEvent is a barrier event that reported by dispatchers, note is a block multiple dispatchers // all of these dispatchers should report the same event type BarrierEvent struct { @@ -405,6 +408,8 @@ func (be *BarrierEvent) sendPassAction(mode int64) []*messaging.TargetMessage { switch be.blockedDispatchers.InfluenceType { case heartbeatpb.InfluenceType_DB: spans := be.spanController.GetTasksBySchemaID(be.blockedDispatchers.SchemaID) + // writerDispatcher for DB Type is always table trigger dispatcher, so we need to add it too + spans = append(spans, be.spanController.GetTaskByID(be.writerDispatcher)) if len(spans) == 0 { // means tables are removed, mark the event done be.rangeChecker.MarkCovered() @@ -435,9 +440,6 @@ func (be *BarrierEvent) sendPassAction(mode int64) []*messaging.TargetMessage { for _, stm := range spans { nodeID := stm.GetNodeID() dispatcherID := stm.ID - if dispatcherID == be.writerDispatcher { - continue - } msg, ok := msgMap[nodeID] if !ok { msg = be.newPassActionMessage(nodeID, mode) @@ -670,17 +672,23 @@ func (be *BarrierEvent) newWriterActionMessage(capture node.ID, mode int64) *mes } func (be *BarrierEvent) newPassActionMessage(capture node.ID, mode int64) *messaging.TargetMessage { + influenced := &heartbeatpb.InfluencedDispatchers{ + InfluenceType: be.blockedDispatchers.InfluenceType, + SchemaID: be.blockedDispatchers.SchemaID, + } + if be.blockedDispatchers.InfluenceType != heartbeatpb.InfluenceType_Normal { + // ExcludeDispatcherId is deprecated. It is kept only for rolling upgrade compatibility: + // older dispatcher managers unconditionally dereference this field for DB/All types. + // New dispatcher managers should ignore it. + influenced.ExcludeDispatcherId = &heartbeatpb.DispatcherID{} + } return messaging.NewSingleTargetMessage(capture, messaging.HeartbeatCollectorTopic, &heartbeatpb.HeartBeatResponse{ ChangefeedID: be.cfID.ToPB(), DispatcherStatuses: []*heartbeatpb.DispatcherStatus{ { - Action: be.action(heartbeatpb.Action_Pass), - InfluencedDispatchers: &heartbeatpb.InfluencedDispatchers{ - InfluenceType: be.blockedDispatchers.InfluenceType, - SchemaID: be.blockedDispatchers.SchemaID, - ExcludeDispatcherId: be.writerDispatcher.ToPB(), - }, + Action: be.action(heartbeatpb.Action_Pass), + InfluencedDispatchers: influenced, }, }, Mode: mode, diff --git a/maintainer/barrier_event_test.go b/maintainer/barrier_event_test.go index a5ff597ec7..1401c72d76 100644 --- a/maintainer/barrier_event_test.go +++ b/maintainer/barrier_event_test.go @@ -188,6 +188,53 @@ func TestResendAction(t *testing.T) { require.Equal(t, resp.DispatcherStatuses[0].Action.CommitTs, uint64(10)) } +func TestSendPassActionTypeDBIncludesWriterNode(t *testing.T) { + testutil.SetUpTestServices() + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"} + nodeManager.GetAliveNodes()["node2"] = &node.Info{ID: "node2"} + + tableTriggerEventDispatcherID := common.NewDispatcherID() + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme) + ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID, + common.DDLSpanSchemaID, + common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{ + ID: tableTriggerEventDispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 1, + }, "node2", false) + spanController := span.NewController(cfID, ddlSpan, nil, nil, nil, common.DefaultKeyspaceID, common.DefaultMode) + operatorController := operator.NewOperatorController(cfID, spanController, 1000, common.DefaultMode) + + spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1) + spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 2}, 1) + absents := spanController.GetAbsentForTest(100) + for _, stm := range absents { + spanController.BindSpanToNode("", "node1", stm) + spanController.MarkSpanReplicating(stm) + } + + event := NewBlockEvent(cfID, tableTriggerEventDispatcherID, spanController, operatorController, &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockTables: &heartbeatpb.InfluencedTables{ + InfluenceType: heartbeatpb.InfluenceType_DB, + SchemaID: 1, + }, + }, false) + event.selected.Store(true) + event.writerDispatcher = tableTriggerEventDispatcherID + event.writerDispatcherAdvanced = true + + msgs := event.sendPassAction(common.DefaultMode) + require.Len(t, msgs, 2) + targetNodes := make([]node.ID, 0, len(msgs)) + for _, msg := range msgs { + targetNodes = append(targetNodes, msg.To) + } + require.ElementsMatch(t, []node.ID{"node1", "node2"}, targetNodes) +} + func TestUpdateSchemaID(t *testing.T) { testutil.SetUpTestServices() tableTriggerEventDispatcherID := common.NewDispatcherID() From 66b6b9de54c5d45a23b275113f171f188a85f557 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Fri, 9 Jan 2026 16:43:36 +0800 Subject: [PATCH 2/4] update --- maintainer/barrier_event.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/maintainer/barrier_event.go b/maintainer/barrier_event.go index a871f8060e..0b57e7105c 100644 --- a/maintainer/barrier_event.go +++ b/maintainer/barrier_event.go @@ -14,7 +14,6 @@ package maintainer import ( - "sync" "time" "github.com/pingcap/log" @@ -33,8 +32,6 @@ import ( "go.uber.org/zap" ) -var logExcludeDispatcherIDDeprecatedOnce sync.Once - // BarrierEvent is a barrier event that reported by dispatchers, note is a block multiple dispatchers // all of these dispatchers should report the same event type BarrierEvent struct { From d3534eaf4cb60630fc597c1d9233ddacc0b9fee8 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Fri, 9 Jan 2026 21:30:26 +0800 Subject: [PATCH 3/4] update --- maintainer/barrier_event.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/maintainer/barrier_event.go b/maintainer/barrier_event.go index 0b57e7105c..2a86f6e5ba 100644 --- a/maintainer/barrier_event.go +++ b/maintainer/barrier_event.go @@ -406,7 +406,10 @@ func (be *BarrierEvent) sendPassAction(mode int64) []*messaging.TargetMessage { case heartbeatpb.InfluenceType_DB: spans := be.spanController.GetTasksBySchemaID(be.blockedDispatchers.SchemaID) // writerDispatcher for DB Type is always table trigger dispatcher, so we need to add it too - spans = append(spans, be.spanController.GetTaskByID(be.writerDispatcher)) + writerDispatcherTask := be.spanController.GetTaskByID(be.writerDispatcher) + if writerDispatcherTask != nil { + spans = append(spans, writerDispatcherTask) + } if len(spans) == 0 { // means tables are removed, mark the event done be.rangeChecker.MarkCovered() From e9c405f2a07812706c015063c455d9a9f8a8e4bf Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Sun, 11 Jan 2026 16:44:49 +0800 Subject: [PATCH 4/4] update --- maintainer/barrier_event.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/maintainer/barrier_event.go b/maintainer/barrier_event.go index 2a86f6e5ba..e5bf55464a 100644 --- a/maintainer/barrier_event.go +++ b/maintainer/barrier_event.go @@ -405,16 +405,17 @@ func (be *BarrierEvent) sendPassAction(mode int64) []*messaging.TargetMessage { switch be.blockedDispatchers.InfluenceType { case heartbeatpb.InfluenceType_DB: spans := be.spanController.GetTasksBySchemaID(be.blockedDispatchers.SchemaID) - // writerDispatcher for DB Type is always table trigger dispatcher, so we need to add it too - writerDispatcherTask := be.spanController.GetTaskByID(be.writerDispatcher) - if writerDispatcherTask != nil { - spans = append(spans, writerDispatcherTask) - } if len(spans) == 0 { // means tables are removed, mark the event done be.rangeChecker.MarkCovered() return nil } else { + // writerDispatcher for DB Type is always table trigger dispatcher, so we need to add it too + writerDispatcherTask := be.spanController.GetTaskByID(be.writerDispatcher) + if writerDispatcherTask != nil { + spans = append(spans, writerDispatcherTask) + } + for _, stm := range spans { nodeID := stm.GetNodeID() if nodeID == "" {