52 changes: 35 additions & 17 deletions subscriber.go
@@ -29,6 +29,7 @@ const (
)

var totalMessages uint64
var clusterSlicesMu sync.Mutex // guards the node slices appended to from ForEachMaster's concurrent callbacks

type testResult struct {
StartTime int64 `json:"StartTime"`
@@ -126,9 +127,9 @@ func main() {

ctx := context.Background()
nodeCount := 0
poolSize := *poolSizePtr
var nodesAddresses []string
var nodeClients []*redis.Client
poolSize := *poolSizePtr
var clusterClient *redis.ClusterClient
var standaloneClient *redis.Client

@@ -161,22 +162,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
log.Println("Getting cluster slots info.")
slots, err := clusterClient.ClusterSlots(ctx).Result()
if err != nil {
log.Fatal(err)
}
nodeCount = len(slots)
log.Println(fmt.Sprintf("Detailing cluster slots info. Total nodes: %d", nodeCount))
for _, slotInfo := range slots {
log.Println(fmt.Sprintf("\tSlot range start %d end %d. Nodes: %v", slotInfo.Start, slotInfo.End, slotInfo.Nodes))
nodesAddresses = append(nodesAddresses, slotInfo.Nodes[0].Addr)
}
fn := func(ctx context.Context, client *redis.Client) (err error) {
nodeClients = append(nodeClients, client)
return
}
clusterClient.ForEachMaster(ctx, fn)
nodeCount, nodeClients, nodesAddresses = updateSecondarySlicesCluster(clusterClient, ctx)
} else {
nodeCount = 1
nodesAddresses = append(nodesAddresses, fmt.Sprintf("%s:%s", *host, *port))
@@ -214,6 +200,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
nodeCount, nodeClients, nodesAddresses = updateSecondarySlicesCluster(clusterClient, ctx)
}
// trap Ctrl+C and call cancel on the context
// We use this instead of the previous stopChannel + chan radix.PubSubMessage
@@ -334,6 +321,37 @@ func main() {
wg.Wait()
}

func updateSecondarySlicesCluster(clusterClient *redis.ClusterClient, ctx context.Context) (int, []*redis.Client, []string) {
var nodeCount = 0
var nodesAddresses []string
var nodeClients []*redis.Client
log.Println("Getting cluster slots info.")
slots, err := clusterClient.ClusterSlots(ctx).Result()
if err != nil {
log.Fatal(err)
}

log.Println(fmt.Sprintf("Detailing cluster slots info. Total slot groups: %d", len(slots)))
for _, slotInfo := range slots {
log.Println(fmt.Sprintf("\tSlot range start %d end %d. Nodes: %v", slotInfo.Start, slotInfo.End, slotInfo.Nodes))
}
log.Println(fmt.Sprintf("Detailing cluster node info"))
fn := func(ctx context.Context, client *redis.Client) (err error) {
clusterSlicesMu.Lock()
nodeClients = append(nodeClients, client)
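// Note: client.Conn().String() renders as "Redis<host:port db:N>"; the slicing
// below strips the "Redis<" prefix from the host and the " db" suffix from the
// port to recover a plain "host:port" address.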
addr := client.Conn().String()
addrS := strings.Split(addr, ":")
finalAddr := fmt.Sprintf("%s:%s", addrS[0][len(" Redis<")-1:], addrS[1][:len(addrS[1])-3])
log.Println(fmt.Sprintf("Cluster node pos #%d. Address: %s.", len(nodeClients), finalAddr))
nodesAddresses = append(nodesAddresses, finalAddr)
clusterSlicesMu.Unlock()
return
}
if err := clusterClient.ForEachMaster(ctx, fn); err != nil {
log.Fatal(err)
}
nodeCount = len(slots)
return nodeCount, nodeClients, nodesAddresses
}

func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabwriter.Writer, test_time int) (bool, time.Time, time.Duration, uint64, []float64) {

start := time.Now()
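As an aside (not part of this change): go-redis also exposes each master's address directly via client.Options().Addr, which would avoid parsing the String() representation inside updateSecondarySlicesCluster. A minimal sketch under that assumption, reusing the closure's variable names:

fn := func(ctx context.Context, client *redis.Client) error {
	clusterSlicesMu.Lock()
	defer clusterSlicesMu.Unlock()
	nodeClients = append(nodeClients, client)
	// Options().Addr is already in "host:port" form, so no string surgery is needed.
	nodesAddresses = append(nodesAddresses, client.Options().Addr)
	return nil
}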