From 3b0440bee4f1441f602139df97ef76597169d052 Mon Sep 17 00:00:00 2001 From: Yu Peng Date: Wed, 26 Nov 2025 16:52:41 +0800 Subject: [PATCH] chore: add context in itf instead of using Background also, add slow test in golang test env --- .github/workflows/golang.yml | 6 + libmc/__init__.py | 4 +- misc/slow_server.sh | 7 + src/golibmc.go | 96 ++++++----- src/golibmc_test.go | 320 ++++++++++++++++++++++++----------- src/version.go | 2 +- 6 files changed, 284 insertions(+), 151 deletions(-) create mode 100755 misc/slow_server.sh diff --git a/.github/workflows/golang.yml b/.github/workflows/golang.yml index 3b1cecb6..9ff070c3 100644 --- a/.github/workflows/golang.yml +++ b/.github/workflows/golang.yml @@ -25,8 +25,14 @@ jobs: uses: actions/setup-go@v2 with: go-version: ${{ matrix.gover }} + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: 3.13 - name: Start memcached servers run: ./misc/memcached_server start + - name: Start slow memcached server + run: ./misc/slow_server.sh - name: Run gotest run: | if [[ ${{ matrix.compiler }} = "gcc" ]]; then export CC=gcc CXX=g++; fi diff --git a/libmc/__init__.py b/libmc/__init__.py index 6fdc4ebe..991e2be1 100644 --- a/libmc/__init__.py +++ b/libmc/__init__.py @@ -33,8 +33,8 @@ __file__ as _libmc_so_file ) -__VERSION__ = "1.4.14" -__version__ = "1.4.14" +__VERSION__ = "1.4.15" +__version__ = "1.4.15" __author__ = "mckelvin" __email__ = "mckelvin@users.noreply.github.com" __date__ = "Fri Jun 7 06:16:00 2024 +0800" diff --git a/misc/slow_server.sh b/misc/slow_server.sh new file mode 100755 index 00000000..601538b3 --- /dev/null +++ b/misc/slow_server.sh @@ -0,0 +1,7 @@ +#!/bin/sh +set -ex + +python tests/shabby/slow_memcached_server.py & +pid=$! +echo "pid of slow memcached server: $pid" + diff --git a/src/golibmc.go b/src/golibmc.go index 45eb3fe2..2def8527 100644 --- a/src/golibmc.go +++ b/src/golibmc.go @@ -116,7 +116,7 @@ type Client struct { connectTimeout C.int pollTimeout C.int retryTimeout C.int - maxRetries C.int // maximum amount of retries. maxRetries <= 0 means unlimited. default is -1. + maxRetries C.int // maximum amount of retries. maxRetries <= 0 means unlimited. default is -1. lk sync.Mutex // protects following fields freeConns []*conn @@ -649,6 +649,7 @@ func (cn *conn) configHashFunction(val int) { } // ConfigTimeout Keys: +// // PollTimeout // ConnectTimeout // RetryTimeout @@ -670,14 +671,14 @@ func (client *Client) ConfigTimeout(cCfgKey C.config_options_t, timeout time.Dur // GetServerAddressByKey will return the address of the memcached // server where a key is stored (assume all memcached servers are // accessiable and wonot establish any connections. ) -func (client *Client) GetServerAddressByKey(key string) string { +func (client *Client) GetServerAddressByKey(ctx context.Context, key string) string { rawKey := client.addPrefix(key) cKey := C.CString(rawKey) defer C.free(unsafe.Pointer(cKey)) cKeyLen := C.size_t(len(rawKey)) - cn, err := client.conn(context.Background()) + cn, err := client.conn(ctx) if err != nil { return "" } @@ -692,13 +693,13 @@ func (client *Client) GetServerAddressByKey(key string) string { // server where a key is stored. (Will try to connect to // corresponding memcached server and may failover accordingly. ) // if no server is avaiable, an empty string will be returned. -func (client *Client) GetRealtimeServerAddressByKey(key string) string { +func (client *Client) GetRealtimeServerAddressByKey(ctx context.Context, key string) string { rawKey := client.addPrefix(key) cKey := C.CString(rawKey) defer C.free(unsafe.Pointer(cKey)) cKeyLen := C.size_t(len(rawKey)) - cn, err := client.conn(context.Background()) + cn, err := client.conn(ctx) if err != nil { return "" } @@ -732,7 +733,7 @@ func (client *Client) addPrefix(key string) string { return strings.Join([]string{client.prefix, key}, "") } -func (client *Client) store(cmd string, item *Item) error { +func (client *Client) store(ctx context.Context, cmd string, item *Item) error { key := client.addPrefix(item.Key) cKey := C.CString(key) @@ -750,7 +751,7 @@ func (client *Client) store(cmd string, item *Item) error { var errCode C.err_code_t - cn, err := client.conn(context.Background()) + cn, err := client.conn(ctx) if err != nil { return err } @@ -816,32 +817,32 @@ func (client *Client) store(cmd string, item *Item) error { } // Add is a storage command, return without error only when the key is empty -func (client *Client) Add(item *Item) error { - return client.store("add", item) +func (client *Client) Add(ctx context.Context, item *Item) error { + return client.store(ctx, "add", item) } // Replace is a storage command, return without error only when the key is not empty -func (client *Client) Replace(item *Item) error { - return client.store("replace", item) +func (client *Client) Replace(ctx context.Context, item *Item) error { + return client.store(ctx, "replace", item) } // Prepend value to an existed key -func (client *Client) Prepend(item *Item) error { - return client.store("prepend", item) +func (client *Client) Prepend(ctx context.Context, item *Item) error { + return client.store(ctx, "prepend", item) } // Append value to an existed key -func (client *Client) Append(item *Item) error { - return client.store("append", item) +func (client *Client) Append(ctx context.Context, item *Item) error { + return client.store(ctx, "append", item) } // Set value to a key -func (client *Client) Set(item *Item) error { - return client.store("set", item) +func (client *Client) Set(ctx context.Context, item *Item) error { + return client.store(ctx, "set", item) } // SetMulti will set multi values at once -func (client *Client) SetMulti(items []*Item) (failedKeys []string, err error) { +func (client *Client) SetMulti(ctx context.Context, items []*Item) (failedKeys []string, err error) { nItems := len(items) cKeys := make([]*C.char, nItems) cKeyLens := make([]C.size_t, nItems) @@ -877,7 +878,7 @@ func (client *Client) SetMulti(items []*Item) (failedKeys []string, err error) { var results **C.message_result_t var n C.size_t - cn, err := client.conn(context.Background()) + cn, err := client.conn(ctx) if err != nil { return []string{}, err } @@ -934,12 +935,12 @@ func (client *Client) SetMulti(items []*Item) (failedKeys []string, err error) { } // Cas is short for Compare And Swap -func (client *Client) Cas(item *Item) error { - return client.store("cas", item) +func (client *Client) Cas(ctx context.Context, item *Item) error { + return client.store(ctx, "cas", item) } // Delete a key -func (client *Client) Delete(key string) error { +func (client *Client) Delete(ctx context.Context, key string) error { rawKey := client.addPrefix(key) cKey := C.CString(rawKey) @@ -950,7 +951,7 @@ func (client *Client) Delete(key string) error { var rst **C.message_result_t var n C.size_t - cn, err := client.conn(context.Background()) + cn, err := client.conn(ctx) if err != nil { return err } @@ -981,7 +982,7 @@ func (client *Client) Delete(key string) error { } // DeleteMulti will delete multi keys at once -func (client *Client) DeleteMulti(keys []string) (failedKeys []string, err error) { +func (client *Client) DeleteMulti(ctx context.Context, keys []string) (failedKeys []string, err error) { var rawKeys []string if len(client.prefix) == 0 { rawKeys = keys @@ -1009,7 +1010,7 @@ func (client *Client) DeleteMulti(keys []string) (failedKeys []string, err error cKeyLen := C.size_t(len(key)) cKeyLens[i] = cKeyLen } - cn, err := client.conn(context.Background()) + cn, err := client.conn(ctx) if err != nil { return []string{}, err } @@ -1065,8 +1066,8 @@ func (client *Client) DeleteMulti(keys []string) (failedKeys []string, err error return } -func (client *Client) getOrGets(cmd string, key string) (item *Item, err error) { - cn, err := client.conn(context.Background()) +func (client *Client) getOrGets(ctx context.Context, cmd string, key string) (item *Item, err error) { + cn, err := client.conn(ctx) if err != nil { return nil, err } @@ -1119,17 +1120,17 @@ func (client *Client) getOrGets(cmd string, key string) (item *Item, err error) } // Get is a retrieval command. It will return Item or nil -func (client *Client) Get(key string) (*Item, error) { - return client.getOrGets("get", key) +func (client *Client) Get(ctx context.Context, key string) (*Item, error) { + return client.getOrGets(ctx, "get", key) } // Gets is a retrieval command. It will return Item(with casid) or nil -func (client *Client) Gets(key string) (*Item, error) { - return client.getOrGets("gets", key) +func (client *Client) Gets(ctx context.Context, key string) (*Item, error) { + return client.getOrGets(ctx, "gets", key) } // GetMulti will return a map of multi values -func (client *Client) GetMulti(keys []string) (rv map[string]*Item, err error) { +func (client *Client) GetMulti(ctx context.Context, keys []string) (rv map[string]*Item, err error) { nKeys := len(keys) var rawKeys []string if len(client.prefix) == 0 { @@ -1156,7 +1157,7 @@ func (client *Client) GetMulti(keys []string) (rv map[string]*Item, err error) { var rst **C.retrieval_result_t var n C.size_t - cn, err1 := client.conn(context.Background()) + cn, err1 := client.conn(ctx) if err1 != nil { err = err1 return @@ -1200,7 +1201,7 @@ func (client *Client) GetMulti(keys []string) (rv map[string]*Item, err error) { } // Touch command -func (client *Client) Touch(key string, expiration int64) error { +func (client *Client) Touch(ctx context.Context, key string, expiration int64) error { rawKey := client.addPrefix(key) cKey := C.CString(rawKey) @@ -1212,7 +1213,7 @@ func (client *Client) Touch(key string, expiration int64) error { var rst **C.message_result_t var n C.size_t - cn, err := client.conn(context.Background()) + cn, err := client.conn(ctx) if err != nil { return err } @@ -1242,7 +1243,7 @@ func (client *Client) Touch(key string, expiration int64) error { return networkError(errorMessage(errCode)) } -func (client *Client) incrOrDecr(cmd string, key string, delta uint64) (uint64, error) { +func (client *Client) incrOrDecr(ctx context.Context, cmd string, key string, delta uint64) (uint64, error) { rawKey := client.addPrefix(key) cKey := C.CString(rawKey) defer C.free(unsafe.Pointer(cKey)) @@ -1255,7 +1256,7 @@ func (client *Client) incrOrDecr(cmd string, key string, delta uint64) (uint64, var errCode C.err_code_t - cn, err := client.conn(context.Background()) + cn, err := client.conn(ctx) if err != nil { return 0, err } @@ -1296,22 +1297,22 @@ func (client *Client) incrOrDecr(cmd string, key string, delta uint64) (uint64, } // Incr will increase the value in key by delta -func (client *Client) Incr(key string, delta uint64) (uint64, error) { - return client.incrOrDecr("incr", key, delta) +func (client *Client) Incr(ctx context.Context, key string, delta uint64) (uint64, error) { + return client.incrOrDecr(ctx, "incr", key, delta) } // Decr will decrease the value in key by delta -func (client *Client) Decr(key string, delta uint64) (uint64, error) { - return client.incrOrDecr("decr", key, delta) +func (client *Client) Decr(ctx context.Context, key string, delta uint64) (uint64, error) { + return client.incrOrDecr(ctx, "decr", key, delta) } // Version will return a map reflecting versions of each memcached server -func (client *Client) Version() (map[string]string, error) { +func (client *Client) Version(ctx context.Context) (map[string]string, error) { var rst *C.broadcast_result_t var n C.size_t rv := make(map[string]string) - cn, err := client.conn(context.Background()) + cn, err := client.conn(ctx) if err != nil { return rv, err } @@ -1342,13 +1343,13 @@ func (client *Client) Version() (map[string]string, error) { } // Stats will return a map reflecting stats map of each memcached server -func (client *Client) Stats() (map[string](map[string]string), error) { +func (client *Client) Stats(ctx context.Context) (map[string](map[string]string), error) { var rst *C.broadcast_result_t var n C.size_t rv := make(map[string](map[string]string)) - cn, err := client.conn(context.Background()) + cn, err := client.conn(ctx) if err != nil { return rv, err } @@ -1401,12 +1402,12 @@ func (client *Client) ToggleFlushAllFeature(enabled bool) { // FlushAll will flush all memcached servers // You must call ToggleFlushAllFeature(True) first to // enable this feature. -func (client *Client) FlushAll() ([]string, error) { +func (client *Client) FlushAll(ctx context.Context) ([]string, error) { var rst *C.broadcast_result_t var n C.size_t flushedHosts := []string{} - cn, err := client.conn(context.Background()) + cn, err := client.conn(ctx) C.client_toggle_flush_all_feature( cn._imp, C.bool(client.flushAllEnabled), ) @@ -1484,6 +1485,7 @@ func (cn *conn) expired(timeout time.Duration) bool { } // FIXME(Harry): We are using quit everywhere when the conn is failed, +// // I think we can just close socket instead of send quit, it should save some time. // So don't mix up Quit and Close, implement a Close function plz. func (cn *conn) quit() error { diff --git a/src/golibmc_test.go b/src/golibmc_test.go index 1060d6a3..011773ef 100644 --- a/src/golibmc_test.go +++ b/src/golibmc_test.go @@ -1,9 +1,11 @@ package golibmc import ( + "context" "fmt" "strconv" "strings" + "sync" "testing" "time" ) @@ -79,7 +81,7 @@ func TestStats(t *testing.T) { } func testStats(mc *Client, t *testing.T) { - statsDict, err := mc.Stats() + statsDict, err := mc.Stats(context.Background()) if !(err == nil && len(statsDict) == len(mc.servers) && len(mc.servers) > 0) { t.Errorf("%d %d", len(statsDict), len(mc.servers)) } @@ -130,7 +132,7 @@ func TestPrefix(t *testing.T) { func TestGetServerAddress(t *testing.T) { mc := newSimplePrefixClient(1, "") - if addr := mc.GetServerAddressByKey("key"); addr != LocalMC { + if addr := mc.GetServerAddressByKey(context.Background(), "key"); addr != LocalMC { t.Error(addr) } @@ -196,7 +198,7 @@ func testRouter(t *testing.T, servers []string, rs map[string]string, prefix str mc := New(servers, noreply, prefix, hashFunc, failover, disableLock) for key, expectedServerAddr := range rs { - actualServerAddr := mc.GetServerAddressByKey(key) + actualServerAddr := mc.GetServerAddressByKey(context.Background(), key) if actualServerAddr != expectedServerAddr { t.Errorf( "actualServerAddr: %s, expectedServerAddr: %s", @@ -214,7 +216,7 @@ func TestSetNGet(t *testing.T) { func testSetNGet(mc *Client, t *testing.T) { key := "google谷歌" value := "Google" - version, err := mc.Version() + version, err := mc.Version(context.Background()) if len(version) == 0 || err != nil { t.Error("Bad version, make sure memcached servers are started") } @@ -225,17 +227,17 @@ func testSetNGet(mc *Client, t *testing.T) { Flags: 0, Expiration: 0, } - err = mc.Set(&item) + err = mc.Set(context.Background(), &item) if err != nil { t.Error(ErrorSet) } - val, err := mc.Get(key) + val, err := mc.Get(context.Background(), key) if v := string((*val).Value); v != value || err != nil { t.Error(ErrorGetAfterSet) } - dct, err := mc.GetMulti([]string{key}) + dct, err := mc.GetMulti(context.Background(), []string{key}) if err != nil { t.Error(ErrorGetAfterSet) } @@ -248,17 +250,17 @@ func testSetNGet(mc *Client, t *testing.T) { t.Error(ErrorGetAfterSet) } - err = mc.Delete(key) + err = mc.Delete(context.Background(), key) if err != nil { t.Error(ErrorGetAfterSet) } - val, err = mc.Get(key) + val, err = mc.Get(context.Background(), key) if !(val == nil && err == ErrCacheMiss) { t.Error(ErrorGetAfterSet) } - dct, err = mc.GetMulti([]string{key}) + dct, err = mc.GetMulti(context.Background(), []string{key}) if !(len(dct) == 0 && err == ErrCacheMiss) { t.Error(ErrorGetAfterSet) } @@ -286,10 +288,10 @@ func testSetMultiNGetMulti(mc *Client, t *testing.T) { keys[i] = key } - mc.SetMulti(items) + mc.SetMulti(context.Background(), items) // GetMulti Values and Check - itemsMap, _ := mc.GetMulti(keys) + itemsMap, _ := mc.GetMulti(context.Background(), keys) for _, item := range items { itemGot := itemsMap[item.Key] @@ -304,8 +306,8 @@ func testSetMultiNGetMulti(mc *Client, t *testing.T) { items[i].Value = []byte(newValue) } - mc.SetMulti(items) - itemsMap, _ = mc.GetMulti(keys) + mc.SetMulti(context.Background(), items) + itemsMap, _ = mc.GetMulti(context.Background(), keys) for _, item := range items { itemGot := itemsMap[item.Key] @@ -318,8 +320,8 @@ func testSetMultiNGetMulti(mc *Client, t *testing.T) { } } // Delete Values and Check Again - mc.DeleteMulti(keys) - itemsMap, err := mc.GetMulti(keys) + mc.DeleteMulti(context.Background(), keys) + itemsMap, err := mc.GetMulti(context.Background(), keys) if !(len(itemsMap) == 0 && err == ErrCacheMiss) { t.Error(err) } @@ -337,22 +339,22 @@ func testIncrNDecr(mc *Client, t *testing.T) { Flags: 0, Expiration: 0, } - mc.Set(item) - if val, err := mc.Incr(key, 1); !(val == 100 && err == nil) { + mc.Set(context.Background(), item) + if val, err := mc.Incr(context.Background(), key, 1); !(val == 100 && err == nil) { t.Error(err) } - if val, err := mc.Decr(key, 1); !(val == 99 && err == nil) { + if val, err := mc.Decr(context.Background(), key, 1); !(val == 99 && err == nil) { t.Error(err) } - mc.Delete(key) + mc.Delete(context.Background(), key) - if val, err := mc.Incr(key, 1); !(val == 0 && err != nil) { + if val, err := mc.Incr(context.Background(), key, 1); !(val == 0 && err != nil) { t.Error(err) } - if val, err := mc.Decr(key, 1); !(val == 0 && err != nil) { + if val, err := mc.Decr(context.Background(), key, 1); !(val == 0 && err != nil) { t.Error(err) } } @@ -369,7 +371,7 @@ func testLargeValue(mc *Client, t *testing.T) { Flags: 0, Expiration: 0, } - if err := mc.Set(item); err == nil { + if err := mc.Set(context.Background(), item); err == nil { t.Error(err) } } @@ -383,17 +385,17 @@ func testCasAndGets(mc *Client, t *testing.T) { val := []byte("o") val2 := []byte("2") item := &Item{Key: key, Value: val} - err := mc.Set(item) + err := mc.Set(context.Background(), item) if err != nil { t.Error(err) } - item2, err := mc.Gets(key) + item2, err := mc.Gets(context.Background(), key) if item2 == nil || err != nil { t.Error(err) } (*item).Value = val2 - mc.Set(item) - item3, err := mc.Gets(key) + mc.Set(context.Background(), item) + item3, err := mc.Gets(context.Background(), key) if item3 == nil || err != nil { t.Error(err) } @@ -406,14 +408,14 @@ func testCasAndGets(mc *Client, t *testing.T) { } item3.Value = []byte("VVV") - err = mc.Cas(item3) + err = mc.Cas(context.Background(), item3) if err != nil { t.Error(err) } item3.Key = "not_existed_key" - mc.Delete(item3.Key) - err = mc.Cas(item3) + mc.Delete(context.Background(), item3.Key) + err = mc.Cas(context.Background(), item3) if err != ErrCacheMiss { t.Error(err) } @@ -435,11 +437,11 @@ func testNoreploy(mc *Client, t *testing.T) { Flags: 0, Expiration: 0, } - if err := mc.Set(item); err != nil { + if err := mc.Set(context.Background(), item); err != nil { t.Error(err) } - itemGot, err := mc.Gets(key) + itemGot, err := mc.Gets(context.Background(), key) if err != nil || string(itemGot.Value) != value { t.Error(err) } @@ -447,37 +449,37 @@ func testNoreploy(mc *Client, t *testing.T) { value2 := "22" (*item).Value = []byte(value2) - err = mc.Cas(item) + err = mc.Cas(context.Background(), item) if err != nil { t.Error(err) } - itemGot, err = mc.Get(key) + itemGot, err = mc.Get(context.Background(), key) if err != nil || string(itemGot.Value) != value2 { t.Error(err) } - val3, err := mc.Incr(key, 10) + val3, err := mc.Incr(context.Background(), key, 10) if err != nil || val3 != 0 { t.Error(val3, err) } - itemGot, err = mc.Gets(key) + itemGot, err = mc.Gets(context.Background(), key) if err != nil || string(itemGot.Value) != "32" { t.Error(err) } - val4, err := mc.Decr(key, 12) + val4, err := mc.Decr(context.Background(), key, 12) if err != nil || val4 != 0 { t.Error(err) } - itemGot, err = mc.Gets(key) + itemGot, err = mc.Gets(context.Background(), key) if err != nil || string(itemGot.Value) != "20" { t.Error(err) } - err = mc.Delete(key) + err = mc.Delete(context.Background(), key) if err != nil { t.Error(err) } @@ -496,34 +498,34 @@ func testTouch(mc *Client, t *testing.T) { } // Touch -1 - mc.Set(item) - if itemGot, err := mc.Get(key); err != nil || string(itemGot.Value) != value { + mc.Set(context.Background(), item) + if itemGot, err := mc.Get(context.Background(), key); err != nil || string(itemGot.Value) != value { t.Error(err) } - if err := mc.Touch(key, -1); err != nil { + if err := mc.Touch(context.Background(), key, -1); err != nil { t.Error(err) } - if itemGot, err := mc.Get(key); err != ErrCacheMiss || itemGot != nil { + if itemGot, err := mc.Get(context.Background(), key); err != ErrCacheMiss || itemGot != nil { t.Error(err) } // Touch - mc.Set(item) - if itemGot, err := mc.Get(key); err != nil || string(itemGot.Value) != value { + mc.Set(context.Background(), item) + if itemGot, err := mc.Get(context.Background(), key); err != nil || string(itemGot.Value) != value { t.Error(err) } - if err := mc.Touch(key, 1); err != nil { + if err := mc.Touch(context.Background(), key, 1); err != nil { t.Error(err) } - if itemGot, err := mc.Get(key); err != nil || string(itemGot.Value) != value { + if itemGot, err := mc.Get(context.Background(), key); err != nil || string(itemGot.Value) != value { t.Error(err) } // The value is expected to be exired in 1s, // so we sleep 2s and check if it's expired. time.Sleep(2 * time.Second) - if itemGot, err := mc.Get(key); err != ErrCacheMiss || itemGot != nil { + if itemGot, err := mc.Get(context.Background(), key); err != ErrCacheMiss || itemGot != nil { t.Error(err) } } @@ -539,20 +541,20 @@ func testAdd(mc *Client, t *testing.T) { Key: key, Value: []byte(valueToAdd), } - mc.Delete(key) - if err := mc.Add(item); err != nil { + mc.Delete(context.Background(), key) + if err := mc.Add(context.Background(), item); err != nil { t.Error(err) } - if itemGot, err := mc.Get(key); err != nil || string(itemGot.Value) != valueToAdd { + if itemGot, err := mc.Get(context.Background(), key); err != nil || string(itemGot.Value) != valueToAdd { t.Error(err) } - if err := mc.Add(item); err != ErrNotStored { + if err := mc.Add(context.Background(), item); err != ErrNotStored { t.Error(err) } key2 := "test_add2" item.Key = key2 - mc.Delete(key2) - if err := mc.Add(item); err != nil { + mc.Delete(context.Background(), key2) + if err := mc.Add(context.Background(), item); err != nil { t.Error(err) } } @@ -567,22 +569,22 @@ func testReplace(mc *Client, t *testing.T) { Key: key, Value: []byte(""), } - mc.Delete(key) + mc.Delete(context.Background(), key) - if err := mc.Replace(item); err != ErrNotStored { + if err := mc.Replace(context.Background(), item); err != ErrNotStored { t.Error(err) } item.Value = []byte("b") - if err := mc.Set(item); err != nil { + if err := mc.Set(context.Background(), item); err != nil { t.Error(err) } item.Value = []byte("a") - if err := mc.Replace(item); err != nil { + if err := mc.Replace(context.Background(), item); err != nil { t.Error(err) } - if itemGot, err := mc.Get(key); err != nil || string(itemGot.Value) != "a" { + if itemGot, err := mc.Get(context.Background(), key); err != nil || string(itemGot.Value) != "a" { t.Error(err) } } @@ -596,31 +598,31 @@ func testPrepend(mc *Client, t *testing.T) { value := "prepend\n" value2 := "" value3 := "before\n" - mc.Delete(key) + mc.Delete(context.Background(), key) item := &Item{ Key: key, Value: []byte(value), } - if err := mc.Prepend(item); err == nil { + if err := mc.Prepend(context.Background(), item); err == nil { t.Error(err) } item.Value = []byte(value2) - if err := mc.Set(item); err != nil { + if err := mc.Set(context.Background(), item); err != nil { t.Error(err) } item.Value = []byte(value) - if err := mc.Prepend(item); err != nil { + if err := mc.Prepend(context.Background(), item); err != nil { t.Error(err) } item.Value = []byte(value3) - if err := mc.Prepend(item); err != nil { + if err := mc.Prepend(context.Background(), item); err != nil { t.Error(err) } - itemGot, err := mc.Get(key) + itemGot, err := mc.Get(context.Background(), key) if !(err == nil && string(itemGot.Value) == value3+value) { t.Error(err) } @@ -635,31 +637,31 @@ func testAppend(mc *Client, t *testing.T) { value := "append\n" value2 := "" value3 := "after\n" - mc.Delete(key) + mc.Delete(context.Background(), key) item := &Item{ Key: key, Value: []byte(value), } - if err := mc.Append(item); err == nil { + if err := mc.Append(context.Background(), item); err == nil { t.Error(err) } item.Value = []byte(value2) - if err := mc.Set(item); err != nil { + if err := mc.Set(context.Background(), item); err != nil { t.Error(err) } item.Value = []byte(value) - if err := mc.Append(item); err != nil { + if err := mc.Append(context.Background(), item); err != nil { t.Error(err) } item.Value = []byte(value3) - if err := mc.Append(item); err != nil { + if err := mc.Append(context.Background(), item); err != nil { t.Error(err) } - itemGot, err := mc.Get(key) + itemGot, err := mc.Get(context.Background(), key) if !(err == nil && string(itemGot.Value) == value+value3) { _ = itemGot t.Errorf("[%s] : [%s]", string(itemGot.Value), value+value3) @@ -672,7 +674,7 @@ func TestSpecialItems(t *testing.T) { func testSpecialItems(mc *Client, t *testing.T) { key := "a\r\nb" - if err := mc.Delete(key); err == nil { + if err := mc.Delete(context.Background(), key); err == nil { t.Error(err) } value := "" @@ -680,7 +682,7 @@ func testSpecialItems(mc *Client, t *testing.T) { Key: key, Value: []byte(value), } - if err := mc.Set(item); err == nil { + if err := mc.Set(context.Background(), item); err == nil { t.Error(err) } } @@ -693,7 +695,7 @@ func TestInjection(t *testing.T) { } func testInjection(mc *Client, t *testing.T) { - mc.Delete("injected") + mc.Delete(context.Background(), "injected") key := strings.Repeat("a", 250) invalidKey := strings.Repeat("a", 251) value := "biu" @@ -703,21 +705,21 @@ func testInjection(mc *Client, t *testing.T) { Flags: 0, Expiration: 0, } - if err := mc.Set(item); err != nil { + if err := mc.Set(context.Background(), item); err != nil { t.Error(err) } item.Key = invalidKey - if err := mc.Set(item); err == nil { + if err := mc.Set(context.Background(), item); err == nil { t.Error(err) } item.Value = []byte("set injected 0 3600 10\r\n1234567890") - if err := mc.Set(item); err == nil { + if err := mc.Set(context.Background(), item); err == nil { t.Error(err) } - it, err := mc.Get("injected") + it, err := mc.Get(context.Background(), "injected") if it != nil || err != ErrCacheMiss { t.Error(err) } @@ -725,18 +727,18 @@ func testInjection(mc *Client, t *testing.T) { item.Key = "key1" item.Value = []byte("1234567890") - if err := mc.Set(item); err != nil { + if err := mc.Set(context.Background(), item); err != nil { t.Error(err) } item.Key = "key1 0" item.Value = []byte("123456789012345678901234567890\r\nset injected 0 3600 3\r\nINJ\r\n") - if err := mc.Set(item); err == nil { + if err := mc.Set(context.Background(), item); err == nil { t.Error(err) } - item, err = mc.Get("injected") + item, err = mc.Get(context.Background(), "injected") if item != nil || err != ErrCacheMiss { t.Error(err) } @@ -755,8 +757,8 @@ func testMaxiov(mc *Client, t *testing.T) { keys[i] = fmt.Sprintf(keyTmpl, i) } - mc.DeleteMulti(keys) - itemsGot, err := mc.GetMulti(keys) + mc.DeleteMulti(context.Background(), keys) + itemsGot, err := mc.GetMulti(context.Background(), keys) if len(itemsGot) != 0 || err != ErrCacheMiss { t.Error(err) } @@ -788,29 +790,29 @@ func testQuit(mc, mc2 *Client, t *testing.T) { keys[i] = key } - mc.SetMulti(items) + mc.SetMulti(context.Background(), items) - if _, err := mc.DeleteMulti(keys); err != nil && err != ErrCacheMiss { + if _, err := mc.DeleteMulti(context.Background(), keys); err != nil && err != ErrCacheMiss { t.Error(err) } - if _, err := mc.SetMulti(items); err != nil { + if _, err := mc.SetMulti(context.Background(), items); err != nil { t.Error(err) } - if _, err := mc2.DeleteMulti(keys); err != nil { + if _, err := mc2.DeleteMulti(context.Background(), keys); err != nil { t.Error(err) } - if _, err := mc2.SetMulti(items); err != nil { + if _, err := mc2.SetMulti(context.Background(), items); err != nil { t.Error(err) } - if versions, err := mc.Version(); err != nil || len(versions) == 0 { + if versions, err := mc.Version(context.Background()); err != nil || len(versions) == 0 { t.Error(err) } - if versions, err := mc2.Version(); err != nil || len(versions) == 0 { + if versions, err := mc2.Version(context.Background()); err != nil || len(versions) == 0 { t.Error(err) } - st1, err1 := mc.Stats() - st2, err2 := mc2.Stats() + st1, err1 := mc.Stats(context.Background()) + st2, err2 := mc2.Stats(context.Background()) if err1 != nil || err2 != nil { t.Error(err1, err2) } @@ -829,7 +831,7 @@ func testQuit(mc, mc2 *Client, t *testing.T) { } for i := 0; i < 3; i++ { - st2, err := mc2.Stats() + st2, err := mc2.Stats(context.Background()) if err != nil { t.Error(err) break @@ -892,6 +894,42 @@ func TestMaybeOpenNewConnections(t *testing.T) { mc.Quit() } +func TestConnRespectContext(t *testing.T) { + mc := newSimpleClient(1) + _, err := mc.conn(context.Background()) + if err != nil { + t.Error(err) + } + + failChan := make(chan *conn, 1) // Buffered channel + + go func(ctx context.Context, mc *Client) { + c, _ := mc.conn(ctx) + failChan <- c + }(context.Background(), mc) + + select { + case <-failChan: + t.Error("get conn should be stuck in context.Background()") + case <-time.After(5 * time.Second): + } + + successChan := make(chan *conn, 1) // Buffered channel + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() // releases resources if slowOperation completes before timeout elapses + go func(ctx context.Context, mc *Client) { + c, _ := mc.conn(ctx) + successChan <- c + }(ctx, mc) + + select { + case <-time.After(5 * time.Second): + t.Error("get conn won't stuck") + case <-successChan: + } + +} + func BenchmarkSetAndGet(b *testing.B) { mc := newSimplePrefixClient(1, "") key := "google" @@ -902,9 +940,9 @@ func BenchmarkSetAndGet(b *testing.B) { Flags: 0, Expiration: 0, } - mc.Set(&item) + mc.Set(context.Background(), &item) for i := 0; i < b.N; i++ { - mc.Get(key) + mc.Get(context.Background(), key) } } @@ -917,13 +955,13 @@ func TestFlushAll(t *testing.T) { func testFlushAll(mc *Client, t *testing.T) { // Check if flush_all is disabled by default - flushedHosts, err := mc.FlushAll() + flushedHosts, err := mc.FlushAll(context.Background()) if !(err != nil && len(flushedHosts) == 0) { t.Error(err) } mc.ToggleFlushAllFeature(true) // Make sure we can flush directly - flushedHosts, err = mc.FlushAll() + flushedHosts, err = mc.FlushAll(context.Background()) if err != nil || len(flushedHosts) != len(mc.servers) { t.Error(err) } @@ -944,24 +982,104 @@ func testFlushAll(mc *Client, t *testing.T) { } keys[i] = key } - mc.SetMulti(items) + mc.SetMulti(context.Background(), items) // Make sure we have some data in the cache cluster - itemsGot, err := mc.GetMulti(keys) + itemsGot, err := mc.GetMulti(context.Background(), keys) if err != nil || len(itemsGot) != nItems { t.Error(err) } // Flush the cache cluster - flushedHosts, err = mc.FlushAll() + flushedHosts, err = mc.FlushAll(context.Background()) if err != nil || len(flushedHosts) != len(mc.servers) { t.Error(err) } // Check if the cache cluster is flushed - itemsGot, err = mc.GetMulti(keys) + itemsGot, err = mc.GetMulti(context.Background(), keys) if err != ErrCacheMiss || len(itemsGot) != 0 { t.Error(err) } } + +func getSlowMc() *Client { + servers := make([]string, 1) + servers[0] = fmt.Sprintf("localhost:%d", 8965) + noreply := false + hashFunc := HashCRC32 + failover := false + disableLock := false + + mc := New(servers, noreply, "", hashFunc, failover, disableLock) + + // slow mc response in 500ms, larger than default poll (300ms) + mc.ConfigTimeout(PollTimeout, 800*time.Millisecond) + return mc + +} + +func TestSlowMemcacheStuck(t *testing.T) { + mc := getSlowMc() + + key := "key" + mc.Set(context.Background(), &Item{Key: key, Value: []byte("99")}) + + var wg sync.WaitGroup + for i := 1; i < 10; i++ { + wg.Add(1) + go func(mc *Client, key string, counter int, wg *sync.WaitGroup) { + startTime := time.Now() + _, err := mc.Get(context.Background(), key) + elapsedTime := time.Since(startTime) // Calculate the elapsed time + if err != nil { + t.Errorf("%d mc get error: %s\n", counter, err) + } else if counter > 1 { + if elapsedTime < 800*time.Millisecond { + t.Errorf("%d runs fast: %s\n", counter, elapsedTime) + } else { + fmt.Printf("%d get stuck, runs in: %s\n", counter, elapsedTime) + } + } + wg.Done() + }(mc, key, i, &wg) + // give a go for prev goroutine + time.Sleep(10 * time.Millisecond) + } + wg.Wait() +} + +func TestSlowMemcache(t *testing.T) { + mc := getSlowMc() + + key := "key" + mc.Set(context.Background(), &Item{Key: key, Value: []byte("99")}) + + var wg sync.WaitGroup + for i := 1; i < 10; i++ { + wg.Add(1) + go func(mc *Client, key string, counter int, wg *sync.WaitGroup) { + startTime := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + _, err := mc.Get(ctx, key) + elapsedTime := time.Since(startTime) // Calculate the elapsed time + if err == nil { + if counter != 1 { + t.Errorf("%d mc get value within time: %s\n", counter, elapsedTime) + } + } else { + if elapsedTime > 420*time.Millisecond { + t.Errorf("%d stuck fast: %s\n", counter, elapsedTime) + } else { + fmt.Printf("%d return fast in %s, got error: %s\n", counter, elapsedTime, err) + } + } + wg.Done() + }(mc, key, i, &wg) + // give a go for prev goroutine + time.Sleep(10 * time.Millisecond) + } + wg.Wait() +} diff --git a/src/version.go b/src/version.go index bcfe7ed6..d06a9b76 100644 --- a/src/version.go +++ b/src/version.go @@ -1,6 +1,6 @@ package golibmc -const _Version = "1.4.14" +const _Version = "1.4.15" const _Author = "mckelvin" const _Email = "mckelvin@users.noreply.github.com" const _Date = "Fri Jun 7 06:16:00 2024 +0800"