From a65c7599df4aef5ebc8867fd09b5f13a9067d0fc Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Wed, 1 Feb 2023 18:54:11 +0000 Subject: [PATCH] Added option to specify the timeout to pass to redis connection setup. It adjust the connection, read, and write timeouts. --- subscriber.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/subscriber.go b/subscriber.go index ea6edf6..7c4aaf8 100644 --- a/subscriber.go +++ b/subscriber.go @@ -95,7 +95,9 @@ func main() { client_output_buffer_limit_pubsub := flag.String("client-output-buffer-limit-pubsub", "", "Specify client output buffer limits for clients subscribed to at least one pubsub channel or pattern. If the value specified is different that the one present on the DB, this setting will apply.") distributeSubscribers := flag.Bool("oss-cluster-api-distribute-subscribers", false, "read cluster slots and distribute subscribers among them.") printMessages := flag.Bool("print-messages", false, "print messages.") + dialTimeout := flag.Duration("redis-timeout", time.Second*300, "determines the timeout to pass to redis connection setup. It adjust the connection, read, and write timeouts.") flag.Parse() + totalMessages = 0 var nodes []radix.ClusterNode var nodesAddresses []string @@ -108,7 +110,8 @@ func main() { opts = append(opts, radix.DialAuthPass(*password)) } } - + opts = append(opts, radix.DialTimeout(*dialTimeout)) + fmt.Printf("Using a redis connection, read, and write timeout of %v\n", *dialTimeout) if *test_time != 0 && *messages_per_channel_subscriber != 0 { log.Fatal(fmt.Errorf("--messages and --test-time are mutially exclusive ( please specify one or the other )")) } @@ -120,7 +123,7 @@ func main() { } if strings.Compare(*client_output_buffer_limit_pubsub, "") != 0 { - checkClientOutputBufferLimitPubSub(nodes, client_output_buffer_limit_pubsub) + checkClientOutputBufferLimitPubSub(nodes, client_output_buffer_limit_pubsub, opts) } stopChan := make(chan struct{}) @@ -289,9 +292,9 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw return false, start, time.Since(start), totalMessages, messageRateTs } -func checkClientOutputBufferLimitPubSub(nodes []radix.ClusterNode, client_output_buffer_limit_pubsub *string) { +func checkClientOutputBufferLimitPubSub(nodes []radix.ClusterNode, client_output_buffer_limit_pubsub *string, opts []radix.DialOpt) { for _, slot := range nodes { - conn, err := radix.Dial("tcp", slot.Addr) + conn, err := radix.Dial("tcp", slot.Addr, opts...) if err != nil { panic(err) }