diff --git a/internal/pkg/nvmlprovider/provider.go b/internal/pkg/nvmlprovider/provider.go index f8f3ae76..c23ea340 100644 --- a/internal/pkg/nvmlprovider/provider.go +++ b/internal/pkg/nvmlprovider/provider.go @@ -22,6 +22,7 @@ import ( "log/slog" "strconv" "strings" + "sync" "github.com/NVIDIA/go-nvml/pkg/nvml" ) @@ -62,13 +63,18 @@ func SetClient(n NVML) { // nvmlProvider implements NVML Interface type nvmlProvider struct { initialized bool + migCache map[string]*MIGDeviceInfo + // lock protects migCache + lock sync.RWMutex } func newNVMLProvider() (NVML, error) { // Check if a NVML client already exists and return it if so. - if Client() != nil && Client().(nvmlProvider).initialized { - slog.Info("NVML already initialized.") - return Client(), nil + if Client() != nil { + if p, ok := Client().(*nvmlProvider); ok && p.initialized { + slog.Info("NVML already initialized.") + return Client(), nil + } } slog.Info("Attempting to initialize NVML library.") @@ -79,10 +85,10 @@ func newNVMLProvider() (NVML, error) { return nvmlProvider{initialized: false}, err } - return nvmlProvider{initialized: true}, nil + return &nvmlProvider{initialized: true, migCache: make(map[string]*MIGDeviceInfo)}, nil } -func (n nvmlProvider) preCheck() error { +func (n *nvmlProvider) preCheck() error { if !n.initialized { return fmt.Errorf("NVML library not initialized") } @@ -91,18 +97,51 @@ func (n nvmlProvider) preCheck() error { } // GetMIGDeviceInfoByID returns information about MIG DEVICE by ID -func (n nvmlProvider) GetMIGDeviceInfoByID(uuid string) (*MIGDeviceInfo, error) { +func (n *nvmlProvider) GetMIGDeviceInfoByID(uuid string) (*MIGDeviceInfo, error) { if err := n.preCheck(); err != nil { slog.Error(fmt.Sprintf("failed to get MIG Device Info; err: %v", err)) return nil, err } + // Check cache first (including negative cache) + n.lock.RLock() + if info, ok := n.migCache[uuid]; ok { + n.lock.RUnlock() + if info == nil { + return nil, fmt.Errorf("previously failed to get MIG device info") + } + return info, nil + } + n.lock.RUnlock() + device, ret := nvml.DeviceGetHandleByUUID(uuid) if ret == nvml.SUCCESS { - return getMIGDeviceInfoForNewDriver(device) + info, err := getMIGDeviceInfoForNewDriver(device) + if err == nil { + n.lock.Lock() + n.migCache[uuid] = info + n.lock.Unlock() + return info, nil + } + // Negative cache on failure + n.lock.Lock() + n.migCache[uuid] = nil + n.lock.Unlock() + return nil, err } - return getMIGDeviceInfoForOldDriver(uuid) + info, err := getMIGDeviceInfoForOldDriver(uuid) + if err == nil { + n.lock.Lock() + n.migCache[uuid] = info + n.lock.Unlock() + return info, nil + } + // Negative cache on failure + n.lock.Lock() + n.migCache[uuid] = nil + n.lock.Unlock() + return nil, err } // getMIGDeviceInfoForNewDriver identifies MIG Device Information for drivers >= R470 (470.42.01+), @@ -167,7 +206,7 @@ func getMIGDeviceInfoForOldDriver(uuid string) (*MIGDeviceInfo, error) { } // Cleanup performs cleanup operations for the NVML provider -func (n nvmlProvider) Cleanup() { +func (n *nvmlProvider) Cleanup() { if err := n.preCheck(); err == nil { reset() } diff --git a/internal/pkg/server/server.go b/internal/pkg/server/server.go index 48292c8a..b9a66d5a 100644 --- a/internal/pkg/server/server.go +++ b/internal/pkg/server/server.go @@ -30,6 +30,8 @@ import ( "github.com/gorilla/mux" "github.com/prometheus/exporter-toolkit/web" + "github.com/NVIDIA/go-dcgm/pkg/dcgm" + "github.com/NVIDIA/dcgm-exporter/internal/pkg/appconfig" "github.com/NVIDIA/dcgm-exporter/internal/pkg/debug" "github.com/NVIDIA/dcgm-exporter/internal/pkg/devicewatchlistmanager" @@ -101,7 +103,20 @@ func NewMetricsServer( } } + if podMapper != nil { + if wl, exists := deviceWatchListManager.EntityWatchList(dcgm.FE_GPU); exists { + podMapper.DeviceInfo = wl.DeviceInfo() + } else { + slog.Warn("Could not find FE_GPU watchlist to configure PodMapper") + } + go podMapper.Run() + } + cleanup := func() { + if podMapper != nil { + slog.Info("Stopping PodMapper") + podMapper.Stop() + } if podMapper != nil && c.KubernetesEnableDRA && podMapper.ResourceSliceManager != nil { slog.Info("Stopping ResourceSliceManager") podMapper.ResourceSliceManager.Stop() diff --git a/internal/pkg/transformation/kubernetes.go b/internal/pkg/transformation/kubernetes.go index 74f92870..c44ac17a 100644 --- a/internal/pkg/transformation/kubernetes.go +++ b/internal/pkg/transformation/kubernetes.go @@ -23,6 +23,7 @@ import ( "log/slog" "maps" "net" + stdos "os" "regexp" "slices" "strings" @@ -98,6 +99,7 @@ func NewPodMapper(c *appconfig.Config) *PodMapper { podMapper := &PodMapper{ Config: c, labelFilterCache: newLabelFilterCache(c.KubernetesPodLabelAllowlistRegex, cacheSize), + stopChan: make(chan struct{}), } if !c.KubernetesEnablePodLabels && !c.KubernetesEnablePodUID && !c.KubernetesEnableDRA { @@ -177,15 +179,44 @@ func (p *PodMapper) Name() string { return "podMapper" } -func (p *PodMapper) Process(metrics collector.MetricsByCounter, deviceInfo deviceinfo.Provider) error { +// Run executes background cache updates on a fixed interval +func (p *PodMapper) Run() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + if p.DeviceInfo != nil { + if err := p.updateCache(p.DeviceInfo); err != nil { + slog.Warn("Failed to update pod mapper cache", "error", err) + } + } else { + slog.Warn("DeviceInfo provider not set for PodMapper, skipping initial update") + } + + for { + select { + case <-p.stopChan: + return + case <-ticker.C: + if p.DeviceInfo != nil { + if err := p.updateCache(p.DeviceInfo); err != nil { + slog.Warn("Failed to update pod mapper cache", "error", err) + } + } + } + } +} + +// Stop stops the background updater +func (p *PodMapper) Stop() { close(p.stopChan) } + +// updateCache refreshes device-to-pod mappings from kubelet +func (p *PodMapper) updateCache(deviceInfo deviceinfo.Provider) error { socketPath := p.Config.PodResourcesKubeletSocket - _, err := os.Stat(socketPath) - if os.IsNotExist(err) { - slog.Info("No Kubelet socket, ignoring") + _, err := stdos.Stat(socketPath) + if stdos.IsNotExist(err) { return nil } - // TODO: This needs to be moved out of the critical path. c, cleanup, err := connectToServer(socketPath) if err != nil { return err @@ -197,73 +228,49 @@ func (p *PodMapper) Process(metrics collector.MetricsByCounter, deviceInfo devic return err } - // Log detailed GPU allocation information for debugging purposes - slog.Debug("Pod resources API response details", - "podsWithResources", len(pods.GetPodResources()), - "fullResponse", fmt.Sprintf("%+v", pods)) - - // Log device plugin status and GPU allocation details - totalGPUsAllocated := 0 - totalContainersWithGPUs := 0 - podGPUCounts := make(map[string]int) // Track GPU count per pod + var deviceToPods map[string][]PodInfo + var deviceToPod map[string]PodInfo + var deviceToPodsDRA map[string][]PodInfo - p.iterateGPUDevices(pods, func(pod *podresourcesapi.PodResources, container *podresourcesapi.ContainerResources, device *podresourcesapi.ContainerDevices) { - podKey := pod.GetNamespace() + "/" + pod.GetName() - podGPUCounts[podKey] += len(device.GetDeviceIds()) - totalContainersWithGPUs++ - slog.Debug("Found GPU device allocation", - "pod", pod.GetName(), - "namespace", pod.GetNamespace(), - "container", container.GetName(), - "resourceName", device.GetResourceName(), - "deviceIds", device.GetDeviceIds()) - }) + if p.Config.KubernetesVirtualGPUs { + deviceToPods = p.toDeviceToSharingPods(pods, deviceInfo) + } else { + deviceToPod = p.toDeviceToPod(pods, deviceInfo) + } - // Log per-pod GPU allocation status - for _, pod := range pods.GetPodResources() { - podKey := pod.GetNamespace() + "/" + pod.GetName() - podGPUs := podGPUCounts[podKey] - if podGPUs > 0 { - totalGPUsAllocated += podGPUs - slog.Debug("Pod has GPU allocations", - "pod", pod.GetName(), - "namespace", pod.GetNamespace(), - "totalGPUs", podGPUs) - } else { - slog.Debug("Pod has NO GPU allocations", - "pod", pod.GetName(), - "namespace", pod.GetNamespace(), - "totalContainers", len(pod.GetContainers())) - } + if p.Config.KubernetesEnableDRA { + deviceToPodsDRA = p.toDeviceToPodsDRA(pods) } - slog.Debug("GPU allocation summary", - "totalPods", len(pods.GetPodResources()), - "totalGPUsAllocated", totalGPUsAllocated, - "totalContainersWithGPUs", totalContainersWithGPUs, - "devicePluginWorking", totalGPUsAllocated > 0) + p.mu.Lock() + p.deviceToPods = deviceToPods + p.deviceToPod = deviceToPod + p.deviceToPodsDRA = deviceToPodsDRA + p.mu.Unlock() + return nil +} - if p.Config.KubernetesVirtualGPUs { - deviceToPods := p.toDeviceToSharingPods(pods, deviceInfo) +// Process annotates metrics using cached device-to-pod mappings (no kubelet calls) +func (p *PodMapper) Process(metrics collector.MetricsByCounter, _ deviceinfo.Provider) error { + p.mu.RLock() + deviceToPods := p.deviceToPods + deviceToPod := p.deviceToPod + deviceToPodsDRA := p.deviceToPodsDRA + p.mu.RUnlock() + if p.Config.KubernetesVirtualGPUs { + if deviceToPods == nil { + return nil + } slog.Debug(fmt.Sprintf("Device to sharing pods mapping: %+v", deviceToPods)) - - // For each counter metric, init a slice to collect metrics to associate with shared virtual GPUs. for counter := range metrics { var newmetrics []collector.Metric - // For each instrumented device, build list of metrics and create - // new metrics for any shared GPUs. for j, val := range metrics[counter] { deviceID, err := val.GetIDOfType(p.Config.KubernetesGPUIdType) if err != nil { return err } - podInfos := deviceToPods[deviceID] - // For all containers using the GPU, extract and annotate a metric - // with the container info and the shared GPU label, if it exists. - // Notably, this will increase the number of unique metrics (i.e. labelsets) - // to by the number of containers sharing the GPU. for _, pi := range podInfos { metric, err := utils.DeepCopy(metrics[counter][j]) if err != nil { @@ -285,10 +292,6 @@ func (p *PodMapper) Process(metrics collector.MetricsByCounter, deviceInfo devic newmetrics = append(newmetrics, metric) } } - // Upsert the annotated series into the final map only if we found any - // pods using the devices for the metric. Otherwise, leave the original - // metric unmodified so we still have monitoring when pods aren't using - // GPUs. if len(newmetrics) > 0 { metrics[counter] = newmetrics } @@ -297,99 +300,79 @@ func (p *PodMapper) Process(metrics collector.MetricsByCounter, deviceInfo devic } slog.Debug("KubernetesVirtualGPUs is disabled, using device to pod mapping") - - deviceToPod := p.toDeviceToPod(pods, deviceInfo) - - slog.Debug(fmt.Sprintf("Device to pod mapping: %+v", deviceToPod)) - - // Note: for loop are copies the value, if we want to change the value - // and not the copy, we need to use the indexes - for counter := range metrics { - for j, val := range metrics[counter] { - deviceID, err := val.GetIDOfType(p.Config.KubernetesGPUIdType) - if err != nil { - return err - } - podInfo, exists := deviceToPod[deviceID] - if exists { - if !p.Config.UseOldNamespace { - metrics[counter][j].Attributes[podAttribute] = podInfo.Name - metrics[counter][j].Attributes[namespaceAttribute] = podInfo.Namespace - metrics[counter][j].Attributes[containerAttribute] = podInfo.Container - } else { - metrics[counter][j].Attributes[oldPodAttribute] = podInfo.Name - metrics[counter][j].Attributes[oldNamespaceAttribute] = podInfo.Namespace - metrics[counter][j].Attributes[oldContainerAttribute] = podInfo.Container - } - - metrics[counter][j].Attributes[uidAttribute] = podInfo.UID - maps.Copy(metrics[counter][j].Labels, podInfo.Labels) - } - } - } - - if p.Config.KubernetesEnableDRA { - deviceToPodsDRA := p.toDeviceToPodsDRA(pods) - slog.Debug(fmt.Sprintf("Device to pod mapping for DRA: %+v", deviceToPodsDRA)) - + if deviceToPod != nil { + slog.Debug(fmt.Sprintf("Device to pod mapping: %+v", deviceToPod)) for counter := range metrics { - var newmetrics []collector.Metric - // For each instrumented device, build list of metrics and create - // new metrics for any shared GPUs. for j, val := range metrics[counter] { deviceID, err := val.GetIDOfType(p.Config.KubernetesGPUIdType) if err != nil { return err } + if podInfo, exists := deviceToPod[deviceID]; exists { + if !p.Config.UseOldNamespace { + metrics[counter][j].Attributes[podAttribute] = podInfo.Name + metrics[counter][j].Attributes[namespaceAttribute] = podInfo.Namespace + metrics[counter][j].Attributes[containerAttribute] = podInfo.Container + } else { + metrics[counter][j].Attributes[oldPodAttribute] = podInfo.Name + metrics[counter][j].Attributes[oldNamespaceAttribute] = podInfo.Namespace + metrics[counter][j].Attributes[oldContainerAttribute] = podInfo.Container + } + metrics[counter][j].Attributes[uidAttribute] = podInfo.UID + maps.Copy(metrics[counter][j].Labels, podInfo.Labels) + } + } + } + } - podInfos := deviceToPodsDRA[deviceID] - // For all containers using the GPU, extract and annotate a metric - // with the container info and the shared GPU label, if it exists. - // Notably, this will increase the number of unique metrics (i.e. labelsets) - // to by the number of containers sharing the GPU. - if podInfos != nil { - for _, pi := range podInfos { - metric, err := utils.DeepCopy(metrics[counter][j]) - if err != nil { - return err - } - if !p.Config.UseOldNamespace { - metric.Attributes[podAttribute] = pi.Name - metric.Attributes[namespaceAttribute] = pi.Namespace - metric.Attributes[containerAttribute] = pi.Container - } else { - metric.Attributes[oldPodAttribute] = pi.Name - metric.Attributes[oldNamespaceAttribute] = pi.Namespace - metric.Attributes[oldContainerAttribute] = pi.Container - } - if dr := pi.DynamicResources; dr != nil { - metric.Attributes[draClaimName] = dr.ClaimName - metric.Attributes[draClaimNamespace] = dr.ClaimNamespace - metric.Attributes[draDriverName] = dr.DriverName - metric.Attributes[draPoolName] = dr.PoolName - metric.Attributes[draDeviceName] = dr.DeviceName - - // Add MIG-specific labels if this is a MIG device - if migInfo := dr.MIGInfo; migInfo != nil { - metric.Attributes[draMigProfile] = migInfo.Profile - metric.Attributes[draMigDeviceUUID] = migInfo.MIGDeviceUUID + if p.Config.KubernetesEnableDRA { + if deviceToPodsDRA != nil { + slog.Debug(fmt.Sprintf("Device to pod mapping for DRA: %+v", deviceToPodsDRA)) + for counter := range metrics { + var newmetrics []collector.Metric + for j, val := range metrics[counter] { + deviceID, err := val.GetIDOfType(p.Config.KubernetesGPUIdType) + if err != nil { + return err + } + podInfos := deviceToPodsDRA[deviceID] + if podInfos != nil { + for _, pi := range podInfos { + metric, err := utils.DeepCopy(metrics[counter][j]) + if err != nil { + return err } + if !p.Config.UseOldNamespace { + metric.Attributes[podAttribute] = pi.Name + metric.Attributes[namespaceAttribute] = pi.Namespace + metric.Attributes[containerAttribute] = pi.Container + } else { + metric.Attributes[oldPodAttribute] = pi.Name + metric.Attributes[oldNamespaceAttribute] = pi.Namespace + metric.Attributes[oldContainerAttribute] = pi.Container + } + if dr := pi.DynamicResources; dr != nil { + metric.Attributes[draClaimName] = dr.ClaimName + metric.Attributes[draClaimNamespace] = dr.ClaimNamespace + metric.Attributes[draDriverName] = dr.DriverName + metric.Attributes[draPoolName] = dr.PoolName + metric.Attributes[draDeviceName] = dr.DeviceName + if migInfo := dr.MIGInfo; migInfo != nil { + metric.Attributes[draMigProfile] = migInfo.Profile + metric.Attributes[draMigDeviceUUID] = migInfo.MIGDeviceUUID + } + } + newmetrics = append(newmetrics, metric) } - newmetrics = append(newmetrics, metric) + } else { + newmetrics = append(newmetrics, metrics[counter][j]) } - } else { - newmetrics = append(newmetrics, metrics[counter][j]) } - } - // Upsert the annotated series into the final map only if we found any - // pods using the devices for the metric. Otherwise, leave the original - // metric unmodified so we still have monitoring when pods aren't using - // GPUs. - if len(newmetrics) > 0 { - metrics[counter] = newmetrics + if len(newmetrics) > 0 { + metrics[counter] = newmetrics + } } } - return nil } return nil diff --git a/internal/pkg/transformation/types.go b/internal/pkg/transformation/types.go index e49b7599..b82dd645 100644 --- a/internal/pkg/transformation/types.go +++ b/internal/pkg/transformation/types.go @@ -43,6 +43,13 @@ type PodMapper struct { Client kubernetes.Interface ResourceSliceManager *DRAResourceSliceManager labelFilterCache *LabelFilterCache + // Async processing fields + mu sync.RWMutex + stopChan chan struct{} + DeviceInfo deviceinfo.Provider + deviceToPod map[string]PodInfo + deviceToPods map[string][]PodInfo + deviceToPodsDRA map[string][]PodInfo } // LabelFilterCache provides efficient caching for label filtering decisions