Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ func main() {
client_output_buffer_limit_pubsub := flag.String("client-output-buffer-limit-pubsub", "", "Specify client output buffer limits for clients subscribed to at least one pubsub channel or pattern. If the value specified is different that the one present on the DB, this setting will apply.")
distributeSubscribers := flag.Bool("oss-cluster-api-distribute-subscribers", false, "read cluster slots and distribute subscribers among them.")
printMessages := flag.Bool("print-messages", false, "print messages.")
dialTimeout := flag.Duration("redis-timeout", time.Second*300, "determines the timeout to pass to redis connection setup. It adjust the connection, read, and write timeouts.")
flag.Parse()

totalMessages = 0
var nodes []radix.ClusterNode
var nodesAddresses []string
Expand All @@ -108,7 +110,8 @@ func main() {
opts = append(opts, radix.DialAuthPass(*password))
}
}

opts = append(opts, radix.DialTimeout(*dialTimeout))
fmt.Printf("Using a redis connection, read, and write timeout of %v\n", *dialTimeout)
if *test_time != 0 && *messages_per_channel_subscriber != 0 {
log.Fatal(fmt.Errorf("--messages and --test-time are mutially exclusive ( please specify one or the other )"))
}
Expand All @@ -120,7 +123,7 @@ func main() {
}

if strings.Compare(*client_output_buffer_limit_pubsub, "") != 0 {
checkClientOutputBufferLimitPubSub(nodes, client_output_buffer_limit_pubsub)
checkClientOutputBufferLimitPubSub(nodes, client_output_buffer_limit_pubsub, opts)
}

stopChan := make(chan struct{})
Expand Down Expand Up @@ -289,9 +292,9 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw
return false, start, time.Since(start), totalMessages, messageRateTs
}

func checkClientOutputBufferLimitPubSub(nodes []radix.ClusterNode, client_output_buffer_limit_pubsub *string) {
func checkClientOutputBufferLimitPubSub(nodes []radix.ClusterNode, client_output_buffer_limit_pubsub *string, opts []radix.DialOpt) {
for _, slot := range nodes {
conn, err := radix.Dial("tcp", slot.Addr)
conn, err := radix.Dial("tcp", slot.Addr, opts...)
if err != nil {
panic(err)
}
Expand Down