Skip to content

Conversation

@wlwilliamx
Copy link
Collaborator

@wlwilliamx wlwilliamx commented Dec 18, 2025

What problem does this PR solve?

Issue Number: close #3411

During maintainer failover/restart, in-flight dispatcher operators (add/remove/move/split) could be lost. If a dispatcher becomes Stopped/Removed while the corresponding operator state is missing, the span may never be rescheduled, leading to “lost dispatcher” and stalled replication. This is reproducible when dispatcher creation/close is blocked (e.g. move-table/split/remove in progress) and affects both default and redo modes.

What is changed and how it works?

  • Persist in-flight operators across failover via bootstrap
    • Add OperatorType to ScheduleDispatcherRequest and include operators in MaintainerBootstrapResponse (heartbeatpb/heartbeat.proto).
    • DispatcherManager records current working operators (currentOperatorMap / redoCurrentOperatorMap) when handling schedule requests, and clears them when the dispatcher is created/cleaned (downstreamadapter/dispatchermanager/...).
    • DispatcherOrchestrator returns these operators in bootstrap responses (downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go).
  • Restore operator consistency during maintainer bootstrap
    • During FinishBootstrap, restore reported in-flight operators before rebuilding task/schema state (maintainer/maintainer_controller_bootstrap.go).
    • For Create: ensure an absent replica-set exists and re-issue an add operator.
    • For Remove:
      • O_Remove: re-issue remove; if the table is gone, drop the span from the controller.
      • O_Move/O_Split: re-issue the remove stage and mark the span absent on finish so normal scheduling can safely re-add it (target may change after failover).
  • Runtime safety net
    • If a dispatcher reports Stopped/Removed but there is no operator for it, mark the span absent to force rescheduling (maintainer/maintainer_controller.go).
  • Tests / test tooling
    • Add failpoints to keep operators in-progress (NotReadyToCloseDispatcher, BlockCreateDispatcher).
    • Add integration test maintainer_failover_when_operator (covers add/split/move/remove + maintainer failover; default + redo subcases).
    • Add wait param/flag to move-table API/CLI for tests so a move can be initiated without blocking until completion.

Check List

Tests

  • Unit test
  • Integration test
  • Manual test

Questions

Will it cause performance regression or break compatibility?

None

Do you need to update user documentation, design documentation or monitoring documentation?

None

Release note

Fix an issue where table spans could stop replicating after maintainer failover/restart when dispatcher add/remove/move/split operators were in progress.

@ti-chi-bot ti-chi-bot bot added do-not-merge/needs-triage-completed release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. and removed do-not-merge/needs-triage-completed labels Dec 18, 2025
@gemini-code-assist
Copy link

Summary of Changes

Hello @wlwilliamx, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical consistency issue within the scheduler where the state of active dispatcher operators could be lost or become inconsistent following a maintainer restart. By enhancing the Protocol Buffer definitions to include operator-specific metadata and implementing robust tracking mechanisms within the DispatcherManager, the system can now accurately persist and restore the state of these operators. This ensures that operations on table spans, such as adding, removing, or moving dispatchers, are correctly resumed and maintained, leading to a more resilient and consistent scheduling process.

Highlights

  • Operator State Persistence: Introduced mechanisms to persist and restore the state of in-progress dispatcher operators (add, remove, move, split, merge) across maintainer restarts, ensuring consistency.
  • Protocol Buffer Updates: Added a new OperatorType enum and fields to DispatcherConfig (enabledSplit) and ScheduleDispatcherRequest (operatorType) in the heartbeatpb protobuf definitions to support operator tracking.
  • Dispatcher Manager Enhancements: The DispatcherManager now uses sync.Map to track currentOperatorMap and redoCurrentOperatorMap, preventing duplicate operator scheduling and ensuring proper cleanup.
  • Bootstrap Operator Restoration: During maintainer bootstrap, existing operators are now restored from the MaintainerBootstrapResponse, allowing the system to resume operations on previously active table spans.
  • Operator Type Integration: All relevant operator creation and message sending functions across maintainer/operator and maintainer/replica packages have been updated to utilize the new OperatorType.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@wlwilliamx wlwilliamx changed the title fix(scheduler): ensure span consistency for operators after maintainer restart fix(spanController): ensure span consistency for operators after maintainer restart Dec 18, 2025
Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces changes to ensure span consistency for operators after a maintainer restart. The main changes involve adding an enabledSplit flag to dispatchers and tracking ongoing operators to restore them during bootstrap. My review focuses on ensuring correctness, consistency, and robustness of these new mechanisms. I've identified a few areas for improvement, including a typo, a misleading comment, a copy-paste error in a log message, and an unhandled error. Overall, the changes are well-structured and address the intended problem.

@wlwilliamx
Copy link
Collaborator Author

/test pull-integration-test

@ti-chi-bot
Copy link

ti-chi-bot bot commented Dec 18, 2025

@wlwilliamx: The specified target(s) for /test were not found.
The following commands are available to trigger required jobs:

/test pull-build
/test pull-cdc-kafka-integration-heavy
/test pull-cdc-kafka-integration-light
/test pull-cdc-mysql-integration-heavy
/test pull-cdc-mysql-integration-light
/test pull-cdc-storage-integration-heavy
/test pull-cdc-storage-integration-light
/test pull-check
/test pull-error-log-review
/test pull-unit-test

The following commands are available to trigger optional jobs:

/test pull-build-next-gen
/test pull-cdc-kafka-integration-heavy-next-gen
/test pull-cdc-kafka-integration-light-next-gen
/test pull-cdc-mysql-integration-heavy-next-gen
/test pull-cdc-mysql-integration-light-next-gen
/test pull-cdc-pulsar-integration-light
/test pull-cdc-pulsar-integration-light-next-gen
/test pull-cdc-storage-integration-heavy-next-gen
/test pull-cdc-storage-integration-light-next-gen
/test pull-unit-test-next-gen

Use /test all to run the following jobs that were automatically triggered:

pull-build
pull-build-next-gen
pull-check
pull-error-log-review
pull-unit-test
pull-unit-test-next-gen
Details

In response to this:

/test pull-integration-test

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@wlwilliamx
Copy link
Collaborator Author

/test pull-cdc-mysql-integration-heavy

@wlwilliamx
Copy link
Collaborator Author

/test pull-cdc-mysql-integration-light

@wlwilliamx
Copy link
Collaborator Author

/test pull-cdc-mysql-integration-heavy

@wlwilliamx
Copy link
Collaborator Author

/test pull-cdc-mysql-integration-heavy

// or just a part of the table (span). When true, the dispatcher handles the entire table;
// when false, it only handles a portion of the table.
isCompleteTable bool
enabledSplit bool
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need this field?

@wlwilliamx
Copy link
Collaborator Author

/test pull-cdc-mysql-integration-heavy

@wlwilliamx
Copy link
Collaborator Author

/test pull-cdc-mysql-integration-heavy

@wlwilliamx
Copy link
Collaborator Author

/test pull-cdc-mysql-integration-light

@wlwilliamx
Copy link
Collaborator Author

/retest

)
redoInfos[dispatcherID] = info
} else {
dispatcherManager.currentOperatorMap.Store(operatorKey, req)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be put outside.

@ti-chi-bot
Copy link

ti-chi-bot bot commented Jan 9, 2026

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: wk989898

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added the needs-1-more-lgtm Indicates a PR needs 1 more LGTM. label Jan 9, 2026
@ti-chi-bot
Copy link

ti-chi-bot bot commented Jan 9, 2026

[LGTM Timeline notifier]

Timeline:

  • 2026-01-09 06:00:33.832135836 +0000 UTC m=+941189.650444268: ☑️ agreed by wk989898.

@ti-chi-bot ti-chi-bot bot added the approved label Jan 9, 2026
@wlwilliamx
Copy link
Collaborator Author

/retest

if isRedo && (!dispatcherManager.RedoEnable || dispatcherManager.redoDispatcherMap == nil) {
return common.DispatcherID{}, false
}
if _, operatorExists := dispatcherManager.currentOperatorMap.Load(operatorKey); operatorExists {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If one dispatcher is in move operator, then maintainer generater a remove operator for this dispatcher, but you seem just discard the remove action here. It may cause some strang problems

// - The task is removed (for example, due to DDL).
removed atomic.Bool
spanController *span.Controller
// This add operator may be a part of move/split operator
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the comment, we may should first explain what the "operatorType means", why operateType could be "move" in a AddDispatcherOperator

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides, you may could explain why some operators have this field, but others doesn't

finished atomic.Bool
postFinish func()
spanController *span.Controller
// This remove operator may be a part of move/split operator
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

@@ -0,0 +1,362 @@
#!/bin/bash
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a description here to describe what is the main purpose for this test, and what is the main steps for the tests.
Test code is often more complex and harder to read and maintain. Having a comprehensive explanation and step-by-step instructions makes it easier to check and fix test failures later on.

if span == nil {
span = spanInfo.Span
}
if schemaID == 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when will the schemaID == 0, and why we need to make schemaID = spanInfo.SchemaID here?

return spanInfoByID
}

func (c *Controller) restoreCurrentWorkingOperators(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this section difficult to understand, mainly because it involves many special logic checks. This might stem from the numerous special cases reported by Bootstrap, such as when a span has a value but the operator doesn't, or vice versa. I think it would be better to provide an overview of the information it receives, the special cases, and their origins before explaining the logic, and then explain the underlying logic. This would make it easier to understand.

From your perspective, all the special processing logic within a function might seem clear, but those unfamiliar with it might wonder why there are different processing checks and whether these special checks are truly necessary or for some other purpose. Therefore, the best approach as an author is to clearly explain the logic through specific case statements in the comments.

spanController.ShouldEnableSplit(tableSplitMap[spanInfo.Span.TableID]),
)
spanController.AddReplicatingSpan(replicaSet)
} else if replicaSet.GetNodeID() == "" {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when will the node be ""

splitEnabled := spanController.ShouldEnableSplit(table.Splitable)
// Add new table if not working
if isTableWorking {
if isTableWorking || isTableSpanExists {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when will isTableWorking == false but isTableSpanExists == true

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the related logic is a little confused for me. Maybe you can explain here.

// If a dispatcher becomes non-working but there's no operator handling it,
// it means the dispatcher is removed unexpectedly (e.g. maintainer failover loses the operator),
// and we must reschedule it to avoid the dispatcher being lost forever.
if status.ComponentStatus == heartbeatpb.ComponentState_Stopped ||
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand this is a fallback logic, but I'm a little unsure if we really won't reach this point under all normal logic. Could you help me confirm this? We're considering various scenarios such as message resending, multiple sending, late arrival, and uncertain message order across multiple nodes.

@wlwilliamx
Copy link
Collaborator Author

/test pull-cdc-mysql-integration-light

@wlwilliamx
Copy link
Collaborator Author

/retest

@ti-chi-bot
Copy link

ti-chi-bot bot commented Jan 14, 2026

@wlwilliamx: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-cdc-storage-integration-heavy 7e8b71a link true /test pull-cdc-storage-integration-heavy
pull-cdc-mysql-integration-light ad3c626 link true /test pull-cdc-mysql-integration-light
pull-error-log-review ad3c626 link true /test pull-error-log-review

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

approved needs-1-more-lgtm Indicates a PR needs 1 more LGTM. release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

add index can not sync to downstream and data inconsistency between upstream and downstream after ticdc rolling restart

3 participants