From 4d68790d6592c4d11a263daa7cc8598fdd008e91 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Wed, 24 May 2023 23:34:16 +0100 Subject: [PATCH 1/3] Enabled pprof cpuprofile collection --- .gitignore | 6 +++++- subscriber.go | 14 ++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 07a1b6e..f0b61ce 100644 --- a/.gitignore +++ b/.gitignore @@ -90,4 +90,8 @@ Thumbs.db # Coverage Results # #################### -coverage.txt \ No newline at end of file +coverage.txt + +# Profiler Results # +#################### +*.pprof diff --git a/subscriber.go b/subscriber.go index cbbc099..00d28ff 100644 --- a/subscriber.go +++ b/subscriber.go @@ -12,6 +12,7 @@ import ( "log" "os" "os/signal" + "runtime/pprof" "strings" "sync" "sync/atomic" @@ -67,6 +68,7 @@ func subscriberRoutine(addr string, mode, subscriberName string, channel string, _, _, ps, _ := bootstrapPubSub(addr, subscriberName, channel, opts) defer ps.Close() for { + ps. msg, err := ps.Next(ctx) if errors.Is(err, context.Canceled) { break @@ -111,6 +113,7 @@ func bootstrapPubSub(addr string, subscriberName string, channel string, opts ra func main() { host := flag.String("host", "127.0.0.1", "redis host.") port := flag.String("port", "6379", "redis port.") + cpuprofile := flag.String("cpuprofile", "", "write cpu profile to file") password := flag.String("a", "", "Password for Redis Auth.") mode := flag.String("mode", "subscribe", "Subscribe mode. Either 'subscribe' or 'ssubscribe'.") username := flag.String("user", "", "Used to send ACL style 'AUTH username pass'. Needs -a.") @@ -186,6 +189,13 @@ func main() { total_subscriptions := total_channels * *subscribers_per_channel total_messages := int64(total_subscriptions) * *messages_per_channel_subscriber fmt.Println(fmt.Sprintf("Total subcriptions: %d. Subscriptions per node %d. Total messages: %d", total_subscriptions, subscriptions_per_node, total_messages)) + if *cpuprofile != "" { + f, err := os.Create(*cpuprofile) + if err != nil { + log.Fatal(err) + } + pprof.StartCPUProfile(f) + } if strings.Compare(*subscribers_placement, "dense") == 0 { for channel_id := *channel_minimum; channel_id <= *channel_maximum; channel_id++ { @@ -210,6 +220,10 @@ func main() { closed, start_time, duration, totalMessages, messageRateTs := updateCLI(tick, c, total_messages, w, *test_time) messageRate := float64(totalMessages) / float64(duration.Seconds()) + if *cpuprofile != "" { + pprof.StopCPUProfile() + } + fmt.Fprint(w, fmt.Sprintf("#################################################\nTotal Duration %f Seconds\nMessage Rate %f\n#################################################\n", duration.Seconds(), messageRate)) fmt.Fprint(w, "\r\n") w.Flush() From b36213cdc08cb02458224e949734d4b2c0c91136 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 25 May 2023 00:15:15 +0100 Subject: [PATCH 2/3] Avoid context locking on pubsub due to PingInterval (disabled it) --- subscriber.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/subscriber.go b/subscriber.go index 00d28ff..7e0ebf1 100644 --- a/subscriber.go +++ b/subscriber.go @@ -68,7 +68,6 @@ func subscriberRoutine(addr string, mode, subscriberName string, channel string, _, _, ps, _ := bootstrapPubSub(addr, subscriberName, channel, opts) defer ps.Close() for { - ps. msg, err := ps.Next(ctx) if errors.Is(err, context.Canceled) { break @@ -100,7 +99,7 @@ func bootstrapPubSub(addr string, subscriberName string, channel string, opts ra } // Pass that connection into PubSub, conn should never get used after this - ps := radix.PubSubConfig{}.New(conn) + ps := radix.PubSubConfig{PingInterval: -1}.New(conn) err = ps.Subscribe(ctx, channel) if err != nil { From 26a6e829a9d7bd1d38944d053f2b0f4a70b69fdc Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 25 May 2023 00:37:28 +0100 Subject: [PATCH 3/3] Using go-redis for SUBSCRIBE flow --- subscriber.go | 51 +++++++++++---------------------------------------- 1 file changed, 11 insertions(+), 40 deletions(-) diff --git a/subscriber.go b/subscriber.go index 7e0ebf1..de8721d 100644 --- a/subscriber.go +++ b/subscriber.go @@ -3,7 +3,6 @@ package main import ( "context" "encoding/json" - "errors" "flag" "fmt" radix "github.com/mediocregopher/radix/v4" @@ -41,14 +40,14 @@ type testResult struct { func subscriberRoutine(addr string, mode, subscriberName string, channel string, printMessages bool, ctx context.Context, wg *sync.WaitGroup, opts radix.Dialer, protocolVersion int) { // tell the caller we've stopped defer wg.Done() + client := redis.NewClient(&redis.Options{ + Addr: addr, + Password: opts.AuthPass, + ClientName: subscriberName, + ProtocolVersion: protocolVersion, + }) switch mode { case "ssubscribe": - client := redis.NewClient(&redis.Options{ - Addr: addr, - Password: opts.AuthPass, - ClientName: subscriberName, - ProtocolVersion: protocolVersion, - }) spubsub := client.SSubscribe(ctx, channel) defer spubsub.Close() for { @@ -65,48 +64,20 @@ func subscriberRoutine(addr string, mode, subscriberName string, channel string, case "subscribe": fallthrough default: - _, _, ps, _ := bootstrapPubSub(addr, subscriberName, channel, opts) - defer ps.Close() + pubsub := client.Subscribe(ctx, channel) + defer pubsub.Close() for { - msg, err := ps.Next(ctx) - if errors.Is(err, context.Canceled) { - break - } else if err != nil { + msg, err := pubsub.ReceiveMessage(ctx) + if err != nil { panic(err) } if printMessages { - fmt.Println(fmt.Sprintf("received message in channel %s. Message: %s", msg.Channel, msg.Message)) + fmt.Println(fmt.Sprintf("received message in channel %s. Message: %s", msg.Channel, msg.Payload)) } atomic.AddUint64(&totalMessages, 1) } - - } - -} - -func bootstrapPubSub(addr string, subscriberName string, channel string, opts radix.Dialer) (radix.Conn, error, radix.PubSubConn, *time.Ticker) { - // Create a normal redis connection - ctx := context.Background() - conn, err := opts.Dial(ctx, "tcp", addr) - - if err != nil { - log.Fatal(err) - } - - err = conn.Do(ctx, radix.FlatCmd(nil, "CLIENT", "SETNAME", subscriberName)) - if err != nil { - log.Fatal(err) - } - - // Pass that connection into PubSub, conn should never get used after this - ps := radix.PubSubConfig{PingInterval: -1}.New(conn) - - err = ps.Subscribe(ctx, channel) - if err != nil { - log.Fatal(err) } - return conn, err, ps, nil } func main() {