Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 211 additions & 66 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ package main

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"go.goms.io/aks/AKSFlexNode/pkg/bootstrapper"
"go.goms.io/aks/AKSFlexNode/pkg/config"
"go.goms.io/aks/AKSFlexNode/pkg/drift"
"go.goms.io/aks/AKSFlexNode/pkg/logger"
"go.goms.io/aks/AKSFlexNode/pkg/spec"
"go.goms.io/aks/AKSFlexNode/pkg/status"
Expand Down Expand Up @@ -132,64 +134,208 @@ func runDaemonLoop(ctx context.Context, cfg *config.Config) error {
// Clean up any stale status file on daemon startup
if _, err := os.Stat(statusFilePath); err == nil {
logger.Info("Removing stale status file from previous daemon session...")
if err := os.Remove(statusFilePath); err != nil {
logger.Warnf("Failed to remove stale status file: %v", err)
} else {
logger.Info("Stale status file removed successfully")
}
status.RemoveStatusFileBestEffortAtPath(logger, statusFilePath)
}

logger.Info("Starting periodic status collection daemon (status: 1 minutes, bootstrap check: 2 minute)")
// Always remove managed cluster spec snapshot on daemon startup.
// We'll re-collect it shortly after startup and on a schedule.
removed, err := spec.RemoveManagedClusterSpecSnapshot()
if err != nil {
logger.Warnf("Failed to remove stale managed cluster spec snapshot: %v", err)
} else if removed {
logger.Info("Removed stale managed cluster spec snapshot successfully")
}

// Create tickers for different intervals
statusTicker := time.NewTicker(1 * time.Minute)
bootstrapTicker := time.NewTicker(2 * time.Minute)
specTicker := time.NewTicker(30 * time.Minute)
defer statusTicker.Stop()
defer bootstrapTicker.Stop()
defer specTicker.Stop()
logger.Info("Starting periodic status collection daemon (status: 1 minutes, bootstrap check: 2 minute, spec collection: 10 minutes)...")

// Protect cfg reads/writes across concurrent loops. This avoids data races when we
// temporarily update cfg.Kubernetes.Version to trigger drift remediation bootstrap.
var cfgMu sync.RWMutex

// Guard to prevent overlapping bootstrap runs across loops.
var bootstrapInProgress int32

// Collect status immediately on start
if err := collectAndWriteStatus(ctx, cfg, statusFilePath); err != nil {
logger.Errorf("Failed to collect initial status: %v", err)
}

// Collect managed cluster spec once on daemon startup.
if err := collectAndWriteManagedClusterSpec(ctx, cfg); err != nil {
logger.Warnf("Failed to collect initial managed cluster spec: %v", err)
driftEnabled := cfg != nil && cfg.IsDriftDetectionAndRemediationEnabled()
if !driftEnabled {
logger.Info("Drift detection and remediation is disabled by config")
}

// Run the periodic collection and monitoring loop
for {
select {
case <-ctx.Done():
logger.Info("Daemon shutting down due to context cancellation")
return ctx.Err()
case <-statusTicker.C:
logger.Infof("Starting periodic status collection at %s...", time.Now().Format("2006-01-02 15:04:05"))
if err := collectAndWriteStatus(ctx, cfg, statusFilePath); err != nil {
logger.Errorf("Failed to collect status at %s: %v", time.Now().Format("2006-01-02 15:04:05"), err)
// Continue running even if status collection fails
var detectors []drift.Detector
if driftEnabled {
// Initialize drift detectors and collect initial managed cluster spec before starting loops to ensure drift loop has what it needs to run on schedule without waiting for the first spec collection interval.
detectors = drift.DefaultDetectors()
// Collect managed cluster spec once on daemon startup.
if err := collectAndWriteManagedClusterSpec(ctx, cfg); err != nil {
logger.Warnf("Failed to collect initial managed cluster spec: %v", err)
} else {
cfgSnap := snapshotConfig(cfg, &cfgMu)
if err := drift.DetectAndRemediateFromFiles(ctx, cfgSnap, logger, &bootstrapInProgress, detectors); err != nil {
logger.Warnf("Initial drift detection after spec collection failed: %v", err)
} else {
logger.Info("Initial drift detection after spec collection completed successfully")
}

}
}

var wg sync.WaitGroup
startDaemonLoops(ctx, cfg, statusFilePath, logger, &cfgMu, &bootstrapInProgress, detectors, driftEnabled, &wg)

<-ctx.Done()
logger.Info("Daemon shutting down due to context cancellation")
wg.Wait()
return ctx.Err()
}

func startDaemonLoops(
ctx context.Context,
cfg *config.Config,
statusFilePath string,
logger *logrus.Logger,
cfgMu *sync.RWMutex,
bootstrapInProgress *int32,
detectors []drift.Detector,
driftEnabled bool,
wg *sync.WaitGroup,
) {
if wg == nil {
return
}
if driftEnabled {
wg.Add(3)
} else {
wg.Add(2)
}
startStatusCollectionLoop(ctx, cfg, statusFilePath, logger, cfgMu, wg)
startBootstrapHealthCheckLoop(ctx, cfg, logger, cfgMu, bootstrapInProgress, wg)
if driftEnabled {
startNodeDriftDetectionAndRemediationLoop(ctx, cfg, logger, cfgMu, bootstrapInProgress, detectors, wg)
}
}

func snapshotConfig(cfg *config.Config, cfgMu *sync.RWMutex) *config.Config {
if cfg == nil {
return nil
}
if cfgMu != nil {
cfgMu.RLock()
defer cfgMu.RUnlock()
}
return cfg.DeepCopy()
}

func startStatusCollectionLoop(
ctx context.Context,
cfg *config.Config,
statusFilePath string,
logger *logrus.Logger,
cfgMu *sync.RWMutex,
wg *sync.WaitGroup,
) {
go func() {
defer wg.Done()
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
now := time.Now()
logger.Infof("Starting periodic status collection at %s...", now.Format("2006-01-02 15:04:05"))
cfgSnap := snapshotConfig(cfg, cfgMu)
err := collectAndWriteStatus(ctx, cfgSnap, statusFilePath)
if err != nil {
logger.Errorf("Failed to collect status at %s: %v", now.Format("2006-01-02 15:04:05"), err)
continue
}
logger.Infof("Status collection completed successfully at %s", time.Now().Format("2006-01-02 15:04:05"))
}
case <-bootstrapTicker.C:
logger.Infof("Starting bootstrap health check at %s...", time.Now().Format("2006-01-02 15:04:05"))
if err := checkAndBootstrap(ctx, cfg); err != nil {
logger.Errorf("Auto-bootstrap check failed at %s: %v", time.Now().Format("2006-01-02 15:04:05"), err)
// Continue running even if bootstrap check fails
} else {
logger.Infof("Bootstrap health check completed at %s", time.Now().Format("2006-01-02 15:04:05"))
}
}()
}

func startBootstrapHealthCheckLoop(
ctx context.Context,
cfg *config.Config,
logger *logrus.Logger,
cfgMu *sync.RWMutex,
bootstrapInProgress *int32,
wg *sync.WaitGroup,
) {
go func() {
defer wg.Done()
ticker := time.NewTicker(2 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
now := time.Now()
logger.Infof("Starting bootstrap health check at %s...", now.Format("2006-01-02 15:04:05"))

if !atomic.CompareAndSwapInt32(bootstrapInProgress, 0, 1) {
logger.Warn("Bootstrap already in progress, skipping this interval")
continue
}
func() {
defer atomic.StoreInt32(bootstrapInProgress, 0)
cfgSnap := snapshotConfig(cfg, cfgMu)
err := checkAndBootstrap(ctx, cfgSnap)
if err != nil {
logger.Errorf("Auto-bootstrap check failed at %s: %v", now.Format("2006-01-02 15:04:05"), err)
return
}
logger.Infof("Bootstrap health check completed at %s", time.Now().Format("2006-01-02 15:04:05"))
}()
}
case <-specTicker.C:
logger.Infof("Starting periodic managed cluster spec collection at %s...", time.Now().Format("2006-01-02 15:04:05"))
if err := collectAndWriteManagedClusterSpec(ctx, cfg); err != nil {
logger.Warnf("Failed to collect managed cluster spec at %s: %v", time.Now().Format("2006-01-02 15:04:05"), err)
} else {
}
}()
}

func startNodeDriftDetectionAndRemediationLoop(
ctx context.Context,
cfg *config.Config,
logger *logrus.Logger,
cfgMu *sync.RWMutex,
bootstrapInProgress *int32,
detectors []drift.Detector,
wg *sync.WaitGroup,
) {
go func() {
defer wg.Done()
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
now := time.Now()
logger.Infof("Starting periodic managed cluster spec collection at %s...", now.Format("2006-01-02 15:04:05"))
cfgSnap := snapshotConfig(cfg, cfgMu)
err := collectAndWriteManagedClusterSpec(ctx, cfgSnap)
if err != nil {
logger.Warnf("Failed to collect managed cluster spec at %s: %v", now.Format("2006-01-02 15:04:05"), err)
continue
}
logger.Infof("Managed cluster spec collection completed at %s", time.Now().Format("2006-01-02 15:04:05"))

// Run drift detection immediately after spec is updated so we don't wait.
if err := drift.DetectAndRemediateFromFiles(ctx, cfgSnap, logger, bootstrapInProgress, detectors); err != nil {
logger.Warnf("Drift detection after spec collection failed at %s: %v", time.Now().Format("2006-01-02 15:04:05"), err)
} else {
logger.Infof("Drift detection after spec collection completed at %s", time.Now().Format("2006-01-02 15:04:05"))
}
}
}
}
}()
}

func collectAndWriteManagedClusterSpec(ctx context.Context, cfg *config.Config) error {
Expand All @@ -213,36 +359,42 @@ func checkAndBootstrap(ctx context.Context, cfg *config.Config) error {

logger.Info("Node requires re-bootstrapping, initiating auto-bootstrap...")

if cfg != nil && cfg.IsDriftDetectionAndRemediationEnabled() {
// Best-effort: refresh the managed cluster spec snapshot before attempting to
// override Kubernetes version. This avoids falling back to an old static version
// right after reboot (we delete the snapshot at daemon startup).
if err := collectAndWriteManagedClusterSpec(ctx, cfg); err != nil {
logger.Warnf("Failed to refresh managed cluster spec before auto-bootstrap: %v", err)
}

// Best-effort: prefer Kubernetes version from the persisted managed cluster spec snapshot.
// This keeps auto-bootstrap aligned with the cluster desired version even if the static
// config has an older value.
if changed, oldV, newV, err := spec.OverrideKubernetesVersionFromManagedClusterSpec(cfg); err == nil && changed {
logger.Infof("Overriding Kubernetes version from managed cluster spec: %q -> %q", oldV, newV)
}
}

// Perform bootstrap
bootstrapExecutor := bootstrapper.New(cfg, logger)
result, err := bootstrapExecutor.Bootstrap(ctx)
if err != nil {
// Bootstrap failed - remove status file so next check will detect the problem
removeStatusFile(ctx)
status.RemoveStatusFileBestEffort(logger)
return fmt.Errorf("auto-bootstrap failed: %s", err)
}

// Handle and log the bootstrap result
if err := handleExecutionResult(result, "auto-bootstrap", logger); err != nil {
// Bootstrap execution failed - remove status file so next check will detect the problem
removeStatusFile(ctx)
status.RemoveStatusFileBestEffort(logger)
return fmt.Errorf("auto-bootstrap execution failed: %s", err)
}

logger.Info("Auto-bootstrap completed successfully")
return nil
}

func removeStatusFile(ctx context.Context) {
logger := logger.GetLoggerFromContext(ctx)
statusFilePath := status.GetStatusFilePath()
if removeErr := os.Remove(statusFilePath); removeErr != nil {
logger.Debugf("Failed to remove status file: %s", removeErr)
} else {
logger.Debug("Removed status file successfully")
}
}

// collectAndWriteStatus collects current node status and writes it to the status file
func collectAndWriteStatus(ctx context.Context, cfg *config.Config, statusFilePath string) error {
logger := logger.GetLoggerFromContext(ctx)
Expand All @@ -255,23 +407,16 @@ func collectAndWriteStatus(ctx context.Context, cfg *config.Config, statusFilePa
if err != nil {
return fmt.Errorf("failed to collect node status: %w", err)
}
if nodeStatus != nil {
nodeStatus.LastUpdatedBy = status.LastUpdatedByStatusCollectionLoop
nodeStatus.LastUpdatedReason = status.LastUpdatedReasonPeriodicStatusLoop
}

// Write status to JSON file
statusData, err := json.MarshalIndent(nodeStatus, "", " ")
err = status.WriteStatusToFile(statusFilePath, nodeStatus)
if err != nil {
return fmt.Errorf("failed to marshal status to JSON: %w", err)
}

// Write to temporary file first, then rename (atomic operation)
tempFile := statusFilePath + ".tmp"
if err := os.WriteFile(tempFile, statusData, 0o600); err != nil {
return fmt.Errorf("failed to write status to temp file: %w", err)
return fmt.Errorf("failed to write status to file: %w", err)
}

if err := os.Rename(tempFile, statusFilePath); err != nil {
return fmt.Errorf("failed to rename temp status file: %w", err)
}

logger.Debugf("Status written to %s", statusFilePath)
return nil
}
Expand Down
19 changes: 16 additions & 3 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ sudo tee /etc/aks-flex-node/config.json > /dev/null << 'EOF'
},
"agent": {
"logLevel": "info",
"logDir": "/var/log/aks-flex-node"
"logDir": "/var/log/aks-flex-node",
"enableDriftDetectionAndRemediation": true
}
}
EOF
Expand Down Expand Up @@ -325,7 +326,8 @@ sudo tee /etc/aks-flex-node/config.json > /dev/null <<EOF
},
"agent": {
"logLevel": "info",
"logDir": "/var/log/aks-flex-node"
"logDir": "/var/log/aks-flex-node",
"enableDriftDetectionAndRemediation": true
}
}
EOF
Expand Down Expand Up @@ -538,10 +540,21 @@ sudo tee /etc/aks-flex-node/config.json > /dev/null <<EOF
},
"agent": {
"logLevel": "info",
"logDir": "/var/log/aks-flex-node"
"logDir": "/var/log/aks-flex-node",
"enableDriftDetectionAndRemediation": true
}
}
EOF

You can disable automated drift detection and remediation if you prefer to self-managed node updates:

```json
{
"agent": {
"enableDriftDetectionAndRemediation": false
}
}
```
```

### Running the Agent
Expand Down
Loading
Loading