Skip to content

Commit 28bf0fa

Browse files
authored
Fixed shuffle sharding consistency when zone-awareness is enabled and the shard size is increased or instances in a new zone are added (#3299)
* Fixed shuffle sharding consistency when zone-awareness is enabled and the shard size is increased or instances in a new zone are added Signed-off-by: Marco Pracucci <[email protected]> * Slightly more permissive deviance Signed-off-by: Marco Pracucci <[email protected]> * Cleaned up ring unit tests Signed-off-by: Marco Pracucci <[email protected]> * Use YoloBuf to reduce allocations in ShuffleShardSeed() Signed-off-by: Marco Pracucci <[email protected]>
1 parent cd52d6d commit 28bf0fa

File tree

10 files changed

+196
-23
lines changed

10 files changed

+196
-23
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
* [BUGFIX] Experimental Alertmanager API: Do not allow empty Alertmanager configurations or bad template filenames to be submitted through the configuration API. #3185
8585
* [BUGFIX] Reduce failures to update heartbeat when using Consul. #3259
8686
* [BUGFIX] When using ruler sharding, moving all user rule groups from ruler to a different one and then back could end up with some user groups not being evaluated at all. #3235
87+
* [BUGFIX] Fixed shuffle sharding consistency when zone-awareness is enabled and the shard size is increased or instances in a new zone are added. #3299
8788
* [BUGFIX] Use a valid grpc header when logging IP addresses. #3307
8889
* [BUGFIX] Fixed the metric `cortex_prometheus_rule_group_duration_seconds` in the Ruler, it wouldn't report any values. #3310
8990
* [BUGFIX] Fixed gRPC connections leaking in rulers when rulers sharding is enabled and APIs called. #3314

pkg/ingester/active_series.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ import (
55
"math"
66
"sync"
77
"time"
8-
"unsafe"
98

109
"github.com/cespare/xxhash"
1110
"github.com/prometheus/common/model"
1211
"github.com/prometheus/prometheus/pkg/labels"
1312
"go.uber.org/atomic"
13+
14+
"github.com/cortexproject/cortex/pkg/util"
1415
)
1516

1617
const (
@@ -69,19 +70,15 @@ func fingerprint(series labels.Labels) uint64 {
6970

7071
sum.Reset()
7172
for _, label := range series {
72-
_, _ = sum.Write(yoloBuf(label.Name))
73+
_, _ = sum.Write(util.YoloBuf(label.Name))
7374
_, _ = sum.Write(sep)
74-
_, _ = sum.Write(yoloBuf(label.Value))
75+
_, _ = sum.Write(util.YoloBuf(label.Value))
7576
_, _ = sum.Write(sep)
7677
}
7778

7879
return sum.Sum64()
7980
}
8081

81-
func yoloBuf(s string) []byte {
82-
return *((*[]byte)(unsafe.Pointer(&s)))
83-
}
84-
8582
// Purge removes expired entries from the cache. This function should be called
8683
// periodically to avoid memory leaks.
8784
func (c *ActiveSeries) Purge(keepUntil time.Time) {

pkg/ingester/active_series_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -211,9 +211,3 @@ func benchmarkPurge(b *testing.B, twice bool) {
211211
}
212212
}
213213
}
214-
215-
func TestYoloBuf(t *testing.T) {
216-
s := yoloBuf("hello world")
217-
218-
require.Equal(t, []byte("hello world"), s)
219-
}

pkg/querier/frontend/frontend_querier_queues.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) chan *request {
8989
if uq == nil {
9090
uq = &userQueue{
9191
ch: make(chan *request, q.maxUserQueueSize),
92-
seed: util.ShuffleShardSeed(userID),
92+
seed: util.ShuffleShardSeed(userID, ""),
9393
index: -1,
9494
}
9595
q.userQueues[userID] = uq

pkg/ring/ring.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -471,9 +471,6 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
471471
return cached
472472
}
473473

474-
// Initialise the random generator used to select instances in the ring.
475-
random := rand.New(rand.NewSource(util.ShuffleShardSeed(identifier)))
476-
477474
var result *Ring
478475

479476
// This deferred function will store newly computed ring into cache.
@@ -515,6 +512,12 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
515512
tokens = r.ringTokens
516513
}
517514

515+
// Initialise the random generator used to select instances in the ring.
516+
// Since we consider each zone like an independent ring, we have to use dedicated
517+
// pseudo-random generator for each zone, in order to guarantee the "consistency"
518+
// property when the shard size changes or a new zone is added.
519+
random := rand.New(rand.NewSource(util.ShuffleShardSeed(identifier, zone)))
520+
518521
// To select one more instance while guaranteeing the "consistency" property,
519522
// we do pick a random value from the generator and resolve uniqueness collisions
520523
// (if any) continuing walking the ring.

pkg/ring/ring_test.go

Lines changed: 153 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ func TestRing_ShuffleShard(t *testing.T) {
333333

334334
for testName, testData := range tests {
335335
t.Run(testName, func(t *testing.T) {
336-
// Init the ring description.
336+
// Init the ring.
337337
ringDesc := &Desc{Ingesters: testData.ringInstances}
338338
for id, instance := range ringDesc.Ingesters {
339339
instance.Timestamp = time.Now().Unix()
@@ -512,8 +512,8 @@ func TestRing_ShuffleShard_Shuffling(t *testing.T) {
512512
maxCombinations := int(math.Pow(float64(numTenants), 2)) - numTenants
513513
for numMatching, probability := range theoreticalMatchings {
514514
// We allow a max deviance of 10% compared to the theoretical probability,
515-
// clamping it between 1% and 0.1% boundaries.
516-
maxDeviance := math.Min(1, math.Max(0.1, probability*0.1))
515+
// clamping it between 1% and 0.2% boundaries.
516+
maxDeviance := math.Min(1, math.Max(0.2, probability*0.1))
517517

518518
actual := (float64(distribution[numMatching]) / float64(maxCombinations)) * 100
519519
assert.InDelta(t, probability, actual, maxDeviance, "numMatching: %d", numMatching)
@@ -609,6 +609,156 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) {
609609
}
610610
}
611611

612+
func TestRing_ShuffleShard_ConsistencyOnShardSizeChanged(t *testing.T) {
613+
// Create 30 instances in 3 zones.
614+
ringInstances := map[string]IngesterDesc{}
615+
for i := 0; i < 30; i++ {
616+
name, desc := generateRingInstance(i, i%3)
617+
ringInstances[name] = desc
618+
}
619+
620+
// Init the ring.
621+
ringDesc := &Desc{Ingesters: ringInstances}
622+
ring := Ring{
623+
cfg: Config{
624+
HeartbeatTimeout: time.Hour,
625+
ZoneAwarenessEnabled: true,
626+
},
627+
ringDesc: ringDesc,
628+
ringTokens: ringDesc.getTokens(),
629+
ringTokensByZone: ringDesc.getTokensByZone(),
630+
ringZones: getZones(ringDesc.getTokensByZone()),
631+
strategy: &DefaultReplicationStrategy{},
632+
}
633+
634+
// Get the replication set with shard size = 3.
635+
firstShard := ring.ShuffleShard("tenant-id", 3)
636+
assert.Equal(t, 3, firstShard.IngesterCount())
637+
638+
firstSet, err := firstShard.GetAll(Read)
639+
require.NoError(t, err)
640+
641+
// Increase shard size to 6.
642+
secondShard := ring.ShuffleShard("tenant-id", 6)
643+
assert.Equal(t, 6, secondShard.IngesterCount())
644+
645+
secondSet, err := secondShard.GetAll(Read)
646+
require.NoError(t, err)
647+
648+
for _, firstInstance := range firstSet.Ingesters {
649+
assert.True(t, secondSet.Includes(firstInstance.Addr), "new replication set is expected to include previous instance %s", firstInstance.Addr)
650+
}
651+
652+
// Increase shard size to 9.
653+
thirdShard := ring.ShuffleShard("tenant-id", 9)
654+
assert.Equal(t, 9, thirdShard.IngesterCount())
655+
656+
thirdSet, err := thirdShard.GetAll(Read)
657+
require.NoError(t, err)
658+
659+
for _, secondInstance := range secondSet.Ingesters {
660+
assert.True(t, thirdSet.Includes(secondInstance.Addr), "new replication set is expected to include previous instance %s", secondInstance.Addr)
661+
}
662+
663+
// Decrease shard size to 6.
664+
fourthShard := ring.ShuffleShard("tenant-id", 6)
665+
assert.Equal(t, 6, fourthShard.IngesterCount())
666+
667+
fourthSet, err := fourthShard.GetAll(Read)
668+
require.NoError(t, err)
669+
670+
// We expect to have the same exact instances we had when the shard size was 6.
671+
for _, secondInstance := range secondSet.Ingesters {
672+
assert.True(t, fourthSet.Includes(secondInstance.Addr), "new replication set is expected to include previous instance %s", secondInstance.Addr)
673+
}
674+
675+
// Decrease shard size to 3.
676+
fifthShard := ring.ShuffleShard("tenant-id", 3)
677+
assert.Equal(t, 3, fifthShard.IngesterCount())
678+
679+
fifthSet, err := fifthShard.GetAll(Read)
680+
require.NoError(t, err)
681+
682+
// We expect to have the same exact instances we had when the shard size was 3.
683+
for _, firstInstance := range firstSet.Ingesters {
684+
assert.True(t, fifthSet.Includes(firstInstance.Addr), "new replication set is expected to include previous instance %s", firstInstance.Addr)
685+
}
686+
}
687+
688+
func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) {
689+
// Create 20 instances in 2 zones.
690+
ringInstances := map[string]IngesterDesc{}
691+
for i := 0; i < 20; i++ {
692+
name, desc := generateRingInstance(i, i%2)
693+
ringInstances[name] = desc
694+
}
695+
696+
// Init the ring.
697+
ringDesc := &Desc{Ingesters: ringInstances}
698+
ring := Ring{
699+
cfg: Config{
700+
HeartbeatTimeout: time.Hour,
701+
ZoneAwarenessEnabled: true,
702+
},
703+
ringDesc: ringDesc,
704+
ringTokens: ringDesc.getTokens(),
705+
ringTokensByZone: ringDesc.getTokensByZone(),
706+
ringZones: getZones(ringDesc.getTokensByZone()),
707+
strategy: &DefaultReplicationStrategy{},
708+
}
709+
710+
// Get the replication set with shard size = 2.
711+
firstShard := ring.ShuffleShard("tenant-id", 2)
712+
assert.Equal(t, 2, firstShard.IngesterCount())
713+
714+
firstSet, err := firstShard.GetAll(Read)
715+
require.NoError(t, err)
716+
717+
// Increase shard size to 4.
718+
secondShard := ring.ShuffleShard("tenant-id", 4)
719+
assert.Equal(t, 4, secondShard.IngesterCount())
720+
721+
secondSet, err := secondShard.GetAll(Read)
722+
require.NoError(t, err)
723+
724+
for _, firstInstance := range firstSet.Ingesters {
725+
assert.True(t, secondSet.Includes(firstInstance.Addr), "new replication set is expected to include previous instance %s", firstInstance.Addr)
726+
}
727+
728+
// Scale up cluster, adding 10 instances in 1 new zone.
729+
for i := 20; i < 30; i++ {
730+
name, desc := generateRingInstance(i, 2)
731+
ringInstances[name] = desc
732+
}
733+
734+
ring.ringDesc.Ingesters = ringInstances
735+
ring.ringTokens = ringDesc.getTokens()
736+
ring.ringTokensByZone = ringDesc.getTokensByZone()
737+
ring.ringZones = getZones(ringDesc.getTokensByZone())
738+
739+
// Increase shard size to 6.
740+
thirdShard := ring.ShuffleShard("tenant-id", 6)
741+
assert.Equal(t, 6, thirdShard.IngesterCount())
742+
743+
thirdSet, err := thirdShard.GetAll(Read)
744+
require.NoError(t, err)
745+
746+
for _, secondInstance := range secondSet.Ingesters {
747+
assert.True(t, thirdSet.Includes(secondInstance.Addr), "new replication set is expected to include previous instance %s", secondInstance.Addr)
748+
}
749+
750+
// Increase shard size to 9.
751+
fourthShard := ring.ShuffleShard("tenant-id", 9)
752+
assert.Equal(t, 9, fourthShard.IngesterCount())
753+
754+
fourthSet, err := fourthShard.GetAll(Read)
755+
require.NoError(t, err)
756+
757+
for _, thirdInstance := range thirdSet.Ingesters {
758+
assert.True(t, fourthSet.Includes(thirdInstance.Addr), "new replication set is expected to include previous instance %s", thirdInstance.Addr)
759+
}
760+
}
761+
612762
func BenchmarkRing_ShuffleShard(b *testing.B) {
613763
for _, numInstances := range []int{50, 100, 1000} {
614764
for _, numZones := range []int{1, 3} {

pkg/ruler/ruler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,7 @@ func TestSharding(t *testing.T) {
585585

586586
// User shuffle shard token.
587587
func userToken(user string, skip int) uint32 {
588-
r := rand.New(rand.NewSource(util.ShuffleShardSeed(user)))
588+
r := rand.New(rand.NewSource(util.ShuffleShardSeed(user, "")))
589589

590590
for ; skip > 0; skip-- {
591591
_ = r.Uint32()

pkg/util/shard.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,19 @@ import (
55
"encoding/binary"
66
)
77

8+
var (
9+
seedSeparator = []byte{0}
10+
)
11+
812
// ShuffleShardSeed returns seed for random number generator, computed from provided identifier.
9-
func ShuffleShardSeed(identifier string) int64 {
13+
func ShuffleShardSeed(identifier, zone string) int64 {
1014
// Use the identifier to compute an hash we'll use to seed the random.
1115
hasher := md5.New()
12-
hasher.Write([]byte(identifier)) // nolint:errcheck
16+
hasher.Write(YoloBuf(identifier)) // nolint:errcheck
17+
if zone != "" {
18+
hasher.Write(seedSeparator) // nolint:errcheck
19+
hasher.Write(YoloBuf(zone)) // nolint:errcheck
20+
}
1321
checksum := hasher.Sum(nil)
1422

1523
// Generate the seed based on the first 64 bits of the checksum.

pkg/util/yolo.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package util
2+
3+
import "unsafe"
4+
5+
func YoloBuf(s string) []byte {
6+
return *((*[]byte)(unsafe.Pointer(&s)))
7+
}

pkg/util/yolo_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package util
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func TestYoloBuf(t *testing.T) {
10+
s := YoloBuf("hello world")
11+
12+
require.Equal(t, []byte("hello world"), s)
13+
}

0 commit comments

Comments
 (0)