From 23802dade54c78618542d19e9463ffa6bcec54f2 Mon Sep 17 00:00:00 2001 From: Steven Hartland Date: Mon, 30 Jan 2017 17:05:50 +0000 Subject: [PATCH 01/35] Add GaugeRelative to support Telegraf statsd implementation Telegraf doesn't support support negative counters, requiring relative gauge values instead. This adds the new method GaugeRelative to allow for this. --- conn.go | 15 +++++++++++++++ statsd.go | 8 ++++++++ 2 files changed, 23 insertions(+) diff --git a/conn.go b/conn.go index 4dbda63..3004722 100644 --- a/conn.go +++ b/conn.go @@ -1,6 +1,7 @@ package statsd import ( + "fmt" "io" "math/rand" "net" @@ -93,6 +94,20 @@ func (c *conn) metric(prefix, bucket string, n interface{}, typ string, rate flo c.mu.Unlock() } +func (c *conn) gaugeRelative(prefix, bucket string, value interface{}, tags string) { + c.mu.Lock() + l := len(c.buf) + c.appendBucket(prefix, bucket, tags) + if isNegative(value) { + c.appendNumber(value) + } else { + c.appendString(fmt.Sprintf("+%v", value)) + } + c.appendType("g") + c.flushIfBufferFull(l) + c.mu.Unlock() +} + func (c *conn) gauge(prefix, bucket string, value interface{}, tags string) { c.mu.Lock() l := len(c.buf) diff --git a/statsd.go b/statsd.go index f19204d..28885b5 100644 --- a/statsd.go +++ b/statsd.go @@ -100,6 +100,14 @@ func (c *Client) Gauge(bucket string, value interface{}) { c.conn.gauge(c.prefix, bucket, value, c.tags) } +// GaugeRelative records an relative value for the given bucket. +func (c *Client) GaugeRelative(bucket string, value interface{}) { + if c.skip() { + return + } + c.conn.gaugeRelative(c.prefix, bucket, value, c.tags) +} + // Timing sends a timing value to a bucket. func (c *Client) Timing(bucket string, value interface{}) { if c.skip() { From a51b818945282449703ebe9d8f5339d2b8ff65b3 Mon Sep 17 00:00:00 2001 From: David Hudson Date: Fri, 6 Apr 2018 12:29:37 +0100 Subject: [PATCH 02/35] Update client to not require a successful ping Please refer to https://github.com/gdiazlo/statsd as a solution to this issue in the original repo https://github.com/alexcesaro/statsd/issues/6. This will add the functionality needed while keeping the additional changes that are currently present in this fork. It is useful to have this change as the metrics within a service *shouldn't* stop a service from operating. Currently, it works as is but as soon as `.Clone` is called on the client, it crashes due to a nil conn (caused by this check). --- conn.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/conn.go b/conn.go index 3004722..de8ad04 100644 --- a/conn.go +++ b/conn.go @@ -46,18 +46,6 @@ func newConn(conf connConfig, muted bool) (*conn, error) { if err != nil { return c, err } - // When using UDP do a quick check to see if something is listening on the - // given port to return an error as soon as possible. - if c.network[:3] == "udp" { - for i := 0; i < 2; i++ { - _, err = c.w.Write(nil) - if err != nil { - _ = c.w.Close() - c.w = nil - return c, err - } - } - } // To prevent a buffer overflow add some capacity to the buffer to allow for // an additional metric. From f706d6df8f645133f8bc47a83449e2172620b628 Mon Sep 17 00:00:00 2001 From: David Hudson Date: Fri, 6 Apr 2018 12:38:10 +0100 Subject: [PATCH 03/35] Change references to multiplay project --- README.md | 4 +++- examples_test.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 774a1c6..b73d18e 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,8 @@ statsd is a simple and efficient [Statsd](https://github.com/etsy/statsd) client. +This is a fork of a [dead statsd client project](https://github.com/alexcesaro/statsd) with various impprovements. + See the [benchmark](https://github.com/alexcesaro/statsdbench) for a comparison with other Go StatsD clients. @@ -27,7 +29,7 @@ https://godoc.org/gopkg.in/alexcesaro/statsd.v2 ## Download - go get gopkg.in/alexcesaro/statsd.v2 + go get github.com/multiplay/statsd ## Example diff --git a/examples_test.go b/examples_test.go index dc6c2f3..45ff0fd 100644 --- a/examples_test.go +++ b/examples_test.go @@ -5,7 +5,7 @@ import ( "runtime" "time" - "gopkg.in/alexcesaro/statsd.v2" + "github.com/multiplay/statsd" ) var ( From 06d149b64f4dd3a04af5a96182b391631f9fa026 Mon Sep 17 00:00:00 2001 From: David Hudson Date: Fri, 13 Apr 2018 10:45:51 +0100 Subject: [PATCH 04/35] Remove old references from README.md --- README.md | 24 ++---------------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index b73d18e..3afc3d5 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,4 @@ # statsd -[![Build Status](https://travis-ci.org/alexcesaro/statsd.svg?branch=v2)](https://travis-ci.org/alexcesaro/statsd) [![Code Coverage](http://gocover.io/_badge/gopkg.in/alexcesaro/statsd.v2)](http://gocover.io/gopkg.in/alexcesaro/statsd.v2) [![Documentation](https://godoc.org/gopkg.in/alexcesaro/statsd.v2?status.svg)](https://godoc.org/gopkg.in/alexcesaro/statsd.v2) ## Introduction @@ -8,35 +7,18 @@ client. This is a fork of a [dead statsd client project](https://github.com/alexcesaro/statsd) with various impprovements. -See the [benchmark](https://github.com/alexcesaro/statsdbench) for a comparison -with other Go StatsD clients. - ## Features -- Supports all StatsD metrics: counter, gauge, timing and set +- Supports all StatsD metrics: counter, gauge (absolute and relative), timing and set - Supports InfluxDB and Datadog tags - Fast and GC-friendly: all functions for sending metrics do not allocate - Efficient: metrics are buffered by default - Simple and clean API -- 100% test coverage -- Versioned API using gopkg.in - - -## Documentation - -https://godoc.org/gopkg.in/alexcesaro/statsd.v2 - ## Download go get github.com/multiplay/statsd - -## Example - -See the [examples in the documentation](https://godoc.org/gopkg.in/alexcesaro/statsd.v2#example-package). - - ## License [MIT](LICENSE) @@ -47,6 +29,4 @@ See the [examples in the documentation](https://godoc.org/gopkg.in/alexcesaro/st Do you have any question the documentation does not answer? Is there a use case that you feel is common and is not well-addressed by the current API? -If so you are more than welcome to ask questions in the -[thread on golang-nuts](https://groups.google.com/d/topic/golang-nuts/Tz6t4_iLgnw/discussion) -or open an issue or send a pull-request here on Github. +If so you are more than welcome to ask questions or open an issue or send a pull-request here on Github. From 687c8cb8044e5d70b99d3fdfa69b4a5a3ed80a0d Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Tue, 21 May 2019 09:03:24 +1000 Subject: [PATCH 05/35] refactor(conn): remove unused fields that were being populated from the config --- conn.go | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) mode change 100644 => 100755 conn.go diff --git a/conn.go b/conn.go old mode 100644 new mode 100755 index 4dbda63..4796065 --- a/conn.go +++ b/conn.go @@ -10,29 +10,27 @@ import ( ) type conn struct { - // Fields settable with options at Client's creation. - addr string - errorHandler func(error) - flushPeriod time.Duration - maxPacketSize int - network string - tagFormat TagFormat + // config - mu sync.Mutex - // Fields guarded by the mutex. - closed bool - w io.WriteCloser - buf []byte - rateCache map[float32]string + errorHandler func(error) // errorHandler may be provided by the user + flushPeriod time.Duration // flushPeriod may be provided by the user + maxPacketSize int // maxPacketSize may be provided by the user and has a sensible default + tagFormat TagFormat // tagFormat must be provided by the user + + // state + + mu sync.Mutex // mu synchronises internal state + closed bool // closed indicates if w has been closed (triggered by first client close) + w io.WriteCloser // w is the writer for the connection + buf []byte // buf is the buffer for the connection + rateCache map[float32]string // rateCache caches string representations of sampling rates } func newConn(conf connConfig, muted bool) (*conn, error) { c := &conn{ - addr: conf.Addr, errorHandler: conf.ErrorHandler, flushPeriod: conf.FlushPeriod, maxPacketSize: conf.MaxPacketSize, - network: conf.Network, tagFormat: conf.TagFormat, } @@ -41,13 +39,13 @@ func newConn(conf connConfig, muted bool) (*conn, error) { } var err error - c.w, err = dialTimeout(c.network, c.addr, 5*time.Second) + c.w, err = dialTimeout(conf.Network, conf.Addr, 5*time.Second) if err != nil { return c, err } // When using UDP do a quick check to see if something is listening on the // given port to return an error as soon as possible. - if c.network[:3] == "udp" { + if conf.Network[:3] == "udp" { for i := 0; i < 2; i++ { _, err = c.w.Write(nil) if err != nil { From 510fd890eb71bf627e2e5c6a5b4c6769adb5998e Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Wed, 22 May 2019 06:37:31 +1000 Subject: [PATCH 06/35] fix(TestFlush): increase sleep to chance of race causing test failure --- statsd_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) mode change 100644 => 100755 statsd_test.go diff --git a/statsd_test.go b/statsd_test.go old mode 100644 new mode 100755 index c97650c..a47fe72 --- a/statsd_test.go +++ b/statsd_test.go @@ -233,7 +233,7 @@ func TestFlush(t *testing.T) { func TestFlushPeriod(t *testing.T) { testClient(t, func(c *Client) { c.Increment(testKey) - time.Sleep(time.Millisecond) + time.Sleep(time.Millisecond * 400) c.conn.mu.Lock() got := getOutput(c) want := "test_key:1|c" From 4004ac8176af7f4ae2353fc54a80d975e5a8e453 Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Wed, 22 May 2019 07:25:41 +1000 Subject: [PATCH 07/35] feat(WriteCloser): support arbitrary output streams --- conn.go | 48 ++++++++++++++++++--------- options.go | 20 ++++++++++- statsd_test.go | 90 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 142 insertions(+), 16 deletions(-) mode change 100644 => 100755 options.go diff --git a/conn.go b/conn.go index 4796065..dd7b4ac 100755 --- a/conn.go +++ b/conn.go @@ -32,27 +32,24 @@ func newConn(conf connConfig, muted bool) (*conn, error) { flushPeriod: conf.FlushPeriod, maxPacketSize: conf.MaxPacketSize, tagFormat: conf.TagFormat, + w: conf.WriteCloser, } + // exit if muted if muted { + // close and clear any provided writer + if c.w != nil { + _ = c.w.Close() + c.w = nil + } + // return muted client return c, nil } - var err error - c.w, err = dialTimeout(conf.Network, conf.Addr, 5*time.Second) - if err != nil { - return c, err - } - // When using UDP do a quick check to see if something is listening on the - // given port to return an error as soon as possible. - if conf.Network[:3] == "udp" { - for i := 0; i < 2; i++ { - _, err = c.w.Write(nil) - if err != nil { - _ = c.w.Close() - c.w = nil - return c, err - } + // initialise writer if not provided + if c.w == nil { + if err := c.connect(conf.Network, conf.Addr); err != nil { + return c, err } } @@ -79,6 +76,27 @@ func newConn(conf connConfig, muted bool) (*conn, error) { return c, nil } +func (c *conn) connect(network string, address string) error { + var err error + c.w, err = dialTimeout(network, address, 5*time.Second) + if err != nil { + return err + } + // When using UDP do a quick check to see if something is listening on the + // given port to return an error as soon as possible. + if network[:3] == "udp" { + for i := 0; i < 2; i++ { + _, err = c.w.Write(nil) + if err != nil { + _ = c.w.Close() + c.w = nil + return err + } + } + } + return nil +} + func (c *conn) metric(prefix, bucket string, n interface{}, typ string, rate float32, tags string) { c.mu.Lock() l := len(c.buf) diff --git a/options.go b/options.go old mode 100644 new mode 100755 index ef95bb8..5ffd5d2 --- a/options.go +++ b/options.go @@ -2,6 +2,7 @@ package statsd import ( "bytes" + "io" "strings" "time" ) @@ -25,6 +26,7 @@ type connConfig struct { MaxPacketSize int Network string TagFormat TagFormat + WriteCloser io.WriteCloser } // An Option represents an option for a Client. It must be used as an @@ -84,8 +86,24 @@ func Network(network string) Option { }) } +// WriteCloser sets the connection writer used by the client. If this option is +// present it will take precedence over the Network and Address options. If the +// client is muted then the writer will be closed before returning. The writer +// will be closed on Client.Close. Multiples of this option will cause the last +// writer to be used (if any), and previously provided writers to be closed. +// +// This option is ignored in Client.Clone(). +func WriteCloser(writer io.WriteCloser) Option { + return func(c *config) { + if c.Conn.WriteCloser != nil { + _ = c.Conn.WriteCloser.Close() + } + c.Conn.WriteCloser = writer + } +} + // Mute sets whether the Client is muted. All methods of a muted Client do -// nothing and return immedialtly. +// nothing and return immediately. // // This option can be used in Client.Clone() only if the parent Client is not // muted. The clones of a muted Client are always muted. diff --git a/statsd_test.go b/statsd_test.go index a47fe72..41dd812 100755 --- a/statsd_test.go +++ b/statsd_test.go @@ -245,6 +245,27 @@ func TestFlushPeriod(t *testing.T) { }, FlushPeriod(time.Nanosecond)) } +func TestFlushPeriod_writeCloser(t *testing.T) { + c, err := New( + ErrorHandler(expectNoError(t)), + FlushPeriod(time.Nanosecond), + WriteCloser(&testBuffer{}), + ) + if err != nil { + t.Fatalf("New: %v", err) + } + c.Increment(testKey) + time.Sleep(time.Millisecond * 400) + c.conn.mu.Lock() + got := getOutput(c) + want := "test_key:1|c" + if got != want { + t.Errorf("Invalid output, got %q, want %q", got, want) + } + c.conn.mu.Unlock() + c.Close() +} + func TestMaxPacketSize(t *testing.T) { testClient(t, func(c *Client) { c.Increment(testKey) @@ -372,6 +393,75 @@ func TestUDPNotListening(t *testing.T) { } } +func TestWriteCloser(t *testing.T) { + count := 0 + writer := &mockWriteCloser{ + close: func() error { + count++ + return nil + }, + } + config := new(config) + WriteCloser(writer)(config) + if v := config.Conn.WriteCloser; v != writer { + t.Fatal(v) + } + if count != 0 { + t.Fatal(writer) + } + WriteCloser(nil)(config) + if v := config.Conn.WriteCloser; v != nil { + t.Fatal(v) + } + if count != 1 { + t.Fatal(writer) + } +} + +func TestNew_writeCloserClosesOnMute(t *testing.T) { + count := 0 + writer := &mockWriteCloser{ + close: func() error { + count++ + return nil + }, + } + client, err := New( + FlushPeriod(0), + ErrorHandler(expectNoError(t)), + WriteCloser(writer), + Mute(true), + ) + if client == nil || err != nil { + t.Fatal(client, err) + } + if client.conn.w != nil { + t.Error(client.conn.w) + } + if count != 1 { + t.Error(count) + } +} + +type mockWriteCloser struct { + write func(p []byte) (n int, err error) + close func() error +} + +func (m *mockWriteCloser) Write(p []byte) (n int, err error) { + if m.write != nil { + return m.write(p) + } + panic("implement me") +} + +func (m *mockWriteCloser) Close() error { + if m.close != nil { + return m.close() + } + panic("implement me") +} + type mockClosedUDPConn struct { i int net.Conn From a7101e18d6e928dbfadfcd0a609a38ae1f510b23 Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Wed, 22 May 2019 07:34:51 +1000 Subject: [PATCH 08/35] update(readme): maintainer notes --- README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) mode change 100644 => 100755 README.md diff --git a/README.md b/README.md old mode 100644 new mode 100755 index 774a1c6..37f20c5 --- a/README.md +++ b/README.md @@ -1,3 +1,16 @@ +I intend to maintain this fork of [alexcesaro/statsd](https://github.com/alexcesaro/statsd) for the foreseeable future, +as I use this library in my own projects. Backwards compatibility is my highest priority. I did attempt to look for +existing maintained forks, but the few I investigated all made breaking changes. I will be adding new features, but +only when I have an immediate use case, and I will do my best to keep to the spirit of the original implementation. + +No releases but `master` will remain stable™. + +**Changelog** + +* 2019-05-22 - Added support for arbitrary output streams via new `statsd.WriteCloser` option + +--- + # statsd [![Build Status](https://travis-ci.org/alexcesaro/statsd.svg?branch=v2)](https://travis-ci.org/alexcesaro/statsd) [![Code Coverage](http://gocover.io/_badge/gopkg.in/alexcesaro/statsd.v2)](http://gocover.io/gopkg.in/alexcesaro/statsd.v2) [![Documentation](https://godoc.org/gopkg.in/alexcesaro/statsd.v2?status.svg)](https://godoc.org/gopkg.in/alexcesaro/statsd.v2) From 88faa5ece54bf50218958f166fb0989851fef965 Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Wed, 22 May 2019 08:53:02 +1000 Subject: [PATCH 09/35] feat(InlineFlush): support flushing after metrics --- README.md | 2 ++ conn.go | 66 ++++++++++++++++++++++++++++++--------------- options.go | 15 ++++++++++- statsd_test.go | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 132 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 37f20c5..0f0904d 100755 --- a/README.md +++ b/README.md @@ -9,6 +9,8 @@ No releases but `master` will remain stable™. * 2019-05-22 - Added support for arbitrary output streams via new `statsd.WriteCloser` option +* 2019-05-22 - Added support for simplified inline flush logic via new `statsd.InlineFlush` option + --- # statsd diff --git a/conn.go b/conn.go index dd7b4ac..f057776 100755 --- a/conn.go +++ b/conn.go @@ -12,10 +12,11 @@ import ( type conn struct { // config - errorHandler func(error) // errorHandler may be provided by the user - flushPeriod time.Duration // flushPeriod may be provided by the user - maxPacketSize int // maxPacketSize may be provided by the user and has a sensible default - tagFormat TagFormat // tagFormat must be provided by the user + errorHandler func(error) + flushPeriod time.Duration + maxPacketSize int + tagFormat TagFormat + inlineFlush bool // state @@ -32,6 +33,7 @@ func newConn(conf connConfig, muted bool) (*conn, error) { flushPeriod: conf.FlushPeriod, maxPacketSize: conf.MaxPacketSize, tagFormat: conf.TagFormat, + inlineFlush: conf.InlineFlush, w: conf.WriteCloser, } @@ -57,25 +59,32 @@ func newConn(conf connConfig, muted bool) (*conn, error) { // an additional metric. c.buf = make([]byte, 0, c.maxPacketSize+200) - if c.flushPeriod > 0 { - go func() { - ticker := time.NewTicker(c.flushPeriod) - for _ = range ticker.C { - c.mu.Lock() - if c.closed { - ticker.Stop() - c.mu.Unlock() - return - } - c.flush(0) - c.mu.Unlock() - } - }() + // start the flush worker only if we have a rate and it's not unnecessary + if c.flushPeriod > 0 && !c.inlineFlush { + go c.flushWorker() } return c, nil } +func (c *conn) flushWorker() { + ticker := time.NewTicker(c.flushPeriod) + defer ticker.Stop() + for range ticker.C { + if func() bool { + c.mu.Lock() + defer c.mu.Unlock() + if c.closed { + return true + } + c.flush(0) + return false + }() { + return + } + } +} + func (c *conn) connect(network string, address string) error { var err error c.w, err = dialTimeout(network, address, 5*time.Second) @@ -105,7 +114,7 @@ func (c *conn) metric(prefix, bucket string, n interface{}, typ string, rate flo c.appendType(typ) c.appendRate(rate) c.closeMetric(tags) - c.flushIfBufferFull(l) + c.flushIfNecessary(l) c.mu.Unlock() } @@ -120,7 +129,7 @@ func (c *conn) gauge(prefix, bucket string, value interface{}, tags string) { } c.appendBucket(prefix, bucket, tags) c.appendGauge(value, tags) - c.flushIfBufferFull(l) + c.flushIfNecessary(l) c.mu.Unlock() } @@ -137,7 +146,7 @@ func (c *conn) unique(prefix, bucket string, value string, tags string) { c.appendString(value) c.appendType("s") c.closeMetric(tags) - c.flushIfBufferFull(l) + c.flushIfNecessary(l) c.mu.Unlock() } @@ -247,8 +256,21 @@ func (c *conn) closeMetric(tags string) { c.appendByte('\n') } -func (c *conn) flushIfBufferFull(lastSafeLen int) { +func (c *conn) flushNecessary() bool { + if c.inlineFlush { + return true + } if len(c.buf) > c.maxPacketSize { + return true + } + return false +} + +func (c *conn) flushIfNecessary(lastSafeLen int) { + if c.inlineFlush { + lastSafeLen = 0 + } + if c.flushNecessary() { c.flush(lastSafeLen) } } diff --git a/options.go b/options.go index 5ffd5d2..e1d8387 100755 --- a/options.go +++ b/options.go @@ -27,6 +27,7 @@ type connConfig struct { Network string TagFormat TagFormat WriteCloser io.WriteCloser + InlineFlush bool } // An Option represents an option for a Client. It must be used as an @@ -54,7 +55,7 @@ func ErrorHandler(h func(error)) Option { } // FlushPeriod sets how often the Client's buffer is flushed. If p is 0, the -// goroutine that periodically flush the buffer is not lauched and the buffer +// goroutine that periodically flush the buffer is not launched and the buffer // is only flushed when it is full. // // By default, the flush period is 100 ms. This option is ignored in @@ -102,6 +103,18 @@ func WriteCloser(writer io.WriteCloser) Option { } } +// InlineFlush enables or disables (default disabled) forced flushing, inline +// with recording each stat. This option takes precedence over FlushPeriod, +// which would be redundant if always flushing after each write. Note that +// this DOES NOT guarantee exactly one line per write. +// +// This option is ignored in Client.Clone(). +func InlineFlush(enabled bool) Option { + return func(c *config) { + c.Conn.InlineFlush = enabled + } +} + // Mute sets whether the Client is muted. All methods of a muted Client do // nothing and return immediately. // diff --git a/statsd_test.go b/statsd_test.go index 41dd812..dc311ed 100755 --- a/statsd_test.go +++ b/statsd_test.go @@ -6,6 +6,7 @@ import ( "io" "io/ioutil" "net" + "runtime" "sync" "testing" "time" @@ -443,6 +444,77 @@ func TestNew_writeCloserClosesOnMute(t *testing.T) { } } +func TestNew_inlineFlush(t *testing.T) { + defer func() func() { + startGoroutines := runtime.NumGoroutine() + return func() { + endGoroutines := runtime.NumGoroutine() + if startGoroutines < endGoroutines { + t.Error(startGoroutines, endGoroutines) + } + } + }()() + client, err := New( + FlushPeriod(0), + ErrorHandler(expectNoError(t)), + WriteCloser(&mockWriteCloser{}), + InlineFlush(true), + ) + if client == nil || err != nil || client.muted { + t.Fatal(client, err) + } + if !client.conn.inlineFlush { + t.Error(client.conn.inlineFlush) + } +} + +func TestInlineFlush(t *testing.T) { + config := new(config) + InlineFlush(true)(config) + if !config.Conn.InlineFlush { + t.Error(config.Conn.InlineFlush) + } +} + +func TestConn_flushNecessary_inlineFlush(t *testing.T) { + if !(&conn{inlineFlush: true}).flushNecessary() { + t.Error(`expected always true if always flush is enabled`) + } +} + +func TestConn_flushIfNecessary_inlineFlush(t *testing.T) { + var ( + called bool + flushed string + c = &conn{ + buf: []byte("test_key:1|c\n"), + inlineFlush: true, + maxPacketSize: 100, + w: &mockWriteCloser{ + write: func(b []byte) (int, error) { + if called { + t.Error(`called more than once`) + } + called = true + flushed = string(b) + return len(b), nil + }, + }, + } + ) + // will actually flush everything + c.flushIfNecessary(2) + if !called { + t.Error(called) + } + if flushed != "test_key:1|c" { + t.Error(flushed) + } + if len(c.buf) != 0 { + t.Error(c.buf) + } +} + type mockWriteCloser struct { write func(p []byte) (n int, err error) close func() error From 23eed68f050597cdc9bc483df40cc4d416228624 Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Sun, 26 May 2019 13:27:18 +1000 Subject: [PATCH 10/35] fix(tcp-flush): write trailing newlines for all non-udp cases --- README.md | 2 ++ conn.go | 31 ++++++++++++++++++++++--------- statsd_test.go | 12 ++++++++++-- 3 files changed, 34 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 0f0904d..63e34f1 100755 --- a/README.md +++ b/README.md @@ -11,6 +11,8 @@ No releases but `master` will remain stable™. * 2019-05-22 - Added support for simplified inline flush logic via new `statsd.InlineFlush` option +* 2019-05-26 - Fixed bug causing trailing newlines to be removed for streaming (non-udp) connections + --- # statsd diff --git a/conn.go b/conn.go index f057776..0e8dfcb 100755 --- a/conn.go +++ b/conn.go @@ -20,11 +20,12 @@ type conn struct { // state - mu sync.Mutex // mu synchronises internal state - closed bool // closed indicates if w has been closed (triggered by first client close) - w io.WriteCloser // w is the writer for the connection - buf []byte // buf is the buffer for the connection - rateCache map[float32]string // rateCache caches string representations of sampling rates + mu sync.Mutex // mu synchronises internal state + closed bool // closed indicates if w has been closed (triggered by first client close) + w io.WriteCloser // w is the writer for the connection + buf []byte // buf is the buffer for the connection + rateCache map[float32]string // rateCache caches string representations of sampling rates + trimTrailingNewline bool // trimTrailingNewline is set only when running in UDP mode } func newConn(conf connConfig, muted bool) (*conn, error) { @@ -91,9 +92,13 @@ func (c *conn) connect(network string, address string) error { if err != nil { return err } - // When using UDP do a quick check to see if something is listening on the - // given port to return an error as soon as possible. + if network[:3] == "udp" { + // udp retains behavior from the original implementation where it would strip a trailing newline + c.trimTrailingNewline = true + + // When using UDP do a quick check to see if something is listening on the + // given port to return an error as soon as possible. for i := 0; i < 2; i++ { _, err = c.w.Write(nil) if err != nil { @@ -285,9 +290,17 @@ func (c *conn) flush(n int) { n = len(c.buf) } - // Trim the last \n, StatsD does not like it. - _, err := c.w.Write(c.buf[:n-1]) + // write + buffer := c.buf[:n] + if c.trimTrailingNewline { + // https://github.com/cactus/go-statsd-client/issues/17 + // Trim the last \n, StatsD does not like it. + buffer = buffer[:len(buffer)-1] + } + _, err := c.w.Write(buffer) c.handleError(err) + + // consume if n < len(c.buf) { copy(c.buf, c.buf[n:]) } diff --git a/statsd_test.go b/statsd_test.go index dc311ed..8734c5f 100755 --- a/statsd_test.go +++ b/statsd_test.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "net" "runtime" + "strings" "sync" "testing" "time" @@ -259,7 +260,7 @@ func TestFlushPeriod_writeCloser(t *testing.T) { time.Sleep(time.Millisecond * 400) c.conn.mu.Lock() got := getOutput(c) - want := "test_key:1|c" + want := "test_key:1|c\n" if got != want { t.Errorf("Invalid output, got %q, want %q", got, want) } @@ -507,7 +508,7 @@ func TestConn_flushIfNecessary_inlineFlush(t *testing.T) { if !called { t.Error(called) } - if flushed != "test_key:1|c" { + if flushed != "test_key:1|c\n" { t.Error(flushed) } if len(c.buf) != 0 { @@ -636,6 +637,13 @@ func testNetwork(t *testing.T, network string) { received := make(chan bool) server := newServer(t, network, testAddr, func(p []byte) { s := string(p) + if network != "udp" { + if !strings.HasSuffix(s, "\n") { + t.Error(s) + } else { + s = s[:len(s)-1] + } + } if s != "test_key:1|c" { t.Errorf("invalid output: %q", s) } From c94382d2f27f4ad13796f7029b40aa1d86c3cf5d Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Sat, 15 Feb 2020 09:12:35 +1000 Subject: [PATCH 11/35] revert(readme): remove maintainer notes for pr to upstream --- README.md | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/README.md b/README.md index 63e34f1..774a1c6 100755 --- a/README.md +++ b/README.md @@ -1,20 +1,3 @@ -I intend to maintain this fork of [alexcesaro/statsd](https://github.com/alexcesaro/statsd) for the foreseeable future, -as I use this library in my own projects. Backwards compatibility is my highest priority. I did attempt to look for -existing maintained forks, but the few I investigated all made breaking changes. I will be adding new features, but -only when I have an immediate use case, and I will do my best to keep to the spirit of the original implementation. - -No releases but `master` will remain stable™. - -**Changelog** - -* 2019-05-22 - Added support for arbitrary output streams via new `statsd.WriteCloser` option - -* 2019-05-22 - Added support for simplified inline flush logic via new `statsd.InlineFlush` option - -* 2019-05-26 - Fixed bug causing trailing newlines to be removed for streaming (non-udp) connections - ---- - # statsd [![Build Status](https://travis-ci.org/alexcesaro/statsd.svg?branch=v2)](https://travis-ci.org/alexcesaro/statsd) [![Code Coverage](http://gocover.io/_badge/gopkg.in/alexcesaro/statsd.v2)](http://gocover.io/gopkg.in/alexcesaro/statsd.v2) [![Documentation](https://godoc.org/gopkg.in/alexcesaro/statsd.v2?status.svg)](https://godoc.org/gopkg.in/alexcesaro/statsd.v2) From 9a056b66e94e4cfea35d39f3d4b952460bb55060 Mon Sep 17 00:00:00 2001 From: Steven Hartland Date: Sun, 23 Feb 2020 20:06:02 +0000 Subject: [PATCH 12/35] fix: Close relative gauges (#4) Close relative gauges which was creating corrupt statsd protocol. Also: * Remove TestUDPNotListening which is broken since a51b81. * Fix lint warnings. * Enabled lint checks in travis. * Enabled 1.x and master only for travis as older versions fail to detect go mod support properly. --- .travis.yml | 15 +++++++++------ conn.go | 13 +++++++------ statsd_test.go | 41 +++++++---------------------------------- 3 files changed, 23 insertions(+), 46 deletions(-) diff --git a/.travis.yml b/.travis.yml index 48915e7..8670c48 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,12 @@ language: go go: - - 1.2 - - 1.3 - - 1.4 - - 1.5 - - 1.6 - - tip + - 1.x + - master + +install: + - curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin v1.23.6 + +script: + - go test -v -race -timeout=10s ./... + - golangci-lint run diff --git a/conn.go b/conn.go index de8ad04..c1b76b3 100644 --- a/conn.go +++ b/conn.go @@ -54,7 +54,7 @@ func newConn(conf connConfig, muted bool) (*conn, error) { if c.flushPeriod > 0 { go func() { ticker := time.NewTicker(c.flushPeriod) - for _ = range ticker.C { + for range ticker.C { c.mu.Lock() if c.closed { ticker.Stop() @@ -92,6 +92,7 @@ func (c *conn) gaugeRelative(prefix, bucket string, value interface{}, tags stri c.appendString(fmt.Sprintf("+%v", value)) } c.appendType("g") + c.closeMetric(tags) c.flushIfBufferFull(l) c.mu.Unlock() } @@ -170,23 +171,23 @@ func isNegative(v interface{}) bool { case int: return n < 0 case uint: - return n < 0 + return false case int64: return n < 0 case uint64: - return n < 0 + return false case int32: return n < 0 case uint32: - return n < 0 + return false case int16: return n < 0 case uint16: - return n < 0 + return false case int8: return n < 0 case uint8: - return n < 0 + return false case float64: return n < 0 case float32: diff --git a/statsd_test.go b/statsd_test.go index c97650c..fc7ffe8 100644 --- a/statsd_test.go +++ b/statsd_test.go @@ -37,6 +37,13 @@ func TestGauge(t *testing.T) { }) } +func TestGaugeRelative(t *testing.T) { + testOutput(t, "test_key:+5|g\ntest_key:-10|g", func(c *Client) { + c.GaugeRelative(testKey, 5) + c.GaugeRelative(testKey, -10) + }) +} + func TestTiming(t *testing.T) { testOutput(t, "test_key:6|ms", func(c *Client) { c.Timing(testKey, 6) @@ -359,40 +366,6 @@ func TestConcurrency(t *testing.T) { }) } -func TestUDPNotListening(t *testing.T) { - dialTimeout = mockUDPClosed - defer func() { dialTimeout = net.DialTimeout }() - - c, err := New() - if c == nil || !c.muted { - t.Error("New() did not return a muted client") - } - if err == nil { - t.Error("New should return an error") - } -} - -type mockClosedUDPConn struct { - i int - net.Conn -} - -func (c *mockClosedUDPConn) Write(p []byte) (int, error) { - c.i++ - if c.i == 2 { - return 0, errors.New("test error") - } - return 0, nil -} - -func (c *mockClosedUDPConn) Close() error { - return nil -} - -func mockUDPClosed(string, string, time.Duration) (net.Conn, error) { - return &mockClosedUDPConn{}, nil -} - func testClient(t *testing.T, f func(*Client), options ...Option) { dialTimeout = mockDial defer func() { dialTimeout = net.DialTimeout }() From 111b368b7794dfcbf1daec887e5723fbdf69ac67 Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Thu, 27 Aug 2020 08:08:08 +1000 Subject: [PATCH 13/35] fix(Tags): tags option now updates existing tags correctly https://github.com/alexcesaro/statsd/issues/41 --- README.md | 2 ++ options.go | 30 ++++++++---------------------- statsd_test.go | 13 ++++++++----- 3 files changed, 18 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index 63e34f1..59ecde6 100755 --- a/README.md +++ b/README.md @@ -7,6 +7,8 @@ No releases but `master` will remain stable™. **Changelog** +* 2020-08-27 - Fixed bug in `statsd.Tags` identified by https://github.com/alexcesaro/statsd/issues/41 + * 2019-05-22 - Added support for arbitrary output streams via new `statsd.WriteCloser` option * 2019-05-22 - Added support for simplified inline flush logic via new `statsd.InlineFlush` option diff --git a/options.go b/options.go index e1d8387..f95f1d5 100755 --- a/options.go +++ b/options.go @@ -166,33 +166,19 @@ func Tags(tags ...string) Option { if len(tags)%2 != 0 { panic("statsd: Tags only accepts an even number of arguments") } - - return Option(func(c *config) { - if len(tags) == 0 { - return - } - - newTags := make([]tag, len(tags)/2) + return func(c *config) { + UpdateLoop: for i := 0; i < len(tags)/2; i++ { - newTags[i] = tag{K: tags[2*i], V: tags[2*i+1]} - } - - for _, newTag := range newTags { - exists := false - for _, oldTag := range c.Client.Tags { + newTag := tag{K: tags[2*i], V: tags[2*i+1]} + for i, oldTag := range c.Client.Tags { if newTag.K == oldTag.K { - exists = true - oldTag.V = newTag.V + c.Client.Tags[i] = newTag + continue UpdateLoop } } - if !exists { - c.Client.Tags = append(c.Client.Tags, tag{ - K: newTag.K, - V: newTag.V, - }) - } + c.Client.Tags = append(c.Client.Tags, newTag) } - }) + } } type tag struct { diff --git a/statsd_test.go b/statsd_test.go index 8734c5f..ceb81da 100755 --- a/statsd_test.go +++ b/statsd_test.go @@ -340,17 +340,17 @@ func TestCloneRate(t *testing.T) { } func TestCloneInfluxDBTags(t *testing.T) { - testOutput(t, "test_key,tag1=value1,tag2=value2:5|c", func(c *Client) { - clone := c.Clone(Tags("tag1", "value3", "tag2", "value2")) + testOutput(t, "test_key,tag1=value3,tag3=value4,tag4=value9,tag5=value6,tag2=value2:5|c", func(c *Client) { + clone := c.Clone(Tags("tag2", "value2", "tag1", "value3", "tag4", "value8", "tag4", "value9")) clone.Count(testKey, 5) - }, TagsFormat(InfluxDB), Tags("tag1", "value1")) + }, TagsFormat(InfluxDB), Tags("tag1", "value1", "tag3", "value4", "tag4", "value5", "tag5", "value6", "tag4", "value7")) } func TestCloneDatadogTags(t *testing.T) { - testOutput(t, "test_key:5|c|#tag1:value1,tag2:value2", func(c *Client) { + testOutput(t, "test_key:5|c|#tag1:value3,tag3:value4,tag2:value2", func(c *Client) { clone := c.Clone(Tags("tag1", "value3", "tag2", "value2")) clone.Count(testKey, 5) - }, TagsFormat(Datadog), Tags("tag1", "value1")) + }, TagsFormat(Datadog), Tags("tag1", "value1", "tag3", "value4")) } func TestDialError(t *testing.T) { @@ -557,6 +557,7 @@ func mockUDPClosed(string, string, time.Duration) (net.Conn, error) { } func testClient(t *testing.T, f func(*Client), options ...Option) { + t.Helper() dialTimeout = mockDial defer func() { dialTimeout = net.DialTimeout }() @@ -573,7 +574,9 @@ func testClient(t *testing.T, f func(*Client), options ...Option) { } func testOutput(t *testing.T, want string, f func(*Client), options ...Option) { + t.Helper() testClient(t, func(c *Client) { + t.Helper() f(c) c.Close() From 3bf2bb40be320e3599d1558de33453547edc998e Mon Sep 17 00:00:00 2001 From: Page Bowers <40775967+pabowers@users.noreply.github.com> Date: Mon, 16 Nov 2020 19:58:16 -0800 Subject: [PATCH 14/35] Create a new WriteCloser that will check before writing if connection is down (#2) --- README.md | 2 + safeconn.go | 68 ++++++++++++++++++++++++++ safeconn_test.go | 123 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 193 insertions(+) create mode 100644 safeconn.go create mode 100644 safeconn_test.go diff --git a/README.md b/README.md index 59ecde6..4aaf7d8 100755 --- a/README.md +++ b/README.md @@ -7,6 +7,8 @@ No releases but `master` will remain stable™. **Changelog** +* 2020-11-13 - Added the SafeConn write closer that checks if the connection is still up before attempting to write + * 2020-08-27 - Fixed bug in `statsd.Tags` identified by https://github.com/alexcesaro/statsd/issues/41 * 2019-05-22 - Added support for arbitrary output streams via new `statsd.WriteCloser` option diff --git a/safeconn.go b/safeconn.go new file mode 100644 index 0000000..f1058ab --- /dev/null +++ b/safeconn.go @@ -0,0 +1,68 @@ +package statsd + +import ( + "fmt" + "io" + "net" + "time" +) + +const ( + defaultReadTimeout = 10 * time.Millisecond + defaultConnTimeout = 5 * time.Second +) + +// SafeConn is an implementation of the io.WriteCloser that wraps a net.Conn type +// its purpose is to perform a guard as a part of each Write call to first check if +// the connection is still up by performing a small read. The use case of this is to +// protect against the case where a TCP connection comes disconnected and the Write +// continues to retry for up to 15 minutes before determining that the connection has +// been broken off. +type SafeConn struct { + netConn net.Conn + connTimeout time.Duration + readTimeout time.Duration +} + +func NewSafeConn(network, address string, connTimeout, readTimeout time.Duration) (*SafeConn, error) { + newConn, err := dialTimeout(network, address, connTimeout) + if err != nil { + return nil, err + } + + c := &SafeConn{ + netConn: newConn, + connTimeout: connTimeout, + readTimeout: readTimeout, + } + + return c, nil +} + +func NewSafeConnWithDefaultTimeouts(network string, address string) (*SafeConn, error) { + return NewSafeConn(network, address, defaultConnTimeout, defaultReadTimeout) +} + +func (s *SafeConn) Write(p []byte) (n int, err error) { + // check if connection is closed + if s.connIsClosed() { + return 0, fmt.Errorf("connection is closed") + } + + return s.netConn.Write(p) +} + +func (s *SafeConn) Close() error { + return s.netConn.Close() +} + +func (s *SafeConn) connIsClosed() bool { + err := s.netConn.SetReadDeadline(time.Now().Add(s.readTimeout)) + if err != nil { + return true + } + + one := make([]byte, 1) + _, err = s.netConn.Read(one) + return err == io.EOF +} diff --git a/safeconn_test.go b/safeconn_test.go new file mode 100644 index 0000000..a703f30 --- /dev/null +++ b/safeconn_test.go @@ -0,0 +1,123 @@ +package statsd + +import ( + "io" + "net" + "testing" + "time" +) + +type mockNetConn struct { + read func(p []byte) (n int, err error) + write func(p []byte) (n int, err error) + close func() error + localAddr func() net.Addr + remoteAddr func() net.Addr + setDeadline func(t time.Time) error + setReadDeadline func(t time.Time) error + setWriteDeadline func(t time.Time) error +} + +func (m *mockNetConn) Read(p []byte) (n int, err error) { + if m.read != nil { + return m.read(p) + } + panic("implement me") +} + +func (m *mockNetConn) Write(p []byte) (n int, err error) { + if m.write != nil { + return m.write(p) + } + panic("implement me") +} + +func (m *mockNetConn) Close() error { + if m.close != nil { + return m.close() + } + panic("implement me") +} + +func (m *mockNetConn) LocalAddr() net.Addr { + if m.localAddr != nil { + return m.localAddr() + } + panic("implement me") +} + +func (m *mockNetConn) RemoteAddr() net.Addr { + if m.remoteAddr != nil { + return m.remoteAddr() + } + panic("implement me") +} + +func (m *mockNetConn) SetDeadline(t time.Time) error { + if m.setDeadline != nil { + return m.setDeadline(t) + } + panic("implement me") +} + +func (m *mockNetConn) SetReadDeadline(t time.Time) error { + if m.setReadDeadline != nil { + return m.setReadDeadline(t) + } + panic("implement me") +} + +func (m *mockNetConn) SetWriteDeadline(t time.Time) error { + if m.setWriteDeadline != nil { + return m.setWriteDeadline(t) + } + panic("implement me") +} + +func TestSafeConn_FailsToWriteIfCannotRead(t *testing.T) { + c := &mockNetConn{ + setReadDeadline: func(t time.Time) error { + return nil + }, + read: func(b []byte) (int, error) { + return 0, io.EOF + }, + } + + s := SafeConn{ + netConn: c, + } + + p := []byte("test_key:1|c\n") + n, err := s.Write(p) + if n != 0 { + t.Error("Write() did not return 0 bytes when it failed") + } + if err == nil { + t.Error("Error should have been connection is closed") + } +} + +func TestSafeConn_SuccessfullyWritesWhenConnectionOpen(t *testing.T) { + c := &mockNetConn{ + setReadDeadline: func(t time.Time) error { + return nil + }, + read: func(b []byte) (int, error) { + return 1, nil + }, + write: func(b []byte) (int, error) { + return len(b), nil + }, + } + + s := SafeConn{ + netConn: c, + } + + p := []byte("test_key:1|c\n") + _, err := s.Write(p) + if err != nil { + t.Errorf("Error should have been nil, but instead it was: %v", err) + } +} From d6baccde015eab9eadd3b9fde2dc1c3e41ee9fc2 Mon Sep 17 00:00:00 2001 From: Page Bowers <40775967+pabowers@users.noreply.github.com> Date: Mon, 16 Nov 2020 20:00:50 -0800 Subject: [PATCH 15/35] Change the repository to support go modules (#3) --- README.md | 4 +++- examples_test.go | 2 +- go.mod | 3 +++ 3 files changed, 7 insertions(+), 2 deletions(-) create mode 100644 go.mod diff --git a/README.md b/README.md index 4aaf7d8..c3f7a2d 100755 --- a/README.md +++ b/README.md @@ -7,6 +7,8 @@ No releases but `master` will remain stable™. **Changelog** +* 2020-11-13 - Added support for go modules + * 2020-11-13 - Added the SafeConn write closer that checks if the connection is still up before attempting to write * 2020-08-27 - Fixed bug in `statsd.Tags` identified by https://github.com/alexcesaro/statsd/issues/41 @@ -48,7 +50,7 @@ https://godoc.org/gopkg.in/alexcesaro/statsd.v2 ## Download - go get gopkg.in/alexcesaro/statsd.v2 + go get github.com/joeycumines/statsd ## Example diff --git a/examples_test.go b/examples_test.go index dc6c2f3..89bc9af 100644 --- a/examples_test.go +++ b/examples_test.go @@ -5,7 +5,7 @@ import ( "runtime" "time" - "gopkg.in/alexcesaro/statsd.v2" + "github.com/joeycumines/statsd" ) var ( diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..3cd736f --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/joeycumines/statsd + +go 1.14 From 807810e4b94304f42ee6d29648b4915ffef220d9 Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Tue, 17 Nov 2020 14:05:18 +1000 Subject: [PATCH 16/35] Update README.md --- README.md | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index c3f7a2d..41504c1 100755 --- a/README.md +++ b/README.md @@ -1,9 +1,6 @@ -I intend to maintain this fork of [alexcesaro/statsd](https://github.com/alexcesaro/statsd) for the foreseeable future, -as I use this library in my own projects. Backwards compatibility is my highest priority. I did attempt to look for -existing maintained forks, but the few I investigated all made breaking changes. I will be adding new features, but -only when I have an immediate use case, and I will do my best to keep to the spirit of the original implementation. +This is a backwards compatible fork of [alexcesaro/statsd](https://github.com/alexcesaro/statsd). -No releases but `master` will remain stable™. +Note that this repository is versioned independently of the (unmaintained) upstream. **Changelog** From 13ed52b6fb5135340b70a9935b736ffc99e7f255 Mon Sep 17 00:00:00 2001 From: Page Bowers <40775967+pabowers@users.noreply.github.com> Date: Mon, 16 Nov 2020 19:58:16 -0800 Subject: [PATCH 17/35] Create a new WriteCloser that will check before writing if connection is down (#2) --- README.md | 0 conn.go | 0 options.go | 0 safeconn.go | 68 ++++++++++++++++++++++++++ safeconn_test.go | 123 +++++++++++++++++++++++++++++++++++++++++++++++ statsd_test.go | 0 6 files changed, 191 insertions(+) mode change 100755 => 100644 README.md mode change 100755 => 100644 conn.go mode change 100755 => 100644 options.go create mode 100644 safeconn.go create mode 100644 safeconn_test.go mode change 100755 => 100644 statsd_test.go diff --git a/README.md b/README.md old mode 100755 new mode 100644 diff --git a/conn.go b/conn.go old mode 100755 new mode 100644 diff --git a/options.go b/options.go old mode 100755 new mode 100644 diff --git a/safeconn.go b/safeconn.go new file mode 100644 index 0000000..f1058ab --- /dev/null +++ b/safeconn.go @@ -0,0 +1,68 @@ +package statsd + +import ( + "fmt" + "io" + "net" + "time" +) + +const ( + defaultReadTimeout = 10 * time.Millisecond + defaultConnTimeout = 5 * time.Second +) + +// SafeConn is an implementation of the io.WriteCloser that wraps a net.Conn type +// its purpose is to perform a guard as a part of each Write call to first check if +// the connection is still up by performing a small read. The use case of this is to +// protect against the case where a TCP connection comes disconnected and the Write +// continues to retry for up to 15 minutes before determining that the connection has +// been broken off. +type SafeConn struct { + netConn net.Conn + connTimeout time.Duration + readTimeout time.Duration +} + +func NewSafeConn(network, address string, connTimeout, readTimeout time.Duration) (*SafeConn, error) { + newConn, err := dialTimeout(network, address, connTimeout) + if err != nil { + return nil, err + } + + c := &SafeConn{ + netConn: newConn, + connTimeout: connTimeout, + readTimeout: readTimeout, + } + + return c, nil +} + +func NewSafeConnWithDefaultTimeouts(network string, address string) (*SafeConn, error) { + return NewSafeConn(network, address, defaultConnTimeout, defaultReadTimeout) +} + +func (s *SafeConn) Write(p []byte) (n int, err error) { + // check if connection is closed + if s.connIsClosed() { + return 0, fmt.Errorf("connection is closed") + } + + return s.netConn.Write(p) +} + +func (s *SafeConn) Close() error { + return s.netConn.Close() +} + +func (s *SafeConn) connIsClosed() bool { + err := s.netConn.SetReadDeadline(time.Now().Add(s.readTimeout)) + if err != nil { + return true + } + + one := make([]byte, 1) + _, err = s.netConn.Read(one) + return err == io.EOF +} diff --git a/safeconn_test.go b/safeconn_test.go new file mode 100644 index 0000000..a703f30 --- /dev/null +++ b/safeconn_test.go @@ -0,0 +1,123 @@ +package statsd + +import ( + "io" + "net" + "testing" + "time" +) + +type mockNetConn struct { + read func(p []byte) (n int, err error) + write func(p []byte) (n int, err error) + close func() error + localAddr func() net.Addr + remoteAddr func() net.Addr + setDeadline func(t time.Time) error + setReadDeadline func(t time.Time) error + setWriteDeadline func(t time.Time) error +} + +func (m *mockNetConn) Read(p []byte) (n int, err error) { + if m.read != nil { + return m.read(p) + } + panic("implement me") +} + +func (m *mockNetConn) Write(p []byte) (n int, err error) { + if m.write != nil { + return m.write(p) + } + panic("implement me") +} + +func (m *mockNetConn) Close() error { + if m.close != nil { + return m.close() + } + panic("implement me") +} + +func (m *mockNetConn) LocalAddr() net.Addr { + if m.localAddr != nil { + return m.localAddr() + } + panic("implement me") +} + +func (m *mockNetConn) RemoteAddr() net.Addr { + if m.remoteAddr != nil { + return m.remoteAddr() + } + panic("implement me") +} + +func (m *mockNetConn) SetDeadline(t time.Time) error { + if m.setDeadline != nil { + return m.setDeadline(t) + } + panic("implement me") +} + +func (m *mockNetConn) SetReadDeadline(t time.Time) error { + if m.setReadDeadline != nil { + return m.setReadDeadline(t) + } + panic("implement me") +} + +func (m *mockNetConn) SetWriteDeadline(t time.Time) error { + if m.setWriteDeadline != nil { + return m.setWriteDeadline(t) + } + panic("implement me") +} + +func TestSafeConn_FailsToWriteIfCannotRead(t *testing.T) { + c := &mockNetConn{ + setReadDeadline: func(t time.Time) error { + return nil + }, + read: func(b []byte) (int, error) { + return 0, io.EOF + }, + } + + s := SafeConn{ + netConn: c, + } + + p := []byte("test_key:1|c\n") + n, err := s.Write(p) + if n != 0 { + t.Error("Write() did not return 0 bytes when it failed") + } + if err == nil { + t.Error("Error should have been connection is closed") + } +} + +func TestSafeConn_SuccessfullyWritesWhenConnectionOpen(t *testing.T) { + c := &mockNetConn{ + setReadDeadline: func(t time.Time) error { + return nil + }, + read: func(b []byte) (int, error) { + return 1, nil + }, + write: func(b []byte) (int, error) { + return len(b), nil + }, + } + + s := SafeConn{ + netConn: c, + } + + p := []byte("test_key:1|c\n") + _, err := s.Write(p) + if err != nil { + t.Errorf("Error should have been nil, but instead it was: %v", err) + } +} diff --git a/statsd_test.go b/statsd_test.go old mode 100755 new mode 100644 From bb35aa955658bbb49dc4ce0ee09cfe4ac1913062 Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Tue, 17 Nov 2020 14:33:32 +1000 Subject: [PATCH 18/35] fix(fileperms): correct source files erroneously made executable --- README.md | 0 conn.go | 0 options.go | 0 statsd_test.go | 0 4 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 README.md mode change 100755 => 100644 conn.go mode change 100755 => 100644 options.go mode change 100755 => 100644 statsd_test.go diff --git a/README.md b/README.md old mode 100755 new mode 100644 diff --git a/conn.go b/conn.go old mode 100755 new mode 100644 diff --git a/options.go b/options.go old mode 100755 new mode 100644 diff --git a/statsd_test.go b/statsd_test.go old mode 100755 new mode 100644 From 1390b97a3e4a9a449e5dce05b1a39f734c984bb4 Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Thu, 27 Aug 2020 08:08:08 +1000 Subject: [PATCH 19/35] fix(Tags): tags option now updates existing tags correctly https://github.com/alexcesaro/statsd/issues/41 --- options.go | 30 ++++++++---------------------- statsd_test.go | 13 ++++++++----- 2 files changed, 16 insertions(+), 27 deletions(-) diff --git a/options.go b/options.go index e1d8387..f95f1d5 100644 --- a/options.go +++ b/options.go @@ -166,33 +166,19 @@ func Tags(tags ...string) Option { if len(tags)%2 != 0 { panic("statsd: Tags only accepts an even number of arguments") } - - return Option(func(c *config) { - if len(tags) == 0 { - return - } - - newTags := make([]tag, len(tags)/2) + return func(c *config) { + UpdateLoop: for i := 0; i < len(tags)/2; i++ { - newTags[i] = tag{K: tags[2*i], V: tags[2*i+1]} - } - - for _, newTag := range newTags { - exists := false - for _, oldTag := range c.Client.Tags { + newTag := tag{K: tags[2*i], V: tags[2*i+1]} + for i, oldTag := range c.Client.Tags { if newTag.K == oldTag.K { - exists = true - oldTag.V = newTag.V + c.Client.Tags[i] = newTag + continue UpdateLoop } } - if !exists { - c.Client.Tags = append(c.Client.Tags, tag{ - K: newTag.K, - V: newTag.V, - }) - } + c.Client.Tags = append(c.Client.Tags, newTag) } - }) + } } type tag struct { diff --git a/statsd_test.go b/statsd_test.go index 8734c5f..ceb81da 100644 --- a/statsd_test.go +++ b/statsd_test.go @@ -340,17 +340,17 @@ func TestCloneRate(t *testing.T) { } func TestCloneInfluxDBTags(t *testing.T) { - testOutput(t, "test_key,tag1=value1,tag2=value2:5|c", func(c *Client) { - clone := c.Clone(Tags("tag1", "value3", "tag2", "value2")) + testOutput(t, "test_key,tag1=value3,tag3=value4,tag4=value9,tag5=value6,tag2=value2:5|c", func(c *Client) { + clone := c.Clone(Tags("tag2", "value2", "tag1", "value3", "tag4", "value8", "tag4", "value9")) clone.Count(testKey, 5) - }, TagsFormat(InfluxDB), Tags("tag1", "value1")) + }, TagsFormat(InfluxDB), Tags("tag1", "value1", "tag3", "value4", "tag4", "value5", "tag5", "value6", "tag4", "value7")) } func TestCloneDatadogTags(t *testing.T) { - testOutput(t, "test_key:5|c|#tag1:value1,tag2:value2", func(c *Client) { + testOutput(t, "test_key:5|c|#tag1:value3,tag3:value4,tag2:value2", func(c *Client) { clone := c.Clone(Tags("tag1", "value3", "tag2", "value2")) clone.Count(testKey, 5) - }, TagsFormat(Datadog), Tags("tag1", "value1")) + }, TagsFormat(Datadog), Tags("tag1", "value1", "tag3", "value4")) } func TestDialError(t *testing.T) { @@ -557,6 +557,7 @@ func mockUDPClosed(string, string, time.Duration) (net.Conn, error) { } func testClient(t *testing.T, f func(*Client), options ...Option) { + t.Helper() dialTimeout = mockDial defer func() { dialTimeout = net.DialTimeout }() @@ -573,7 +574,9 @@ func testClient(t *testing.T, f func(*Client), options ...Option) { } func testOutput(t *testing.T, want string, f func(*Client), options ...Option) { + t.Helper() testClient(t, func(c *Client) { + t.Helper() f(c) c.Close() From 3f100362de964e9b55d13fefb4f167e0effd775c Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Fri, 31 Dec 2021 12:06:17 +1000 Subject: [PATCH 20/35] doc(changelogs): link and clarify old and new changelogs --- CHANGELOG.md | 4 ++++ README.md | 2 ++ 2 files changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 04d811b..3b26c92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,8 @@ # Change Log + +**NOTE:** This is the original change log, from the (unmaintained) upstream. +Please refer to the [readme](README.md) and Git history, for more recent changes. + All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). diff --git a/README.md b/README.md index 41504c1..9cd03fd 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,8 @@ Note that this repository is versioned independently of the (unmaintained) upstr * 2019-05-26 - Fixed bug causing trailing newlines to be removed for streaming (non-udp) connections +See also the [upstream changelog](CHANGELOG.md). + --- # statsd From 88a32038212f6e5104282f79bd34816d50448b9a Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Fri, 31 Dec 2021 12:07:43 +1000 Subject: [PATCH 21/35] Remove defunct .travis.yml --- .travis.yml | 9 --------- 1 file changed, 9 deletions(-) delete mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 48915e7..0000000 --- a/.travis.yml +++ /dev/null @@ -1,9 +0,0 @@ -language: go - -go: - - 1.2 - - 1.3 - - 1.4 - - 1.5 - - 1.6 - - tip From 02b390d4006d461210d41f3e2249c292271102b5 Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Fri, 31 Dec 2021 12:34:34 +1000 Subject: [PATCH 22/35] Document New/connect mute behavior (fix 1/2 for alexcesaro/statsd#6) This implementation does not provide connection state management. Handling connection failures and retrying as necessary should be implemented by the caller, if desired. --- statsd.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/statsd.go b/statsd.go index f19204d..4ad7d5f 100644 --- a/statsd.go +++ b/statsd.go @@ -11,7 +11,9 @@ type Client struct { tags string } -// New returns a new Client. +// New returns a new Client, which will always be non-nil, but will be muted +// (permanently inert / stubbed) if returned with an error, or if the Mute +// option was set. func New(opts ...Option) (*Client, error) { // The default configuration. conf := &config{ From 3f725ad10aa79c7628a6feeb490bd13f4422c257 Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Fri, 31 Dec 2021 14:38:02 +1000 Subject: [PATCH 23/35] feat(UDPCheck): option to disable initial udp connection check (fix 2/2 for alexcesaro/statsd#6) Adds an option to facilitate initialising an unmuted client for UDP conns where the server is unreachable. --- conn.go | 24 +++++++++++++++--------- options.go | 15 +++++++++++++++ statsd.go | 1 + statsd_test.go | 49 +++++++++++++++++++++++++++++++++++++++---------- 4 files changed, 70 insertions(+), 19 deletions(-) diff --git a/conn.go b/conn.go index 0e8dfcb..4f12d40 100644 --- a/conn.go +++ b/conn.go @@ -5,6 +5,7 @@ import ( "math/rand" "net" "strconv" + "strings" "sync" "time" ) @@ -51,7 +52,7 @@ func newConn(conf connConfig, muted bool) (*conn, error) { // initialise writer if not provided if c.w == nil { - if err := c.connect(conf.Network, conf.Addr); err != nil { + if err := c.connect(conf.Network, conf.Addr, conf.UDPCheck); err != nil { return c, err } } @@ -86,28 +87,33 @@ func (c *conn) flushWorker() { } } -func (c *conn) connect(network string, address string) error { +func (c *conn) connect(network string, address string, UDPCheck bool) error { var err error c.w, err = dialTimeout(network, address, 5*time.Second) if err != nil { return err } - if network[:3] == "udp" { + if strings.HasPrefix(network, "udp") { // udp retains behavior from the original implementation where it would strip a trailing newline c.trimTrailingNewline = true // When using UDP do a quick check to see if something is listening on the // given port to return an error as soon as possible. - for i := 0; i < 2; i++ { - _, err = c.w.Write(nil) - if err != nil { - _ = c.w.Close() - c.w = nil - return err + // + // See also doc for UDPCheck option (factory func) and https://github.com/alexcesaro/statsd/issues/6 + if UDPCheck { + for i := 0; i < 2; i++ { + _, err = c.w.Write(nil) + if err != nil { + _ = c.w.Close() + c.w = nil + return err + } } } } + return nil } diff --git a/options.go b/options.go index f95f1d5..a757662 100644 --- a/options.go +++ b/options.go @@ -19,6 +19,7 @@ type clientConfig struct { Tags []tag } +// connConfig is used by New, to initialise a conn type connConfig struct { Addr string ErrorHandler func(error) @@ -28,6 +29,7 @@ type connConfig struct { TagFormat TagFormat WriteCloser io.WriteCloser InlineFlush bool + UDPCheck bool } // An Option represents an option for a Client. It must be used as an @@ -115,6 +117,19 @@ func InlineFlush(enabled bool) Option { } } +// UDPCheck enables or disables (default enabled) checking UDP connections, as +// part of New. This behavior is useful, as it makes it easier to quickly +// identify misconfigured services. Disabling this option removes the need to +// explicitly manage the connection state, at the cost of error visibility. +// Using an error handler may mitigate some of this cost. +// +// This option is ignored in Client.Clone(). +func UDPCheck(enabled bool) Option { + return func(c *config) { + c.Conn.UDPCheck = enabled + } +} + // Mute sets whether the Client is muted. All methods of a muted Client do // nothing and return immediately. // diff --git a/statsd.go b/statsd.go index 4ad7d5f..735cfa3 100644 --- a/statsd.go +++ b/statsd.go @@ -27,6 +27,7 @@ func New(opts ...Option) (*Client, error) { // Ethernet MTU - IPv6 Header - TCP Header = 1500 - 40 - 20 = 1440 MaxPacketSize: 1440, Network: "udp", + UDPCheck: true, }, } for _, o := range opts { diff --git a/statsd_test.go b/statsd_test.go index ceb81da..5a51e20 100644 --- a/statsd_test.go +++ b/statsd_test.go @@ -382,16 +382,45 @@ func TestConcurrency(t *testing.T) { }) } -func TestUDPNotListening(t *testing.T) { - dialTimeout = mockUDPClosed - defer func() { dialTimeout = net.DialTimeout }() - - c, err := New() - if c == nil || !c.muted { - t.Error("New() did not return a muted client") - } - if err == nil { - t.Error("New should return an error") +func TestNew_udpNotListening(t *testing.T) { + for _, tc := range [...]struct { + Name string + Options []Option + Muted bool + Errored bool + }{ + { + Name: `default`, + Muted: true, + Errored: true, + }, + { + Name: `true`, + Options: []Option{UDPCheck(true)}, + Muted: true, + Errored: true, + }, + { + Name: `false`, + Options: []Option{UDPCheck(false)}, + Muted: false, + Errored: false, + }, + } { + t.Run(tc.Name, func(t *testing.T) { + dialTimeout = mockUDPClosed + defer func() { dialTimeout = net.DialTimeout }() + c, err := New(tc.Options...) + if c == nil { + t.Fatal(`client should never be nil`) + } + if c.muted != tc.Muted { + t.Error(c.muted) + } + if (err != nil) != tc.Errored { + t.Error(err) + } + }) } } From 63f17cc7adf2350c880fc423fe4b351ff91f2c4d Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Fri, 31 Dec 2021 15:20:33 +1000 Subject: [PATCH 24/35] Add github issue templates --- .github/ISSUE_TEMPLATE/bug_report.md | 21 +++++++++++++++++++++ .github/ISSUE_TEMPLATE/feature_request.md | 20 ++++++++++++++++++++ 2 files changed, 41 insertions(+) create mode 100644 .github/ISSUE_TEMPLATE/bug_report.md create mode 100644 .github/ISSUE_TEMPLATE/feature_request.md diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md new file mode 100644 index 0000000..ff74c28 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -0,0 +1,21 @@ +--- +name: Bug report +about: Create a report to help us improve +title: "[bug]" +labels: '' +assignees: '' + +--- + +**Describe the bug** +A clear and concise description of what the bug is. + +**To Reproduce** +Steps to reproduce the behavior. +Preferably, a short / minimal code example, that reproduces the behavior. + +**Expected behavior** +A clear and concise description of what you expected to happen. + +**Additional context** +Add any other context about the problem here. diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md new file mode 100644 index 0000000..70d2797 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.md @@ -0,0 +1,20 @@ +--- +name: Feature request +about: Suggest an idea for this project +title: "[feature request]" +labels: '' +assignees: '' + +--- + +**Is your feature request related to a problem? Please describe.** +A clear and concise description of what the problem is. Ex. I'm always frustrated when [...] + +**Describe the solution you'd like** +A clear and concise description of what you want to happen. + +**Describe alternatives you've considered** +A clear and concise description of any alternative solutions or features you've considered. + +**Additional context** +Add any other context or screenshots about the feature request here. From e99aaacd09d3f7b4124d955ef5329b7bdf9cd514 Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Fri, 31 Dec 2021 15:24:57 +1000 Subject: [PATCH 25/35] Fix minor staticcheck issues --- conn.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/conn.go b/conn.go index 4f12d40..828bd94 100644 --- a/conn.go +++ b/conn.go @@ -202,24 +202,14 @@ func isNegative(v interface{}) bool { switch n := v.(type) { case int: return n < 0 - case uint: - return n < 0 case int64: return n < 0 - case uint64: - return n < 0 case int32: return n < 0 - case uint32: - return n < 0 case int16: return n < 0 - case uint16: - return n < 0 case int8: return n < 0 - case uint8: - return n < 0 case float64: return n < 0 case float32: From bb5eb9dacacd5e0036e644c1c3353fdaaf5cc44d Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Fri, 31 Dec 2021 15:27:10 +1000 Subject: [PATCH 26/35] Add github workflow to build test and lint --- .github/workflows/go.yml | 41 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 .github/workflows/go.yml diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 0000000..1a1414c --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,41 @@ +name: Go +on: + push: + branches: + - master + pull_request: + branches: + - master +jobs: + build: + runs-on: ubuntu-latest + permissions: + actions: read + contents: read + strategy: + fail-fast: false + matrix: + go-version: + - 1.17 + - 1.13 + steps: + - name: Set up Go + uses: actions/setup-go@v2 + with: + go-version: ${{ matrix.go-version }} + - name: Set up Staticcheck + run: go install honnef.co/go/tools/cmd/staticcheck@latest + if: ${{ matrix.go-version != '1.13' }} + - name: Set up Staticcheck (Go v1.13) + run: GO111MODULE=on go get -u honnef.co/go/tools/cmd/staticcheck@2021.1 + if: ${{ matrix.go-version == '1.13' }} + - name: Checkout + uses: actions/checkout@v2 + - name: Vet + run: go vet -v ./... + - name: Staticcheck + run: staticcheck ./... + - name: Build + run: go build -v ./... + - name: Test + run: go test -v -cover ./... From e0b328913234954e6150850e284fa66459de82df Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Fri, 31 Dec 2021 15:49:04 +1000 Subject: [PATCH 27/35] Change readme links from upstream to this fork --- README.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 9cd03fd..425c46f 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,8 @@ See also the [upstream changelog](CHANGELOG.md). --- # statsd -[![Build Status](https://travis-ci.org/alexcesaro/statsd.svg?branch=v2)](https://travis-ci.org/alexcesaro/statsd) [![Code Coverage](http://gocover.io/_badge/gopkg.in/alexcesaro/statsd.v2)](http://gocover.io/gopkg.in/alexcesaro/statsd.v2) [![Documentation](https://godoc.org/gopkg.in/alexcesaro/statsd.v2?status.svg)](https://godoc.org/gopkg.in/alexcesaro/statsd.v2) +[![Code Coverage](https://gocover.io/_badge/github.com/joeycumines/statsd)](https://gocover.io/github.com/joeycumines/statsd) +[![Documentation](https://godoc.org/github.com/joeycumines/statsd?status.svg)](https://godoc.org/github.com/joeycumines/statsd) ## Introduction @@ -44,7 +45,7 @@ with other Go StatsD clients. ## Documentation -https://godoc.org/gopkg.in/alexcesaro/statsd.v2 +https://pkg.go.dev/github.com/joeycumines/statsd#section-documentation ## Download @@ -54,7 +55,7 @@ https://godoc.org/gopkg.in/alexcesaro/statsd.v2 ## Example -See the [examples in the documentation](https://godoc.org/gopkg.in/alexcesaro/statsd.v2#example-package). +See the [examples in the documentation](https://pkg.go.dev/github.com/joeycumines/statsd#pkg-examples). ## License From ead05d60c197a2ce6676307f090eb3f0fa3311ed Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Fri, 31 Dec 2021 16:19:55 +1000 Subject: [PATCH 28/35] Add missing unit tests --- safeconn_test.go | 101 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/safeconn_test.go b/safeconn_test.go index a703f30..3cf5ffe 100644 --- a/safeconn_test.go +++ b/safeconn_test.go @@ -1,6 +1,7 @@ package statsd import ( + "errors" "io" "net" "testing" @@ -121,3 +122,103 @@ func TestSafeConn_SuccessfullyWritesWhenConnectionOpen(t *testing.T) { t.Errorf("Error should have been nil, but instead it was: %v", err) } } + +func TestNewSafeConnWithDefaultTimeouts(t *testing.T) { + for _, tc := range [...]struct { + Name string + Conn net.Conn + Err error + }{ + { + Name: `failure`, + Err: errors.New(`some error`), + }, + { + Name: `success`, + Conn: new(mockNetConn), + }, + } { + t.Run(tc.Name, func(t *testing.T) { + type ( + DialIn struct { + Network string + Address string + Timeout time.Duration + } + DialOut struct { + Conn net.Conn + Err error + } + ) + var ( + dialIn = make(chan DialIn) + dialOut = make(chan DialOut) + ) + defer close(dialIn) + defer close(dialOut) + defer func() func() { + old := dialTimeout + dialTimeout = func(network, address string, timeout time.Duration) (net.Conn, error) { + dialIn <- DialIn{network, address, timeout} + v := <-dialOut + return v.Conn, v.Err + } + return func() { dialTimeout = old } + }()() + + const ( + expectedNetwork = `tcp` + expectedAddress = `127.0.0.1:21969` + ) + + done := make(chan struct{}) + go func() { + defer close(done) + if v := <-dialIn; v != (DialIn{Network: expectedNetwork, Address: expectedAddress, Timeout: defaultConnTimeout}) { + t.Errorf("%+v", v) + } + dialOut <- DialOut{tc.Conn, tc.Err} + }() + + conn, err := NewSafeConnWithDefaultTimeouts(expectedNetwork, expectedAddress) + if err != tc.Err { + t.Error(conn, err) + } + if (tc.Conn == nil) != (conn == nil) { + t.Error(conn) + } else if conn != nil { + if conn.netConn != tc.Conn { + t.Error(conn.netConn) + } + if conn.connTimeout != defaultConnTimeout { + t.Error(conn.connTimeout) + } + if conn.readTimeout != defaultReadTimeout { + t.Error(conn.readTimeout) + } + } + + <-done + }) + } +} + +func TestSafeConn_Close(t *testing.T) { + a, b := net.Pipe() + conn := SafeConn{ + netConn: a, + readTimeout: 1, + } + if conn.connIsClosed() { + t.Error() + } + if err := (&SafeConn{netConn: b}).Close(); err != nil { + t.Error(err) + } + if !conn.connIsClosed() { + t.Error() + } + if err := conn.Close(); err != nil { + t.Error(err) + } +} From 23446bfc18706f0159e98457c018c4f1fde984d0 Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Fri, 31 Dec 2021 16:45:06 +1000 Subject: [PATCH 29/35] Minor documentation typo fixes --- statsd.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/statsd.go b/statsd.go index 735cfa3..f012b75 100644 --- a/statsd.go +++ b/statsd.go @@ -90,7 +90,7 @@ func (c *Client) skip() bool { return c.muted || (c.rate != 1 && randFloat() > c.rate) } -// Increment increment the given bucket. It is equivalent to Count(bucket, 1). +// Increment increments the given bucket. It is equivalent to Count(bucket, 1). func (c *Client) Increment(bucket string) { c.Count(bucket, 1) } @@ -111,7 +111,7 @@ func (c *Client) Timing(bucket string, value interface{}) { c.conn.metric(c.prefix, bucket, value, "ms", c.rate, c.tags) } -// Histogram sends an histogram value to a bucket. +// Histogram sends a histogram value to a bucket. func (c *Client) Histogram(bucket string, value interface{}) { if c.skip() { return @@ -119,7 +119,7 @@ func (c *Client) Histogram(bucket string, value interface{}) { c.conn.metric(c.prefix, bucket, value, "h", c.rate, c.tags) } -// A Timing is an helper object that eases sending timing values. +// Timing is a helper object that eases sending timing values. type Timing struct { start time.Time c *Client From a8e6f804b9bd5dc8504d4bda11208a85887beaa5 Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Fri, 31 Dec 2021 16:48:39 +1000 Subject: [PATCH 30/35] Update changelog --- README.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 425c46f..32a66dc 100644 --- a/README.md +++ b/README.md @@ -4,15 +4,17 @@ Note that this repository is versioned independently of the (unmaintained) upstr **Changelog** +* 2021-12-31 - Added support for disabling the initial ping/check for UDP connections via new `UDPCheck` option + * 2020-11-13 - Added support for go modules * 2020-11-13 - Added the SafeConn write closer that checks if the connection is still up before attempting to write -* 2020-08-27 - Fixed bug in `statsd.Tags` identified by https://github.com/alexcesaro/statsd/issues/41 +* 2020-08-27 - Fixed bug in `Tags` identified by https://github.com/alexcesaro/statsd/issues/41 -* 2019-05-22 - Added support for arbitrary output streams via new `statsd.WriteCloser` option +* 2019-05-22 - Added support for arbitrary output streams via new `WriteCloser` option -* 2019-05-22 - Added support for simplified inline flush logic via new `statsd.InlineFlush` option +* 2019-05-22 - Added support for simplified inline flush logic via new `InlineFlush` option * 2019-05-26 - Fixed bug causing trailing newlines to be removed for streaming (non-udp) connections From 3c8f8bacb62a10bff89de3f01047bfb6e44a4223 Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Sat, 1 Jan 2022 00:24:13 +1000 Subject: [PATCH 31/35] fix(GaugeRelative): remove fmt.Sprintf and fix handling of -0.0 Also fixes handling of -0.0 within Gauge, by stripping the sign. This is a follow up fix for alexcesaro/statsd#12. --- conn.go | 42 ++++++++---- statsd.go | 2 +- statsd_test.go | 179 +++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 203 insertions(+), 20 deletions(-) diff --git a/conn.go b/conn.go index 319b8b7..7739eeb 100644 --- a/conn.go +++ b/conn.go @@ -1,8 +1,8 @@ package statsd import ( - "fmt" "io" + "math" "math/rand" "net" "strconv" @@ -134,13 +134,14 @@ func (c *conn) gaugeRelative(prefix, bucket string, value interface{}, tags stri c.mu.Lock() l := len(c.buf) c.appendBucket(prefix, bucket, tags) - if isNegative(value) { - c.appendNumber(value) - } else { - c.appendString(fmt.Sprintf("+%v", value)) + // add a (positive) sign if necessary (if there's no negative sign) + // this is complicated by the special case of negative zero (IEEE-754 floating point thing) + // note that NaN ends up "+NaN" and invalid values end up "+" (both probably going to do nothing / error) + if f, ok := floatValue(value); (!ok && !isNegativeInteger(value)) || + (ok && (f != f || (f == 0 && !math.Signbit(f)) || (f > 0 && f <= math.MaxFloat64))) { + c.appendByte('+') } - c.appendType("g") - c.closeMetric(tags) + c.appendGauge(value, tags) c.flushIfNecessary(l) c.mu.Unlock() } @@ -150,7 +151,14 @@ func (c *conn) gauge(prefix, bucket string, value interface{}, tags string) { l := len(c.buf) // To set a gauge to a negative value we must first set it to 0. // https://github.com/etsy/statsd/blob/master/docs/metric_types.md#gauges - if isNegative(value) { + // the presence of a sign (/^[-+]/) requires the special case handling + // https://github.com/statsd/statsd/blob/2041f6fb5e64bbf779a8bcb3e9729e63fe207e2f/stats.js#L307 + // +Inf doesn't get this special case, no particular reason, it's just existing behavior + if f, ok := floatValue(value); ok && f == 0 { + // special case to handle negative zero (IEEE-754 floating point thing) + value = 0 + } else if (ok && f < 0) || (!ok && isNegativeInteger(value)) { + // note this case includes -Inf, which is just existing behavior that's been retained c.appendBucket(prefix, bucket, tags) c.appendGauge(0, tags) } @@ -214,8 +222,8 @@ func (c *conn) appendNumber(v interface{}) { } } -func isNegative(v interface{}) bool { - switch n := v.(type) { +func isNegativeInteger(n interface{}) bool { + switch n := n.(type) { case int: return n < 0 case int64: @@ -226,12 +234,20 @@ func isNegative(v interface{}) bool { return n < 0 case int8: return n < 0 + default: + return false + } +} + +func floatValue(n interface{}) (float64, bool) { + switch n := n.(type) { case float64: - return n < 0 + return n, true case float32: - return n < 0 + return float64(n), true + default: + return 0, false } - return false } func (c *conn) appendBucket(prefix, bucket string, tags string) { diff --git a/statsd.go b/statsd.go index 042749e..c86724c 100644 --- a/statsd.go +++ b/statsd.go @@ -103,7 +103,7 @@ func (c *Client) Gauge(bucket string, value interface{}) { c.conn.gauge(c.prefix, bucket, value, c.tags) } -// GaugeRelative records an relative value for the given bucket. +// GaugeRelative records a relative value for the given bucket. func (c *Client) GaugeRelative(bucket string, value interface{}) { if c.skip() { return diff --git a/statsd_test.go b/statsd_test.go index 8abc7a3..fb976df 100644 --- a/statsd_test.go +++ b/statsd_test.go @@ -5,6 +5,7 @@ import ( "errors" "io" "io/ioutil" + "math" "net" "runtime" "strings" @@ -33,17 +34,182 @@ func TestIncrement(t *testing.T) { } func TestGauge(t *testing.T) { - testOutput(t, "test_key:5|g\ntest_key:0|g\ntest_key:-10|g", func(c *Client) { + testOutput(t, "test_key:5|g\ntest_key:0|g\ntest_key:-10|g\ntest_key:5|g\ntest_key:0|g\ntest_key:-10|g\ntest_key:0|g\ntest_key:0|g", func(c *Client) { c.Gauge(testKey, 5) c.Gauge(testKey, -10) + c.Gauge(testKey, 5.0) + c.Gauge(testKey, -10.0) + c.Gauge(testKey, 0.0) + c.Gauge(testKey, math.Copysign(0, -1)) }) } -func TestGaugeRelative(t *testing.T) { - testOutput(t, "test_key:+5|g\ntest_key:-10|g", func(c *Client) { - c.GaugeRelative(testKey, 5) - c.GaugeRelative(testKey, -10) - }) +func TestClient_GaugeRelative(t *testing.T) { + for _, tc := range [...]struct { + Name string + Output string + Input func(c *Client) + }{ + { + Name: "inc then dec", + Output: "test_key:+5|g\ntest_key:-10|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, 5) + c.GaugeRelative(testKey, -10) + }, + }, + { + Name: "neg", + Output: "test_key:-3.123|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, -3.123) + }, + }, + { + Name: "pos", + Output: "test_key:+3.123|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, 3.123) + }, + }, + { + Name: "zero", + Output: "test_key:+0|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, 0) + }, + }, + { + Name: "zero float64", + Output: "test_key:+0|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, float64(0)) + }, + }, + { + Name: "zero float32", + Output: "test_key:+0|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, float32(0)) + }, + }, + { + Name: "neg zero float64", + Output: "test_key:-0|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, math.Copysign(0, -1)) + }, + }, + { + Name: "neg zero float32", + Output: "test_key:-0|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, float32(math.Copysign(0, -1))) + }, + }, + { + Name: "pos large", + Output: "test_key:+5932443000000000000000000000|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, float32(59324.4289e23)) + }, + }, + { + Name: "pos small", + Output: "test_key:+0.00000000000000000059324427|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, float32(59324.4289e-23)) + }, + }, + { + Name: "uint8", + Output: "test_key:+252|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, byte(252)) + }, + }, + { + Name: "neg int64", + Output: "test_key:-9942423|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, int64(-9942423)) + }, + }, + { + Name: "uint 0", + Output: "test_key:+0|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, uint(0)) + }, + }, + { + Name: "pos max float64", + Output: "test_key:+179769313486231570000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, math.MaxFloat64) + }, + }, + { + Name: "neg max float64", + Output: "test_key:-179769313486231570000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, -math.MaxFloat64) + }, + }, + { + Name: "pos inf float64", + Output: "test_key:+Inf|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, math.Inf(1)) + }, + }, + { + Name: "neg inf float64", + Output: "test_key:-Inf|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, math.Inf(-1)) + }, + }, + { + Name: "pos inf float32", + Output: "test_key:+Inf|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, float32(math.Inf(1))) + }, + }, + { + Name: "neg inf float32", + Output: "test_key:-Inf|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, float32(math.Inf(-1))) + }, + }, + { + Name: "nan float64", + Output: "test_key:+NaN|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, math.NaN()) + }, + }, + { + Name: "nan float32", + Output: "test_key:+NaN|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, float32(math.NaN())) + }, + }, + { + Name: "unsupported", + Output: "test_key:+|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, struct{}{}) + }, + }, + } { + t.Run(tc.Name, func(t *testing.T) { + testOutput(t, tc.Output, tc.Input) + }) + } } func TestTiming(t *testing.T) { @@ -141,6 +307,7 @@ func TestMute(t *testing.T) { } c.Increment(testKey) c.Gauge(testKey, 1) + c.GaugeRelative(testKey, 1) c.Timing(testKey, 1) c.Histogram(testKey, 1) c.Unique(testKey, "1") From 217995054894500f07798513a8e84e23d8f8ed3d Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Sat, 1 Jan 2022 00:32:56 +1000 Subject: [PATCH 32/35] Update changelog --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 3e4382b..a8fbf50 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,8 @@ Note that this repository is versioned independently of the (unmaintained) upstr **Changelog** +* 2022-01-01 - Added support for relative gauges via `GaugeRelative`, and fixed gauge handling of negative zero floats + * 2021-12-31 - Added support for disabling the initial ping/check for UDP connections via new `UDPCheck` option * 2020-11-13 - Added support for go modules From 78fe5e2df6b2a7014b957a987672bb46c5dcaf36 Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Sat, 1 Jan 2022 00:56:42 +1000 Subject: [PATCH 33/35] Readme typo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a8fbf50..ce163ed 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Note that this repository is versioned independently of the (unmaintained) upstr **Changelog** -* 2022-01-01 - Added support for relative gauges via `GaugeRelative`, and fixed gauge handling of negative zero floats +* 2022-01-01 - Added support for relative gauges via `Client.GaugeRelative`, and fixed gauge handling of -0 floats * 2021-12-31 - Added support for disabling the initial ping/check for UDP connections via new `UDPCheck` option From 0ded5b1a84e7e984bd286f277997b1f49d5ad742 Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Sat, 1 Jan 2022 10:42:22 +1000 Subject: [PATCH 34/35] Remove .github directory that was erroneously included --- .github/ISSUE_TEMPLATE/bug_report.md | 21 ------------ .github/ISSUE_TEMPLATE/feature_request.md | 20 ----------- .github/workflows/go.yml | 41 ----------------------- 3 files changed, 82 deletions(-) delete mode 100644 .github/ISSUE_TEMPLATE/bug_report.md delete mode 100644 .github/ISSUE_TEMPLATE/feature_request.md delete mode 100644 .github/workflows/go.yml diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md deleted file mode 100644 index ff74c28..0000000 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ /dev/null @@ -1,21 +0,0 @@ ---- -name: Bug report -about: Create a report to help us improve -title: "[bug]" -labels: '' -assignees: '' - ---- - -**Describe the bug** -A clear and concise description of what the bug is. - -**To Reproduce** -Steps to reproduce the behavior. -Preferably, a short / minimal code example, that reproduces the behavior. - -**Expected behavior** -A clear and concise description of what you expected to happen. - -**Additional context** -Add any other context about the problem here. diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md deleted file mode 100644 index 70d2797..0000000 --- a/.github/ISSUE_TEMPLATE/feature_request.md +++ /dev/null @@ -1,20 +0,0 @@ ---- -name: Feature request -about: Suggest an idea for this project -title: "[feature request]" -labels: '' -assignees: '' - ---- - -**Is your feature request related to a problem? Please describe.** -A clear and concise description of what the problem is. Ex. I'm always frustrated when [...] - -**Describe the solution you'd like** -A clear and concise description of what you want to happen. - -**Describe alternatives you've considered** -A clear and concise description of any alternative solutions or features you've considered. - -**Additional context** -Add any other context or screenshots about the feature request here. diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml deleted file mode 100644 index 1a1414c..0000000 --- a/.github/workflows/go.yml +++ /dev/null @@ -1,41 +0,0 @@ -name: Go -on: - push: - branches: - - master - pull_request: - branches: - - master -jobs: - build: - runs-on: ubuntu-latest - permissions: - actions: read - contents: read - strategy: - fail-fast: false - matrix: - go-version: - - 1.17 - - 1.13 - steps: - - name: Set up Go - uses: actions/setup-go@v2 - with: - go-version: ${{ matrix.go-version }} - - name: Set up Staticcheck - run: go install honnef.co/go/tools/cmd/staticcheck@latest - if: ${{ matrix.go-version != '1.13' }} - - name: Set up Staticcheck (Go v1.13) - run: GO111MODULE=on go get -u honnef.co/go/tools/cmd/staticcheck@2021.1 - if: ${{ matrix.go-version == '1.13' }} - - name: Checkout - uses: actions/checkout@v2 - - name: Vet - run: go vet -v ./... - - name: Staticcheck - run: staticcheck ./... - - name: Build - run: go build -v ./... - - name: Test - run: go test -v -cover ./... From efc594e7845b2924dea868a376f5291a35246c75 Mon Sep 17 00:00:00 2001 From: Joseph Cumines Date: Sat, 1 Jan 2022 10:45:36 +1000 Subject: [PATCH 35/35] Restore .travis.yml, reset CHANGELOG.md, remove go.mod --- .travis.yml | 9 +++++++++ CHANGELOG.md | 4 ---- go.mod | 3 --- 3 files changed, 9 insertions(+), 7 deletions(-) create mode 100644 .travis.yml delete mode 100644 go.mod diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..48915e7 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,9 @@ +language: go + +go: + - 1.2 + - 1.3 + - 1.4 + - 1.5 + - 1.6 + - tip diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b26c92..04d811b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,4 @@ # Change Log - -**NOTE:** This is the original change log, from the (unmaintained) upstream. -Please refer to the [readme](README.md) and Git history, for more recent changes. - All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). diff --git a/go.mod b/go.mod deleted file mode 100644 index 3cd736f..0000000 --- a/go.mod +++ /dev/null @@ -1,3 +0,0 @@ -module github.com/joeycumines/statsd - -go 1.14