From 54fbb9f28729a45f9fe5b6953d0f7f39fd5ee5b7 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 15 Jun 2023 13:02:17 +0100 Subject: [PATCH 1/2] Removed radix usage from --oss-cluster-api-distribute-subscribers. Using clusterClient.MasterForKey() in case of cluster API and SSUBSCRIBE --- bin_info.go | 26 +++++ go.mod | 11 +-- go.sum | 36 +------ subscriber.go | 264 ++++++++++++++++++++++++++------------------------ 4 files changed, 168 insertions(+), 169 deletions(-) create mode 100644 bin_info.go diff --git a/bin_info.go b/bin_info.go new file mode 100644 index 0000000..3494a4b --- /dev/null +++ b/bin_info.go @@ -0,0 +1,26 @@ +package main + +import ( + "strconv" + "strings" +) + +// Vars only for git sha and diff handling +var GitSHA1 string = "" +var GitDirty string = "0" + +// internal function to return value of GitSHA1 var, which is filled in link time +func toolGitSHA1() string { + return GitSHA1 +} + +// this internal function will check for the number of altered lines that are not yet committed +// and return true in that case +func toolGitDirty() (dirty bool) { + dirty = false + dirtyLines, err := strconv.Atoi(strings.TrimSpace(GitDirty)) + if err == nil { + dirty = (dirtyLines != 0) + } + return +} diff --git a/go.mod b/go.mod index 949b4dd..91a2346 100644 --- a/go.mod +++ b/go.mod @@ -2,13 +2,4 @@ module github.com/RedisLabs/pubsub-sub-bench go 1.13 -require ( - github.com/kr/text v0.2.0 // indirect - github.com/mediocregopher/radix/v4 v4.1.2 - github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect - github.com/redis/go-redis/v9 v9.0.3 - github.com/stretchr/testify v1.8.0 // indirect - gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect -) - -replace github.com/redis/go-redis/v9 => github.com/filipecosta90/go-redis/v9 v9.0.0-20230429203646-959c94037c1e +require github.com/redis/go-redis/v9 v9.0.5 diff --git a/go.sum b/go.sum index 3ca521d..e095384 100644 --- a/go.sum +++ b/go.sum @@ -4,39 +4,7 @@ github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y= github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/filipecosta90/go-redis/v9 v9.0.0-20230429203646-959c94037c1e h1:3fUI2gXQ1Z6S7DrmrI91IXsK9uVtBUjs5HeAdUqGvHg= -github.com/filipecosta90/go-redis/v9 v9.0.0-20230429203646-959c94037c1e/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= -github.com/filipecosta90/radix/v4 v4.0.0-20230425194536-7ff9365a6164 h1:tpHQ6ng3jPWSr2krw71E1ALi0A9JdxWOQawtiy6CltY= -github.com/filipecosta90/radix/v4 v4.0.0-20230425194536-7ff9365a6164/go.mod h1:ajchozX/6ELmydxWeWM6xCFHVpZ4+67LXHOTOVR0nCE= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/mediocregopher/radix/v4 v4.1.2 h1:Pj7XnNK5WuzzFy63g98pnccainAePK+aZNQRvxSvj2I= -github.com/mediocregopher/radix/v4 v4.1.2/go.mod h1:ajchozX/6ELmydxWeWM6xCFHVpZ4+67LXHOTOVR0nCE= -github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= -github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/redis/go-redis/v9 v9.0.3 h1:+7mmR26M0IvyLxGZUHxu4GiBkJkVDid0Un+j4ScYu4k= -github.com/redis/go-redis/v9 v9.0.3/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/tilinna/clock v1.0.2 h1:6BO2tyAC9JbPExKH/z9zl44FLu1lImh3nDNKA0kgrkI= -github.com/tilinna/clock v1.0.2/go.mod h1:ZsP7BcY7sEEz7ktc0IVy8Us6boDrK8VradlKRUGfOao= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= -gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o= +github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= diff --git a/subscriber.go b/subscriber.go index de8721d..1164266 100644 --- a/subscriber.go +++ b/subscriber.go @@ -5,8 +5,6 @@ import ( "encoding/json" "flag" "fmt" - radix "github.com/mediocregopher/radix/v4" - "github.com/redis/go-redis/v9" "io/ioutil" "log" "os" @@ -17,6 +15,17 @@ import ( "sync/atomic" "text/tabwriter" "time" + + redis "github.com/redis/go-redis/v9" +) + +const ( + redisPoolSize = "pool_size" + redisPoolSizeDefault = 0 + redisTLSCA = "tls_ca" + redisTLSCert = "tls_cert" + redisTLSKey = "tls_key" + redisTLSInsecureSkipVerify = "tls_insecure_skip_verify" ) var totalMessages uint64 @@ -37,15 +46,9 @@ type testResult struct { Addresses []string `json:"Addresses"` } -func subscriberRoutine(addr string, mode, subscriberName string, channel string, printMessages bool, ctx context.Context, wg *sync.WaitGroup, opts radix.Dialer, protocolVersion int) { +func subscriberRoutine(mode, channel string, printMessages bool, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) { // tell the caller we've stopped defer wg.Done() - client := redis.NewClient(&redis.Options{ - Addr: addr, - Password: opts.AuthPass, - ClientName: subscriberName, - ProtocolVersion: protocolVersion, - }) switch mode { case "ssubscribe": spubsub := client.SSubscribe(ctx, channel) @@ -99,42 +102,119 @@ func main() { client_output_buffer_limit_pubsub := flag.String("client-output-buffer-limit-pubsub", "", "Specify client output buffer limits for clients subscribed to at least one pubsub channel or pattern. If the value specified is different that the one present on the DB, this setting will apply.") distributeSubscribers := flag.Bool("oss-cluster-api-distribute-subscribers", false, "read cluster slots and distribute subscribers among them.") printMessages := flag.Bool("print-messages", false, "print messages.") - //TODO FIX ME - //dialTimeout := flag.Duration("redis-timeout", time.Second*300, "determines the timeout to pass to redis connection setup. It adjust the connection, read, and write timeouts.") + verbose := flag.Bool("verbose", false, "verbose print.") + version := flag.Bool("version", false, "print version and exit.") + timeout := flag.Duration("redis-timeout", time.Second*30, "determines the timeout to pass to redis connection setup. It adjust the connection, read, and write timeouts.") + poolSizePtr := flag.Int(redisPoolSize, redisPoolSizeDefault, "determines the timeout to pass to redis connection setup. It adjust the connection, read, and write timeouts.") resp := flag.Int("resp", 2, "redis command response protocol (2 - RESP 2, 3 - RESP 3)") flag.Parse() - totalMessages = 0 - var nodes []radix.ClusterNode - var nodesAddresses []string - var node_subscriptions_count []int - opts := radix.Dialer{} - if *password != "" { - opts.AuthPass = *password - if *username != "" { - opts.AuthUser = *username - } + + git_sha := toolGitSHA1() + git_dirty_str := "" + if toolGitDirty() { + git_dirty_str = "-dirty" } - if *resp == 2 { - opts.Protocol = "2" - } else if *resp == 3 { - opts.Protocol = "3" + if *version { + fmt.Fprintf(os.Stdout, "pubsub-sub-bench (git_sha1:%s%s)\n", git_sha, git_dirty_str) + os.Exit(0) } if *test_time != 0 && *messages_per_channel_subscriber != 0 { log.Fatal(fmt.Errorf("--messages and --test-time are mutially exclusive ( please specify one or the other )")) } + log.Println(fmt.Sprintf("pubsub-sub-bench (git_sha1:%s%s)", git_sha, git_dirty_str)) + + ctx := context.Background() + nodeCount := 0 + poolSize := *poolSizePtr + var nodesAddresses []string + var nodeClients []*redis.Client + var clusterClient *redis.ClusterClient + var standaloneClient *redis.Client + + standaloneOptions := redis.Options{Protocol: *resp, + Addr: fmt.Sprintf("%s:%s", *host, *port), + Username: *username, + Password: *password, + DialTimeout: *timeout, + ReadTimeout: *timeout, + WriteTimeout: *timeout, + PoolSize: poolSize, + } + clusterOptions := redis.ClusterOptions{Protocol: *resp, + Addrs: []string{fmt.Sprintf("%s:%s", *host, *port)}, + Username: *username, + Password: *password, + DialTimeout: *timeout, + ReadTimeout: *timeout, + WriteTimeout: *timeout, + PoolSize: poolSize, + } if *distributeSubscribers { - nodes, nodesAddresses, node_subscriptions_count = getClusterNodesFromTopology(host, port, nodes, nodesAddresses, node_subscriptions_count, opts) + clusterClient = redis.NewClusterClient(&clusterOptions) + // ReloadState reloads cluster state. It calls ClusterSlots func + // to get cluster slots information. + log.Println("Reloading cluster state.") + clusterClient.ReloadState(ctx) + err := clusterClient.Ping(ctx).Err() + if err != nil { + log.Fatal(err) + } + log.Println("Getting cluster slots info.") + slots, err := clusterClient.ClusterSlots(ctx).Result() + if err != nil { + log.Fatal(err) + } + nodeCount = len(slots) + log.Println(fmt.Sprintf("Detailing cluster slots info. Total nodes: %d", nodeCount)) + for _, slotInfo := range slots { + log.Println(fmt.Sprintf("\tSlot range start %d end %d. Nodes: %v", slotInfo.Start, slotInfo.End, slotInfo.Nodes)) + nodesAddresses = append(nodesAddresses, slotInfo.Nodes[0].Addr) + } + fn := func(ctx context.Context, client *redis.Client) (err error) { + nodeClients = append(nodeClients, client) + return + } + clusterClient.ForEachMaster(ctx, fn) } else { - nodes, nodesAddresses, node_subscriptions_count = getClusterNodesFromArgs(nodes, port, host, nodesAddresses, node_subscriptions_count) + nodeCount = 1 + nodesAddresses = append(nodesAddresses, fmt.Sprintf("%s:%s", *host, *port)) + standaloneClient = redis.NewClient(&standaloneOptions) + err := standaloneClient.Ping(ctx).Err() + if err != nil { + log.Fatal(err) + } + nodeClients = append(nodeClients, standaloneClient) } - + // if strings.Compare(*client_output_buffer_limit_pubsub, "") != 0 { - checkClientOutputBufferLimitPubSub(nodes, client_output_buffer_limit_pubsub, opts) + log.Println("client-output-buffer-limit-pubsub is not being enforced currently.") } - ctx := context.Background() + totalMessages = 0 + total_channels := *channel_maximum - *channel_minimum + 1 + total_subscriptions := total_channels * *subscribers_per_channel + total_messages := int64(total_subscriptions) * *messages_per_channel_subscriber + subscriptions_per_node := total_subscriptions / nodeCount + + log.Println(fmt.Sprintf("Total subcriptions: %d. Subscriptions per node %d. Total messages: %d", total_subscriptions, subscriptions_per_node, total_messages)) + log.Println(fmt.Sprintf("Will use a subscriber prefix of: %s", *subscribe_prefix)) + + if *poolSizePtr == 0 { + poolSize = subscriptions_per_node + log.Println(fmt.Sprintf("Setting per Node pool size of %d given you haven't specified a value and we have %d Subscriptions per node. You can control this option via --%s=", poolSize, subscriptions_per_node, redisPoolSize)) + clusterOptions.PoolSize = poolSize + log.Println("Reloading cluster state given we've changed pool size.") + clusterClient = redis.NewClusterClient(&clusterOptions) + // ReloadState reloads cluster state. It calls ClusterSlots func + // to get cluster slots information. + clusterClient.ReloadState(ctx) + err := clusterClient.Ping(ctx).Err() + if err != nil { + log.Fatal(err) + } + } // trap Ctrl+C and call cancel on the context // We Use this instead of the previous stopChannel + chan radix.PubSubMessage ctx, cancel := context.WithCancel(ctx) @@ -154,11 +234,7 @@ func main() { // a WaitGroup for the goroutines to tell us they've stopped wg := sync.WaitGroup{} - total_channels := *channel_maximum - *channel_minimum + 1 - subscriptions_per_node := total_channels / len(nodes) - total_subscriptions := total_channels * *subscribers_per_channel - total_messages := int64(total_subscriptions) * *messages_per_channel_subscriber - fmt.Println(fmt.Sprintf("Total subcriptions: %d. Subscriptions per node %d. Total messages: %d", total_subscriptions, subscriptions_per_node, total_messages)) + if *cpuprofile != "" { f, err := os.Create(*cpuprofile) if err != nil { @@ -166,17 +242,39 @@ func main() { } pprof.StartCPUProfile(f) } - + totalCreatedClients := 0 if strings.Compare(*subscribers_placement, "dense") == 0 { for channel_id := *channel_minimum; channel_id <= *channel_maximum; channel_id++ { + channel := fmt.Sprintf("%s%d", *subscribe_prefix, channel_id) for channel_subscriber_number := 1; channel_subscriber_number <= *subscribers_per_channel; channel_subscriber_number++ { - nodes_pos := channel_id % len(nodes) - node_subscriptions_count[nodes_pos]++ - addr := nodes[nodes_pos] - channel := fmt.Sprintf("%s%d", *subscribe_prefix, channel_id) + totalCreatedClients++ subscriberName := fmt.Sprintf("subscriber#%d-%s%d", channel_subscriber_number, *subscribe_prefix, channel_id) + var client *redis.Client + var err error = nil + ctx = context.Background() + // In case of SSUBSCRIBE the node is associated the to the channel name + if strings.Compare(*mode, "ssubscribe") == 0 && *distributeSubscribers == true { + client, err = clusterClient.MasterForKey(ctx, channel) + if err != nil { + log.Fatal(err) + } + if *verbose { + log.Println(fmt.Sprintf("client %d is a CLUSTER client connected to %v. Subscriber name %s", totalCreatedClients, client.String(), subscriberName)) + } + } else { + nodes_pos := channel_id % nodeCount + addr := nodesAddresses[nodes_pos] + client = nodeClients[nodes_pos] + if *verbose { + log.Println(fmt.Sprintf("client %d is a STANDALONE client connected to node %d (address %s). Subscriber name %s", totalCreatedClients, nodes_pos, addr, subscriberName)) + } + err = client.Ping(ctx).Err() + if err != nil { + log.Fatal(err) + } + } wg.Add(1) - go subscriberRoutine(addr.Addr, *mode, subscriberName, channel, *printMessages, ctx, &wg, opts, *resp) + go subscriberRoutine(*mode, channel, *printMessages, ctx, &wg, client) } } } @@ -236,51 +334,6 @@ func main() { wg.Wait() } -func getClusterNodesFromArgs(nodes []radix.ClusterNode, port *string, host *string, nodesAddresses []string, node_subscriptions_count []int) ([]radix.ClusterNode, []string, []int) { - nodes = []radix.ClusterNode{} - ports := strings.Split(*port, ",") - for idx, nhost := range strings.Split(*host, ",") { - node := radix.ClusterNode{ - Addr: fmt.Sprintf("%s:%s", nhost, ports[idx]), - ID: "", - Slots: nil, - SecondaryOfAddr: "", - SecondaryOfID: "", - } - nodes = append(nodes, node) - nodesAddresses = append(nodesAddresses, node.Addr) - node_subscriptions_count = append(node_subscriptions_count, 0) - } - return nodes, nodesAddresses, node_subscriptions_count -} - -func getClusterNodesFromTopology(host *string, port *string, nodes []radix.ClusterNode, nodesAddresses []string, node_subscriptions_count []int, opts radix.Dialer) ([]radix.ClusterNode, []string, []int) { - // Create a normal redis connection - ctx := context.Background() - conn, err := opts.Dial(ctx, "tcp", fmt.Sprintf("%s:%s", *host, *port)) - if err != nil { - panic(err) - } - var topology radix.ClusterTopo - err = conn.Do(ctx, radix.FlatCmd(&topology, "CLUSTER", "SLOTS")) - if err != nil { - log.Fatal(err) - } - - for _, slot := range topology.Map() { - slot_host := strings.Split(slot.Addr, ":")[0] - slot_port := strings.Split(slot.Addr, ":")[1] - if strings.Compare(slot_host, "127.0.0.1") == 0 { - slot.Addr = fmt.Sprintf("%s:%s", *host, slot_port) - } - nodes = append(nodes, slot) - nodesAddresses = append(nodesAddresses, slot.Addr) - node_subscriptions_count = append(node_subscriptions_count, 0) - } - conn.Close() - return nodes, nodesAddresses, node_subscriptions_count -} - func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabwriter.Writer, test_time int) (bool, time.Time, time.Duration, uint64, []float64) { start := time.Now() @@ -328,42 +381,3 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw } return false, start, time.Since(start), totalMessages, messageRateTs } - -func checkClientOutputBufferLimitPubSub(nodes []radix.ClusterNode, client_output_buffer_limit_pubsub *string, opts radix.Dialer) { - for _, slot := range nodes { - ctx := context.Background() - conn, err := opts.Dial(ctx, "tcp", slot.Addr) - if err != nil { - panic(err) - } - _, err, pubsubTopology := getPubSubBufferLimit(err, conn) - if strings.Compare(*client_output_buffer_limit_pubsub, pubsubTopology) != 0 { - fmt.Println(fmt.Sprintf("\tCHANGING DB pubsub topology for address %s from %s to %s", slot.Addr, pubsubTopology, *client_output_buffer_limit_pubsub)) - - err = conn.Do(ctx, radix.FlatCmd(nil, "CONFIG", "SET", "client-output-buffer-limit", fmt.Sprintf("pubsub %s", *client_output_buffer_limit_pubsub))) - if err != nil { - log.Fatal(err) - } - _, err, pubsubTopology = getPubSubBufferLimit(err, conn) - if err != nil { - log.Fatal(err) - } - fmt.Println(fmt.Sprintf("\tCHANGED DB pubsub topology for address %s: %s", slot.Addr, pubsubTopology)) - } else { - fmt.Println(fmt.Sprintf("\tNo need to change pubsub topology for address %s: %s", slot.Addr, pubsubTopology)) - } - conn.Close() - } -} - -func getPubSubBufferLimit(err error, conn radix.Conn) ([]string, error, string) { - var topologyResponse []string - ctx := context.Background() - err = conn.Do(ctx, radix.FlatCmd(&topologyResponse, "CONFIG", "GET", "client-output-buffer-limit")) - if err != nil { - log.Fatal(err) - } - i := strings.Index(topologyResponse[1], "pubsub ") - pubsubTopology := topologyResponse[1][i+7:] - return topologyResponse, err, pubsubTopology -} From e0d8fdda3eeef18d5ee7c351082789e3169ff5b4 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 15 Jun 2023 13:05:07 +0100 Subject: [PATCH 2/2] Fixd --pool_size option description --- subscriber.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subscriber.go b/subscriber.go index 1164266..2803a4c 100644 --- a/subscriber.go +++ b/subscriber.go @@ -105,7 +105,7 @@ func main() { verbose := flag.Bool("verbose", false, "verbose print.") version := flag.Bool("version", false, "print version and exit.") timeout := flag.Duration("redis-timeout", time.Second*30, "determines the timeout to pass to redis connection setup. It adjust the connection, read, and write timeouts.") - poolSizePtr := flag.Int(redisPoolSize, redisPoolSizeDefault, "determines the timeout to pass to redis connection setup. It adjust the connection, read, and write timeouts.") + poolSizePtr := flag.Int(redisPoolSize, redisPoolSizeDefault, "Maximum number of socket connections per node.") resp := flag.Int("resp", 2, "redis command response protocol (2 - RESP 2, 3 - RESP 3)") flag.Parse()