7 changes: 4 additions & 3 deletions cli.go
@@ -1,11 +1,12 @@
package main

import (
shellwords "github.com/mattn/go-shellwords"
"log"
"math"
"math/rand"
"strconv"

shellwords "github.com/mattn/go-shellwords"
)

type arrayStringParameters []string
@@ -19,8 +20,8 @@ func (i *arrayStringParameters) Set(value string) error {
return nil
}

func sample(cdf []float32) int {
r := rand.Float32()
func sample(cdf []float32, rng *rand.Rand) int {
r := rng.Float32()
bucket := 0
for r > cdf[bucket] {
bucket++
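The cli.go hunk threads a caller-supplied *rand.Rand through sample, so each benchmark goroutine draws from its own source instead of math/rand's package-level one, whose internal lock serializes concurrent callers. Below is a minimal standalone sketch of that call pattern, not taken from the PR; the CDF values and the copy of sample's body are illustrative.

    package main

    import (
        "fmt"
        "math/rand"
        "time"
    )

    // sample mirrors cli.go after this change: it walks the cumulative
    // distribution using the rng the caller passes in.
    func sample(cdf []float32, rng *rand.Rand) int {
        r := rng.Float32()
        bucket := 0
        for r > cdf[bucket] {
            bucket++
        }
        return bucket
    }

    func main() {
        // Each worker goroutine owns its own *rand.Rand, so concurrent calls
        // to sample never contend on the global source's mutex.
        rng := rand.New(rand.NewSource(time.Now().UnixNano()))
        cdf := []float32{0.8, 1.0} // illustrative: 80% first command, 20% second
        fmt.Println(sample(cdf, rng))
    }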
81 changes: 72 additions & 9 deletions common.go
@@ -3,13 +3,14 @@ package main
import (
"context"
"fmt"
"github.com/HdrHistogram/hdrhistogram-go"
radix "github.com/mediocregopher/radix/v4"
"golang.org/x/time/rate"
"math"
"math/rand"
"strings"
"sync"

"github.com/HdrHistogram/hdrhistogram-go"
radix "github.com/mediocregopher/radix/v4"
"golang.org/x/time/rate"
)

var totalCommands uint64
@@ -32,11 +33,68 @@ type datapoint struct {
cachedEntry bool
}

func stringWithCharset(length int, charset string) string {
func stringWithCharset(length int, charset string, rng *rand.Rand, dataCache map[int]string) string {
if length <= 0 {
return ""
}

// For common sizes and default charset, use per-goroutine cache
if charset == "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" && length <= 1024 {
if cached, exists := dataCache[length]; exists {
return cached
}

// Generate and cache
b := make([]byte, length)
for i := range b {
b[i] = charset[rng.Intn(len(charset))]
}
result := string(b)
dataCache[length] = result
return result
}

// For small lengths, use the original method
if length <= 64 {
b := make([]byte, length)
for i := range b {
b[i] = charset[rng.Intn(len(charset))]
}
return string(b)
}

// For larger lengths, use a more efficient approach
// Generate chunks of random data and repeat pattern
chunkSize := 64
if length < 256 {
chunkSize = 32
}

b := make([]byte, length)
for i := range b {
b[i] = charset[rand.Intn(len(charset))]
charsetLen := len(charset)

// Generate initial random chunk
for i := 0; i < chunkSize && i < length; i++ {
b[i] = charset[rng.Intn(charsetLen)]
}

// For remaining bytes, use pattern repetition with some randomness
for i := chunkSize; i < length; i += chunkSize {
end := i + chunkSize
if end > length {
end = length
}

// Copy previous chunk with slight variation
for j := i; j < end; j++ {
if rng.Intn(8) == 0 { // 12.5% chance to randomize
b[j] = charset[rng.Intn(charsetLen)]
} else {
b[j] = b[j-chunkSize]
}
}
}

return string(b)
}

@@ -50,16 +108,21 @@ type Client interface {
Close() error
}

func keyBuildLogic(keyPos int, dataPos int, datasize, keyspacelen uint64, cmdS []string, charset string) (newCmdS []string, key string) {
func keyBuildLogic(keyPos int, dataPos int, datasize, keyspacelen uint64, cmdS []string, charset string, rng *rand.Rand, dataCache map[int]string) (newCmdS []string, key string) {
newCmdS = make([]string, len(cmdS))
copy(newCmdS, cmdS)
if keyPos > -1 {
keyV := fmt.Sprintf("%d", rand.Int63n(int64(keyspacelen)))
var keyV string
if keyspacelen == 1 {
keyV = "0" // Optimize for single key case
} else {
keyV = fmt.Sprintf("%d", rng.Int63n(int64(keyspacelen)))
}
key = strings.Replace(newCmdS[keyPos], "__key__", keyV, -1)
newCmdS[keyPos] = key
}
if dataPos > -1 {
newCmdS[dataPos] = stringWithCharset(int(datasize), charset)
newCmdS[dataPos] = stringWithCharset(int(datasize), charset, rng, dataCache)
}
return newCmdS, key
}
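The common.go hunks replace shared state with per-goroutine arguments: stringWithCharset and keyBuildLogic now receive the worker's own *rand.Rand plus a map[int]string value cache, and payloads built with the default charset at sizes up to 1024 bytes are generated once per goroutine and then reused. A simplified standalone sketch of that caching fast path follows; the function name cachedPayload is mine, the PR keeps this logic inside stringWithCharset.

    package main

    import (
        "fmt"
        "math/rand"
    )

    const defaultCharset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

    // cachedPayload sketches the fast path added to stringWithCharset: a map
    // keyed by payload length, owned by one goroutine, so repeated requests
    // for the same size reuse a single generated value without any locking.
    func cachedPayload(length int, rng *rand.Rand, cache map[int]string) string {
        if length <= 0 {
            return ""
        }
        if v, ok := cache[length]; ok {
            return v
        }
        b := make([]byte, length)
        for i := range b {
            b[i] = defaultCharset[rng.Intn(len(defaultCharset))]
        }
        cache[length] = string(b)
        return cache[length]
    }

    func main() {
        rng := rand.New(rand.NewSource(1))
        cache := make(map[int]string)
        // Same size, same goroutine: the second call is just a map lookup.
        fmt.Println(cachedPayload(8, rng, cache) == cachedPayload(8, rng, cache)) // true
    }

The trade-off the diff accepts is reduced payload variety: within one goroutine every command of a given data size carries the same value, and the chunk-repetition branch for larger or non-default payloads is only partially random by design (roughly one byte in eight is re-randomized per chunk).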
2 changes: 1 addition & 1 deletion go.mod
@@ -1,6 +1,6 @@
module github.com/filipecosta90/redis-benchmark-go

go 1.20
go 1.21

require (
github.com/HdrHistogram/hdrhistogram-go v1.1.0
157 changes: 136 additions & 21 deletions redis-bechmark-go.go
@@ -20,25 +20,73 @@ import (
"golang.org/x/time/rate"
)

func benchmarkRoutine(radixClient Client, ruedisClient rueidis.Client, useRuedis, useCSC, enableMultiExec bool, datapointsChan chan datapoint, continueOnError bool, cmdS [][]string, commandsCDF []float32, keyspacelen, datasize, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, keyplace, dataplace []int, readOnly []bool, useLimiter bool, rateLimiter *rate.Limiter, waitReplicas, waitReplicasMs int, cacheOptions *rueidis.CacheOptions) {
func benchmarkRoutine(radixClient Client, ruedisClient rueidis.Client, useRuedis, useCSC, enableMultiExec bool, datapointsChan chan datapoint, continueOnError bool, cmdS [][]string, commandsCDF []float32, keyspacelen, datasize, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, keyplace, dataplace []int, readOnly []bool, useLimiter bool, rateLimiter *rate.Limiter, waitReplicas, waitReplicasMs, pipelineSize int, cacheOptions *rueidis.CacheOptions) {

defer wg.Done()
for i := 0; uint64(i) < number_samples || loop; i++ {
cmdPos := sample(commandsCDF)
kplace := keyplace[cmdPos]
dplace := dataplace[cmdPos]
isReadOnly := readOnly[cmdPos]
cmds := cmdS[cmdPos]
newCmdS, key := keyBuildLogic(kplace, dplace, datasize, keyspacelen, cmds, charset)
if useLimiter {
r := rateLimiter.ReserveN(time.Now(), int(1))
time.Sleep(r.Delay())

// Create per-goroutine random number generator to avoid lock contention
rng := rand.New(rand.NewSource(time.Now().UnixNano()))

// Create per-goroutine data cache to avoid mutex contention
dataCache := make(map[int]string)

// Pipeline support for radix client only
if !useRuedis && pipelineSize > 1 {
// Pipeline mode for radix client
pipelineCommands := make([][]string, 0, pipelineSize)
pipelineKeys := make([]string, 0, pipelineSize)

for i := 0; uint64(i) < number_samples || loop; i++ {
cmdPos := sample(commandsCDF, rng)
kplace := keyplace[cmdPos]
dplace := dataplace[cmdPos]
cmds := cmdS[cmdPos]
newCmdS, key := keyBuildLogic(kplace, dplace, datasize, keyspacelen, cmds, charset, rng, dataCache)

// Collect commands for pipeline
pipelineCommands = append(pipelineCommands, newCmdS)
pipelineKeys = append(pipelineKeys, key)

// When we have enough commands or reached the end, send the pipeline
if len(pipelineCommands) == pipelineSize || (uint64(i+1) >= number_samples && !loop) {
if useLimiter {
r := rateLimiter.ReserveN(time.Now(), len(pipelineCommands))
time.Sleep(r.Delay())
}
sendCmdLogicRadixPipeline(radixClient, pipelineCommands, pipelineKeys, enableMultiExec, datapointsChan, continueOnError, debug_level, waitReplicas, waitReplicasMs)

// Reset pipeline
pipelineCommands = pipelineCommands[:0]
pipelineKeys = pipelineKeys[:0]
}
}
if useRuedis {
sendCmdLogicRuedis(ruedisClient, newCmdS, enableMultiExec, datapointsChan, continueOnError, debug_level, useCSC, isReadOnly, cacheOptions, waitReplicas, waitReplicasMs)
} else {
sendCmdLogicRadix(radixClient, newCmdS, enableMultiExec, key, datapointsChan, continueOnError, debug_level, waitReplicas, waitReplicasMs)

// Send any remaining commands in the pipeline
if len(pipelineCommands) > 0 {
if useLimiter {
r := rateLimiter.ReserveN(time.Now(), len(pipelineCommands))
time.Sleep(r.Delay())
}
sendCmdLogicRadixPipeline(radixClient, pipelineCommands, pipelineKeys, enableMultiExec, datapointsChan, continueOnError, debug_level, waitReplicas, waitReplicasMs)
}
} else {
// Original single command mode
for i := 0; uint64(i) < number_samples || loop; i++ {
cmdPos := sample(commandsCDF, rng)
kplace := keyplace[cmdPos]
dplace := dataplace[cmdPos]
isReadOnly := readOnly[cmdPos]
cmds := cmdS[cmdPos]
newCmdS, key := keyBuildLogic(kplace, dplace, datasize, keyspacelen, cmds, charset, rng, dataCache)
if useLimiter {
r := rateLimiter.ReserveN(time.Now(), int(1))
time.Sleep(r.Delay())
}
if useRuedis {
sendCmdLogicRuedis(ruedisClient, newCmdS, enableMultiExec, datapointsChan, continueOnError, debug_level, useCSC, isReadOnly, cacheOptions, waitReplicas, waitReplicasMs)
} else {
sendCmdLogicRadix(radixClient, newCmdS, enableMultiExec, key, datapointsChan, continueOnError, debug_level, waitReplicas, waitReplicasMs)
}
}
}
}
@@ -166,6 +214,61 @@ func sendCmdLogicRadix(conn Client, newCmdS []string, enableMultiExec bool, key
datapointsChan <- datapoint{!(err != nil), duration.Microseconds(), cacheHit}
}

func sendCmdLogicRadixPipeline(conn Client, cmdsList [][]string, keys []string, enableMultiExec bool, datapointsChan chan datapoint, continueOnError bool, debug_level int, waitReplicas, waitReplicasMs int) {
ctx := context.Background()
cacheHit := false
var err error
startT := time.Now()

if enableMultiExec {
// For MULTI/EXEC, we need to handle each command individually
// This is not ideal for pipelining, but maintains compatibility
for i, newCmdS := range cmdsList {
key := keys[i]
sendCmdLogicRadix(conn, newCmdS, enableMultiExec, key, datapointsChan, continueOnError, debug_level, waitReplicas, waitReplicasMs)
}
return
}

// Create pipeline
p := radix.NewPipeline()

// Add all commands to pipeline
for _, newCmdS := range cmdsList {
cmd := radix.Cmd(nil, newCmdS[0], newCmdS[1:]...)
p.Append(cmd)
}

// Add WAIT commands if needed
if waitReplicas > 0 {
for range cmdsList {
p.Append(radix.Cmd(nil, "WAIT", fmt.Sprintf("%d", waitReplicas), fmt.Sprintf("%d", waitReplicasMs)))
}
}

// Execute pipeline
err = conn.Do(ctx, p)
endT := time.Now()

if err != nil {
if continueOnError {
if debug_level > 0 {
log.Println(fmt.Sprintf("Received an error with the following pipeline commands: %v, error: %v", cmdsList, err))
}
} else {
log.Fatalf("Received an error with the following pipeline commands: %v, error: %v", cmdsList, err)
}
}

// Calculate duration and send datapoints for each command in the pipeline
duration := endT.Sub(startT)
//avgDurationPerCmd := duration.Microseconds() / int64(len(cmdsList))

for range cmdsList {
datapointsChan <- datapoint{!(err != nil), duration.Microseconds(), cacheHit}
}
}

func onInvalidations(messages []rueidis.RedisMessage) {
if messages != nil {
cscInvalidationMutex.Lock()
@@ -292,6 +395,7 @@ func main() {
continueonerror := flag.Bool("continue-on-error", false, "Output verbose info")
resp := flag.String("resp", "", "redis command response protocol (2 - RESP 2, 3 - RESP 3). If empty will not enforce it.")
nameserver := flag.String("nameserver", "", "the IP address of the DNS name server. The IP address can be an IPv4 or an IPv6 address. If empty will use the default host namserver.")
pipelineSize := flag.Int("P", 1, "Pipeline <numreq> requests. Default 1 (no pipeline).")
flag.Var(&benchmarkCommands, "cmd", "Specify a query to send in quotes. Each command that you specify is run with its ratio. For example:-cmd=\"SET __key__ __value__\" -cmd-ratio=1")
flag.Var(&benchmarkCommandsRatios, "cmd-ratio", "The query ratio vs other queries used in the same benchmark. Each command that you specify is run with its ratio. For example: -cmd=\"SET __key__ __value__\" -cmd-ratio=0.8 -cmd=\"GET __key__\" -cmd-ratio=0.2")

@@ -429,14 +533,19 @@
}
fmt.Printf("Using random seed: %d\n", *seed)
rand.Seed(*seed)
mainRng := rand.New(rand.NewSource(*seed))
var cluster *radix.Cluster
var radixStandalone radix.Client
var ruedisClient rueidis.Client
var err error = nil
datapointsChan := make(chan datapoint, *numberRequests)

// For radix client with pipelining, create shared connection pools
var sharedRadixPools = make(map[string]radix.Client)

for clientId := 1; uint64(clientId) <= *clients; clientId++ {
wg.Add(1)
connectionStr := fmt.Sprintf("%s:%d", ips[rand.Int63n(int64(len(ips)))], *port)
connectionStr := fmt.Sprintf("%s:%d", ips[mainRng.Int63n(int64(len(ips)))], *port)
if *verbose {
fmt.Printf("Using connection string %s for client %d\n", connectionStr, clientId)
}
@@ -490,15 +599,21 @@
if err != nil {
panic(err)
}
go benchmarkRoutine(radixStandalone, ruedisClient, *useRuedis, *cscEnabled, *multi, datapointsChan, *continueonerror, cmds, cdf, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, cmdKeyplaceHolderPos, cmdDataplaceHolderPos, cmdReadOnly, useRateLimiter, rateLimiter, *waitReplicas, *waitReplicasMs, &cacheOptions)
go benchmarkRoutine(radixStandalone, ruedisClient, *useRuedis, *cscEnabled, *multi, datapointsChan, *continueonerror, cmds, cdf, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, cmdKeyplaceHolderPos, cmdDataplaceHolderPos, cmdReadOnly, useRateLimiter, rateLimiter, *waitReplicas, *waitReplicasMs, *pipelineSize, &cacheOptions)
} else {
// legacy radix code
// legacy radix code with shared connection pools for better pipeline performance
if *clusterMode {
cluster = getOSSClusterConn(connectionStr, opts, 1)
go benchmarkRoutine(cluster, ruedisClient, *useRuedis, *cscEnabled, *multi, datapointsChan, *continueonerror, cmds, cdf, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, cmdKeyplaceHolderPos, cmdDataplaceHolderPos, cmdReadOnly, useRateLimiter, rateLimiter, *waitReplicas, *waitReplicasMs, nil)
go benchmarkRoutine(cluster, ruedisClient, *useRuedis, *cscEnabled, *multi, datapointsChan, *continueonerror, cmds, cdf, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, cmdKeyplaceHolderPos, cmdDataplaceHolderPos, cmdReadOnly, useRateLimiter, rateLimiter, *waitReplicas, *waitReplicasMs, *pipelineSize, nil)
} else {
radixStandalone = getStandaloneConn(connectionStr, opts, 1)
go benchmarkRoutine(radixStandalone, ruedisClient, *useRuedis, *cscEnabled, *multi, datapointsChan, *continueonerror, cmds, cdf, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, cmdKeyplaceHolderPos, cmdDataplaceHolderPos, cmdReadOnly, useRateLimiter, rateLimiter, *waitReplicas, *waitReplicasMs, nil)
// Use shared connection pool for better pipeline performance
if sharedRadixPools[connectionStr] == nil {
// Calculate optimal pool size based on pipeline size and clients
poolSize := int(*clients)
sharedRadixPools[connectionStr] = getStandaloneConn(connectionStr, opts, uint64(poolSize))
}
radixStandalone = sharedRadixPools[connectionStr]
go benchmarkRoutine(radixStandalone, ruedisClient, *useRuedis, *cscEnabled, *multi, datapointsChan, *continueonerror, cmds, cdf, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, cmdKeyplaceHolderPos, cmdDataplaceHolderPos, cmdReadOnly, useRateLimiter, rateLimiter, *waitReplicas, *waitReplicasMs, *pipelineSize, nil)
}
}

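The new -P flag only affects the radix path: when pipelineSize > 1, benchmarkRoutine batches up to that many generated commands and hands them to sendCmdLogicRadixPipeline, which queues them on a radix.NewPipeline() and sends the whole batch with a single Do call, while the rueidis path keeps the single-command loop. Below is a standalone sketch of that radix/v4 pattern; the pool construction is illustrative and is not how the benchmark itself builds its clients.

    package main

    import (
        "context"
        "log"

        radix "github.com/mediocregopher/radix/v4"
    )

    func main() {
        ctx := context.Background()

        // Illustrative connection only; the benchmark derives its connection
        // string from its host/port flags and, in this PR, reuses a shared
        // pool per connection string on the radix pipeline path.
        client, err := (radix.PoolConfig{}).New(ctx, "tcp", "127.0.0.1:6379")
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // Same shape as sendCmdLogicRadixPipeline: append one radix.Cmd per
        // request, then flush the batch in a single round trip.
        p := radix.NewPipeline()
        for i := 0; i < 10; i++ {
            p.Append(radix.Cmd(nil, "SET", "key:pipeline", "value"))
        }
        if err := client.Do(ctx, p); err != nil {
            log.Fatal(err)
        }
    }

One measurement caveat visible in sendCmdLogicRadixPipeline: every command in a batch is reported with the full pipeline round-trip duration (the per-command average is left commented out), so running with -P greater than 1 changes what the latency histogram represents.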
8 changes: 6 additions & 2 deletions redis-bechmark-go_test.go
@@ -3,13 +3,15 @@ package main
import (
"bytes"
"context"
"github.com/redis/rueidis"
"math/rand"
"os"
"os/exec"
"reflect"
"syscall"
"testing"
"time"

"github.com/redis/rueidis"
)

func getTestConnectionDetails() (string, string) {
@@ -80,7 +82,9 @@ func Test_keyBuildLogic(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotNewCmdS, gotKey := keyBuildLogic(tt.args.keyPos, tt.args.dataPos, tt.args.datasize, tt.args.keyspacelen, tt.args.cmdS, tt.args.charset)
rng := rand.New(rand.NewSource(12345)) // Use fixed seed for deterministic tests
dataCache := make(map[int]string) // Per-test cache
gotNewCmdS, gotKey := keyBuildLogic(tt.args.keyPos, tt.args.dataPos, tt.args.datasize, tt.args.keyspacelen, tt.args.cmdS, tt.args.charset, rng, dataCache)
if !reflect.DeepEqual(gotNewCmdS, tt.wantNewCmdS) {
t.Errorf("keyBuildLogic() gotNewCmdS = %v, want %v", gotNewCmdS, tt.wantNewCmdS)
}
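The test change passes a fixed-seed rand.Rand and a fresh per-test cache into keyBuildLogic, so the generated keys and payloads are reproducible across runs. That relies on a basic property of math/rand, sketched here with illustrative values.

    package main

    import (
        "fmt"
        "math/rand"
    )

    func main() {
        // Two generators seeded identically produce identical sequences,
        // which is what makes the fixed-seed test deterministic.
        a := rand.New(rand.NewSource(12345))
        b := rand.New(rand.NewSource(12345))
        fmt.Println(a.Int63n(100) == b.Int63n(100)) // true
    }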