From 3639a6f9b3c37d8eafcebc8d822a394226794f0a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 11 Oct 2019 21:16:16 +0200 Subject: [PATCH 01/27] step 1 --- .../elasticsearch/index/shard/IndexShard.java | 36 ++++++++++--------- .../snapshots/RestoreService.java | 2 +- .../index/shard/IndexShardTests.java | 6 ++-- .../ShardFollowTaskReplicationTests.java | 4 ++- .../engine/FollowEngineIndexShardTests.java | 7 ++-- .../SourceOnlySnapshotShardTests.java | 7 ++-- 6 files changed, 37 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 873a49d0048e8..0bca2fe463d93 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1841,12 +1841,16 @@ public boolean recoverFromStore() { return storeRecovery.recoverFromStore(this); } - public boolean restoreFromRepository(Repository repository) { - assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; - assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " + - recoveryState.getRecoverySource(); - StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - return storeRecovery.recoverFromRepository(this, repository); + public void restoreFromRepository(Repository repository, ActionListener listener) { + try { + assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; + assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " + + recoveryState.getRecoverySource(); + StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); + listener.onResponse(storeRecovery.recoverFromRepository(this, repository)); + } catch (Exception e) { + listener.onFailure(e); + } } /** @@ -2529,17 +2533,15 @@ 
public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService case SNAPSHOT: markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource(); - threadPool.generic().execute(() -> { - try { - final Repository repository = repositoriesService.repository(recoverySource.snapshot().getRepository()); - if (restoreFromRepository(repository)) { - recoveryListener.onRecoveryDone(recoveryState); - } - } catch (Exception e) { - recoveryListener.onRecoveryFailure(recoveryState, - new RecoveryFailedException(recoveryState, null, e), true); - } - }); + threadPool.generic().execute(() -> restoreFromRepository( + repositoriesService.repository(recoverySource.snapshot().getRepository()), + ActionListener.wrap(r -> { + if (r) { + recoveryListener.onRecoveryDone(recoveryState); + } + }, + e -> recoveryListener.onRecoveryFailure(recoveryState, + new RecoveryFailedException(recoveryState, null, e), true)))); break; case LOCAL_SHARDS: final IndexMetaData indexMetaData = indexSettings().getIndexMetaData(); diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 5f3bc91a9978e..b7722cfaf732e 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -107,7 +107,7 @@ * {@link RoutingTable.Builder#addAsRestore(IndexMetaData, SnapshotRecoverySource)} method. *

* Individual shards are getting restored as part of normal recovery process in - * {@link IndexShard#restoreFromRepository(Repository)} )} + * {@link IndexShard#restoreFromRepository} * method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking * at the {@link ShardRouting#recoverySource()} property. *

diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 90ec2ece394e9..c075793b26d09 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2341,7 +2341,8 @@ public void testRestoreShard() throws IOException { DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); target.markAsRecovering("store", new RecoveryState(routing, localNode, null)); - assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") { + final PlainActionFuture future = PlainActionFuture.newFuture(); + target.restoreFromRepository(new RestoreOnlyRepository("test") { @Override public void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { @@ -2357,7 +2358,8 @@ public void restoreShard(Store store, SnapshotId snapshotId, throw new RuntimeException(ex); } } - })); + }, future); + assertTrue(future.actionGet()); assertThat(target.getLocalCheckpoint(), equalTo(2L)); assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L)); assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index a2717905503c4..9e16663209b9b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -451,6 +451,7 @@ protected synchronized void recoverPrimary(IndexShard primary) { ShardRouting routing = 
ShardRoutingHelper.newWithRestoreSource(primary.routingEntry(), new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test")); primary.markAsRecovering("remote recovery from leader", new RecoveryState(routing, localNode, null)); + final PlainActionFuture future = PlainActionFuture.newFuture(); primary.restoreFromRepository(new RestoreOnlyRepository(index.getName()) { @Override public void restoreShard(Store store, SnapshotId snapshotId, @@ -469,7 +470,8 @@ public void restoreShard(Store store, SnapshotId snapshotId, throw new AssertionError(ex); } } - }); + }, future); + future.actionGet(); } }; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java index f8260f2fce57c..bb4ac8cce95a6 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; @@ -125,7 +126,8 @@ public void testRestoreShard() throws IOException { DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); target.markAsRecovering("store", new RecoveryState(routing, localNode, null)); - assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") { + final PlainActionFuture future = PlainActionFuture.newFuture(); + target.restoreFromRepository(new 
RestoreOnlyRepository("test") { @Override public void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { @@ -141,7 +143,8 @@ public void restoreShard(Store store, SnapshotId snapshotId, throw new RuntimeException(ex); } } - })); + }, future); + assertTrue(future.actionGet()); assertThat(target.getLocalCheckpoint(), equalTo(0L)); assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L)); assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L)); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 1bb3250af1a07..39222a7246888 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -230,8 +230,11 @@ public void testRestoreMinmal() throws IOException { restoredShard.mapperService().merge(shard.indexSettings().getIndexMetaData(), MapperService.MergeReason.MAPPING_RECOVERY); DiscoveryNode discoveryNode = new DiscoveryNode("node_g", buildNewFakeTransportAddress(), Version.CURRENT); restoredShard.markAsRecovering("test from snap", new RecoveryState(restoredShard.routingEntry(), discoveryNode, null)); - runAsSnapshot(shard.getThreadPool(), () -> - assertTrue(restoredShard.restoreFromRepository(repository))); + runAsSnapshot(shard.getThreadPool(), () -> { + final PlainActionFuture future = PlainActionFuture.newFuture(); + restoredShard.restoreFromRepository(repository, future); + assertTrue(future.actionGet()); + }); assertEquals(restoredShard.recoveryState().getStage(), RecoveryState.Stage.DONE); assertEquals(restoredShard.recoveryState().getTranslog().recoveredOperations(), 0); assertEquals(IndexShardState.POST_RECOVERY, restoredShard.state()); From 
acacf4f53df4cae3e6a21adb1e94dd4f6741afdf Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 11 Oct 2019 21:20:05 +0200 Subject: [PATCH 02/27] step 2 --- .../elasticsearch/index/shard/IndexShard.java | 2 +- .../index/shard/StoreRecovery.java | 25 +++++++++++-------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 0bca2fe463d93..00ee49e57b8c4 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1847,7 +1847,7 @@ public void restoreFromRepository(Repository repository, ActionListener assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " + recoveryState.getRecoverySource(); StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - listener.onResponse(storeRecovery.recoverFromRepository(this, repository)); + storeRecovery.recoverFromRepository(this, repository, listener); } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 858e6edd3fbfc..4bd450f086e3c 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -32,6 +32,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.RecoverySource; @@ -267,17 +268,19 @@ public void readBytes(byte[] b, int offset, int len) throws IOException { * @return true if the shard 
has been recovered successfully, false if the recovery * has been ignored due to a concurrent modification of if the clusters state has changed due to async updates. */ - boolean recoverFromRepository(final IndexShard indexShard, Repository repository) { - if (canRecover(indexShard)) { - RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); - assert recoveryType == RecoverySource.Type.SNAPSHOT : "expected snapshot recovery type: " + recoveryType; - SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) indexShard.recoveryState().getRecoverySource(); - return executeRecovery(indexShard, () -> { - logger.debug("restoring from {} ...", indexShard.recoveryState().getRecoverySource()); - restore(indexShard, repository, recoverySource); - }); - } - return false; + void recoverFromRepository(final IndexShard indexShard, Repository repository, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + if (canRecover(indexShard)) { + RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); + assert recoveryType == RecoverySource.Type.SNAPSHOT : "expected snapshot recovery type: " + recoveryType; + SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) indexShard.recoveryState().getRecoverySource(); + return executeRecovery(indexShard, () -> { + logger.debug("restoring from {} ...", indexShard.recoveryState().getRecoverySource()); + restore(indexShard, repository, recoverySource); + }); + } + return false; + }); } From 0c4a128f07e9bbc156a9dc892174590efe58c43d Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 12 Oct 2019 09:31:24 +0200 Subject: [PATCH 03/27] async restore --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 
00ee49e57b8c4..6899a465b5821 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2540,8 +2540,8 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService recoveryListener.onRecoveryDone(recoveryState); } }, - e -> recoveryListener.onRecoveryFailure(recoveryState, - new RecoveryFailedException(recoveryState, null, e), true)))); + e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true))) + ); break; case LOCAL_SHARDS: final IndexMetaData indexMetaData = indexSettings().getIndexMetaData(); From 1fd86af25377e51494f11166e3141c6b6e722104 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 12 Oct 2019 16:05:44 +0200 Subject: [PATCH 04/27] bck --- .../index/shard/StoreRecovery.java | 126 ++++++++++-------- .../repositories/FilterRepository.java | 6 +- .../repositories/Repository.java | 3 +- .../blobstore/BlobStoreRepository.java | 47 ++++--- .../index/shard/IndexShardTests.java | 11 +- .../RepositoriesServiceTests.java | 4 +- .../repositories/fs/FsRepositoryTests.java | 10 +- .../index/shard/IndexShardTestCase.java | 5 +- .../xpack/ccr/repository/CcrRepository.java | 93 ++++++------- .../ShardFollowTaskReplicationTests.java | 29 ++-- .../engine/FollowEngineIndexShardTests.java | 11 +- 11 files changed, 185 insertions(+), 160 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 4bd450f086e3c..4bf15aba5b681 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -33,6 +33,7 @@ import org.apache.lucene.store.IndexInput; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import 
org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.RecoverySource; @@ -91,10 +92,12 @@ boolean recoverFromStore(final IndexShard indexShard) { RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE : "expected store recovery type but was: " + recoveryType; - return executeRecovery(indexShard, () -> { + final PlainActionFuture future = PlainActionFuture.newFuture(); + executeRecovery(indexShard, () -> { logger.debug("starting recovery from store ..."); internalRecoverFromStore(indexShard); - }); + }, future); + return future.actionGet(); } return false; } @@ -120,7 +123,8 @@ boolean recoverFromLocalShards(BiConsumer mappingUpdate Sort indexSort = indexShard.getIndexSort(); final boolean hasNested = indexShard.mapperService().hasNested(); final boolean isSplit = sourceMetaData.getNumberOfShards() < indexShard.indexSettings().getNumberOfShards(); - return executeRecovery(indexShard, () -> { + final PlainActionFuture future = PlainActionFuture.newFuture(); + executeRecovery(indexShard, () -> { logger.debug("starting recovery from local shards {}", shards); try { final Directory directory = indexShard.store().directory(); // don't close this directory!! @@ -138,8 +142,8 @@ boolean recoverFromLocalShards(BiConsumer mappingUpdate } catch (IOException ex) { throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex); } - - }); + }, future); + return future.actionGet(); } return false; } @@ -265,23 +269,24 @@ public void readBytes(byte[] b, int offset, int len) throws IOException { * previously created index snapshot into an existing initializing shard. 
* @param indexShard the index shard instance to recovery the snapshot from * @param repository the repository holding the physical files the shard should be recovered from - * @return true if the shard has been recovered successfully, false if the recovery - * has been ignored due to a concurrent modification of if the clusters state has changed due to async updates. + * TODO: document listener */ void recoverFromRepository(final IndexShard indexShard, Repository repository, ActionListener listener) { - ActionListener.completeWith(listener, () -> { + try { if (canRecover(indexShard)) { RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); assert recoveryType == RecoverySource.Type.SNAPSHOT : "expected snapshot recovery type: " + recoveryType; SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) indexShard.recoveryState().getRecoverySource(); - return executeRecovery(indexShard, () -> { + executeRecovery(indexShard, () -> { logger.debug("restoring from {} ...", indexShard.recoveryState().getRecoverySource()); restore(indexShard, repository, recoverySource); - }); + }, listener); + } else { + listener.onResponse(false); } - return false; - }); - + } catch (Exception e) { + listener.onFailure(e); + } } private boolean canRecover(IndexShard indexShard) { @@ -298,56 +303,59 @@ private boolean canRecover(IndexShard indexShard) { /** * Recovers the state of the shard from the store. */ - private boolean executeRecovery(final IndexShard indexShard, Runnable recoveryRunnable) throws IndexShardRecoveryException { - try { - recoveryRunnable.run(); - // Check that the gateway didn't leave the shard in init or recovering stage. it is up to the gateway - // to call post recovery. 
- final IndexShardState shardState = indexShard.state(); - final RecoveryState recoveryState = indexShard.recoveryState(); - assert shardState != IndexShardState.CREATED && shardState != IndexShardState.RECOVERING : - "recovery process of " + shardId + " didn't get to post_recovery. shardState [" + shardState + "]"; - - if (logger.isTraceEnabled()) { - RecoveryState.Index index = recoveryState.getIndex(); - StringBuilder sb = new StringBuilder(); - sb.append(" index : files [").append(index.totalFileCount()).append("] with total_size [") + private void executeRecovery(IndexShard indexShard, Runnable recoveryRunnable, + ActionListener listener) throws IndexShardRecoveryException { + ActionListener.completeWith(listener, () -> { + try { + recoveryRunnable.run(); + // Check that the gateway didn't leave the shard in init or recovering stage. it is up to the gateway + // to call post recovery. + final IndexShardState shardState = indexShard.state(); + final RecoveryState recoveryState = indexShard.recoveryState(); + assert shardState != IndexShardState.CREATED && shardState != IndexShardState.RECOVERING : + "recovery process of " + shardId + " didn't get to post_recovery. 
shardState [" + shardState + "]"; + + if (logger.isTraceEnabled()) { + RecoveryState.Index index = recoveryState.getIndex(); + StringBuilder sb = new StringBuilder(); + sb.append(" index : files [").append(index.totalFileCount()).append("] with total_size [") .append(new ByteSizeValue(index.totalBytes())).append("], took[") .append(TimeValue.timeValueMillis(index.time())).append("]\n"); - sb.append(" : recovered_files [").append(index.recoveredFileCount()).append("] with total_size [") + sb.append(" : recovered_files [").append(index.recoveredFileCount()).append("] with total_size [") .append(new ByteSizeValue(index.recoveredBytes())).append("]\n"); - sb.append(" : reusing_files [").append(index.reusedFileCount()).append("] with total_size [") + sb.append(" : reusing_files [").append(index.reusedFileCount()).append("] with total_size [") .append(new ByteSizeValue(index.reusedBytes())).append("]\n"); - sb.append(" verify_index : took [") - .append(TimeValue.timeValueMillis(recoveryState.getVerifyIndex().time())).append("], check_index [") - .append(timeValueMillis(recoveryState.getVerifyIndex().checkIndexTime())).append("]\n"); - sb.append(" translog : number_of_operations [").append(recoveryState.getTranslog().recoveredOperations()) + sb.append(" verify_index : took [") + .append(TimeValue.timeValueMillis(recoveryState.getVerifyIndex().time())).append("], check_index [") + .append(timeValueMillis(recoveryState.getVerifyIndex().checkIndexTime())).append("]\n"); + sb.append(" translog : number_of_operations [").append(recoveryState.getTranslog().recoveredOperations()) .append("], took [").append(TimeValue.timeValueMillis(recoveryState.getTranslog().time())).append("]"); - logger.trace("recovery completed from [shard_store], took [{}]\n{}", - timeValueMillis(recoveryState.getTimer().time()), sb); - } else if (logger.isDebugEnabled()) { - logger.debug("recovery completed from [shard_store], took [{}]", timeValueMillis(recoveryState.getTimer().time())); - } - return 
true; - } catch (IndexShardRecoveryException e) { - if (indexShard.state() == IndexShardState.CLOSED) { - // got closed on us, just ignore this recovery - return false; - } - if ((e.getCause() instanceof IndexShardClosedException) || (e.getCause() instanceof IndexShardNotStartedException)) { - // got closed on us, just ignore this recovery - return false; - } - throw e; - } catch (IndexShardClosedException | IndexShardNotStartedException e) { - } catch (Exception e) { - if (indexShard.state() == IndexShardState.CLOSED) { - // got closed on us, just ignore this recovery - return false; + logger.trace("recovery completed from [shard_store], took [{}]\n{}", + timeValueMillis(recoveryState.getTimer().time()), sb); + } else if (logger.isDebugEnabled()) { + logger.debug("recovery completed from [shard_store], took [{}]", timeValueMillis(recoveryState.getTimer().time())); + } + return true; + } catch (IndexShardRecoveryException e) { + if (indexShard.state() == IndexShardState.CLOSED) { + // got closed on us, just ignore this recovery + return false; + } + if ((e.getCause() instanceof IndexShardClosedException) || (e.getCause() instanceof IndexShardNotStartedException)) { + // got closed on us, just ignore this recovery + return false; + } + throw e; + } catch (IndexShardClosedException | IndexShardNotStartedException e) { + } catch (Exception e) { + if (indexShard.state() == IndexShardState.CLOSED) { + // got closed on us, just ignore this recovery + return false; + } + throw new IndexShardRecoveryException(shardId, "failed recovery", e); } - throw new IndexShardRecoveryException(shardId, "failed recovery", e); - } - return false; + return false; + }); } /** @@ -460,8 +468,10 @@ private void restore(final IndexShard indexShard, final Repository repository, f } final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName); assert indexShard.getEngineOrNull() == null; + final PlainActionFuture future = PlainActionFuture.newFuture(); 
repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), - restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState()); + restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState(), future); + future.actionGet(); final Store store = indexShard.store(); bootstrap(indexShard, store); assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index f048528ab789e..76afc55eba35f 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -122,9 +122,9 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener); } @Override - public void restoreShard(Store store, SnapshotId snapshotId, - Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { - in.restoreShard(store, snapshotId, version, indexId, snapshotShardId, recoveryState); + public void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, + RecoveryState recoveryState, ActionListener listener) { + in.restoreShard(store, snapshotId, version, indexId, snapshotShardId, recoveryState, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 4804b0852ea5b..fc0dcf691390c 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -208,9 +208,10 @@ void snapshotShard(Store store, MapperService mapperService, SnapshotId 
snapshot * @param indexId id of the index in the repository from which the restore is occurring * @param snapshotShardId shard id (in the snapshot) * @param recoveryState recovery state + * @param listener listener to invoke once done */ void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, - RecoveryState recoveryState); + RecoveryState recoveryState, ActionListener listener); /** * Retrieve shard snapshot status for the stored snapshot diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 59c6a248ca0f4..ba11a04747263 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1143,28 +1143,31 @@ public void onFailure(Exception e) { @Override public void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, - RecoveryState recoveryState) { - ShardId shardId = store.shardId(); - try { - final BlobContainer container = shardContainer(indexId, snapshotShardId); - BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId); - SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); - new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE) { - @Override - protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { - final InputStream dataBlobCompositeStream = new SlicedInputStream(fileInfo.numberOfParts()) { - @Override - protected InputStream openSlice(long slice) throws IOException { - return container.readBlob(fileInfo.partName(slice)); - } - }; - return restoreRateLimiter == null ? 
dataBlobCompositeStream - : new RateLimitingInputStream(dataBlobCompositeStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc); - } - }.restore(snapshotFiles, store); - } catch (Exception e) { - throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e); - } + RecoveryState recoveryState, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + ShardId shardId = store.shardId(); + try { + final BlobContainer container = shardContainer(indexId, snapshotShardId); + BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId); + SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); + new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE) { + @Override + protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { + final InputStream dataBlobCompositeStream = new SlicedInputStream(fileInfo.numberOfParts()) { + @Override + protected InputStream openSlice(long slice) throws IOException { + return container.readBlob(fileInfo.partName(slice)); + } + }; + return restoreRateLimiter == null ? 
dataBlobCompositeStream : + new RateLimitingInputStream(dataBlobCompositeStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc); + } + }.restore(snapshotFiles, store); + return null; + } catch (Exception e) { + throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e); + } + }); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index c075793b26d09..de7f9c7e9c190 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2344,9 +2344,9 @@ public void testRestoreShard() throws IOException { final PlainActionFuture future = PlainActionFuture.newFuture(); target.restoreFromRepository(new RestoreOnlyRepository("test") { @Override - public void restoreShard(Store store, SnapshotId snapshotId, - Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { - try { + public void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, + RecoveryState recoveryState, ActionListener listener) { + ActionListener.completeWith(listener, () -> { cleanLuceneIndex(targetStore.directory()); for (String file : sourceStore.directory().listAll()) { if (file.equals("write.lock") || file.startsWith("extra")) { @@ -2354,9 +2354,8 @@ public void restoreShard(Store store, SnapshotId snapshotId, } targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT); } - } catch (Exception ex) { - throw new RuntimeException(ex); - } + return null; + }); } }, future); assertTrue(future.actionGet()); diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 5cd03b3251a21..9d38ea39350d2 
100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -203,8 +203,8 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s } @Override - public void restoreShard(Store store, SnapshotId snapshotId, - Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { + public void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, + RecoveryState recoveryState, ActionListener listener) { } diff --git a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java index 3c2d59564deac..2e5553f33901c 100644 --- a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java @@ -118,8 +118,10 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException { new UnassignedInfo(UnassignedInfo.Reason.EXISTING_INDEX_RESTORED, "")); routing = ShardRoutingHelper.initialize(routing, localNode.getId(), 0); RecoveryState state = new RecoveryState(routing, localNode, null); + final PlainActionFuture futureA = PlainActionFuture.newFuture(); runGeneric(threadPool, () -> - repository.restoreShard(store, snapshotId, Version.CURRENT, indexId, shardId, state)); + repository.restoreShard(store, snapshotId, Version.CURRENT, indexId, shardId, state, futureA)); + futureA.actionGet(); assertTrue(state.getIndex().recoveredBytes() > 0); assertEquals(0, state.getIndex().reusedFileCount()); assertEquals(indexCommit.getFileNames().size(), state.getIndex().recoveredFileCount()); @@ -140,14 +142,16 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException { // roll back to the first snap and then incrementally restore 
RecoveryState firstState = new RecoveryState(routing, localNode, null); + final PlainActionFuture futureB = PlainActionFuture.newFuture(); runGeneric(threadPool, () -> - repository.restoreShard(store, snapshotId, Version.CURRENT, indexId, shardId, firstState)); + repository.restoreShard(store, snapshotId, Version.CURRENT, indexId, shardId, firstState, futureB)); assertEquals("should reuse everything except of .liv and .si", commitFileNames.size()-2, firstState.getIndex().reusedFileCount()); RecoveryState secondState = new RecoveryState(routing, localNode, null); + final PlainActionFuture futureC = PlainActionFuture.newFuture(); runGeneric(threadPool, () -> - repository.restoreShard(store, incSnapshotId, Version.CURRENT, indexId, shardId, secondState)); + repository.restoreShard(store, incSnapshotId, Version.CURRENT, indexId, shardId, secondState, futureC)); assertEquals(secondState.getIndex().reusedFileCount(), commitFileNames.size()-2); assertEquals(secondState.getIndex().recoveredFileCount(), 2); List recoveredFiles = diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 35da786474aaf..ddfb9f0749e58 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -805,11 +805,14 @@ protected void recoverShardFromSnapshot(final IndexShard shard, new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, version, index); final ShardRouting shardRouting = newShardRouting(shardId, node.getId(), true, ShardRoutingState.INITIALIZING, recoverySource); shard.markAsRecovering("from snapshot", new RecoveryState(shardRouting, node, null)); + final PlainActionFuture future = PlainActionFuture.newFuture(); repository.restoreShard(shard.store(), snapshot.getSnapshotId(), version, indexId, shard.shardId(), - 
shard.recoveryState()); + shard.recoveryState(), + future); + future.actionGet(); } /** diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 6ac65c3182061..344a7be65785c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -297,27 +297,28 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s } @Override - public void restoreShard(Store store, SnapshotId snapshotId, - Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { - // TODO: Add timeouts to network calls / the restore process. - createEmptyStore(store); - ShardId shardId = store.shardId(); - - final Map ccrMetaData = store.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); - final String leaderIndexName = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY); - final String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); - final Index leaderIndex = new Index(leaderIndexName, leaderUUID); - final ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId()); - - final Client remoteClient = getRemoteClusterClient(); - - final String retentionLeaseId = + public void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, + RecoveryState recoveryState, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + // TODO: Add timeouts to network calls / the restore process. 
+ createEmptyStore(store); + ShardId shardId = store.shardId(); + + final Map ccrMetaData = store.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); + final String leaderIndexName = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY); + final String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); + final Index leaderIndex = new Index(leaderIndexName, leaderUUID); + final ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId()); + + final Client remoteClient = getRemoteClusterClient(); + + final String retentionLeaseId = retentionLeaseId(localClusterName, shardId.getIndex(), remoteClusterAlias, leaderIndex); - acquireRetentionLeaseOnLeader(shardId, retentionLeaseId, leaderShardId, remoteClient); + acquireRetentionLeaseOnLeader(shardId, retentionLeaseId, leaderShardId, remoteClient); - // schedule renewals to run during the restore - final Scheduler.Cancellable renewable = threadPool.scheduleWithFixedDelay( + // schedule renewals to run during the restore + final Scheduler.Cancellable renewable = threadPool.scheduleWithFixedDelay( () -> { logger.trace("{} background renewal of retention lease [{}] during restore", shardId, retentionLeaseId); final ThreadContext threadContext = threadPool.getThreadContext(); @@ -325,38 +326,40 @@ public void restoreShard(Store store, SnapshotId snapshotId, // we have to execute under the system context so that if security is enabled the renewal is authorized threadContext.markAsSystemContext(); CcrRetentionLeases.asyncRenewRetentionLease( - leaderShardId, - retentionLeaseId, - RETAIN_ALL, - remoteClient, - ActionListener.wrap( - r -> {}, - e -> { - final Throwable cause = ExceptionsHelper.unwrapCause(e); - assert cause instanceof ElasticsearchSecurityException == false : cause; - if (cause instanceof RetentionLeaseInvalidRetainingSeqNoException == false) { - logger.warn(new ParameterizedMessage( - "{} background renewal of retention lease [{}] failed during 
restore", shardId, - retentionLeaseId), cause); - } - })); + leaderShardId, + retentionLeaseId, + RETAIN_ALL, + remoteClient, + ActionListener.wrap( + r -> {}, + e -> { + final Throwable cause = ExceptionsHelper.unwrapCause(e); + assert cause instanceof ElasticsearchSecurityException == false : cause; + if (cause instanceof RetentionLeaseInvalidRetainingSeqNoException == false) { + logger.warn(new ParameterizedMessage( + "{} background renewal of retention lease [{}] failed during restore", shardId, + retentionLeaseId), cause); + } + })); } }, CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(store.indexSettings().getNodeSettings()), Ccr.CCR_THREAD_POOL_NAME); - // TODO: There should be some local timeout. And if the remote cluster returns an unknown session - // response, we should be able to retry by creating a new session. - try (RestoreSession restoreSession = openSession(metadata.name(), remoteClient, leaderShardId, shardId, recoveryState)) { - restoreSession.restoreFiles(store); - updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, shardId.getIndex()); - } catch (Exception e) { - throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e); - } finally { - logger.trace("{} canceling background renewal of retention lease [{}] at the end of restore", shardId, - retentionLeaseId); - renewable.cancel(); - } + // TODO: There should be some local timeout. And if the remote cluster returns an unknown session + // response, we should be able to retry by creating a new session. 
+ try (RestoreSession restoreSession = openSession(metadata.name(), remoteClient, leaderShardId, shardId, recoveryState)) { + restoreSession.restoreFiles(store); + updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, shardId.getIndex()); + return null; + } catch (Exception e) { + throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e); + } finally { + logger.trace("{} canceling background renewal of retention lease [{}] at the end of restore", shardId, + retentionLeaseId); + renewable.cancel(); + } + }); } private void createEmptyStore(Store store) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 9e16663209b9b..774de74787542 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -454,21 +454,24 @@ protected synchronized void recoverPrimary(IndexShard primary) { final PlainActionFuture future = PlainActionFuture.newFuture(); primary.restoreFromRepository(new RestoreOnlyRepository(index.getName()) { @Override - public void restoreShard(Store store, SnapshotId snapshotId, - Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { - try { - IndexShard leader = leaderGroup.getPrimary(); - Lucene.cleanLuceneIndex(primary.store().directory()); - try (Engine.IndexCommitRef sourceCommit = leader.acquireSafeIndexCommit()) { - Store.MetadataSnapshot sourceSnapshot = leader.store().getMetadata(sourceCommit.getIndexCommit()); - for (StoreFileMetaData md : sourceSnapshot) { - primary.store().directory().copyFrom( - leader.store().directory(), md.name(), md.name(), IOContext.DEFAULT); + public void restoreShard(Store 
store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, + RecoveryState recoveryState, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + try { + IndexShard leader = leaderGroup.getPrimary(); + Lucene.cleanLuceneIndex(primary.store().directory()); + try (Engine.IndexCommitRef sourceCommit = leader.acquireSafeIndexCommit()) { + Store.MetadataSnapshot sourceSnapshot = leader.store().getMetadata(sourceCommit.getIndexCommit()); + for (StoreFileMetaData md : sourceSnapshot) { + primary.store().directory().copyFrom( + leader.store().directory(), md.name(), md.name(), IOContext.DEFAULT); + } } + return null; + } catch (Exception ex) { + throw new AssertionError(ex); } - } catch (Exception ex) { - throw new AssertionError(ex); - } + }); } }, future); future.actionGet(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java index bb4ac8cce95a6..e684c83fec14d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java @@ -129,9 +129,9 @@ public void testRestoreShard() throws IOException { final PlainActionFuture future = PlainActionFuture.newFuture(); target.restoreFromRepository(new RestoreOnlyRepository("test") { @Override - public void restoreShard(Store store, SnapshotId snapshotId, - Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { - try { + public void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, + RecoveryState recoveryState, ActionListener listener) { + ActionListener.completeWith(listener, () -> { cleanLuceneIndex(targetStore.directory()); for (String file : 
sourceStore.directory().listAll()) { if (file.equals("write.lock") || file.startsWith("extra")) { @@ -139,9 +139,8 @@ public void restoreShard(Store store, SnapshotId snapshotId, } targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT); } - } catch (Exception ex) { - throw new RuntimeException(ex); - } + return null; + }); } }, future); assertTrue(future.actionGet()); From eb6f102a0836dbed622d6ae9e41d7667741e9dda Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sun, 13 Oct 2019 17:19:03 +0200 Subject: [PATCH 05/27] another step --- .../index/shard/StoreRecovery.java | 42 ++++++++++++------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 4bf15aba5b681..489f976524136 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.shard; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; - import org.apache.logging.log4j.Logger; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; @@ -60,6 +59,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.stream.Collectors; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; @@ -93,9 +93,10 @@ boolean recoverFromStore(final IndexShard indexShard) { assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE : "expected store recovery type but was: " + recoveryType; final PlainActionFuture future = PlainActionFuture.newFuture(); - executeRecovery(indexShard, () -> { + executeRecovery(indexShard, l -> { logger.debug("starting recovery from store ..."); 
internalRecoverFromStore(indexShard); + l.onResponse(true); }, future); return future.actionGet(); } @@ -124,7 +125,7 @@ boolean recoverFromLocalShards(BiConsumer mappingUpdate final boolean hasNested = indexShard.mapperService().hasNested(); final boolean isSplit = sourceMetaData.getNumberOfShards() < indexShard.indexSettings().getNumberOfShards(); final PlainActionFuture future = PlainActionFuture.newFuture(); - executeRecovery(indexShard, () -> { + executeRecovery(indexShard, l -> { logger.debug("starting recovery from local shards {}", shards); try { final Directory directory = indexShard.store().directory(); // don't close this directory!! @@ -139,6 +140,7 @@ boolean recoverFromLocalShards(BiConsumer mappingUpdate // copied segments - we will also see them in stats etc. indexShard.getEngine().forceMerge(false, -1, false, false, false); + l.onResponse(true); } catch (IOException ex) { throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex); } @@ -277,9 +279,10 @@ void recoverFromRepository(final IndexShard indexShard, Repository repository, A RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); assert recoveryType == RecoverySource.Type.SNAPSHOT : "expected snapshot recovery type: " + recoveryType; SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) indexShard.recoveryState().getRecoverySource(); - executeRecovery(indexShard, () -> { + executeRecovery(indexShard, l -> { logger.debug("restoring from {} ...", indexShard.recoveryState().getRecoverySource()); restore(indexShard, repository, recoverySource); + l.onResponse(true); }, listener); } else { listener.onResponse(false); @@ -303,11 +306,10 @@ private boolean canRecover(IndexShard indexShard) { /** * Recovers the state of the shard from the store. 
*/ - private void executeRecovery(IndexShard indexShard, Runnable recoveryRunnable, + private void executeRecovery(IndexShard indexShard, Consumer> recoveryRunnable, ActionListener listener) throws IndexShardRecoveryException { - ActionListener.completeWith(listener, () -> { - try { - recoveryRunnable.run(); + ActionListener finalListener = ActionListener.wrap(res -> { + if (res) { // Check that the gateway didn't leave the shard in init or recovering stage. it is up to the gateway // to call post recovery. final IndexShardState shardState = indexShard.state(); @@ -335,27 +337,39 @@ private void executeRecovery(IndexShard indexShard, Runnable recoveryRunnable, } else if (logger.isDebugEnabled()) { logger.debug("recovery completed from [shard_store], took [{}]", timeValueMillis(recoveryState.getTimer().time())); } - return true; + } + listener.onResponse(res); + }, ex -> { + try { + throw ex; } catch (IndexShardRecoveryException e) { if (indexShard.state() == IndexShardState.CLOSED) { // got closed on us, just ignore this recovery - return false; + listener.onResponse(false); + return; } if ((e.getCause() instanceof IndexShardClosedException) || (e.getCause() instanceof IndexShardNotStartedException)) { // got closed on us, just ignore this recovery - return false; + listener.onResponse(false); + return; } throw e; } catch (IndexShardClosedException | IndexShardNotStartedException e) { } catch (Exception e) { if (indexShard.state() == IndexShardState.CLOSED) { // got closed on us, just ignore this recovery - return false; + listener.onResponse(false); + } else { + throw new IndexShardRecoveryException(shardId, "failed recovery", e); } - throw new IndexShardRecoveryException(shardId, "failed recovery", e); } - return false; + listener.onResponse(false); }); + try { + recoveryRunnable.accept(finalListener); + } catch (Exception e) { + finalListener.onFailure(e); + } } /** From 9deb3cda12fd3e56e89d9f8a0990eebbcfc4fc15 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: 
Mon, 14 Oct 2019 07:25:44 +0200 Subject: [PATCH 06/27] another step --- .../index/shard/StoreRecovery.java | 46 +++--- .../blobstore/BlobStoreRepository.java | 47 +++--- .../blobstore/FileRestoreContext.java | 128 +++++++++------- .../xpack/ccr/repository/CcrRepository.java | 142 +++++++++--------- 4 files changed, 194 insertions(+), 169 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 489f976524136..4aba808f5a315 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -279,11 +279,7 @@ void recoverFromRepository(final IndexShard indexShard, Repository repository, A RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); assert recoveryType == RecoverySource.Type.SNAPSHOT : "expected snapshot recovery type: " + recoveryType; SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) indexShard.recoveryState().getRecoverySource(); - executeRecovery(indexShard, l -> { - logger.debug("restoring from {} ...", indexShard.recoveryState().getRecoverySource()); - restore(indexShard, repository, recoverySource); - l.onResponse(true); - }, listener); + executeRecovery(indexShard, l -> restore(indexShard, repository, recoverySource, l), listener); } else { listener.onResponse(false); } @@ -360,8 +356,9 @@ private void executeRecovery(IndexShard indexShard, Consumer listener) { + logger.debug("restoring from {} ...", indexShard.recoveryState().getRecoverySource()); final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog(); if (restoreSource == null) { - throw new IndexShardRestoreFailedException(shardId, "empty restore source"); + listener.onFailure(new IndexShardRestoreFailedException(shardId, "empty restore source")); + return; } if (logger.isTraceEnabled()) { logger.trace("[{}] 
restoring shard [{}]", restoreSource.snapshot(), shardId); } + final ActionListener restoreListener = ActionListener.wrap( + v -> { + final Store store = indexShard.store(); + bootstrap(indexShard, store); + assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; + writeEmptyRetentionLeasesFile(indexShard); + indexShard.openEngineAndRecoverFromTranslog(); + indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); + indexShard.finalizeRecovery(); + indexShard.postRecovery("restore done"); + listener.onResponse(true); + listener.onResponse(true); + }, e -> listener.onFailure(new IndexShardRestoreFailedException(shardId, "restore failed", e)) + ); try { translogState.totalOperations(0); translogState.totalOperationsOnStart(0); @@ -482,20 +496,10 @@ private void restore(final IndexShard indexShard, final Repository repository, f } final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName); assert indexShard.getEngineOrNull() == null; - final PlainActionFuture future = PlainActionFuture.newFuture(); - repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), - restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState(), future); - future.actionGet(); - final Store store = indexShard.store(); - bootstrap(indexShard, store); - assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; - writeEmptyRetentionLeasesFile(indexShard); - indexShard.openEngineAndRecoverFromTranslog(); - indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); - indexShard.finalizeRecovery(); - indexShard.postRecovery("restore done"); + repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, + snapshotShardId, indexShard.recoveryState(), restoreListener); } catch (Exception e) { - throw new IndexShardRestoreFailedException(shardId, "restore failed", e); + 
restoreListener.onFailure(e); } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index ba11a04747263..66355668de1ca 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1144,30 +1144,29 @@ public void onFailure(Exception e) { @Override public void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState, ActionListener listener) { - ActionListener.completeWith(listener, () -> { - ShardId shardId = store.shardId(); - try { - final BlobContainer container = shardContainer(indexId, snapshotShardId); - BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId); - SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); - new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE) { - @Override - protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { - final InputStream dataBlobCompositeStream = new SlicedInputStream(fileInfo.numberOfParts()) { - @Override - protected InputStream openSlice(long slice) throws IOException { - return container.readBlob(fileInfo.partName(slice)); - } - }; - return restoreRateLimiter == null ? 
dataBlobCompositeStream : - new RateLimitingInputStream(dataBlobCompositeStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc); - } - }.restore(snapshotFiles, store); - return null; - } catch (Exception e) { - throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e); - } - }); + final ShardId shardId = store.shardId(); + final ActionListener restoreListener = ActionListener.delegateResponse(listener, + (l, e) -> l.onFailure(new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e))); + try { + final BlobContainer container = shardContainer(indexId, snapshotShardId); + BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId); + SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); + new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE) { + @Override + protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { + final InputStream dataBlobCompositeStream = new SlicedInputStream(fileInfo.numberOfParts()) { + @Override + protected InputStream openSlice(long slice) throws IOException { + return container.readBlob(fileInfo.partName(slice)); + } + }; + return restoreRateLimiter == null ? 
dataBlobCompositeStream : + new RateLimitingInputStream(dataBlobCompositeStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc); + } + }.restore(snapshotFiles, store, restoreListener); + } catch (Exception e) { + restoreListener.onFailure(e); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java index 96402d0d1bbc6..9203015e1d493 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java @@ -26,6 +26,8 @@ import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexOutput; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.shard.ShardId; @@ -85,7 +87,7 @@ protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId /** * Performs restore operation */ - public void restore(SnapshotFiles snapshotFiles, Store store) throws IOException { + public void restore(SnapshotFiles snapshotFiles, Store store, ActionListener listener) { store.incRef(); try { logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId); @@ -177,44 +179,57 @@ public void restore(SnapshotFiles snapshotFiles, Store store) throws IOException } } - restoreFiles(filesToRecover, store); + restoreFiles(filesToRecover, store, ActionListener.wrap( + v -> afterRestore(snapshotFiles, store, restoredSegmentsFile, listener), listener::onFailure)); } catch (IOException ex) { throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex); } + } catch (Exception e) { + listener.onFailure(e); + } 
finally { + store.decRef(); + } + } - // read the snapshot data persisted - try { - Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory()); - } catch (IOException e) { - throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e); - } + private void afterRestore(SnapshotFiles snapshotFiles, Store store, StoreFileMetaData restoredSegmentsFile, + ActionListener listener) { + // read the snapshot data persisted + try { + Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory()); + } catch (IOException e) { + throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e); + } - /// now, go over and clean files that are in the store, but were not in the snapshot - try { - for (String storeFile : store.directory().listAll()) { - if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) { - continue; //skip write.lock, checksum files and files that exist in the snapshot - } - try { - store.deleteQuiet("restore", storeFile); - store.directory().deleteFile(storeFile); - } catch (IOException e) { - logger.warn("[{}] [{}] failed to delete file [{}] during snapshot cleanup", shardId, snapshotId, storeFile); - } + /// now, go over and clean files that are in the store, but were not in the snapshot + try { + for (String storeFile : store.directory().listAll()) { + if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) { + continue; //skip write.lock, checksum files and files that exist in the snapshot + } + try { + store.deleteQuiet("restore", storeFile); + store.directory().deleteFile(storeFile); + } catch (IOException e) { + logger.warn("[{}] [{}] failed to delete file [{}] during snapshot cleanup", shardId, snapshotId, storeFile); } - } catch (IOException e) { - logger.warn("[{}] [{}] failed to list directory - some of files might not be deleted", shardId, snapshotId); } 
- } finally { - store.decRef(); + } catch (IOException e) { + logger.warn("[{}] [{}] failed to list directory - some of files might not be deleted", shardId, snapshotId); } + listener.onResponse(null); } - protected void restoreFiles(List filesToRecover, Store store) throws IOException { - // restore the files from the snapshot to the Lucene store - for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { - logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); - restoreFile(fileToRecover, store); + protected void restoreFiles(List filesToRecover, Store store, ActionListener listener) { + if (filesToRecover.isEmpty()) { + listener.onResponse(null); + } else { + ActionListener allFilesListener = + new GroupedActionListener<>(ActionListener.map(listener, v -> null), filesToRecover.size()); + // restore the files from the snapshot to the Lucene store + for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { + logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); + restoreFile(fileToRecover, store, allFilesListener); + } } } @@ -230,34 +245,37 @@ private static Iterable concat(Store.RecoveryDiff diff) { * * @param fileInfo file to be restored */ - private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final Store store) throws IOException { - boolean success = false; + private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + boolean success = false; - try (InputStream stream = fileInputStream(fileInfo)) { - try (IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { - final byte[] buffer = new byte[bufferSize]; - int length; - while ((length = stream.read(buffer)) > 0) { - indexOutput.writeBytes(buffer, 0, length); - 
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length); - } - Store.verify(indexOutput); - indexOutput.close(); - store.directory().sync(Collections.singleton(fileInfo.physicalName())); - success = true; - } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { - try { - store.markStoreCorrupted(ex); - } catch (IOException e) { - logger.warn("store cannot be marked as corrupted", e); - } - throw ex; - } finally { - if (success == false) { - store.deleteQuiet(fileInfo.physicalName()); + try (InputStream stream = fileInputStream(fileInfo)) { + try (IndexOutput indexOutput = + store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { + final byte[] buffer = new byte[bufferSize]; + int length; + while ((length = stream.read(buffer)) > 0) { + indexOutput.writeBytes(buffer, 0, length); + recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length); + } + Store.verify(indexOutput); + indexOutput.close(); + store.directory().sync(Collections.singleton(fileInfo.physicalName())); + success = true; + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { + try { + store.markStoreCorrupted(ex); + } catch (IOException e) { + logger.warn("store cannot be marked as corrupted", e); + } + throw ex; + } finally { + if (success == false) { + store.deleteQuiet(fileInfo.physicalName()); + } } } - } + return null; + }); } - } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 344a7be65785c..95fe9a1d6add9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -468,94 +468,98 @@ private static class RestoreSession extends 
FileRestoreContext implements Closea this.throttleListener = throttleListener; } - void restoreFiles(Store store) throws IOException { + void restoreFiles(Store store) { ArrayList fileInfos = new ArrayList<>(); for (StoreFileMetaData fileMetaData : sourceMetaData) { ByteSizeValue fileSize = new ByteSizeValue(fileMetaData.length()); fileInfos.add(new FileInfo(fileMetaData.name(), fileMetaData, fileSize)); } SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos); - restore(snapshotFiles, store); + final PlainActionFuture future = PlainActionFuture.newFuture(); + restore(snapshotFiles, store, future); + future.actionGet(); } @Override - protected void restoreFiles(List filesToRecover, Store store) throws IOException { + protected void restoreFiles(List filesToRecover, Store store, ActionListener doneListener) { logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover); - - try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> { - })) { - final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); - final AtomicReference> error = new AtomicReference<>(); - - for (FileInfo fileInfo : filesToRecover) { - final long fileLength = fileInfo.length(); - long offset = 0; - while (offset < fileLength && error.get() == null) { - final long requestSeqId = requestSeqIdTracker.generateSeqNo(); - try { - requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqId - ccrSettings.getMaxConcurrentFileChunks()); - - if (error.get() != null) { - requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId); - break; - } - - final int bytesRequested = Math.toIntExact( - Math.min(ccrSettings.getChunkSize().getBytes(), fileLength - offset)); - offset += bytesRequested; - - final GetCcrRestoreFileChunkRequest request = - new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileInfo.name(), bytesRequested); - logger.trace("[{}] [{}] fetching chunk for file 
[{}], expected offset: {}, size: {}", shardId, snapshotId, - fileInfo.name(), offset, bytesRequested); - - TimeValue timeout = ccrSettings.getRecoveryActionTimeout(); - ActionListener listener = - ListenerTimeouts.wrapWithTimeout(threadPool, ActionListener.wrap( - r -> threadPool.generic().execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { + ActionListener.completeWith(doneListener, () -> { + try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> { + })) { + final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); + final AtomicReference> error = new AtomicReference<>(); + + for (FileInfo fileInfo : filesToRecover) { + final long fileLength = fileInfo.length(); + long offset = 0; + while (offset < fileLength && error.get() == null) { + final long requestSeqId = requestSeqIdTracker.generateSeqNo(); + try { + requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqId - ccrSettings.getMaxConcurrentFileChunks()); + + if (error.get() != null) { + requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId); + break; + } + + final int bytesRequested = Math.toIntExact( + Math.min(ccrSettings.getChunkSize().getBytes(), fileLength - offset)); + offset += bytesRequested; + + final GetCcrRestoreFileChunkRequest request = + new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileInfo.name(), bytesRequested); + logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId, + fileInfo.name(), offset, bytesRequested); + + TimeValue timeout = ccrSettings.getRecoveryActionTimeout(); + ActionListener listener = + ListenerTimeouts.wrapWithTimeout(threadPool, ActionListener.wrap( + r -> threadPool.generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e)); + 
requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId); + } + + @Override + protected void doRun() throws Exception { + final int actualChunkSize = r.getChunk().length(); + logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", shardId, + snapshotId, fileInfo.name(), r.getOffset(), actualChunkSize); + final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize); + throttleListener.accept(nanosPaused); + final boolean lastChunk = r.getOffset() + actualChunkSize >= fileLength; + multiFileWriter.writeFileChunk(fileInfo.metadata(), r.getOffset(), r.getChunk(), lastChunk); + requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId); + } + }), + e -> { error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e)); requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId); } - - @Override - protected void doRun() throws Exception { - final int actualChunkSize = r.getChunk().length(); - logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", shardId, - snapshotId, fileInfo.name(), r.getOffset(), actualChunkSize); - final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize); - throttleListener.accept(nanosPaused); - final boolean lastChunk = r.getOffset() + actualChunkSize >= fileLength; - multiFileWriter.writeFileChunk(fileInfo.metadata(), r.getOffset(), r.getChunk(), lastChunk); - requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId); - } - }), - e -> { - error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e)); - requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId); - } ), timeout, ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME); - remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, listener); - } catch (Exception e) { - error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e)); - requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId); + remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, listener); + } catch (Exception e) { + 
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e)); + requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId); + } } } - } - try { - requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqIdTracker.getMaxSeqNo()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ElasticsearchException(e); - } - if (error.get() != null) { - handleError(store, error.get().v2()); + try { + requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqIdTracker.getMaxSeqNo()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ElasticsearchException(e); + } + if (error.get() != null) { + handleError(store, error.get().v2()); + } } - } - logger.trace("[{}] completed CCR restore", shardId); + logger.trace("[{}] completed CCR restore", shardId); + return null; + }); } private void handleError(Store store, Exception e) throws IOException { From b4d6d88caf1bd86bd4efc388fa1e0497b92a815d Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 14 Oct 2019 15:16:12 +0200 Subject: [PATCH 07/27] async --- .../repositories/blobstore/BlobStoreRepository.java | 8 +++----- .../elasticsearch/repositories/fs/FsRepositoryTests.java | 2 ++ 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 66355668de1ca..7f1f7a5e1b189 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1147,7 +1147,7 @@ public void restoreShard(Store store, SnapshotId snapshotId, Version version, In final ShardId shardId = store.shardId(); final ActionListener restoreListener = ActionListener.delegateResponse(listener, (l, e) -> l.onFailure(new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + 
snapshotId + "]", e))); - try { + threadPool.generic().execute(ActionRunnable.wrap(restoreListener, l -> { final BlobContainer container = shardContainer(indexId, snapshotShardId); BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId); SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); @@ -1163,10 +1163,8 @@ protected InputStream openSlice(long slice) throws IOException { return restoreRateLimiter == null ? dataBlobCompositeStream : new RateLimitingInputStream(dataBlobCompositeStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc); } - }.restore(snapshotFiles, store, restoreListener); - } catch (Exception e) { - restoreListener.onFailure(e); - } + }.restore(snapshotFiles, store, l); + })); } @Override diff --git a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java index 2e5553f33901c..5920a3295e8e4 100644 --- a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java @@ -145,6 +145,7 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException { final PlainActionFuture futureB = PlainActionFuture.newFuture(); runGeneric(threadPool, () -> repository.restoreShard(store, snapshotId, Version.CURRENT, indexId, shardId, firstState, futureB)); + futureB.actionGet(); assertEquals("should reuse everything except of .liv and .si", commitFileNames.size()-2, firstState.getIndex().reusedFileCount()); @@ -152,6 +153,7 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException { final PlainActionFuture futureC = PlainActionFuture.newFuture(); runGeneric(threadPool, () -> repository.restoreShard(store, incSnapshotId, Version.CURRENT, indexId, shardId, secondState, futureC)); + futureC.actionGet(); assertEquals(secondState.getIndex().reusedFileCount(), 
commitFileNames.size()-2); assertEquals(secondState.getIndex().recoveredFileCount(), 2); List recoveredFiles = From 13bf8609d99b4fb93ceaad18df53dda8e0f2942f Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 15 Oct 2019 13:48:04 +0200 Subject: [PATCH 08/27] parallel --- .../repositories/blobstore/BlobStoreRepository.java | 2 +- .../repositories/blobstore/FileRestoreContext.java | 9 +++++++-- .../xpack/ccr/repository/CcrRepository.java | 3 ++- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 7f1f7a5e1b189..fcbb37fa4f902 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1151,7 +1151,7 @@ public void restoreShard(Store store, SnapshotId snapshotId, Version version, In final BlobContainer container = shardContainer(indexId, snapshotShardId); BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId); SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); - new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE) { + new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE, threadPool.generic()) { @Override protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { final InputStream dataBlobCompositeStream = new SlicedInputStream(fileInfo.numberOfParts()) { diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java index 9203015e1d493..e4437780ca5e8 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java +++ 
b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java @@ -27,6 +27,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexOutput; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.iterable.Iterables; @@ -47,6 +48,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; @@ -67,6 +69,8 @@ public abstract class FileRestoreContext { protected final ShardId shardId; protected final int bufferSize; + private final Executor executor; + /** * Constructs new restore context * @@ -76,12 +80,13 @@ public abstract class FileRestoreContext { * @param bufferSize buffer size for restore */ protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState, - int bufferSize) { + int bufferSize, Executor executor) { this.repositoryName = repositoryName; this.recoveryState = recoveryState; this.snapshotId = snapshotId; this.shardId = shardId; this.bufferSize = bufferSize; + this.executor = executor; } /** @@ -228,7 +233,7 @@ protected void restoreFiles(List filesToRe // restore the files from the snapshot to the Lucene store for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); - restoreFile(fileToRecover, store, allFilesListener); + executor.execute(ActionRunnable.wrap(allFilesListener, l -> restoreFile(fileToRecover, store, l))); } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java 
b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 95fe9a1d6add9..17d0a163c9b9c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -457,7 +457,8 @@ private static class RestoreSession extends FileRestoreContext implements Closea RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, ShardId shardId, RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion, ThreadPool threadPool, CcrSettings ccrSettings, LongConsumer throttleListener) { - super(repositoryName, shardId, SNAPSHOT_ID, recoveryState, Math.toIntExact(ccrSettings.getChunkSize().getBytes())); + super(repositoryName, shardId, SNAPSHOT_ID, recoveryState, Math.toIntExact(ccrSettings.getChunkSize().getBytes()), + threadPool.generic()); this.remoteClient = remoteClient; this.sessionUUID = sessionUUID; this.node = node; From 51e7c34b3c923b4e5f57460cd224aa4f93741f4d Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 16 Oct 2019 10:20:58 +0200 Subject: [PATCH 09/27] bck --- .../repositories/blobstore/FileRestoreContext.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java index e4437780ca5e8..7c80442ff96d1 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java @@ -233,7 +233,7 @@ protected void restoreFiles(List filesToRe // restore the files from the snapshot to the Lucene store for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { logger.trace("[{}] [{}] restoring file [{}]", shardId, 
snapshotId, fileToRecover.name()); - executor.execute(ActionRunnable.wrap(allFilesListener, l -> restoreFile(fileToRecover, store, l))); + executor.execute(ActionRunnable.run(allFilesListener, () -> restoreFile(fileToRecover, store))); } } } @@ -250,8 +250,7 @@ private static Iterable concat(Store.RecoveryDiff diff) { * * @param fileInfo file to be restored */ - private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store, ActionListener listener) { - ActionListener.completeWith(listener, () -> { + private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final Store store) throws IOException { boolean success = false; try (InputStream stream = fileInputStream(fileInfo)) { @@ -280,7 +279,5 @@ private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store st } } } - return null; - }); } } From 22b2306920fda27db235a7890599e3f384310395 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 16 Oct 2019 19:20:29 +0200 Subject: [PATCH 10/27] Simplify Shard Snapshot Upload Code The code here was needlessly complicated when it enqueued all file uploads up-front. Instead, we can go with a cleaner worker + queue pattern here by taking the max-parallelism from the threadpool info. Also, I slightly simplified the rethrow and listener (step listener is pointless when you add the callback in the next line) handling it since I noticed that we were needlessly rethrowing in the same code and that wasn't worth a separate PR. 
--- .../blobstore/BlobStoreRepository.java | 68 ++++++++----------- .../coordination/DeterministicTaskQueue.java | 2 +- 2 files changed, 31 insertions(+), 39 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 59c6a248ca0f4..74f1fc8835b03 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -104,8 +104,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -194,6 +196,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final BlobPath basePath; + // Maximum number of concurrent file writes that should be scheduled on the SNAPSHOT thread-pool + private final int maxConcurrentWrites; + /** * Constructs new BlobStoreRepository * @param metadata The metadata for this repository including name and settings @@ -222,6 +227,7 @@ protected BlobStoreRepository( IndexMetaData::fromXContent, namedXContentRegistry, compress); snapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, SnapshotInfo::fromXContentInternal, namedXContentRegistry, compress); + maxConcurrentWrites = threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(); } @Override @@ -953,11 +959,9 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { final ShardId shardId = store.shardId(); final long startTime = 
threadPool.absoluteTimeInMillis(); - final StepListener snapshotDoneListener = new StepListener<>(); - snapshotDoneListener.whenComplete(listener::onResponse, e -> { + final ActionListener snapshotDoneListener = ActionListener.wrap(listener::onResponse, e -> { snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.stackTrace(e)); - listener.onFailure(e instanceof IndexShardSnapshotFailedException ? (IndexShardSnapshotFailedException) e - : new IndexShardSnapshotFailedException(store.shardId(), e)); + listener.onFailure(e instanceof IndexShardSnapshotFailedException ? e : new IndexShardSnapshotFailedException(shardId, e)); }); try { logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name()); @@ -980,7 +984,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s } final List indexCommitPointFiles = new ArrayList<>(); - ArrayList filesToSnapshot = new ArrayList<>(); + final BlockingQueue filesToSnapshot = new LinkedBlockingQueue<>(); store.incRef(); final Collection fileNames; final Store.MetadataSnapshot metadataFromStore; @@ -1099,42 +1103,30 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s allFilesUploadedListener.onResponse(Collections.emptyList()); return; } - final GroupedActionListener filesListener = - new GroupedActionListener<>(allFilesUploadedListener, indexIncrementalFileCount); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - // Flag to signal that the snapshot has been aborted/failed so we can stop any further blob uploads from starting - final AtomicBoolean alreadyFailed = new AtomicBoolean(); - for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) { - executor.execute(new ActionRunnable<>(filesListener) { - @Override - protected void doRun() { - try { - if (alreadyFailed.get() == false) { - if (store.tryIncRef()) { - try { - snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, 
snapshotStatus, store); - } finally { - store.decRef(); - } - } else if (snapshotStatus.isAborted()) { - throw new IndexShardSnapshotFailedException(shardId, "Aborted"); - } else { - assert false : "Store was closed before aborting the snapshot"; - throw new IllegalStateException("Store is closed already"); - } + final int workers = Math.min(maxConcurrentWrites, indexIncrementalFileCount); + final GroupedActionListener filesListener = new GroupedActionListener<>(allFilesUploadedListener, workers); + for (int i = 0; i < workers; ++i) { + executor.execute(ActionRunnable.run(filesListener, () -> { + try { + BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS); + if (snapshotFileInfo != null) { + store.incRef(); + try { + do { + snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); + snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS); + } while (snapshotFileInfo != null); + } finally { + store.decRef(); } - filesListener.onResponse(null); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e); } + } catch (Exception e) { + filesToSnapshot.clear(); // Stop uploading the remaining files if we run into any exception + throw e; } - - @Override - public void onFailure(Exception e) { - alreadyFailed.set(true); - super.onFailure(e); - } - }); + filesListener.onResponse(null); + })); } } catch (Exception e) { snapshotDoneListener.onFailure(e); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index 4567b97700604..0871cd166439d 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -309,7 +309,7 @@ 
public ThreadPoolInfo info() { @Override public Info info(String name) { - throw new UnsupportedOperationException(); + return new Info(name, ThreadPoolType.FIXED, 1); } @Override From 852efe3d94b2ca9314dd7ec14d6eb7e394d14c03 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 16 Oct 2019 19:25:53 +0200 Subject: [PATCH 11/27] fix mistake --- .../repositories/blobstore/BlobStoreRepository.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 74f1fc8835b03..d1c95efa5f55a 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1125,7 +1125,6 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s filesToSnapshot.clear(); // Stop uploading the remaining files if we run into any exception throw e; } - filesListener.onResponse(null); })); } } catch (Exception e) { From 77b6ab2fad097647143cd7abf7b8c3a32b53cb89 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 16 Oct 2019 19:30:18 +0200 Subject: [PATCH 12/27] simpler --- .../blobstore/BlobStoreRepository.java | 31 +++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index d1c95efa5f55a..c825b7dddaeb0 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1105,25 +1105,24 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s } final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); 
final int workers = Math.min(maxConcurrentWrites, indexIncrementalFileCount); - final GroupedActionListener filesListener = new GroupedActionListener<>(allFilesUploadedListener, workers); + final ActionListener filesListener = ActionListener.delegateResponse( + new GroupedActionListener<>(allFilesUploadedListener, workers), (l, e) -> { + filesToSnapshot.clear(); // Stop uploading the remaining files if we run into any exception + l.onFailure(e); + }); for (int i = 0; i < workers; ++i) { executor.execute(ActionRunnable.run(filesListener, () -> { - try { - BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS); - if (snapshotFileInfo != null) { - store.incRef(); - try { - do { - snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); - snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS); - } while (snapshotFileInfo != null); - } finally { - store.decRef(); - } + BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS); + if (snapshotFileInfo != null) { + store.incRef(); + try { + do { + snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); + snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS); + } while (snapshotFileInfo != null); + } finally { + store.decRef(); } - } catch (Exception e) { - filesToSnapshot.clear(); // Stop uploading the remaining files if we run into any exception - throw e; } })); } From f5e18c77a544bcf86c3a6303559aeb17b0d35f61 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 16 Oct 2019 19:59:23 +0200 Subject: [PATCH 13/27] fix test --- ...ckEventuallyConsistentRepositoryTests.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java 
b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 9d94fc3ed4368..3c585f4858da0 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -49,7 +49,7 @@ public void testReadAfterWriteConsistently() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + xContentRegistry(), mockThreadPool(), blobStoreContext)) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -69,7 +69,7 @@ public void testReadAfterWriteAfterReadThrows() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + xContentRegistry(), mockThreadPool(), blobStoreContext)) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -85,7 +85,7 @@ public void testReadAfterDeleteAfterWriteThrows() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", 
"mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + xContentRegistry(), mockThreadPool(), blobStoreContext)) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -103,7 +103,7 @@ public void testOverwriteRandomBlobFails() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + xContentRegistry(), mockThreadPool(), blobStoreContext)) { repository.start(); final BlobContainer container = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -120,7 +120,7 @@ public void testOverwriteShardSnapBlobFails() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + xContentRegistry(), mockThreadPool(), blobStoreContext)) { repository.start(); final BlobContainer container = repository.blobStore().blobContainer(repository.basePath().add("indices").add("someindex").add("0")); @@ -136,7 +136,7 @@ public void testOverwriteShardSnapBlobFails() throws IOException { public void testOverwriteSnapshotInfoBlob() { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); - final ThreadPool threadPool = mock(ThreadPool.class); + final ThreadPool threadPool = mockThreadPool(); 
when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService()); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), @@ -170,6 +170,13 @@ public void testOverwriteSnapshotInfoBlob() { } } + private static ThreadPool mockThreadPool() { + final ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn( + new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, 1)); + return threadPool; + } + private static void assertThrowsOnInconsistentRead(BlobContainer blobContainer, String blobName) { final AssertionError assertionError = expectThrows(AssertionError.class, () -> blobContainer.readBlob(blobName)); assertThat(assertionError.getMessage(), equalTo("Inconsistent read on [" + blobName + ']')); From cea3e850cbb4d781506d1221873363a65e7a01f3 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 16 Oct 2019 20:06:20 +0200 Subject: [PATCH 14/27] smarter snapshot info --- .../elasticsearch/repositories/url/URLRepositoryTests.java | 6 +++++- .../cluster/coordination/DeterministicTaskQueue.java | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java index 96a82ee0b9d24..4d31283c1c4a8 100644 --- a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java +++ b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java @@ -36,12 +36,16 @@ import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class URLRepositoryTests extends ESTestCase { private URLRepository 
createRepository(Settings baseSettings, RepositoryMetaData repositoryMetaData) { + final ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn( + new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, 1)); return new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings), - new NamedXContentRegistry(Collections.emptyList()), mock(ThreadPool.class)) { + new NamedXContentRegistry(Collections.emptyList()), threadPool) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index 0871cd166439d..6d8acda4b823d 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -30,7 +30,9 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.Delayed; @@ -286,6 +288,8 @@ public ThreadPool getThreadPool() { * @return A ThreadPool that uses this task queue and wraps Runnables in the given wrapper. 
*/ public ThreadPool getThreadPool(Function runnableWrapper) { + + final Map infos = new HashMap<>(); return new ThreadPool(settings) { { @@ -309,7 +313,7 @@ public ThreadPoolInfo info() { @Override public Info info(String name) { - return new Info(name, ThreadPoolType.FIXED, 1); + return infos.computeIfAbsent(name, n -> new Info(n, ThreadPoolType.FIXED, random.nextInt(10) + 1)); } @Override From 087aeebc38f0896d80686f7f8c745d5c27e7f464 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 16 Oct 2019 20:10:03 +0200 Subject: [PATCH 15/27] cleaner --- .../cluster/coordination/DeterministicTaskQueue.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index 6d8acda4b823d..0837f431fff9c 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -288,10 +288,10 @@ public ThreadPool getThreadPool() { * @return A ThreadPool that uses this task queue and wraps Runnables in the given wrapper. 
*/ public ThreadPool getThreadPool(Function runnableWrapper) { - - final Map infos = new HashMap<>(); return new ThreadPool(settings) { + private final Map infos = new HashMap<>(); + { stopCachedTimeThread(); } From 3bb0683c41815f78ce8de422b12ce63a31d25db4 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 16 Oct 2019 20:25:46 +0200 Subject: [PATCH 16/27] shorter --- .../repositories/url/URLRepositoryTests.java | 6 +----- .../blobstore/BlobStoreRepository.java | 7 ++----- ...ckEventuallyConsistentRepositoryTests.java | 21 +++++++------------ 3 files changed, 11 insertions(+), 23 deletions(-) diff --git a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java index 4d31283c1c4a8..96a82ee0b9d24 100644 --- a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java +++ b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java @@ -36,16 +36,12 @@ import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class URLRepositoryTests extends ESTestCase { private URLRepository createRepository(Settings baseSettings, RepositoryMetaData repositoryMetaData) { - final ThreadPool threadPool = mock(ThreadPool.class); - when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn( - new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, 1)); return new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings), - new NamedXContentRegistry(Collections.emptyList()), threadPool) { + new NamedXContentRegistry(Collections.emptyList()), mock(ThreadPool.class)) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff 
--git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index c825b7dddaeb0..2923c893b5994 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -196,9 +196,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final BlobPath basePath; - // Maximum number of concurrent file writes that should be scheduled on the SNAPSHOT thread-pool - private final int maxConcurrentWrites; - /** * Constructs new BlobStoreRepository * @param metadata The metadata for this repository including name and settings @@ -227,7 +224,6 @@ protected BlobStoreRepository( IndexMetaData::fromXContent, namedXContentRegistry, compress); snapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, SnapshotInfo::fromXContentInternal, namedXContentRegistry, compress); - maxConcurrentWrites = threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(); } @Override @@ -1104,7 +1100,8 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s return; } final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - final int workers = Math.min(maxConcurrentWrites, indexIncrementalFileCount); + // Start as many workers as fit into the snapshot pool at once at the most + final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indexIncrementalFileCount); final ActionListener filesListener = ActionListener.delegateResponse( new GroupedActionListener<>(allFilesUploadedListener, workers), (l, e) -> { filesToSnapshot.clear(); // Stop uploading the remaining files if we run into any exception diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java 
b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 3c585f4858da0..a1903e1a3f1e6 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -49,7 +49,7 @@ public void testReadAfterWriteConsistently() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mockThreadPool(), blobStoreContext)) { + xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -69,7 +69,7 @@ public void testReadAfterWriteAfterReadThrows() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mockThreadPool(), blobStoreContext)) { + xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -85,7 +85,7 @@ public void testReadAfterDeleteAfterWriteThrows() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", 
"mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mockThreadPool(), blobStoreContext)) { + xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -103,7 +103,7 @@ public void testOverwriteRandomBlobFails() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mockThreadPool(), blobStoreContext)) { + xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { repository.start(); final BlobContainer container = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -120,7 +120,7 @@ public void testOverwriteShardSnapBlobFails() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mockThreadPool(), blobStoreContext)) { + xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { repository.start(); final BlobContainer container = repository.blobStore().blobContainer(repository.basePath().add("indices").add("someindex").add("0")); @@ -136,8 +136,10 @@ public void testOverwriteShardSnapBlobFails() throws IOException { public void testOverwriteSnapshotInfoBlob() { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); - final ThreadPool threadPool = mockThreadPool(); + final ThreadPool threadPool = mock(ThreadPool.class); 
when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService()); + when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn( + new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, 1)); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), xContentRegistry(), threadPool, blobStoreContext)) { @@ -170,13 +172,6 @@ public void testOverwriteSnapshotInfoBlob() { } } - private static ThreadPool mockThreadPool() { - final ThreadPool threadPool = mock(ThreadPool.class); - when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn( - new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, 1)); - return threadPool; - } - private static void assertThrowsOnInconsistentRead(BlobContainer blobContainer, String blobName) { final AssertionError assertionError = expectThrows(AssertionError.class, () -> blobContainer.readBlob(blobName)); assertThat(assertionError.getMessage(), equalTo("Inconsistent read on [" + blobName + ']')); From ea7f6ce243d341396d2c3dec790ebd353ac4358e Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 16 Oct 2019 20:32:54 +0200 Subject: [PATCH 17/27] more randomness --- .../mockstore/MockEventuallyConsistentRepositoryTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index a1903e1a3f1e6..ee766ef7360b5 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -139,7 +139,7 @@ public void testOverwriteSnapshotInfoBlob() { final ThreadPool threadPool = 
mock(ThreadPool.class); when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService()); when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn( - new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, 1)); + new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10))); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), xContentRegistry(), threadPool, blobStoreContext)) { From 1cf0f134e8ae0f9561dab19ed4541bb768e87ec2 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 17 Oct 2019 08:32:58 +0200 Subject: [PATCH 18/27] cleanup abstraction levels --- .../blobstore/BlobStoreRepository.java | 59 +++++++++++++++- .../blobstore/FileRestoreContext.java | 68 +------------------ .../xpack/ccr/repository/CcrRepository.java | 9 +-- 3 files changed, 61 insertions(+), 75 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index fcbb37fa4f902..37ae44939a699 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -22,9 +22,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RateLimiter; import 
org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; @@ -1151,9 +1155,25 @@ public void restoreShard(Store store, SnapshotId snapshotId, Version version, In final BlobContainer container = shardContainer(indexId, snapshotShardId); BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId); SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); - new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE, threadPool.generic()) { + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE) { @Override - protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { + protected void restoreFiles(List filesToRecover, Store store, + ActionListener listener) { + if (filesToRecover.isEmpty()) { + listener.onResponse(null); + } else { + ActionListener allFilesListener = + new GroupedActionListener<>(ActionListener.map(listener, v -> null), filesToRecover.size()); + // restore the files from the snapshot to the Lucene store + for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { + logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); + executor.execute(ActionRunnable.run(allFilesListener, () -> restoreFile(fileToRecover, store))); + } + } + } + + private InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { final InputStream dataBlobCompositeStream = new SlicedInputStream(fileInfo.numberOfParts()) { @Override protected InputStream openSlice(long slice) throws IOException { @@ -1163,6 +1183,41 @@ protected InputStream openSlice(long slice) throws IOException { return restoreRateLimiter == null ? 
dataBlobCompositeStream : new RateLimitingInputStream(dataBlobCompositeStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc); } + + /** + * Restores a file + * @param fileInfo file to be restored + */ + private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException { + boolean success = false; + + try (InputStream stream = fileInputStream(fileInfo)) { + try (IndexOutput indexOutput = + store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { + final byte[] buffer = new byte[bufferSize]; + int length; + while ((length = stream.read(buffer)) > 0) { + indexOutput.writeBytes(buffer, 0, length); + recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length); + } + Store.verify(indexOutput); + indexOutput.close(); + store.directory().sync(Collections.singleton(fileInfo.physicalName())); + success = true; + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { + try { + store.markStoreCorrupted(ex); + } catch (IOException e) { + logger.warn("store cannot be marked as corrupted", e); + } + throw ex; + } finally { + if (success == false) { + store.deleteQuiet(fileInfo.physicalName()); + } + } + } + } }.restore(snapshotFiles, store, l); })); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java index 7c80442ff96d1..134f206ca8b52 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java @@ -21,14 +21,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.IndexFormatTooNewException; 
-import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexOutput; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.shard.ShardId; @@ -41,14 +34,11 @@ import org.elasticsearch.snapshots.SnapshotId; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; @@ -69,8 +59,6 @@ public abstract class FileRestoreContext { protected final ShardId shardId; protected final int bufferSize; - private final Executor executor; - /** * Constructs new restore context * @@ -80,13 +68,12 @@ public abstract class FileRestoreContext { * @param bufferSize buffer size for restore */ protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState, - int bufferSize, Executor executor) { + int bufferSize) { this.repositoryName = repositoryName; this.recoveryState = recoveryState; this.snapshotId = snapshotId; this.shardId = shardId; this.bufferSize = bufferSize; - this.executor = executor; } /** @@ -224,60 +211,11 @@ private void afterRestore(SnapshotFiles snapshotFiles, Store store, StoreFileMet listener.onResponse(null); } - protected void restoreFiles(List filesToRecover, Store store, ActionListener listener) { - if (filesToRecover.isEmpty()) { - listener.onResponse(null); - } else { - ActionListener allFilesListener = - new GroupedActionListener<>(ActionListener.map(listener, v -> null), filesToRecover.size()); - // 
restore the files from the snapshot to the Lucene store - for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { - logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); - executor.execute(ActionRunnable.run(allFilesListener, () -> restoreFile(fileToRecover, store))); - } - } - } - - protected abstract InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo); + protected abstract void restoreFiles(List filesToRecover, Store store, + ActionListener listener); @SuppressWarnings("unchecked") private static Iterable concat(Store.RecoveryDiff diff) { return Iterables.concat(diff.different, diff.missing); } - - /** - * Restores a file - * - * @param fileInfo file to be restored - */ - private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final Store store) throws IOException { - boolean success = false; - - try (InputStream stream = fileInputStream(fileInfo)) { - try (IndexOutput indexOutput = - store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { - final byte[] buffer = new byte[bufferSize]; - int length; - while ((length = stream.read(buffer)) > 0) { - indexOutput.writeBytes(buffer, 0, length); - recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length); - } - Store.verify(indexOutput); - indexOutput.close(); - store.directory().sync(Collections.singleton(fileInfo.physicalName())); - success = true; - } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { - try { - store.markStoreCorrupted(ex); - } catch (IOException e) { - logger.warn("store cannot be marked as corrupted", e); - } - throw ex; - } finally { - if (success == false) { - store.deleteQuiet(fileInfo.physicalName()); - } - } - } - } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java 
b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 17d0a163c9b9c..af0a419e0d37c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -81,7 +81,6 @@ import java.io.Closeable; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -457,8 +456,7 @@ private static class RestoreSession extends FileRestoreContext implements Closea RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, ShardId shardId, RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion, ThreadPool threadPool, CcrSettings ccrSettings, LongConsumer throttleListener) { - super(repositoryName, shardId, SNAPSHOT_ID, recoveryState, Math.toIntExact(ccrSettings.getChunkSize().getBytes()), - threadPool.generic()); + super(repositoryName, shardId, SNAPSHOT_ID, recoveryState, Math.toIntExact(ccrSettings.getChunkSize().getBytes())); this.remoteClient = remoteClient; this.sessionUUID = sessionUUID; this.node = node; @@ -577,11 +575,6 @@ private void handleError(Store store, Exception e) throws IOException { } } - @Override - protected InputStream fileInputStream(FileInfo fileInfo) { - throw new UnsupportedOperationException(); - } - @Override public void close() { ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node); From c48183f391e3fbf4f4324077b743da128ad7ec0b Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 21 Oct 2019 18:45:22 +0100 Subject: [PATCH 19/27] nicer --- .../blobstore/BlobStoreRepository.java | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java 
b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index ad5abaa81e67a..3ad61ce281bcd 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1160,17 +1160,6 @@ protected void restoreFiles(List filesToRe } } - private InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { - final InputStream dataBlobCompositeStream = new SlicedInputStream(fileInfo.numberOfParts()) { - @Override - protected InputStream openSlice(long slice) throws IOException { - return container.readBlob(fileInfo.partName(slice)); - } - }; - return restoreRateLimiter == null ? dataBlobCompositeStream : - new RateLimitingInputStream(dataBlobCompositeStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc); - } - /** * Restores a file * @param fileInfo file to be restored @@ -1178,7 +1167,13 @@ protected InputStream openSlice(long slice) throws IOException { private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException { boolean success = false; - try (InputStream stream = fileInputStream(fileInfo)) { + try (InputStream stream = maybeRateLimit(new SlicedInputStream(fileInfo.numberOfParts()) { + @Override + protected InputStream openSlice(long slice) throws IOException { + return container.readBlob(fileInfo.partName(slice)); + } + }, + restoreRateLimiter, restoreRateLimitingTimeInNanos)) { try (IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { final byte[] buffer = new byte[BUFFER_SIZE]; From f273373324f8c309e805034e5ccd8871f4dd8c06 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 21 Oct 2019 19:42:24 +0100 Subject: [PATCH 20/27] nicer --- .../blobstore/BlobStoreRepository.java | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git 
a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 3ad61ce281bcd..6becbdb7f1247 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1106,11 +1106,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); // Start as many workers as fit into the snapshot pool at once at the most final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indexIncrementalFileCount); - final ActionListener filesListener = ActionListener.delegateResponse( - new GroupedActionListener<>(allFilesUploadedListener, workers), (l, e) -> { - filesToSnapshot.clear(); // Stop uploading the remaining files if we run into any exception - l.onFailure(e); - }); + final ActionListener filesListener = fileQueueListener(filesToSnapshot, workers, allFilesUploadedListener); for (int i = 0; i < workers; ++i) { executor.execute(ActionRunnable.run(filesListener, () -> { BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS); @@ -1138,11 +1134,11 @@ public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, Sh final ShardId shardId = store.shardId(); final ActionListener restoreListener = ActionListener.delegateResponse(listener, (l, e) -> l.onFailure(new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e))); - threadPool.generic().execute(ActionRunnable.wrap(restoreListener, l -> { + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + executor.execute(ActionRunnable.wrap(restoreListener, l -> { final BlobContainer container = shardContainer(indexId, snapshotShardId); 
BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId); SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); - final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) { @Override protected void restoreFiles(List filesToRecover, Store store, @@ -1150,12 +1146,19 @@ protected void restoreFiles(List filesToRe if (filesToRecover.isEmpty()) { listener.onResponse(null); } else { - ActionListener allFilesListener = - new GroupedActionListener<>(ActionListener.map(listener, v -> null), filesToRecover.size()); + // Start as many workers as fit into the snapshot pool at once at the most + final int workers = + Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), snapshotFiles.indexFiles().size()); + final BlockingQueue files = new LinkedBlockingQueue<>(filesToRecover); + ActionListener allFilesListener = fileQueueListener(files, workers, ActionListener.map(listener, v -> null)); // restore the files from the snapshot to the Lucene store - for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { - logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); - executor.execute(ActionRunnable.run(allFilesListener, () -> restoreFile(fileToRecover, store))); + for (int i = 0; i < workers; ++i) { + executor.execute(ActionRunnable.run(allFilesListener, () -> { + BlobStoreIndexShardSnapshot.FileInfo fileToRecover; + while ((fileToRecover = files.poll(0L, TimeUnit.MILLISECONDS)) != null) { + restoreFile(fileToRecover, store); + } + })); } } } @@ -1204,6 +1207,14 @@ protected InputStream openSlice(long slice) throws IOException { })); } + private static ActionListener fileQueueListener(BlockingQueue files, int workers, + ActionListener> listener) { + return ActionListener.delegateResponse(new GroupedActionListener<>(listener, workers), (l, e) -> { + files.clear(); // 
Stop uploading the remaining files if we run into any exception + l.onFailure(e); + }); + } + private static InputStream maybeRateLimit(InputStream stream, @Nullable RateLimiter rateLimiter, CounterMetric metric) { return rateLimiter == null ? stream : new RateLimitingInputStream(stream, rateLimiter, metric::inc); } From 94feeca4922626a7a4dd27cd9374244fb577d9dd Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 21 Oct 2019 22:16:45 +0100 Subject: [PATCH 21/27] nicer + todo --- .../repositories/blobstore/BlobStoreRepository.java | 9 +++++---- .../xpack/ccr/repository/CcrRepository.java | 2 ++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 6becbdb7f1247..4db316aacffc4 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1135,10 +1135,10 @@ public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, Sh final ActionListener restoreListener = ActionListener.delegateResponse(listener, (l, e) -> l.onFailure(new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e))); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + final BlobContainer container = shardContainer(indexId, snapshotShardId); executor.execute(ActionRunnable.wrap(restoreListener, l -> { - final BlobContainer container = shardContainer(indexId, snapshotShardId); - BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId); - SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); + final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId); + final SnapshotFiles snapshotFiles = new 
SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) { @Override protected void restoreFiles(List filesToRecover, Store store, @@ -1150,7 +1150,8 @@ protected void restoreFiles(List filesToRe final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), snapshotFiles.indexFiles().size()); final BlockingQueue files = new LinkedBlockingQueue<>(filesToRecover); - ActionListener allFilesListener = fileQueueListener(files, workers, ActionListener.map(listener, v -> null)); + final ActionListener allFilesListener = + fileQueueListener(files, workers, ActionListener.map(listener, v -> null)); // restore the files from the snapshot to the Lucene store for (int i = 0; i < workers; ++i) { executor.execute(ActionRunnable.run(allFilesListener, () -> { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index cded5ff55ab9f..fc951fb3aa5ba 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -481,6 +481,8 @@ void restoreFiles(Store store) { @Override protected void restoreFiles(List filesToRecover, Store store, ActionListener doneListener) { logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover); + // TODO: now that the restoreFiles API is async we should stop blocking a thread here and make the following logic fully + // async ActionListener.completeWith(doneListener, () -> { try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> { })) { From 0971b46b0c29ff94f4c87a83a137a85267982d62 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 23 Oct 2019 18:27:20 +0200 Subject: [PATCH 22/27] add todo --- 
.../org/elasticsearch/xpack/ccr/repository/CcrRepository.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index e8c76e33a7799..2f8fc4610d954 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -297,6 +297,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s @Override public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState, ActionListener listener) { + // TODO: Instead of blocking in the restore logic and synchronously completing the listener we should just make below logic async ActionListener.completeWith(listener, () -> { // TODO: Add timeouts to network calls / the restore process. createEmptyStore(store); From 80bf095f40a78b81c0d26df7703b84b3db7bfb9a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 23 Oct 2019 19:14:45 +0200 Subject: [PATCH 23/27] fix --- .../java/org/elasticsearch/index/shard/StoreRecovery.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index de5a7c3893adc..bb4665a033d51 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -271,7 +271,8 @@ public void readBytes(byte[] b, int offset, int len) throws IOException { * previously created index snapshot into an existing initializing shard. 
* @param indexShard the index shard instance to recovery the snapshot from * @param repository the repository holding the physical files the shard should be recovered from - * TODO: document listener + * @param listener resolves to true if the shard has been recovered successfully, false if the recovery + * has been ignored due to a concurrent modification or if the cluster state has changed due to async updates. */ void recoverFromRepository(final IndexShard indexShard, Repository repository, ActionListener listener) { try { @@ -482,7 +483,6 @@ private void restore(IndexShard indexShard, Repository repository, SnapshotRecov indexShard.finalizeRecovery(); indexShard.postRecovery("restore done"); listener.onResponse(true); - listener.onResponse(true); }, e -> listener.onFailure(new IndexShardRestoreFailedException(shardId, "restore failed", e)) ); try { From 3ef568651b21a89cdb121d9332a6a638536849ab Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 24 Oct 2019 01:44:43 +0200 Subject: [PATCH 24/27] nicer --- .../index/shard/StoreRecovery.java | 50 ++++++++----------- 1 file changed, 20 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index bb4665a033d51..c414bf87235c5 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -59,7 +59,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.stream.Collectors; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; @@ -93,11 +92,14 @@ boolean recoverFromStore(final IndexShard indexShard) { assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE : "expected store recovery type but was: " +
recoveryType; final PlainActionFuture future = PlainActionFuture.newFuture(); - executeRecovery(indexShard, l -> { + final ActionListener recoveryListener = recoveryListener(indexShard, future); + try { logger.debug("starting recovery from store ..."); internalRecoverFromStore(indexShard); - l.onResponse(true); - }, future); + recoveryListener.onResponse(true); + } catch (Exception e) { + recoveryListener.onFailure(e); + } return future.actionGet(); } return false; @@ -125,14 +127,14 @@ boolean recoverFromLocalShards(BiConsumer mappingUpdate final boolean hasNested = indexShard.mapperService().hasNested(); final boolean isSplit = sourceMetaData.getNumberOfShards() < indexShard.indexSettings().getNumberOfShards(); final PlainActionFuture future = PlainActionFuture.newFuture(); - executeRecovery(indexShard, l -> { + ActionListener.completeWith(recoveryListener(indexShard, future), () -> { logger.debug("starting recovery from local shards {}", shards); try { final Directory directory = indexShard.store().directory(); // don't close this directory!! final Directory[] sources = shards.stream().map(LocalShardSnapshot::getSnapshotDirectory).toArray(Directory[]::new); final long maxSeqNo = shards.stream().mapToLong(LocalShardSnapshot::maxSeqNo).max().getAsLong(); final long maxUnsafeAutoIdTimestamp = - shards.stream().mapToLong(LocalShardSnapshot::maxUnsafeAutoIdTimestamp).max().getAsLong(); + shards.stream().mapToLong(LocalShardSnapshot::maxUnsafeAutoIdTimestamp).max().getAsLong(); addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo, maxUnsafeAutoIdTimestamp, indexShard.indexSettings().getIndexMetaData(), indexShard.shardId().id(), isSplit, hasNested); internalRecoverFromStore(indexShard); @@ -140,11 +142,11 @@ boolean recoverFromLocalShards(BiConsumer mappingUpdate // copied segments - we will also see them in stats etc. 
indexShard.getEngine().forceMerge(false, -1, false, false, false); - l.onResponse(true); + return true; } catch (IOException ex) { throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex); } - }, future); + }); return future.actionGet(); } return false; @@ -280,7 +282,7 @@ void recoverFromRepository(final IndexShard indexShard, Repository repository, A RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); assert recoveryType == RecoverySource.Type.SNAPSHOT : "expected snapshot recovery type: " + recoveryType; SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) indexShard.recoveryState().getRecoverySource(); - executeRecovery(indexShard, l -> restore(indexShard, repository, recoverySource, l), listener); + restore(indexShard, repository, recoverySource, recoveryListener(indexShard, listener)); } else { listener.onResponse(false); } @@ -300,12 +302,8 @@ private boolean canRecover(IndexShard indexShard) { return true; } - /** - * Recovers the state of the shard from the store. - */ - private void executeRecovery(IndexShard indexShard, Consumer> recoveryRunnable, - ActionListener listener) throws IndexShardRecoveryException { - ActionListener finalListener = ActionListener.wrap(res -> { + private ActionListener recoveryListener(IndexShard indexShard, ActionListener listener) { + return ActionListener.wrap(res -> { if (res) { // Check that the gateway didn't leave the shard in init or recovering stage. it is up to the gateway // to call post recovery. 
@@ -337,37 +335,29 @@ private void executeRecovery(IndexShard indexShard, Consumer { - try { - throw ex; - } catch (IndexShardRecoveryException e) { + if (ex instanceof IndexShardRecoveryException) { if (indexShard.state() == IndexShardState.CLOSED) { // got closed on us, just ignore this recovery listener.onResponse(false); return; } - if ((e.getCause() instanceof IndexShardClosedException) || (e.getCause() instanceof IndexShardNotStartedException)) { + if ((ex.getCause() instanceof IndexShardClosedException) || (ex.getCause() instanceof IndexShardNotStartedException)) { // got closed on us, just ignore this recovery listener.onResponse(false); return; } - throw e; - } catch (IndexShardClosedException | IndexShardNotStartedException e) { - } catch (Exception e) { + listener.onFailure(ex); + } else if (ex instanceof IndexShardClosedException || ex instanceof IndexShardNotStartedException) { + listener.onResponse(false); + } else { if (indexShard.state() == IndexShardState.CLOSED) { // got closed on us, just ignore this recovery listener.onResponse(false); } else { - listener.onFailure(new IndexShardRecoveryException(shardId, "failed recovery", e)); + listener.onFailure(new IndexShardRecoveryException(shardId, "failed recovery", ex)); } - return; } - listener.onResponse(false); }); - try { - recoveryRunnable.accept(finalListener); - } catch (Exception e) { - finalListener.onFailure(e); - } } /** From a8989ccab561f98d6f10d7485bcb7b93fd653d22 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 24 Oct 2019 17:37:12 +0200 Subject: [PATCH 25/27] fixes --- .../blobstore/BlobStoreRepository.java | 11 +++++--- .../blobstore/FileRestoreContext.java | 9 ++++++- .../ShardFollowTaskReplicationTests.java | 26 +++++++++---------- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 
b11cd67a88663..90432ce7fdafc 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1249,9 +1249,14 @@ protected void restoreFiles(List filesToRe // restore the files from the snapshot to the Lucene store for (int i = 0; i < workers; ++i) { executor.execute(ActionRunnable.run(allFilesListener, () -> { - BlobStoreIndexShardSnapshot.FileInfo fileToRecover; - while ((fileToRecover = files.poll(0L, TimeUnit.MILLISECONDS)) != null) { - restoreFile(fileToRecover, store); + store.incRef(); + try { + BlobStoreIndexShardSnapshot.FileInfo fileToRecover; + while ((fileToRecover = files.poll(0L, TimeUnit.MILLISECONDS)) != null) { + restoreFile(fileToRecover, store); + } + } finally { + store.decRef(); } })); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java index 2796742dbb3a3..51df31edbda05 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java @@ -152,7 +152,14 @@ public void restore(SnapshotFiles snapshotFiles, Store store, ActionListener afterRestore(snapshotFiles, store, restoredSegmentsFile, listener), listener::onFailure)); + v -> { + store.incRef(); + try { + afterRestore(snapshotFiles, store, restoredSegmentsFile, listener); + } finally { + store.decRef(); + } + }, listener::onFailure)); } catch (IOException ex) { throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 281d9c27b4e76..3fdf16a67946f 100644 
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -457,24 +457,24 @@ protected synchronized void recoverPrimary(IndexShard primary) { public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState, ActionListener listener) { ActionListener.completeWith(listener, () -> { - try { - IndexShard leader = leaderGroup.getPrimary(); - Lucene.cleanLuceneIndex(primary.store().directory()); - try (Engine.IndexCommitRef sourceCommit = leader.acquireSafeIndexCommit()) { - Store.MetadataSnapshot sourceSnapshot = leader.store().getMetadata(sourceCommit.getIndexCommit()); - for (StoreFileMetaData md : sourceSnapshot) { - primary.store().directory().copyFrom( - leader.store().directory(), md.name(), md.name(), IOContext.DEFAULT); - } + IndexShard leader = leaderGroup.getPrimary(); + Lucene.cleanLuceneIndex(primary.store().directory()); + try (Engine.IndexCommitRef sourceCommit = leader.acquireSafeIndexCommit()) { + Store.MetadataSnapshot sourceSnapshot = leader.store().getMetadata(sourceCommit.getIndexCommit()); + for (StoreFileMetaData md : sourceSnapshot) { + primary.store().directory().copyFrom( + leader.store().directory(), md.name(), md.name(), IOContext.DEFAULT); } - return null; - } catch (Exception ex) { - throw new AssertionError(ex); } + return null; }); } }, future); - future.actionGet(); + try { + future.actionGet(); + } catch (Exception ex) { + throw new AssertionError(ex); + } } }; } From b781b5b1c27610ea47ad97f1a5bfed2283b2f372 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 24 Oct 2019 18:08:29 +0200 Subject: [PATCH 26/27] shorter diff --- .../repositories/blobstore/BlobStoreRepository.java | 4 ---- 1 file changed, 4 deletions(-) diff --git 
a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 90432ce7fdafc..f93226e271665 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1263,10 +1263,6 @@ protected void restoreFiles(List filesToRe } } - /** - * Restores a file - * @param fileInfo file to be restored - */ private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException { boolean success = false; From 215609e6e67866ac994bd1b8645e3a535e4ac5c3 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 29 Oct 2019 14:17:50 +0100 Subject: [PATCH 27/27] CR: comments --- .../org/elasticsearch/index/shard/IndexShard.java | 11 ++++++----- .../org/elasticsearch/index/shard/StoreRecovery.java | 1 + .../repositories/blobstore/FileRestoreContext.java | 7 +++---- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 5b45a191bc211..dfe0cec46c984 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -42,6 +42,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; @@ -2508,15 +2509,15 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService case SNAPSHOT: markAsRecovering("from snapshot", recoveryState); 
// mark the shard as recovering on the cluster state thread SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource(); - threadPool.generic().execute(() -> restoreFromRepository( - repositoriesService.repository(recoverySource.snapshot().getRepository()), - ActionListener.wrap(r -> { + threadPool.generic().execute( + ActionRunnable.wrap(ActionListener.wrap(r -> { if (r) { recoveryListener.onRecoveryDone(recoveryState); } }, - e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true))) - ); + e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), + restoreListener -> restoreFromRepository( + repositoriesService.repository(recoverySource.snapshot().getRepository()), restoreListener))); break; case LOCAL_SHARDS: final IndexMetaData indexMetaData = indexSettings().getIndexMetaData(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index c414bf87235c5..07850d5e58f1e 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -147,6 +147,7 @@ boolean recoverFromLocalShards(BiConsumer mappingUpdate throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex); } }); + assert future.isDone(); return future.actionGet(); } return false; diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java index 51df31edbda05..885e7e0ac2ce1 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java @@ -155,7 +155,8 @@ public void 
restore(SnapshotFiles snapshotFiles, Store store, ActionListener { store.incRef(); try { - afterRestore(snapshotFiles, store, restoredSegmentsFile, listener); + afterRestore(snapshotFiles, store, restoredSegmentsFile); + listener.onResponse(null); } finally { store.decRef(); } @@ -170,8 +171,7 @@ public void restore(SnapshotFiles snapshotFiles, Store store, ActionListener listener) { + private void afterRestore(SnapshotFiles snapshotFiles, Store store, StoreFileMetaData restoredSegmentsFile) { // read the snapshot data persisted try { Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory()); @@ -195,7 +195,6 @@ private void afterRestore(SnapshotFiles snapshotFiles, Store store, StoreFileMet } catch (IOException e) { logger.warn("[{}] [{}] failed to list directory - some of files might not be deleted", shardId, snapshotId); } - listener.onResponse(null); } /**