From a9e69b6b829d1acdda91d055cd7f70b6b709a59d Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Thu, 18 Sep 2025 13:17:10 +0100 Subject: [PATCH] perf(cli): add pipelining and optimize RNG/string generation --- cli.go | 7 +- common.go | 81 +++++++++++++++++--- go.mod | 2 +- redis-bechmark-go.go | 157 +++++++++++++++++++++++++++++++++----- redis-bechmark-go_test.go | 8 +- 5 files changed, 219 insertions(+), 36 deletions(-) diff --git a/cli.go b/cli.go index 2e24015..5f87d12 100644 --- a/cli.go +++ b/cli.go @@ -1,11 +1,12 @@ package main import ( - shellwords "github.com/mattn/go-shellwords" "log" "math" "math/rand" "strconv" + + shellwords "github.com/mattn/go-shellwords" ) type arrayStringParameters []string @@ -19,8 +20,8 @@ func (i *arrayStringParameters) Set(value string) error { return nil } -func sample(cdf []float32) int { - r := rand.Float32() +func sample(cdf []float32, rng *rand.Rand) int { + r := rng.Float32() bucket := 0 for r > cdf[bucket] { bucket++ diff --git a/common.go b/common.go index a28e987..ae67280 100644 --- a/common.go +++ b/common.go @@ -3,13 +3,14 @@ package main import ( "context" "fmt" - "github.com/HdrHistogram/hdrhistogram-go" - radix "github.com/mediocregopher/radix/v4" - "golang.org/x/time/rate" "math" "math/rand" "strings" "sync" + + "github.com/HdrHistogram/hdrhistogram-go" + radix "github.com/mediocregopher/radix/v4" + "golang.org/x/time/rate" ) var totalCommands uint64 @@ -32,11 +33,68 @@ type datapoint struct { cachedEntry bool } -func stringWithCharset(length int, charset string) string { +func stringWithCharset(length int, charset string, rng *rand.Rand, dataCache map[int]string) string { + if length <= 0 { + return "" + } + + // For common sizes and default charset, use per-goroutine cache + if charset == "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" && length <= 1024 { + if cached, exists := dataCache[length]; exists { + return cached + } + + // Generate and cache + b := make([]byte, length) + for i := range b { + b[i] = charset[rng.Intn(len(charset))] + } + result := string(b) + dataCache[length] = result + return result + } + + // For small lengths, use the original method + if length <= 64 { + b := make([]byte, length) + for i := range b { + b[i] = charset[rng.Intn(len(charset))] + } + return string(b) + } + + // For larger lengths, use a more efficient approach + // Generate chunks of random data and repeat pattern + chunkSize := 64 + if length < 256 { + chunkSize = 32 + } + b := make([]byte, length) - for i := range b { - b[i] = charset[rand.Intn(len(charset))] + charsetLen := len(charset) + + // Generate initial random chunk + for i := 0; i < chunkSize && i < length; i++ { + b[i] = charset[rng.Intn(charsetLen)] + } + + // For remaining bytes, use pattern repetition with some randomness + for i := chunkSize; i < length; i += chunkSize { + end := i + chunkSize + if end > length { + end = length + } + + // Copy previous chunk with slight variation + for j := i; j < end; j++ { + if rng.Intn(8) == 0 { // 12.5% chance to randomize + b[j] = charset[rng.Intn(charsetLen)] + } else { + b[j] = b[j-chunkSize] + } + } } + return string(b) } @@ -50,16 +108,21 @@ type Client interface { Close() error } -func keyBuildLogic(keyPos int, dataPos int, datasize, keyspacelen uint64, cmdS []string, charset string) (newCmdS []string, key string) { +func keyBuildLogic(keyPos int, dataPos int, datasize, keyspacelen uint64, cmdS []string, charset string, rng *rand.Rand, dataCache map[int]string) (newCmdS []string, key string) { newCmdS = make([]string, len(cmdS)) copy(newCmdS, cmdS) if keyPos > -1 { - keyV := fmt.Sprintf("%d", rand.Int63n(int64(keyspacelen))) + var keyV string + if keyspacelen == 1 { + keyV = "0" // Optimize for single key case + } else { + keyV = fmt.Sprintf("%d", rng.Int63n(int64(keyspacelen))) + } key = strings.Replace(newCmdS[keyPos], "__key__", keyV, -1) newCmdS[keyPos] = key } if dataPos > -1 { - newCmdS[dataPos] = stringWithCharset(int(datasize), charset) + newCmdS[dataPos] = stringWithCharset(int(datasize), charset, rng, dataCache) } return newCmdS, key } diff --git a/go.mod b/go.mod index d982879..119f29f 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/filipecosta90/redis-benchmark-go -go 1.20 +go 1.21 require ( github.com/HdrHistogram/hdrhistogram-go v1.1.0 diff --git a/redis-bechmark-go.go b/redis-bechmark-go.go index b3add50..70775ba 100644 --- a/redis-bechmark-go.go +++ b/redis-bechmark-go.go @@ -20,25 +20,73 @@ import ( "golang.org/x/time/rate" ) -func benchmarkRoutine(radixClient Client, ruedisClient rueidis.Client, useRuedis, useCSC, enableMultiExec bool, datapointsChan chan datapoint, continueOnError bool, cmdS [][]string, commandsCDF []float32, keyspacelen, datasize, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, keyplace, dataplace []int, readOnly []bool, useLimiter bool, rateLimiter *rate.Limiter, waitReplicas, waitReplicasMs int, cacheOptions *rueidis.CacheOptions) { +func benchmarkRoutine(radixClient Client, ruedisClient rueidis.Client, useRuedis, useCSC, enableMultiExec bool, datapointsChan chan datapoint, continueOnError bool, cmdS [][]string, commandsCDF []float32, keyspacelen, datasize, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, keyplace, dataplace []int, readOnly []bool, useLimiter bool, rateLimiter *rate.Limiter, waitReplicas, waitReplicasMs, pipelineSize int, cacheOptions *rueidis.CacheOptions) { defer wg.Done() - for i := 0; uint64(i) < number_samples || loop; i++ { - cmdPos := sample(commandsCDF) - kplace := keyplace[cmdPos] - dplace := dataplace[cmdPos] - isReadOnly := readOnly[cmdPos] - cmds := cmdS[cmdPos] - newCmdS, key := keyBuildLogic(kplace, dplace, datasize, keyspacelen, cmds, charset) - if useLimiter { - r := rateLimiter.ReserveN(time.Now(), int(1)) - time.Sleep(r.Delay()) + + // Create per-goroutine random number generator to avoid lock contention + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + + // Create per-goroutine data cache to avoid mutex contention + dataCache := make(map[int]string) + + // Pipeline support for radix client only + if !useRuedis && pipelineSize > 1 { + // Pipeline mode for radix client + pipelineCommands := make([][]string, 0, pipelineSize) + pipelineKeys := make([]string, 0, pipelineSize) + + for i := 0; uint64(i) < number_samples || loop; i++ { + cmdPos := sample(commandsCDF, rng) + kplace := keyplace[cmdPos] + dplace := dataplace[cmdPos] + cmds := cmdS[cmdPos] + newCmdS, key := keyBuildLogic(kplace, dplace, datasize, keyspacelen, cmds, charset, rng, dataCache) + + // Collect commands for pipeline + pipelineCommands = append(pipelineCommands, newCmdS) + pipelineKeys = append(pipelineKeys, key) + + // When we have enough commands or reached the end, send the pipeline + if len(pipelineCommands) == pipelineSize || (uint64(i+1) >= number_samples && !loop) { + if useLimiter { + r := rateLimiter.ReserveN(time.Now(), len(pipelineCommands)) + time.Sleep(r.Delay()) + } + sendCmdLogicRadixPipeline(radixClient, pipelineCommands, pipelineKeys, enableMultiExec, datapointsChan, continueOnError, debug_level, waitReplicas, waitReplicasMs) + + // Reset pipeline + pipelineCommands = pipelineCommands[:0] + pipelineKeys = pipelineKeys[:0] + } } - if useRuedis { - sendCmdLogicRuedis(ruedisClient, newCmdS, enableMultiExec, datapointsChan, continueOnError, debug_level, useCSC, isReadOnly, cacheOptions, waitReplicas, waitReplicasMs) - } else { - sendCmdLogicRadix(radixClient, newCmdS, enableMultiExec, key, datapointsChan, continueOnError, debug_level, waitReplicas, waitReplicasMs) + // Send any remaining commands in the pipeline + if len(pipelineCommands) > 0 { + if useLimiter { + r := rateLimiter.ReserveN(time.Now(), len(pipelineCommands)) + time.Sleep(r.Delay()) + } + sendCmdLogicRadixPipeline(radixClient, pipelineCommands, pipelineKeys, enableMultiExec, datapointsChan, continueOnError, debug_level, waitReplicas, waitReplicasMs) + } + } else { + // Original single command mode + for i := 0; uint64(i) < number_samples || loop; i++ { + cmdPos := sample(commandsCDF, rng) + kplace := keyplace[cmdPos] + dplace := dataplace[cmdPos] + isReadOnly := readOnly[cmdPos] + cmds := cmdS[cmdPos] + newCmdS, key := keyBuildLogic(kplace, dplace, datasize, keyspacelen, cmds, charset, rng, dataCache) + if useLimiter { + r := rateLimiter.ReserveN(time.Now(), int(1)) + time.Sleep(r.Delay()) + } + if useRuedis { + sendCmdLogicRuedis(ruedisClient, newCmdS, enableMultiExec, datapointsChan, continueOnError, debug_level, useCSC, isReadOnly, cacheOptions, waitReplicas, waitReplicasMs) + } else { + sendCmdLogicRadix(radixClient, newCmdS, enableMultiExec, key, datapointsChan, continueOnError, debug_level, waitReplicas, waitReplicasMs) + } } } } @@ -166,6 +214,61 @@ func sendCmdLogicRadix(conn Client, newCmdS []string, enableMultiExec bool, key datapointsChan <- datapoint{!(err != nil), duration.Microseconds(), cacheHit} } +func sendCmdLogicRadixPipeline(conn Client, cmdsList [][]string, keys []string, enableMultiExec bool, datapointsChan chan datapoint, continueOnError bool, debug_level int, waitReplicas, waitReplicasMs int) { + ctx := context.Background() + cacheHit := false + var err error + startT := time.Now() + + if enableMultiExec { + // For MULTI/EXEC, we need to handle each command individually + // This is not ideal for pipelining, but maintains compatibility + for i, newCmdS := range cmdsList { + key := keys[i] + sendCmdLogicRadix(conn, newCmdS, enableMultiExec, key, datapointsChan, continueOnError, debug_level, waitReplicas, waitReplicasMs) + } + return + } + + // Create pipeline + p := radix.NewPipeline() + + // Add all commands to pipeline + for _, newCmdS := range cmdsList { + cmd := radix.Cmd(nil, newCmdS[0], newCmdS[1:]...) + p.Append(cmd) + } + + // Add WAIT commands if needed + if waitReplicas > 0 { + for range cmdsList { + p.Append(radix.Cmd(nil, "WAIT", fmt.Sprintf("%d", waitReplicas), fmt.Sprintf("%d", waitReplicasMs))) + } + } + + // Execute pipeline + err = conn.Do(ctx, p) + endT := time.Now() + + if err != nil { + if continueOnError { + if debug_level > 0 { + log.Println(fmt.Sprintf("Received an error with the following pipeline commands: %v, error: %v", cmdsList, err)) + } + } else { + log.Fatalf("Received an error with the following pipeline commands: %v, error: %v", cmdsList, err) + } + } + + // Calculate duration and send datapoints for each command in the pipeline + duration := endT.Sub(startT) + //avgDurationPerCmd := duration.Microseconds() / int64(len(cmdsList)) + + for range cmdsList { + datapointsChan <- datapoint{!(err != nil), duration.Microseconds(), cacheHit} + } +} + func onInvalidations(messages []rueidis.RedisMessage) { if messages != nil { cscInvalidationMutex.Lock() @@ -292,6 +395,7 @@ func main() { continueonerror := flag.Bool("continue-on-error", false, "Output verbose info") resp := flag.String("resp", "", "redis command response protocol (2 - RESP 2, 3 - RESP 3). If empty will not enforce it.") nameserver := flag.String("nameserver", "", "the IP address of the DNS name server. The IP address can be an IPv4 or an IPv6 address. If empty will use the default host namserver.") + pipelineSize := flag.Int("P", 1, "Pipeline requests. Default 1 (no pipeline).") flag.Var(&benchmarkCommands, "cmd", "Specify a query to send in quotes. Each command that you specify is run with its ratio. For example:-cmd=\"SET __key__ __value__\" -cmd-ratio=1") flag.Var(&benchmarkCommandsRatios, "cmd-ratio", "The query ratio vs other queries used in the same benchmark. Each command that you specify is run with its ratio. For example: -cmd=\"SET __key__ __value__\" -cmd-ratio=0.8 -cmd=\"GET __key__\" -cmd-ratio=0.2") @@ -429,14 +533,19 @@ func main() { } fmt.Printf("Using random seed: %d\n", *seed) rand.Seed(*seed) + mainRng := rand.New(rand.NewSource(*seed)) var cluster *radix.Cluster var radixStandalone radix.Client var ruedisClient rueidis.Client var err error = nil datapointsChan := make(chan datapoint, *numberRequests) + + // For radix client with pipelining, create shared connection pools + var sharedRadixPools = make(map[string]radix.Client) + for clientId := 1; uint64(clientId) <= *clients; clientId++ { wg.Add(1) - connectionStr := fmt.Sprintf("%s:%d", ips[rand.Int63n(int64(len(ips)))], *port) + connectionStr := fmt.Sprintf("%s:%d", ips[mainRng.Int63n(int64(len(ips)))], *port) if *verbose { fmt.Printf("Using connection string %s for client %d\n", connectionStr, clientId) } @@ -490,15 +599,21 @@ func main() { if err != nil { panic(err) } - go benchmarkRoutine(radixStandalone, ruedisClient, *useRuedis, *cscEnabled, *multi, datapointsChan, *continueonerror, cmds, cdf, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, cmdKeyplaceHolderPos, cmdDataplaceHolderPos, cmdReadOnly, useRateLimiter, rateLimiter, *waitReplicas, *waitReplicasMs, &cacheOptions) + go benchmarkRoutine(radixStandalone, ruedisClient, *useRuedis, *cscEnabled, *multi, datapointsChan, *continueonerror, cmds, cdf, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, cmdKeyplaceHolderPos, cmdDataplaceHolderPos, cmdReadOnly, useRateLimiter, rateLimiter, *waitReplicas, *waitReplicasMs, *pipelineSize, &cacheOptions) } else { - // legacy radix code + // legacy radix code with shared connection pools for better pipeline performance if *clusterMode { cluster = getOSSClusterConn(connectionStr, opts, 1) - go benchmarkRoutine(cluster, ruedisClient, *useRuedis, *cscEnabled, *multi, datapointsChan, *continueonerror, cmds, cdf, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, cmdKeyplaceHolderPos, cmdDataplaceHolderPos, cmdReadOnly, useRateLimiter, rateLimiter, *waitReplicas, *waitReplicasMs, nil) + go benchmarkRoutine(cluster, ruedisClient, *useRuedis, *cscEnabled, *multi, datapointsChan, *continueonerror, cmds, cdf, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, cmdKeyplaceHolderPos, cmdDataplaceHolderPos, cmdReadOnly, useRateLimiter, rateLimiter, *waitReplicas, *waitReplicasMs, *pipelineSize, nil) } else { - radixStandalone = getStandaloneConn(connectionStr, opts, 1) - go benchmarkRoutine(radixStandalone, ruedisClient, *useRuedis, *cscEnabled, *multi, datapointsChan, *continueonerror, cmds, cdf, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, cmdKeyplaceHolderPos, cmdDataplaceHolderPos, cmdReadOnly, useRateLimiter, rateLimiter, *waitReplicas, *waitReplicasMs, nil) + // Use shared connection pool for better pipeline performance + if sharedRadixPools[connectionStr] == nil { + // Calculate optimal pool size based on pipeline size and clients + poolSize := int(*clients) + sharedRadixPools[connectionStr] = getStandaloneConn(connectionStr, opts, uint64(poolSize)) + } + radixStandalone = sharedRadixPools[connectionStr] + go benchmarkRoutine(radixStandalone, ruedisClient, *useRuedis, *cscEnabled, *multi, datapointsChan, *continueonerror, cmds, cdf, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, cmdKeyplaceHolderPos, cmdDataplaceHolderPos, cmdReadOnly, useRateLimiter, rateLimiter, *waitReplicas, *waitReplicasMs, *pipelineSize, nil) } } diff --git a/redis-bechmark-go_test.go b/redis-bechmark-go_test.go index fadfa3e..e0c1275 100644 --- a/redis-bechmark-go_test.go +++ b/redis-bechmark-go_test.go @@ -3,13 +3,15 @@ package main import ( "bytes" "context" - "github.com/redis/rueidis" + "math/rand" "os" "os/exec" "reflect" "syscall" "testing" "time" + + "github.com/redis/rueidis" ) func getTestConnectionDetails() (string, string) { @@ -80,7 +82,9 @@ func Test_keyBuildLogic(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotNewCmdS, gotKey := keyBuildLogic(tt.args.keyPos, tt.args.dataPos, tt.args.datasize, tt.args.keyspacelen, tt.args.cmdS, tt.args.charset) + rng := rand.New(rand.NewSource(12345)) // Use fixed seed for deterministic tests + dataCache := make(map[int]string) // Per-test cache + gotNewCmdS, gotKey := keyBuildLogic(tt.args.keyPos, tt.args.dataPos, tt.args.datasize, tt.args.keyspacelen, tt.args.cmdS, tt.args.charset, rng, dataCache) if !reflect.DeepEqual(gotNewCmdS, tt.wantNewCmdS) { t.Errorf("keyBuildLogic() gotNewCmdS = %v, want %v", gotNewCmdS, tt.wantNewCmdS) }