Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions cmd/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,8 @@ btf:
kernel: "/sys/kernel/btf/vmlinux"

output:
type: file
file:
path: "./log"
otel:
enable: true
endpoint: "localhost:4317"
type: clickhouse
clickhouse:
enable: true
host: "192.168.200.201"
port: "9000"
username: "default"
Expand Down
26 changes: 20 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.22.4

require (
github.com/ClickHouse/clickhouse-go/v2 v2.30.0
github.com/IBM/sarama v1.45.0
github.com/cheggaaa/pb/v3 v3.1.5
github.com/gin-contrib/pprof v1.5.2
github.com/gin-gonic/gin v1.10.0
Expand Down Expand Up @@ -31,6 +32,9 @@ require (
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fatih/color v1.15.0 // indirect
Expand All @@ -49,14 +53,23 @@ require (
github.com/goccy/go-json v0.10.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand All @@ -68,10 +81,11 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
Expand All @@ -83,10 +97,10 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/arch v0.12.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/term v0.27.0 // indirect
golang.org/x/term v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.36.1 // indirect
Expand All @@ -104,5 +118,5 @@ require (
github.com/cilium/ebpf v0.16.1-0.20241204125435-9895aae6467e
github.com/go-logr/logr v1.4.2 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
golang.org/x/sys v0.28.0
golang.org/x/sys v0.29.0
)
82 changes: 68 additions & 14 deletions go.sum

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion internal/config/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ const (
)

type ClickhouseOutputConfig struct {
Enable bool `yaml:"enable"`
Port string `yaml:"port"`
Host string `yaml:"host"`
Username string `yaml:"username"`
Expand Down
106 changes: 106 additions & 0 deletions internal/output/output.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package output

import (
"context"
"encoding/json"

"github.com/ClickHouse/clickhouse-go/v2"
ckdriver "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/cen-ngc5139/shepherd/internal/binary"
"github.com/cen-ngc5139/shepherd/internal/config"
"github.com/cen-ngc5139/shepherd/internal/log"
"github.com/cen-ngc5139/shepherd/pkg/client"
"github.com/cen-ngc5139/shepherd/pkg/kafka"
"github.com/pkg/errors"
)

type SinkCli struct {
CKCli CKCli
KafkaCli *kafka.Producer
}

type CKCli struct {
conn clickhouse.Conn
batch ckdriver.Batch
counter int
}

type Output struct {
SinkType config.OutputType
SinkCli SinkCli
ctx context.Context
}

func NewOutput(cfg config.Configuration, ctx context.Context) (*Output, error) {
o := &Output{SinkType: cfg.Output.Type, ctx: ctx}
if err := o.InitSinkCli(cfg.Output); err != nil {
return nil, errors.Wrapf(err, "failed to init sink %s client", o.SinkType)
}

return o, nil
}

func (o *Output) Close() {
if o.SinkType == config.OutputTypeClickhouse {
log.Info("close clickhouse client")
o.SinkCli.CKCli.conn.Close()
}
}

func (o *Output) InitSinkCli(cfg config.OutputConfig) (err error) {
if o.SinkType == config.OutputTypeClickhouse {
conn, err := client.NewClickHouseConn(cfg.Clickhouse)
if err != nil {
return errors.Wrap(err, "failed to init clickhouse client")
}

o.SinkCli.CKCli.batch, err = conn.PrepareBatch(o.ctx, `
INSERT INTO sched_latency (
pid, tid, delay_ns, ts,
preempted_pid, preempted_comm,
is_preempt, comm
)
`)
if err != nil {
return errors.Wrap(err, "failed to prepare batch")
}

o.SinkCli.CKCli.conn = conn
}

if o.SinkType == config.OutputTypeKafka {
o.SinkCli.KafkaCli, err = kafka.NewSyncProducer(config.Config.Output.Kafka.Brokers, config.Config.Output.Kafka.Topic, true, true)
if err != nil {
return errors.Wrap(err, "failed to init kafka client")
}
}

return nil
}

func (o *Output) Push(event binary.ShepherdSchedLatencyT) error {
if o.SinkType == config.OutputTypeClickhouse {
batch, count, err := insertSchedMetrics(o.ctx, o.SinkCli.CKCli.conn, o.SinkCli.CKCli.batch, event, o.SinkCli.CKCli.counter)
if err != nil {
return errors.Wrap(err, "failed to insert sched metrics")
}
o.SinkCli.CKCli.batch = batch
o.SinkCli.CKCli.counter = count
}

if o.SinkType == config.OutputTypeKafka {
raw, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "failed to marshal event")
}

_, _, err = o.SinkCli.KafkaCli.SyncSendMessage(raw)
if err != nil {
return errors.Wrap(err, "fail to push kafka data")
}
}

log.StdoutOrFile("file", event)

return nil
}
30 changes: 5 additions & 25 deletions internal/output/sched_delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package output

import (
"context"
"fmt"
"os"

"github.com/ClickHouse/clickhouse-go/v2"
Expand All @@ -12,7 +11,6 @@ import (
"github.com/cen-ngc5139/shepherd/internal/config"
"github.com/cen-ngc5139/shepherd/internal/log"
"github.com/cen-ngc5139/shepherd/internal/metadata"
"github.com/cen-ngc5139/shepherd/pkg/client"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/perf"
)
Expand All @@ -27,27 +25,12 @@ func ProcessSchedDelay(coll *ebpf.Collection, ctx context.Context, cfg config.Co

defer perfReader.Close()

conn, err := client.NewClickHouseConn(cfg, cfg.Output.Clickhouse.Database)
output, err := NewOutput(cfg, ctx)
if err != nil {
log.Fatalf("failed to connect to clickhouse: %v", err)
log.Fatalf("failed to init output: %v", err)
}

defer conn.Close()

// 准备批量插入语句
batch, err := conn.PrepareBatch(ctx, `
INSERT INTO sched_latency (
pid, tid, delay_ns, ts,
preempted_pid, preempted_comm,
is_preempt, comm
)
`)
if err != nil {
log.Fatalf("failed to prepare batch: %v", err)
}

// 添加静态计数器
var count int
defer output.Close()

var event binary.ShepherdSchedLatencyT
for {
Expand All @@ -62,11 +45,8 @@ func ProcessSchedDelay(coll *ebpf.Collection, ctx context.Context, cfg config.Co
continue
}

fmt.Println(event)

batch, count, err = insertSchedMetrics(ctx, conn, batch, event, count)
if err != nil {
log.Errorf("failed to insert sched metrics: %v", err)
if err := output.Push(event); err != nil {
log.Errorf("failed to push event: %v", err)
continue
}

Expand Down
12 changes: 5 additions & 7 deletions pkg/client/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,14 @@ import (
"github.com/cen-ngc5139/shepherd/internal/config"
)

func NewClickHouseConn(cfg config.Configuration, db string) (clickhouse.Conn, error) {
ckCfg := cfg.Output.Clickhouse

func NewClickHouseConn(cfg config.ClickhouseOutputConfig) (clickhouse.Conn, error) {
conn, err := clickhouse.Open(&clickhouse.Options{
Protocol: clickhouse.HTTP,
Addr: []string{fmt.Sprintf("%s:%s", ckCfg.Host, ckCfg.Port)},
Addr: []string{fmt.Sprintf("%s:%s", cfg.Host, cfg.Port)},
Auth: clickhouse.Auth{
Database: db,
Username: ckCfg.Username,
Password: ckCfg.Password,
Database: cfg.Database,
Username: cfg.Username,
Password: cfg.Password,
},
MaxIdleConns: 5,
MaxOpenConns: 10,
Expand Down
Loading