From 20dad99e79e53611d6673cab668d466d001de7a7 Mon Sep 17 00:00:00 2001
From: fcostaoliveira
Date: Fri, 4 Apr 2025 00:06:11 +0100
Subject: [PATCH] Add option to measure RTT latency
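
Adds a -measure-rtt-latency flag and publisher support ('publish' and
'spublish' modes). When RTT measurement is enabled, publishers send the
current timestamp in microseconds as the message payload; subscribers
parse it, queue the measured delta for the stats goroutine, and the run
ends with per-tick averages plus an HdrHistogram summary
(avg/p50/p99/p999). When disabled, publishers send a fixed-size payload
controlled by -data-size (default 128 bytes).

Example invocations (binary name assumed from the module path):

    # terminal 1: subscribers that compute RTT from inbound timestamps
    pubsub-sub-bench -mode subscribe -measure-rtt-latency
    # terminal 2: publishers that embed send timestamps
    pubsub-sub-bench -mode publish -measure-rtt-latency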
---
 go.mod        |   5 +-
 go.sum        |  53 ++++++++
 subscriber.go | 335 +++++++++++++++++++++++++++++++++++++++----------
 3 files changed, 322 insertions(+), 71 deletions(-)

diff --git a/go.mod b/go.mod
index 91a2346..b07fabd 100644
--- a/go.mod
+++ b/go.mod
@@ -2,4 +2,7 @@ module github.com/RedisLabs/pubsub-sub-bench
 
 go 1.13
 
-require github.com/redis/go-redis/v9 v9.0.5
+require (
+	github.com/HdrHistogram/hdrhistogram-go v1.1.2
+	github.com/redis/go-redis/v9 v9.0.5
+)
diff --git a/go.sum b/go.sum
index e095384..e749287 100644
--- a/go.sum
+++ b/go.sum
@@ -1,10 +1,63 @@
+dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
+github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
+github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM=
+github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
+github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
 github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
 github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
 github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
 github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
 github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
 github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
+github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
+github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
+github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
+github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o=
 github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY=
+golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
+golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
+golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
+golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
+golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
+gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0=
+gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
+gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
diff --git a/subscriber.go b/subscriber.go
index 37267a1..2087b8e 100644
--- a/subscriber.go
+++ b/subscriber.go
@@ -11,12 +11,14 @@ import (
 	"os"
 	"os/signal"
 	"runtime/pprof"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"text/tabwriter"
 	"time"
 
+	hdrhistogram "github.com/HdrHistogram/hdrhistogram-go"
 	redis "github.com/redis/go-redis/v9"
 )
 
@@ -31,6 +33,7 @@ const (
 
 var totalMessages uint64
 var totalSubscribedChannels int64
+var totalPublishers int64
 var totalConnects uint64
 
 var clusterSlicesMu sync.Mutex
@@ -50,8 +53,57 @@ type testResult struct {
 	Addresses []string `json:"Addresses"`
 }
 
-func subscriberRoutine(clientName, mode string, channels []string, printMessages bool, connectionReconnectInterval int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) {
-	// Tell the caller we've stopped
+func publisherRoutine(clientName string, channels []string, mode string, measureRTT bool, verbose bool, dataSize int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) {
+	defer wg.Done()
+
+	if verbose {
+		log.Printf("Publisher %s started. Mode: %s | Channels: %d | Payload: %s",
+			clientName, mode, len(channels),
+			func() string {
+				if measureRTT {
+					return "RTT timestamp"
+				}
+				return fmt.Sprintf("fixed size %d bytes", dataSize)
+			}(),
+		)
+	}
+
+	var payload string
+	if !measureRTT {
+		payload = strings.Repeat("A", dataSize)
+	}
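+
+	// In RTT mode every message carries its send time (microseconds since
+	// the Unix epoch); otherwise the fixed payload built above is reused
+	// for every publish.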
+	for {
+		select {
+		case <-ctx.Done():
+			log.Printf("Publisher %s exiting due to context cancellation.", clientName)
+			return
+
+		default:
+			msg := payload
+			if measureRTT {
+				now := time.Now().UnixMicro()
+				msg = strconv.FormatInt(now, 10)
+			}
+
+			for _, ch := range channels {
+				var err error
+				switch mode {
+				case "spublish":
+					err = client.SPublish(ctx, ch, msg).Err()
+				default:
+					err = client.Publish(ctx, ch, msg).Err()
+				}
+				if err != nil {
+					log.Printf("Error publishing to channel %s: %v", ch, err)
+				}
+				atomic.AddUint64(&totalMessages, 1)
+			}
+		}
+	}
+}
+
+func subscriberRoutine(clientName, mode string, channels []string, verbose bool, connectionReconnectInterval int, measureRTT bool, rttLatencyChannel chan int64, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) {
+	// Tell the caller we've stopped
 	defer wg.Done()
 	var reconnectTicker *time.Ticker
 	if connectionReconnectInterval > 0 {
@@ -134,8 +186,20 @@
 			}
 			panic(err)
 		}
-		if printMessages {
-			fmt.Println(fmt.Sprintf("received message in channel %s. Message: %s", msg.Channel, msg.Payload))
+		if verbose {
+			log.Printf("received message in channel %s. Message: %s", msg.Channel, msg.Payload)
+		}
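+		// RTT mode expects the payload to be the publisher's send time in
+		// microseconds; the delta against the local clock is queued for the
+		// stats goroutine to aggregate.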
+		if measureRTT {
+			if ts, err := strconv.ParseInt(msg.Payload, 10, 64); err == nil {
+				now := time.Now().UnixMicro()
+				rtt := now - ts
+				rttLatencyChannel <- rtt
+				if verbose {
+					log.Printf("RTT measured: %d ms", rtt/1000)
+				}
+			} else {
+				log.Printf("Invalid timestamp in message: %s, err: %v", msg.Payload, err)
+			}
 		}
 		atomic.AddUint64(&totalMessages, 1)
 	}
@@ -147,6 +211,7 @@ func main() {
 	port := flag.String("port", "6379", "redis port.")
 	cpuprofile := flag.String("cpuprofile", "", "write cpu profile to file")
 	password := flag.String("a", "", "Password for Redis Auth.")
+	dataSize := flag.Int("data-size", 128, "Payload size in bytes for publisher messages when RTT mode is disabled")
-	mode := flag.String("mode", "subscribe", "Subscribe mode. Either 'subscribe' or 'ssubscribe'.")
+	mode := flag.String("mode", "subscribe", "Mode. One of 'subscribe', 'ssubscribe', 'publish', or 'spublish'.")
 	username := flag.String("user", "", "Used to send ACL style 'AUTH username pass'. Needs -a.")
 	subscribers_placement := flag.String("subscribers-placement-per-channel", "dense", "(dense,sparse) dense - Place all subscribers to channel in a specific shard. sparse- spread the subscribers across as many shards possible, in a round-robin manner.")
@@ -168,6 +233,7 @@
 	distributeSubscribers := flag.Bool("oss-cluster-api-distribute-subscribers", false, "read cluster slots and distribute subscribers among them.")
 	printMessages := flag.Bool("print-messages", false, "print messages.")
 	verbose := flag.Bool("verbose", false, "verbose print.")
+	measureRTT := flag.Bool("measure-rtt-latency", false, "Enable RTT latency measurement mode")
 	version := flag.Bool("version", false, "print version and exit.")
 	timeout := flag.Duration("redis-timeout", time.Second*30, "determines the timeout to pass to redis connection setup. It adjust the connection, read, and write timeouts.")
 	poolSizePtr := flag.Int(redisPoolSize, redisPoolSizeDefault, "Maximum number of socket connections per node.")
@@ -190,7 +256,14 @@
 	log.Println(fmt.Sprintf("pubsub-sub-bench (git_sha1:%s%s)", git_sha, git_dirty_str))
 	log.Println(fmt.Sprintf("using random seed:%d", *randSeed))
 	rand.Seed(*randSeed)
-
+	if *measureRTT {
+		log.Println("RTT measurement enabled.")
+	} else {
+		log.Println("RTT measurement disabled.")
+	}
+	if *verbose {
+		log.Println("verbose mode enabled.")
+	}
 	ctx := context.Background()
 	nodeCount := 0
 	var nodesAddresses []string
@@ -251,6 +324,7 @@ func main() {
 	subscriptions_per_node := total_subscriptions / nodeCount
 
 	log.Println(fmt.Sprintf("Will use a subscriber prefix of: %s", *subscribe_prefix))
+	log.Println(fmt.Sprintf("total_channels: %d", total_channels))
 
 	if *poolSizePtr == 0 {
 		poolSize = subscriptions_per_node
@@ -304,62 +378,114 @@
 		}
 		pprof.StartCPUProfile(f)
 	}
+	rttLatencyChannel := make(chan int64, 100000) // Channel for RTT measurements, buffered for up to 100K in-flight samples
 
 	totalCreatedClients := 0
-	if strings.Compare(*subscribers_placement, "dense") == 0 {
+	if strings.Contains(*mode, "publish") {
+		// Only run publishers
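+		// Each publisher builds a randomized channel list, then connects to
+		// the cluster shard owning its first channel (spublish) or to a node
+		// picked round-robin.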
 		for client_id := 1; client_id <= *clients; client_id++ {
 			channels := []string{}
-			n_channels_this_conn := 0
+			n_channels_this_pub := 0
 			if *max_channels_per_subscriber == *min_channels_per_subscriber {
-				n_channels_this_conn = *max_channels_per_subscriber
+				n_channels_this_pub = *max_channels_per_subscriber
 			} else {
-				n_channels_this_conn = rand.Intn(*max_channels_per_subscriber-*min_channels_per_subscriber) + *min_channels_per_subscriber
+				n_channels_this_pub = rand.Intn(*max_channels_per_subscriber-*min_channels_per_subscriber) + *min_channels_per_subscriber
 			}
-			for channel_this_conn := 1; channel_this_conn <= n_channels_this_conn; channel_this_conn++ {
-				new_channel_id := rand.Intn(*channel_maximum) + *channel_minimum
+			for i := 0; i < n_channels_this_pub; i++ {
+				new_channel_id := rand.Intn(*channel_maximum-*channel_minimum+1) + *channel_minimum
 				new_channel := fmt.Sprintf("%s%d", *subscribe_prefix, new_channel_id)
 				channels = append(channels, new_channel)
 			}
-			totalCreatedClients++
-			subscriberName := fmt.Sprintf("subscriber#%d", client_id)
+
+			publisherName := fmt.Sprintf("publisher#%d", client_id)
 			var client *redis.Client
-			var err error = nil
+			var err error
+
 			ctx = context.Background()
-			// In case of SSUBSCRIBE the node is associated the to the channel name
-			if strings.Compare(*mode, "ssubscribe") == 0 && *distributeSubscribers == true {
+
+			if strings.Compare(*mode, "spublish") == 0 && *distributeSubscribers {
 				firstChannel := channels[0]
 				client, err = clusterClient.MasterForKey(ctx, firstChannel)
 				if err != nil {
 					log.Fatal(err)
 				}
-				if *verbose {
-					log.Println(fmt.Sprintf("client %d is a CLUSTER client connected to %v. Subscriber name %s", totalCreatedClients, client.String(), subscriberName))
-				}
 			} else {
-				nodes_pos := client_id % nodeCount
-				addr := nodesAddresses[nodes_pos]
-				client = nodeClients[nodes_pos]
-				if *verbose {
-					log.Println(fmt.Sprintf("client %d is a STANDALONE client connected to node %d (address %s). Subscriber name %s", totalCreatedClients, nodes_pos, addr, subscriberName))
-				}
+				nodePos := client_id % nodeCount
+				client = nodeClients[nodePos]
 				err = client.Ping(ctx).Err()
 				if err != nil {
 					log.Fatal(err)
 				}
 			}
-			wg.Add(1)
-			connectionReconnectInterval := 0
-			if *max_reconnect_interval == *min_reconnect_interval {
-				connectionReconnectInterval = *max_reconnect_interval
-			} else {
-				connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *min_reconnect_interval
+
+			if *verbose {
+				log.Printf("publisher %d targeting channels %v", client_id, channels)
 			}
-			if connectionReconnectInterval > 0 {
-				log.Println(fmt.Sprintf("Using reconnection interval of %d milliseconds for subscriber: %s", connectionReconnectInterval, subscriberName))
+
+			wg.Add(1)
+			go publisherRoutine(publisherName, channels, *mode, *measureRTT, *verbose, *dataSize, ctx, &wg, client)
+			atomic.AddInt64(&totalPublishers, 1)
+			atomic.AddUint64(&totalConnects, 1)
+		}
+
+	} else if strings.Contains(*mode, "subscribe") {
+		// Only run subscribers
+		if strings.Compare(*subscribers_placement, "dense") == 0 {
+			for client_id := 1; client_id <= *clients; client_id++ {
+				channels := []string{}
+				n_channels_this_conn := 0
+				if *max_channels_per_subscriber == *min_channels_per_subscriber {
+					n_channels_this_conn = *max_channels_per_subscriber
+				} else {
+					n_channels_this_conn = rand.Intn(*max_channels_per_subscriber-*min_channels_per_subscriber) + *min_channels_per_subscriber
+				}
+				for channel_this_conn := 1; channel_this_conn <= n_channels_this_conn; channel_this_conn++ {
+					new_channel_id := rand.Intn(*channel_maximum-*channel_minimum+1) + *channel_minimum
+					new_channel := fmt.Sprintf("%s%d", *subscribe_prefix, new_channel_id)
+					channels = append(channels, new_channel)
+				}
+				totalCreatedClients++
+				subscriberName := fmt.Sprintf("subscriber#%d", client_id)
+				var client *redis.Client
+				var err error
+				ctx = context.Background()
+
+				if strings.Compare(*mode, "ssubscribe") == 0 && *distributeSubscribers == true {
+					firstChannel := channels[0]
+					client, err = clusterClient.MasterForKey(ctx, firstChannel)
+					if err != nil {
+						log.Fatal(err)
+					}
+				} else {
+					nodes_pos := client_id % nodeCount
+					addr := nodesAddresses[nodes_pos]
+					client = nodeClients[nodes_pos]
+					if *verbose {
+						log.Printf("client %d is a STANDALONE client connected to node %d (address %s). Subscriber name %s", totalCreatedClients, nodes_pos, addr, subscriberName)
+					}
+					err = client.Ping(ctx).Err()
+					if err != nil {
+						log.Fatal(err)
+					}
+				}
+
+				connectionReconnectInterval := 0
+				if *max_reconnect_interval == *min_reconnect_interval {
+					connectionReconnectInterval = *max_reconnect_interval
+				} else {
+					connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *min_reconnect_interval
+				}
+
+				if connectionReconnectInterval > 0 {
+					log.Printf("Using reconnection interval of %d milliseconds for subscriber: %s", connectionReconnectInterval, subscriberName)
+				}
+
+				log.Printf("subscriber: %s. Total channels %d: %v", subscriberName, len(channels), channels)
+
+				wg.Add(1)
+				go subscriberRoutine(subscriberName, *mode, channels, *printMessages, connectionReconnectInterval, *measureRTT, rttLatencyChannel, ctx, &wg, client)
+			}
-			log.Println(fmt.Sprintf("subscriber: %s. Total channels %d: %v", subscriberName, len(channels), channels))
-			go subscriberRoutine(subscriberName, *mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client)
-			// }
 		}
+	} else {
+		log.Fatalf("Invalid mode '%s'. Must be one of: subscribe, ssubscribe, publish, spublish", *mode)
 	}
 
 	// listen for C-c
@@ -368,14 +494,32 @@ func main() {
 	w := new(tabwriter.Writer)
 	tick := time.NewTicker(time.Duration(*client_update_tick) * time.Second)
-	closed, start_time, duration, totalMessages, messageRateTs := updateCLI(tick, c, total_messages, w, *test_time)
+	closed, start_time, duration, totalMessages, messageRateTs, rttValues := updateCLI(tick, c, total_messages, w, *test_time, *measureRTT, *mode, rttLatencyChannel)
 	messageRate := float64(totalMessages) / float64(duration.Seconds())
 
 	if *cpuprofile != "" {
 		pprof.StopCPUProfile()
 	}
-
-	fmt.Fprint(w, fmt.Sprintf("#################################################\nTotal Duration %f Seconds\nMessage Rate %f\n#################################################\n", duration.Seconds(), messageRate))
+	fmt.Fprintf(w, "#################################################\n")
+	fmt.Fprintf(w, "Mode: %s\n", *mode)
+	fmt.Fprintf(w, "Total Duration: %f Seconds\n", duration.Seconds())
+	fmt.Fprintf(w, "Message Rate: %f msg/sec\n", messageRate)
+	if *measureRTT && (*mode != "publish" && *mode != "spublish") {
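+		// Build the end-of-run latency distribution from every RTT sample
+		// collected during the run (values in microseconds, reported in ms).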
Subscriber name %s", totalCreatedClients, nodes_pos, addr, subscriberName)) - } + nodePos := client_id % nodeCount + client = nodeClients[nodePos] err = client.Ping(ctx).Err() if err != nil { log.Fatal(err) } } - wg.Add(1) - connectionReconnectInterval := 0 - if *max_reconnect_interval == *min_reconnect_interval { - connectionReconnectInterval = *max_reconnect_interval - } else { - connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *min_reconnect_interval + + if *verbose { + log.Printf("publisher %d targeting channels %v", client_id, channels) } - if connectionReconnectInterval > 0 { - log.Println(fmt.Sprintf("Using reconnection interval of %d milliseconds for subscriber: %s", connectionReconnectInterval, subscriberName)) + + wg.Add(1) + go publisherRoutine(publisherName, channels, *mode, *measureRTT, *verbose, *dataSize, ctx, &wg, client) + atomic.AddInt64(&totalPublishers, 1) + atomic.AddUint64(&totalConnects, 1) + } + + } else if strings.Contains(*mode, "subscribe") { + // Only run subscribers + if strings.Compare(*subscribers_placement, "dense") == 0 { + for client_id := 1; client_id <= *clients; client_id++ { + channels := []string{} + n_channels_this_conn := 0 + if *max_channels_per_subscriber == *min_channels_per_subscriber { + n_channels_this_conn = *max_channels_per_subscriber + } else { + n_channels_this_conn = rand.Intn(*max_channels_per_subscriber-*min_channels_per_subscriber) + *min_channels_per_subscriber + } + for channel_this_conn := 1; channel_this_conn <= n_channels_this_conn; channel_this_conn++ { + new_channel_id := rand.Intn(*channel_maximum-*channel_minimum+1) + *channel_minimum + new_channel := fmt.Sprintf("%s%d", *subscribe_prefix, new_channel_id) + channels = append(channels, new_channel) + } + totalCreatedClients++ + subscriberName := fmt.Sprintf("subscriber#%d", client_id) + var client *redis.Client + var err error = nil + ctx = context.Background() + + if strings.Compare(*mode, "ssubscribe") == 0 && *distributeSubscribers == true { + firstChannel := channels[0] + client, err = clusterClient.MasterForKey(ctx, firstChannel) + if err != nil { + log.Fatal(err) + } + } else { + nodes_pos := client_id % nodeCount + addr := nodesAddresses[nodes_pos] + client = nodeClients[nodes_pos] + if *verbose { + log.Printf("client %d is a STANDALONE client connected to node %d (address %s). Subscriber name %s", totalCreatedClients, nodes_pos, addr, subscriberName) + } + err = client.Ping(ctx).Err() + if err != nil { + log.Fatal(err) + } + } + + connectionReconnectInterval := 0 + if *max_reconnect_interval == *min_reconnect_interval { + connectionReconnectInterval = *max_reconnect_interval + } else { + connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *min_reconnect_interval + } + + if connectionReconnectInterval > 0 { + log.Printf("Using reconnection interval of %d milliseconds for subscriber: %s", connectionReconnectInterval, subscriberName) + } + + log.Printf("subscriber: %s. Total channels %d: %v", subscriberName, len(channels), channels) + + wg.Add(1) + go subscriberRoutine(subscriberName, *mode, channels, *printMessages, connectionReconnectInterval, *measureRTT, rttLatencyChannel, ctx, &wg, client) } - log.Println(fmt.Sprintf("subscriber: %s. Total channels %d: %v", subscriberName, len(channels), channels)) - go subscriberRoutine(subscriberName, *mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client) - // } } + } else { + log.Fatalf("Invalid mode '%s'. 
+		case rtt := <-rttLatencyChannel:
+			rttValues = append(rttValues, rtt)
+			tickRttValues = append(tickRttValues, rtt)
+
 		case <-tick.C:
-			{
-				now := time.Now()
-				took := now.Sub(prevTime)
-				messageRate := float64(totalMessages-prevMessageCount) / float64(took.Seconds())
-				connectRate := float64(totalConnects-prevConnectCount) / float64(took.Seconds())
-
-				if prevMessageCount == 0 && totalMessages != 0 {
-					start = time.Now()
-				}
-				if totalMessages != 0 {
-					messageRateTs = append(messageRateTs, messageRate)
-				}
-				prevMessageCount = totalMessages
-				prevConnectCount = totalConnects
-				prevTime = now
-
-				fmt.Fprint(w, fmt.Sprintf("%.0f\t%d\t%.2f\t%.2f\t%d\t", time.Since(start).Seconds(), totalMessages, messageRate, connectRate, totalSubscribedChannels))
-				fmt.Fprint(w, "\r\n")
-				w.Flush()
-				if message_limit > 0 && totalMessages >= uint64(message_limit) {
-					return true, start, time.Since(start), totalMessages, messageRateTs
-				}
-				if test_time > 0 && time.Since(start) >= time.Duration(test_time*1000*1000*1000) && totalMessages != 0 {
-					return true, start, time.Since(start), totalMessages, messageRateTs
-				}
-			}
-			break
+			now := time.Now()
+			took := now.Sub(prevTime)
+			messageRate := float64(totalMessages-prevMessageCount) / float64(took.Seconds())
+			connectRate := float64(totalConnects-prevConnectCount) / float64(took.Seconds())
+
+			if prevMessageCount == 0 && totalMessages != 0 {
+				start = time.Now()
+			}
+			if totalMessages != 0 {
+				messageRateTs = append(messageRateTs, messageRate)
+			}
+			prevMessageCount = totalMessages
+			prevConnectCount = totalConnects
+			prevTime = now
+
+			// Metrics line
+			fmt.Fprintf(w, "%.0f\t%d\t%.2f\t%.2f\t", time.Since(start).Seconds(), totalMessages, messageRate, connectRate)
+
+			if strings.HasPrefix(mode, "subscribe") {
+				fmt.Fprintf(w, "%d\t", totalSubscribedChannels)
+			} else {
+				fmt.Fprintf(w, "%d\t", atomic.LoadInt64(&totalPublishers))
+			}
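+			// Average RTT since the last tick, printed as "--" when no
+			// samples arrived in the interval (microseconds -> milliseconds).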
+			if measureRTT {
+				var avgRTTms float64
+				if len(tickRttValues) > 0 {
+					var total int64
+					for _, v := range tickRttValues {
+						total += v
+					}
+					avgRTTms = float64(total) / float64(len(tickRttValues)) / 1000.0
+					tickRttValues = tickRttValues[:0]
+					fmt.Fprintf(w, "%.3f\t", avgRTTms)
+				} else {
+					fmt.Fprintf(w, "--\t")
+				}
+			}
+
+			fmt.Fprint(w, "\r\n")
+			w.Flush()
+
+			if message_limit > 0 && totalMessages >= uint64(message_limit) {
+				return true, start, time.Since(start), totalMessages, messageRateTs, rttValues
+			}
+			if test_time > 0 && time.Since(start) >= time.Duration(test_time*int(time.Second)) && totalMessages != 0 {
+				return true, start, time.Since(start), totalMessages, messageRateTs, rttValues
+			}
 		case <-c:
 			fmt.Println("received Ctrl-c - shutting down")
-			return true, start, time.Since(start), totalMessages, messageRateTs
+			return true, start, time.Since(start), totalMessages, messageRateTs, rttValues
 		}
 	}
-	return false, start, time.Since(start), totalMessages, messageRateTs
 }