diff --git a/docs/changelog/81081.yaml b/docs/changelog/81081.yaml new file mode 100644 index 0000000000000..dfbab4e7c6094 --- /dev/null +++ b/docs/changelog/81081.yaml @@ -0,0 +1,5 @@ +pr: 81081 +summary: Batch async fetch shards data to reduce memory consumption. +area: Allocation +type: enhancement +issues: [80694] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/BatchAsyncFetchShardsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/BatchAsyncFetchShardsIT.java new file mode 100644 index 0000000000000..c213856e47a76 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/BatchAsyncFetchShardsIT.java @@ -0,0 +1,155 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.action.support.ActionTestUtils; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.TransportNodesBatchListGatewayStartedShards; +import org.elasticsearch.gateway.TransportNodesBatchListGatewayStartedShards.NodeGatewayBatchStartedShard; +import org.elasticsearch.gateway.TransportNodesBatchListGatewayStartedShards.NodesGatewayBatchStartedShards; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.store.TransportNodesBatchListShardStoreMetadata; +import org.elasticsearch.indices.store.TransportNodesBatchListShardStoreMetadata.NodesBatchStoreFilesMetadata; +import org.elasticsearch.indices.store.TransportNodesListShardStoreMetadata.StoreFilesMetadata; +import org.elasticsearch.test.ESIntegTestCase; +import org.junit.After; + +import java.util.HashMap; +import java.util.Map; + +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.hamcrest.Matchers.hasSize; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 3) +public class BatchAsyncFetchShardsIT extends ESIntegTestCase { + + private Settings buildSettings(boolean batchMode, int batchStepSize) { + return Settings.builder() + .put(AllocationService.CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_ENABLE_SETTING.getKey(), batchMode) + .put(AllocationService.CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_STEP_SIZE_SETTING.getKey(), batchStepSize) + .build(); + } + + private Settings buildNullSettings() { + return Settings.builder() + .putNull(AllocationService.CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_ENABLE_SETTING.getKey()) + .putNull(AllocationService.CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_STEP_SIZE_SETTING.getKey()) + .build(); + } + + @After + public void clearSettings() { + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(buildNullSettings()) + .setPersistentSettings(buildNullSettings()) + .get(); + } + + /** + * Test random async batch fetch shards. 
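+ * Randomly enables or disables batch mode, then verifies that the cluster returns to green after a full restart.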
+ */ + public void testRandomBatchMode() throws Exception { + // randomly enable or disable batch mode + Settings settings = buildSettings(randomBoolean(), 10000); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).setPersistentSettings(settings).get(); + createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 10).put(SETTING_NUMBER_OF_REPLICAS, 2).build()); + + ensureGreen(); + internalCluster().fullRestart(); + ensureGreen(); + } + + /** + * Test the batch step size. + * The flush process asserts that the number of queued requests is less than or equal to the batch step size. + */ + public void testBatchStepSize() throws Exception { + Settings settings = buildSettings(true, 4); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).setPersistentSettings(settings).get(); + createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 6).put(SETTING_NUMBER_OF_REPLICAS, 1).build()); + ensureGreen(); + + client().admin().cluster().prepareState().get().getState(); + internalCluster().fullRestart(); + ensureGreen(); + + settings = buildSettings(true, 10); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).setPersistentSettings(settings).get(); + internalCluster().fullRestart(); + ensureGreen(); + } + + public void testAsyncBatchFetchPrimaries() { + createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 0).build()); + ensureGreen(); + + final Index index = resolveIndex("test"); + Map<ShardId, String> shards = new HashMap<>(); + shards.put(new ShardId(index, 0), ""); + shards.put(new ShardId(index, 1), ""); + shards.put(new ShardId(index, 2), ""); + + ClusterState state = client().admin().cluster().prepareState().get().getState(); + DiscoveryNode node = state.getNodes().getDataNodes().values().iterator().next(); + + NodesGatewayBatchStartedShards response; + response = ActionTestUtils.executeBlocking( + internalCluster().getInstance(TransportNodesBatchListGatewayStartedShards.class), + new TransportNodesBatchListGatewayStartedShards.Request(shards, new DiscoveryNode[] { node }) + ); + + // the node request contains 3 shard entries, but only one of them is a primary on this node + assertThat(response.getNodes(), hasSize(1)); + assertThat(response.getNodes().get(0).getStartedShards(), hasSize(3)); + int count = 0; + for (NodeGatewayBatchStartedShard shard : response.getNodes().get(0).getStartedShards()) { + if (shard.primary()) { + count++; + } + } + assertEquals(1, count); + } + + public void testAsyncBatchListShardStore() { + createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build()); + ensureGreen(); + + final Index index = resolveIndex("test"); + Map<ShardId, String> shards = new HashMap<>(); + shards.put(new ShardId(index, 0), ""); + shards.put(new ShardId(index, 1), ""); + shards.put(new ShardId(index, 2), ""); + + ClusterState state = client().admin().cluster().prepareState().get().getState(); + DiscoveryNode node = state.getNodes().getDataNodes().values().iterator().next(); + + NodesBatchStoreFilesMetadata response; + response = ActionTestUtils.executeBlocking( + internalCluster().getInstance(TransportNodesBatchListShardStoreMetadata.class), + new TransportNodesBatchListShardStoreMetadata.Request(shards, new DiscoveryNode[] { node }) + ); + + // the node request contains 3 shard entries; this node has a store for two of them + assertThat(response.getNodes(), hasSize(1)); + assertThat(response.getNodes().get(0).storeFilesMetadataList(), hasSize(3)); + int count = 0; + for (StoreFilesMetadata shardStore : 
response.getNodes().get(0).storeFilesMetadataList()) { + if (shardStore.isEmpty() == false) { + count++; + } + } + assertEquals(2, count); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 5329c56ba19d3..84381d3b9c0c4 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -254,6 +254,7 @@ import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.gateway.TransportNodesBatchListGatewayStartedShards; import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards; import org.elasticsearch.health.GetHealthAction; import org.elasticsearch.health.RestGetHealthAction; @@ -261,6 +262,7 @@ import org.elasticsearch.index.seqno.RetentionLeaseActions; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.store.TransportNodesBatchListShardStoreMetadata; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetadata; import org.elasticsearch.persistent.CompletionPersistentTaskAction; import org.elasticsearch.persistent.RemovePersistentTaskAction; @@ -669,6 +671,8 @@ public void reg actions.register(TransportVerifyShardIndexBlockAction.TYPE, TransportVerifyShardIndexBlockAction.class); actions.register(TransportNodesListGatewayStartedShards.TYPE, TransportNodesListGatewayStartedShards.class); actions.register(TransportNodesListShardStoreMetadata.TYPE, TransportNodesListShardStoreMetadata.class); + actions.register(TransportNodesBatchListGatewayStartedShards.TYPE, TransportNodesBatchListGatewayStartedShards.class); + actions.register(TransportNodesBatchListShardStoreMetadata.TYPE, TransportNodesBatchListShardStoreMetadata.class); actions.register(TransportShardFlushAction.TYPE, TransportShardFlushAction.class); actions.register(TransportShardRefreshAction.TYPE, TransportShardRefreshAction.class); diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 5aa458c0a5ca1..10f8b350ad42c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -121,7 +121,13 @@ public ClusterModule( this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins); this.clusterService = clusterService; this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext, systemIndices); - this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService); + this.allocationService = new AllocationService( + allocationDeciders, + shardsAllocator, + clusterInfoService, + snapshotsInfoService, + clusterService + ); this.metadataDeleteIndexService = new MetadataDeleteIndexService(settings, clusterService, allocationService); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 8755ee87c5977..6d897d9de7774 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ 
b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.logging.ESLogMessage; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.gateway.PriorityComparator; import org.elasticsearch.snapshots.SnapshotsInfoService; @@ -72,7 +73,23 @@ public class AllocationService { private Map<String, ExistingShardsAllocator> existingShardsAllocators; private final ShardsAllocator shardsAllocator; private final ClusterInfoService clusterInfoService; - private final SnapshotsInfoService snapshotsInfoService; + private SnapshotsInfoService snapshotsInfoService; + private boolean batchFetchShardEnable; + private int batchFetchShardStepSize; + + public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_ENABLE_SETTING = Setting.boolSetting( + "cluster.routing.allocation.batch_fetch_shard.enable", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_STEP_SIZE_SETTING = Setting.intSetting( + "cluster.routing.allocation.batch_fetch_shard.step_size", + 10000, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); // only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator public AllocationService( @@ -82,7 +99,7 @@ public AllocationService( ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService ) { - this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService); + this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, null); setExistingShardsAllocators(Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator)); } @@ -90,12 +107,24 @@ public AllocationService( AllocationDeciders allocationDeciders, ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService, - SnapshotsInfoService snapshotsInfoService + SnapshotsInfoService snapshotsInfoService, + ClusterService clusterService ) { this.allocationDeciders = allocationDeciders; this.shardsAllocator = shardsAllocator; this.clusterInfoService = clusterInfoService; this.snapshotsInfoService = snapshotsInfoService; + if (clusterService != null) { + this.batchFetchShardEnable = CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_ENABLE_SETTING.get(clusterService.getSettings()); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_ENABLE_SETTING, this::setBatchFetchShardEnable); + this.batchFetchShardStepSize = CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_STEP_SIZE_SETTING.get(clusterService.getSettings()); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_STEP_SIZE_SETTING, + this::setBatchFetchShardStepSize + ); + } } /** @@ -536,14 +565,35 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { existingShardsAllocator.beforeAllocation(allocation); } + GatewayAllocator gatewayAllocator = null; + if (logger.isDebugEnabled()) { + logger.debug("set batch fetch mode [{}] for routing allocation.", batchFetchShardEnable); + } + allocation.setBatchShardFetchMode(batchFetchShardEnable); + final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator(); while (primaryIterator.hasNext()) { final ShardRouting shardRouting = primaryIterator.next(); if (shardRouting.primary()) { - getAllocatorForShard(shardRouting, allocation).allocateUnassigned(shardRouting, allocation, primaryIterator); + ExistingShardsAllocator allocator = getAllocatorForShard(shardRouting, allocation); + allocator.allocateUnassigned(shardRouting, allocation, primaryIterator); + if (gatewayAllocator == null && allocator instanceof GatewayAllocator) { + gatewayAllocator = (GatewayAllocator) allocator; + } +
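+ // once the number of queued shards reaches a multiple of batchFetchShardStepSize, flush the queued node-level fetch requests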
+ if (gatewayAllocator != null + && gatewayAllocator.getPrimaryPendingFetchShardCount() > 0 + && gatewayAllocator.getPrimaryPendingFetchShardCount() % batchFetchShardStepSize == 0) { + gatewayAllocator.flushPendingPrimaryFetchRequests(batchFetchShardStepSize); + } } } + // flush the remaining primaries + if (gatewayAllocator != null) { + gatewayAllocator.flushPendingPrimaryFetchRequests(batchFetchShardStepSize); + } + for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) { existingShardsAllocator.afterPrimariesBeforeReplicas(allocation); } @@ -552,9 +602,24 @@ while (replicaIterator.hasNext()) { final ShardRouting shardRouting = replicaIterator.next(); if (shardRouting.primary() == false) { - getAllocatorForShard(shardRouting, allocation).allocateUnassigned(shardRouting, allocation, replicaIterator); + ExistingShardsAllocator allocator = getAllocatorForShard(shardRouting, allocation); + allocator.allocateUnassigned(shardRouting, allocation, replicaIterator); + if (gatewayAllocator == null && allocator instanceof GatewayAllocator) { + gatewayAllocator = (GatewayAllocator) allocator; + } + + if (gatewayAllocator != null + && gatewayAllocator.getReplicaPendingFetchShardCount() > 0 + && gatewayAllocator.getReplicaPendingFetchShardCount() % batchFetchShardStepSize == 0) { + gatewayAllocator.flushPendingReplicaFetchRequests(batchFetchShardStepSize); + } } } + + // flush the remaining replicas + if (gatewayAllocator != null) { + gatewayAllocator.flushPendingReplicaFetchRequests(batchFetchShardStepSize); + } } private static void disassociateDeadNodes(RoutingAllocation allocation) { @@ -622,6 +687,14 @@ private static RoutingNodes getMutableRoutingNodes(ClusterState clusterState) { return clusterState.mutableRoutingNodes(); } + public void setBatchFetchShardEnable(boolean batchFetchShardEnable) { + this.batchFetchShardEnable = batchFetchShardEnable; + } + + public void setBatchFetchShardStepSize(int batchFetchShardStepSize) { + this.batchFetchShardStepSize = batchFetchShardStepSize; + } + /** override this to control time based decisions during allocation */ protected long currentNanoTime() { return System.nanoTime();
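The two settings above are dynamic, so batch fetching can be toggled on a live cluster without a restart. A minimal sketch, using the same client() helper that BatchAsyncFetchShardsIT uses (any client that can update cluster settings works the same way):

Settings batchSettings = Settings.builder()
    .put(AllocationService.CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_ENABLE_SETTING.getKey(), true) // default: false
    .put(AllocationService.CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_STEP_SIZE_SETTING.getKey(), 10000) // default: 10000
    .build();
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(batchSettings).get();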
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index d9827add4a2f1..fd8171a681c8f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -62,6 +62,8 @@ public class RoutingAllocation { private final long currentNanoTime; + private boolean batchShardFetchMode = false; + private final IndexMetadataUpdater indexMetadataUpdater = new IndexMetadataUpdater(); private final RoutingNodesChangedObserver nodesChangedObserver = new RoutingNodesChangedObserver(); private final RestoreInProgressUpdater restoreInProgressUpdater = new RestoreInProgressUpdater(); @@ -320,6 +322,18 @@ public void setHasPendingAsyncFetch() { this.hasPendingAsyncFetch = true; } + /** + * Whether to group shards by node for the async shard data fetching process. + * @return true for batch mode, false for single shard mode. + */ + public boolean isBatchShardFetchMode() { + return batchShardFetchMode; + } + + public void setBatchShardFetchMode(boolean batchShardFetchMode) { + this.batchShardFetchMode = batchShardFetchMode; + } + public enum DebugMode { /** * debug mode is off diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 286b0604ef6ba..2f1937b09b1b3 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -34,6 +34,7 @@ import org.elasticsearch.cluster.metadata.IndexGraveyard; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.DataTier; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; @@ -244,6 +245,8 @@ public void apply(Settings value, Settings current, Settings previous) { SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING, InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING, + AllocationService.CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_ENABLE_SETTING, + AllocationService.CLUSTER_ROUTING_ALLOCATION_BATCH_FETCH_SHARD_STEP_SIZE_SETTING, InternalSnapshotsInfoService.INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING, DestructiveOperations.REQUIRES_NAME_SETTING, NoMasterBlockService.NO_MASTER_BLOCK_SETTING, diff --git a/server/src/main/java/org/elasticsearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/BaseGatewayShardAllocator.java index c7145ed444d38..21429616148e5 100644 --- a/server/src/main/java/org/elasticsearch/gateway/BaseGatewayShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/BaseGatewayShardAllocator.java @@ -106,4 +106,12 @@ protected static List buildDecisionsForAllNodes(ShardRouti } return results; } + + protected void flushPendingFetchShardRequests(int batchStepSize) { + // default no-op + } + + protected int getPendingFetchShardCount() { + return 0; + } } diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 5765a43b30598..dcfc7d96226f5 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.client.internal.node.NodeClient; @@ -30,16 +31,32 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Releasables; import 
org.elasticsearch.gateway.AsyncShardFetch.Lister; +import org.elasticsearch.gateway.TransportNodesBatchListGatewayStartedShards.NodeGatewayBatchStartedShard; +import org.elasticsearch.gateway.TransportNodesBatchListGatewayStartedShards.NodeGatewayBatchStartedShards; +import org.elasticsearch.gateway.TransportNodesBatchListGatewayStartedShards.NodesGatewayBatchStartedShards; +import org.elasticsearch.gateway.TransportNodesBatchListGatewayStartedShards.ShardRequestInfo; import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; +import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodesGatewayStartedShards; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.store.TransportNodesBatchListShardStoreMetadata; +import org.elasticsearch.indices.store.TransportNodesBatchListShardStoreMetadata.NodeBatchStoreFilesMetadata; +import org.elasticsearch.indices.store.TransportNodesBatchListShardStoreMetadata.NodesBatchStoreFilesMetadata; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetadata; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata; +import org.elasticsearch.indices.store.TransportNodesListShardStoreMetadata.NodesStoreFilesMetadata; +import org.elasticsearch.indices.store.TransportNodesListShardStoreMetadata.StoreFilesMetadata; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; public class GatewayAllocator implements ExistingShardsAllocator { @@ -165,6 +182,34 @@ public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting } } + public void flushPendingPrimaryFetchRequests(int batchStepSize) { + // test case could be null + if (primaryShardAllocator != null) { + this.primaryShardAllocator.flushPendingFetchShardRequests(batchStepSize); + } + } + + public void flushPendingReplicaFetchRequests(int batchStepSize) { + // test case could be null + if (replicaShardAllocator != null) { + this.replicaShardAllocator.flushPendingFetchShardRequests(batchStepSize); + } + } + + public int getPrimaryPendingFetchShardCount() { + if (primaryShardAllocator == null) { + return 0; + } + return primaryShardAllocator.getPendingFetchShardCount(); + } + + public int getReplicaPendingFetchShardCount() { + if (replicaShardAllocator == null) { + return 0; + } + return replicaShardAllocator.getPendingFetchShardCount(); + } + /** * Clear the fetched data for the primary to ensure we do not cancel recoveries based on excessively stale data. 
*/ @@ -241,6 +286,10 @@ protected void reroute(ShardId shardId, String reason) { class InternalPrimaryShardAllocator extends PrimaryShardAllocator { private final NodeClient client; + private final AtomicInteger pendingFetchShardCount = new AtomicInteger(); + // node batched shard requests + private final Map>> queuedRequests = + new ConcurrentHashMap<>(); InternalPrimaryShardAllocator(NodeClient client) { this.client = client; @@ -249,16 +298,24 @@ class InternalPrimaryShardAllocator extends PrimaryShardAllocator { @Override protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { // explicitly type lister, some IDEs (Eclipse) are not able to correctly infer the function type - Lister, NodeGatewayStartedShards> lister = this::listStartedShards; + AsyncShardFetch fetch = asyncFetchStarted.computeIfAbsent( shard.shardId(), - shardId -> new InternalAsyncFetch<>( - logger, - "shard_started", - shardId, - IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()), - lister - ) + shardId -> allocation.isBatchShardFetchMode() + ? new InternalAsyncFetch<>( + logger, + "batch_shard_started", + shardId, + IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()), + batchLister + ) + : new InternalAsyncFetch<>( + logger, + "shard_started", + shardId, + IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()), + lister + ) ); AsyncShardFetch.FetchResult shardState = fetch.fetchData( allocation.nodes(), @@ -271,6 +328,8 @@ protected AsyncShardFetch.FetchResult fetchData(ShardR return shardState; } + Lister, NodeGatewayStartedShards> lister = this::listStartedShards; + private void listStartedShards( ShardId shardId, String customDataPath, @@ -284,11 +343,170 @@ private void listStartedShards( ActionListener.wrap(listener::onResponse, listener::onFailure) ); } + + Lister, NodeGatewayStartedShards> batchLister = this::batchListStartedShards; + + private synchronized void batchListStartedShards( + ShardId shardId, + String customDataPath, + DiscoveryNode[] nodes, + ActionListener> listener + ) { + pendingFetchShardCount.incrementAndGet(); + // group shards by node + ShardRequestInfo shardRequestInfo = new ShardRequestInfo<>(shardId, customDataPath, listener); + for (DiscoveryNode node : nodes) { + var nodeLevelRequests = queuedRequests.computeIfAbsent(node, n -> new HashMap<>()); + nodeLevelRequests.put(shardId, shardRequestInfo); + } + if (logger.isTraceEnabled()) { + for (DiscoveryNode node : nodes) { + logger.trace( + "Queued number of [{}] async fetch shard requests for node [{}]", + queuedRequests.get(node).size(), + node.getId() + ); + } + } + }; + + @Override + public synchronized void flushPendingFetchShardRequests(int batchStepSize) { + if (queuedRequests.isEmpty()) { + return; + } + assert assertLessOrEqualToBatchStepSize(batchStepSize); + logger.debug("flushing {} primary fetching requests", queuedRequests.size()); + final CountDownLatch nodeRequestLatch = new CountDownLatch(queuedRequests.size()); + for (DiscoveryNode node : queuedRequests.keySet()) { + Map> shardRequests = queuedRequests.get(node); + Map targetShards = new HashMap<>(); + for (ShardId shardId : shardRequests.keySet()) { + targetShards.put(shardId, shardRequests.get(shardId).getCustomDataPath()); + } + + assert targetShards.isEmpty() == false; + logger.debug("Batch sending number of {} primary shard async fetch requests to node {}", targetShards.size(), 
node.getId()); + + // send shards fetch request per node + final var curNodeRequests = queuedRequests.get(node); + client.executeLocally( + TransportNodesBatchListGatewayStartedShards.TYPE, + new TransportNodesBatchListGatewayStartedShards.Request(targetShards, new DiscoveryNode[] { node }), + new ActionListener<>() { + @Override + public void onResponse(NodesGatewayBatchStartedShards nodesBatchStartedShards) { + nodeRequestLatch.countDown(); + if (nodesBatchStartedShards.failures().size() > 0) { + // single node request: if the node-level request failed, the node response list must be empty. + assert nodesBatchStartedShards.getNodes().size() == 0; + for (var request : curNodeRequests.entrySet()) { + request.getValue() + .getListener() + .onResponse( + new NodesGatewayStartedShards( + nodesBatchStartedShards.getClusterName(), + new ArrayList<>(), // empty response + nodesBatchStartedShards.failures() + ) + ); + } + return; + } + + assert nodesBatchStartedShards.getNodes().size() == 1; + NodeGatewayBatchStartedShards startedShardResponse = nodesBatchStartedShards.getNodes().get(0); + List<NodeGatewayBatchStartedShard> startedShards = startedShardResponse.getStartedShards(); + assert startedShards.size() == curNodeRequests.size(); + for (NodeGatewayBatchStartedShard shard : startedShards) { + // convert to NodeGatewayStartedShards for backwards compatibility. + List<NodeGatewayStartedShards> listSingleStartedShards = new ArrayList<>(1); + listSingleStartedShards.add( + new NodeGatewayStartedShards( + shard.getNode(), + shard.allocationId(), + shard.primary(), + shard.storeException() + ) + ); + + NodesGatewayStartedShards newNodesResponse = new NodesGatewayStartedShards( + nodesBatchStartedShards.getClusterName(), + listSingleStartedShards, + nodesBatchStartedShards.failures() + ); + + if (logger.isTraceEnabled()) { + logger.trace("got primary {} fetching response from node {}", shard.getShardId(), node.getId()); + } + ShardRequestInfo<NodeGatewayStartedShards> requestInfo = curNodeRequests.get(shard.getShardId()); + if (requestInfo != null) { + requestInfo.getListener().onResponse(newNodesResponse); + } else { + logger.debug("primary shard {} fetching has failed, listener has been cleared", shard.getShardId()); + } + + targetShards.remove(shard.getShardId()); + } + + // fail the listeners of any shards that did not get a response + for (ShardId shard : targetShards.keySet()) { + curNodeRequests.get(shard) + .getListener() + .onFailure( + new FailedNodeException( + node.getId(), + "Failed node [" + node.getId() + "]", + new Exception("Failed to fetch " + shard) + ) + ); + } + } + + @Override + public void onFailure(Exception e) { + nodeRequestLatch.countDown(); + for (var request : curNodeRequests.entrySet()) { + request.getValue().getListener().onFailure(e); + } + } + } + ); + } + + try { + nodeRequestLatch.await(); + } catch (InterruptedException e) { + logger.warn("thread got interrupted while waiting for the node level primary shard fetch response."); + Thread.currentThread().interrupt(); + } finally { + queuedRequests.clear(); + pendingFetchShardCount.set(0); + } + } + + @Override + public int getPendingFetchShardCount() { + return pendingFetchShardCount.get(); + } + + private boolean assertLessOrEqualToBatchStepSize(int batchStepSize) { + Set<ShardId> shards = new HashSet<>(); + for (var request : queuedRequests.entrySet()) { + for (var shard : request.getValue().entrySet()) { + shards.add(shard.getValue().shardId()); + } + } + return shards.size() <= batchStepSize; + } + } class InternalReplicaShardAllocator extends ReplicaShardAllocator { private final NodeClient client; + private final AtomicInteger pendingFetchShardCount = new AtomicInteger(); + 
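// tracks the number of shards queued for batch fetching since the last flush; reset to zero after each flush +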
// node batched shard requests + private final Map>> queuedRequests = new ConcurrentHashMap<>(); InternalReplicaShardAllocator(NodeClient client) { this.client = client; @@ -297,16 +515,23 @@ class InternalReplicaShardAllocator extends ReplicaShardAllocator { @Override protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { // explicitly type lister, some IDEs (Eclipse) are not able to correctly infer the function type - Lister, NodeStoreFilesMetadata> lister = this::listStoreFilesMetadata; AsyncShardFetch fetch = asyncFetchStore.computeIfAbsent( shard.shardId(), - shardId -> new InternalAsyncFetch<>( - logger, - "shard_store", - shard.shardId(), - IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()), - lister - ) + shardId -> allocation.isBatchShardFetchMode() + ? new InternalAsyncFetch<>( + logger, + "batch_shard_store", + shard.shardId(), + IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()), + batchLister + ) + : new InternalAsyncFetch<>( + logger, + "shard_store", + shard.shardId(), + IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()), + lister + ) ); AsyncShardFetch.FetchResult shardStores = fetch.fetchData( allocation.nodes(), @@ -318,6 +543,13 @@ protected AsyncShardFetch.FetchResult fetchData(ShardRou return shardStores; } + @Override + protected boolean hasInitiatedFetching(ShardRouting shard) { + return asyncFetchStore.get(shard.shardId()) != null; + } + + Lister, NodeStoreFilesMetadata> lister = this::listStoreFilesMetadata; + private void listStoreFilesMetadata( ShardId shardId, String customDataPath, @@ -332,9 +564,160 @@ private void listStoreFilesMetadata( ); } + Lister, NodeStoreFilesMetadata> batchLister = this::batchListStoreFilesMetadata; + + private void batchListStoreFilesMetadata( + ShardId shardId, + String customDataPath, + DiscoveryNode[] nodes, + ActionListener> listener + ) { + pendingFetchShardCount.incrementAndGet(); + // group shards by node + ShardRequestInfo shardRequestInfo = new ShardRequestInfo<>(shardId, customDataPath, listener); + for (DiscoveryNode node : nodes) { + var nodeLevelRequests = queuedRequests.computeIfAbsent(node, n -> new HashMap<>()); + nodeLevelRequests.put(shardId, shardRequestInfo); + } + if (logger.isTraceEnabled()) { + for (DiscoveryNode node : nodes) { + logger.trace( + "Queued number of [{}] async list store metadata requests for node [{}]", + queuedRequests.get(node).size(), + node.getId() + ); + } + } + }; + + @Override + public void flushPendingFetchShardRequests(int batchStepSize) { + if (queuedRequests.isEmpty()) { + return; + } + assert assertLessOrEqualToBatchStepSize(batchStepSize); + logger.debug("flushing {} replica fetching requests", queuedRequests.size()); + final CountDownLatch nodeRequestLatch = new CountDownLatch(queuedRequests.size()); + for (DiscoveryNode node : queuedRequests.keySet()) { + Map targetShards = new HashMap<>(); + var nodeRequest = queuedRequests.get(node); + for (var shardRequest : nodeRequest.entrySet()) { + targetShards.put(shardRequest.getKey(), shardRequest.getValue().getCustomDataPath()); + } + + assert targetShards.isEmpty() == false; + logger.debug("Batch sending number of {} replica shard async fetch requests to node {}", targetShards.size(), node.getId()); + + // send shards fetch request per node + final var curNodeRequests = queuedRequests.get(node); + client.executeLocally( + 
TransportNodesBatchListShardStoreMetadata.TYPE, + new TransportNodesBatchListShardStoreMetadata.Request(targetShards, new DiscoveryNode[] { node }), + new ActionListener<>() { + @Override + public void onResponse(NodesBatchStoreFilesMetadata nodesBatchStoreFilesMetadata) { + nodeRequestLatch.countDown(); + if (nodesBatchStoreFilesMetadata.failures().size() > 0) { + // single node, got failed node request then node response must be empty. + assert nodesBatchStoreFilesMetadata.getNodes().size() == 0; + for (var request : curNodeRequests.entrySet()) { + request.getValue() + .getListener() + .onResponse( + new NodesStoreFilesMetadata( + nodesBatchStoreFilesMetadata.getClusterName(), + new ArrayList<>(), // empty response + nodesBatchStoreFilesMetadata.failures() + ) + ); + } + return; + } + + assert nodesBatchStoreFilesMetadata.getNodes().size() == 1; + NodeBatchStoreFilesMetadata storeFilesResponse = nodesBatchStoreFilesMetadata.getNodes().get(0); + List storeFiles = storeFilesResponse.storeFilesMetadataList(); + for (StoreFilesMetadata shard : storeFiles) { + // transfer to NodeStoreFilesMetadata to bwc. + List listStoreFiles = new ArrayList<>(1); + listStoreFiles.add(new NodeStoreFilesMetadata(node, shard)); + + NodesStoreFilesMetadata newNodesResponse = new NodesStoreFilesMetadata( + nodesBatchStoreFilesMetadata.getClusterName(), + listStoreFiles, + nodesBatchStoreFilesMetadata.failures() + ); + + if (logger.isTraceEnabled()) { + logger.trace("got replica {} fetching response from node {}", shard.shardId(), node.getId()); + } + + ShardRequestInfo requestInfo = curNodeRequests.get(shard.shardId()); + if (requestInfo != null) { + requestInfo.getListener().onResponse(newNodesResponse); + } else { + logger.debug("replica shard {} fetching has failed, listener has been cleared", shard.shardId()); + } + + targetShards.remove(shard.shardId()); + } + + // some shards don't have copy on this node + for (ShardId shard : targetShards.keySet()) { + // transfer to NodeStoreFilesMetadata to bwc. 
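+ // respond with StoreFilesMetadata.EMPTY so the waiting fetch listener still completes for shards that have no copy on this node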
+ List listStoreFiles = new ArrayList<>(1); + listStoreFiles.add(new NodeStoreFilesMetadata(node, StoreFilesMetadata.EMPTY)); + + NodesStoreFilesMetadata newNodesResponse = new NodesStoreFilesMetadata( + nodesBatchStoreFilesMetadata.getClusterName(), + listStoreFiles, + nodesBatchStoreFilesMetadata.failures() + ); + + ShardRequestInfo requestInfo = curNodeRequests.get(shard); + if (requestInfo != null) { + requestInfo.getListener().onResponse(newNodesResponse); + } else { + logger.debug("replica shard {} fetching has failed, listener has been cleared", shard); + } + } + } + + @Override + public void onFailure(Exception e) { + nodeRequestLatch.countDown(); + for (var request : curNodeRequests.entrySet()) { + request.getValue().getListener().onFailure(e); + } + } + } + ); + } + + try { + nodeRequestLatch.await(); + } catch (InterruptedException e) { + logger.warn("thread got interrupted while waiting for the node level replica shard fetch response."); + Thread.currentThread().interrupt(); + } finally { + queuedRequests.clear(); + pendingFetchShardCount.set(0); + } + } + @Override - protected boolean hasInitiatedFetching(ShardRouting shard) { - return asyncFetchStore.get(shard.shardId()) != null; + public int getPendingFetchShardCount() { + return pendingFetchShardCount.get(); + } + + private boolean assertLessOrEqualToBatchStepSize(int batchStepSize) { + Set shards = new HashSet<>(); + for (var request : queuedRequests.entrySet()) { + for (var shard : request.getValue().entrySet()) { + shards.add(shard.getValue().shardId()); + } + } + return shards.size() <= batchStepSize; } } } diff --git a/server/src/main/java/org/elasticsearch/gateway/TransportNodesBatchListGatewayStartedShards.java b/server/src/main/java/org/elasticsearch/gateway/TransportNodesBatchListGatewayStartedShards.java new file mode 100644 index 0000000000000..326cd6c0e9cce --- /dev/null +++ b/server/src/main/java/org/elasticsearch/gateway/TransportNodesBatchListGatewayStartedShards.java @@ -0,0 +1,393 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.gateway; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.index.shard.ShardStateMetadata; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.NamedXContentRegistry; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; + +/** + * This transport action is used to fetch the shard version from each node during primary allocation in {@link GatewayAllocator}. + * We use this to find out which node holds the latest shard version and which of them used to be a primary in order to allocate + * shards after node or cluster restarts. 
+ */ +public class TransportNodesBatchListGatewayStartedShards extends TransportNodesAction< + TransportNodesBatchListGatewayStartedShards.Request, + TransportNodesBatchListGatewayStartedShards.NodesGatewayBatchStartedShards, + TransportNodesBatchListGatewayStartedShards.NodeRequest, + TransportNodesBatchListGatewayStartedShards.NodeGatewayBatchStartedShards> { + + public static final String ACTION_NAME = "internal:gateway/local/batch_started_shards"; + public static final ActionType TYPE = new ActionType<>( + ACTION_NAME, + NodesGatewayBatchStartedShards::new + ); + + private final Settings settings; + private final NodeEnvironment nodeEnv; + private final IndicesService indicesService; + private final NamedXContentRegistry namedXContentRegistry; + + @Inject + public TransportNodesBatchListGatewayStartedShards( + Settings settings, + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + NodeEnvironment env, + IndicesService indicesService, + NamedXContentRegistry namedXContentRegistry + ) { + super( + ACTION_NAME, + threadPool, + clusterService, + transportService, + actionFilters, + Request::new, + NodeRequest::new, + ThreadPool.Names.FETCH_SHARD_STARTED, + NodeGatewayBatchStartedShards.class + ); + this.settings = settings; + this.nodeEnv = env; + this.indicesService = indicesService; + this.namedXContentRegistry = namedXContentRegistry; + } + + @Override + protected NodeRequest newNodeRequest(Request request) { + return new NodeRequest(request); + } + + @Override + protected NodeGatewayBatchStartedShards newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + final NodeGatewayBatchStartedShards response = new NodeGatewayBatchStartedShards(in, node); + assert response.getNode() == node; + return response; + } + + @Override + protected NodesGatewayBatchStartedShards newResponse( + Request request, + List responses, + List failures + ) { + return new NodesGatewayBatchStartedShards(clusterService.getClusterName(), responses, failures); + } + + @Override + protected NodeGatewayBatchStartedShards nodeOperation(NodeRequest request, Task task) { + NodeGatewayBatchStartedShards batchStartedShards = new NodeGatewayBatchStartedShards(clusterService.localNode()); + try { + final CountDownLatch latch = new CountDownLatch(request.getShards().size()); + for (Map.Entry entry : request.getShards().entrySet()) { + threadPool.executor(ThreadPool.Names.FETCH_SHARD_STARTED).execute(() -> { + try { + batchStartedShards.addStartedShard(listStartedShard(entry.getKey(), entry.getValue())); + } finally { + latch.countDown(); + } + }); + } + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return batchStartedShards; + } + + private NodeGatewayBatchStartedShard listStartedShard(final ShardId shardId, String customDataPath) { + try { + logger.trace("{} loading local shard state info", shardId); + ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState( + logger, + namedXContentRegistry, + nodeEnv.availableShardPaths(shardId) + ); + if (shardStateMetadata != null) { + if (indicesService.getShardOrNull(shardId) == null) { + if (customDataPath == null) { + // TODO: Fallback for BWC with older ES versions. 
Remove once request.getCustomDataPath() always returns non-null + final IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); + if (metadata != null) { + customDataPath = new IndexSettings(metadata, settings).customDataPath(); + } else { + logger.trace("{} node doesn't have meta data for the requests index", shardId); + throw new ElasticsearchException("node doesn't have meta data for index " + shardId.getIndex()); + } + } + // we don't have an open shard on the store, validate the files on disk are openable + ShardPath shardPath = null; + try { + shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath); + if (shardPath == null) { + throw new IllegalStateException(shardId + " no shard path found"); + } + Store.tryOpenIndex(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger); + } catch (Exception exception) { + final ShardPath finalShardPath = shardPath; + logger.trace( + () -> new ParameterizedMessage( + "{} can't open index for shard [{}] in path [{}]", + shardId, + shardStateMetadata, + (finalShardPath != null) ? finalShardPath.resolveIndex() : "" + ), + exception + ); + String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; + return new NodeGatewayBatchStartedShard( + shardId, + clusterService.localNode(), + allocationId, + shardStateMetadata.primary, + exception + ); + } + } + + logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata); + String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; + return new NodeGatewayBatchStartedShard(shardId, clusterService.localNode(), allocationId, shardStateMetadata.primary); + } + logger.trace("{} no local shard info found", shardId); + return new NodeGatewayBatchStartedShard(shardId, clusterService.localNode(), null, false); + } catch (Exception e) { + throw new ElasticsearchException("failed to load started shards", e); + } + } + + public static class ShardRequestInfo { + + private final ShardId shardId; + private final ActionListener> listener; + private final String customDataPath; + + public ShardRequestInfo(ShardId shardId, String customDataPath, ActionListener> listener) { + this.shardId = Objects.requireNonNull(shardId); + this.customDataPath = Objects.requireNonNull(customDataPath); + this.listener = Objects.requireNonNull(listener); + } + + public ShardId shardId() { + return shardId; + } + + public String getCustomDataPath() { + return customDataPath; + } + + public ActionListener> getListener() { + return listener; + } + } + + public static class Request extends BaseNodesRequest { + + private final Map shards; + + public Request(StreamInput in) throws IOException { + super(in); + shards = new HashMap<>(); + int size = in.readVInt(); + for (int i = 0; i < size; i++) { + shards.put(new ShardId(in), in.readString()); + } + } + + public Request(Map shards, DiscoveryNode[] nodes) { + super(nodes); + this.shards = shards; + } + + public Map getShards() { + return shards; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(shards.size()); + for (Map.Entry entry : shards.entrySet()) { + entry.getKey().writeTo(out); + out.writeString(entry.getValue()); + } + } + } + + public static class NodeRequest extends TransportRequest { + + private final Map shards; + + public NodeRequest(StreamInput in) throws IOException { + super(in); + shards = new HashMap<>(); + int size = in.readVInt(); + for 
(int i = 0; i < size; i++) { + shards.put(new ShardId(in), in.readString()); + } + } + + public NodeRequest(Request request) { + this.shards = request.shards; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(shards.size()); + for (Map.Entry entry : shards.entrySet()) { + entry.getKey().writeTo(out); + out.writeString(entry.getValue()); + } + } + + public Map getShards() { + return shards; + } + } + + public static class NodesGatewayBatchStartedShards extends BaseNodesResponse { + + public NodesGatewayBatchStartedShards(StreamInput in) throws IOException { + super(in); + } + + public NodesGatewayBatchStartedShards( + ClusterName clusterName, + List nodes, + List failures + ) { + super(clusterName, nodes, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(NodeGatewayBatchStartedShards::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + } + + public static class NodeGatewayBatchStartedShards extends BaseNodeResponse { + + private final List startedShards = Collections.synchronizedList(new ArrayList<>()); + + public NodeGatewayBatchStartedShards(DiscoveryNode node) { + super(node); + } + + public NodeGatewayBatchStartedShards(StreamInput in) throws IOException { + this(in, null); + } + + public NodeGatewayBatchStartedShards(StreamInput in, DiscoveryNode node) throws IOException { + super(node); // we skip node serialization here, instead do it in NodeGatewayBatchStartedShard. + int size = in.readVInt(); + for (int i = 0; i < size; i++) { + startedShards.add(new NodeGatewayBatchStartedShard(in, node)); + } + } + + public void addStartedShard(NodeGatewayBatchStartedShard startedShard) { + startedShards.add(startedShard); + } + + public List getStartedShards() { + return startedShards; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + // we skip node serialization here, instead do it in NodeGatewayBatchStartedShard. 
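+ // each entry serializes its own ShardId (see NodeGatewayBatchStartedShard#writeTo) so that responses can be matched back to the queued per-shard listeners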
+ out.writeVInt(startedShards.size()); + for (NodeGatewayBatchStartedShard shard : startedShards) { + shard.writeTo(out); + } + } + } + + public static class NodeGatewayBatchStartedShard extends NodeGatewayStartedShards { + + private final ShardId shardId; + + public NodeGatewayBatchStartedShard(StreamInput in, DiscoveryNode node) throws IOException { + super(in, node); + shardId = new ShardId(in); + } + + public NodeGatewayBatchStartedShard(ShardId shardId, DiscoveryNode node, String allocationId, boolean primary) { + super(node, allocationId, primary, null); + this.shardId = shardId; + } + + public NodeGatewayBatchStartedShard( + ShardId shardId, + DiscoveryNode node, + String allocationId, + boolean primary, + Exception storeException + ) { + super(node, allocationId, primary, storeException); + this.shardId = shardId; + } + + public ShardId getShardId() { + return shardId; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardId.writeTo(out); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesBatchListShardStoreMetadata.java b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesBatchListShardStoreMetadata.java new file mode 100644 index 0000000000000..2babc39c588f7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesBatchListShardStoreMetadata.java @@ -0,0 +1,332 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.indices.store; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.store.TransportNodesListShardStoreMetadata.StoreFilesMetadata; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class TransportNodesBatchListShardStoreMetadata extends TransportNodesAction< + TransportNodesBatchListShardStoreMetadata.Request, + TransportNodesBatchListShardStoreMetadata.NodesBatchStoreFilesMetadata, + TransportNodesBatchListShardStoreMetadata.NodeRequest, + TransportNodesBatchListShardStoreMetadata.NodeBatchStoreFilesMetadata> { + + public static final String ACTION_NAME = "internal:cluster/nodes/indices/batch_shard/store"; + public static final ActionType TYPE = new ActionType<>(ACTION_NAME, NodesBatchStoreFilesMetadata::new); + + private final Settings settings; + private final IndicesService indicesService; + private final NodeEnvironment nodeEnv; + + @Inject + public TransportNodesBatchListShardStoreMetadata( + Settings settings, + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + IndicesService indicesService, + NodeEnvironment nodeEnv, + ActionFilters actionFilters + ) { + super( + ACTION_NAME, + threadPool, + clusterService, + transportService, + actionFilters, + Request::new, + NodeRequest::new, + ThreadPool.Names.FETCH_SHARD_STORE, + NodeBatchStoreFilesMetadata.class + ); + this.settings = settings; + this.indicesService = indicesService; + this.nodeEnv = nodeEnv; + } + + @Override + protected NodeRequest newNodeRequest(Request request) { + return new NodeRequest(request); + } + + @Override + protected NodeBatchStoreFilesMetadata newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + final NodeBatchStoreFilesMetadata nodeBatchStoreFilesMetadata = new 
NodeBatchStoreFilesMetadata(in, node); + assert nodeBatchStoreFilesMetadata.getNode() == node; + return nodeBatchStoreFilesMetadata; + } + + @Override + protected NodesBatchStoreFilesMetadata newResponse( + Request request, + List responses, + List failures + ) { + return new NodesBatchStoreFilesMetadata(clusterService.getClusterName(), responses, failures); + } + + @Override + protected NodeBatchStoreFilesMetadata nodeOperation(NodeRequest request, Task task) { + NodeBatchStoreFilesMetadata batchStoreFiles = new NodeBatchStoreFilesMetadata(clusterService.localNode()); + try { + final CountDownLatch latch = new CountDownLatch(request.getShards().size()); + for (Map.Entry entry : request.getShards().entrySet()) { + threadPool.executor(ThreadPool.Names.FETCH_SHARD_STORE).execute(() -> { + try { + batchStoreFiles.addStoreFilesMetadata(listStoreMetadata(entry.getKey(), entry.getValue())); + } catch (IOException e) { + throw new ElasticsearchException("Failed to list store metadata for shard [" + entry.getKey() + "]", e); + } finally { + latch.countDown(); + } + }); + } + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return batchStoreFiles; + } + + private StoreFilesMetadata listStoreMetadata(final ShardId shardId, String customDataPath) throws IOException { + logger.trace("listing store meta data for {}", shardId); + long startTimeNS = System.nanoTime(); + boolean exists = false; + try { + IndexService indexService = indicesService.indexService(shardId.getIndex()); + if (indexService != null) { + IndexShard indexShard = indexService.getShardOrNull(shardId.id()); + if (indexShard != null) { + try { + final StoreFilesMetadata storeFilesMetadata = new StoreFilesMetadata( + shardId, + indexShard.snapshotStoreMetadata(), + indexShard.getPeerRecoveryRetentionLeases() + ); + exists = true; + return storeFilesMetadata; + } catch (org.apache.lucene.index.IndexNotFoundException e) { + logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e); + return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); + } catch (IOException e) { + logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e); + return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); + } + } + } + + if (customDataPath == null) { + // TODO: Fallback for BWC with older ES versions. Remove this once request.getCustomDataPath() always returns non-null + if (indexService != null) { + customDataPath = indexService.getIndexSettings().customDataPath(); + } else { + IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); + if (metadata != null) { + customDataPath = new IndexSettings(metadata, settings).customDataPath(); + } else { + logger.trace("{} node doesn't have meta data for the requests index", shardId); + throw new ElasticsearchException("node doesn't have meta data for index " + shardId.getIndex()); + } + } + } + final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath); + if (shardPath == null) { + return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); + } + // note that this may fail if it can't get access to the shard lock. 
+            if (customDataPath == null) {
+                // TODO: Fallback for BWC with older ES versions. Remove this once request.getCustomDataPath() always returns non-null
+                if (indexService != null) {
+                    customDataPath = indexService.getIndexSettings().customDataPath();
+                } else {
+                    IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex());
+                    if (metadata != null) {
+                        customDataPath = new IndexSettings(metadata, settings).customDataPath();
+                    } else {
+                        logger.trace("{} node doesn't have meta data for the requested index", shardId);
+                        throw new ElasticsearchException("node doesn't have meta data for index " + shardId.getIndex());
+                    }
+                }
+            }
+            final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath);
+            if (shardPath == null) {
+                return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
+            }
+            // note that this may fail if it can't get access to the shard lock. Since we checked above for an active shard, this means:
+            // 1) a shard is being constructed, which means the master will not use a copy of this replica
+            // 2) a shard is shutting down and has not cleared its content within lock timeout. In this case the master may not
+            //    reuse local resources.
+            final Store.MetadataSnapshot metadataSnapshot = Store.readMetadataSnapshot(
+                shardPath.resolveIndex(),
+                shardId,
+                nodeEnv::shardLock,
+                logger
+            );
+            // We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when
+            // we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard.
+            return new StoreFilesMetadata(shardId, metadataSnapshot, Collections.emptyList());
+        } finally {
+            TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);
+            if (exists) {
+                logger.debug("{} loaded store meta data (took [{}])", shardId, took);
+            } else {
+                logger.trace("{} didn't find any store meta data to load (took [{}])", shardId, took);
+            }
+        }
+    }
+
+    public static class Request extends BaseNodesRequest<Request> {
+
+        private final Map<ShardId, String> shards;
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            shards = new HashMap<>();
+            int size = in.readVInt();
+            for (int i = 0; i < size; i++) {
+                shards.put(new ShardId(in), in.readString());
+            }
+        }
+
+        public Request(Map<ShardId, String> shards, DiscoveryNode[] nodes) {
+            super(nodes);
+            this.shards = shards;
+        }
+
+        public Map<ShardId, String> getShards() {
+            return shards;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeVInt(shards.size());
+            for (Map.Entry<ShardId, String> entry : shards.entrySet()) {
+                entry.getKey().writeTo(out);
+                out.writeString(entry.getValue());
+            }
+        }
+    }
+
+    public static class NodeRequest extends TransportRequest {
+
+        private final Map<ShardId, String> shards;
+
+        public NodeRequest(StreamInput in) throws IOException {
+            super(in);
+            shards = new HashMap<>();
+            int size = in.readVInt();
+            for (int i = 0; i < size; i++) {
+                shards.put(new ShardId(in), in.readString());
+            }
+        }
+
+        public NodeRequest(Request request) {
+            this.shards = request.shards;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeVInt(shards.size());
+            for (Map.Entry<ShardId, String> entry : shards.entrySet()) {
+                entry.getKey().writeTo(out);
+                out.writeString(entry.getValue());
+            }
+        }
+
+        public Map<ShardId, String> getShards() {
+            return shards;
+        }
+    }
+
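+    // Node-aggregated response: one NodeBatchStoreFilesMetadata entry per responding
+    // node, plus the per-node failures collected by the transport layer.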
+    public static class NodesBatchStoreFilesMetadata extends BaseNodesResponse<NodeBatchStoreFilesMetadata> {
+
+        public NodesBatchStoreFilesMetadata(StreamInput in) throws IOException {
+            super(in);
+        }
+
+        public NodesBatchStoreFilesMetadata(
+            ClusterName clusterName,
+            List<NodeBatchStoreFilesMetadata> nodes,
+            List<FailedNodeException> failures
+        ) {
+            super(clusterName, nodes, failures);
+        }
+
+        @Override
+        protected List<NodeBatchStoreFilesMetadata> readNodesFrom(StreamInput in) throws IOException {
+            return in.readList(NodeBatchStoreFilesMetadata::readListShardStoreNodeOperationResponse);
+        }
+
+        @Override
+        protected void writeNodesTo(StreamOutput out, List<NodeBatchStoreFilesMetadata> nodes) throws IOException {
+            out.writeList(nodes);
+        }
+    }
+
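+    // Per-node response carrying the store metadata of every shard in the batch; the
+    // list is synchronized because nodeOperation appends to it from multiple threads.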
+    public static class NodeBatchStoreFilesMetadata extends BaseNodeResponse {
+
+        private final List<StoreFilesMetadata> storeFilesMetadataList = Collections.synchronizedList(
+            new ArrayList<>()
+        );
+
+        public NodeBatchStoreFilesMetadata(DiscoveryNode node) {
+            super(node);
+        }
+
+        public NodeBatchStoreFilesMetadata(StreamInput in, DiscoveryNode node) throws IOException {
+            super(in, node);
+            int size = in.readVInt();
+            for (int i = 0; i < size; i++) {
+                storeFilesMetadataList.add(StoreFilesMetadata.readFrom(in));
+            }
+        }
+
+        public List<StoreFilesMetadata> storeFilesMetadataList() {
+            return storeFilesMetadataList;
+        }
+
+        public void addStoreFilesMetadata(TransportNodesListShardStoreMetadata.StoreFilesMetadata nodeStoreFilesMetadata) {
+            this.storeFilesMetadataList.add(nodeStoreFilesMetadata);
+        }
+
+        public static NodeBatchStoreFilesMetadata readListShardStoreNodeOperationResponse(StreamInput in) throws IOException {
+            return new NodeBatchStoreFilesMetadata(in, null);
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeVInt(storeFilesMetadataList.size());
+            for (TransportNodesListShardStoreMetadata.StoreFilesMetadata shard : storeFilesMetadataList) {
+                shard.writeTo(out);
+            }
+        }
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java
index a3cb92a183a71..d5aea126f6644 100644
--- a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java
+++ b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java
@@ -134,6 +134,7 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep
         if (indexShard != null) {
             try {
                 final StoreFilesMetadata storeFilesMetadata = new StoreFilesMetadata(
+                    shardId,
                     indexShard.snapshotStoreMetadata(),
                     indexShard.getPeerRecoveryRetentionLeases()
                 );
@@ -181,7 +182,7 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep
             );
             // We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when
             // we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard.
-            return new StoreFilesMetadata(metadataSnapshot, emptyList());
+            return new StoreFilesMetadata(shardId, metadataSnapshot, emptyList());
         } finally {
             TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);
             if (exists) {
@@ -192,35 +193,33 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep
         }
     }
 
-    public record StoreFilesMetadata(Store.MetadataSnapshot metadataSnapshot, List<RetentionLease> peerRecoveryRetentionLeases)
-        implements
-            Iterable<StoreFileMetadata>,
-            Writeable {
+    public record StoreFilesMetadata(
+        ShardId shardId,
+        Store.MetadataSnapshot metadataSnapshot,
+        List<RetentionLease> peerRecoveryRetentionLeases
+    ) implements Iterable<StoreFileMetadata>, Writeable {
 
         private static final ShardId FAKE_SHARD_ID = new ShardId("_na_", "_na_", 0);
-        public static final StoreFilesMetadata EMPTY = new StoreFilesMetadata(Store.MetadataSnapshot.EMPTY, emptyList());
+        public static final StoreFilesMetadata EMPTY = new StoreFilesMetadata(FAKE_SHARD_ID, Store.MetadataSnapshot.EMPTY, emptyList());
 
         public static StoreFilesMetadata readFrom(StreamInput in) throws IOException {
-            if (in.getVersion().before(Version.V_8_2_0)) {
-                new ShardId(in);
+            ShardId shardId = FAKE_SHARD_ID;
+            if (in.getVersion().onOrAfter(Version.V_8_3_0)) {
+                shardId = new ShardId(in);
             }
             final var metadataSnapshot = Store.MetadataSnapshot.readFrom(in);
             final var peerRecoveryRetentionLeases = in.readList(RetentionLease::new);
             if (metadataSnapshot == Store.MetadataSnapshot.EMPTY && peerRecoveryRetentionLeases.isEmpty()) {
                 return EMPTY;
             } else {
-                return new StoreFilesMetadata(metadataSnapshot, peerRecoveryRetentionLeases);
+                return new StoreFilesMetadata(shardId, metadataSnapshot, peerRecoveryRetentionLeases);
             }
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
-            if (out.getVersion().before(Version.V_8_2_0)) {
-                // no compatible version cares about the shard ID, we can just make one up
-                FAKE_SHARD_ID.writeTo(out);
-
-                // NB only checked this for versions back to 7.17.0, we are assuming that we don't use this with earlier versions:
-                assert out.getVersion().onOrAfter(Version.V_7_17_0) : out.getVersion();
+            if (out.getVersion().onOrAfter(Version.V_8_3_0)) {
+                shardId.writeTo(out);
             }
             metadataSnapshot.writeTo(out);
             out.writeList(peerRecoveryRetentionLeases);
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationServiceTests.java
index 34ca77e4e4996..79562d457de31 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationServiceTests.java
@@ -129,7 +129,8 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
             }
         },
         new EmptyClusterInfoService(),
-        EmptySnapshotsInfoService.INSTANCE
+        EmptySnapshotsInfoService.INSTANCE,
+        null
     );
 
     final String unrealisticAllocatorName = "unrealistic";
@@ -228,7 +229,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
     }
 
     public void testExplainsNonAllocationOfShardWithUnknownAllocator() {
-        final AllocationService allocationService = new AllocationService(null, null, null, null);
+        final AllocationService allocationService = new AllocationService(null, (ShardsAllocator) null, null, null, null);
         allocationService.setExistingShardsAllocators(
             Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator())
         );
diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java
index 758da73def503..d386e080606a7 100644
--- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java
+++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java
@@ -684,6 +684,7 @@ TestAllocator addData(
             data.put(
                 node,
                 new TransportNodesListShardStoreMetadata.StoreFilesMetadata(
+                    shardId,
                     new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()),
                     peerRecoveryRetentionLeases
                 )
diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java
index d545ac1a4b16e..1c358632d47da 100644
--- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java
+++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java
@@ -997,7 +997,11 @@ public void testStreamStoreFilesMetadata() throws Exception {
             );
         }
         TransportNodesListShardStoreMetadata.StoreFilesMetadata outStoreFileMetadata =
-            new TransportNodesListShardStoreMetadata.StoreFilesMetadata(metadataSnapshot, peerRecoveryRetentionLeases);
+            new TransportNodesListShardStoreMetadata.StoreFilesMetadata(
+                new ShardId("index", "_na_", 1),
+                metadataSnapshot,
+                peerRecoveryRetentionLeases
+            );
         ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
         OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
         org.elasticsearch.Version targetNodeVersion = randomCompatibleVersion(random(), org.elasticsearch.Version.CURRENT);
@@ -1018,7 +1022,11 @@ public void testStreamStoreFilesMetadata() throws Exception {
 
     public void testStreamEmptyStoreFilesMetadata() throws Exception {
         var outStoreFileMetadata = randomBoolean()
             ? TransportNodesListShardStoreMetadata.StoreFilesMetadata.EMPTY
-            : new TransportNodesListShardStoreMetadata.StoreFilesMetadata(Store.MetadataSnapshot.EMPTY, emptyList());
+            : new TransportNodesListShardStoreMetadata.StoreFilesMetadata(
+                new ShardId("index", "_na_", 1),
+                Store.MetadataSnapshot.EMPTY,
+                emptyList()
+            );
         var outBuffer = new ByteArrayOutputStream();
         var out = new OutputStreamStreamOutput(outBuffer);
         var targetNodeVersion = randomCompatibleVersion(random(), org.elasticsearch.Version.CURRENT);