Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions cmd/agent/daemon/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ type EnricherConfig struct {
}

// NetflowConfig controls netflow tracking and export behavior.
type NetflowConfig struct {
	// Enabled turns netflow tracking on.
	Enabled bool `json:"enabled"`
	// SampleSubmitIntervalSeconds is the interval, in seconds, at which
	// netflow samples are submitted.
	SampleSubmitIntervalSeconds uint64 `json:"sampleSubmitIntervalSeconds"`
	// ExportInterval is how often collected netflows are exported.
	ExportInterval time.Duration `json:"exportInterval"`
	// Grouping controls how flows are grouped before export.
	Grouping ebpftracer.NetflowGrouping `json:"grouping"`
	// MaxPublicIPs is the maximum number of unique public destination IPs
	// before aggregating into the 0.0.0.0 range (flag default is -1).
	MaxPublicIPs int16 `json:"maxPublicIPs"`
	// SkipPrivateDestinationCidrCheck skips checking that a private
	// destination falls inside the cluster service/pod CIDRs before
	// enriching it with Kubernetes context.
	SkipPrivateDestinationCidrCheck bool `json:"skipPrivateDestinationCidrCheck"`
}

type ClickhouseConfig struct {
Expand Down
22 changes: 12 additions & 10 deletions cmd/agent/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@ func NewRunCommand(version string) *cobra.Command {
gitCloneDetectionSignatureRedactPasswords = command.Flags().Bool("signature-git-clone-detection-redact-password", true, "If enabled, any password passed via the URL gets redacted")
ingressNightmareExploitDetectionSignatureEnabled = command.Flags().Bool("signature-ingress-nightmare-exploit-detection-enabled", true, "Enables the detection signature to detect exploits of ingress nightmare")

netflowEnabled = command.Flags().Bool("netflow-enabled", false, "Enables netflow tracking")
netflowSampleSubmitIntervalSeconds = command.Flags().Uint64("netflow-sample-submit-interval-seconds", 15, "Netflow sample submit interval")
netflowExportInterval = command.Flags().Duration("netflow-export-interval", 15*time.Second, "Netflow export interval")
netflowMaxPublicIPsBucket = command.Flags().Int16("netflow-max-public-ips-bucket", -1, "Maximum number of unique public IPs destination before aggregating into 0.0.0.0 range")
netflowGrouping = ebpftracer.NetflowGroupingDropSrcPort
netflowEnabled = command.Flags().Bool("netflow-enabled", false, "Enables netflow tracking")
netflowSampleSubmitIntervalSeconds = command.Flags().Uint64("netflow-sample-submit-interval-seconds", 15, "Netflow sample submit interval")
netflowExportInterval = command.Flags().Duration("netflow-export-interval", 15*time.Second, "Netflow export interval")
netflowMaxPublicIPsBucket = command.Flags().Int16("netflow-max-public-ips-bucket", -1, "Maximum number of unique public IPs destination before aggregating into 0.0.0.0 range")
netflowSkipPrivateDestinationCidrCheck = command.Flags().Bool("netflow-skip-dest-cidr-check", false, "Skip checking private destination CIDR before enriching with Kubernetes context")
netflowGrouping = ebpftracer.NetflowGroupingDropSrcPort

eventsBatchSize = command.Flags().Int("events-batch-size", 1000, "Events batch size before exporting")
eventsFlushInterval = command.Flags().Duration("events-flush-interval", 5*time.Second, "Events flush interval")
Expand Down Expand Up @@ -205,11 +206,12 @@ func NewRunCommand(version string) *cobra.Command {
RedactSensitiveValuesRegex: redactSensitiveValuesRegex,
},
Netflow: config.NetflowConfig{
Enabled: *netflowEnabled,
SampleSubmitIntervalSeconds: *netflowSampleSubmitIntervalSeconds,
Grouping: netflowGrouping,
ExportInterval: *netflowExportInterval,
MaxPublicIPs: *netflowMaxPublicIPsBucket,
Enabled: *netflowEnabled,
SampleSubmitIntervalSeconds: *netflowSampleSubmitIntervalSeconds,
Grouping: netflowGrouping,
ExportInterval: *netflowExportInterval,
MaxPublicIPs: *netflowMaxPublicIPsBucket,
SkipPrivateDestinationCidrCheck: *netflowSkipPrivateDestinationCidrCheck,
},
Clickhouse: config.ClickhouseConfig{
Addr: *clickhouseAddr,
Expand Down
61 changes: 43 additions & 18 deletions cmd/agent/daemon/state/netflow_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/castai/kvisor/pkg/net/iputil"
"golang.org/x/sync/errgroup"
"golang.org/x/sys/unix"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func (c *Controller) getClusterInfo(ctx context.Context) (*clusterInfo, error) {
Expand Down Expand Up @@ -101,7 +103,7 @@ type netflowVal struct {

func (c *Controller) enqueueNetworkSummaryExport(ctx context.Context, keys []ebpftracer.TrafficKey, vals []ebpftracer.TrafficSummary) {
start := time.Now()
podsByIPCache := map[netip.Addr]*kubepb.IPInfo{}
ipCache := map[netip.Addr]*kubepb.IPInfo{}

type cgroupID = uint64
netflows := map[cgroupID]*netflowVal{}
Expand All @@ -120,7 +122,7 @@ func (c *Controller) enqueueNetworkSummaryExport(ctx context.Context, keys []ebp
netflow = val
}

dest, isPublicDest, err := c.toNetflowDestination(key, summary, podsByIPCache)
dest, isPublicDest, err := c.toNetflowDestination(key, summary, ipCache)
if err != nil {
c.log.Errorf("cannot parse netflow destination: %v", err)
continue
Expand Down Expand Up @@ -227,7 +229,7 @@ func (c *Controller) toNetflow(ctx context.Context, key ebpftracer.TrafficKey, t
return res, nil
}

func (c *Controller) toNetflowDestination(key ebpftracer.TrafficKey, summary ebpftracer.TrafficSummary, podsByIPCache map[netip.Addr]*kubepb.IPInfo) (*castpb.NetflowDestination, bool, error) {
func (c *Controller) toNetflowDestination(key ebpftracer.TrafficKey, summary ebpftracer.TrafficSummary, ipCache map[netip.Addr]*kubepb.IPInfo) (*castpb.NetflowDestination, bool, error) {
localIP := parseAddr(key.Tuple.Saddr.Raw, key.Tuple.Family)
if !localIP.IsValid() {
return nil, false, fmt.Errorf("got invalid local addr `%v`", key.Tuple.Saddr.Raw)
Expand Down Expand Up @@ -258,23 +260,40 @@ func (c *Controller) toNetflowDestination(key ebpftracer.TrafficKey, summary ebp
RxPackets: summary.RxPackets,
}

if c.clusterInfo != nil && (c.clusterInfo.serviceCidrContains(remote.Addr()) || c.clusterInfo.podCidrContains(remote.Addr())) {
ipInfo, found := c.getIPInfo(podsByIPCache, remote.Addr())
if found {
destination.PodName = ipInfo.PodName
destination.Namespace = ipInfo.Namespace
destination.WorkloadName = ipInfo.WorkloadName
destination.WorkloadKind = ipInfo.WorkloadKind
destination.Zone = ipInfo.Zone
destination.NodeName = ipInfo.NodeName
}
}

isPublicDst := !iputil.IsPrivateNetwork(remote.Addr())

c.enrichNetflowDestinationWithKubeContext(ipCache, destination, remote.Addr(), isPublicDst)

return destination, isPublicDst, nil
}

// enrichNetflowDestinationWithKubeContext fills destination with pod/workload
// metadata for in-cluster private destinations. Public destinations are left
// untouched.
func (c *Controller) enrichNetflowDestinationWithKubeContext(ipCache map[netip.Addr]*kubepb.IPInfo, destination *castpb.NetflowDestination, dstAddr netip.Addr, isPublicDst bool) {
	// Public destinations are never enriched.
	if isPublicDst {
		return
	}

	// Unless explicitly skipped via config, only addresses inside the
	// cluster's service or pod CIDRs are eligible for enrichment.
	if !c.cfg.Netflow.SkipPrivateDestinationCidrCheck {
		info := c.clusterInfo
		if info == nil {
			return
		}
		inCluster := info.serviceCidrContains(dstAddr) || info.podCidrContains(dstAddr)
		if !inCluster {
			return
		}
	}

	ipInfo, found := c.getIPInfo(ipCache, dstAddr)
	if !found || ipInfo == nil {
		// Either the lookup failed or the IP is a cached miss (nil entry).
		return
	}

	destination.PodName = ipInfo.PodName
	destination.Namespace = ipInfo.Namespace
	destination.WorkloadName = ipInfo.WorkloadName
	destination.WorkloadKind = ipInfo.WorkloadKind
	destination.Zone = ipInfo.Zone
	destination.NodeName = ipInfo.NodeName
}

func parseAddr(data [16]byte, family uint16) netip.Addr {
switch family {
case uint16(types.AF_INET):
Expand All @@ -286,18 +305,24 @@ func parseAddr(data [16]byte, family uint16) netip.Addr {
return netip.Addr{}
}

func (c *Controller) getIPInfo(podsByIPCache map[netip.Addr]*kubepb.IPInfo, addr netip.Addr) (*kubepb.IPInfo, bool) {
ipInfo, found := podsByIPCache[addr]
// getIPInfo resolves Kubernetes IP metadata for addr, consulting ipCache
// first and falling back to the kvisor controller. A nil cache entry means
// the controller previously reported NotFound for this address; in that case
// (nil, true) is returned without another remote call.
func (c *Controller) getIPInfo(ipCache map[netip.Addr]*kubepb.IPInfo, addr netip.Addr) (*kubepb.IPInfo, bool) {
	// Fast path: already resolved (possibly to nil for known-missing IPs).
	if cached, ok := ipCache[addr]; ok {
		return cached, true
	}

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	resp, err := c.kubeClient.GetIPInfo(ctx, &kubepb.GetIPInfoRequest{Ip: addr.Unmap().AsSlice()})
	switch {
	case err == nil:
		ipCache[addr] = resp.Info
		return resp.Info, true
	case status.Code(err) == codes.NotFound:
		// Cache the miss as well: even when the controller has no info for
		// this IP we remember that, to avoid repeated calls for unknown IPs.
		ipCache[addr] = nil
		return nil, true
	default:
		metrics.AgentFetchKubeIPInfoErrorsTotal.Inc()
		return nil, false
	}
}
Expand Down