diff --git a/worker/agent.go b/worker/agent.go index 4c127ec..0442b22 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -24,16 +24,18 @@ type agent struct { rw *bufio.ReadWriter worker *Worker in chan []byte + readBuffPool *sync.Pool net, addr string } // Create the agent of job server. func newAgent(net, addr string, worker *Worker) (a *agent, err error) { a = &agent{ - net: net, - addr: addr, - worker: worker, - in: make(chan []byte, rt.QueueSize), + net: net, + addr: addr, + worker: worker, + in: make(chan []byte, rt.QueueSize), + readBuffPool: &sync.Pool{New: func() interface{} { return rt.NewBuffer(rt.BufferSize) }}, } return } @@ -55,11 +57,14 @@ func (a *agent) work() { var err error var data, leftdata []byte startRw := a.loadRw() + if startRw == nil { + return // we are reconnecting at the moment, a new go routine will run on work() when reconnected... + } // exit the loop if connection has been replaced because reconnect will launch a new work() thread - for startRw == a.loadRw() && !a.worker.isShuttingDown() { + for !a.worker.isShuttingDown() { - if data, err = a.read(); err != nil { + if data, err = a.read(startRw); err != nil { if opErr, ok := err.(*net.OpError); ok { if opErr.Temporary() { a.worker.Log(Info, "opErr.Temporary():", a.addr) @@ -68,12 +73,12 @@ func (a *agent) work() { a.worker.Log(Info, "got permanent network error with server:", a.addr, "comm thread exiting.") a.reconnectError(err) // else - we're probably dc'ing due to a Close() - break + return } } else { a.worker.Log(Info, "got error", err.Error(), "with server:", a.addr, "comm thread exiting...") a.reconnectError(err) - break + return } } if len(leftdata) > 0 { // some data left for processing @@ -86,7 +91,7 @@ func (a *agent) work() { for { if inpack, l, err = decodeInPack(data); err != nil { a.reconnectError(err) - break + return } else { leftdata = nil inpack.a = a @@ -99,7 +104,6 @@ func (a *agent) work() { } } } - } } @@ -247,13 +251,14 @@ func (a *agent) Connect() { } // read length bytes from the socket -func (a *agent) read() (data []byte, err error) { +func (a *agent) read(myRw *bufio.ReadWriter) (data []byte, err error) { n := 0 - tmp := rt.NewBuffer(rt.BufferSize) + tmp := a.readBuffPool.Get().([]byte) + defer a.readBuffPool.Put(tmp) + var buf bytes.Buffer - myRw := a.loadRw() // read the header so we can get the length of the data if n, err = myRw.Read(tmp); err != nil { return