From c257d5673ec0e129b0b9a52c365d2540d2bd7870 Mon Sep 17 00:00:00 2001 From: Anatolii Belomestnov Date: Sat, 17 Aug 2019 07:38:18 -0700 Subject: [PATCH 1/3] Simplify handler map --- client/handler_map.go | 70 +++++++++++---------------- client/handler_map_test.go | 98 +++++++++++++++++++++++++++++++++++--- 2 files changed, 119 insertions(+), 49 deletions(-) diff --git a/client/handler_map.go b/client/handler_map.go index 2304007..27d6081 100644 --- a/client/handler_map.go +++ b/client/handler_map.go @@ -15,7 +15,7 @@ type HandlerMap struct { } type waiter struct { - ready chan<- struct{} // Closed when semaphore acquired. + ready chan<- ResponseHandler // Closed when semaphore acquired. } func NewHandlerMap() *HandlerMap { @@ -44,6 +44,7 @@ func (m *HandlerMap) Put(key string, value ResponseHandler) { } w := next.Value.(waiter) waiters.Remove(next) + w.ready <- value close(w.ready) } delete(m.waitersMap, key) @@ -66,50 +67,35 @@ func (m *HandlerMap) Get(key string, timeoutMs int) (value ResponseHandler, ok b return } - // let's remember the current time - curTime := time.Now() - maxTime := curTime.Add(time.Duration(timeoutMs) * time.Millisecond) - - for time.Now().Before(maxTime) && !ok { - value, ok = m.innerMap[key] - if !ok { - nsLeft := maxTime.Sub(time.Now()).Nanoseconds() - ctx, _ := context.WithTimeout(context.Background(), time.Duration(nsLeft)*time.Nanosecond) - - waiters, wok := m.waitersMap[key] - if !wok { - waiters = &list.List{} - m.waitersMap[key] = waiters - } - ready := make(chan struct{}) - w := waiter{ready: ready} - elem := waiters.PushBack(w) - m.mu.Unlock() // unlock before we start waiting on stuff - - select { - case <-ctx.Done(): - m.mu.Lock() - select { - case <-ready: - // in case we got signalled during cancellation - continue - default: - // we got timeout, let's remove - waiters.Remove(elem) - if waiters.Len() == 0 { - delete(m.waitersMap, key) - } - } - m.mu.Unlock() - return - - case <-ready: - m.mu.Lock() // going back to the loop, gotta lock - continue + ctx, _ := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond) + waiters, wok := m.waitersMap[key] + if !wok { + waiters = &list.List{} + m.waitersMap[key] = waiters + } + ready := make(chan ResponseHandler) + w := waiter{ready: ready} + elem := waiters.PushBack(w) + m.mu.Unlock() // unlock before waiting + + select { + case <-ctx.Done(): + m.mu.Lock() + // check if the response arrived when it timed out + select { + case value = <-ready: + ok = true + default: + // got timeout, let's remove waiter + waiters.Remove(elem) + if waiters.Len() == 0 { + delete(m.waitersMap, key) } } + m.mu.Unlock() + case value = <-ready: + ok = true } - m.mu.Unlock() return } diff --git a/client/handler_map_test.go b/client/handler_map_test.go index 1ac036a..d84ddae 100644 --- a/client/handler_map_test.go +++ b/client/handler_map_test.go @@ -1,6 +1,7 @@ package client import ( + "fmt" "github.com/stretchr/testify/assert" "math" "testing" @@ -8,9 +9,10 @@ import ( ) const ( - testKey = "test_key" - timeoutMs = 200 - marginErrorPct = 10 + testKey = "test_key" + timeoutMs = 200 + marginErrorMs = 10 + numHandlers = 100 ) func getMsSince(startTime time.Time) int { @@ -60,9 +62,9 @@ func TestHandlerMapDelayedPutRetrieve(t *testing.T) { myHandler(nil) actualResponseMs := getMsSince(startTime) var comp assert.Comparison = func() (success bool) { - return math.Abs(float64(actualResponseMs-expectedResponseMs))/float64(expectedResponseMs) < float64(marginErrorPct)/100 + return math.Abs(float64(actualResponseMs-expectedResponseMs)) < marginErrorMs } - assert.Condition(t, comp, "Response did not arrive within %d%% margin, expected time %d ms", marginErrorPct, expectedResponseMs) + assert.Condition(t, comp, "Response did not arrive within %d ms margin, expected time %d ms, actual time %d ms", marginErrorMs, expectedResponseMs, actualResponseMs) } @@ -87,9 +89,9 @@ func TestHandlerMapTimeoutPutTooLate(t *testing.T) { actualTimeoutMs := getMsSince(startTime) t.Logf("test: timed out waiting for key at %d ms after start\n", actualTimeoutMs) var comp assert.Comparison = func() (success bool) { - return math.Abs(float64(actualTimeoutMs-timeoutMs))/timeoutMs < float64(marginErrorPct)/100 + return math.Abs(float64(actualTimeoutMs-timeoutMs)) < marginErrorMs } - assert.Condition(t, comp, "Timeout did not occur within %d%% margin, expected timeout ms: %d", marginErrorPct, timeoutMs) + assert.Condition(t, comp, "Timeout did not occur within %d ms margin, expected timeout ms: %d, actual time %d ms", marginErrorMs, timeoutMs, actualTimeoutMs) // wait till producer has added the element time.Sleep(3 * timeoutMs * time.Millisecond) counts, waiters := handler_map.GetCounts() @@ -98,3 +100,85 @@ func TestHandlerMapTimeoutPutTooLate(t *testing.T) { } } + +func TestMixedMultiPutGet(t *testing.T) { + + handler_map := NewHandlerMap() + startTime := time.Now() + delayedResponseTimeMs := timeoutMs / 2 + + go func() { + var handler ResponseHandler = func(r *Response) { + t.Logf("test: got a response [%s] at time %d ms after start\n", r.Handle, getMsSince(startTime)) + } + + for i := 0; i < numHandlers; i++ { + key := fmt.Sprintf("%s-%d", testKey, i) + handler_map.Put(key, handler) + } + + // at this point the Get would be waiting for the response. + counts, _ := handler_map.GetCounts() + assert.Equal(t, numHandlers, counts, "Map Elements") + + time.Sleep(time.Duration(delayedResponseTimeMs) * time.Millisecond) + + // by now we have some non-delayed elements, and also have waiters for the delayed elements + counts, waiters := handler_map.GetCounts() + assert.Equal(t, numHandlers, counts, "Map Elements") // elements for non-delayed ones + assert.Equal(t, numHandlers, waiters, "Map Elements") // for delayed ones + + for i := 0; i < numHandlers; i++ { + key := fmt.Sprintf("%s-%d-delayed", testKey, i) + handler_map.Put(key, handler) + } + + // at this point the Get would be waiting for the response. + counts, _ = handler_map.GetCounts() + assert.Equal(t, numHandlers*2, counts, "Map Elements") + + }() + + completion := make(chan bool) + getData := func(expectedTimeMs int, key string) { + myHandler, ok := handler_map.Get(key, timeoutMs) + + if !ok { + t.Errorf("Failed to get test key at time %d ms after start\n", getMsSince(startTime)) + } else { + myHandler(&Response{Handle: key}) + actualResponseMs := getMsSince(startTime) + var comp assert.Comparison = func() (success bool) { + return math.Abs(float64(actualResponseMs-expectedTimeMs)) < marginErrorMs + } + assert.Condition(t, comp, "Response did not arrive within %d ms margin, expected time %d ms, actual time: %d ms", marginErrorMs, expectedTimeMs, actualResponseMs) + } + completion <- true + } + + for i := 0; i < numHandlers; i++ { + go getData(0, fmt.Sprintf("%s-%d", testKey, i)) + go getData(delayedResponseTimeMs, fmt.Sprintf("%s-%d-delayed", testKey, i)) + // second set of receivers + go getData(delayedResponseTimeMs, fmt.Sprintf("%s-%d-delayed", testKey, i)) + } + + // wait for completion + for i := 0; i < numHandlers*3; i++ { + _ = <-completion + } + + counts, waiters := handler_map.GetCounts() + assert.Equal(t, numHandlers*2, counts, "Map Elements") + assert.Equal(t, 0, waiters, "Waiter groups") + + for i := 0; i < numHandlers; i++ { + handler_map.Delete(fmt.Sprintf("%s-%d", testKey, i)) + handler_map.Delete(fmt.Sprintf("%s-%d-delayed", testKey, i)) + } + + counts, waiters = handler_map.GetCounts() + assert.Equal(t, 0, counts, "Map Elements") + assert.Equal(t, 0, waiters, "Waiter groups") + +} From 9a3d5b539d81c2a6d8f2a9a9c226d51a9f2e07bb Mon Sep 17 00:00:00 2001 From: Anatolii Belomestnov Date: Sat, 17 Aug 2019 07:57:45 -0700 Subject: [PATCH 2/3] Add timeouts to multi-request test --- client/handler_map_test.go | 49 ++++++++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 5 deletions(-) diff --git a/client/handler_map_test.go b/client/handler_map_test.go index d84ddae..d997877 100644 --- a/client/handler_map_test.go +++ b/client/handler_map_test.go @@ -12,7 +12,7 @@ const ( testKey = "test_key" timeoutMs = 200 marginErrorMs = 10 - numHandlers = 100 + numHandlers = 20 ) func getMsSince(startTime time.Time) int { @@ -106,6 +106,7 @@ func TestMixedMultiPutGet(t *testing.T) { handler_map := NewHandlerMap() startTime := time.Now() delayedResponseTimeMs := timeoutMs / 2 + timedoutResponseTimeMs := timeoutMs + marginErrorMs go func() { var handler ResponseHandler = func(r *Response) { @@ -125,8 +126,8 @@ func TestMixedMultiPutGet(t *testing.T) { // by now we have some non-delayed elements, and also have waiters for the delayed elements counts, waiters := handler_map.GetCounts() - assert.Equal(t, numHandlers, counts, "Map Elements") // elements for non-delayed ones - assert.Equal(t, numHandlers, waiters, "Map Elements") // for delayed ones + assert.Equal(t, numHandlers, counts, "Map Elements") // elements for non-delayed ones + assert.Equal(t, numHandlers*2, waiters, "Map Elements") // for delayed ones for i := 0; i < numHandlers; i++ { key := fmt.Sprintf("%s-%d-delayed", testKey, i) @@ -137,6 +138,12 @@ func TestMixedMultiPutGet(t *testing.T) { counts, _ = handler_map.GetCounts() assert.Equal(t, numHandlers*2, counts, "Map Elements") + time.Sleep(time.Duration(timedoutResponseTimeMs) * time.Millisecond) + for i := 0; i < numHandlers; i++ { + key := fmt.Sprintf("%s-%d-toolate", testKey, i) + handler_map.Put(key, handler) + } + }() completion := make(chan bool) @@ -156,27 +163,59 @@ func TestMixedMultiPutGet(t *testing.T) { completion <- true } + getTimeouts := func(expectedTimeMs int, key string) { + _, ok := handler_map.Get(key, timeoutMs) + + if ok { + t.Errorf("Should have gotten a timeout on key %s but received ok %d ms after start\n", key, getMsSince(startTime)) + } else { + t.Logf("test: got the expected timeout on key %s %d ms after start\n", key, getMsSince(startTime)) + actualResponseMs := getMsSince(startTime) + var comp assert.Comparison = func() (success bool) { + return math.Abs(float64(actualResponseMs-expectedTimeMs)) < marginErrorMs + } + assert.Condition(t, comp, "Timeout did not occur within %d ms margin, expected time %d ms, actual time: %d ms", marginErrorMs, expectedTimeMs, actualResponseMs) + } + completion <- true + } + for i := 0; i < numHandlers; i++ { go getData(0, fmt.Sprintf("%s-%d", testKey, i)) go getData(delayedResponseTimeMs, fmt.Sprintf("%s-%d-delayed", testKey, i)) // second set of receivers go getData(delayedResponseTimeMs, fmt.Sprintf("%s-%d-delayed", testKey, i)) + go getTimeouts(timeoutMs, fmt.Sprintf("%s-%d-timedout", testKey, i)) } - // wait for completion + // wait for completion of non-timed out response (immediate + delayed ones) for i := 0; i < numHandlers*3; i++ { _ = <-completion } counts, waiters := handler_map.GetCounts() assert.Equal(t, numHandlers*2, counts, "Map Elements") - assert.Equal(t, 0, waiters, "Waiter groups") + assert.Equal(t, numHandlers, waiters, "Waiter groups") // the "timedout" keys are waiting for timeout for i := 0; i < numHandlers; i++ { handler_map.Delete(fmt.Sprintf("%s-%d", testKey, i)) handler_map.Delete(fmt.Sprintf("%s-%d-delayed", testKey, i)) } + counts, waiters = handler_map.GetCounts() + assert.Equal(t, 0, counts, "Map Elements") + assert.Equal(t, numHandlers, waiters, "Waiter groups") + + // wait for timed out responses + for i := 0; i < numHandlers; i++ { + _ = <-completion + } + + // delete the timed out keys + for i := 0; i < numHandlers; i++ { + handler_map.Delete(fmt.Sprintf("%s-%d-timedout", testKey, i)) + } + + // at last should have no elements and no waiters counts, waiters = handler_map.GetCounts() assert.Equal(t, 0, counts, "Map Elements") assert.Equal(t, 0, waiters, "Waiter groups") From 6c52633381f8ab131c4d30cba73b48cfde632037 Mon Sep 17 00:00:00 2001 From: Anatolii Belomestnov Date: Sat, 17 Aug 2019 08:45:07 -0700 Subject: [PATCH 3/3] Test key to value match --- client/handler_map_test.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/client/handler_map_test.go b/client/handler_map_test.go index d997877..0c9061e 100644 --- a/client/handler_map_test.go +++ b/client/handler_map_test.go @@ -108,14 +108,18 @@ func TestMixedMultiPutGet(t *testing.T) { delayedResponseTimeMs := timeoutMs / 2 timedoutResponseTimeMs := timeoutMs + marginErrorMs - go func() { - var handler ResponseHandler = func(r *Response) { + getHandler := func(key string) ResponseHandler { + return func(r *Response) { + assert.Equal(t, key, r.Handle, fmt.Sprintf("Handler Key Mismatch, expected: %s, actual: %s", key, r.Handle)) t.Logf("test: got a response [%s] at time %d ms after start\n", r.Handle, getMsSince(startTime)) } + } + + go func() { for i := 0; i < numHandlers; i++ { key := fmt.Sprintf("%s-%d", testKey, i) - handler_map.Put(key, handler) + handler_map.Put(key, getHandler(key)) } // at this point the Get would be waiting for the response. @@ -131,7 +135,7 @@ func TestMixedMultiPutGet(t *testing.T) { for i := 0; i < numHandlers; i++ { key := fmt.Sprintf("%s-%d-delayed", testKey, i) - handler_map.Put(key, handler) + handler_map.Put(key, getHandler(key)) } // at this point the Get would be waiting for the response. @@ -141,7 +145,7 @@ func TestMixedMultiPutGet(t *testing.T) { time.Sleep(time.Duration(timedoutResponseTimeMs) * time.Millisecond) for i := 0; i < numHandlers; i++ { key := fmt.Sprintf("%s-%d-toolate", testKey, i) - handler_map.Put(key, handler) + handler_map.Put(key, getHandler(key)) } }()