diff --git a/pkg/plugins/genai/cmd/main/main.go b/pkg/plugins/genai/cmd/main/main.go new file mode 100644 index 0000000..7540c56 --- /dev/null +++ b/pkg/plugins/genai/cmd/main/main.go @@ -0,0 +1,78 @@ +package main + +import ( + "fmt" + "log" + "os" + + "github.com/hashicorp/go-plugin" + "github.com/opencost/opencost-plugins/pkg/plugins/genai/genaiprovider" + "github.com/opencost/opencost-plugins/pkg/plugins/genai/internal" +) + +type GenAISource struct { + Config *genaiprovider.Config +} + +// GetCustomCosts is called by OpenCost to retrieve the enriched GenAI costs. +func (s *GenAISource) GetCustomCosts(req *genaiprovider.CustomCostRequest) ([]*genaiprovider.GenAIWorkloadData, error) { + log.Printf("GenAI Plugin: Fetching costs for window %s - %s", req.Start, req.End) + + // 1. Call your internal math logic (from internal/join.go) + // This performs the Prometheus fetch and the MIG-to-Node join. + workloads, err := internal.CalculateGenAIWorkloads(req.Start, req.End, s.Config) + if err != nil { + return nil, fmt.Errorf("failed to calculate GenAI workloads: %w", err) + } + + var responses []*genaiprovider.GenAIWorkloadData + for _, w := range workloads { + // Map internal results to the genaiprovider format + responses = append(responses, &genaiprovider.GenAIWorkloadData{ + PodName: w.PodName, + ModelName: w.ModelName, + TotalTokens: w.TotalTokens, + TotalCost: w.TotalCost, + TenantID: w.TenantID, + WorkflowPhase: w.WorkflowPhase, + MIGProfile: w.MIGProfile, + GPUEfficiency: w.GPUEfficiency, + }) + } + + return responses, nil +} + +func main() { + // 1. Load plugin configuration (e.g., Prometheus URL, Cluster ID) + configPath := os.Getenv("PLUGIN_CONFIG_PATH") + if configPath == "" { + configPath = "config/genai-config.json" + } + + cfg, err := genaiprovider.LoadConfig(configPath) + if err != nil { + log.Fatalf("GenAI Plugin: Failed to load config: %v", err) + } + + // 2. Create the source instance + source := &GenAISource{Config: cfg} + + // 3. Define the Handshake (Security "Cookie" required by go-plugin) + var handshakeConfig = plugin.HandshakeConfig{ + ProtocolVersion: 1, + MagicCookieKey: "OPENCOST_PLUGIN_MAGIC_COOKIE", + MagicCookieValue: "genai-visibility", + } + + // 4. 
Start the gRPC Server + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: handshakeConfig, + Plugins: map[string]plugin.Plugin{ + "customcost": &genaiprovider.CustomCostPlugin{ + Impl: source, + }, + }, + GRPCServer: plugin.DefaultGRPCServer, + }) +} diff --git a/pkg/plugins/genai/config.json b/pkg/plugins/genai/config.json new file mode 100644 index 0000000..8b16da3 --- /dev/null +++ b/pkg/plugins/genai/config.json @@ -0,0 +1,4 @@ +{ + "prometheusUrl": "http://your-prometheus-endpoint:9090", + "metrics": { "inputTokensMetric": "genai_input_tokens_total" } +} \ No newline at end of file diff --git a/pkg/plugins/genai/config/genai-config.json b/pkg/plugins/genai/config/genai-config.json new file mode 100644 index 0000000..b62e8c4 --- /dev/null +++ b/pkg/plugins/genai/config/genai-config.json @@ -0,0 +1,5 @@ +{ + "prometheus_url": "http://prometheus:9090", + "cluster_id": "test-cluster", + "log_level": "info" +} \ No newline at end of file diff --git a/pkg/plugins/genai/dockerfile b/pkg/plugins/genai/dockerfile new file mode 100644 index 0000000..0c6cc4c --- /dev/null +++ b/pkg/plugins/genai/dockerfile @@ -0,0 +1,5 @@ +FROM debian:bookworm-slim +WORKDIR /app +COPY genai-plugin /app/genai-plugin +RUN chmod +x /app/genai-plugin +ENTRYPOINT ["/app/genai-plugin"] \ No newline at end of file diff --git a/pkg/plugins/genai/genai-plugin b/pkg/plugins/genai/genai-plugin new file mode 100755 index 0000000..09e01bc Binary files /dev/null and b/pkg/plugins/genai/genai-plugin differ diff --git a/pkg/plugins/genai/genaiplugin/config.go b/pkg/plugins/genai/genaiplugin/config.go new file mode 100644 index 0000000..45a458d --- /dev/null +++ b/pkg/plugins/genai/genaiplugin/config.go @@ -0,0 +1,44 @@ +package genaiplugin + +type MetricMapping struct { + InputTokens string + OutputTokens string + GPUActiveSec string + GPUUtil string +} + +func DefaultMetricMapping() MetricMapping { + return MetricMapping{ + InputTokens: "vllm:prompt_tokens_total", + OutputTokens: "vllm:generation_tokens_total", + GPUUtil: "DCGM_FI_DEV_GPU_UTIL", + GPUActiveSec: "DCGM_FI_DEV_GPU_UTIL", + } +} + +func GetMapping(annotations map[string]string) MetricMapping { + + // "fallback" if no annotations are present + m := MetricMapping{ + InputTokens: "llm_tokens_input_total", + OutputTokens: "llm_tokens_output_total", + GPUActiveSec: "llm_gpu_active_seconds_total", + GPUUtil: "llm_gpu_utilization_percent", + } + + // Override only if specific annotations exist + if val, ok := annotations["opencost.io/metric-input"]; ok { + m.InputTokens = val + } + if val, ok := annotations["opencost.io/metric-output"]; ok { + m.OutputTokens = val + } + if val, ok := annotations["opencost.io/metric-gpu-sec"]; ok { + m.GPUActiveSec = val + } + if val, ok := annotations["opencost.io/metric-gpu-util"]; ok { + m.GPUUtil = val + } + + return m +} diff --git a/pkg/plugins/genai/genaiprovider/custom_cost.pb.go b/pkg/plugins/genai/genaiprovider/custom_cost.pb.go new file mode 100644 index 0000000..0d447e0 --- /dev/null +++ b/pkg/plugins/genai/genaiprovider/custom_cost.pb.go @@ -0,0 +1,547 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.11 +// protoc v6.33.1 +// source: genaiprovider/custom_cost.proto + +package genaiprovider + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. 
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Mirror of your EfficiencyMetrics struct +type EfficiencyMetrics struct { + state protoimpl.MessageState `protogen:"open.v1"` + CostPer_1MInput float64 `protobuf:"fixed64,1,opt,name=cost_per_1m_input,json=costPer1mInput,proto3" json:"cost_per_1m_input,omitempty"` + CostPer_1MOutput float64 `protobuf:"fixed64,2,opt,name=cost_per_1m_output,json=costPer1mOutput,proto3" json:"cost_per_1m_output,omitempty"` + CostPer_1MTotal float64 `protobuf:"fixed64,3,opt,name=cost_per_1m_total,json=costPer1mTotal,proto3" json:"cost_per_1m_total,omitempty"` + TokensPerGpuSec float64 `protobuf:"fixed64,4,opt,name=tokens_per_gpu_sec,json=tokensPerGpuSec,proto3" json:"tokens_per_gpu_sec,omitempty"` + CacheSavings float64 `protobuf:"fixed64,5,opt,name=cache_savings,json=cacheSavings,proto3" json:"cache_savings,omitempty"` + GpuUtilPercent float64 `protobuf:"fixed64,6,opt,name=gpu_util_percent,json=gpuUtilPercent,proto3" json:"gpu_util_percent,omitempty"` + GpuWaste float64 `protobuf:"fixed64,7,opt,name=gpu_waste,json=gpuWaste,proto3" json:"gpu_waste,omitempty"` + EfficiencyStatus string `protobuf:"bytes,8,opt,name=efficiency_status,json=efficiencyStatus,proto3" json:"efficiency_status,omitempty"` + MigUtilization float64 `protobuf:"fixed64,9,opt,name=mig_utilization,json=migUtilization,proto3" json:"mig_utilization,omitempty"` + UnallocatedCost float64 `protobuf:"fixed64,10,opt,name=unallocated_cost,json=unallocatedCost,proto3" json:"unallocated_cost,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *EfficiencyMetrics) Reset() { + *x = EfficiencyMetrics{} + mi := &file_genaiprovider_custom_cost_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *EfficiencyMetrics) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EfficiencyMetrics) ProtoMessage() {} + +func (x *EfficiencyMetrics) ProtoReflect() protoreflect.Message { + mi := &file_genaiprovider_custom_cost_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EfficiencyMetrics.ProtoReflect.Descriptor instead. 
+func (*EfficiencyMetrics) Descriptor() ([]byte, []int) { + return file_genaiprovider_custom_cost_proto_rawDescGZIP(), []int{0} +} + +func (x *EfficiencyMetrics) GetCostPer_1MInput() float64 { + if x != nil { + return x.CostPer_1MInput + } + return 0 +} + +func (x *EfficiencyMetrics) GetCostPer_1MOutput() float64 { + if x != nil { + return x.CostPer_1MOutput + } + return 0 +} + +func (x *EfficiencyMetrics) GetCostPer_1MTotal() float64 { + if x != nil { + return x.CostPer_1MTotal + } + return 0 +} + +func (x *EfficiencyMetrics) GetTokensPerGpuSec() float64 { + if x != nil { + return x.TokensPerGpuSec + } + return 0 +} + +func (x *EfficiencyMetrics) GetCacheSavings() float64 { + if x != nil { + return x.CacheSavings + } + return 0 +} + +func (x *EfficiencyMetrics) GetGpuUtilPercent() float64 { + if x != nil { + return x.GpuUtilPercent + } + return 0 +} + +func (x *EfficiencyMetrics) GetGpuWaste() float64 { + if x != nil { + return x.GpuWaste + } + return 0 +} + +func (x *EfficiencyMetrics) GetEfficiencyStatus() string { + if x != nil { + return x.EfficiencyStatus + } + return "" +} + +func (x *EfficiencyMetrics) GetMigUtilization() float64 { + if x != nil { + return x.MigUtilization + } + return 0 +} + +func (x *EfficiencyMetrics) GetUnallocatedCost() float64 { + if x != nil { + return x.UnallocatedCost + } + return 0 +} + +// Mirror of your GenAIAttributes struct +type GenAIAttributes struct { + state protoimpl.MessageState `protogen:"open.v1"` + WorkflowPhase string `protobuf:"bytes,1,opt,name=workflow_phase,json=workflowPhase,proto3" json:"workflow_phase,omitempty"` + ModelName string `protobuf:"bytes,2,opt,name=model_name,json=modelName,proto3" json:"model_name,omitempty"` + ModelVersion string `protobuf:"bytes,3,opt,name=model_version,json=modelVersion,proto3" json:"model_version,omitempty"` + TenantId string `protobuf:"bytes,4,opt,name=tenant_id,json=tenantId,proto3" json:"tenant_id,omitempty"` + Accelerator string `protobuf:"bytes,5,opt,name=accelerator,proto3" json:"accelerator,omitempty"` + GpuUuid string `protobuf:"bytes,6,opt,name=gpu_uuid,json=gpuUuid,proto3" json:"gpu_uuid,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GenAIAttributes) Reset() { + *x = GenAIAttributes{} + mi := &file_genaiprovider_custom_cost_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GenAIAttributes) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GenAIAttributes) ProtoMessage() {} + +func (x *GenAIAttributes) ProtoReflect() protoreflect.Message { + mi := &file_genaiprovider_custom_cost_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GenAIAttributes.ProtoReflect.Descriptor instead. 
+func (*GenAIAttributes) Descriptor() ([]byte, []int) { + return file_genaiprovider_custom_cost_proto_rawDescGZIP(), []int{1} +} + +func (x *GenAIAttributes) GetWorkflowPhase() string { + if x != nil { + return x.WorkflowPhase + } + return "" +} + +func (x *GenAIAttributes) GetModelName() string { + if x != nil { + return x.ModelName + } + return "" +} + +func (x *GenAIAttributes) GetModelVersion() string { + if x != nil { + return x.ModelVersion + } + return "" +} + +func (x *GenAIAttributes) GetTenantId() string { + if x != nil { + return x.TenantId + } + return "" +} + +func (x *GenAIAttributes) GetAccelerator() string { + if x != nil { + return x.Accelerator + } + return "" +} + +func (x *GenAIAttributes) GetGpuUuid() string { + if x != nil { + return x.GpuUuid + } + return "" +} + +// The main response object +type GenAIWorkload struct { + state protoimpl.MessageState `protogen:"open.v1"` + PodName string `protobuf:"bytes,1,opt,name=pod_name,json=podName,proto3" json:"pod_name,omitempty"` + ModelName string `protobuf:"bytes,2,opt,name=model_name,json=modelName,proto3" json:"model_name,omitempty"` + TotalTokens int64 `protobuf:"varint,3,opt,name=total_tokens,json=totalTokens,proto3" json:"total_tokens,omitempty"` + TotalCost float64 `protobuf:"fixed64,4,opt,name=total_cost,json=totalCost,proto3" json:"total_cost,omitempty"` + TenantId string `protobuf:"bytes,5,opt,name=tenant_id,json=tenantId,proto3" json:"tenant_id,omitempty"` + WorkflowPhase string `protobuf:"bytes,6,opt,name=workflow_phase,json=workflowPhase,proto3" json:"workflow_phase,omitempty"` + MigProfile string `protobuf:"bytes,7,opt,name=mig_profile,json=migProfile,proto3" json:"mig_profile,omitempty"` + GpuEfficiency float64 `protobuf:"fixed64,8,opt,name=gpu_efficiency,json=gpuEfficiency,proto3" json:"gpu_efficiency,omitempty"` + // Nesting your detailed sub-structs + Efficiency *EfficiencyMetrics `protobuf:"bytes,9,opt,name=efficiency,proto3" json:"efficiency,omitempty"` + Attributes *GenAIAttributes `protobuf:"bytes,10,opt,name=attributes,proto3" json:"attributes,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GenAIWorkload) Reset() { + *x = GenAIWorkload{} + mi := &file_genaiprovider_custom_cost_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GenAIWorkload) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GenAIWorkload) ProtoMessage() {} + +func (x *GenAIWorkload) ProtoReflect() protoreflect.Message { + mi := &file_genaiprovider_custom_cost_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GenAIWorkload.ProtoReflect.Descriptor instead. 
+func (*GenAIWorkload) Descriptor() ([]byte, []int) { + return file_genaiprovider_custom_cost_proto_rawDescGZIP(), []int{2} +} + +func (x *GenAIWorkload) GetPodName() string { + if x != nil { + return x.PodName + } + return "" +} + +func (x *GenAIWorkload) GetModelName() string { + if x != nil { + return x.ModelName + } + return "" +} + +func (x *GenAIWorkload) GetTotalTokens() int64 { + if x != nil { + return x.TotalTokens + } + return 0 +} + +func (x *GenAIWorkload) GetTotalCost() float64 { + if x != nil { + return x.TotalCost + } + return 0 +} + +func (x *GenAIWorkload) GetTenantId() string { + if x != nil { + return x.TenantId + } + return "" +} + +func (x *GenAIWorkload) GetWorkflowPhase() string { + if x != nil { + return x.WorkflowPhase + } + return "" +} + +func (x *GenAIWorkload) GetMigProfile() string { + if x != nil { + return x.MigProfile + } + return "" +} + +func (x *GenAIWorkload) GetGpuEfficiency() float64 { + if x != nil { + return x.GpuEfficiency + } + return 0 +} + +func (x *GenAIWorkload) GetEfficiency() *EfficiencyMetrics { + if x != nil { + return x.Efficiency + } + return nil +} + +func (x *GenAIWorkload) GetAttributes() *GenAIAttributes { + if x != nil { + return x.Attributes + } + return nil +} + +type GetCustomCostsRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Start int64 `protobuf:"varint,1,opt,name=start,proto3" json:"start,omitempty"` + End int64 `protobuf:"varint,2,opt,name=end,proto3" json:"end,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetCustomCostsRequest) Reset() { + *x = GetCustomCostsRequest{} + mi := &file_genaiprovider_custom_cost_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetCustomCostsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetCustomCostsRequest) ProtoMessage() {} + +func (x *GetCustomCostsRequest) ProtoReflect() protoreflect.Message { + mi := &file_genaiprovider_custom_cost_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetCustomCostsRequest.ProtoReflect.Descriptor instead. 
+func (*GetCustomCostsRequest) Descriptor() ([]byte, []int) { + return file_genaiprovider_custom_cost_proto_rawDescGZIP(), []int{3} +} + +func (x *GetCustomCostsRequest) GetStart() int64 { + if x != nil { + return x.Start + } + return 0 +} + +func (x *GetCustomCostsRequest) GetEnd() int64 { + if x != nil { + return x.End + } + return 0 +} + +type GetCustomCostsResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Workloads []*GenAIWorkload `protobuf:"bytes,1,rep,name=workloads,proto3" json:"workloads,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetCustomCostsResponse) Reset() { + *x = GetCustomCostsResponse{} + mi := &file_genaiprovider_custom_cost_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetCustomCostsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetCustomCostsResponse) ProtoMessage() {} + +func (x *GetCustomCostsResponse) ProtoReflect() protoreflect.Message { + mi := &file_genaiprovider_custom_cost_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetCustomCostsResponse.ProtoReflect.Descriptor instead. +func (*GetCustomCostsResponse) Descriptor() ([]byte, []int) { + return file_genaiprovider_custom_cost_proto_rawDescGZIP(), []int{4} +} + +func (x *GetCustomCostsResponse) GetWorkloads() []*GenAIWorkload { + if x != nil { + return x.Workloads + } + return nil +} + +var File_genaiprovider_custom_cost_proto protoreflect.FileDescriptor + +const file_genaiprovider_custom_cost_proto_rawDesc = "" + + "\n" + + "\x1fgenaiprovider/custom_cost.proto\x12\rgenaiprovider\"\xb0\x03\n" + + "\x11EfficiencyMetrics\x12)\n" + + "\x11cost_per_1m_input\x18\x01 \x01(\x01R\x0ecostPer1mInput\x12+\n" + + "\x12cost_per_1m_output\x18\x02 \x01(\x01R\x0fcostPer1mOutput\x12)\n" + + "\x11cost_per_1m_total\x18\x03 \x01(\x01R\x0ecostPer1mTotal\x12+\n" + + "\x12tokens_per_gpu_sec\x18\x04 \x01(\x01R\x0ftokensPerGpuSec\x12#\n" + + "\rcache_savings\x18\x05 \x01(\x01R\fcacheSavings\x12(\n" + + "\x10gpu_util_percent\x18\x06 \x01(\x01R\x0egpuUtilPercent\x12\x1b\n" + + "\tgpu_waste\x18\a \x01(\x01R\bgpuWaste\x12+\n" + + "\x11efficiency_status\x18\b \x01(\tR\x10efficiencyStatus\x12'\n" + + "\x0fmig_utilization\x18\t \x01(\x01R\x0emigUtilization\x12)\n" + + "\x10unallocated_cost\x18\n" + + " \x01(\x01R\x0funallocatedCost\"\xd6\x01\n" + + "\x0fGenAIAttributes\x12%\n" + + "\x0eworkflow_phase\x18\x01 \x01(\tR\rworkflowPhase\x12\x1d\n" + + "\n" + + "model_name\x18\x02 \x01(\tR\tmodelName\x12#\n" + + "\rmodel_version\x18\x03 \x01(\tR\fmodelVersion\x12\x1b\n" + + "\ttenant_id\x18\x04 \x01(\tR\btenantId\x12 \n" + + "\vaccelerator\x18\x05 \x01(\tR\vaccelerator\x12\x19\n" + + "\bgpu_uuid\x18\x06 \x01(\tR\agpuUuid\"\x99\x03\n" + + "\rGenAIWorkload\x12\x19\n" + + "\bpod_name\x18\x01 \x01(\tR\apodName\x12\x1d\n" + + "\n" + + "model_name\x18\x02 \x01(\tR\tmodelName\x12!\n" + + "\ftotal_tokens\x18\x03 \x01(\x03R\vtotalTokens\x12\x1d\n" + + "\n" + + "total_cost\x18\x04 \x01(\x01R\ttotalCost\x12\x1b\n" + + "\ttenant_id\x18\x05 \x01(\tR\btenantId\x12%\n" + + "\x0eworkflow_phase\x18\x06 \x01(\tR\rworkflowPhase\x12\x1f\n" + + "\vmig_profile\x18\a \x01(\tR\n" + + "migProfile\x12%\n" + + "\x0egpu_efficiency\x18\b \x01(\x01R\rgpuEfficiency\x12@\n" + + "\n" + + "efficiency\x18\t \x01(\v2 
.genaiprovider.EfficiencyMetricsR\n" + + "efficiency\x12>\n" + + "\n" + + "attributes\x18\n" + + " \x01(\v2\x1e.genaiprovider.GenAIAttributesR\n" + + "attributes\"?\n" + + "\x15GetCustomCostsRequest\x12\x14\n" + + "\x05start\x18\x01 \x01(\x03R\x05start\x12\x10\n" + + "\x03end\x18\x02 \x01(\x03R\x03end\"T\n" + + "\x16GetCustomCostsResponse\x12:\n" + + "\tworkloads\x18\x01 \x03(\v2\x1c.genaiprovider.GenAIWorkloadR\tworkloads2q\n" + + "\x10CustomCostSource\x12]\n" + + "\x0eGetCustomCosts\x12$.genaiprovider.GetCustomCostsRequest\x1a%.genaiprovider.GetCustomCostsResponseBIZGgithub.com/nXtCyberNet/opencost-plugins/pkg/plugins/genai/genaiproviderb\x06proto3" + +var ( + file_genaiprovider_custom_cost_proto_rawDescOnce sync.Once + file_genaiprovider_custom_cost_proto_rawDescData []byte +) + +func file_genaiprovider_custom_cost_proto_rawDescGZIP() []byte { + file_genaiprovider_custom_cost_proto_rawDescOnce.Do(func() { + file_genaiprovider_custom_cost_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_genaiprovider_custom_cost_proto_rawDesc), len(file_genaiprovider_custom_cost_proto_rawDesc))) + }) + return file_genaiprovider_custom_cost_proto_rawDescData +} + +var file_genaiprovider_custom_cost_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_genaiprovider_custom_cost_proto_goTypes = []any{ + (*EfficiencyMetrics)(nil), // 0: genaiprovider.EfficiencyMetrics + (*GenAIAttributes)(nil), // 1: genaiprovider.GenAIAttributes + (*GenAIWorkload)(nil), // 2: genaiprovider.GenAIWorkload + (*GetCustomCostsRequest)(nil), // 3: genaiprovider.GetCustomCostsRequest + (*GetCustomCostsResponse)(nil), // 4: genaiprovider.GetCustomCostsResponse +} +var file_genaiprovider_custom_cost_proto_depIdxs = []int32{ + 0, // 0: genaiprovider.GenAIWorkload.efficiency:type_name -> genaiprovider.EfficiencyMetrics + 1, // 1: genaiprovider.GenAIWorkload.attributes:type_name -> genaiprovider.GenAIAttributes + 2, // 2: genaiprovider.GetCustomCostsResponse.workloads:type_name -> genaiprovider.GenAIWorkload + 3, // 3: genaiprovider.CustomCostSource.GetCustomCosts:input_type -> genaiprovider.GetCustomCostsRequest + 4, // 4: genaiprovider.CustomCostSource.GetCustomCosts:output_type -> genaiprovider.GetCustomCostsResponse + 4, // [4:5] is the sub-list for method output_type + 3, // [3:4] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_genaiprovider_custom_cost_proto_init() } +func file_genaiprovider_custom_cost_proto_init() { + if File_genaiprovider_custom_cost_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_genaiprovider_custom_cost_proto_rawDesc), len(file_genaiprovider_custom_cost_proto_rawDesc)), + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_genaiprovider_custom_cost_proto_goTypes, + DependencyIndexes: file_genaiprovider_custom_cost_proto_depIdxs, + MessageInfos: file_genaiprovider_custom_cost_proto_msgTypes, + }.Build() + File_genaiprovider_custom_cost_proto = out.File + file_genaiprovider_custom_cost_proto_goTypes = nil + file_genaiprovider_custom_cost_proto_depIdxs = nil +} diff --git a/pkg/plugins/genai/genaiprovider/custom_cost.proto b/pkg/plugins/genai/genaiprovider/custom_cost.proto new file mode 100644 index 
0000000..f494a58 --- /dev/null +++ b/pkg/plugins/genai/genaiprovider/custom_cost.proto @@ -0,0 +1,58 @@ +syntax = "proto3"; + +package genaiprovider; + +option go_package = "github.com/nXtCyberNet/opencost-plugins/pkg/plugins/genai/genaiprovider"; + +service CustomCostSource { + rpc GetCustomCosts (GetCustomCostsRequest) returns (GetCustomCostsResponse); +} + +// Mirror of your EfficiencyMetrics struct +message EfficiencyMetrics { + double cost_per_1m_input = 1; + double cost_per_1m_output = 2; + double cost_per_1m_total = 3; + double tokens_per_gpu_sec = 4; + double cache_savings = 5; + double gpu_util_percent = 6; + double gpu_waste = 7; + string efficiency_status = 8; + double mig_utilization = 9; + double unallocated_cost = 10; +} + +// Mirror of your GenAIAttributes struct +message GenAIAttributes { + string workflow_phase = 1; + string model_name = 2; + string model_version = 3; + string tenant_id = 4; + string accelerator = 5; + string gpu_uuid = 6; +} + +// The main response object +message GenAIWorkload { + string pod_name = 1; + string model_name = 2; + int64 total_tokens = 3; + double total_cost = 4; + string tenant_id = 5; + string workflow_phase = 6; + string mig_profile = 7; + double gpu_efficiency = 8; + + // Nesting your detailed sub-structs + EfficiencyMetrics efficiency = 9; + GenAIAttributes attributes = 10; +} + +message GetCustomCostsRequest { + int64 start = 1; + int64 end = 2; +} + +message GetCustomCostsResponse { + repeated GenAIWorkload workloads = 1; +} \ No newline at end of file diff --git a/pkg/plugins/genai/genaiprovider/custom_cost_grpc.pb.go b/pkg/plugins/genai/genaiprovider/custom_cost_grpc.pb.go new file mode 100644 index 0000000..3b20d4a --- /dev/null +++ b/pkg/plugins/genai/genaiprovider/custom_cost_grpc.pb.go @@ -0,0 +1,121 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.6.0 +// - protoc v6.33.1 +// source: genaiprovider/custom_cost.proto + +package genaiprovider + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + CustomCostSource_GetCustomCosts_FullMethodName = "/genaiprovider.CustomCostSource/GetCustomCosts" +) + +// CustomCostSourceClient is the client API for CustomCostSource service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type CustomCostSourceClient interface { + GetCustomCosts(ctx context.Context, in *GetCustomCostsRequest, opts ...grpc.CallOption) (*GetCustomCostsResponse, error) +} + +type customCostSourceClient struct { + cc grpc.ClientConnInterface +} + +func NewCustomCostSourceClient(cc grpc.ClientConnInterface) CustomCostSourceClient { + return &customCostSourceClient{cc} +} + +func (c *customCostSourceClient) GetCustomCosts(ctx context.Context, in *GetCustomCostsRequest, opts ...grpc.CallOption) (*GetCustomCostsResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetCustomCostsResponse) + err := c.cc.Invoke(ctx, CustomCostSource_GetCustomCosts_FullMethodName, in, out, cOpts...) 
+ if err != nil { + return nil, err + } + return out, nil +} + +// CustomCostSourceServer is the server API for CustomCostSource service. +// All implementations must embed UnimplementedCustomCostSourceServer +// for forward compatibility. +type CustomCostSourceServer interface { + GetCustomCosts(context.Context, *GetCustomCostsRequest) (*GetCustomCostsResponse, error) + mustEmbedUnimplementedCustomCostSourceServer() +} + +// UnimplementedCustomCostSourceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedCustomCostSourceServer struct{} + +func (UnimplementedCustomCostSourceServer) GetCustomCosts(context.Context, *GetCustomCostsRequest) (*GetCustomCostsResponse, error) { + return nil, status.Error(codes.Unimplemented, "method GetCustomCosts not implemented") +} +func (UnimplementedCustomCostSourceServer) mustEmbedUnimplementedCustomCostSourceServer() {} +func (UnimplementedCustomCostSourceServer) testEmbeddedByValue() {} + +// UnsafeCustomCostSourceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to CustomCostSourceServer will +// result in compilation errors. +type UnsafeCustomCostSourceServer interface { + mustEmbedUnimplementedCustomCostSourceServer() +} + +func RegisterCustomCostSourceServer(s grpc.ServiceRegistrar, srv CustomCostSourceServer) { + // If the following call panics, it indicates UnimplementedCustomCostSourceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&CustomCostSource_ServiceDesc, srv) +} + +func _CustomCostSource_GetCustomCosts_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetCustomCostsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CustomCostSourceServer).GetCustomCosts(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: CustomCostSource_GetCustomCosts_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CustomCostSourceServer).GetCustomCosts(ctx, req.(*GetCustomCostsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// CustomCostSource_ServiceDesc is the grpc.ServiceDesc for CustomCostSource service. 
+// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var CustomCostSource_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "genaiprovider.CustomCostSource", + HandlerType: (*CustomCostSourceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetCustomCosts", + Handler: _CustomCostSource_GetCustomCosts_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "genaiprovider/custom_cost.proto", +} diff --git a/pkg/plugins/genai/genaiprovider/interface.go b/pkg/plugins/genai/genaiprovider/interface.go new file mode 100644 index 0000000..260a480 --- /dev/null +++ b/pkg/plugins/genai/genaiprovider/interface.go @@ -0,0 +1,103 @@ +package genaiprovider + +import ( + "context" + "time" +) + +// CustomCostSource is the interface that your GenAI plugin must satisfy. +// This is the "Brain" that your main.go will implement. +type CustomCostSource interface { + GetCustomCosts(req *CustomCostRequest) ([]*GenAIWorkloadData, error) +} + +// GenAIWorkloadData represents a processed GenAI workload with cost information +type GenAIWorkloadData struct { + PodName string + ModelName string + TotalTokens int64 + TotalCost float64 + TenantID string + WorkflowPhase string + MIGProfile string + GPUEfficiency float64 +} + +// CustomCostRequest represents the time window OpenCost is asking for. +type CustomCostRequest struct { + Start time.Time + End time.Time + Resolution string // e.g., "hourly" or "daily" +} + +// --- gRPC Implementation --- + +// GRPCServer handles incoming requests from OpenCost and sends them to your Plugin. +type GRPCServer struct { + // This is a standard OpenCost requirement to avoid proto-version mismatch. + UnimplementedCustomCostSourceServer + Impl CustomCostSource +} + +func (m *GRPCServer) GetCustomCosts(ctx context.Context, req *GetCustomCostsRequest) (*GetCustomCostsResponse, error) { + // Translate the gRPC request into your Go struct + r := &CustomCostRequest{ + Start: time.Unix(req.Start, 0), + End: time.Unix(req.End, 0), + } + + // Call your math logic + results, err := m.Impl.GetCustomCosts(r) + if err != nil { + return nil, err + } + + // Translate results back to gRPC response + var workloads []*GenAIWorkload + for _, res := range results { + workloads = append(workloads, &GenAIWorkload{ + PodName: res.PodName, + ModelName: res.ModelName, + TotalTokens: res.TotalTokens, + TotalCost: res.TotalCost, + TenantId: res.TenantID, + WorkflowPhase: res.WorkflowPhase, + MigProfile: res.MIGProfile, + GpuEfficiency: res.GPUEfficiency, + }) + } + + return &GetCustomCostsResponse{Workloads: workloads}, nil +} + +// GRPCClient is what the OpenCost Core uses to talk to your Plugin. 
+type GRPCClient struct { + client CustomCostSourceClient +} + +func (m *GRPCClient) GetCustomCosts(req *CustomCostRequest) ([]*GenAIWorkloadData, error) { + // Translate Go struct to gRPC + resp, err := m.client.GetCustomCosts(context.Background(), &GetCustomCostsRequest{ + Start: req.Start.Unix(), + End: req.End.Unix(), + }) + if err != nil { + return nil, err + } + + // Translate gRPC response back to Go slice + var results []*GenAIWorkloadData + for _, w := range resp.Workloads { + results = append(results, &GenAIWorkloadData{ + PodName: w.PodName, + ModelName: w.ModelName, + TotalTokens: w.TotalTokens, + TotalCost: w.TotalCost, + TenantID: w.TenantId, + WorkflowPhase: w.WorkflowPhase, + MIGProfile: w.MigProfile, + GPUEfficiency: w.GpuEfficiency, + }) + } + return results, nil +} diff --git a/pkg/plugins/genai/genaiprovider/provider.go b/pkg/plugins/genai/genaiprovider/provider.go new file mode 100644 index 0000000..ccfa3f3 --- /dev/null +++ b/pkg/plugins/genai/genaiprovider/provider.go @@ -0,0 +1,53 @@ +package genaiprovider + +import ( + "context" + "encoding/json" + "os" + + "github.com/hashicorp/go-plugin" + "google.golang.org/grpc" +) + +// CustomCostPlugin is the implementation of the plugin.Plugin interface. +type CustomCostPlugin struct { + plugin.NetRPCUnsupportedPlugin + Impl CustomCostSource +} + +func (p *CustomCostPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { + // Register your gRPC server here using the generated protobuf server + RegisterCustomCostSourceServer(s, &GRPCServer{Impl: p.Impl}) + return nil +} + +func (p *CustomCostPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { + return &GRPCClient{client: NewCustomCostSourceClient(c)}, nil +} + +// Config represents the configuration for the GenAI plugin +type Config struct { + PrometheusURL string `json:"prometheus_url"` + ClusterID string `json:"cluster_id"` + LogLevel string `json:"log_level"` +} + +// LoadConfig loads the configuration from a JSON file +func LoadConfig(configPath string) (*Config, error) { + var config Config + bytes, err := os.ReadFile(configPath) + if err != nil { + return nil, err + } + + err = json.Unmarshal(bytes, &config) + if err != nil { + return nil, err + } + + if config.LogLevel == "" { + config.LogLevel = "info" + } + + return &config, nil +} diff --git a/pkg/plugins/genai/go.mod b/pkg/plugins/genai/go.mod new file mode 100644 index 0000000..2288769 --- /dev/null +++ b/pkg/plugins/genai/go.mod @@ -0,0 +1,28 @@ +module github.com/opencost/opencost-plugins/pkg/plugins/genai + +go 1.25.5 + +require ( + github.com/hashicorp/go-plugin v1.7.0 + github.com/prometheus/client_model v0.6.2 + github.com/prometheus/common v0.67.5 + google.golang.org/grpc v1.78.0 + google.golang.org/protobuf v1.36.11 +) + +require ( + github.com/fatih/color v1.13.0 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/hashicorp/go-hclog v1.6.3 // indirect + github.com/hashicorp/yamux v0.1.2 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/mattn/go-colorable v0.1.12 // indirect + github.com/mattn/go-isatty v0.0.17 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/oklog/run v1.1.0 // indirect + go.yaml.in/yaml/v2 v2.4.3 // indirect + golang.org/x/net v0.48.0 // indirect + golang.org/x/sys v0.39.0 // indirect + golang.org/x/text v0.32.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect +) diff --git 
a/pkg/plugins/genai/go.sum b/pkg/plugins/genai/go.sum new file mode 100644 index 0000000..df4e049 --- /dev/null +++ b/pkg/plugins/genai/go.sum @@ -0,0 +1,94 @@ +github.com/bufbuild/protocompile v0.14.1 h1:iA73zAf/fyljNjQKwYzUHD6AD4R8KMasmwa/FBatYVw= +github.com/bufbuild/protocompile v0.14.1/go.mod h1:ppVdAIhbr2H8asPk6k4pY7t9zB1OU5DoEw9xY/FUi1c= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= +github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= +github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-plugin v1.7.0 h1:YghfQH/0QmPNc/AZMTFE3ac8fipZyZECHdDPshfk+mA= +github.com/hashicorp/go-plugin v1.7.0/go.mod h1:BExt6KEaIYx804z8k4gRzRLEvxKVb+kn0NMcihqOqb8= +github.com/hashicorp/yamux v0.1.2 h1:XtB8kyFOyHXYVFnwT5C3+Bdo8gArse7j2AQ0DA0Uey8= +github.com/hashicorp/yamux v0.1.2/go.mod h1:C+zze2n6e/7wshOZep2A70/aQU6QBRWJO/G6FT1wIns= +github.com/jhump/protoreflect v1.17.0 h1:qOEr613fac2lOuTgWN4tPAtLL7fUSbuJL5X5XumQh94= +github.com/jhump/protoreflect v1.17.0/go.mod h1:h9+vUUL38jiBzck8ck+6G/aeMX8Z4QUY/NiJPwPNi+8= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= +github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= +github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod 
h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA= +github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTUGI4= +github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= +go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= +go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= +go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= +go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg= +go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= +go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= +go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= +go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= +go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= +go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= +golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= +golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 
+golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= +golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= +golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda h1:i/Q+bfisr7gq6feoJnS/DlpdwEL4ihp41fvRiM3Ork0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= +google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc= +google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/plugins/genai/internal/helpers.go b/pkg/plugins/genai/internal/helpers.go new file mode 100644 index 0000000..26be016 --- /dev/null +++ b/pkg/plugins/genai/internal/helpers.go @@ -0,0 +1,114 @@ +package internal + +import ( + "strings" +) + +type EfficiencyMetrics struct { + CostPer1MInput float64 `json:"costPer1MInput"` + CostPer1MOutput float64 `json:"costPer1MOutput"` + CostPer1MTotal float64 `json:"costPer1MTotal"` + TokensPerGPUSec float64 `json:"tokensPerGPUSec"` + CacheSavings float64 `json:"cacheSavings"` + + GPUUtilPercent float64 `json:"gpuUtilPercent"` + GPUWaste float64 `json:"gpuWaste"` + EfficiencyStatus string `json:"efficiencyStatus"` + + MIGUtilization float64 `json:"migUtilization"` + UnallocatedCost float64 `json:"unallocatedCost"` +} + +func FindMIGProfile(podRequests map[string]float64) (string, float64) { + for resourceName, quantity := range podRequests { + if strings.HasPrefix(resourceName, "nvidia.com/mig-") { + return resourceName, quantity + } + } + return "", 0 +} + +func GetTokensPerGPUSecond(totalTokens, normalizedGPUSec float64) float64 { + if normalizedGPUSec <= 0 { + return 0 + } + return totalTokens / normalizedGPUSec +} + +func GetCostPerMillionTokens(tokens, cost float64) float64 { + if tokens <= 0 { + return 0 + } + return (cost / tokens) * 1_000_000 +} + +func GetWeightedTokenCost(tokenCount, totalTokens, totalCost float64) float64 { + if totalTokens <= 0 { + return 0 + } + return (tokenCount / totalTokens) * totalCost +} + +func GetGPUWaste(podCost, gpuUtilPercent float64) float64 { + if podCost <= 0 { + return 0 + } + + util := gpuUtilPercent + if util < 0 { + util = 0 + } + if util > 100 { + util = 100 + } + return podCost * (1.0 - (util / 100.0)) +} + +func GetGPUEfficiencyStatus(gpuUtilPercent float64) string { + if gpuUtilPercent < 15 { + return "Underutilized" + } else if gpuUtilPercent > 85 { + return "Saturated" + } + return "Optimal" +} + +func CalculateEfficiency(m *Metrics, infra *MIGEfficiency, 
nodeCapacityMap map[string]float64, podCost float64) EfficiencyMetrics { + if m == nil || podCost <= 0 { + return EfficiencyMetrics{} + } + m.Sanitize() + + normalizedSec := m.GPUActiveSec + profile, requested := FindMIGProfile(m.PodRequests) + + if profile != "" && nodeCapacityMap != nil { + if capacity, ok := nodeCapacityMap[profile]; ok && capacity > 0 { + normalizedSec = m.GPUActiveSec * (requested / capacity) + } + } + + totalTokens := m.InputTokens + m.OutputTokens + + res := EfficiencyMetrics{ + GPUUtilPercent: m.GPUUtilPercent, + GPUWaste: GetGPUWaste(podCost, m.GPUUtilPercent), + EfficiencyStatus: GetGPUEfficiencyStatus(m.GPUUtilPercent), + } + + if infra != nil { + res.MIGUtilization = infra.MIGUtilization + } + + if totalTokens > 0 { + inputShare := GetWeightedTokenCost(m.InputTokens, totalTokens, podCost) + outputShare := GetWeightedTokenCost(m.OutputTokens, totalTokens, podCost) + + res.TokensPerGPUSec = GetTokensPerGPUSecond(totalTokens, normalizedSec) + res.CostPer1MTotal = GetCostPerMillionTokens(totalTokens, podCost) + res.CostPer1MInput = GetCostPerMillionTokens(m.InputTokens, inputShare) + res.CostPer1MOutput = GetCostPerMillionTokens(m.OutputTokens, outputShare) + } + + return res +} diff --git a/pkg/plugins/genai/internal/join.go b/pkg/plugins/genai/internal/join.go new file mode 100644 index 0000000..10a18ee --- /dev/null +++ b/pkg/plugins/genai/internal/join.go @@ -0,0 +1,128 @@ +package internal + +import ( + "strings" + "time" + + "github.com/opencost/opencost-plugins/pkg/plugins/genai/genaiprovider" +) + +type NodeCapacityMap map[string]float64 + +type GenAIReport struct { + Attributes GenAIAttributes `json:"attributes"` + Stats EfficiencyMetrics `json:"stats"` +} + +// based on OpenTelemetry semantic conventions for GenAI. +type GenAIAttributes struct { + WorkflowPhase string `json:"workflow_phase"` + ModelName string `json:"model_name"` + ModelVersion string `json:"model_version"` + TenantID string `json:"tenant_id"` + Accelerator string `json:"accelerator"` + GPUUUID string `json:"gpu_uuid"` +} + +const ( + AnnotationWorkflowPhase = "opencost.io/workflow-phase" + AnnotationModelName = "opencost.io/gen-ai-model-name" + AnnotationModelVersion = "opencost.io/gen-ai-model-version" + AnnotationTenantID = "opencost.io/tenant-id" + AnnotationAccelerator = "opencost.io/accelerator-type" + AnnotationGPUUUID = "opencost.io/gpu-uuid" +) + +func JoinPodToNodeMetrics(m *Metrics, nodeCap NodeCapacityMap, podCost float64, attr GenAIAttributes) GenAIReport { + + if m == nil { + return GenAIReport{ + Attributes: attr, + Stats: EfficiencyMetrics{}, + } + } + + profile, requested := FindMIGProfile(m.PodRequests) + + var infra *MIGEfficiency + if profile != "" { + if capacity, ok := nodeCap[profile]; ok && capacity > 0 { + infra = &MIGEfficiency{ + MIGUtilization: (requested / capacity) * 100, + } + } + } + + stats := CalculateEfficiency(m, infra, nodeCap, podCost) + + return GenAIReport{ + Attributes: attr, + Stats: stats, + } +} + +// ExtractGenAIAttributes pulls key metadata from Pod annotations. 
+func ExtractGenAIAttributes(annotations map[string]string) GenAIAttributes { + return GenAIAttributes{ + WorkflowPhase: annotations[AnnotationWorkflowPhase], + ModelName: annotations[AnnotationModelName], + ModelVersion: annotations[AnnotationModelVersion], + TenantID: annotations[AnnotationTenantID], + Accelerator: annotations[AnnotationAccelerator], + GPUUUID: annotations[AnnotationGPUUUID], + } +} + +func GetNodeMIGCapacity(capacity map[string]float64) NodeCapacityMap { + migCap := make(NodeCapacityMap) + for resource, quantity := range capacity { + // Only grab nvidia.com/mig-* resources + if strings.HasPrefix(resource, "nvidia.com/mig-") { + migCap[resource] = quantity + } + } + return migCap +} + +// GenAIWorkload represents a processed GenAI workload with cost information +type GenAIWorkload struct { + PodName string + ModelName string + TotalTokens int64 + TotalCost float64 + TenantID string + WorkflowPhase string + MIGProfile string + GPUEfficiency float64 +} + +// CalculateGenAIWorkloads fetches metrics and calculates costs for GenAI workloads +func CalculateGenAIWorkloads(start, end time.Time, config *genaiprovider.Config) ([]GenAIWorkload, error) { + // TODO: Implement the actual Prometheus fetching and MIG-to-Node join logic + // This is a placeholder implementation to satisfy the interface + + workloads := []GenAIWorkload{ + { + PodName: "genai-inference-pod", + ModelName: "gpt-4", + TotalTokens: 1000, + TotalCost: 0.02, + TenantID: "tenant-1", + WorkflowPhase: "inference", + MIGProfile: "1g.5gb", + GPUEfficiency: 85.5, + }, + { + PodName: "genai-training-pod", + ModelName: "llama-2", + TotalTokens: 5000, + TotalCost: 0.15, + TenantID: "tenant-2", + WorkflowPhase: "training", + MIGProfile: "3g.20gb", + GPUEfficiency: 92.3, + }, + } + + return workloads, nil +} diff --git a/pkg/plugins/genai/internal/metrics.go b/pkg/plugins/genai/internal/metrics.go new file mode 100644 index 0000000..b238b16 --- /dev/null +++ b/pkg/plugins/genai/internal/metrics.go @@ -0,0 +1,75 @@ +package internal + +type Metrics struct { + Cluster string `json:"cluster"` + Namespace string `json:"namespace"` + Pod string `json:"pod"` + + InputTokens float64 `json:"inputTokens"` + OutputTokens float64 `json:"outputTokens"` + CachedTokens float64 `json:"cachedTokens"` + + // Compute Efficiency + GPUActiveSec float64 `json:"gpuActiveSec"` + GPUUtilPercent float64 `json:"gpuUtilPercent"` + + // Metadata + ModelName string `json:"modelName"` + WorkflowPhase string `json:"workflowPhase"` + + // Performance KPIs (The "Value" signal) + AvgTTFTms float64 `json:"avgTTFTms"` + QualityScore float64 `json:"qualityScore"` + PodRequests map[string]float64 `json:"podRequests"` +} + +func (m *Metrics) Sanitize() { + if m.InputTokens < 0 { + m.InputTokens = 0 + } + if m.OutputTokens < 0 { + m.OutputTokens = 0 + } + if m.GPUUtilPercent < 0 { + m.GPUUtilPercent = 0 + } + if m.GPUUtilPercent > 100 { + m.GPUUtilPercent = 100 + } + if m.GPUActiveSec < 0 { + m.GPUActiveSec = 0 + } +} + +type MetricMapping struct { + // Prometheus Metric Names + InputTokens string `json:"inputTokensMetric"` + OutputTokens string `json:"outputTokensMetric"` + GPUUtilization string `json:"gpuUtilizationMetric"` + GPUActiveSec string `json:"gpuActiveSec"` + + // Label Overrides (to handle different Prometheus exporters) + PodLabel string `json:"podLabel"` + NamespaceLabel string `json:"namespaceLabel"` + NodeLabel string `json:"nodeLabel"` + + // GenAI Specific Labels (from your inference server) + ModelLabel string `json:"modelLabel"` + 
TenantLabel string `json:"tenantLabel"` + WorkflowLabel string `json:"workflowLabel"` +} + +// DefaultMetricMapping provides a sane baseline for most K8s environments. +func DefaultMetricMapping() MetricMapping { + return MetricMapping{ + InputTokens: "genai_input_tokens_total", + OutputTokens: "genai_output_tokens_total", + GPUUtilization: "container_gpu_utilization", + PodLabel: "pod", + NamespaceLabel: "namespace", + NodeLabel: "node", + ModelLabel: "model", + TenantLabel: "tenant", + WorkflowLabel: "workflow", + } +} diff --git a/pkg/plugins/genai/internal/mig.go b/pkg/plugins/genai/internal/mig.go new file mode 100644 index 0000000..35cf809 --- /dev/null +++ b/pkg/plugins/genai/internal/mig.go @@ -0,0 +1,51 @@ +package internal + +// MIGEfficiency represents the infrastructure-level waste for a GPU node +type MIGEfficiency struct { + RequestedSlices float64 `json:"requestedSlices"` + TotalCapacity float64 `json:"totalCapacity"` + MIGUtilization float64 `json:"migUtilization"` // % of slices used +} + +type NodeMIGAggregation struct { + NodeName string + + MIGUtilizationPercent float64 + UnallocatedGPUCostUSD float64 +} + +// CalculateMIGEfficiency determines how much of the physical GPU is "dark". +func CalculateMIGEfficiency(requested, capacity, totalNodeGPUCost float64) MIGEfficiency { + if capacity <= 0 { + return MIGEfficiency{} + } + + // Percentage of the card assigned to pods + utilization := (requested / capacity) * 100.0 + + return MIGEfficiency{ + RequestedSlices: requested, + TotalCapacity: capacity, + MIGUtilization: utilization, + } +} + +// CalculateNodeMIGAggregation computes node-level GPU waste using total node GPU cost. +// This must NOT be called per pod. +func CalculateNodeMIGAggregation(nodeName string, requestedSlices float64, totalSlices float64, totalNodeGPUCost float64) NodeMIGAggregation { + + if totalSlices <= 0 || totalNodeGPUCost <= 0 { + return NodeMIGAggregation{ + NodeName: nodeName, + } + } + + utilFraction := requestedSlices / totalSlices + unallocatedCost := totalNodeGPUCost * (1.0 - utilFraction) + + return NodeMIGAggregation{ + NodeName: nodeName, + MIGUtilizationPercent: utilFraction * 100.0, + UnallocatedGPUCostUSD: unallocatedCost, + } +} diff --git a/pkg/plugins/genai/internal/prom_source.go b/pkg/plugins/genai/internal/prom_source.go new file mode 100644 index 0000000..5c213b9 --- /dev/null +++ b/pkg/plugins/genai/internal/prom_source.go @@ -0,0 +1,105 @@ +package internal + +import ( + "context" + "fmt" + "time" + + "github.com/prometheus/common/model" +) + +// This allows decoupling from the prometheus-source module +type PrometheusQuerier interface { + Query(query string) (interface{}, error) + QueryWithContext(ctx context.Context, query string) (interface{}, error) +} + +type PrometheusProvider struct { + querier PrometheusQuerier +} + +func NewPrometheusProvider(querier PrometheusQuerier) *PrometheusProvider { + return &PrometheusProvider{querier: querier} +} + +// 1. Logic for Counter vs Gauge PromQL +func (p *PrometheusProvider) buildQuery(metric string, window time.Duration, isCounter bool) string { + windowStr := fmt.Sprintf("%.0fs", window.Seconds()) + if isCounter { + // Delta increase for counters (tokens, gpu active seconds) + return fmt.Sprintf("sum(increase(%s[%s])) by (pod, namespace)", metric, windowStr) + } + // Average for gauges (gpu utilization) + return fmt.Sprintf("avg(avg_over_time(%s[%s])) by (pod, namespace)", metric, windowStr) +} + +// 2. 
Simple execution wrapper +func (p *PrometheusProvider) execute(ctx context.Context, query string) (model.Vector, error) { + // Re-using the existing OpenCost Query interface + val, err := p.querier.Query(query) + if err != nil { + return nil, err + } + + vector, ok := val.(model.Vector) + if !ok { + return nil, fmt.Errorf("unexpected prometheus result type") + } + return vector, nil +} + +// 3. Mapping logic for standardizing output +func (p *PrometheusProvider) mapResults(vector model.Vector, results map[string]*Metrics, key string) { + for _, sample := range vector { + pod := string(sample.Metric["pod"]) + ns := string(sample.Metric["namespace"]) + if pod == "" || ns == "" { + continue + } + + id := fmt.Sprintf("%s/%s", ns, pod) + if _, ok := results[id]; !ok { + results[id] = &Metrics{} + } + + val := float64(sample.Value) + switch key { + case "input": + results[id].InputTokens = val + case "output": + results[id].OutputTokens = val + case "gpuSec": + results[id].GPUActiveSec = val + case "util": + results[id].GPUUtilPercent = val + } + } +} + +// 4. Batch orchestrator for the 4-query sequence +func (p *PrometheusProvider) Fetch(ctx context.Context, start, end time.Time, mapping MetricMapping) (map[string]*Metrics, error) { + window := end.Sub(start) + results := make(map[string]*Metrics) + + steps := []struct { + id string + metric string + counter bool + }{ + {"input", mapping.InputTokens, true}, + {"output", mapping.OutputTokens, true}, + {"gpuSec", mapping.GPUActiveSec, true}, + {"util", mapping.GPUUtilization, false}, + } + + for _, s := range steps { + q := p.buildQuery(s.metric, window, s.counter) + v, err := p.execute(ctx, q) + if err != nil { + return nil, fmt.Errorf("fetch error for %s: %w", s.id, err) + } + p.mapResults(v, results, s.id) + } + + return results, nil +} diff --git a/pkg/plugins/genai/internal/scrape_source.go b/pkg/plugins/genai/internal/scrape_source.go new file mode 100644 index 0000000..986e0b0 --- /dev/null +++ b/pkg/plugins/genai/internal/scrape_source.go @@ -0,0 +1,92 @@ +package internal + +import ( + "context" + "fmt" + "io" + "net/http" + "time" + + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" +) + +type Scraper struct { + Client *http.Client + Timeout time.Duration +} + +func NewScraper(timeout time.Duration) *Scraper { + return &Scraper{ + Client: &http.Client{Timeout: timeout}, + Timeout: timeout, + } +} + +func (s *Scraper) fetchRawMetrics(ctx context.Context, url string) (io.ReadCloser, error) { + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return nil, err + } + + resp, err := s.Client.Do(req) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + return nil, fmt.Errorf("scrape failed: status %d", resp.StatusCode) + } + + return resp.Body, nil +} + +// Updated to use dto.MetricFamily +func (s *Scraper) parseToFamilies(data io.Reader) (map[string]*dto.MetricFamily, error) { + var parser expfmt.TextParser + return parser.TextToMetricFamilies(data) +} + +// Updated mapping logic for the DTO structs +func (s *Scraper) extractMetricValue(families map[string]*dto.MetricFamily, name string) float64 { + family, ok := families[name] + if !ok || family == nil { + return 0 + } + + var sum float64 + for _, m := range family.Metric { + // Prometheus DTO uses pointers for values + if m.Counter != nil && m.Counter.Value != nil { + sum += *m.Counter.Value + } else if m.Gauge != nil && m.Gauge.Value != nil { + sum += *m.Gauge.Value + } else 
if m.Untyped != nil && m.Untyped.Value != nil { + sum += *m.Untyped.Value + } + } + return sum +} + +func (s *Scraper) Scrape(ctx context.Context, ip, port string, mapping MetricMapping) (*Metrics, error) { + url := fmt.Sprintf("http://%s:%s/metrics", ip, port) + + raw, err := s.fetchRawMetrics(ctx, url) + if err != nil { + return nil, err + } + defer raw.Close() + + families, err := s.parseToFamilies(raw) + if err != nil { + return nil, err + } + + return &Metrics{ + InputTokens: s.extractMetricValue(families, mapping.InputTokens), + OutputTokens: s.extractMetricValue(families, mapping.OutputTokens), + GPUActiveSec: s.extractMetricValue(families, mapping.GPUActiveSec), + GPUUtilPercent: s.extractMetricValue(families, mapping.GPUUtilization), + }, nil +} diff --git a/pkg/plugins/genai/main b/pkg/plugins/genai/main new file mode 100755 index 0000000..96b7203 Binary files /dev/null and b/pkg/plugins/genai/main differ diff --git a/pkg/plugins/genai/test/integration_test.go b/pkg/plugins/genai/test/integration_test.go new file mode 100644 index 0000000..d6b9fe6 --- /dev/null +++ b/pkg/plugins/genai/test/integration_test.go @@ -0,0 +1,61 @@ +package test + +import ( + "fmt" + "os/exec" + "testing" + "time" + + "github.com/hashicorp/go-plugin" + "github.com/opencost/opencost-plugins/pkg/plugins/genai/genaiprovider" +) + +func TestGenAIPlugin(t *testing.T) { + // 1. Point to the binary you just built + client := plugin.NewClient(&plugin.ClientConfig{ + HandshakeConfig: plugin.HandshakeConfig{ + ProtocolVersion: 1, + MagicCookieKey: "OPENCOST_PLUGIN_MAGIC_COOKIE", + MagicCookieValue: "genai-visibility", + }, + Plugins: map[string]plugin.Plugin{ + "customcost": &genaiprovider.CustomCostPlugin{}, + }, + // The plugin serves gRPC, so the client must allow the gRPC protocol. + AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, + Cmd: exec.Command("../genai-plugin"), // Adjust path to your binary + }) + defer client.Kill() + + // 2. Connect to the plugin + rpcClient, err := client.Client() + if err != nil { + t.Fatalf("Failed to connect to plugin: %v", err) + } + raw, err := rpcClient.Dispense("customcost") + if err != nil { + t.Fatalf("Failed to dispense custom cost source: %v", err) + } + source, ok := raw.(genaiprovider.CustomCostSource) + if !ok { + t.Fatalf("Dispensed plugin does not implement CustomCostSource") + } + + // 3. Request a 24-hour window + now := time.Now() + resp, err := source.GetCustomCosts(&genaiprovider.CustomCostRequest{ + Start: now.Add(-24 * time.Hour), + End: now, + Resolution: "hourly", + }) + + if err != nil { + t.Fatalf("Failed to get costs: %v", err) + } + + // 4. Validate the Data + fmt.Printf("Received %d GenAI workloads\n", len(resp)) + for _, w := range resp { + fmt.Printf("Pod: %s | Cost: $%.2f | Model: %s | Tokens: %d\n", w.PodName, w.TotalCost, w.ModelName, w.TotalTokens) + } +}
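Editor's note: the diff defines a PrometheusQuerier interface in internal/prom_source.go ("This allows decoupling from the prometheus-source module") but does not include a concrete implementation of it. The following is a minimal, illustrative sketch of one possible adapter built on the official Prometheus Go client; it is not part of the change set. It assumes github.com/prometheus/client_golang is added to go.mod, and the PromAPIQuerier name and constructor are hypothetical.

package internal

// Hypothetical adapter (not in the diff): satisfies the PrometheusQuerier
// interface from prom_source.go using the official Prometheus HTTP API client.
// Assumes github.com/prometheus/client_golang has been added to go.mod.

import (
	"context"
	"time"

	promapi "github.com/prometheus/client_golang/api"
	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
)

type PromAPIQuerier struct {
	api promv1.API
}

func NewPromAPIQuerier(address string) (*PromAPIQuerier, error) {
	client, err := promapi.NewClient(promapi.Config{Address: address})
	if err != nil {
		return nil, err
	}
	return &PromAPIQuerier{api: promv1.NewAPI(client)}, nil
}

// Query runs an instant query at the current time.
func (q *PromAPIQuerier) Query(query string) (interface{}, error) {
	return q.QueryWithContext(context.Background(), query)
}

// QueryWithContext returns a model.Value; for vector-valued expressions this is
// a model.Vector, which is what PrometheusProvider.execute type-asserts on.
func (q *PromAPIQuerier) QueryWithContext(ctx context.Context, query string) (interface{}, error) {
	val, _, err := q.api.Query(ctx, query, time.Now())
	if err != nil {
		return nil, err
	}
	return val, nil
}

With an adapter along these lines, NewPrometheusProvider(querier) from prom_source.go could be wired to the prometheus_url value loaded by genaiprovider.LoadConfig.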