-
Notifications
You must be signed in to change notification settings - Fork 34
dispatcherOrchestrator: discard duplicate messages to avoid floods of retry messages #3988
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Summary of ChangesHello @hongyunyan, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly improves the message handling mechanism within the Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a pendingMessageQueue to de-duplicate messages in the DispatcherOrchestrator, preventing floods of retry messages. The implementation replaces the previous channel-based approach with this new queue, which correctly drops duplicate messages while one is being processed. The changes include updating the message handling logic, the shutdown sequence, and adding comprehensive unit tests for the new queue.
My review focuses on the new queue's implementation and its integration. I've identified a couple of areas for potential improvement:
- Refactoring
getPendingMessageKeyfor better conciseness. - Removing a seemingly redundant
nilcheck inhandleMessagesfor improved clarity.
Overall, this is a solid improvement that addresses the issue of message floods effectively.
| func getPendingMessageKey(msg *messaging.TargetMessage) (pendingMessageKey, bool) { | ||
| switch req := msg.Message[0].(type) { | ||
| case *heartbeatpb.MaintainerBootstrapRequest: | ||
| return pendingMessageKey{ | ||
| changefeedID: common.NewChangefeedIDFromPB(req.ChangefeedID), | ||
| msgType: msg.Type, | ||
| }, true | ||
| case *heartbeatpb.MaintainerPostBootstrapRequest: | ||
| return pendingMessageKey{ | ||
| changefeedID: common.NewChangefeedIDFromPB(req.ChangefeedID), | ||
| msgType: msg.Type, | ||
| }, true | ||
| case *heartbeatpb.MaintainerCloseRequest: | ||
| return pendingMessageKey{ | ||
| changefeedID: common.NewChangefeedIDFromPB(req.ChangefeedID), | ||
| msgType: msg.Type, | ||
| }, true | ||
| default: | ||
| // Channel is full, log warning and drop the message | ||
| log.Warn("message channel is full, dropping message", zap.Any("message", msg.Message)) | ||
| return nil | ||
| return pendingMessageKey{}, false | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The switch statement in this function contains duplicated code for extracting req.ChangefeedID and constructing the pendingMessageKey. You can refactor this to be more concise by extracting the ChangefeedID into a variable within the switch, and then constructing the pendingMessageKey once after the switch.
func getPendingMessageKey(msg *messaging.TargetMessage) (pendingMessageKey, bool) {
var changefeedID *heartbeatpb.ChangefeedID
switch req := msg.Message[0].(type) {
case *heartbeatpb.MaintainerBootstrapRequest:
changefeedID = req.ChangefeedID
case *heartbeatpb.MaintainerPostBootstrapRequest:
changefeedID = req.ChangefeedID
case *heartbeatpb.MaintainerCloseRequest:
changefeedID = req.ChangefeedID
default:
return pendingMessageKey{}, false
}
return pendingMessageKey{
changefeedID: common.NewChangefeedIDFromPB(changefeedID),
msgType: msg.Type,
}, true
}| if msg == nil { | ||
| m.msgQueue.Done(key) | ||
| continue | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check for msg == nil appears to be unnecessary. Given the logic of pendingMessageQueue, a key is only pushed to the queue after being added to the pending map. Since handleMessages processes messages serially from the queue in a single goroutine, m.msgQueue.Get(key) should not return nil for a key that was just successfully popped. Removing this block would simplify the code. If there's a subtle race condition this is protecting against, it would be beneficial to add a comment explaining it.
| "go.uber.org/zap" | ||
| ) | ||
|
|
||
| type pendingMessageKey struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please move this struct to a new file to make the code clean
| q.mu.Lock() | ||
| if _, ok := q.pending[key]; ok { | ||
| q.mu.Unlock() | ||
| return false | ||
| } | ||
| q.pending[key] = msg | ||
| q.mu.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is better to extract the code as a function
| func (m *DispatcherOrchestrator) handleMessages() { | ||
| for { | ||
| select { | ||
| case <-ctx.Done(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why remove this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use close the msgQueue instead of ctx.cancel function here.
Becauses msgQueue will freeze when there is no data, making it impossible to perceive changes in ctx, and a forced close operation is required.
| } | ||
|
|
||
| // De-duplicate by (changefeedID, messageType) to avoid floods of retry messages. | ||
| _ = m.msgQueue.TryEnqueue(key, msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible that the message with the same key as an existing message is not a retry message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so. Besides, according to the original logic, if the channel is full, the message will be dropped directly, so I think the behavior will be the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I pause the changefeed and remove the changefeed, the key is the same, but the request is not the same.
|
/retest |
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: flowbehappy, wk989898 The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
[LGTM Timeline notifier]Timeline:
|
|
/retest |
|
/test pull-cdc-mysql-integration-light-next-gen |
|
/retest |
|
/test pull-cdc-mysql-integration-heavy |
|
/retest |
1 similar comment
|
/retest |
What problem does this PR solve?
Issue Number: close #3962
What is changed and how it works?
This pull request significantly improves the message handling mechanism within the
DispatcherOrchestratorby implementing a de-duplication strategy for incoming messages. By preventing the processing of redundant retry messages, the system can maintain better performance and responsiveness, ensuring that critical operations are not delayed or starved due to an overload of duplicate requests. This change enhances the overall stability and efficiency of the message dispatching system.Highlights
pendingMessageQueueto de-duplicate messages based on(changefeedID, messageType), preventing floods of retry messages from blocking or starving other requests.DispatcherOrchestratornow utilizes thependingMessageQueueinstead of a simple buffered channel for asynchronous message processing, enhancing robustness and efficiency.pendingMessageKeystruct was added to uniquely identify messages for de-duplication purposes.pendingMessageQueueto ensure its correctness in handling duplicates, preserving order, and proper closure.Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note