Skip to content

Commit db98659

Browse files
committed
uses cluster client
1 parent dae1509 commit db98659

File tree

6 files changed

+221
-32
lines changed

6 files changed

+221
-32
lines changed

cmd/reader/client.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
// Client defines the interface for Redis hash operations
9+
type Client interface {
10+
// Hmget retrieves the values of specified fields in a hash.
11+
// Returns []interface{} where each element is either a string or nil,
12+
// allowing distinction between empty strings and missing fields.
13+
Hmget(ctx context.Context, key string, fields ...string) ([]interface{}, error)
14+
15+
// HGetAll retrieves all fields and values of a hash.
16+
// Returns a map of field names to their values as strings.
17+
// Returns an empty map if the key does not exist.
18+
HGetAll(ctx context.Context, key string) (map[string]string, error)
19+
20+
// HSetExpire atomically sets hash fields and sets a TTL on the key.
21+
// For KVRocks, this uses the HSETEXPIRE command.
22+
// For standard Redis, this uses HSet followed by Expire.
23+
HSetExpire(ctx context.Context, key string, cols []string, data map[string]string, ttl time.Duration) error
24+
25+
// Close closes the client connection
26+
Close()
27+
}
28+

cmd/reader/main.go

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ func intToAlphabetKey(n int64) string {
4242
var (
4343
hGetAllSeconds = metrics.GetOrCreateHistogram(`kvrocks_command_seconds{command="hgetall"}`)
4444
hGetAllErrorsTotal = metrics.GetOrCreateCounter(`kvrocks_command_errors_total{command="hgetall"}`)
45-
currentIndex = metrics.GetOrCreateCounter(`kvrocks_command_counter{command="hgetall"}`)
4645
)
4746

4847
func main() {
@@ -52,13 +51,14 @@ func main() {
5251
ctx, cancel := context.WithCancel(context.Background())
5352
var wg sync.WaitGroup
5453
var clientsMu sync.Mutex
55-
var clients []rueidis.Client
54+
var clients []Client
5655

5756
numReaders := flag.Int("readers", runtime.GOMAXPROCS(0), "number of writer goroutines")
5857
readDelay := flag.Duration("delay", 0, "delay between writes (e.g., 1ms, 100us)")
5958
start := flag.Int("start", 0, "index to start at")
59+
clientType := flag.String("client", "rueidis", "client type: rueidis or redis")
6060
flag.Parse()
61-
logger.Info("starting service", zap.Int("readers", *numReaders), zap.Duration("delay", *readDelay), zap.Int("start", *start))
61+
logger.Info("starting service", zap.Int("readers", *numReaders), zap.Duration("delay", *readDelay), zap.Int("start", *start), zap.String("client", *clientType))
6262
// goal is to spam reading and client connections
6363

6464
// Timeout configuration
@@ -81,36 +81,52 @@ func main() {
8181
connWriteTimeoutMs := int64(connWriteTimeout / time.Millisecond)
8282
hGetAllTimeoutMs := int64(kvRocksLiteReadTimeout / time.Millisecond)
8383
initCounter := metrics.GetOrCreateCounter(fmt.Sprintf(
84-
`kvrocks_reader_initialized_total{readers="%d",delay_ms="%d",start_index="%d",conn_write_timeout_ms="%d",hgetall_timeout_ms="%d"}`,
85-
*numReaders, delayMs, *start, connWriteTimeoutMs, hGetAllTimeoutMs,
84+
`kvrocks_reader_initialized_total{readers="%d",delay_ms="%d",start_index="%d",conn_write_timeout_ms="%d",hgetall_timeout_ms="%d",client_type="%s"}`,
85+
*numReaders, delayMs, *start, connWriteTimeoutMs, hGetAllTimeoutMs, *clientType,
8686
))
8787
initCounter.Inc()
8888

8989
for i := 0; i < *numReaders; i++ {
9090
wg.Add(1)
91-
go func(id int, sleep time.Duration, startIndex int, connTimeout time.Duration, readTimeout time.Duration) {
91+
go func(id int, sleep time.Duration, startIndex int, connTimeout time.Duration, readTimeout time.Duration, clientTypeStr string) {
9292
defer wg.Done()
93-
client, err := rueidis.NewClient(
94-
rueidis.ClientOption{
95-
InitAddress: []string{"kvrocks-byron-test.us-east-1.stackadapt:6379"},
96-
ConnWriteTimeout: connTimeout, // explicitly set to the rueidis default; otherwise, it would be computed from Dialer.KeepAlive - e.g 60s * 10
97-
ShuffleInit: true,
98-
Dialer: net.Dialer{KeepAlive: time.Second * 60}, // To decrease the pings
99-
DisableCache: true, // client cache is not enabled on kvrocks
100-
PipelineMultiplex: 5,
101-
MaxFlushDelay: 20 * time.Microsecond,
102-
AlwaysPipelining: true,
103-
DisableRetry: true,
104-
// ClusterOption: rueidis.ClusterOption{
105-
// AvoidRefreshOnRedirectMove: true,
106-
// },
107-
// QueueType: rueidis.QueueTypeFlowBuffer,
108-
},
109-
)
110-
if err != nil {
111-
logger.Error("unable to get rueidis client", zap.Error(err), zap.Int("id", id))
112-
return
93+
var client Client
94+
95+
switch clientTypeStr {
96+
case "redis":
97+
client = NewRedisClient(
98+
[]string{"kvrocks-byron-test.us-east-1.stackadapt:6379"},
99+
connTimeout,
100+
readTimeout,
101+
connTimeout,
102+
)
103+
case "rueidis":
104+
fallthrough
105+
default:
106+
rueidisClient, err := rueidis.NewClient(
107+
rueidis.ClientOption{
108+
InitAddress: []string{"kvrocks-byron-test.us-east-1.stackadapt:6379"},
109+
ConnWriteTimeout: connTimeout, // explicitly set to the rueidis default; otherwise, it would be computed from Dialer.KeepAlive - e.g 60s * 10
110+
ShuffleInit: true,
111+
Dialer: net.Dialer{KeepAlive: time.Second * 60}, // To decrease the pings
112+
DisableCache: true, // client cache is not enabled on kvrocks
113+
PipelineMultiplex: 5,
114+
MaxFlushDelay: 20 * time.Microsecond,
115+
AlwaysPipelining: true,
116+
DisableRetry: true,
117+
// ClusterOption: rueidis.ClusterOption{
118+
// AvoidRefreshOnRedirectMove: true,
119+
// },
120+
// QueueType: rueidis.QueueTypeFlowBuffer,
121+
},
122+
)
123+
if err != nil {
124+
logger.Error("unable to get rueidis client", zap.Error(err), zap.Int("id", id))
125+
return
126+
}
127+
client = NewRueidisClientFromClient(rueidisClient)
113128
}
129+
114130
clientsMu.Lock()
115131
clients = append(clients, client)
116132
clientsMu.Unlock()
@@ -138,7 +154,7 @@ func main() {
138154
logger.Info("reading", zap.Int("keyIndex", i), zap.String("key", alphabetKey))
139155
}
140156
}
141-
}(i, *readDelay, *start, connWriteTimeout, kvRocksLiteReadTimeout)
157+
}(i, *readDelay, *start, connWriteTimeout, kvRocksLiteReadTimeout, *clientType)
142158
}
143159

144160
logger.Info("service running, waiting for shutdown signal")
@@ -178,17 +194,15 @@ func main() {
178194
func hGetAll(
179195
ctx context.Context,
180196
timeout time.Duration,
181-
client rueidis.Client,
197+
client Client,
182198
key string,
183199
) (map[string]string, error) {
184200
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
185201
defer cancel()
186202
start := time.Now()
187203
defer hGetAllSeconds.UpdateDuration(start)
188204

189-
cmd := client.B().Hgetall().Key(key).Build()
190-
resp := client.Do(timeoutCtx, cmd)
191-
data, err := resp.AsStrMap()
205+
data, err := client.HGetAll(timeoutCtx, key)
192206
if err != nil {
193207
hGetAllErrorsTotal.Inc()
194208
return nil, err

cmd/reader/reader

524 KB
Binary file not shown.

cmd/reader/reader.service

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ After=network-online.target syslog.target remote-fs.target nss-lookup.target sys
44
Wants=network-online.target
55

66
[Service]
7-
ExecStart=/home/ec2-user/reader -readers 2 -delay 4ms -start 800000000
7+
ExecStart=/home/ec2-user/reader -readers 2 -delay 4ms -start 800000000 -client redis
88

99
LimitNOFILE=1048576
1010
LimitSTACK=16777216

cmd/reader/redis.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/go-redis/redis/v8"
9+
)
10+
11+
type RedisClient struct {
12+
clusterClient *redis.ClusterClient
13+
}
14+
15+
func NewRedisClient(addrs []string, writeTimeout, readTimeout, dialTimeout time.Duration) *RedisClient {
16+
return &RedisClient{
17+
clusterClient: redis.NewClusterClient(&redis.ClusterOptions{
18+
Addrs: addrs,
19+
WriteTimeout: writeTimeout,
20+
ReadTimeout: readTimeout,
21+
DialTimeout: dialTimeout,
22+
}),
23+
}
24+
}
25+
26+
func (r *RedisClient) Hmget(ctx context.Context, key string, fields ...string) ([]interface{}, error) {
27+
result, err := r.clusterClient.HMGet(ctx, key, fields...).Result()
28+
return result, err
29+
}
30+
31+
func (r *RedisClient) HGetAll(ctx context.Context, key string) (map[string]string, error) {
32+
result, err := r.clusterClient.HGetAll(ctx, key).Result()
33+
return result, err
34+
}
35+
36+
func (r *RedisClient) HSetExpire(ctx context.Context, key string, cols []string, data map[string]string, ttl time.Duration) error {
37+
// Convert cols and data to the format expected by HSet
38+
fields := make([]interface{}, 0, len(cols)*2)
39+
for _, col := range cols {
40+
val, ok := data[col]
41+
if !ok {
42+
return fmt.Errorf("field %s not found in data", col)
43+
}
44+
fields = append(fields, col, val)
45+
}
46+
47+
// Use pipeline to atomically execute HSet and Expire
48+
pipe := r.clusterClient.Pipeline()
49+
pipe.HSet(ctx, key, fields...)
50+
pipe.Expire(ctx, key, ttl)
51+
_, err := pipe.Exec(ctx)
52+
return err
53+
}
54+
55+
func (r *RedisClient) Close() {
56+
r.clusterClient.Close()
57+
}

cmd/reader/rueidis.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
"time"
8+
9+
"github.com/redis/rueidis"
10+
)
11+
12+
type RueidisClient struct {
13+
client rueidis.Client
14+
}
15+
16+
func NewRueidisClient(options rueidis.ClientOption) (*RueidisClient, error) {
17+
client, err := rueidis.NewClient(options)
18+
if err != nil {
19+
return nil, err
20+
}
21+
return &RueidisClient{
22+
client: client,
23+
}, nil
24+
}
25+
26+
func NewRueidisClientFromClient(client rueidis.Client) *RueidisClient {
27+
return &RueidisClient{
28+
client: client,
29+
}
30+
}
31+
32+
func (r *RueidisClient) Hmget(ctx context.Context, key string, fields ...string) ([]interface{}, error) {
33+
cmd := r.client.B().Hmget().Key(key).Field(fields...).Build()
34+
resp := r.client.Do(ctx, cmd)
35+
msgs, err := resp.ToArray()
36+
if err != nil {
37+
return nil, err
38+
}
39+
40+
result := make([]interface{}, len(msgs))
41+
for i, msg := range msgs {
42+
if msg.IsNil() {
43+
result[i] = nil
44+
} else {
45+
s, err := msg.ToString()
46+
if err != nil {
47+
return nil, err
48+
}
49+
result[i] = s
50+
}
51+
}
52+
53+
return result, nil
54+
}
55+
56+
func (r *RueidisClient) HGetAll(ctx context.Context, key string) (map[string]string, error) {
57+
cmd := r.client.B().Hgetall().Key(key).Build()
58+
resp := r.client.Do(ctx, cmd)
59+
data, err := resp.AsStrMap()
60+
if err != nil {
61+
return nil, err
62+
}
63+
64+
return data, nil
65+
}
66+
67+
func (r *RueidisClient) HSetExpire(ctx context.Context, key string, cols []string, data map[string]string, ttl time.Duration) error {
68+
convertedSlice := make([]string, 0, len(cols)*2)
69+
for _, col := range cols {
70+
val, ok := data[col]
71+
if !ok {
72+
return fmt.Errorf("field %s not found in data", col)
73+
}
74+
75+
convertedSlice = append(convertedSlice, col, val)
76+
}
77+
cmd := r.client.B().
78+
Arbitrary("HSETEXPIRE").
79+
Keys(key).
80+
Args(strconv.Itoa(int(ttl.Seconds()))).
81+
Args(convertedSlice...).
82+
Build()
83+
resp := r.client.Do(ctx, cmd)
84+
return resp.Error()
85+
}
86+
87+
func (r *RueidisClient) Close() {
88+
r.client.Close()
89+
}
90+

0 commit comments

Comments
 (0)