diff --git a/subscriber.go b/subscriber.go index ea6edf6..7c4aaf8 100644 --- a/subscriber.go +++ b/subscriber.go @@ -95,7 +95,9 @@ func main() { client_output_buffer_limit_pubsub := flag.String("client-output-buffer-limit-pubsub", "", "Specify client output buffer limits for clients subscribed to at least one pubsub channel or pattern. If the value specified is different that the one present on the DB, this setting will apply.") distributeSubscribers := flag.Bool("oss-cluster-api-distribute-subscribers", false, "read cluster slots and distribute subscribers among them.") printMessages := flag.Bool("print-messages", false, "print messages.") + dialTimeout := flag.Duration("redis-timeout", time.Second*300, "determines the timeout to pass to redis connection setup. It adjust the connection, read, and write timeouts.") flag.Parse() + totalMessages = 0 var nodes []radix.ClusterNode var nodesAddresses []string @@ -108,7 +110,8 @@ func main() { opts = append(opts, radix.DialAuthPass(*password)) } } - + opts = append(opts, radix.DialTimeout(*dialTimeout)) + fmt.Printf("Using a redis connection, read, and write timeout of %v\n", *dialTimeout) if *test_time != 0 && *messages_per_channel_subscriber != 0 { log.Fatal(fmt.Errorf("--messages and --test-time are mutially exclusive ( please specify one or the other )")) } @@ -120,7 +123,7 @@ func main() { } if strings.Compare(*client_output_buffer_limit_pubsub, "") != 0 { - checkClientOutputBufferLimitPubSub(nodes, client_output_buffer_limit_pubsub) + checkClientOutputBufferLimitPubSub(nodes, client_output_buffer_limit_pubsub, opts) } stopChan := make(chan struct{}) @@ -289,9 +292,9 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw return false, start, time.Since(start), totalMessages, messageRateTs } -func checkClientOutputBufferLimitPubSub(nodes []radix.ClusterNode, client_output_buffer_limit_pubsub *string) { +func checkClientOutputBufferLimitPubSub(nodes []radix.ClusterNode, client_output_buffer_limit_pubsub *string, opts []radix.DialOpt) { for _, slot := range nodes { - conn, err := radix.Dial("tcp", slot.Addr) + conn, err := radix.Dial("tcp", slot.Addr, opts...) if err != nil { panic(err) }