Skip to content

Comments

Add node neighbor link updater controller#9

Open
MitchLewis930 wants to merge 1 commit intopr_049_beforefrom
pr_049_after
Open

Add node neighbor link updater controller#9
MitchLewis930 wants to merge 1 commit intopr_049_beforefrom
pr_049_after

Conversation

@MitchLewis930
Copy link

@MitchLewis930 MitchLewis930 commented Jan 30, 2026

User description

PR_049


PR Type

Enhancement


Description

  • Introduce node neighbor link updater controller for centralized monitoring

  • Add NodeNeighborEnqueuer interface for asynchronous neighbor link updates

  • Replace async goroutines with queue-based processing for better health tracking

  • Update NodeNeighborRefresh signature to include refresh parameter


Diagram Walkthrough

flowchart LR
  A["Node Updates"] -->|Enqueue| B["NodeNeighborQueue"]
  B -->|Pop| C["NodeNeighborLinkUpdater Controller"]
  C -->|NodeNeighborRefresh| D["NodeNeighbors Handler"]
  D -->|insertNeighbor| E["Kernel Neighbor Table"]
  C -->|Health Report| F["Health Scope"]
Loading

File Walkthrough

Relevant files
Enhancement
10 files
daemon_main.go
Initialize node neighbor link updater controller                 
+3/-0     
status.go
Update NodeNeighborRefresh signature with refresh parameter
+1/-1     
cells.go
Inject NodeManager into datapath configuration                     
+4/-0     
node.go
Update fake node handler with refresh parameter                   
+1/-1     
datapath.go
Pass NodeManager to NewNodeHandler constructor                     
+3/-1     
node.go
Replace async goroutines with queue-based neighbor updates,
add

NodeNeighborEnqueuer field, improve error handling
+17/-32 
config.go
Add NodeNeighborEnqueuer interface definition                       
+8/-0     
node.go
Update NodeNeighborRefresh signature with refresh parameter
+1/-1     
cell.go
Add NodeNeighborEnqueuer and StartNodeNeighborLinkUpdater to interface
+6/-0     
manager.go
Implement node neighbor queue and link updater controller,
add Enqueue

method, refactor neighbor refresh logic
+77/-15 
Tests
4 files
fuzz_test.go
Add mockEnqueuer for fuzzing and update test calls             
+15/-3   
node_linux_test.go
Update test calls with refresh parameter and mockEnqueuer
+35/-27 
node_test.go
Update NewNodeHandler calls with mockEnqueuer parameter   
+1/-1     
queue_test.go
Add queue implementation tests for push and pop operations
+65/-0   
Formatting
1 files
config.go
Minor formatting adjustment to config structure                   
+1/-0     

This controller is responsible for monitoring nodes that request a neighbor
link update.
It provides a single place in the application where all node neighbor link
are updates are performed as insertNeighbor are sprinkled in the code base.
Additionally given the central location we can provide additional
module health signals should neighbor link updates failed.

Signed-off-by: Fernand Galiana <fernand.galiana@gmail.com>
@qodo-code-review
Copy link

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
Queue-driven DoS

Description: The new neighbor-update queue/controller path can be abused (or can malfunction under
races) to cause persistent error/health degradation and potential resource exhaustion:
Enqueue() logs on n == nil but still pushes a nil entry (lines 143-150), and
StartNodeNeighborLinkUpdater() treats pop() returning nil (e.g., queue emptied after
isEmpty() check) as an "invalid node spec" error (lines 871-890), which may repeatedly
degrade module health and spam logs; additionally, the queue appears unbounded so a burst
of node events could grow memory/CPU usage significantly.
manager.go [142-895]

Referred Code
// Enqueue add a node to a controller managed queue which sets up the neighbor link.
func (m *manager) Enqueue(n *nodeTypes.Node, refresh bool) {
	if n == nil {
		log.WithFields(logrus.Fields{
			logfields.LogSubsys: "enqueue",
		}).Warn("Skipping nodeNeighbor insert: No node given")
	}
	m.nodeNeighborQueue.push(&nodeQueueEntry{node: n, refresh: refresh})
}

// Subscribe subscribes the given node handler to node events.
func (m *manager) Subscribe(nh datapath.NodeHandler) {
	m.nodeHandlersMu.Lock()
	m.nodeHandlers[nh] = struct{}{}
	m.nodeHandlersMu.Unlock()
	// Add all nodes already received by the manager.
	m.mutex.RLock()
	for _, v := range m.nodes {
		v.mutex.Lock()
		if err := nh.NodeAdd(v.node); err != nil {
			log.WithFields(logrus.Fields{


 ... (clipped 733 lines)
Ticket Compliance
🎫 No ticket provided
  • Create ticket/issue
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
🟢
Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

🔴
Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status:
Non-descriptive names: The new tests use unclear identifiers like uu, u, and e which reduce readability and are
not self-documenting.

Referred Code
uu := map[string]struct {
	items, e []string
}{
	"empty": {},

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status:
Queue underflow error: The dequeue loop treats an empty-queue pop() (returning nil) as an "invalid node
spec" error, which can make normal queue exhaustion appear as a controller failure.

Referred Code
var errs error
for {
	e := m.nodeNeighborQueue.pop()
	if e == nil || e.node == nil {
		errs = errors.Join(errs, fmt.Errorf("invalid node spec found in queue: %#v", e))
		break
	}

	log.Debugf("Refreshing node neighbor link for %s", e.node.Name)
	hr := cell.GetHealthReporter(sc, e.node.Name)
	if errs = errors.Join(errs, nh.NodeNeighborRefresh(ctx, *e.node, e.refresh)); errs != nil {
		hr.Degraded("Failed node neighbor link update", errs)
	} else {
		hr.OK("Node neighbor link update successful")
	}
}
return errs

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status:
Nil input enqueued: Enqueue() warns when n == nil but does not return, and still pushes a queue entry
containing a nil node pointer, enabling invalid input to propagate into processing.

Referred Code
func (m *manager) Enqueue(n *nodeTypes.Node, refresh bool) {
	if n == nil {
		log.WithFields(logrus.Fields{
			logfields.LogSubsys: "enqueue",
		}).Warn("Skipping nodeNeighbor insert: No node given")
	}
	m.nodeNeighborQueue.push(&nodeQueueEntry{node: n, refresh: refresh})
}

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status:
Unstructured debug log: The controller logs via log.Debugf(...) which may be unstructured depending on deployment
logging configuration and should be verified against structured logging requirements.

Referred Code
log.Debugf("Refreshing node neighbor link for %s", e.node.Name)
hr := cell.GetHealthReporter(sc, e.node.Name)

Learn more about managing compliance generic rules or creating your own custom rules

Compliance status legend 🟢 - Fully Compliant
🟡 - Partial Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

@qodo-code-review
Copy link

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Use the managed controller manager

Replace controller.NewManager() with the existing m.controllerManager to ensure
the new controller is properly managed and shut down, preventing resource leaks.

pkg/node/manager/manager.go [866-895]

-	controller.NewManager().UpdateController(
+	m.controllerManager.UpdateController(
 		"node-neighbor-link-updater",
 		controller.ControllerParams{
 			Group: neighborTableUpdateControllerGroup,
 			DoFunc: func(ctx context.Context) error {
 ...
 			},
 			RunInterval: defaultNodeUpdateInterval,
 		},
 	)

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 9

__

Why: The suggestion correctly identifies a resource leak caused by creating a new unmanaged controller manager instead of using the existing managed m.controllerManager, which would lead to a dangling goroutine.

High
Synchronize queue operations

Add a mutex to protect the nodeNeighborQueue against concurrent access from
multiple goroutines to prevent data races.

pkg/node/manager/manager.go [143-150]

+// add to manager struct:
+//   nodeNeighborQueueMu sync.Mutex
 func (m *manager) Enqueue(n *nodeTypes.Node, refresh bool) {
 	if n == nil {
 		log.WithFields(logrus.Fields{
 			logfields.LogSubsys: "enqueue",
 		}).Warn("Skipping nodeNeighbor insert: No node given")
 	}
+	m.nodeNeighborQueueMu.Lock()
 	m.nodeNeighborQueue.push(&nodeQueueEntry{node: n, refresh: refresh})
+	m.nodeNeighborQueueMu.Unlock()
 }
  • Apply / Chat
Suggestion importance[1-10]: 9

__

Why: The suggestion correctly identifies a critical data race on the new nodeNeighborQueue, which is accessed concurrently by multiple goroutines without any synchronization, and proposes a valid fix using a mutex.

High
Isolate error handling for correct health reporting

Check the error from NodeNeighborRefresh directly before joining it with the
aggregate error variable to ensure accurate health reporting for each node.

pkg/node/manager/manager.go [884-888]

-					if errs = errors.Join(errs, nh.NodeNeighborRefresh(ctx, *e.node, e.refresh)); errs != nil {
-						hr.Degraded("Failed node neighbor link update", errs)
+					if err := nh.NodeNeighborRefresh(ctx, *e.node, e.refresh); err != nil {
+						errs = errors.Join(errs, err)
+						hr.Degraded("Failed node neighbor link update", err)
 					} else {
 						hr.OK("Node neighbor link update successful")
 					}
  • Apply / Chat
Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies a bug in the health reporting logic where an error from a previous node update would incorrectly mark a subsequent successful update as failed.

Medium
Correct neighbor error handling

Separate the error check for insertNeighborCommon from the aggregate error errs
to ensure that errors are only wrapped and returned when the
insertNeighborCommon call itself fails.

pkg/datapath/linux/node.go [750-754]

-if err := errors.Join(errs, n.insertNeighborCommon(ctx, nh, link, refresh)); err != nil {
-	return fmt.Errorf("insert node neighbor IPv4 for %s(%s) failed : %w", link.Attrs().Name, newNodeIP, err)
+commonErr := n.insertNeighborCommon(ctx, nh, link, refresh)
+if commonErr != nil {
+	return fmt.Errorf("insert node neighbor IPv4 for %s(%s) failed: %w", link.Attrs().Name, newNodeIP, commonErr)
 }
-
 return errs
  • Apply / Chat
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that the error handling logic could incorrectly wrap a nil error from insertNeighborCommon if a previous, unrelated error exists, improving the accuracy of error reporting.

Medium
High-level
Refactor neighbor updates to use a single controller

Merge the newly introduced NodeNeighborLinkUpdater controller with the existing
StartNeighborRefresh controller. This will centralize neighbor update logic into
a single controller, simplifying the design.

Examples:

pkg/node/manager/manager.go [858-933]
// StartNodeNeighborLinkUpdater manages node neighbors links sync.
// This provides a central location for all node neighbor link updates.
// Under proper conditions, publisher enqueues the node which requires a link update.
// This controller is agnostic of the condition under which the links must be established, thus
// that responsibility lies on the publishers.
// This controller also provides for module health to be reported in a single central location.
func (m *manager) StartNodeNeighborLinkUpdater(nh datapath.NodeNeighbors) {
	sc := cell.GetSubScope(m.healthScope, "neighbor-link-updater")
	controller.NewManager().UpdateController(
		"node-neighbor-link-updater",

 ... (clipped 66 lines)

Solution Walkthrough:

Before:

func (m *manager) StartNodeNeighborLinkUpdater(nh datapath.NodeNeighbors) {
    // ...
    controller.NewManager().UpdateController(
        "node-neighbor-link-updater",
        controller.ControllerParams{
            DoFunc: func(ctx context.Context) error {
                // ... process queue ...
                nh.NodeNeighborRefresh(ctx, *e.node, e.refresh)
            },
            RunInterval: defaultNodeUpdateInterval,
        },
    )
}

func (m *manager) StartNeighborRefresh(nh datapath.NodeNeighbors) {
    // ...
    controller.NewManager().UpdateController(
        "neighbor-table-refresh",
        controller.ControllerParams{
            DoFunc: func(controllerCtx context.Context) error {
                // ... iterate nodes ...
                go func(ctx context.Context, e *nodeTypes.Node) {
                    // ... sleep ...
                    m.Enqueue(e, false)
                }(ctx, &entryNode)
            },
            RunInterval: m.conf.ARPPingRefreshPeriod,
        },
    )
}

After:

func (m *manager) StartNodeNeighborLinkUpdater(nh datapath.NodeNeighbors) {
    // ...
    controller.NewManager().UpdateController(
        "node-neighbor-link-updater",
        controller.ControllerParams{
            DoFunc: func(ctx context.Context) error {
                // Enqueue nodes for periodic refresh
                for _, entry := range m.nodes {
                    // ...
                    go func(ctx context.Context, e *nodeTypes.Node) {
                        // ... sleep ...
                        m.Enqueue(e, true) // Refresh is now true
                    }(ctx, &entryNode)
                }

                // Process queue
                for {
                    e := m.nodeNeighborQueue.pop()
                    // ...
                    nh.NodeNeighborRefresh(ctx, *e.node, e.refresh)
                }
            },
            RunInterval: m.conf.ARPPingRefreshPeriod,
        },
    )
}
Suggestion importance[1-10]: 8

__

Why: This suggestion correctly identifies that the PR creates two controllers for neighbor updates, which could be consolidated into one, simplifying the design and improving maintainability.

Medium
  • More

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants