Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,40 @@ Usage of ./pubsub-sub-bench:
Specify client output buffer limits for clients subscribed to at least one pubsub channel or pattern. If the value specified is different that the one present on the DB, this setting will apply.
-client-update-tick int
client update tick. (default 1)
-clients int
Number of parallel connections. (default 50)
-cpuprofile string
write cpu profile to file
-host string
redis host. (default "127.0.0.1")
-json-out-file string
Name of json output file, if not set, will not print to json.
-max-number-channels-per-subscriber int
max number of channels to subscribe to, per connection. (default 1)
-max-reconnect-interval int
max reconnect interval. if 0 disable (s)unsubscribe/(s)ubscribe.
-messages int
Number of total messages per subscriber per channel.
-min-number-channels-per-subscriber int
min number of channels to subscribe to, per connection. (default 1)
-min-reconnect-interval int
min reconnect interval. if 0 disable (s)unsubscribe/(s)ubscribe.
-mode string
Subscribe mode. Either 'subscribe' or 'ssubscribe'. (default "subscribe")
-oss-cluster-api-distribute-subscribers
read cluster slots and distribute subscribers among them.
-pool_size int
Maximum number of socket connections per node.
-port string
redis port. (default "6379")
-print-messages
print messages.
-rand-seed int
Random deterministic seed. (default 12345)
-redis-timeout duration
determines the timeout to pass to redis connection setup. It adjust the connection, read, and write timeouts. (default 30s)
-resp int
redis command response protocol (2 - RESP 2, 3 - RESP 3) (default 2)
-subscriber-prefix string
prefix for subscribing to channel, used in conjunction with key-minimum and key-maximum. (default "channel-")
-subscribers-per-channel int
Expand All @@ -88,5 +110,23 @@ Usage of ./pubsub-sub-bench:
Number of seconds to run the test, after receiving the first message.
-user string
Used to send ACL style 'AUTH username pass'. Needs -a.
-verbose
verbose print.
-version
print version and exit.
```

### Example usage: create 10 subscribers that will subscribe to 2000 channels

Subscriber

```
./pubsub-sub-bench --clients 10 --channel-maximum 2000 --channel-minimum 1 -min-number-channels-per-subscriber 2000 -max-number-channels-per-subscriber 2000
```

Publisher

```
memtier_benchmark --key-prefix "channel-" --key-maximum 2000 --key-minimum 1 --command "PUBLISH __key__ __data__" --test-time 60 --pipeline 10
```

99 changes: 49 additions & 50 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func main() {
channel_minimum := flag.Int("channel-minimum", 1, "channel ID minimum value ( each channel has a dedicated thread ).")
channel_maximum := flag.Int("channel-maximum", 100, "channel ID maximum value ( each channel has a dedicated thread ).")
subscribers_per_channel := flag.Int("subscribers-per-channel", 1, "number of subscribers per channel.")
clients := flag.Int("clients", 50, "Number of parallel connections.")
min_channels_per_subscriber := flag.Int("min-number-channels-per-subscriber", 1, "min number of channels to subscribe to, per connection.")
max_channels_per_subscriber := flag.Int("max-number-channels-per-subscriber", 1, "max number of channels to subscribe to, per connection.")
min_reconnect_interval := flag.Int("min-reconnect-interval", 0, "min reconnect interval. if 0 disable (s)unsubscribe/(s)ubscribe.")
Expand Down Expand Up @@ -249,7 +250,6 @@ func main() {
total_messages := int64(total_subscriptions) * *messages_per_channel_subscriber
subscriptions_per_node := total_subscriptions / nodeCount

log.Println(fmt.Sprintf("Total subcriptions: %d. Subscriptions per node %d. Total messages: %d", total_subscriptions, subscriptions_per_node, total_messages))
log.Println(fmt.Sprintf("Will use a subscriber prefix of: %s<channel id>", *subscribe_prefix))

if *poolSizePtr == 0 {
Expand Down Expand Up @@ -306,60 +306,59 @@ func main() {
}
totalCreatedClients := 0
if strings.Compare(*subscribers_placement, "dense") == 0 {
for channel_id := *channel_minimum; channel_id <= *channel_maximum; channel_id++ {
channel := fmt.Sprintf("%s%d", *subscribe_prefix, channel_id)
for channel_subscriber_number := 1; channel_subscriber_number <= *subscribers_per_channel; channel_subscriber_number++ {
channels := []string{channel}
n_channels_this_conn := 1
if *max_channels_per_subscriber == *min_channels_per_subscriber {
n_channels_this_conn = *max_channels_per_subscriber
} else {
n_channels_this_conn = rand.Intn(*max_channels_per_subscriber-*min_channels_per_subscriber) + *min_channels_per_subscriber
for client_id := 1; client_id <= *clients; client_id++ {
channels := []string{}
n_channels_this_conn := 0
if *max_channels_per_subscriber == *min_channels_per_subscriber {
n_channels_this_conn = *max_channels_per_subscriber
} else {
n_channels_this_conn = rand.Intn(*max_channels_per_subscriber-*min_channels_per_subscriber) + *min_channels_per_subscriber
}
for channel_this_conn := 1; channel_this_conn <= n_channels_this_conn; channel_this_conn++ {
new_channel_id := rand.Intn(*channel_maximum) + *channel_minimum
new_channel := fmt.Sprintf("%s%d", *subscribe_prefix, new_channel_id)
channels = append(channels, new_channel)
}
totalCreatedClients++
subscriberName := fmt.Sprintf("subscriber#%d", client_id)
var client *redis.Client
var err error = nil
ctx = context.Background()
// In case of SSUBSCRIBE the node is associated the to the channel name
if strings.Compare(*mode, "ssubscribe") == 0 && *distributeSubscribers == true {
firstChannel := channels[0]
client, err = clusterClient.MasterForKey(ctx, firstChannel)
if err != nil {
log.Fatal(err)
}
for channel_this_conn := 1; channel_this_conn < n_channels_this_conn; channel_this_conn++ {
new_channel_id := rand.Intn(*channel_maximum) + *channel_minimum
new_channel := fmt.Sprintf("%s%d", *subscribe_prefix, new_channel_id)
channels = append(channels, new_channel)
if *verbose {
log.Println(fmt.Sprintf("client %d is a CLUSTER client connected to %v. Subscriber name %s", totalCreatedClients, client.String(), subscriberName))
}
totalCreatedClients++
subscriberName := fmt.Sprintf("subscriber#%d-%s%d", channel_subscriber_number, *subscribe_prefix, channel_id)
var client *redis.Client
var err error = nil
ctx = context.Background()
// In case of SSUBSCRIBE the node is associated the to the channel name
if strings.Compare(*mode, "ssubscribe") == 0 && *distributeSubscribers == true {
client, err = clusterClient.MasterForKey(ctx, channel)
if err != nil {
log.Fatal(err)
}
if *verbose {
log.Println(fmt.Sprintf("client %d is a CLUSTER client connected to %v. Subscriber name %s", totalCreatedClients, client.String(), subscriberName))
}
} else {
nodes_pos := channel_id % nodeCount
addr := nodesAddresses[nodes_pos]
client = nodeClients[nodes_pos]
if *verbose {
log.Println(fmt.Sprintf("client %d is a STANDALONE client connected to node %d (address %s). Subscriber name %s", totalCreatedClients, nodes_pos, addr, subscriberName))
}
err = client.Ping(ctx).Err()
if err != nil {
log.Fatal(err)
}
}
wg.Add(1)
connectionReconnectInterval := 0
if *max_reconnect_interval == *min_reconnect_interval {
connectionReconnectInterval = *max_reconnect_interval
} else {
connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *min_reconnect_interval
} else {
nodes_pos := client_id % nodeCount
addr := nodesAddresses[nodes_pos]
client = nodeClients[nodes_pos]
if *verbose {
log.Println(fmt.Sprintf("client %d is a STANDALONE client connected to node %d (address %s). Subscriber name %s", totalCreatedClients, nodes_pos, addr, subscriberName))
}
if connectionReconnectInterval > 0 {
log.Println(fmt.Sprintf("Using reconnection interval of %d milliseconds for subscriber: %s", connectionReconnectInterval, subscriberName))
err = client.Ping(ctx).Err()
if err != nil {
log.Fatal(err)
}
log.Println(fmt.Sprintf("subscriber: %s. Total channels %d: %v", subscriberName, len(channels), channels))
go subscriberRoutine(subscriberName, *mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client)
}
wg.Add(1)
connectionReconnectInterval := 0
if *max_reconnect_interval == *min_reconnect_interval {
connectionReconnectInterval = *max_reconnect_interval
} else {
connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *min_reconnect_interval
}
if connectionReconnectInterval > 0 {
log.Println(fmt.Sprintf("Using reconnection interval of %d milliseconds for subscriber: %s", connectionReconnectInterval, subscriberName))
}
log.Println(fmt.Sprintf("subscriber: %s. Total channels %d: %v", subscriberName, len(channels), channels))
go subscriberRoutine(subscriberName, *mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client)
// }
}
}

Expand Down
Loading