diff --git a/.gitignore b/.gitignore index 07a1b6e..f0b61ce 100644 --- a/.gitignore +++ b/.gitignore @@ -90,4 +90,8 @@ Thumbs.db # Coverage Results # #################### -coverage.txt \ No newline at end of file +coverage.txt + +# Profiler Results # +#################### +*.pprof diff --git a/subscriber.go b/subscriber.go index cbbc099..de8721d 100644 --- a/subscriber.go +++ b/subscriber.go @@ -3,7 +3,6 @@ package main import ( "context" "encoding/json" - "errors" "flag" "fmt" radix "github.com/mediocregopher/radix/v4" @@ -12,6 +11,7 @@ import ( "log" "os" "os/signal" + "runtime/pprof" "strings" "sync" "sync/atomic" @@ -40,14 +40,14 @@ type testResult struct { func subscriberRoutine(addr string, mode, subscriberName string, channel string, printMessages bool, ctx context.Context, wg *sync.WaitGroup, opts radix.Dialer, protocolVersion int) { // tell the caller we've stopped defer wg.Done() + client := redis.NewClient(&redis.Options{ + Addr: addr, + Password: opts.AuthPass, + ClientName: subscriberName, + ProtocolVersion: protocolVersion, + }) switch mode { case "ssubscribe": - client := redis.NewClient(&redis.Options{ - Addr: addr, - Password: opts.AuthPass, - ClientName: subscriberName, - ProtocolVersion: protocolVersion, - }) spubsub := client.SSubscribe(ctx, channel) defer spubsub.Close() for { @@ -64,53 +64,26 @@ func subscriberRoutine(addr string, mode, subscriberName string, channel string, case "subscribe": fallthrough default: - _, _, ps, _ := bootstrapPubSub(addr, subscriberName, channel, opts) - defer ps.Close() + pubsub := client.Subscribe(ctx, channel) + defer pubsub.Close() for { - msg, err := ps.Next(ctx) - if errors.Is(err, context.Canceled) { - break - } else if err != nil { + msg, err := pubsub.ReceiveMessage(ctx) + if err != nil { panic(err) } if printMessages { - fmt.Println(fmt.Sprintf("received message in channel %s. Message: %s", msg.Channel, msg.Message)) + fmt.Println(fmt.Sprintf("received message in channel %s. Message: %s", msg.Channel, msg.Payload)) } atomic.AddUint64(&totalMessages, 1) } - - } - -} - -func bootstrapPubSub(addr string, subscriberName string, channel string, opts radix.Dialer) (radix.Conn, error, radix.PubSubConn, *time.Ticker) { - // Create a normal redis connection - ctx := context.Background() - conn, err := opts.Dial(ctx, "tcp", addr) - - if err != nil { - log.Fatal(err) - } - - err = conn.Do(ctx, radix.FlatCmd(nil, "CLIENT", "SETNAME", subscriberName)) - if err != nil { - log.Fatal(err) - } - - // Pass that connection into PubSub, conn should never get used after this - ps := radix.PubSubConfig{}.New(conn) - - err = ps.Subscribe(ctx, channel) - if err != nil { - log.Fatal(err) } - return conn, err, ps, nil } func main() { host := flag.String("host", "127.0.0.1", "redis host.") port := flag.String("port", "6379", "redis port.") + cpuprofile := flag.String("cpuprofile", "", "write cpu profile to file") password := flag.String("a", "", "Password for Redis Auth.") mode := flag.String("mode", "subscribe", "Subscribe mode. Either 'subscribe' or 'ssubscribe'.") username := flag.String("user", "", "Used to send ACL style 'AUTH username pass'. Needs -a.") @@ -186,6 +159,13 @@ func main() { total_subscriptions := total_channels * *subscribers_per_channel total_messages := int64(total_subscriptions) * *messages_per_channel_subscriber fmt.Println(fmt.Sprintf("Total subcriptions: %d. Subscriptions per node %d. Total messages: %d", total_subscriptions, subscriptions_per_node, total_messages)) + if *cpuprofile != "" { + f, err := os.Create(*cpuprofile) + if err != nil { + log.Fatal(err) + } + pprof.StartCPUProfile(f) + } if strings.Compare(*subscribers_placement, "dense") == 0 { for channel_id := *channel_minimum; channel_id <= *channel_maximum; channel_id++ { @@ -210,6 +190,10 @@ func main() { closed, start_time, duration, totalMessages, messageRateTs := updateCLI(tick, c, total_messages, w, *test_time) messageRate := float64(totalMessages) / float64(duration.Seconds()) + if *cpuprofile != "" { + pprof.StopCPUProfile() + } + fmt.Fprint(w, fmt.Sprintf("#################################################\nTotal Duration %f Seconds\nMessage Rate %f\n#################################################\n", duration.Seconds(), messageRate)) fmt.Fprint(w, "\r\n") w.Flush()