From a465fcc5cf2abb36ed17b8d531704bfc65e98153 Mon Sep 17 00:00:00 2001 From: "anatolii.belomestnov" Date: Tue, 23 Apr 2019 16:09:30 -0700 Subject: [PATCH 1/2] Remove extra rw checking and test it out The old rw will error out on read() op so we don't need to recheck rw on each iteration --- worker/agent.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/worker/agent.go b/worker/agent.go index 4c127ec..7e250ba 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -55,11 +55,14 @@ func (a *agent) work() { var err error var data, leftdata []byte startRw := a.loadRw() + if startRw == nil { + return // we are reconnecting at the moment, a new go routine will run on work() when reconnected... + } // exit the loop if connection has been replaced because reconnect will launch a new work() thread - for startRw == a.loadRw() && !a.worker.isShuttingDown() { + for !a.worker.isShuttingDown() { - if data, err = a.read(); err != nil { + if data, err = a.read(startRw); err != nil { if opErr, ok := err.(*net.OpError); ok { if opErr.Temporary() { a.worker.Log(Info, "opErr.Temporary():", a.addr) @@ -68,12 +71,12 @@ func (a *agent) work() { a.worker.Log(Info, "got permanent network error with server:", a.addr, "comm thread exiting.") a.reconnectError(err) // else - we're probably dc'ing due to a Close() - break + return } } else { a.worker.Log(Info, "got error", err.Error(), "with server:", a.addr, "comm thread exiting...") a.reconnectError(err) - break + return } } if len(leftdata) > 0 { // some data left for processing @@ -86,7 +89,7 @@ func (a *agent) work() { for { if inpack, l, err = decodeInPack(data); err != nil { a.reconnectError(err) - break + return } else { leftdata = nil inpack.a = a @@ -99,7 +102,6 @@ func (a *agent) work() { } } } - } } @@ -247,13 +249,12 @@ func (a *agent) Connect() { } // read length bytes from the socket -func (a *agent) read() (data []byte, err error) { +func (a *agent) read(myRw *bufio.ReadWriter) (data []byte, err error) { n := 0 tmp := rt.NewBuffer(rt.BufferSize) var buf bytes.Buffer - myRw := a.loadRw() // read the header so we can get the length of the data if n, err = myRw.Read(tmp); err != nil { return From dea540e79b2523a65058a5fd2087732cf6e21aae Mon Sep 17 00:00:00 2001 From: Anatolii Belomestnov Date: Tue, 23 Apr 2019 20:16:39 -0700 Subject: [PATCH 2/2] Add pooling of read buffer in worker --- worker/agent.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/worker/agent.go b/worker/agent.go index 7e250ba..0442b22 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -24,16 +24,18 @@ type agent struct { rw *bufio.ReadWriter worker *Worker in chan []byte + readBuffPool *sync.Pool net, addr string } // Create the agent of job server. func newAgent(net, addr string, worker *Worker) (a *agent, err error) { a = &agent{ - net: net, - addr: addr, - worker: worker, - in: make(chan []byte, rt.QueueSize), + net: net, + addr: addr, + worker: worker, + in: make(chan []byte, rt.QueueSize), + readBuffPool: &sync.Pool{New: func() interface{} { return rt.NewBuffer(rt.BufferSize) }}, } return } @@ -252,7 +254,9 @@ func (a *agent) Connect() { func (a *agent) read(myRw *bufio.ReadWriter) (data []byte, err error) { n := 0 - tmp := rt.NewBuffer(rt.BufferSize) + tmp := a.readBuffPool.Get().([]byte) + defer a.readBuffPool.Put(tmp) + var buf bytes.Buffer // read the header so we can get the length of the data