From 43425f81583991e00de103039bee80e433e24c4e Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Mon, 17 May 2021 10:53:06 +0100 Subject: [PATCH] [add] Removed all locking on stats updates --- .github/workflows/publish.yml | 4 +-- Makefile | 9 +++++-- README.md | 11 +++++---- redis-bechmark-go.go | 46 +++++++++++++++++++++++------------ 4 files changed, 45 insertions(+), 25 deletions(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 4191c57..e0676c4 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -19,8 +19,8 @@ jobs: uses: actions/setup-go@v2 with: go-version: ${{ matrix.go-version }} - - name: Make all - run: make all + - name: Make Release + run: make release - name: Upload release binaries uses: alexellis/upload-assets@0.3.0 env: diff --git a/Makefile b/Makefile index 554913c..a795aa4 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,7 @@ # Go parameters GOCMD=GO111MODULE=on go GOBUILD=$(GOCMD) build +GOBUILDRACE=$(GOCMD) build -race GOINSTALL=$(GOCMD) install GOCLEAN=$(GOCMD) clean GOTEST=$(GOCMD) test @@ -8,7 +9,7 @@ GOGET=$(GOCMD) get GOMOD=$(GOCMD) mod GOFMT=$(GOCMD) fmt DISTDIR = ./dist -OS_ARCHs = "linux/amd64 linux/arm64 windows/amd64 darwin/amd64 darwin/arm64" +OS_ARCHs = "linux/amd64 linux/arm64 linux/arm windows/amd64 darwin/amd64 darwin/arm64" # Build-time GIT variables ifeq ($(GIT_SHA),) @@ -20,12 +21,16 @@ GIT_DIRTY:=$(shell git diff --no-ext-diff 2> /dev/null | wc -l) endif .PHONY: all test coverage -all: test build release +all: test build build: $(GOBUILD) \ -ldflags="-X 'main.GitSHA1=$(GIT_SHA)' -X 'main.GitDirty=$(GIT_DIRTY)'" . +build-race: + $(GOBUILDRACE) \ + -ldflags="-X 'main.GitSHA1=$(GIT_SHA)' -X 'main.GitDirty=$(GIT_DIRTY)'" . + checkfmt: @echo 'Checking gofmt';\ bash -c "diff -u <(echo -n) <(gofmt -d .)";\ diff --git a/README.md b/README.md index 708bfd7..c62728c 100644 --- a/README.md +++ b/README.md @@ -11,11 +11,12 @@ If you don't have go on your machine and just want to use the produced binaries | OS | Arch | Link | | :--- | :---: | ---: | -| Windows | amd64 | [redis-benchmark-go_windows_amd64.exe](https://s3.amazonaws.com/benchmarks.redislabs/tools/redis-benchmark-go/redis-benchmark-go_windows_amd64.exe) | -| Linux | amd64 | [redis-benchmark-go_linux_amd64](https://s3.amazonaws.com/benchmarks.redislabs/tools/redis-benchmark-go/redis-benchmark-go_linux_amd64) | -| Linux | arm64 | [redis-benchmark-go_linux_arm64](https://s3.amazonaws.com/benchmarks.redislabs/tools/redis-benchmark-go/redis-benchmark-go_linux_arm64) | -| Darwin | amd64 | [redis-benchmark-go_darwin_amd64](https://s3.amazonaws.com/benchmarks.redislabs/tools/redis-benchmark-go/redis-benchmark-go_darwin_amd64) | -| Darwin | arm64 | [redis-benchmark-go_darwin_arm64](https://s3.amazonaws.com/benchmarks.redislabs/tools/redis-benchmark-go/redis-benchmark-go_darwin_arm64) | +| Windows | amd64 (64-bit X86) | [redis-benchmark-go_windows_amd64.exe](https://s3.amazonaws.com/benchmarks.redislabs/tools/redis-benchmark-go/redis-benchmark-go_windows_amd64.exe) | +| Linux | amd64 (64-bit X86) | [redis-benchmark-go_linux_amd64](https://s3.amazonaws.com/benchmarks.redislabs/tools/redis-benchmark-go/redis-benchmark-go_linux_amd64) | +| Linux | arm64 (64-bit ARM) | [redis-benchmark-go_linux_arm64](https://s3.amazonaws.com/benchmarks.redislabs/tools/redis-benchmark-go/redis-benchmark-go_linux_arm64) | +| Linux | arm (32-bit ARM) | [redis-benchmark-go_linux_arm](https://s3.amazonaws.com/benchmarks.redislabs/tools/redis-benchmark-go/redis-benchmark-go_linux_arm) | +| Darwin | amd64 (64-bit X86) | [redis-benchmark-go_darwin_amd64](https://s3.amazonaws.com/benchmarks.redislabs/tools/redis-benchmark-go/redis-benchmark-go_darwin_amd64) | +| Darwin | arm64 (64-bit ARM) | [redis-benchmark-go_darwin_arm64](https://s3.amazonaws.com/benchmarks.redislabs/tools/redis-benchmark-go/redis-benchmark-go_darwin_arm64) | diff --git a/redis-bechmark-go.go b/redis-bechmark-go.go index 2553c65..b476fef 100644 --- a/redis-bechmark-go.go +++ b/redis-bechmark-go.go @@ -12,7 +12,6 @@ import ( "os" "os/signal" "sync" - "sync/atomic" "time" ) @@ -23,6 +22,11 @@ var latencies *hdrhistogram.Histogram const Inf = rate.Limit(math.MaxFloat64) const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" +type datapoint struct { + success bool + duration_ms int64 +} + func stringWithCharset(length int, charset string) string { b := make([]byte, length) @@ -32,11 +36,11 @@ func stringWithCharset(length int, charset string) string { return string(b) } -func ingestionRoutine(conn radix.Client, enableMultiExec, continueOnError bool, cmdS []string, keyspacelen, datasize, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, keyplace, dataplace int, useLimiter bool, rateLimiter *rate.Limiter) { +func ingestionRoutine(conn radix.Client, enableMultiExec bool, datapointsChan chan datapoint, continueOnError bool, cmdS []string, keyspacelen, datasize, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, keyplace, dataplace int, useLimiter bool, rateLimiter *rate.Limiter) { defer wg.Done() for i := 0; uint64(i) < number_samples || loop; i++ { rawCurrentCmd, _, _ := keyBuildLogic(keyplace, dataplace, datasize, keyspacelen, cmdS) - sendCmdLogic(conn, rawCurrentCmd, enableMultiExec, continueOnError, debug_level, useLimiter, rateLimiter) + sendCmdLogic(conn, rawCurrentCmd, enableMultiExec, datapointsChan, continueOnError, debug_level, useLimiter, rateLimiter) } } @@ -53,7 +57,7 @@ func keyBuildLogic(keyPos int, dataPos int, datasize, keyspacelen uint64, cmdS [ return rawCmd, key, radix.ClusterSlot([]byte(newCmdS[1])) } -func sendCmdLogic(conn radix.Client, cmd radix.CmdAction, enableMultiExec bool, continueOnError bool, debug_level int, useRateLimiter bool, rateLimiter *rate.Limiter) { +func sendCmdLogic(conn radix.Client, cmd radix.CmdAction, enableMultiExec bool, datapointsChan chan datapoint, continueOnError bool, debug_level int, useRateLimiter bool, rateLimiter *rate.Limiter) { if useRateLimiter { r := rateLimiter.ReserveN(time.Now(), int(1)) time.Sleep(r.Delay()) @@ -98,7 +102,6 @@ func sendCmdLogic(conn radix.Client, cmd radix.CmdAction, enableMultiExec bool, endT := time.Now() if err != nil { if continueOnError { - atomic.AddUint64(&totalErrors, uint64(1)) if debug_level > 0 { log.Println(fmt.Sprintf("Received an error with the following command(s): %v, error: %v", cmd, err)) } @@ -107,11 +110,7 @@ func sendCmdLogic(conn radix.Client, cmd radix.CmdAction, enableMultiExec bool, } } duration := endT.Sub(startT) - err = latencies.RecordValue(duration.Microseconds()) - if err != nil { - log.Fatalf("Received an error while recording latencies: %v", err) - } - atomic.AddUint64(&totalCommands, uint64(1)) + datapointsChan <- datapoint{!(err != nil), duration.Microseconds()} } func main() { @@ -190,17 +189,18 @@ func main() { } else { standalone = getStandaloneConn(connectionStr, opts, *clients) } + datapointsChan := make(chan datapoint, *numberRequests) for channel_id := 1; uint64(channel_id) <= *clients; channel_id++ { wg.Add(1) cmd := make([]string, len(args)) copy(cmd, args) if *clusterMode { - go ingestionRoutine(cluster, *multi, true, cmd, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, keyPlaceOlderPos, dataPlaceOlderPos, useRateLimiter, rateLimiter) + go ingestionRoutine(cluster, *multi, datapointsChan, true, cmd, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, keyPlaceOlderPos, dataPlaceOlderPos, useRateLimiter, rateLimiter) } else { if *multi { - go ingestionRoutine(getStandaloneConn(connectionStr, opts, 1), *multi, true, cmd, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, keyPlaceOlderPos, dataPlaceOlderPos, useRateLimiter, rateLimiter) + go ingestionRoutine(getStandaloneConn(connectionStr, opts, 1), *multi, datapointsChan, true, cmd, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, keyPlaceOlderPos, dataPlaceOlderPos, useRateLimiter, rateLimiter) } else { - go ingestionRoutine(standalone, *multi, true, cmd, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, keyPlaceOlderPos, dataPlaceOlderPos, useRateLimiter, rateLimiter) + go ingestionRoutine(standalone, *multi, datapointsChan, true, cmd, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, keyPlaceOlderPos, dataPlaceOlderPos, useRateLimiter, rateLimiter) } } } @@ -210,7 +210,7 @@ func main() { signal.Notify(c, os.Interrupt) tick := time.NewTicker(time.Duration(client_update_tick) * time.Second) - closed, _, duration, totalMessages, _ := updateCLI(tick, c, *numberRequests, *loop) + closed, _, duration, totalMessages, _ := updateCLI(tick, c, *numberRequests, *loop, datapointsChan) messageRate := float64(totalMessages) / float64(duration.Seconds()) p50IngestionMs := float64(latencies.ValueAtQuantile(50.0)) / 1000.0 p95IngestionMs := float64(latencies.ValueAtQuantile(95.0)) / 1000.0 @@ -235,17 +235,31 @@ func main() { wg.Wait() } -func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit uint64, loop bool) (bool, time.Time, time.Duration, uint64, []float64) { - +func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit uint64, loop bool, datapointsChan chan datapoint) (bool, time.Time, time.Duration, uint64, []float64) { + var currentErr uint64 = 0 + var currentCount uint64 = 0 start := time.Now() prevTime := time.Now() prevMessageCount := uint64(0) messageRateTs := []float64{} + var dp datapoint fmt.Printf("%26s %7s %25s %25s %7s %25s %25s\n", "Test time", " ", "Total Commands", "Total Errors", "", "Command Rate", "p50 lat. (msec)") for { select { + case dp = <-datapointsChan: + { + latencies.RecordValue(dp.duration_ms) + if !dp.success { + currentErr++ + } + currentCount++ + } case <-tick.C: { + totalCommands += currentCount + totalErrors += currentErr + currentErr = 0 + currentCount = 0 now := time.Now() took := now.Sub(prevTime) messageRate := float64(totalCommands-prevMessageCount) / float64(took.Seconds())