diff --git a/cmd/host-agent/main.go b/cmd/host-agent/main.go index 72b557a1..9ed19c58 100644 --- a/cmd/host-agent/main.go +++ b/cmd/host-agent/main.go @@ -11,6 +11,7 @@ import ( "strconv" "sync" + "github.com/k0kubun/pp" "github.com/middleware-labs/mw-agent/pkg/agent" "github.com/middleware-labs/synthetics-agent/pkg/worker" "gopkg.in/natefinch/lumberjack.v2" @@ -18,7 +19,6 @@ import ( "github.com/kardianos/service" cli "github.com/urfave/cli/v2" "github.com/urfave/cli/v2/altsrc" - "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -45,7 +45,17 @@ func (p *program) Start(s service.Service) error { p.programWG.Add(1) go p.run() - + if p.hostAgent.EnableInjector { + pp.Println("Oh baby we injecting .....") + p.programWG.Add(1) + go func() { + p.hostAgent.ReportServices(p.errCh, p.stopCh) + p.programWG.Done() + }() + p.logger.Info("Oh baby we injectin' real hard...") + } else { + p.logger.Info("injector status reporting disabled") + } // Start any goroutines that can control collection if p.hostAgent.FetchAccountOtelConfig { // Listen to the config changes provided by Middleware API @@ -135,6 +145,14 @@ func getFlags(execPath string, cfg *agent.HostConfig) []cli.Flag { DefaultText: "true", Value: true, }), + altsrc.NewBoolFlag(&cli.BoolFlag{ + Name: "enable_injector", + EnvVars: []string{"MW_ENABLE_INJECTOR "}, + Usage: "Enables the mw-injector", + Destination: &cfg.EnableInjector, + DefaultText: "true", + Value: true, + }), altsrc.NewStringFlag(&cli.StringFlag{ Name: "docker-endpoint", EnvVars: []string{"MW_DOCKER_ENDPOINT"}, diff --git a/go.mod b/go.mod index 10c3b7a9..eaeb3326 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,6 @@ module github.com/middleware-labs/mw-agent -go 1.24.2 - -toolchain go1.24.5 +go 1.25.3 replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter => github.com/middleware-labs/opentelemetry-collector-contrib/internal/filter v0.0.0-20251119125747-84554b33c7be @@ -42,6 +40,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/reso replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders => github.com/middleware-labs/opentelemetry-collector-contrib/internal/metadataproviders v0.0.0-20251119125747-84554b33c7be +replace github.com/middleware-labs/java-injector => ../mw-injector + replace go.opentelemetry.io/collector => go.opentelemetry.io/collector v0.139.0 require ( @@ -117,6 +117,8 @@ require ( ) require ( + github.com/k0kubun/pp v3.0.1+incompatible + github.com/middleware-labs/java-injector v0.0.0-20251201104016-5041a9e06475 github.com/middleware-labs/synthetics-agent v1.0.56 github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.139.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver v0.139.0 @@ -304,7 +306,6 @@ require ( github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/julienschmidt/httprouter v1.3.0 // indirect - github.com/k0kubun/pp v3.0.1+incompatible // indirect github.com/klauspost/compress v1.18.1 // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/knadh/koanf v1.5.0 // indirect diff --git a/pkg/agent/definitions.go b/pkg/agent/definitions.go index 0d79086f..f45fad64 100644 --- a/pkg/agent/definitions.go +++ b/pkg/agent/definitions.go @@ -111,6 +111,7 @@ type BaseConfig struct { ProfilngServerURL string InternalMetricsPort uint EnableDataDogReceiver bool + EnableInjector bool } // String() implements stringer interface for BaseConfig @@ -427,3 +428,35 @@ func (p *Profiler) StartProfiling(appName string, target string, tags string) { p.Logger.Info("PROFILER: Running on mw-agent") } + +// ServiceSetting represents the detailed status for a single service/process. +type ServiceSetting struct { + PID int `json:"pid"` + ServiceName string `json:"service_name"` + Owner string `json:"owner"` + Status string `json:"status"` + Enabled bool `json:"enabled"` + ServiceType string `json:"service_type"` + Language string `json:"language"` + RuntimeVersion string `json:"runtime_version"` + SystemdUnit string `json:"systemd_unit,omitempty"` + JarFile string `json:"jar_file,omitempty"` + MainClass string `json:"main_class,omitempty"` + HasAgent bool `json:"has_agent"` + IsMiddlewareAgent bool `json:"is_middleware_agent"` + AgentPath string `json:"agent_path,omitempty"` + ConfigPath string `json:"config_path,omitempty"` + Instrumented bool `json:"instrumented"` + Key string `json:"key"` +} + +// OSConfig represents the configuration and status for a specific OS (e.g., "linux"). +type OSConfig struct { + AgentRestartStatus bool `json:"agent_restart_status"` + AutoInstrumentationInit bool `json:"auto_instrumentation_init"` + AutoInstrumentationSettings map[string]ServiceSetting `json:"auto_instrumentation_settings"` + // Add other OS-specific fields (darwin, windows, k8s, etc.) if needed +} + +// AgentReportValue is the root structure for the 'value' field's JSON content. +type AgentReportValue map[string]OSConfig diff --git a/pkg/agent/hostagent.go b/pkg/agent/hostagent.go index 1ecad1e2..77b37400 100644 --- a/pkg/agent/hostagent.go +++ b/pkg/agent/hostagent.go @@ -3,6 +3,7 @@ package agent import ( "bytes" "context" + "encoding/base64" "encoding/json" "errors" "fmt" @@ -16,6 +17,8 @@ import ( "sync" "time" + "github.com/k0kubun/pp" + "github.com/middleware-labs/java-injector/pkg/discovery" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/provider/envprovider" @@ -46,6 +49,12 @@ type HostAgent struct { Version string } +type AgentSettingPayload struct { + Value string `json:"value"` // Base64 encoded config + MetaData map[string]interface{} `json:"meta_data"` + Config map[string]map[string]interface{} `json:"config"` // This field is technically redundant based on the API handler's logic but included for completeness if needed. +} + // HostOptions takes in various options for HostAgent type HostOptions func(h *HostAgent) @@ -202,9 +211,10 @@ type TrackingPayload struct { } var ( - apiPathForYAML = "api/v1/agent/ingestion-rules" - apiPathForRestart = "api/v1/agent/restart-status" - apiAgentTrack = "api/v1/agent/tracking" + apiPathForYAML = "api/v1/agent/ingestion-rules" + apiPathForAgentSetting = "api/v1/agent/public/setting/" + apiPathForRestart = "api/v1/agent/restart-status" + apiAgentTrack = "api/v1/agent/tracking" ) func (d IntegrationType) String() string { @@ -646,6 +656,274 @@ func (c *HostAgent) ListenForConfigChanges(errCh chan<- error, } } +func (c *HostAgent) ReportServices( + errCh chan<- error, + stopCh <-chan struct{}, +) error { + ticker := time.NewTicker(time.Second * 3) + pp.Println("we be snitchin'") + + for { + c.logger.Debug("we be snitchin'") + select { + case <-stopCh: + ticker.Stop() + return nil + case <-ticker.C: + err := c.ReportAgentStatusAPI() + errCh <- err + } + } + return nil +} + +func (c *HostAgent) GetAgentReportValue() (AgentReportValue, error) { + + // --- 1. Perform Process Discovery --- + ctx := context.Background() + + // a) Host Processes (Java) + processes, err := discovery.FindAllJavaProcesses(ctx) + if err != nil { + c.logger.Error("Failed to discover host Java processes", zap.Error(err)) + // Decide if this should be a fatal error or just logged (assuming logged for now) + } + + // b) Docker Containers (Java/Node) + dockerDiscoverer := discovery.NewDockerDiscoverer(ctx) + javaContainers, _ := dockerDiscoverer.DiscoverJavaContainers() // Error handling omitted for brevity + nodeContainers, _ := dockerDiscoverer.DiscoverNodeContainers() // Error handling omitted for brevity + + // --- 2. Convert to AgentReportValue (ServiceSetting) --- + osKey := runtime.GOOS + settings := map[string]ServiceSetting{} + + if c.EnableInjector { + + // Convert host processes + for _, proc := range processes { + // Only report processes we care about (non-Tomcat, non-Container for simplicity) + if !proc.IsTomcat() && !proc.ContainerInfo.IsContainer { + setting := c.convertJavaProcessToServiceSetting(proc) + settings[setting.Key] = setting + } + } + + // Convert Java containers + for _, container := range javaContainers { + // ContainerInfo includes the underlying JavaProcess + setting := c.convertJavaContainerToServiceSetting(container) + settings[setting.Key] = setting + } + + // Convert Node containers (Requires a separate conversion method) + for _, container := range nodeContainers { + setting := c.convertNodeContainerToServiceSetting(container) + settings[setting.Key] = setting + } + } + + reportValue := AgentReportValue{ + osKey: OSConfig{ + AgentRestartStatus: false, + AutoInstrumentationInit: c.EnableInjector, + AutoInstrumentationSettings: settings, + }, + } + + pp.Println(reportValue) + + return reportValue, nil +} + +// --- 3. Conversion Helper Methods (MUST be implemented) --- + +// Placeholder for logic that converts discovery.JavaProcess to ServiceSetting +func (c *HostAgent) convertJavaProcessToServiceSetting(proc discovery.JavaProcess) ServiceSetting { + // Generate a unique key for the service. The naming package helps here. + // e.g., key := naming.GenerateHostServiceKey(proc.ServiceName, "systemd", proc.PID) + key := fmt.Sprintf("host-%d", proc.ProcessPID) + + return ServiceSetting{ + PID: int(proc.ProcessPID), + ServiceName: proc.ServiceName, // Uses the discovered ServiceName + Owner: proc.ProcessOwner, + Status: proc.Status, + Enabled: true, // Assuming discovery means it's available for instrumentation + ServiceType: c.detectDeploymentType(&proc), // Need your detectDeploymentType helper from the ListAllCommand! + Language: "java", + RuntimeVersion: proc.ProcessRuntimeVersion, + JarFile: proc.JarFile, + MainClass: proc.MainClass, + HasAgent: proc.HasJavaAgent, + IsMiddlewareAgent: proc.IsMiddlewareAgent, + AgentPath: proc.JavaAgentPath, + Instrumented: proc.HasJavaAgent, // Can be refined + Key: key, + } +} + +// Placeholder for logic that converts discovery.Container to ServiceSetting +func (c *HostAgent) convertJavaContainerToServiceSetting(container discovery.DockerContainer) ServiceSetting { + // Generate a unique key for the container service + // key := container.ContainerID[:12] // Use short ID + key := container.ContainerID[:12] + // You can access the embedded JavaProcess like this: container.JavaProcess + + return ServiceSetting{ + ServiceName: container.ContainerName, + // ... fill other fields from container.ContainerInfo and container.JavaProcess ... + ServiceType: "docker", + Language: "java", + Key: key, + } +} + +func (c *HostAgent) convertNodeContainerToServiceSetting(container discovery.DockerContainer) ServiceSetting { + // Generate a unique key for the container service. We'll use the container's short ID. + containerID := container.ContainerID + key := "" + if len(containerID) >= 12 { + key = containerID[:12] // Use short ID as key + } else { + key = containerID // Fallback if ID is too short + } + + // Determine if the container is currently instrumented. + // The Container struct has an IsInstrumented field. + isInstrumented := container.Instrumented + + // Determine agent path (specific to Node.js agent, if instrumented) + agentPath := "" + if isInstrumented { + // Assume the Node agent path is known or retrievable from the container struct's details + // For a clean conversion, we'll use a placeholder or check a specific field if available. + // If the discovery struct doesn't expose the path, we infer a default or leave blank. + agentPath = container.NodeAgentPath // Assuming this field exists on the discovery.Container struct + if agentPath == "" { + agentPath = "/opt/opentelemetry/node_agent" // Common default location + } + } + + return ServiceSetting{ + // PID is not always relevant or stable for containers, often left 0 or 1 + PID: 0, + ServiceName: container.ContainerName, + Status: "running", // Containers are assumed running if discovered + Enabled: true, // Available for instrumentation + ServiceType: "docker", + Language: "nodejs", + RuntimeVersion: "", // Version often hard to determine from outside container, leave empty or look up + + // Tomcat/Systemd specific fields are omitted for Docker/Node + SystemdUnit: "", + JarFile: "", + MainClass: "", + + HasAgent: isInstrumented, + IsMiddlewareAgent: isInstrumented, // Assuming only Middleware agent is tracked + AgentPath: agentPath, + Instrumented: isInstrumented, + Key: fmt.Sprintf("docker-node-%s", key), // Unique and descriptive key prefix + } +} + +// Helper needed to determine deployment type (must be moved/re-implemented from ListAllCommand) +func (c *HostAgent) detectDeploymentType(proc *discovery.JavaProcess) string { + // Example from your original command structure: + // This logic needs to be available in the HostAgent struct + // Check if process is tied to systemd, or if it's standalone/managed + // For now, return a basic guess: + if proc.ProcessParentPID == 1 { + return "systemd" // Often the case for processes managed by init systems + } + return "standalone" +} + +// ReportAgentStatusAPI makes the POST request to update the agent's settings/status. +func (c *HostAgent) ReportAgentStatusAPI() error { + // NOTE: Requires "net/url", "net/http", "bytes", "encoding/json", + // "encoding/base64", "runtime", and "time" imports. + + hostname := GetHostnameForPlatform(c.InfraPlatform) + + // 1. Construct the target URL: BASE_URL/api/v1/agent/setting/TOKEN/HOST_ID + u, err := url.Parse(c.APIURLForConfigCheck) + if err != nil { + return err + } + + // Build the path: e.g., https://qbwsw.mw.lc/api/v1/agent/setting/APIKEY/HOSTNAME + baseURL := u.JoinPath(apiPathForAgentSetting, c.APIKey, hostname) + finalURL := baseURL.String() + + // 2. Generate the dynamic report payload (the content for the 'value' field) + rawReportValue, err := c.GetAgentReportValue() + if err != nil { + return fmt.Errorf("failed to generate agent report value: %w", err) + } + + // Marshal the AgentReportValue into JSON bytes + rawConfigBytes, err := json.Marshal(rawReportValue) + if err != nil { + return fmt.Errorf("failed to marshal raw config payload: %w", err) + } + + // Base64 encode the JSON bytes for the 'value' field + encodedConfig := base64.StdEncoding.EncodeToString(rawConfigBytes) + + // 3. Assemble the final request body (AgentSettingPayload) + payload := AgentSettingPayload{ + Value: encodedConfig, + MetaData: map[string]interface{}{ + "agent_version": c.Version, + "platform": runtime.GOOS, + "infra_platform": fmt.Sprint(c.InfraPlatform), + // c.collector == nil means the collector is NOT running (i.e., collectorRunning = 1) + "col_running": c.collector == nil, + }, + // Config field is set to nil as per backend API pattern unless needed + Config: nil, + } + + // Marshal payload to JSON + payloadBytes, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("failed to marshal final request payload: %w", err) + } + + // 4. Create and Execute the HTTP POST Request + req, err := http.NewRequest(http.MethodPost, finalURL, bytes.NewBuffer(payloadBytes)) + if err != nil { + return fmt.Errorf("failed to create POST request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("agent status POST request failed for %s: %w", finalURL, err) + } + defer resp.Body.Close() + + // 5. Check Status Code + if resp.StatusCode != http.StatusOK { + // Read the error body for better debugging if available + bodyBytes, _ := io.ReadAll(resp.Body) + c.logger.Error("agent status POST API returned non-200 status code", + zap.Int("status", resp.StatusCode), + zap.ByteString("body", bodyBytes), + zap.String("url", finalURL)) + + return fmt.Errorf("agent status POST API returned non-200 status code: %d", resp.StatusCode) + } + + c.logger.Debug("Successfully reported agent status", zap.String("url", finalURL)) + + return nil +} + func (c *HostAgent) UpdateAgentTrackStatus(reason error) error { c.logger.Info("Starting UpdateAgentTrackStatus") hostname := GetHostnameForPlatform(c.InfraPlatform)