From cf7de3ef94e922c7be11bb38b086095916e69886 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Mon, 23 Dec 2024 00:06:42 +0000 Subject: [PATCH] Added --clients flag. included example in readme. --- README.md | 40 +++++++++++++++++++++ subscriber.go | 99 +++++++++++++++++++++++++-------------------------- 2 files changed, 89 insertions(+), 50 deletions(-) diff --git a/README.md b/README.md index 66c7c11..450d9dc 100644 --- a/README.md +++ b/README.md @@ -66,18 +66,40 @@ Usage of ./pubsub-sub-bench: Specify client output buffer limits for clients subscribed to at least one pubsub channel or pattern. If the value specified is different that the one present on the DB, this setting will apply. -client-update-tick int client update tick. (default 1) + -clients int + Number of parallel connections. (default 50) + -cpuprofile string + write cpu profile to file -host string redis host. (default "127.0.0.1") -json-out-file string Name of json output file, if not set, will not print to json. + -max-number-channels-per-subscriber int + max number of channels to subscribe to, per connection. (default 1) + -max-reconnect-interval int + max reconnect interval. if 0 disable (s)unsubscribe/(s)ubscribe. -messages int Number of total messages per subscriber per channel. + -min-number-channels-per-subscriber int + min number of channels to subscribe to, per connection. (default 1) + -min-reconnect-interval int + min reconnect interval. if 0 disable (s)unsubscribe/(s)ubscribe. + -mode string + Subscribe mode. Either 'subscribe' or 'ssubscribe'. (default "subscribe") -oss-cluster-api-distribute-subscribers read cluster slots and distribute subscribers among them. + -pool_size int + Maximum number of socket connections per node. -port string redis port. (default "6379") -print-messages print messages. + -rand-seed int + Random deterministic seed. (default 12345) + -redis-timeout duration + determines the timeout to pass to redis connection setup. It adjust the connection, read, and write timeouts. (default 30s) + -resp int + redis command response protocol (2 - RESP 2, 3 - RESP 3) (default 2) -subscriber-prefix string prefix for subscribing to channel, used in conjunction with key-minimum and key-maximum. (default "channel-") -subscribers-per-channel int @@ -88,5 +110,23 @@ Usage of ./pubsub-sub-bench: Number of seconds to run the test, after receiving the first message. -user string Used to send ACL style 'AUTH username pass'. Needs -a. + -verbose + verbose print. + -version + print version and exit. +``` + +### Example usage: create 10 subscribers that will subscribe to 2000 channels + +Subscriber ``` +./pubsub-sub-bench --clients 10 --channel-maximum 2000 --channel-minimum 1 -min-number-channels-per-subscriber 2000 -max-number-channels-per-subscriber 2000 +``` + +Publisher + +``` +memtier_benchmark --key-prefix "channel-" --key-maximum 2000 --key-minimum 1 --command "PUBLISH __key__ __data__" --test-time 60 --pipeline 10 +``` + diff --git a/subscriber.go b/subscriber.go index 349231e..37267a1 100644 --- a/subscriber.go +++ b/subscriber.go @@ -153,6 +153,7 @@ func main() { channel_minimum := flag.Int("channel-minimum", 1, "channel ID minimum value ( each channel has a dedicated thread ).") channel_maximum := flag.Int("channel-maximum", 100, "channel ID maximum value ( each channel has a dedicated thread ).") subscribers_per_channel := flag.Int("subscribers-per-channel", 1, "number of subscribers per channel.") + clients := flag.Int("clients", 50, "Number of parallel connections.") min_channels_per_subscriber := flag.Int("min-number-channels-per-subscriber", 1, "min number of channels to subscribe to, per connection.") max_channels_per_subscriber := flag.Int("max-number-channels-per-subscriber", 1, "max number of channels to subscribe to, per connection.") min_reconnect_interval := flag.Int("min-reconnect-interval", 0, "min reconnect interval. if 0 disable (s)unsubscribe/(s)ubscribe.") @@ -249,7 +250,6 @@ func main() { total_messages := int64(total_subscriptions) * *messages_per_channel_subscriber subscriptions_per_node := total_subscriptions / nodeCount - log.Println(fmt.Sprintf("Total subcriptions: %d. Subscriptions per node %d. Total messages: %d", total_subscriptions, subscriptions_per_node, total_messages)) log.Println(fmt.Sprintf("Will use a subscriber prefix of: %s", *subscribe_prefix)) if *poolSizePtr == 0 { @@ -306,60 +306,59 @@ func main() { } totalCreatedClients := 0 if strings.Compare(*subscribers_placement, "dense") == 0 { - for channel_id := *channel_minimum; channel_id <= *channel_maximum; channel_id++ { - channel := fmt.Sprintf("%s%d", *subscribe_prefix, channel_id) - for channel_subscriber_number := 1; channel_subscriber_number <= *subscribers_per_channel; channel_subscriber_number++ { - channels := []string{channel} - n_channels_this_conn := 1 - if *max_channels_per_subscriber == *min_channels_per_subscriber { - n_channels_this_conn = *max_channels_per_subscriber - } else { - n_channels_this_conn = rand.Intn(*max_channels_per_subscriber-*min_channels_per_subscriber) + *min_channels_per_subscriber + for client_id := 1; client_id <= *clients; client_id++ { + channels := []string{} + n_channels_this_conn := 0 + if *max_channels_per_subscriber == *min_channels_per_subscriber { + n_channels_this_conn = *max_channels_per_subscriber + } else { + n_channels_this_conn = rand.Intn(*max_channels_per_subscriber-*min_channels_per_subscriber) + *min_channels_per_subscriber + } + for channel_this_conn := 1; channel_this_conn <= n_channels_this_conn; channel_this_conn++ { + new_channel_id := rand.Intn(*channel_maximum) + *channel_minimum + new_channel := fmt.Sprintf("%s%d", *subscribe_prefix, new_channel_id) + channels = append(channels, new_channel) + } + totalCreatedClients++ + subscriberName := fmt.Sprintf("subscriber#%d", client_id) + var client *redis.Client + var err error = nil + ctx = context.Background() + // In case of SSUBSCRIBE the node is associated the to the channel name + if strings.Compare(*mode, "ssubscribe") == 0 && *distributeSubscribers == true { + firstChannel := channels[0] + client, err = clusterClient.MasterForKey(ctx, firstChannel) + if err != nil { + log.Fatal(err) } - for channel_this_conn := 1; channel_this_conn < n_channels_this_conn; channel_this_conn++ { - new_channel_id := rand.Intn(*channel_maximum) + *channel_minimum - new_channel := fmt.Sprintf("%s%d", *subscribe_prefix, new_channel_id) - channels = append(channels, new_channel) + if *verbose { + log.Println(fmt.Sprintf("client %d is a CLUSTER client connected to %v. Subscriber name %s", totalCreatedClients, client.String(), subscriberName)) } - totalCreatedClients++ - subscriberName := fmt.Sprintf("subscriber#%d-%s%d", channel_subscriber_number, *subscribe_prefix, channel_id) - var client *redis.Client - var err error = nil - ctx = context.Background() - // In case of SSUBSCRIBE the node is associated the to the channel name - if strings.Compare(*mode, "ssubscribe") == 0 && *distributeSubscribers == true { - client, err = clusterClient.MasterForKey(ctx, channel) - if err != nil { - log.Fatal(err) - } - if *verbose { - log.Println(fmt.Sprintf("client %d is a CLUSTER client connected to %v. Subscriber name %s", totalCreatedClients, client.String(), subscriberName)) - } - } else { - nodes_pos := channel_id % nodeCount - addr := nodesAddresses[nodes_pos] - client = nodeClients[nodes_pos] - if *verbose { - log.Println(fmt.Sprintf("client %d is a STANDALONE client connected to node %d (address %s). Subscriber name %s", totalCreatedClients, nodes_pos, addr, subscriberName)) - } - err = client.Ping(ctx).Err() - if err != nil { - log.Fatal(err) - } - } - wg.Add(1) - connectionReconnectInterval := 0 - if *max_reconnect_interval == *min_reconnect_interval { - connectionReconnectInterval = *max_reconnect_interval - } else { - connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *min_reconnect_interval + } else { + nodes_pos := client_id % nodeCount + addr := nodesAddresses[nodes_pos] + client = nodeClients[nodes_pos] + if *verbose { + log.Println(fmt.Sprintf("client %d is a STANDALONE client connected to node %d (address %s). Subscriber name %s", totalCreatedClients, nodes_pos, addr, subscriberName)) } - if connectionReconnectInterval > 0 { - log.Println(fmt.Sprintf("Using reconnection interval of %d milliseconds for subscriber: %s", connectionReconnectInterval, subscriberName)) + err = client.Ping(ctx).Err() + if err != nil { + log.Fatal(err) } - log.Println(fmt.Sprintf("subscriber: %s. Total channels %d: %v", subscriberName, len(channels), channels)) - go subscriberRoutine(subscriberName, *mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client) } + wg.Add(1) + connectionReconnectInterval := 0 + if *max_reconnect_interval == *min_reconnect_interval { + connectionReconnectInterval = *max_reconnect_interval + } else { + connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *min_reconnect_interval + } + if connectionReconnectInterval > 0 { + log.Println(fmt.Sprintf("Using reconnection interval of %d milliseconds for subscriber: %s", connectionReconnectInterval, subscriberName)) + } + log.Println(fmt.Sprintf("subscriber: %s. Total channels %d: %v", subscriberName, len(channels), channels)) + go subscriberRoutine(subscriberName, *mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client) + // } } }