Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
689467f
Initial draft
howardhuanghua Nov 27, 2021
8cf0166
update draft
howardhuanghua Nov 29, 2021
eee910e
Merge remote-tracking branch 'origin' into async_fetch_request
howardhuanghua Dec 6, 2021
6f8af21
Introduce new batch action for shard fetch
howardhuanghua Dec 6, 2021
c63da96
fix compile issue
howardhuanghua Dec 6, 2021
be1357b
revert shard store action
howardhuanghua Dec 6, 2021
00ec7fe
remove unused func
howardhuanghua Dec 6, 2021
c64755d
fix spotless issue
howardhuanghua Dec 6, 2021
a5552c2
add new action registion instance
howardhuanghua Dec 7, 2021
92c19ef
Merge remote-tracking branch 'origin' into async_fetch_request
howardhuanghua Dec 7, 2021
a84d31f
merge master
howardhuanghua Dec 27, 2021
e47853a
Replace discovery node with node id.
howardhuanghua Dec 27, 2021
e74427e
Separate batch size.
howardhuanghua Jan 11, 2022
e941c5a
Merge remote-tracking branch 'origin' into async_fetch_request
howardhuanghua Jan 11, 2022
3269c43
Fix style issue.
howardhuanghua Jan 11, 2022
fd4800f
fix spotless issue
howardhuanghua Jan 11, 2022
f4b4bb6
Fxi null pointer issue.
howardhuanghua Jan 12, 2022
0fc2788
optimize shard request info
howardhuanghua Jan 12, 2022
c9688fd
Fix conflict
howardhuanghua Apr 2, 2022
3f46f39
Merge remote-tracking branch 'origin' into async_fetch_request
howardhuanghua Apr 14, 2022
e6459d9
Fix shardId serialize issue and optimize failed shard case.
howardhuanghua Apr 14, 2022
e6f66b4
support empty shard copy.
howardhuanghua Apr 14, 2022
28e9194
revert unchange log
howardhuanghua Apr 14, 2022
916c3f4
Fix spotless.
howardhuanghua Apr 14, 2022
e415426
Add change log.
howardhuanghua Apr 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/81081.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 81081
summary: Batch async fetch shards data to reduce memory consumption.
area: Allocation
type: enhancement
issues: [80694]
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.routing.allocation;

import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.TransportNodesBatchListGatewayStartedShards;
import org.elasticsearch.gateway.TransportNodesBatchListGatewayStartedShards.NodeGatewayBatchStartedShard;
import org.elasticsearch.gateway.TransportNodesBatchListGatewayStartedShards.NodesGatewayBatchStartedShards;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.store.TransportNodesBatchListShardStoreMetadata;
import org.elasticsearch.indices.store.TransportNodesBatchListShardStoreMetadata.NodesBatchStoreFilesMetadata;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetadata.StoreFilesMetadata;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.After;

import java.util.HashMap;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.hamcrest.Matchers.hasSize;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 3)
public class BatchAsyncFetchShardsIT extends ESIntegTestCase {

private Settings buildSettings(boolean batchMode, int batchStepSize) {
return Settings.builder()
.put(AllocationService.CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_ENABLE_SETTING.getKey(), batchMode)
.put(AllocationService.CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_STEP_SIZE_SETTING.getKey(), batchStepSize)
.build();
}

private Settings buildNullSettings() {
return Settings.builder()
.putNull(AllocationService.CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_ENABLE_SETTING.getKey())
.putNull(AllocationService.CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_STEP_SIZE_SETTING.getKey())
.build();
}

@After
public void clearSettings() {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(buildNullSettings())
.setPersistentSettings(buildNullSettings())
.get();
}

/**
* Test random async batch fetch shards.
*/
public void testRandomBatchMode() throws Exception {
// random batch
Settings settings = buildSettings(randomBoolean(), 10000);
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).setPersistentSettings(settings).get();
createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 10).put(SETTING_NUMBER_OF_REPLICAS, 2).build());

ensureGreen();
internalCluster().fullRestart();
ensureGreen();
}

/**
* Test batch step size.
* in flush process would assert the queued requests is less or equal to batch step size
*/
public void testBatchStepSize() throws Exception {
Settings settings = buildSettings(true, 4);
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).setPersistentSettings(settings).get();
createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 6).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureGreen();

client().admin().cluster().prepareState().get().getState();
internalCluster().fullRestart();
ensureGreen();

settings = buildSettings(true, 10);
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).setPersistentSettings(settings).get();
internalCluster().fullRestart();
ensureGreen();
}

public void testAsyncBatchFetchPrimaries() {
createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen();

final Index index = resolveIndex("test");
Map<ShardId, String> shards = new HashMap<>();
shards.put(new ShardId(index, 0), "");
shards.put(new ShardId(index, 1), "");
shards.put(new ShardId(index, 2), "");

ClusterState state = client().admin().cluster().prepareState().get().getState();
DiscoveryNode node = state.getNodes().getDataNodes().values().iterator().next();

NodesGatewayBatchStartedShards response;
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesBatchListGatewayStartedShards.class),
new TransportNodesBatchListGatewayStartedShards.Request(shards, new DiscoveryNode[] { node })
);

// each node contains 3 shard entry, only got one primary shard
assertThat(response.getNodes(), hasSize(1));
assertThat(response.getNodes().get(0).getStartedShards(), hasSize(3));
int count = 0;
for (NodeGatewayBatchStartedShard shard : response.getNodes().get(0).getStartedShards()) {
if (shard.primary()) {
count++;
}
}
assertEquals(1, count);
}

public void testAsyncBatchListShardStore() {
createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureGreen();

final Index index = resolveIndex("test");
Map<ShardId, String> shards = new HashMap<>();
shards.put(new ShardId(index, 0), "");
shards.put(new ShardId(index, 1), "");
shards.put(new ShardId(index, 2), "");

ClusterState state = client().admin().cluster().prepareState().get().getState();
DiscoveryNode node = state.getNodes().getDataNodes().values().iterator().next();

NodesBatchStoreFilesMetadata response;
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesBatchListShardStoreMetadata.class),
new TransportNodesBatchListShardStoreMetadata.Request(shards, new DiscoveryNode[] { node })
);

// each node contains 3 shard entry, got two shards store
assertThat(response.getNodes(), hasSize(1));
assertThat(response.getNodes().get(0).storeFilesMetadataList(), hasSize(3));
int count = 0;
for (StoreFilesMetadata shardStore : response.getNodes().get(0).storeFilesMetadataList()) {
if (shardStore.isEmpty() == false) {
count++;
}
}
assertEquals(2, count);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,15 @@
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.gateway.TransportNodesBatchListGatewayStartedShards;
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
import org.elasticsearch.health.GetHealthAction;
import org.elasticsearch.health.RestGetHealthAction;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.store.TransportNodesBatchListShardStoreMetadata;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetadata;
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.persistent.RemovePersistentTaskAction;
Expand Down Expand Up @@ -669,6 +671,8 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(TransportVerifyShardIndexBlockAction.TYPE, TransportVerifyShardIndexBlockAction.class);
actions.register(TransportNodesListGatewayStartedShards.TYPE, TransportNodesListGatewayStartedShards.class);
actions.register(TransportNodesListShardStoreMetadata.TYPE, TransportNodesListShardStoreMetadata.class);
actions.register(TransportNodesBatchListGatewayStartedShards.TYPE, TransportNodesBatchListGatewayStartedShards.class);
actions.register(TransportNodesBatchListShardStoreMetadata.TYPE, TransportNodesBatchListShardStoreMetadata.class);
actions.register(TransportShardFlushAction.TYPE, TransportShardFlushAction.class);
actions.register(TransportShardRefreshAction.TYPE, TransportShardRefreshAction.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,13 @@ public ClusterModule(
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext, systemIndices);
this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
this.allocationService = new AllocationService(
allocationDeciders,
shardsAllocator,
clusterInfoService,
snapshotsInfoService,
clusterService
);
this.metadataDeleteIndexService = new MetadataDeleteIndexService(settings, clusterService, allocationService);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.ESLogMessage;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.gateway.PriorityComparator;
import org.elasticsearch.snapshots.SnapshotsInfoService;
Expand Down Expand Up @@ -72,7 +73,23 @@ public class AllocationService {
private Map<String, ExistingShardsAllocator> existingShardsAllocators;
private final ShardsAllocator shardsAllocator;
private final ClusterInfoService clusterInfoService;
private final SnapshotsInfoService snapshotsInfoService;
private SnapshotsInfoService snapshotsInfoService;
private boolean batchFetchShardEnable;
private int batchFetchShardStepSize;

public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_ENABLE_SETTING = Setting.boolSetting(
"cluster.routing.allocation.batch_fetch_shard.enable",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_STEP_SIZE_SETTING = Setting.intSetting(
"cluster.routing.allocation.batch_fetch_shard.step_size",
10000,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

// only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator
public AllocationService(
Expand All @@ -82,20 +99,32 @@ public AllocationService(
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService
) {
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, null);
setExistingShardsAllocators(Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator));
}

public AllocationService(
AllocationDeciders allocationDeciders,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService
SnapshotsInfoService snapshotsInfoService,
ClusterService clusterService
) {
this.allocationDeciders = allocationDeciders;
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService;
this.snapshotsInfoService = snapshotsInfoService;
if (clusterService != null) {
this.batchFetchShardEnable = CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_ENABLE_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_ENABLE_SETTING, this::setBatchFetchShardEnable);
this.batchFetchShardStepSize = CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_STEP_SIZE_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_STEP_SIZE_SETTING,
this::setBatchFetchShardStepSize
);
}
}

/**
Expand Down Expand Up @@ -536,14 +565,35 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
existingShardsAllocator.beforeAllocation(allocation);
}

GatewayAllocator gatewayAllocator = null;
if (logger.isDebugEnabled()) {
logger.debug("set batch fetch mode [{}] for routing allocation.", batchFetchShardEnable);
}
allocation.setBatchShardFetchMode(batchFetchShardEnable);

final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
while (primaryIterator.hasNext()) {
final ShardRouting shardRouting = primaryIterator.next();
if (shardRouting.primary()) {
getAllocatorForShard(shardRouting, allocation).allocateUnassigned(shardRouting, allocation, primaryIterator);
ExistingShardsAllocator allocator = getAllocatorForShard(shardRouting, allocation);
allocator.allocateUnassigned(shardRouting, allocation, primaryIterator);
if (gatewayAllocator == null && allocator instanceof GatewayAllocator) {
gatewayAllocator = (GatewayAllocator) allocator;
}

if (gatewayAllocator != null
&& gatewayAllocator.getPrimaryPendingFetchShardCount() > 0
&& gatewayAllocator.getPrimaryPendingFetchShardCount() % batchFetchShardStepSize == 0) {
gatewayAllocator.flushPendingPrimaryFetchRequests(batchFetchShardStepSize);
}
}
}

// flush the rest primaries
if (gatewayAllocator != null) {
gatewayAllocator.flushPendingPrimaryFetchRequests(batchFetchShardStepSize);
}

for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
existingShardsAllocator.afterPrimariesBeforeReplicas(allocation);
}
Expand All @@ -552,9 +602,24 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
while (replicaIterator.hasNext()) {
final ShardRouting shardRouting = replicaIterator.next();
if (shardRouting.primary() == false) {
getAllocatorForShard(shardRouting, allocation).allocateUnassigned(shardRouting, allocation, replicaIterator);
ExistingShardsAllocator allocator = getAllocatorForShard(shardRouting, allocation);
allocator.allocateUnassigned(shardRouting, allocation, replicaIterator);
if (gatewayAllocator == null && allocator instanceof GatewayAllocator) {
gatewayAllocator = (GatewayAllocator) allocator;
}

if (gatewayAllocator != null
&& gatewayAllocator.getReplicaPendingFetchShardCount() > 0
&& gatewayAllocator.getReplicaPendingFetchShardCount() % batchFetchShardStepSize == 0) {
gatewayAllocator.flushPendingReplicaFetchRequests(batchFetchShardStepSize);
}
}
}

// flush the rest replicas
if (gatewayAllocator != null) {
gatewayAllocator.flushPendingReplicaFetchRequests(batchFetchShardStepSize);
}
}

private static void disassociateDeadNodes(RoutingAllocation allocation) {
Expand Down Expand Up @@ -622,6 +687,14 @@ private static RoutingNodes getMutableRoutingNodes(ClusterState clusterState) {
return clusterState.mutableRoutingNodes();
}

public void setBatchFetchShardEnable(boolean batchFetchShardEnable) {
this.batchFetchShardEnable = batchFetchShardEnable;
}

public void setBatchFetchShardStepSize(int batchFetchShardStepSize) {
this.batchFetchShardStepSize = batchFetchShardStepSize;
}

/** override this to control time based decisions during allocation */
protected long currentNanoTime() {
return System.nanoTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class RoutingAllocation {

private final long currentNanoTime;

private boolean batchShardFetchMode = false;

private final IndexMetadataUpdater indexMetadataUpdater = new IndexMetadataUpdater();
private final RoutingNodesChangedObserver nodesChangedObserver = new RoutingNodesChangedObserver();
private final RestoreInProgressUpdater restoreInProgressUpdater = new RestoreInProgressUpdater();
Expand Down Expand Up @@ -320,6 +322,18 @@ public void setHasPendingAsyncFetch() {
this.hasPendingAsyncFetch = true;
}

/**
* group shards in node level for async shard data fetching process.
* @return true batch mode, else single shard mode.
*/
public boolean isBatchShardFetchMode() {
return batchShardFetchMode;
}

public void setBatchShardFetchMode(boolean batchShardFetchMode) {
this.batchShardFetchMode = batchShardFetchMode;
}

public enum DebugMode {
/**
* debug mode is off
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.cluster.metadata.IndexGraveyard;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.DataTier;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
Expand Down Expand Up @@ -244,6 +245,8 @@ public void apply(Settings value, Settings current, Settings previous) {
SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING,
InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING,
InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING,
AllocationService.CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_ENABLE_SETTING,
AllocationService.CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_STEP_SIZE_SETTING,
InternalSnapshotsInfoService.INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING,
DestructiveOperations.REQUIRES_NAME_SETTING,
NoMasterBlockService.NO_MASTER_BLOCK_SETTING,
Expand Down
Loading