From 64d172314b88e7c2397f3ad9f27a90b9e8285037 Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Sat, 5 Apr 2025 00:30:28 +0100 Subject: [PATCH 1/4] Add optional rate limiting to publisher with --rps and --rps-burst flags --- go.mod | 12 ++++++++++-- go.sum | 9 +++++++++ subscriber.go | 37 +++++++++++++++++++++++++++++++------ 3 files changed, 50 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index b07fabd..7be750d 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,16 @@ module github.com/RedisLabs/pubsub-sub-bench -go 1.13 +go 1.23.0 + +toolchain go1.24.1 require ( - github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect + github.com/HdrHistogram/hdrhistogram-go v1.1.2 github.com/redis/go-redis/v9 v9.0.5 + golang.org/x/time v0.11.0 +) + +require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect ) diff --git a/go.sum b/go.sum index e749287..b1b423a 100644 --- a/go.sum +++ b/go.sum @@ -11,22 +11,26 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= +github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o= github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -34,6 +38,7 @@ golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136 h1:A1gGSx58LAGVHUUsOf7IiR0u8Xb6W51gRwfDBhkdcaw= golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= @@ -47,6 +52,8 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= +golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -54,10 +61,12 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= +gonum.org/v1/gonum v0.8.2 h1:CCXrcPKiGGotvnN6jfUsKk4rRqm7q09/YbKb5xCEvtM= gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0= gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/subscriber.go b/subscriber.go index db60e49..81fc3ad 100644 --- a/subscriber.go +++ b/subscriber.go @@ -7,6 +7,7 @@ import ( "fmt" "io/ioutil" "log" + "math" "math/rand" "os" "os/signal" @@ -20,6 +21,7 @@ import ( hdrhistogram "github.com/HdrHistogram/hdrhistogram-go" redis "github.com/redis/go-redis/v9" + "golang.org/x/time/rate" ) const ( @@ -31,6 +33,8 @@ const ( redisTLSInsecureSkipVerify = "tls_insecure_skip_verify" ) +const Inf = rate.Limit(math.MaxFloat64) + var totalMessages uint64 var totalSubscribedChannels int64 var totalPublishers int64 @@ -53,7 +57,7 @@ type testResult struct { Addresses []string `json:"Addresses"` } -func publisherRoutine(clientName string, channels []string, mode string, measureRTT bool, verbose bool, dataSize int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) { +func publisherRoutine(clientName string, channels []string, mode string, measureRTT bool, verbose bool, dataSize int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client, useLimiter bool, rateLimiter *rate.Limiter) { defer wg.Done() if verbose { @@ -81,12 +85,16 @@ func publisherRoutine(clientName string, channels []string, mode string, measure default: msg := payload - if measureRTT { - now := time.Now().UnixMicro() - msg = strconv.FormatInt(now, 10) - } for _, ch := range channels { + if useLimiter { + r := rateLimiter.ReserveN(time.Now(), int(1)) + time.Sleep(r.Delay()) + } + if measureRTT { + now := time.Now().UnixMicro() + msg = strconv.FormatInt(now, 10) + } var err error switch mode { case "spublish": @@ -210,6 +218,8 @@ func main() { host := flag.String("host", "127.0.0.1", "redis host.") port := flag.String("port", "6379", "redis port.") cpuprofile := flag.String("cpuprofile", "", "write cpu profile to file") + rps := flag.Int64("rps", 0, "Max rps. If 0 no limit is applied and the DB is stressed up to maximum.") + rpsburst := flag.Int64("rps-burst", 0, "Max rps burst. If 0 the allowed burst will be the ammount of clients.") password := flag.String("a", "", "Password for Redis Auth.") dataSize := flag.Int("data-size", 128, "Payload size in bytes for publisher messages when RTT mode is disabled") mode := flag.String("mode", "subscribe", "Subscribe mode. Either 'subscribe' or 'ssubscribe'.") @@ -381,6 +391,21 @@ func main() { rttLatencyChannel := make(chan int64, 100000) // Channel for RTT measurements. buffer of 100K messages to process totalCreatedClients := 0 if strings.Contains(*mode, "publish") { + var requestRate = Inf + var requestBurst = int(*rps) + useRateLimiter := false + if *rps != 0 { + requestRate = rate.Limit(*rps) + log.Println(fmt.Sprintf("running publisher mode with rate-limit enabled. Max published %d messages/sec.\n", *rps)) + useRateLimiter = true + if *rpsburst != 0 { + requestBurst = int(*rpsburst) + } + } else { + log.Println(fmt.Sprintf("running publisher mode with maximum rate enabled.")) + } + + var rateLimiter = rate.NewLimiter(requestRate, requestBurst) // Only run publishers for client_id := 1; client_id <= *clients; client_id++ { channels := []string{} @@ -421,7 +446,7 @@ func main() { } wg.Add(1) - go publisherRoutine(publisherName, channels, *mode, *measureRTT, *verbose, *dataSize, ctx, &wg, client) + go publisherRoutine(publisherName, channels, *mode, *measureRTT, *verbose, *dataSize, ctx, &wg, client, useRateLimiter, rateLimiter) atomic.AddInt64(&totalPublishers, 1) atomic.AddUint64(&totalConnects, 1) } From 48ebbb7bb6bb58585639c725f7b29bef52018de9 Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Sat, 5 Apr 2025 00:32:06 +0100 Subject: [PATCH 2/4] Updated go version on ci to 1.23 and 1.24 --- .github/workflows/unit-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index f004ffd..18769c8 100644 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -4,7 +4,7 @@ jobs: test: strategy: matrix: - go-version: [1.18.x, 1.19.x, 1.20.x] + go-version: [1.23.x,1.24.x] os: [ubuntu-latest] runs-on: ${{ matrix.os }} steps: From 295284d66f3b6c10800ccc7723602bf77f7153c4 Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Mon, 7 Apr 2025 00:57:51 +0100 Subject: [PATCH 3/4] Reduced logging level at start of benchmark --- subscriber.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/subscriber.go b/subscriber.go index 81fc3ad..4d0d7cc 100644 --- a/subscriber.go +++ b/subscriber.go @@ -502,9 +502,12 @@ func main() { if connectionReconnectInterval > 0 { log.Printf("Using reconnection interval of %d milliseconds for subscriber: %s", connectionReconnectInterval, subscriberName) } - - log.Printf("subscriber: %s. Total channels %d: %v", subscriberName, len(channels), channels) - + if *verbose { + log.Printf("subscriber: %s. Total channels %d: %v", subscriberName, len(channels), channels) + } + if totalCreatedClients%100 == 0 { + log.Printf("Created %d clients so far.", totalCreatedClients) + } wg.Add(1) go subscriberRoutine(subscriberName, *mode, channels, *printMessages, connectionReconnectInterval, *measureRTT, rttLatencyChannel, ctx, &wg, client) } From d4bd8db694f005baa3a719a167c9d96fc3ab10e0 Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Mon, 7 Apr 2025 00:59:18 +0100 Subject: [PATCH 4/4] Included p95 metric --- subscriber.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/subscriber.go b/subscriber.go index 4d0d7cc..fa5cebb 100644 --- a/subscriber.go +++ b/subscriber.go @@ -539,10 +539,12 @@ func main() { } avg := hist.Mean() p50 := hist.ValueAtQuantile(50.0) + p95 := hist.ValueAtQuantile(95.0) p99 := hist.ValueAtQuantile(99.0) p999 := hist.ValueAtQuantile(99.9) fmt.Fprintf(w, "Avg RTT %.3f ms\n", avg/1000.0) fmt.Fprintf(w, "P50 RTT %.3f ms\n", float64(p50)/1000.0) + fmt.Fprintf(w, "P95 RTT %.3f ms\n", float64(p95)/1000.0) fmt.Fprintf(w, "P99 RTT %.3f ms\n", float64(p99)/1000.0) fmt.Fprintf(w, "P999 RTT %.3f ms\n", float64(p999)/1000.0) } else {