Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void onFailure(String source, Exception e) {
@Override
public ClusterState execute(final ClusterState currentState) {
// now, reroute in case things that require it changed (e.g. number of replicas)
return allocationService.reroute(currentState, "reroute after cluster update settings");
return allocationService.moveShards(currentState, "reroute after cluster update settings");
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.elasticsearch.script.ScriptMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskResultsService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -105,14 +106,14 @@ public class ClusterModule extends AbstractModule {
final Collection<AllocationDecider> deciderList;
final ShardsAllocator shardsAllocator;

public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins,
public ClusterModule(Settings settings, ThreadPool threadPool, ClusterService clusterService, List<ClusterPlugin> clusterPlugins,
ClusterInfoService clusterInfoService) {
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
this.allocationDeciders = new AllocationDeciders(deciderList);
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver();
this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService);
this.allocationService = new AllocationService(settings, threadPool, clusterService, allocationDeciders, shardsAllocator, clusterInfoService);
}

public static List<Entry> getNamedWriteables() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public ClusterState deleteIndices(ClusterState currentState, Set<Index> indices)
}
}

return allocationService.reroute(
return allocationService.noReroute(
ClusterState.builder(currentState)
.routingTable(routingTableBuilder.build())
.metaData(newMetaData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public ClusterState execute(final ClusterState currentState) throws Exception {
closeRoutingTable(currentState, blockedIndices, verifyResults);
assert verifyResults.size() == closingResult.v2().size();
indices.addAll(closingResult.v2());
return allocationService.reroute(closingResult.v1(), "indices closed");
return allocationService.noReroute(closingResult.v1(), "indices closed");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public ClusterState execute(ClusterState currentState) {
.routingTable(routingTableBuilder.build()).blocks(blocks).build();

// now, reroute in case things change that require it (like number of replicas)
updatedState = allocationService.reroute(updatedState, "settings update");
updatedState = allocationService.moveShards(updatedState, "settings update");
try {
for (Index index : openIndices) {
final IndexMetaData currentMetaData = currentState.getMetaData().getIndexSafe(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterStateHealth;
Expand All @@ -39,8 +40,13 @@
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -66,10 +72,23 @@ public class AllocationService {

private static final Logger logger = LogManager.getLogger(AllocationService.class);

public static final Setting<TimeValue> CLUSTER_ROUTING_ALLOCATION_REBALANCE_REROUTE_INTERVAL_SETTING =
Setting.positiveTimeSetting("cluster.routing.allocation.rebalance.reroute_interval", TimeValue.timeValueHours(2),
Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<TimeValue> CLUSTER_ROUTING_ALLOCATION_REBALANCE_SCHEDULE_INTERVAL_SETTING =
Setting.positiveTimeSetting("cluster.routing.allocation.rebalance.schedule_interval", TimeValue.timeValueMinutes(5),
Setting.Property.Dynamic, Setting.Property.NodeScope);

private final AllocationDeciders allocationDeciders;
private GatewayAllocator gatewayAllocator;
private final ShardsAllocator shardsAllocator;
private final ClusterInfoService clusterInfoService;
private ThreadPool threadPool;
private ClusterService clusterService;
private volatile long lastRunNS;
private volatile TimeValue rerouteInterval;
private volatile TimeValue scheduleInterval;
private volatile ThreadPool.Cancellable cancellable;

public AllocationService(AllocationDeciders allocationDeciders,
GatewayAllocator gatewayAllocator,
Expand All @@ -78,13 +97,57 @@ public AllocationService(AllocationDeciders allocationDeciders,
setGatewayAllocator(gatewayAllocator);
}

public AllocationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationDeciders allocationDeciders,
ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService) {
this(allocationDeciders, shardsAllocator, clusterInfoService);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.lastRunNS = System.nanoTime();
this.rerouteInterval = CLUSTER_ROUTING_ALLOCATION_REBALANCE_REROUTE_INTERVAL_SETTING.get(settings);
this.scheduleInterval = CLUSTER_ROUTING_ALLOCATION_REBALANCE_SCHEDULE_INTERVAL_SETTING.get(settings);
this.cancellable = createCancellable(threadPool, clusterService, scheduleInterval);
clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REBALANCE_REROUTE_INTERVAL_SETTING, this::setRerouteInterval);
clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REBALANCE_SCHEDULE_INTERVAL_SETTING, this::setScheduleInterval);
}

public AllocationService(AllocationDeciders allocationDeciders,
ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService) {
this.allocationDeciders = allocationDeciders;
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService;
}

private ThreadPool.Cancellable createCancellable(ThreadPool threadPool, ClusterService clusterService, TimeValue timeValue) {
return threadPool.scheduleWithFixedDelay(() -> {
if (clusterService.state().nodes().isLocalNodeElectedMaster()) {
if ((System.nanoTime() - lastRunNS) > rerouteInterval.nanos()) {
clusterService.submitStateUpdateTask("async-rebalance", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState newClusterState = balance(currentState, "async-rebalance");
lastRunNS = System.nanoTime();
return newClusterState;
}

@Override
public void onFailure(String source, Exception e) {
logger.error("[source: {}] throws exception: {}", source, e);
}
});
}
}
}, timeValue, ThreadPool.Names.MANAGEMENT);
}

private void setRerouteInterval(TimeValue rerouteInterval) {
this.rerouteInterval = rerouteInterval;
}

private void setScheduleInterval(TimeValue scheduleInterval) {
this.cancellable.cancel();
this.cancellable = createCancellable(threadPool, clusterService, scheduleInterval);
}

public void setGatewayAllocator(GatewayAllocator gatewayAllocator) {
this.gatewayAllocator = gatewayAllocator;
}
Expand Down Expand Up @@ -361,6 +424,54 @@ public ClusterState reroute(ClusterState clusterState, String reason) {
return reroute(clusterState, reason, false);
}

/** no need handle */
public ClusterState noReroute(ClusterState clusterState, String reason) {
return clusterState;
}

/** move shards by update settings */
public ClusterState moveShards(ClusterState clusterState, String reason) {
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
clusterInfoService.getClusterInfo(), currentNanoTime());
allocation.debugDecision(false);

assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes";

shardsAllocator.moveShards(allocation);
assert RoutingNodes.assertShardStats(allocation.routingNodes());

if (allocation.routingNodesChanged() == false) {
return clusterState;
}
return buildResultAndLogHealthChange(clusterState, allocation, reason);
}

/** async rebalance */
public ClusterState balance(ClusterState clusterState, String reason) {
if (clusterState.getRoutingNodes().unassigned().size() > 0) {
return clusterState;
}
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
clusterInfoService.getClusterInfo(), currentNanoTime());
allocation.debugDecision(false);

assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes";

shardsAllocator.balance(allocation);
assert RoutingNodes.assertShardStats(allocation.routingNodes());

if (allocation.routingNodesChanged() == false) {
return clusterState;
}
return buildResultAndLogHealthChange(clusterState, allocation, reason);
}

/**
* Reroutes the routing table based on the live nodes.
* <p>
Expand Down Expand Up @@ -410,7 +521,7 @@ private void reroute(RoutingAllocation allocation) {
gatewayAllocator.allocateUnassigned(allocation);
}

shardsAllocator.allocate(allocation);
shardsAllocator.allocateUnassigned(allocation);
assert RoutingNodes.assertShardStats(allocation.routingNodes());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,32 @@ private void setThreshold(float threshold) {
}

@Override
public void allocate(RoutingAllocation allocation) {
public void allocateUnassigned(RoutingAllocation allocation) {
if (allocation.routingNodes().size() == 0) {
/* with no nodes this is pointless */
return;
}
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
balancer.allocateUnassigned();
}

@Override
public void moveShards(RoutingAllocation allocation) {
if (allocation.routingNodes().size() == 0) {
/* with no nodes this is pointless */
return;
}
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
balancer.moveShards();
}

@Override
public void balance(RoutingAllocation allocation) {
if (allocation.routingNodes().size() == 0) {
/* with no nodes this is pointless */
return;
}
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
balancer.balance();
}

Expand Down Expand Up @@ -310,6 +328,10 @@ private void balance() {
if (logger.isTraceEnabled()) {
logger.trace("Start balancing cluster");
}
if (allocation.routingNodes().hasUnassignedShards()) {
logger.trace("skipping rebalance as has unassigned shards");
return;
}
if (allocation.hasPendingAsyncFetch()) {
/*
* see https://github.com/elastic/elasticsearch/issues/14387
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,21 @@ public interface ShardsAllocator {
*
* @param allocation current node allocation
*/
void allocate(RoutingAllocation allocation);
void allocateUnassigned(RoutingAllocation allocation);

/** move shards by update settings */
void moveShards(RoutingAllocation allocation);

/** timing balance */
void balance(RoutingAllocation allocation);

/**
* Returns the decision for where a shard should reside in the cluster. If the shard is unassigned,
* then the {@link AllocateUnassignedDecision} will be non-null. If the shard is not in the unassigned
* state, then the {@link MoveDecision} will be non-null.
*
* This method is primarily used by the cluster allocation explain API to provide detailed explanations
* for the allocation of a single shard. Implementations of the {@link #allocate(RoutingAllocation)} method
* for the allocation of a single shard. Implementations of the {@link #allocateUnassigned(RoutingAllocation)} method
* may use the results of this method implementation to decide on allocating shards in the routing table
* to the cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.cluster.metadata.IndexGraveyard;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
Expand Down Expand Up @@ -230,6 +231,8 @@ public void apply(Settings value, Settings current, Settings previous) {
GatewayService.RECOVER_AFTER_MASTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_TIME_SETTING,
AllocationService.CLUSTER_ROUTING_ALLOCATION_REBALANCE_REROUTE_INTERVAL_SETTING,
AllocationService.CLUSTER_ROUTING_ALLOCATION_REBALANCE_SCHEDULE_INTERVAL_SETTING,
NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,
NetworkModule.HTTP_TYPE_SETTING,
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ protected Node(
modules.add(pluginModule);
}
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);
ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);
ClusterModule clusterModule = new ClusterModule(settings, threadPool, clusterService, clusterPlugins, clusterInfoService);
modules.add(clusterModule);
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
modules.add(indicesModule);
Expand Down