Skip to content

Commit 165565a

Browse files
authored
Merge pull request #20040 from rjernst/pull_allocation_deciders
Make custom allocation deciders use pull based extensions
2 parents de208cf + 45144ed commit 165565a

File tree

79 files changed

+257
-235
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+257
-235
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/Allocators.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import java.lang.reflect.Constructor;
3939
import java.lang.reflect.InvocationTargetException;
4040
import java.util.ArrayList;
41+
import java.util.Collection;
42+
import java.util.Collections;
4143
import java.util.List;
4244
import java.util.Map;
4345

@@ -72,7 +74,7 @@ private Allocators() {
7274

7375
public static AllocationService createAllocationService(Settings settings) throws NoSuchMethodException, InstantiationException,
7476
IllegalAccessException, InvocationTargetException {
75-
return createAllocationService(settings, new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings
77+
return createAllocationService(settings, new ClusterSettings(Settings.EMPTY, ClusterSettings
7678
.BUILT_IN_CLUSTER_SETTINGS));
7779
}
7880

@@ -85,19 +87,9 @@ public static AllocationService createAllocationService(Settings settings, Clust
8587

8688
public static AllocationDeciders defaultAllocationDeciders(Settings settings, ClusterSettings clusterSettings) throws
8789
IllegalAccessException, InvocationTargetException, InstantiationException, NoSuchMethodException {
88-
List<AllocationDecider> list = new ArrayList<>();
89-
// Keep a deterministic order of allocation deciders for the benchmark
90-
for (Class<? extends AllocationDecider> deciderClass : ClusterModule.DEFAULT_ALLOCATION_DECIDERS) {
91-
try {
92-
Constructor<? extends AllocationDecider> constructor = deciderClass.getConstructor(Settings.class, ClusterSettings
93-
.class);
94-
list.add(constructor.newInstance(settings, clusterSettings));
95-
} catch (NoSuchMethodException e) {
96-
Constructor<? extends AllocationDecider> constructor = deciderClass.getConstructor(Settings.class);
97-
list.add(constructor.newInstance(settings));
98-
}
99-
}
100-
return new AllocationDeciders(settings, list.toArray(new AllocationDecider[0]));
90+
Collection<AllocationDecider> deciders =
91+
ClusterModule.createAllocationDeciders(settings, clusterSettings, Collections.emptyList());
92+
return new AllocationDeciders(settings, deciders);
10193

10294
}
10395

buildSrc/src/main/resources/checkstyle_suppressions.xml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,6 @@
688688
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]AllocationPriorityTests.java" checks="LineLength" />
689689
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]AwarenessAllocationTests.java" checks="LineLength" />
690690
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]BalanceConfigurationTests.java" checks="LineLength" />
691-
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]CatAllocationTestCase.java" checks="LineLength" />
692691
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]ClusterRebalanceRoutingTests.java" checks="LineLength" />
693692
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]ConcurrentRebalanceRoutingTests.java" checks="LineLength" />
694693
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]DeadNodesAllocationTests.java" checks="LineLength" />
@@ -1085,7 +1084,6 @@
10851084
<suppress files="test[/\\]framework[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]BackgroundIndexer.java" checks="LineLength" />
10861085
<suppress files="test[/\\]framework[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]CompositeTestCluster.java" checks="LineLength" />
10871086
<suppress files="test[/\\]framework[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]CorruptionUtils.java" checks="LineLength" />
1088-
<suppress files="test[/\\]framework[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]ESAllocationTestCase.java" checks="LineLength" />
10891087
<suppress files="test[/\\]framework[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]ESBackcompatTestCase.java" checks="LineLength" />
10901088
<suppress files="test[/\\]framework[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]ESIntegTestCase.java" checks="LineLength" />
10911089
<suppress files="test[/\\]framework[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]ESSingleNodeTestCase.java" checks="LineLength" />

core/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 45 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,19 @@
5656
import org.elasticsearch.common.inject.AbstractModule;
5757
import org.elasticsearch.common.logging.ESLogger;
5858
import org.elasticsearch.common.logging.Loggers;
59+
import org.elasticsearch.common.settings.ClusterSettings;
5960
import org.elasticsearch.common.settings.Setting;
6061
import org.elasticsearch.common.settings.Setting.Property;
6162
import org.elasticsearch.common.settings.Settings;
6263
import org.elasticsearch.common.util.ExtensionPoint;
6364
import org.elasticsearch.gateway.GatewayAllocator;
65+
import org.elasticsearch.plugins.ClusterPlugin;
6466
import org.elasticsearch.tasks.TaskResultsService;
6567

66-
import java.util.Arrays;
67-
import java.util.Collections;
68+
import java.util.Collection;
69+
import java.util.HashMap;
6870
import java.util.List;
71+
import java.util.Map;
6972
import java.util.function.Function;
7073

7174
/**
@@ -77,48 +80,27 @@ public class ClusterModule extends AbstractModule {
7780
public static final String BALANCED_ALLOCATOR = "balanced"; // default
7881
public static final Setting<String> SHARDS_ALLOCATOR_TYPE_SETTING =
7982
new Setting<>("cluster.routing.allocation.type", BALANCED_ALLOCATOR, Function.identity(), Property.NodeScope);
80-
public static final List<Class<? extends AllocationDecider>> DEFAULT_ALLOCATION_DECIDERS =
81-
Collections.unmodifiableList(Arrays.asList(
82-
MaxRetryAllocationDecider.class,
83-
SameShardAllocationDecider.class,
84-
FilterAllocationDecider.class,
85-
ReplicaAfterPrimaryActiveAllocationDecider.class,
86-
ThrottlingAllocationDecider.class,
87-
RebalanceOnlyWhenActiveAllocationDecider.class,
88-
ClusterRebalanceAllocationDecider.class,
89-
ConcurrentRebalanceAllocationDecider.class,
90-
EnableAllocationDecider.class,
91-
AwarenessAllocationDecider.class,
92-
ShardsLimitAllocationDecider.class,
93-
NodeVersionAllocationDecider.class,
94-
DiskThresholdDecider.class,
95-
SnapshotInProgressAllocationDecider.class));
9683

9784
private final Settings settings;
9885
private final ExtensionPoint.SelectedType<ShardsAllocator> shardsAllocators = new ExtensionPoint.SelectedType<>("shards_allocator", ShardsAllocator.class);
99-
private final ExtensionPoint.ClassSet<AllocationDecider> allocationDeciders = new ExtensionPoint.ClassSet<>("allocation_decider", AllocationDecider.class, AllocationDeciders.class);
10086
private final ExtensionPoint.ClassSet<IndexTemplateFilter> indexTemplateFilters = new ExtensionPoint.ClassSet<>("index_template_filter", IndexTemplateFilter.class);
10187
private final ClusterService clusterService;
10288
private final IndexNameExpressionResolver indexNameExpressionResolver;
89+
// pkg private for tests
90+
final Collection<AllocationDecider> allocationDeciders;
10391

10492
// pkg private so tests can mock
10593
Class<? extends ClusterInfoService> clusterInfoServiceImpl = InternalClusterInfoService.class;
10694

107-
public ClusterModule(Settings settings, ClusterService clusterService) {
95+
public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins) {
10896
this.settings = settings;
109-
for (Class<? extends AllocationDecider> decider : ClusterModule.DEFAULT_ALLOCATION_DECIDERS) {
110-
registerAllocationDecider(decider);
111-
}
97+
this.allocationDeciders = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
11298
registerShardsAllocator(ClusterModule.BALANCED_ALLOCATOR, BalancedShardsAllocator.class);
11399
registerShardsAllocator(ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR, BalancedShardsAllocator.class);
114100
this.clusterService = clusterService;
115101
indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
116102
}
117103

118-
public void registerAllocationDecider(Class<? extends AllocationDecider> allocationDecider) {
119-
allocationDeciders.registerExtension(allocationDecider);
120-
}
121-
122104
public void registerShardsAllocator(String name, Class<? extends ShardsAllocator> clazz) {
123105
shardsAllocators.registerExtension(name, clazz);
124106
}
@@ -131,6 +113,41 @@ public IndexNameExpressionResolver getIndexNameExpressionResolver() {
131113
return indexNameExpressionResolver;
132114
}
133115

116+
// TODO: this is public so allocation benchmark can access the default deciders...can we do that in another way?
117+
/** Return a new {@link AllocationDecider} instance with builtin deciders as well as those from plugins. */
118+
public static Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings,
119+
List<ClusterPlugin> clusterPlugins) {
120+
// collect deciders by class so that we can detect duplicates
121+
Map<Class, AllocationDecider> deciders = new HashMap<>();
122+
addAllocationDecider(deciders, new MaxRetryAllocationDecider(settings));
123+
addAllocationDecider(deciders, new SameShardAllocationDecider(settings));
124+
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
125+
addAllocationDecider(deciders, new ReplicaAfterPrimaryActiveAllocationDecider(settings));
126+
addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
127+
addAllocationDecider(deciders, new RebalanceOnlyWhenActiveAllocationDecider(settings));
128+
addAllocationDecider(deciders, new ClusterRebalanceAllocationDecider(settings, clusterSettings));
129+
addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings));
130+
addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
131+
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));
132+
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings));
133+
addAllocationDecider(deciders, new NodeVersionAllocationDecider(settings));
134+
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
135+
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider(settings, clusterSettings));
136+
137+
clusterPlugins.stream()
138+
.flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream())
139+
.forEach(d -> addAllocationDecider(deciders, d));
140+
141+
return deciders.values();
142+
}
143+
144+
/** Add the given allocation decider to the given deciders collection, erroring if the class name is already used. */
145+
private static void addAllocationDecider(Map<Class, AllocationDecider> deciders, AllocationDecider decider) {
146+
if (deciders.put(decider.getClass(), decider) != null) {
147+
throw new IllegalArgumentException("Cannot specify allocation decider [" + decider.getClass().getName() + "] twice");
148+
}
149+
}
150+
134151
@Override
135152
protected void configure() {
136153
// bind ShardsAllocator
@@ -139,7 +156,6 @@ protected void configure() {
139156
final ESLogger logger = Loggers.getLogger(getClass(), settings);
140157
logger.warn("{} allocator has been removed in 2.0 using {} instead", ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR, ClusterModule.BALANCED_ALLOCATOR);
141158
}
142-
allocationDeciders.bind(binder());
143159
indexTemplateFilters.bind(binder());
144160

145161
bind(ClusterInfoService.class).to(clusterInfoServiceImpl).asEagerSingleton();
@@ -161,5 +177,6 @@ protected void configure() {
161177
bind(NodeMappingRefreshAction.class).asEagerSingleton();
162178
bind(MappingUpdatedAction.class).asEagerSingleton();
163179
bind(TaskResultsService.class).asEagerSingleton();
180+
bind(AllocationDeciders.class).toInstance(new AllocationDeciders(settings, allocationDeciders));
164181
}
165182
}

core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import org.elasticsearch.common.inject.Inject;
2727
import org.elasticsearch.common.settings.Settings;
2828

29+
import java.util.Collection;
30+
import java.util.Collections;
31+
import java.util.List;
2932
import java.util.Set;
3033

3134
/**
@@ -34,16 +37,11 @@
3437
*/
3538
public class AllocationDeciders extends AllocationDecider {
3639

37-
private final AllocationDecider[] allocations;
40+
private final Collection<AllocationDecider> allocations;
3841

39-
public AllocationDeciders(Settings settings, AllocationDecider[] allocations) {
42+
public AllocationDeciders(Settings settings, Collection<AllocationDecider> allocations) {
4043
super(settings);
41-
this.allocations = allocations;
42-
}
43-
44-
@Inject
45-
public AllocationDeciders(Settings settings, Set<AllocationDecider> allocations) {
46-
this(settings, allocations.toArray(new AllocationDecider[allocations.size()]));
44+
this.allocations = Collections.unmodifiableCollection(allocations);
4745
}
4846

4947
@Override

core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,20 @@
1919

2020
package org.elasticsearch.cluster.routing.allocation.decider;
2121

22+
import java.util.HashMap;
23+
import java.util.Map;
24+
2225
import com.carrotsearch.hppc.ObjectIntHashMap;
2326
import org.elasticsearch.cluster.metadata.IndexMetaData;
2427
import org.elasticsearch.cluster.routing.RoutingNode;
2528
import org.elasticsearch.cluster.routing.ShardRouting;
2629
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
2730
import org.elasticsearch.common.Strings;
28-
import org.elasticsearch.common.inject.Inject;
2931
import org.elasticsearch.common.settings.ClusterSettings;
3032
import org.elasticsearch.common.settings.Setting;
3133
import org.elasticsearch.common.settings.Setting.Property;
3234
import org.elasticsearch.common.settings.Settings;
3335

34-
import java.util.HashMap;
35-
import java.util.Map;
36-
3736
/**
3837
* This {@link AllocationDecider} controls shard allocation based on
3938
* <tt>awareness</tt> key-value pairs defined in the node configuration.
@@ -104,7 +103,6 @@ public AwarenessAllocationDecider(Settings settings) {
104103
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
105104
}
106105

107-
@Inject
108106
public AwarenessAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
109107
super(settings);
110108
this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);

core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,15 @@
1919

2020
package org.elasticsearch.cluster.routing.allocation.decider;
2121

22+
import java.util.Locale;
23+
2224
import org.elasticsearch.cluster.routing.ShardRouting;
2325
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
24-
import org.elasticsearch.common.inject.Inject;
2526
import org.elasticsearch.common.settings.ClusterSettings;
2627
import org.elasticsearch.common.settings.Setting;
2728
import org.elasticsearch.common.settings.Setting.Property;
2829
import org.elasticsearch.common.settings.Settings;
2930

30-
import java.util.Locale;
31-
3231
/**
3332
* This {@link AllocationDecider} controls re-balancing operations based on the
3433
* cluster wide active shard state. This decided can not be configured in
@@ -85,7 +84,6 @@ public static ClusterRebalanceType parseString(String typeString) {
8584

8685
private volatile ClusterRebalanceType type;
8786

88-
@Inject
8987
public ClusterRebalanceAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
9088
super(settings);
9189
try {

core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.elasticsearch.cluster.routing.ShardRouting;
2323
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
24-
import org.elasticsearch.common.inject.Inject;
2524
import org.elasticsearch.common.settings.ClusterSettings;
2625
import org.elasticsearch.common.settings.Setting;
2726
import org.elasticsearch.common.settings.Setting.Property;
@@ -48,7 +47,6 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
4847
Property.Dynamic, Property.NodeScope);
4948
private volatile int clusterConcurrentRebalance;
5049

51-
@Inject
5250
public ConcurrentRebalanceAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
5351
super(settings);
5452
this.clusterConcurrentRebalance = CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.get(settings);

core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
3434
import org.elasticsearch.common.Strings;
3535
import org.elasticsearch.common.collect.ImmutableOpenMap;
36-
import org.elasticsearch.common.inject.Inject;
3736
import org.elasticsearch.common.settings.ClusterSettings;
3837
import org.elasticsearch.common.settings.Settings;
3938
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -69,7 +68,6 @@ public class DiskThresholdDecider extends AllocationDecider {
6968

7069
private final DiskThresholdSettings diskThresholdSettings;
7170

72-
@Inject
7371
public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings) {
7472
super(settings);
7573
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);

core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,17 @@
1919

2020
package org.elasticsearch.cluster.routing.allocation.decider;
2121

22+
import java.util.Locale;
23+
2224
import org.elasticsearch.cluster.metadata.IndexMetaData;
2325
import org.elasticsearch.cluster.routing.RoutingNode;
2426
import org.elasticsearch.cluster.routing.ShardRouting;
2527
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
26-
import org.elasticsearch.common.inject.Inject;
2728
import org.elasticsearch.common.settings.ClusterSettings;
2829
import org.elasticsearch.common.settings.Setting;
2930
import org.elasticsearch.common.settings.Setting.Property;
3031
import org.elasticsearch.common.settings.Settings;
3132

32-
import java.util.Locale;
33-
3433
/**
3534
* This allocation decider allows shard allocations / rebalancing via the cluster wide settings
3635
* {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} and the per index setting
@@ -79,7 +78,6 @@ public class EnableAllocationDecider extends AllocationDecider {
7978
private volatile Rebalance enableRebalance;
8079
private volatile Allocation enableAllocation;
8180

82-
@Inject
8381
public EnableAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
8482
super(settings);
8583
this.enableAllocation = CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.get(settings);

core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.cluster.routing.RoutingNode;
2525
import org.elasticsearch.cluster.routing.ShardRouting;
2626
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
27-
import org.elasticsearch.common.inject.Inject;
2827
import org.elasticsearch.common.settings.ClusterSettings;
2928
import org.elasticsearch.common.settings.Setting;
3029
import org.elasticsearch.common.settings.Setting.Property;
@@ -75,7 +74,6 @@ public class FilterAllocationDecider extends AllocationDecider {
7574
private volatile DiscoveryNodeFilters clusterIncludeFilters;
7675
private volatile DiscoveryNodeFilters clusterExcludeFilters;
7776

78-
@Inject
7977
public FilterAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
8078
super(settings);
8179
setClusterRequireFilters(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING.get(settings));

0 commit comments

Comments
 (0)