From 16085007f6244e40348b51758220481c54be4127 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Mon, 2 Dec 2024 15:21:46 +0000 Subject: [PATCH 1/3] Dynamic Reconnect and Multi-Channel Support --- subscriber.go | 131 ++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 99 insertions(+), 32 deletions(-) diff --git a/subscriber.go b/subscriber.go index 8f40860..af11c97 100644 --- a/subscriber.go +++ b/subscriber.go @@ -7,6 +7,7 @@ import ( "fmt" "io/ioutil" "log" + "math/rand" "os" "os/signal" "runtime/pprof" @@ -47,32 +48,71 @@ type testResult struct { Addresses []string `json:"Addresses"` } -func subscriberRoutine(mode, channel string, printMessages bool, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) { - // tell the caller we've stopped +func subscriberRoutine(mode string, channels []string, printMessages bool, connectionReconnectInterval int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) { + // Tell the caller we've stopped defer wg.Done() - switch mode { - case "ssubscribe": - spubsub := client.SSubscribe(ctx, channel) - defer spubsub.Close() - for { - msg, err := spubsub.ReceiveMessage(ctx) - if err != nil { - panic(err) - } - if printMessages { - fmt.Println(fmt.Sprintf("received message in channel %s. Message: %s", msg.Channel, msg.Payload)) + var reconnectTicker *time.Ticker + if connectionReconnectInterval > 0 { + reconnectTicker = time.NewTicker(time.Duration(connectionReconnectInterval) * time.Second) + defer reconnectTicker.Stop() + } else { + reconnectTicker = time.NewTicker(1 * time.Second) + reconnectTicker.Stop() + } + + var pubsub *redis.PubSub + + // Helper function to handle subscription based on mode + subscribe := func() { + if pubsub != nil { + // Unsubscribe based on mode before re-subscribing + if mode == "ssubscribe" { + if err := pubsub.SUnsubscribe(ctx, channels...); err != nil { + fmt.Printf("Error during SUnsubscribe: %v\n", err) + } + } else { + if err := pubsub.Unsubscribe(ctx, channels...); err != nil { + fmt.Printf("Error during Unsubscribe: %v\n", err) + } } - atomic.AddUint64(&totalMessages, 1) + pubsub.Close() + } + switch mode { + case "ssubscribe": + pubsub = client.SSubscribe(ctx, channels...) + default: + pubsub = client.Subscribe(ctx, channels...) } - break - case "subscribe": - fallthrough - default: - pubsub := client.Subscribe(ctx, channel) - defer pubsub.Close() - for { + } + + subscribe() + + for { + select { + case <-ctx.Done(): + // Context cancelled, exit routine + if pubsub != nil { + if mode == "ssubscribe" { + _ = pubsub.SUnsubscribe(ctx, channels...) + } else { + _ = pubsub.Unsubscribe(ctx, channels...) + } + pubsub.Close() + } + return + case <-reconnectTicker.C: + // Reconnect interval triggered, unsubscribe and resubscribe + if reconnectTicker != nil { + subscribe() + } + default: + // Handle messages msg, err := pubsub.ReceiveMessage(ctx) if err != nil { + // Handle Redis connection errors, e.g., reconnect immediately + if err == redis.Nil || err == context.DeadlineExceeded || err == context.Canceled { + continue + } panic(err) } if printMessages { @@ -81,7 +121,6 @@ func subscriberRoutine(mode, channel string, printMessages bool, ctx context.Con atomic.AddUint64(&totalMessages, 1) } } - } func main() { @@ -95,6 +134,10 @@ func main() { channel_minimum := flag.Int("channel-minimum", 1, "channel ID minimum value ( each channel has a dedicated thread ).") channel_maximum := flag.Int("channel-maximum", 100, "channel ID maximum value ( each channel has a dedicated thread ).") subscribers_per_channel := flag.Int("subscribers-per-channel", 1, "number of subscribers per channel.") + min_channels_per_subscriber := flag.Int("min-number-channels-per-subscriber", 1, "min number of channels to subscribe to, per connection.") + max_channels_per_subscriber := flag.Int("max-number-channels-per-subscriber", 1, "max number of channels to subscribe to, per connection.") + min_reconnect_interval := flag.Int("min-reconnect-interval", 0, "min reconnect interval. if 0 disable (s)unsubscribe/(s)ubscribe.") + max_reconnect_interval := flag.Int("max-reconnect-interval", 0, "max reconnect interval. if 0 disable (s)unsubscribe/(s)ubscribe.") messages_per_channel_subscriber := flag.Int64("messages", 0, "Number of total messages per subscriber per channel.") json_out_file := flag.String("json-out-file", "", "Name of json output file, if not set, will not print to json.") client_update_tick := flag.Int("client-update-tick", 1, "client update tick.") @@ -191,16 +234,19 @@ func main() { poolSize = subscriptions_per_node log.Println(fmt.Sprintf("Setting per Node pool size of %d given you haven't specified a value and we have %d Subscriptions per node. You can control this option via --%s=", poolSize, subscriptions_per_node, redisPoolSize)) clusterOptions.PoolSize = poolSize - log.Println("Reloading cluster state given we've changed pool size.") - clusterClient = redis.NewClusterClient(&clusterOptions) - // ReloadState reloads cluster state. It calls ClusterSlots func - // to get cluster slots information. - clusterClient.ReloadState(ctx) - err := clusterClient.Ping(ctx).Err() - if err != nil { - log.Fatal(err) + if *distributeSubscribers { + log.Println("Reloading cluster state given we've changed pool size.") + clusterClient = redis.NewClusterClient(&clusterOptions) + // ReloadState reloads cluster state. It calls ClusterSlots func + // to get cluster slots information. + clusterClient.ReloadState(ctx) + err := clusterClient.Ping(ctx).Err() + if err != nil { + log.Fatal(err) + } + nodeCount, nodeClients, nodesAddresses = updateSecondarySlicesCluster(clusterClient, ctx) } - nodeCount, nodeClients, nodesAddresses = updateSecondarySlicesCluster(clusterClient, ctx) + } log.Println(fmt.Sprintf("Detailing final setup used for benchmark.")) @@ -241,6 +287,18 @@ func main() { for channel_id := *channel_minimum; channel_id <= *channel_maximum; channel_id++ { channel := fmt.Sprintf("%s%d", *subscribe_prefix, channel_id) for channel_subscriber_number := 1; channel_subscriber_number <= *subscribers_per_channel; channel_subscriber_number++ { + channels := []string{channel} + n_channels_this_conn := 1 + if *max_channels_per_subscriber == *min_channels_per_subscriber { + n_channels_this_conn = *max_channels_per_subscriber + } else { + n_channels_this_conn = rand.Intn(*max_channels_per_subscriber - *min_channels_per_subscriber) + } + for channel_this_conn := 1; channel_this_conn < n_channels_this_conn; channel_this_conn++ { + new_channel_id := rand.Intn(*channel_maximum) + *channel_minimum + new_channel := fmt.Sprintf("%s%d", *subscribe_prefix, new_channel_id) + channels = append(channels, new_channel) + } totalCreatedClients++ subscriberName := fmt.Sprintf("subscriber#%d-%s%d", channel_subscriber_number, *subscribe_prefix, channel_id) var client *redis.Client @@ -268,7 +326,16 @@ func main() { } } wg.Add(1) - go subscriberRoutine(*mode, channel, *printMessages, ctx, &wg, client) + connectionReconnectInterval := 0 + if *max_reconnect_interval == *min_reconnect_interval { + connectionReconnectInterval = *max_reconnect_interval + } else { + connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *max_reconnect_interval + } + if connectionReconnectInterval > 0 { + log.Println(fmt.Sprintf("Using reconnection interval of %d for subscriber: %s", connectionReconnectInterval, subscriberName)) + } + go subscriberRoutine(*mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client) } } } From 522093dc7485c08bbaffd7d4920f03b50ceb25bb Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Mon, 2 Dec 2024 15:30:06 +0000 Subject: [PATCH 2/3] updating go-release-action to the latest --- .github/workflows/publish.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 00786b7..fd4af07 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -19,8 +19,8 @@ jobs: goos: [linux, darwin] goarch: [amd64, arm64] steps: - - uses: actions/checkout@v3 - - uses: wangyoucao577/go-release-action@v1.36 + - uses: actions/checkout@v4 + - uses: wangyoucao577/go-release-action@v1 with: github_token: ${{ secrets.GITHUB_TOKEN }} goos: ${{ matrix.goos }} From 7c40dee5e03093fbeda8c77e2c3ec21836017979 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Mon, 2 Dec 2024 17:19:19 +0000 Subject: [PATCH 3/3] Reconnection improvements and extra metrics --- subscriber.go | 71 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 49 insertions(+), 22 deletions(-) diff --git a/subscriber.go b/subscriber.go index af11c97..349231e 100644 --- a/subscriber.go +++ b/subscriber.go @@ -30,6 +30,8 @@ const ( ) var totalMessages uint64 +var totalSubscribedChannels int64 +var totalConnects uint64 var clusterSlicesMu sync.Mutex type testResult struct { @@ -48,12 +50,12 @@ type testResult struct { Addresses []string `json:"Addresses"` } -func subscriberRoutine(mode string, channels []string, printMessages bool, connectionReconnectInterval int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) { +func subscriberRoutine(clientName, mode string, channels []string, printMessages bool, connectionReconnectInterval int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) { // Tell the caller we've stopped defer wg.Done() var reconnectTicker *time.Ticker if connectionReconnectInterval > 0 { - reconnectTicker = time.NewTicker(time.Duration(connectionReconnectInterval) * time.Second) + reconnectTicker = time.NewTicker(time.Duration(connectionReconnectInterval) * time.Millisecond) defer reconnectTicker.Stop() } else { reconnectTicker = time.NewTicker(1 * time.Second) @@ -61,28 +63,45 @@ func subscriberRoutine(mode string, channels []string, printMessages bool, conne } var pubsub *redis.PubSub + nChannels := len(channels) // Helper function to handle subscription based on mode subscribe := func() { if pubsub != nil { - // Unsubscribe based on mode before re-subscribing - if mode == "ssubscribe" { - if err := pubsub.SUnsubscribe(ctx, channels...); err != nil { - fmt.Printf("Error during SUnsubscribe: %v\n", err) + if nChannels > 1 { + // Unsubscribe based on mode before re-subscribing + if mode == "ssubscribe" { + if err := pubsub.SUnsubscribe(ctx, channels[1:]...); err != nil { + fmt.Printf("Error during SUnsubscribe: %v\n", err) + } + pubsub.Close() + atomic.AddInt64(&totalSubscribedChannels, int64(-len(channels[1:]))) + pubsub = client.SSubscribe(ctx, channels[1:]...) + atomic.AddInt64(&totalSubscribedChannels, int64(len(channels[1:]))) + } else { + if err := pubsub.Unsubscribe(ctx, channels[1:]...); err != nil { + fmt.Printf("Error during Unsubscribe: %v\n", err) + pubsub.Close() + atomic.AddInt64(&totalSubscribedChannels, int64(-len(channels[1:]))) + pubsub = client.Subscribe(ctx, channels[1:]...) + atomic.AddInt64(&totalSubscribedChannels, int64(len(channels[1:]))) + } } + atomic.AddUint64(&totalConnects, 1) } else { - if err := pubsub.Unsubscribe(ctx, channels...); err != nil { - fmt.Printf("Error during Unsubscribe: %v\n", err) - } + log.Println(fmt.Sprintf("Skipping (S)UNSUBSCRIBE given client %s had only one channel subscribed in this connection: %v.", clientName, channels)) } - pubsub.Close() - } - switch mode { - case "ssubscribe": - pubsub = client.SSubscribe(ctx, channels...) - default: - pubsub = client.Subscribe(ctx, channels...) + } else { + switch mode { + case "ssubscribe": + pubsub = client.SSubscribe(ctx, channels...) + default: + pubsub = client.Subscribe(ctx, channels...) + } + atomic.AddInt64(&totalSubscribedChannels, int64(len(channels))) + atomic.AddUint64(&totalConnects, 1) } + } subscribe() @@ -142,6 +161,7 @@ func main() { json_out_file := flag.String("json-out-file", "", "Name of json output file, if not set, will not print to json.") client_update_tick := flag.Int("client-update-tick", 1, "client update tick.") test_time := flag.Int("test-time", 0, "Number of seconds to run the test, after receiving the first message.") + randSeed := flag.Int64("rand-seed", 12345, "Random deterministic seed.") subscribe_prefix := flag.String("subscriber-prefix", "channel-", "prefix for subscribing to channel, used in conjunction with key-minimum and key-maximum.") client_output_buffer_limit_pubsub := flag.String("client-output-buffer-limit-pubsub", "", "Specify client output buffer limits for clients subscribed to at least one pubsub channel or pattern. If the value specified is different that the one present on the DB, this setting will apply.") distributeSubscribers := flag.Bool("oss-cluster-api-distribute-subscribers", false, "read cluster slots and distribute subscribers among them.") @@ -167,6 +187,8 @@ func main() { log.Fatal(fmt.Errorf("--messages and --test-time are mutially exclusive ( please specify one or the other )")) } log.Println(fmt.Sprintf("pubsub-sub-bench (git_sha1:%s%s)", git_sha, git_dirty_str)) + log.Println(fmt.Sprintf("using random seed:%d", *randSeed)) + rand.Seed(*randSeed) ctx := context.Background() nodeCount := 0 @@ -292,7 +314,7 @@ func main() { if *max_channels_per_subscriber == *min_channels_per_subscriber { n_channels_this_conn = *max_channels_per_subscriber } else { - n_channels_this_conn = rand.Intn(*max_channels_per_subscriber - *min_channels_per_subscriber) + n_channels_this_conn = rand.Intn(*max_channels_per_subscriber-*min_channels_per_subscriber) + *min_channels_per_subscriber } for channel_this_conn := 1; channel_this_conn < n_channels_this_conn; channel_this_conn++ { new_channel_id := rand.Intn(*channel_maximum) + *channel_minimum @@ -330,12 +352,13 @@ func main() { if *max_reconnect_interval == *min_reconnect_interval { connectionReconnectInterval = *max_reconnect_interval } else { - connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *max_reconnect_interval + connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *min_reconnect_interval } if connectionReconnectInterval > 0 { - log.Println(fmt.Sprintf("Using reconnection interval of %d for subscriber: %s", connectionReconnectInterval, subscriberName)) + log.Println(fmt.Sprintf("Using reconnection interval of %d milliseconds for subscriber: %s", connectionReconnectInterval, subscriberName)) } - go subscriberRoutine(*mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client) + log.Println(fmt.Sprintf("subscriber: %s. Total channels %d: %v", subscriberName, len(channels), channels)) + go subscriberRoutine(subscriberName, *mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client) } } } @@ -431,10 +454,11 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw start := time.Now() prevTime := time.Now() prevMessageCount := uint64(0) + prevConnectCount := uint64(0) messageRateTs := []float64{} w.Init(os.Stdout, 25, 0, 1, ' ', tabwriter.AlignRight) - fmt.Fprint(w, fmt.Sprintf("Test Time\tTotal Messages\t Message Rate \t")) + fmt.Fprint(w, fmt.Sprintf("Test Time\tTotal Messages\t Message Rate \tConnect Rate \tActive subscriptions\t")) fmt.Fprint(w, "\n") w.Flush() for { @@ -444,6 +468,8 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw now := time.Now() took := now.Sub(prevTime) messageRate := float64(totalMessages-prevMessageCount) / float64(took.Seconds()) + connectRate := float64(totalConnects-prevConnectCount) / float64(took.Seconds()) + if prevMessageCount == 0 && totalMessages != 0 { start = time.Now() } @@ -451,9 +477,10 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw messageRateTs = append(messageRateTs, messageRate) } prevMessageCount = totalMessages + prevConnectCount = totalConnects prevTime = now - fmt.Fprint(w, fmt.Sprintf("%.0f\t%d\t%.2f\t", time.Since(start).Seconds(), totalMessages, messageRate)) + fmt.Fprint(w, fmt.Sprintf("%.0f\t%d\t%.2f\t%.2f\t%d\t", time.Since(start).Seconds(), totalMessages, messageRate, connectRate, totalSubscribedChannels)) fmt.Fprint(w, "\r\n") w.Flush() if message_limit > 0 && totalMessages >= uint64(message_limit) {