diff --git a/cmd/agent/daemon/config/config.go b/cmd/agent/daemon/config/config.go
index fbf602f6..55b5e252 100644
--- a/cmd/agent/daemon/config/config.go
+++ b/cmd/agent/daemon/config/config.go
@@ -68,11 +68,12 @@ type EnricherConfig struct {
 }
 
 type NetflowConfig struct {
-	Enabled                     bool                       `json:"enabled"`
-	SampleSubmitIntervalSeconds uint64                     `json:"sampleSubmitIntervalSeconds"`
-	ExportInterval              time.Duration              `json:"exportInterval"`
-	Grouping                    ebpftracer.NetflowGrouping `json:"grouping"`
-	MaxPublicIPs                int16                      `json:"maxPublicIPs"`
+	Enabled                         bool                       `json:"enabled"`
+	SampleSubmitIntervalSeconds     uint64                     `json:"sampleSubmitIntervalSeconds"`
+	ExportInterval                  time.Duration              `json:"exportInterval"`
+	Grouping                        ebpftracer.NetflowGrouping `json:"grouping"`
+	MaxPublicIPs                    int16                      `json:"maxPublicIPs"`
+	SkipPrivateDestinationCidrCheck bool                       `json:"skipPrivateDestinationCidrCheck"`
 }
 
 type ClickhouseConfig struct {
diff --git a/cmd/agent/daemon/daemon.go b/cmd/agent/daemon/daemon.go
index c963bf58..a84e1f10 100644
--- a/cmd/agent/daemon/daemon.go
+++ b/cmd/agent/daemon/daemon.go
@@ -94,11 +94,12 @@ func NewRunCommand(version string) *cobra.Command {
 		gitCloneDetectionSignatureRedactPasswords        = command.Flags().Bool("signature-git-clone-detection-redact-password", true, "If enabled, any password passed via the URL gets redacted")
 		ingressNightmareExploitDetectionSignatureEnabled = command.Flags().Bool("signature-ingress-nightmare-exploit-detection-enabled", true, "Enables the detection signature to detect exploits of ingress nightmare")
 
-		netflowEnabled                     = command.Flags().Bool("netflow-enabled", false, "Enables netflow tracking")
-		netflowSampleSubmitIntervalSeconds = command.Flags().Uint64("netflow-sample-submit-interval-seconds", 15, "Netflow sample submit interval")
-		netflowExportInterval              = command.Flags().Duration("netflow-export-interval", 15*time.Second, "Netflow export interval")
-		netflowMaxPublicIPsBucket          = command.Flags().Int16("netflow-max-public-ips-bucket", -1, "Maximum number of unique public IPs destination before aggregating into 0.0.0.0 range")
-		netflowGrouping                    = ebpftracer.NetflowGroupingDropSrcPort
+		netflowEnabled                         = command.Flags().Bool("netflow-enabled", false, "Enables netflow tracking")
+		netflowSampleSubmitIntervalSeconds     = command.Flags().Uint64("netflow-sample-submit-interval-seconds", 15, "Netflow sample submit interval")
+		netflowExportInterval                  = command.Flags().Duration("netflow-export-interval", 15*time.Second, "Netflow export interval")
+		netflowMaxPublicIPsBucket              = command.Flags().Int16("netflow-max-public-ips-bucket", -1, "Maximum number of unique public IPs destination before aggregating into 0.0.0.0 range")
+		netflowSkipPrivateDestinationCidrCheck = command.Flags().Bool("netflow-skip-dest-cidr-check", false, "Skip checking private destination CIDR before enriching with Kubernetes context")
+		netflowGrouping                        = ebpftracer.NetflowGroupingDropSrcPort
 
 		eventsBatchSize     = command.Flags().Int("events-batch-size", 1000, "Events batch size before exporting")
 		eventsFlushInterval = command.Flags().Duration("events-flush-interval", 5*time.Second, "Events flush interval")
@@ -205,11 +206,12 @@ func NewRunCommand(version string) *cobra.Command {
 				RedactSensitiveValuesRegex: redactSensitiveValuesRegex,
 			},
 			Netflow: config.NetflowConfig{
-				Enabled:                     *netflowEnabled,
-				SampleSubmitIntervalSeconds: *netflowSampleSubmitIntervalSeconds,
-				Grouping:                    netflowGrouping,
-				ExportInterval:              *netflowExportInterval,
-				MaxPublicIPs:                *netflowMaxPublicIPsBucket,
+				Enabled:                         *netflowEnabled,
+				SampleSubmitIntervalSeconds:     *netflowSampleSubmitIntervalSeconds,
+				Grouping:                        netflowGrouping,
+				ExportInterval:                  *netflowExportInterval,
+				MaxPublicIPs:                    *netflowMaxPublicIPsBucket,
+				SkipPrivateDestinationCidrCheck: *netflowSkipPrivateDestinationCidrCheck,
 			},
 			Clickhouse: config.ClickhouseConfig{
 				Addr: *clickhouseAddr,
diff --git a/cmd/agent/daemon/state/netflow_pipeline.go b/cmd/agent/daemon/state/netflow_pipeline.go
index 72288bd7..18177475 100644
--- a/cmd/agent/daemon/state/netflow_pipeline.go
+++ b/cmd/agent/daemon/state/netflow_pipeline.go
@@ -17,6 +17,8 @@ import (
 	"github.com/castai/kvisor/pkg/net/iputil"
 	"golang.org/x/sync/errgroup"
 	"golang.org/x/sys/unix"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 func (c *Controller) getClusterInfo(ctx context.Context) (*clusterInfo, error) {
@@ -101,7 +103,7 @@ type netflowVal struct {
 func (c *Controller) enqueueNetworkSummaryExport(ctx context.Context, keys []ebpftracer.TrafficKey, vals []ebpftracer.TrafficSummary) {
 	start := time.Now()
 
-	podsByIPCache := map[netip.Addr]*kubepb.IPInfo{}
+	ipCache := map[netip.Addr]*kubepb.IPInfo{}
 
 	type cgroupID = uint64
 	netflows := map[cgroupID]*netflowVal{}
@@ -120,7 +122,7 @@
 			netflow = val
 		}
 
-		dest, isPublicDest, err := c.toNetflowDestination(key, summary, podsByIPCache)
+		dest, isPublicDest, err := c.toNetflowDestination(key, summary, ipCache)
 		if err != nil {
 			c.log.Errorf("cannot parse netflow destination: %v", err)
 			continue
@@ -227,7 +229,7 @@ func (c *Controller) toNetflow(ctx context.Context, key ebpftracer.TrafficKey, t
 	return res, nil
 }
 
-func (c *Controller) toNetflowDestination(key ebpftracer.TrafficKey, summary ebpftracer.TrafficSummary, podsByIPCache map[netip.Addr]*kubepb.IPInfo) (*castpb.NetflowDestination, bool, error) {
+func (c *Controller) toNetflowDestination(key ebpftracer.TrafficKey, summary ebpftracer.TrafficSummary, ipCache map[netip.Addr]*kubepb.IPInfo) (*castpb.NetflowDestination, bool, error) {
 	localIP := parseAddr(key.Tuple.Saddr.Raw, key.Tuple.Family)
 	if !localIP.IsValid() {
 		return nil, false, fmt.Errorf("got invalid local addr `%v`", key.Tuple.Saddr.Raw)
@@ -258,23 +260,40 @@ func (c *Controller) toNetflowDestination(key ebpftracer.TrafficKey, summary ebp
 		RxPackets: summary.RxPackets,
 	}
 
-	if c.clusterInfo != nil && (c.clusterInfo.serviceCidrContains(remote.Addr()) || c.clusterInfo.podCidrContains(remote.Addr())) {
-		ipInfo, found := c.getIPInfo(podsByIPCache, remote.Addr())
-		if found {
-			destination.PodName = ipInfo.PodName
-			destination.Namespace = ipInfo.Namespace
-			destination.WorkloadName = ipInfo.WorkloadName
-			destination.WorkloadKind = ipInfo.WorkloadKind
-			destination.Zone = ipInfo.Zone
-			destination.NodeName = ipInfo.NodeName
-		}
-	}
-
 	isPublicDst := !iputil.IsPrivateNetwork(remote.Addr())
 
+	c.enrichNetflowDestinationWithKubeContext(ipCache, destination, remote.Addr(), isPublicDst)
+
 	return destination, isPublicDst, nil
 }
 
+func (c *Controller) enrichNetflowDestinationWithKubeContext(ipCache map[netip.Addr]*kubepb.IPInfo, destination *castpb.NetflowDestination, dstAddr netip.Addr, isPublicDst bool) {
+	// We do not enrich public destinations.
+	if isPublicDst {
+		return
+	}
+
+	// Check if destination is part of k8s services or pods cidr.
+	if !c.cfg.Netflow.SkipPrivateDestinationCidrCheck {
+		if c.clusterInfo == nil {
+			return
+		}
+		if !c.clusterInfo.serviceCidrContains(dstAddr) && !c.clusterInfo.podCidrContains(dstAddr) {
+			return
+		}
+	}
+
+	ipInfo, found := c.getIPInfo(ipCache, dstAddr)
+	if found && ipInfo != nil {
+		destination.PodName = ipInfo.PodName
+		destination.Namespace = ipInfo.Namespace
+		destination.WorkloadName = ipInfo.WorkloadName
+		destination.WorkloadKind = ipInfo.WorkloadKind
+		destination.Zone = ipInfo.Zone
+		destination.NodeName = ipInfo.NodeName
+	}
+}
+
 func parseAddr(data [16]byte, family uint16) netip.Addr {
 	switch family {
 	case uint16(types.AF_INET):
@@ -286,18 +305,24 @@ func parseAddr(data [16]byte, family uint16) netip.Addr {
 	return netip.Addr{}
 }
 
-func (c *Controller) getIPInfo(podsByIPCache map[netip.Addr]*kubepb.IPInfo, addr netip.Addr) (*kubepb.IPInfo, bool) {
-	ipInfo, found := podsByIPCache[addr]
+func (c *Controller) getIPInfo(ipCache map[netip.Addr]*kubepb.IPInfo, addr netip.Addr) (*kubepb.IPInfo, bool) {
+	ipInfo, found := ipCache[addr]
 	if !found {
 		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 		defer cancel()
 		resp, err := c.kubeClient.GetIPInfo(ctx, &kubepb.GetIPInfoRequest{Ip: addr.Unmap().AsSlice()})
 		if err != nil {
+			// In case we can't find ip info in controller we should still add it to the cache.
+			// This is needed to prevent calls to kvisor controller for some unknown ips.
+			if status.Code(err) == codes.NotFound {
+				ipCache[addr] = nil
+				return nil, true
+			}
			metrics.AgentFetchKubeIPInfoErrorsTotal.Inc()
 			return nil, false
 		}
 		ipInfo = resp.Info
-		podsByIPCache[addr] = ipInfo
+		ipCache[addr] = ipInfo
 	}
 	return ipInfo, true
 }