diff --git a/build.sh b/build.sh index 8818ae6..a98f4ef 100755 --- a/build.sh +++ b/build.sh @@ -1 +1 @@ -gox -osarch="darwin/amd64 linux/386 linux/amd64 windows/amd64 freebsd/386 freebsd/amd64 linux/arm linux/arm64" --output="dist/{{.Dir}}_{{.OS}}_{{.Arch}}" +gox -osarch="darwin/amd64 darwin/arm64 linux/386 linux/amd64 windows/amd64 freebsd/386 freebsd/amd64 linux/arm linux/arm64" --output="dist/{{.Dir}}_{{.OS}}_{{.Arch}}" diff --git a/go.mod b/go.mod index 2d6e28b..23eb9db 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,28 @@ module github.com/tigrawap/goader -go 1.16 +go 1.21 + +toolchain go1.23.3 require ( github.com/dustin/go-humanize v1.0.0 - github.com/mattn/go-colorable v0.1.12 // indirect github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d + github.com/prometheus/client_golang v1.21.0 + github.com/prometheus/client_model v0.6.1 + github.com/prometheus/common v0.62.0 github.com/valyala/fasthttp v1.34.0 - golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886 + golang.org/x/sys v0.28.0 +) + +require ( + github.com/andybalholm/brotli v1.0.4 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/klauspost/compress v1.17.11 // indirect + github.com/mattn/go-colorable v0.1.12 // indirect + github.com/mattn/go-isatty v0.0.16 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/procfs v0.15.1 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + google.golang.org/protobuf v1.36.1 // indirect ) diff --git a/go.sum b/go.sum index a2fbe68..a7625f7 100644 --- a/go.sum +++ b/go.sum @@ -1,15 +1,41 @@ github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/klauspost/compress v1.15.0 h1:xqfchp4whNFxn5A4XFyyYtitiWI8Hy5EW59jEwcyL6U= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= -github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI= github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.21.0 h1:DIsaGmiaBkSangBgMtWdNfxbMNdku5IK6iNhrEqWvdA= +github.com/prometheus/client_golang v1.21.0/go.mod h1:U9NM32ykUErtVBxdvD3zfi+EuFkkaBvMb09mIfe0Zgg= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= +github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.34.0 h1:d3AAQJ2DRcxJYHm7OXNXtXt2as1vMDfxeIcFvhmGGm4= @@ -25,10 +51,15 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886 h1:eJv7u3ksNXoLbGSKuv2s/SIO4tJVxc/A+MTpzxDgz/Q= -golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= +google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/goader.go b/goader.go index 08688ef..abe4ae6 100644 --- a/goader.go +++ b/goader.go @@ -29,7 +29,7 @@ import ( "sync/atomic" ) -//Request struct +// Request struct type Request struct { targeter Target url string @@ -43,84 +43,89 @@ func (r *Request) getUrl() string { return r.url } -//Response struct +// Response struct type Response struct { - request *Request - latency time.Duration - err error + request *Request + latency time.Duration + payloadSize int64 + err error } -//Various constants to avoid typos +// Various constants to avoid typos const ( - LowLatency = "low-latency" - ConstantRatio = "constant" - ConstantThreads = "constant-threads" - Sleep = "sleep" - Upload = "upload" - Http = "http" - S3 = "s3" - Disk = "disk" - Meta = "meta" - Null = "null" - NotSet = -1 - EmptyString = "" - NotSetString = "_GOADER_NOT_SET_" - NotSetFloat64 = -1.0 - FormatHuman = "human" - FormatJSON = "json" - HttpsScheme = "https" - HttpScheme = "http" + LowLatency = "low-latency" + ConstantRatio = "constant" + ConstantThreads = "constant-threads" + Sleep = "sleep" + Upload = "upload" + Http = "http" + S3 = "s3" + Disk = "disk" + Meta = "meta" + Null = "null" + NotSet = -1 + EmptyString = "" + NotSetString = "_GOADER_NOT_SET_" + NotSetFloat64 = -1.0 + FormatHuman = "human" + FormatJSON = "json" + FormatPrometheus = "prometheus" + HttpsScheme = "https" + HttpScheme = "http" + MaxGoodInMemUrlsForIndefiniteRun = 1000000 // only used if numRequests = -1 ) var config struct { - url string //url/pattern - urlsSourceFile string //url/pattern - writtenUrlsDump string - rps int - wps int - rpw float64 - writeThreads int - readThreads int - mkdirs bool - maxChannels int - verbose bool - memoryDebug bool - maxRequests int64 - bodySize uint64 - minBodySize uint64 - maxBodySize uint64 - bodySizeInput string - minBodySizeInput string - maxBodySizeInput string - metaOps metaOps - metaXattrKeys int - metaXattrLength int - randomFairDistribution bool - randomFairBuckets int - fileOffsetLimitInput string - fileOffsetLimit uint64 - mode string - engine string - outputFormat string - showProgress bool - stopOnBadRate bool - adjustOnErrors bool - output Output - syncSleep time.Duration - maxLatency time.Duration - S3ApiKey string - S3Bucket string - S3Endpoint string - S3HttpScheme string - S3Region string - S3SecretKey string - S3SignatureVersion int - timelineFile string - seed int64 - writeGoodUrls bool -} - -//OPResults result of specific operation, lately can be printed by different outputters + url string //url/pattern + urlsSourceFile string //url/pattern + writtenUrlsDump string + rps int + wps int + rpw float64 + writeThreads int + readThreads int + mkdirs bool + maxChannels int + verbose bool + memoryDebug bool + maxRequests int64 + bodySize uint64 + minBodySize uint64 + maxBodySize uint64 + bodySizeInput string + minBodySizeInput string + maxBodySizeInput string + bodySizeGranularity int64 + metaOps metaOps + metaXattrKeys int + metaXattrLength int + randomFairDistribution bool + randomFairBuckets int + fileOffsetLimitInput string + fileOffsetLimit uint64 + mode string + engine string + outputFormat string + showProgress bool + stopOnBadRate bool + adjustOnErrors bool + output Output + syncSleep time.Duration + maxLatency time.Duration + S3ApiKey string + S3Bucket string + S3Endpoint string + S3HttpScheme string + S3Region string + S3SecretKey string + S3SignatureVersion int + timelineFile string + seed int64 + writeGoodUrls bool + enablePrometheusMetrics bool +} + +// OPResults result of specific operation, lately can be printed by different outputters type OPResults struct { Errors int64 Done int64 @@ -133,7 +138,7 @@ type OPResults struct { StaggeredFor time.Duration } -//Results of benchmark execution, represents final output, marshalled directly into json in json output mode +// Results of benchmark execution, represents final output, marshalled directly into json in json output mode type Results struct { Writes OPResults Reads OPResults @@ -151,7 +156,10 @@ func startWorker(progress *Progress, state *OPState, targeter Target, requester case <-state.requests: if !(atomic.AddInt64(&progress.totalRequests, 1) > config.maxRequests) { requester.request(state.responses, &Request{targeter: targeter, startTime: time.Now()}) + } else if config.maxRequests == -1 { + requester.request(state.responses, &Request{targeter: targeter, startTime: time.Now()}) } else { + return } } @@ -160,13 +168,13 @@ func startWorker(progress *Progress, state *OPState, targeter Target, requester type op int -//Operation type +// Operation type const ( READ op = iota WRITE ) -//OPState contains state for OP(READ/WRITE) +// OPState contains state for OP(READ/WRITE) type OPState struct { color string concurrency int @@ -201,21 +209,22 @@ func newOPState(op op, color string) *OPState { buffersLen = o } } + arraySize := max(config.maxRequests, 10000) state := OPState{ op: op, name: name, color: color, - latencies: make(timeArray, 0, config.maxRequests), + latencies: make(timeArray, 0, arraySize), colored: ansi.ColorFunc(fmt.Sprintf("%s+h:black", color)), responses: make(chan *Response, buffersLen), requests: make(chan int, buffersLen), progress: make(chan bool, buffersLen), } if config.timelineFile != EmptyString { - state.timeline = make([]RequestTimes, 0, config.maxRequests) + state.timeline = make([]RequestTimes, 0, arraySize) } if op == WRITE && config.writeGoodUrls { - state.goodUrls = make([]string, 0, config.maxRequests) + state.goodUrls = make([]string, 0, arraySize) } return &state } @@ -251,20 +260,35 @@ func processResponses(state *OPState, results *Results, adjuster Adjuster, w *sy case response = <-state.responses: state.opDone() state.progress <- response.err == nil - if response.err == nil { - state.totalTime += response.latency - state.slicesLock.Lock() - if state.op == WRITE && config.writeGoodUrls { - state.goodUrls = append(state.goodUrls, response.request.getUrl()) + if config.enablePrometheusMetrics { + if counter, ok := OpCounters[state.name]; ok { + if response.err == nil { + counter.WithLabelValues("success", strconv.FormatInt(response.payloadSize, 10)).Inc() + } else { + counter.WithLabelValues("error", strconv.FormatInt(response.payloadSize, 10)).Inc() + } + } + if hist, ok := OpDurations[state.name]; ok { + hist.Observe(float64(response.latency.Nanoseconds())) } - state.latencies = append(state.latencies, response.latency) - state.slicesLock.Unlock() } else { - if config.verbose { - results.reportError(response.err.Error()) + if response.err == nil { + state.totalTime += response.latency + state.slicesLock.Lock() + state.latencies = append(state.latencies, response.latency) + state.slicesLock.Unlock() + } else { + if config.verbose { + results.reportError(response.err.Error()) + } + state.errors++ } - state.errors++ } + + if response.err == nil && state.op == WRITE && config.writeGoodUrls && (config.maxRequests > 0 || len(state.goodUrls) < MaxGoodInMemUrlsForIndefiniteRun) { + state.goodUrls = append(state.goodUrls, response.request.getUrl()) + } + if config.timelineFile != EmptyString { state.timeline = append(state.timeline, RequestTimes{ Start: response.request.startTime, @@ -326,16 +350,16 @@ func fillResults(results *OPResults, state *OPState, startTime time.Time) { results.AverageGoodOps = int64(float64((state.getDone()-state.errors)*int64(time.Second))/float64(time.Since(startTime).Nanoseconds())) + 1 } -//Operators chosen by config +// Operators chosen by config type Operators struct { - readEmitter Emitter - writeEmitter Emitter - readAdjuster Adjuster - writeAdjuster Adjuster - readRequester Requester - writeRequster Requester - readTarget Target - writeTarget Target + readEmitter Emitter + writeEmitter Emitter + readAdjuster Adjuster + writeAdjuster Adjuster + readRequester Requester + writeRequester Requester + readTarget Target + writeTarget Target } func getOperators(progress *Progress) *Operators { @@ -371,10 +395,10 @@ func getOperators(progress *Progress) *Operators { } switch config.engine { case Sleep: - operators.writeRequster = newSleepRequster(progress.writes) + operators.writeRequester = newSleepRequster(progress.writes) operators.readRequester = newSleepRequster(progress.reads) case Upload, Http: - operators.writeRequster = newHTTPRequester(progress.writes, &nullAuther{}) + operators.writeRequester = newHTTPRequester(progress.writes, &nullAuther{}) operators.readRequester = newHTTPRequester(progress.reads, &nullAuther{}) case S3: s3params := s3Params{ @@ -390,16 +414,16 @@ func getOperators(progress *Progress) *Operators { } else { s3Auther = &s3AutherV2{s3params} } - operators.writeRequster = newHTTPRequester(progress.writes, s3Auther) + operators.writeRequester = newHTTPRequester(progress.writes, s3Auther) operators.readRequester = newHTTPRequester(progress.reads, s3Auther) case Disk: - operators.writeRequster = newDiskRequester(progress.writes) + operators.writeRequester = newDiskRequester(progress.writes) operators.readRequester = newDiskRequester(progress.reads) case Meta: - operators.writeRequster = newMetaRequester(progress.writes) + operators.writeRequester = newMetaRequester(progress.writes) operators.readRequester = newMetaRequester(progress.reads) case Null: - operators.writeRequster = &nullRequester{} + operators.writeRequester = &nullRequester{} operators.readRequester = &nullRequester{} default: log.Println("Unknown engine") @@ -447,7 +471,7 @@ func p(s string) { config.output.progress(s) } -//TODO: Don't really like this global variables, methods on result, global config. Need to rethink +// TODO: Don't really like this global variables, methods on result, global config. Need to rethink func (r *Results) reportError(s string) { r.Errors = append(r.Errors, s) config.output.reportError(s) @@ -542,7 +566,7 @@ func makeLoad() { default: time.Sleep(time.Millisecond) } - if progress.reads.getDone()+progress.writes.getDone() >= config.maxRequests { + if progress.reads.getDone()+progress.writes.getDone() >= config.maxRequests && config.maxRequests != -1 { results.report("Maximum requests count") break FOR_LOOP } @@ -561,7 +585,7 @@ func makeLoad() { for i := 0; i < config.maxChannels; i++ { go startWorker(&progress, progress.reads, operators.readTarget, operators.readRequester, stopWorkers, workersWait) - go startWorker(&progress, progress.writes, operators.writeTarget, operators.writeRequster, stopWorkers, workersWait) + go startWorker(&progress, progress.writes, operators.writeTarget, operators.writeRequester, stopWorkers, workersWait) } go processResponses(progress.writes, &results, operators.writeAdjuster, responseWait, stopWorkers) go processResponses(progress.reads, &results, operators.readAdjuster, responseWait, stopWorkers) @@ -700,6 +724,9 @@ func selectPrinter() { config.output = newHumanOutput() case FormatJSON: config.output = &JSONOutput{} + case FormatPrometheus: + config.output = &PrometheusOutput{} + config.enablePrometheusMetrics = true default: fmt.Println("Unknown output format") os.Exit(2) @@ -761,7 +788,7 @@ func setPayloadGetter() { if config.randomFairDistribution { requestersConfig.payloadGetter = newFairPayload(fullData, int64(config.minBodySize), config.randomFairBuckets) } else { - requestersConfig.payloadGetter = newRandomPayload(fullData, int64(config.minBodySize)) + requestersConfig.payloadGetter = newRandomPayload(fullData, int64(config.minBodySize), config.bodySizeGranularity) } } requestersConfig.scratchBufferGetter = newScratchDataPayloadGetter(len(requestersConfig.fullData)) @@ -801,6 +828,7 @@ func configure() { flag.StringVar(&config.bodySizeInput, "body-size", "160KiB", "Body size for put requests, in bytes.") flag.StringVar(&config.maxBodySizeInput, "max-body-size", NotSetString, "Maximum body size for put requests (will randomize)") flag.StringVar(&config.minBodySizeInput, "min-body-size", NotSetString, "Minimal body size for put requests (will randomize)") + flag.Int64Var(&config.bodySizeGranularity, "body-size-granularity", 1, "Granularity of body size, i.e 4096 will randomize body size adjusted to 4096 bytes") flag.BoolVar(&config.randomFairDistribution, "fair-random", false, "Will produce fair distribution of body sizes, i.e with size 1-100 will produce ~100 requests of 1 byte and 1 requests of 100 bytes") flag.StringVar(&config.engine, "requests-engine", Disk, "s3/sleep/upload/http") flag.DurationVar(&config.maxLatency, "max-latency", NotSet, @@ -828,17 +856,22 @@ func configure() { flag.IntVar(&config.metaXattrLength, "meta-xattr-length", 5120, "Maximum length of xattrs, using weighed algorithm to distribute the sizes") flag.StringVar(&config.fileOffsetLimitInput, "meta-offset-limit", "16MiB", "Limit of offset for writes/truncate") flag.Int64Var(&config.seed, "seed", NotSet, "Seed to use in random generator") + flag.BoolVar(&config.enablePrometheusMetrics, "enable-prometheus-metrics", false, "Enable prometheus metrics endpoint, for long runs in Kubernetes") flag.Parse() var err error config.bodySize, err = humanize.ParseBytes(config.bodySizeInput) if config.url == EmptyString && flag.NArg() == 1 { config.url = flag.Args()[0] } + if config.outputFormat == FormatPrometheus { + config.enablePrometheusMetrics = true + } if err != nil { fmt.Println(err.Error()) } setParams() + setMetrics() setRandomData() setPayloadGetter() selectMode() diff --git a/interfaces.go b/interfaces.go index 190dbd2..6e792d7 100644 --- a/interfaces.go +++ b/interfaces.go @@ -8,32 +8,32 @@ type Emitter interface { emitRequests(state *OPState) } -//URLFormatter alters user supplied url string +// URLFormatter alters user supplied url string type URLFormatter interface { format(requestNum int64) string } -//HTTPAuther alters http requests with authentication headers +// HTTPAuther alters http requests with authentication headers type HTTPAuther interface { sign(r *fasthttp.Request) } -//Adjuster should decide whether change throughput based on response +// Adjuster should decide whether change throughput based on response type Adjuster interface { adjust(response *Response) } -//Requester does actual requests to server/fs +// Requester does actual requests to server/fs type Requester interface { request(channel chan *Response, request *Request) } -//Target supplies requester with num of file/template +// Target supplies requester with num of file/template type Target interface { get() string } -//Output presents result in human or variable machine readable forms +// Output presents result in human or variable machine readable forms type Output interface { progress(s string) reportError(s string) diff --git a/output.go b/output.go index 455fdb9..ff48f32 100644 --- a/output.go +++ b/output.go @@ -9,13 +9,13 @@ import ( "github.com/mgutz/ansi" ) -//HumanOutput will represent results in human form +// HumanOutput will represent results in human form type HumanOutput struct { pb chan string quit chan bool } -//JSONOutput will represent results in json form +// JSONOutput will represent results in json form type JSONOutput struct{} func newHumanOutput() *HumanOutput { @@ -120,3 +120,18 @@ func (o *JSONOutput) report(s string) { func (o *JSONOutput) reportError(s string) { } + +type PrometheusOutput struct{} + +func (p *PrometheusOutput) progress(s string) { +} + +func (p *PrometheusOutput) reportError(s string) { +} + +func (p *PrometheusOutput) report(s string) { +} + +func (p *PrometheusOutput) printResults(result *Results) { + dumpMetrics() +} diff --git a/payload_getters.go b/payload_getters.go index cafd3c9..0ca20e6 100644 --- a/payload_getters.go +++ b/payload_getters.go @@ -27,9 +27,10 @@ func newFullPayload(data []byte) *fullPayload { } type randomPayload struct { - data []byte - min int64 - max int64 + data []byte + min int64 + max int64 + blockSize int64 } func (p *randomPayload) Get() []byte { @@ -44,19 +45,20 @@ func (p *randomPayload) GetLength() int64 { if p.max == p.min { return p.max } - return p.min + rand.Int63n(p.max-p.min) + return (p.min + rand.Int63n(p.max-p.min)) / p.blockSize * p.blockSize } -func newRandomPayload(data []byte, min int64) *randomPayload { +func newRandomPayload(data []byte, min int64, blockSize int64) *randomPayload { max := int64(len(data)) - return &randomPayload{data, min, max} + return &randomPayload{data, min, max, blockSize} } type fairRandomPayload struct { - data []byte - min int64 - max int64 - roller *utils.WeightedRoller + data []byte + min int64 + max int64 + blockSize int64 + roller *utils.WeightedRoller } func (p *fairRandomPayload) Get() []byte { diff --git a/prometheus.go b/prometheus.go new file mode 100644 index 0000000..d2e581b --- /dev/null +++ b/prometheus.go @@ -0,0 +1,101 @@ +package main + +import ( + "fmt" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "net/http" + "os" +) + +var ( + OpCounters = make(map[string]*prometheus.CounterVec) + OpDurations = make(map[string]prometheus.Histogram) +) + +func initMetrics() { + metrics := allMetaOps + metrics = append(metrics, "sleep", "null") + for _, op := range allMetaOps { + counterVec := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "goader_" + string(op) + "_ops", + Help: "Number of " + string(op) + " operations", + }, + []string{"status", "payload_size"}, + ) + prometheus.MustRegister(counterVec) + OpCounters[string(op)] = counterVec + + duration := prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "goader_" + string(op) + "_duration_nanoseconds", + Help: "Duration of " + string(op) + " operations in nanoseconds", + Buckets: prometheus.ExponentialBuckets(64, 2, 25), // Adjust buckets as needed + }) + prometheus.MustRegister(duration) + OpDurations[string(op)] = duration + } +} + +func setMetrics() { + if !config.enablePrometheusMetrics { + return + } + + // Create a new HTTP server to serve metrics + http.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{ + DisableCompression: true, + })) + initMetrics() + + go func() { + http.ListenAndServe(":8090", nil) + }() +} + +func dumpMetrics() { + metrics, err := prometheus.DefaultGatherer.Gather() + if err != nil { + fmt.Println("Error gathering metrics:", err) + return + } + + for _, m := range metrics { + skip := true + for _, metric := range m.Metric { + switch *m.Type { + case io_prometheus_client.MetricType_COUNTER: + if metric.Counter.GetValue() != 0 { + skip = false + } + case io_prometheus_client.MetricType_GAUGE: + if metric.Gauge.GetValue() != 0 { + skip = false + } + case io_prometheus_client.MetricType_HISTOGRAM: + if metric.Histogram.GetSampleCount() != 0 { + skip = false + } + case io_prometheus_client.MetricType_SUMMARY: + if metric.Summary.GetSampleCount() != 0 { + skip = false + } + case io_prometheus_client.MetricType_UNTYPED: + if metric.Untyped.GetValue() != 0 { + skip = false + } + } + } + + if skip { + continue + } + + encoder := expfmt.NewEncoder(os.Stdout, expfmt.FmtText) + err := encoder.Encode(m) + if err != nil { + fmt.Println("Error encoding metric:", err) + } + } +} diff --git a/requesters.go b/requesters.go index e57a97b..f705002 100644 --- a/requesters.go +++ b/requesters.go @@ -22,18 +22,18 @@ import ( type nullRequester struct { } -type sleepRequster struct { +type sleepRequester struct { state *OPState db chan int } func (n *nullRequester) request(responses chan *Response, request *Request) { - responses <- &Response{request, time.Nanosecond, nil} + responses <- &Response{request, time.Nanosecond * time.Duration(rand.Intn(1000000)), 0, nil} } -func (requester *sleepRequster) request(responses chan *Response, request *Request) { +func (requester *sleepRequester) request(responses chan *Response, request *Request) { if rand.Intn(10000)-int(requester.state.inFlight) < 0 { - responses <- &Response{request, 0, errors.New("Bad response")} + responses <- &Response{request, 0, 0, errors.New("Bad response")} return } start := time.Now() @@ -41,11 +41,11 @@ func (requester *sleepRequster) request(responses chan *Response, request *Reque var timeToSleep = time.Duration(rand.Intn(200)) * time.Millisecond time.Sleep(timeToSleep) <-requester.db - responses <- &Response{request, time.Since(start), nil} + responses <- &Response{request, time.Since(start), 0, nil} } -func newSleepRequster(state *OPState) *sleepRequster { - r := sleepRequster{ +func newSleepRequster(state *OPState) *sleepRequester { + r := sleepRequester{ state: state, db: make(chan int, 10), } @@ -92,7 +92,7 @@ func (requester *httpRequester) request(responses chan *Response, request *Reque defer func() { if err := recover(); err != nil { //catch responses <- &Response{&Request{targeter: &BadUrlTarget{}, startTime: time.Now()}, time.Nanosecond, - fmt.Errorf("Error: %s,%v", "panic:", err)} + 0, fmt.Errorf("Error: %s,%v", "panic:", err)} return } }() @@ -110,8 +110,11 @@ func (requester *httpRequester) request(responses chan *Response, request *Reque req.Header.SetMethodBytes([]byte(requester.method)) req.Header.Set("Connection", "keep-alive") + var payloadSize int64 = 0 if requester.method == "PUT" || requester.method == "POST" { - req.SetBody(requestersConfig.payloadGetter.Get()) + body := requestersConfig.payloadGetter.Get() + payloadSize = int64(len(body)) + req.SetBody(body) } requester.auther.sign(req) resp := fasthttp.AcquireResponse() @@ -123,15 +126,15 @@ func (requester *httpRequester) request(responses chan *Response, request *Reque if err != nil { responses <- &Response{request, timeSpent, - fmt.Errorf("Bad request: %s\n%s\n%s", err, resp.Header.String(), resp.Body())} + payloadSize, fmt.Errorf("Bad request: %s\n%s\n%s", err, resp.Header.String(), resp.Body())} return } switch statusCode { case fasthttp.StatusOK, fasthttp.StatusCreated: - responses <- &Response{request, timeSpent, nil} + responses <- &Response{request, timeSpent, payloadSize, nil} default: - responses <- &Response{request, timeSpent, + responses <- &Response{request, timeSpent, payloadSize, fmt.Errorf("Error: %d \n%s \n%s ", resp.StatusCode(), resp.Header.String(), resp.Body())} } } @@ -155,9 +158,12 @@ func (requester *diskRequester) doRequest(responses chan *Response, request *Req filename := request.getUrl() var err error + var payloadSize int64 = 0 start := time.Now() if requester.state.op == WRITE { - err = ioutil.WriteFile(filename, requestersConfig.payloadGetter.Get(), 0644) + payload := requestersConfig.payloadGetter.Get() + payloadSize = int64(len(payload)) + err = ioutil.WriteFile(filename, payload, 0644) if os.IsNotExist(err) && config.mkdirs { err = os.MkdirAll(path.Dir(filename), 0755) if !isRetry { @@ -169,12 +175,13 @@ func (requester *diskRequester) doRequest(responses chan *Response, request *Req if fd, err := os.OpenFile(filename, os.O_RDONLY, 0644); err == nil { if fi, err := fd.Stat(); err == nil { size := fi.Size() + payloadSize = size fd.Read(requestersConfig.scratchBufferGetter.GetBuffer(size)) fd.Close() } } } - responses <- &Response{request, time.Since(start), err} + responses <- &Response{request, time.Since(start), payloadSize, err} } type metaOpRequst string @@ -229,6 +236,7 @@ func (r *metaRequester) doRequest(responses chan *Response, request *Request, is var err error start := time.Now() op := r.ops[rand.Intn(r.opLen)] + var payloadSize int64 = 0 switch op { case opWrite, opRead: var fd *os.File @@ -241,9 +249,12 @@ func (r *metaRequester) doRequest(responses chan *Response, request *Request, is if fd, err = os.OpenFile(filename, flags, 0644); err == nil { fd.Seek(rand.Int63n(int64(config.fileOffsetLimit)), io.SeekStart) if op == opWrite { - fd.Write(requestersConfig.payloadGetter.Get()) + payload := requestersConfig.payloadGetter.Get() + payloadSize = int64(len(payload)) + fd.Write(payload) } else { - fd.Read(requestersConfig.scratchBufferGetter.GetBuffer(requestersConfig.payloadGetter.GetLength())) + payloadSize = requestersConfig.payloadGetter.GetLength() + fd.Read(requestersConfig.scratchBufferGetter.GetBuffer(payloadSize)) } fd.Close() // TODO: Defer and reuse FD } @@ -283,7 +294,7 @@ func (r *metaRequester) doRequest(responses chan *Response, request *Request, is return } } - responses <- &Response{request, time.Since(start), err} + responses <- &Response{request, time.Since(start), payloadSize, err} } func newMetaRequester(state *OPState) *metaRequester {