Skip to content

Commit 912a221

Browse files
committed
new writers that can use rueidis or go-redis
1 parent 43795d2 commit 912a221

File tree

6 files changed

+183
-45
lines changed

6 files changed

+183
-45
lines changed

cmd/writer/client.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
// Client defines the interface for Redis hash operations
9+
type Client interface {
10+
// HSetExpire atomically sets hash fields and sets a TTL on the key.
11+
// For KVRocks, this uses the HSETEXPIRE command.
12+
// For standard Redis, this uses HSet followed by Expire.
13+
HSetExpire(ctx context.Context, key string, cols []string, data map[string]string, ttl time.Duration) error
14+
15+
// Close closes the client connection
16+
Close()
17+
18+
Name() string
19+
}
20+

cmd/writer/main.go

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ import (
44
"context"
55
"flag"
66
"fmt"
7+
"net"
78
"os"
89
"os/signal"
910
"runtime"
10-
"strconv"
1111
"sync"
1212
"syscall"
1313
"time"
@@ -18,18 +18,13 @@ import (
1818
"go.uber.org/zap"
1919
)
2020

21-
var (
22-
hSetExpireSeconds = metrics.GetOrCreateHistogram(`kvrocks_command_seconds{command="hsetexpire"}`)
23-
hSetExpireErrorsTotal = metrics.GetOrCreateCounter(`kvrocks_command_errors_total{command="hsetexpire"}`)
24-
currentIndex = metrics.GetOrCreateCounter(`kvrocks_command_counter{command="hsetexpire"}`)
25-
)
26-
2721
func main() {
2822
ctx, cancel := context.WithCancel(context.Background())
2923
// Define command-line flags
3024
numWriters := flag.Int("writers", runtime.GOMAXPROCS(0), "number of writer goroutines")
3125
writeDelay := flag.Duration("delay", 0, "delay between writes (e.g., 1ms, 100us)")
3226
start := flag.Int("start", 0, "index to start at")
27+
clientType := flag.String("client", "rueidis", "client type: rueidis or redis")
3328

3429
flag.Parse()
3530

@@ -53,14 +48,14 @@ func main() {
5348
return
5449
}
5550

56-
logger.Get().Info("starting service", zap.Int("writers", numOfWriters), zap.Duration("delay", *writeDelay), zap.Int("start_index", *start))
51+
logger.Get().Info("starting service", zap.Int("writers", numOfWriters), zap.Duration("delay", *writeDelay), zap.Int("start_index", *start), zap.String("client", *clientType))
5752

5853
writers := make([]*Writer, numOfWriters)
5954
for i := 0; i < numOfWriters; i++ {
6055
logger.Get().Info("creating writers", zap.Int("num", i))
61-
writer, err := NewWriter(connWriteTimeout)
56+
writer, err := NewWriter(connWriteTimeout, *clientType)
6257
if err != nil {
63-
logger.Get().Error("unable to get rueidis client", zap.Error(err))
58+
logger.Get().Error("unable to get client", zap.Error(err), zap.String("client_type", *clientType))
6459
return
6560
}
6661
writers[i] = writer
@@ -89,8 +84,8 @@ func main() {
8984
connWriteTimeoutMs := int64(connWriteTimeout / time.Millisecond)
9085
hSetExpireTimeoutMs := int64(hSetExpireTimeout / time.Millisecond)
9186
initCounter := metrics.GetOrCreateCounter(fmt.Sprintf(
92-
`kvrocks_writer_initialized_total{writers="%d",delay_ms="%d",payload_size_bytes="%d",total_size_per_key_bytes="%d",conn_write_timeout_ms="%d",hsetexpire_timeout_ms="%d", start_index="%d"}`,
93-
numOfWriters, delayMs, payloadSize, totalSizePerKey, connWriteTimeoutMs, hSetExpireTimeoutMs, *start,
87+
`kvrocks_writer_initialized_total{writers="%d",delay_ms="%d",payload_size_bytes="%d",total_size_per_key_bytes="%d",conn_write_timeout_ms="%d",hsetexpire_timeout_ms="%d",start_index="%d",client_type="%s"}`,
88+
numOfWriters, delayMs, payloadSize, totalSizePerKey, connWriteTimeoutMs, hSetExpireTimeoutMs, *start, *clientType,
9489
))
9590
initCounter.Inc()
9691

@@ -129,26 +124,46 @@ func main() {
129124
}
130125

131126
type Writer struct {
132-
client rueidis.Client
127+
client Client
133128
}
134129

135-
func NewWriter(connWriteTimeout time.Duration) (*Writer, error) {
136-
client, err := rueidis.NewClient(
137-
rueidis.ClientOption{
138-
InitAddress: []string{"kvrocks-byron-test.us-east-1.stackadapt:6379"},
139-
ShuffleInit: true,
140-
ConnWriteTimeout: connWriteTimeout,
141-
DisableCache: true, // client cache is not enabled on kvrocks
142-
PipelineMultiplex: 5,
143-
MaxFlushDelay: 50 * time.Microsecond,
144-
AlwaysPipelining: true,
145-
DisableTCPNoDelay: true,
146-
DisableRetry: true,
147-
},
148-
)
130+
func NewWriter(connWriteTimeout time.Duration, clientType string) (*Writer, error) {
131+
var client Client
132+
133+
switch clientType {
134+
case "redis":
135+
client = NewRedisClient(
136+
[]string{"kvrocks-byron-test.us-east-1.stackadapt:6379"},
137+
connWriteTimeout,
138+
connWriteTimeout,
139+
connWriteTimeout,
140+
)
141+
case "rueidis":
142+
fallthrough
143+
default:
144+
rueidisClient, err := rueidis.NewClient(
145+
rueidis.ClientOption{
146+
InitAddress: []string{"kvrocks-byron-test.us-east-1.stackadapt:6379"},
147+
ShuffleInit: true,
148+
ConnWriteTimeout: connWriteTimeout,
149+
Dialer: net.Dialer{KeepAlive: time.Second * 60}, // To decrease the pings
150+
DisableCache: true, // client cache is not enabled on kvrocks
151+
PipelineMultiplex: 5,
152+
MaxFlushDelay: 50 * time.Microsecond,
153+
AlwaysPipelining: true,
154+
DisableTCPNoDelay: true,
155+
DisableRetry: true,
156+
},
157+
)
158+
if err != nil {
159+
return nil, err
160+
}
161+
client = NewRueidisClientFromClient(rueidisClient)
162+
}
163+
149164
return &Writer{
150165
client: client,
151-
}, err
166+
}, nil
152167
}
153168

154169
// intToAlphabetKey converts an integer to an alphabet-only string using bijective base-26 encoding
@@ -220,7 +235,7 @@ func (w *Writer) Start(ctx context.Context, wg *sync.WaitGroup, data map[string]
220235
func hSetExpire(
221236
ctx context.Context,
222237
timeout time.Duration,
223-
client rueidis.Client,
238+
client Client,
224239
key string,
225240
cols []string,
226241
data map[string][]byte,
@@ -229,26 +244,18 @@ func hSetExpire(
229244
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
230245
defer cancel()
231246
start := time.Now()
247+
hSetExpireSeconds := metrics.GetOrCreateHistogram(fmt.Sprintf(`kvrocks_command_seconds{command="hsetexpire",conn_client="%s"}`, client.Name()))
232248
defer hSetExpireSeconds.UpdateDuration(start)
233249

234-
convertedSlice := make([]string, 0, len(cols)*2)
235-
for _, col := range cols {
236-
if _, ok := data[col]; !ok {
237-
return fmt.Errorf("field %s not found in data", col)
238-
}
239-
convertedSlice = append(convertedSlice, col, string(data[col]))
250+
// Convert map[string][]byte to map[string]string
251+
stringData := make(map[string]string, len(data))
252+
for col, val := range data {
253+
stringData[col] = string(val)
240254
}
241255

242-
cmd := client.B().
243-
Arbitrary("HSETEXPIRE").
244-
Keys(key).
245-
Args(strconv.Itoa(int(ttl.Seconds()))).
246-
Args(convertedSlice...).
247-
Build()
248-
resp := client.Do(timeoutCtx, cmd)
249-
err := resp.Error()
256+
err := client.HSetExpire(timeoutCtx, key, cols, stringData, ttl)
250257
if err != nil {
251-
hSetExpireErrorsTotal.Inc()
258+
metrics.GetOrCreateCounter(fmt.Sprintf(`kvrocks_command_errors_total{command="hsetexpire",conn_client="%s"}`, client.Name())).Inc()
252259
}
253260
return err
254261
}

cmd/writer/redis.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/go-redis/redis/v8"
9+
)
10+
11+
type RedisClient struct {
12+
clusterClient *redis.ClusterClient
13+
}
14+
15+
func NewRedisClient(addrs []string, writeTimeout, readTimeout, dialTimeout time.Duration) *RedisClient {
16+
return &RedisClient{
17+
clusterClient: redis.NewClusterClient(&redis.ClusterOptions{
18+
Addrs: addrs,
19+
WriteTimeout: writeTimeout,
20+
ReadTimeout: readTimeout,
21+
DialTimeout: dialTimeout,
22+
}),
23+
}
24+
}
25+
26+
func (r *RedisClient) Name() string {
27+
return "redis"
28+
}
29+
30+
func (r *RedisClient) HSetExpire(ctx context.Context, key string, cols []string, data map[string]string, ttl time.Duration) error {
31+
// Convert cols and data to the format expected by HSet
32+
fields := make([]interface{}, 0, len(cols)*2)
33+
for _, col := range cols {
34+
val, ok := data[col]
35+
if !ok {
36+
return fmt.Errorf("field %s not found in data", col)
37+
}
38+
fields = append(fields, col, val)
39+
}
40+
41+
// Use pipeline to atomically execute HSet and Expire
42+
pipe := r.clusterClient.Pipeline()
43+
pipe.HSet(ctx, key, fields...)
44+
pipe.Expire(ctx, key, ttl)
45+
_, err := pipe.Exec(ctx)
46+
return err
47+
}
48+
49+
func (r *RedisClient) Close() {
50+
r.clusterClient.Close()
51+
}
52+

cmd/writer/rueidis.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
"time"
8+
9+
"github.com/redis/rueidis"
10+
)
11+
12+
type RueidisClient struct {
13+
client rueidis.Client
14+
}
15+
16+
func (r *RueidisClient) Name() string {
17+
return "rueidis"
18+
}
19+
20+
func NewRueidisClient(options rueidis.ClientOption) (*RueidisClient, error) {
21+
client, err := rueidis.NewClient(options)
22+
if err != nil {
23+
return nil, err
24+
}
25+
return &RueidisClient{
26+
client: client,
27+
}, nil
28+
}
29+
30+
func NewRueidisClientFromClient(client rueidis.Client) *RueidisClient {
31+
return &RueidisClient{
32+
client: client,
33+
}
34+
}
35+
36+
func (r *RueidisClient) HSetExpire(ctx context.Context, key string, cols []string, data map[string]string, ttl time.Duration) error {
37+
convertedSlice := make([]string, 0, len(cols)*2)
38+
for _, col := range cols {
39+
val, ok := data[col]
40+
if !ok {
41+
return fmt.Errorf("field %s not found in data", col)
42+
}
43+
44+
convertedSlice = append(convertedSlice, col, val)
45+
}
46+
cmd := r.client.B().
47+
Arbitrary("HSETEXPIRE").
48+
Keys(key).
49+
Args(strconv.Itoa(int(ttl.Seconds()))).
50+
Args(convertedSlice...).
51+
Build()
52+
resp := r.client.Do(ctx, cmd)
53+
return resp.Error()
54+
}
55+
56+
func (r *RueidisClient) Close() {
57+
r.client.Close()
58+
}
59+

cmd/writer/writer

626 KB
Binary file not shown.

cmd/writer/writer.service

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ After=network-online.target syslog.target remote-fs.target nss-lookup.target sys
44
Wants=network-online.target
55

66
[Service]
7-
ExecStart=/home/ec2-user/writer -writers 4 -delay 8ms -start 900000000
7+
ExecStart=/home/ec2-user/writer -writers 2 -delay 4ms -start 1000000000
88

99
LimitNOFILE=1048576
1010
LimitSTACK=16777216

0 commit comments

Comments
 (0)