From dbe0f016115cdac4c51c948f2f2485b00bab1333 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 27 Jul 2016 17:48:57 +0200 Subject: [PATCH 01/15] add locking to store access --- .../java/org/elasticsearch/index/store/Store.java | 11 +++++++---- .../store/TransportNodesListShardStoreMetaData.java | 9 ++++++++- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index 61d970c55948d..a9aa4ad7401dd 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -39,6 +39,7 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.Lock; +import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.SimpleFSDirectory; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.BytesRef; @@ -219,11 +220,11 @@ final void ensureOpen() { */ public MetadataSnapshot getMetadataOrEmpty() throws IOException { try { - return getMetadata(null); + return getMetadata(); } catch (IndexNotFoundException ex) { // that's fine - happens all the time no need to log - } catch (FileNotFoundException | NoSuchFileException ex) { - logger.info("Failed to open / find files while reading metadata snapshot"); + } catch (FileNotFoundException | NoSuchFileException | LockObtainFailedException ex) { + logger.info("Failed to open / find files while reading metadata snapshot", ex); } return MetadataSnapshot.EMPTY; } @@ -240,7 +241,9 @@ public MetadataSnapshot getMetadataOrEmpty() throws IOException { * @throws IndexNotFoundException if no index / valid commit-point can be found in this store */ public MetadataSnapshot getMetadata() throws IOException { - return getMetadata(null); + try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { + return getMetadata(null); + } } /** diff --git a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index cdc95e1895ac0..99591a01ba908 100644 --- a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.store; +import org.apache.lucene.index.IndexCommit; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; @@ -122,13 +123,19 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException IndexService indexService = indicesService.indexService(shardId.getIndex()); if (indexService != null) { IndexShard indexShard = indexService.getShardOrNull(shardId.id()); + if (indexShard != null) { final Store store = indexShard.store(); + IndexCommit snapshot = null; store.incRef(); try { exists = true; - return new StoreFilesMetaData(shardId, store.getMetadataOrEmpty()); + snapshot = indexShard.snapshotIndex(false); + return new StoreFilesMetaData(shardId, store.getMetadata(snapshot)); } finally { + if (snapshot != null) { + indexShard.releaseSnapshot(snapshot); + } store.decRef(); } } From 5eca2027cc7f00c7cc9f9650b9dad4747addd37e Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 27 Jul 2016 19:13:12 +0200 Subject: [PATCH 02/15] fix some 
tests --- .../java/org/elasticsearch/index/store/Store.java | 12 +++++++++++- .../org/elasticsearch/index/store/StoreTests.java | 6 +++--- .../indices/recovery/RecoverySourceHandlerTests.java | 4 +++- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index a9aa4ad7401dd..98b7e388b587e 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -213,6 +213,9 @@ final void ensureOpen() { * Returns a new MetadataSnapshot for the latest commit in this store or * an empty snapshot if no index exists or can not be opened. * + * Note: the method is a simple wrapper around {@link #getMetadata()}, which tries to acquire the + * {@link IndexWriter#WRITE_LOCK_NAME} for the underlying directory not ensure no concurrent file changes are happening. + * * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an * unexpected exception when opening the index reading the segments file. * @throws IndexFormatTooOldException if the lucene index is too old to be opened. @@ -232,6 +235,9 @@ public MetadataSnapshot getMetadataOrEmpty() throws IOException { /** * Returns a new MetadataSnapshot for the latest commit in this store. * + * * Note: the method tries to acquire the + * {@link IndexWriter#WRITE_LOCK_NAME} for the underlying directory not ensure no concurrent file changes are happening. + * * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an * unexpected exception when opening the index reading the segments file. * @throws IndexFormatTooOldException if the lucene index is too old to be opened. @@ -250,6 +256,10 @@ public MetadataSnapshot getMetadata() throws IOException { * Returns a new MetadataSnapshot for the given commit. If the given commit is null * the latest commit point is used. * + * Note that unlike {@link #getMetadata()} and {@link #getMetadataOrEmpty()}, this method doesn't try + * to lock the directory write lock. The owner must verify it has the right to access the store and + * no concurrent file changes are happening. + * * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an * unexpected exception when opening the index reading the segments file. * @throws IndexFormatTooOldException if the lucene index is too old to be opened. 
@@ -637,7 +647,7 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetaData) thr // ignore, we don't really care, will get deleted later on } } - final Store.MetadataSnapshot metadataOrEmpty = getMetadata(); + final Store.MetadataSnapshot metadataOrEmpty = getMetadata(null); verifyAfterCleanup(sourceMetaData, metadataOrEmpty); } finally { metadataLock.writeLock().unlock(); diff --git a/core/src/test/java/org/elasticsearch/index/store/StoreTests.java b/core/src/test/java/org/elasticsearch/index/store/StoreTests.java index e1636b713a1c9..8174f011de43e 100644 --- a/core/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/core/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -328,7 +328,7 @@ public void testNewChecksums() throws IOException { Store.MetadataSnapshot metadata; // check before we committed try { - store.getMetadata(); + store.getMetadata(null); fail("no index present - expected exception"); } catch (IndexNotFoundException ex) { // expected @@ -336,7 +336,7 @@ public void testNewChecksums() throws IOException { assertThat(store.getMetadataOrEmpty(), is(Store.MetadataSnapshot.EMPTY)); // nothing committed writer.commit(); writer.close(); - metadata = store.getMetadata(); + metadata = store.getMetadata(null); assertThat(metadata.asMap().isEmpty(), is(false)); for (StoreFileMetaData meta : metadata) { try (IndexInput input = store.directory().openInput(meta.name(), IOContext.DEFAULT)) { @@ -723,7 +723,7 @@ public void testCleanupFromSnapshot() throws IOException { writer.addDocument(doc); } - Store.MetadataSnapshot firstMeta = store.getMetadata(); + Store.MetadataSnapshot firstMeta = store.getMetadata(null); if (random().nextBoolean()) { for (int i = 0; i < docs; i++) { diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index cfff28121bab9..b8706c2fef6c5 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -97,6 +97,8 @@ public void testSendFiles() throws Throwable { writer.addDocument(document); } writer.commit(); + writer.close(); + Store.MetadataSnapshot metadata = store.getMetadata(); List metas = new ArrayList<>(); for (StoreFileMetaData md : metadata) { @@ -123,7 +125,7 @@ public void close() throws IOException { assertEquals(0, recoveryDiff.missing.size()); IndexReader reader = DirectoryReader.open(targetStore.directory()); assertEquals(numDocs, reader.maxDoc()); - IOUtils.close(reader, writer, store, targetStore); + IOUtils.close(reader, store, targetStore); } public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable { From ae49df45fb969a009aaf265238cc473c60abf4ae Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 28 Jul 2016 10:53:31 +0200 Subject: [PATCH 03/15] more logging --- .../java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 59f01f56ce13e..65b5c70d91399 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -45,6 +45,7 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import 
org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.InternalTestCluster.RestartCallback; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.store.MockFSDirectoryService; import org.elasticsearch.test.store.MockFSIndexStore; @@ -387,6 +388,7 @@ public void doAfterNodes(int numNodes, Client client) throws Exception { assertThat(state.metaData().index("test").getAliases().get("test_alias").filter(), notNullValue()); } + @TestLogging("indices.recovery:TRACE") public void testReusePeerRecovery() throws Exception { final Settings settings = Settings.builder() .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) From f4e212231d0f353aecec3790d8c9f2070c3a8c63 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 28 Jul 2016 11:50:18 +0200 Subject: [PATCH 04/15] add a direct access to the store, so engine maybe open or close --- .../elasticsearch/index/engine/Engine.java | 2 +- .../index/engine/InternalEngine.java | 2 +- .../index/engine/ShadowEngine.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 24 +++++++++++++++---- .../index/shard/LocalShardSnapshot.java | 6 ++--- .../recovery/RecoverySourceHandler.java | 4 ++-- .../recovery/RecoveryTargetService.java | 1 + .../TransportNodesListShardStoreMetaData.java | 17 ++----------- .../repositories/Repository.java | 10 ++++---- .../snapshots/SnapshotShardsService.java | 4 ++-- 10 files changed, 35 insertions(+), 37 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index f9186185e61dc..d5dc64e3a5609 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -654,7 +654,7 @@ public void forceMerge(boolean flush) throws IOException { * * @param flushFirst indicates whether the engine should flush before returning the snapshot */ - public abstract IndexCommit snapshotIndex(boolean flushFirst) throws EngineException; + public abstract IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException; /** * fail engine due to some error. the engine will also be closed. 
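
The rename from snapshotIndex to acquireIndexCommit (paired with releaseIndexCommit on IndexShard later in this series) makes the acquire/release contract explicit: resources referenced by the returned commit are not freed until the commit is released. A minimal caller-side sketch of that contract, mirroring how SnapshotShardsService and RecoverySourceHandler use it in later commits of this series; the wrapper class and method names here are illustrative, not part of the patch:

    import org.apache.lucene.index.IndexCommit;
    import org.elasticsearch.index.shard.IndexShard;

    import java.io.IOException;

    final class IndexCommitContractSketch {
        // Demonstrates only the acquire / try / finally / release pattern.
        static void withPinnedCommit(IndexShard shard) throws IOException {
            // flushFirst = true forces a Lucene commit so the pinned commit contains the latest writes
            final IndexCommit commit = shard.acquireIndexCommit(true);
            try {
                // while the commit is held, merges and flushes cannot delete the files it references
                for (String file : commit.getFileNames()) {
                    // ... read "file" from shard.store().directory() as needed ...
                }
            } finally {
                // always release, otherwise the deletion policy keeps holding on to the commit and its files
                shard.releaseIndexCommit(commit);
            }
        }
    }
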
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1775fea67031f..eba6fa1080274 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -852,7 +852,7 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu } @Override - public IndexCommit snapshotIndex(final boolean flushFirst) throws EngineException { + public IndexCommit acquireIndexCommit(final boolean flushFirst) throws EngineException { // we have to flush outside of the readlock otherwise we might have a problem upgrading // the to a write lock when we fail the engine in this operation if (flushFirst) { diff --git a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java index 83f9d466f0e36..2d5a134493abb 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -205,7 +205,7 @@ public void refresh(String source) throws EngineException { } @Override - public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException { + public IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException { throw new UnsupportedOperationException("Can not take snapshot from a shadow engine"); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index a97aad7abfa3b..3a65b52615a87 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -789,15 +789,15 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() { /** * Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this - * commit won't be freed until the commit / snapshot is released via {@link #releaseSnapshot(IndexCommit)}. + * commit won't be freed until the commit / snapshot is released via {@link #releaseIndexCommit(IndexCommit)}. * * @param flushFirst true if the index should first be flushed to disk / a low level lucene commit should be executed */ - public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException { + public IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException { IndexShardState state = this.state; // one time volatile read // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) { - return getEngine().snapshotIndex(flushFirst); + return getEngine().acquireIndexCommit(flushFirst); } else { throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed"); } @@ -805,13 +805,27 @@ public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException { /** - * Releases a snapshot taken from {@link #snapshotIndex(boolean)} this must be called to release the resources + * Releases a snapshot taken from {@link #acquireIndexCommit(boolean)} this must be called to release the resources * referenced by the given snapshot {@link IndexCommit}. 
*/ - public void releaseSnapshot(IndexCommit snapshot) throws IOException { + public void releaseIndexCommit(IndexCommit snapshot) throws IOException { deletionPolicy.release(snapshot); } + public Store.MetadataSnapshot snapshotStore() throws IOException { + IndexCommit indexCommit = null; + store.incRef(); + try { + indexCommit = deletionPolicy.snapshot(); + return store.getMetadata(indexCommit); + } finally { + store.decRef(); + if (indexCommit != null) { + deletionPolicy.release(indexCommit); + } + } + } + /** * Fails the shard and marks the shard store as corrupted if * e is caused by index corruption diff --git a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java index 25c59caef993e..0d53163f15eca 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java @@ -23,7 +23,6 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.Lock; import org.apache.lucene.store.NoLockFactory; @@ -31,7 +30,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.index.Index; import org.elasticsearch.index.store.Store; -import org.elasticsearch.indices.recovery.RecoveryState; import java.io.Closeable; import java.io.IOException; @@ -52,7 +50,7 @@ public LocalShardSnapshot(IndexShard shard) { store.incRef(); boolean success = false; try { - indexCommit = shard.snapshotIndex(true); + indexCommit = shard.acquireIndexCommit(true); success = true; } finally { if (success == false) { @@ -120,7 +118,7 @@ public void close() throws IOException { public void close() throws IOException { if (closed.compareAndSet(false, true)) { try { - shard.releaseSnapshot(indexCommit); + shard.releaseIndexCommit(indexCommit); } finally { store.decRef(); } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 7d201a3ca7855..b226af7858e42 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -127,7 +127,7 @@ public RecoveryResponse recoverToTarget() throws IOException { logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration()); final IndexCommit phase1Snapshot; try { - phase1Snapshot = shard.snapshotIndex(false); + phase1Snapshot = shard.acquireIndexCommit(false); } catch (Exception e) { IOUtils.closeWhileHandlingException(translogView); throw new RecoveryEngineException(shard.shardId(), 1, "Snapshot failed", e); @@ -139,7 +139,7 @@ public RecoveryResponse recoverToTarget() throws IOException { throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e); } finally { try { - shard.releaseSnapshot(phase1Snapshot); + shard.releaseIndexCommit(phase1Snapshot); } catch (IOException ex) { logger.warn("releasing snapshot caused exception", ex); } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java index a23b8060b1741..2638963af1763 100644 --- 
a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java @@ -178,6 +178,7 @@ private void doRecovery(final RecoveryTarget recoveryTarget) { new RecoveryFailedException(recoveryTarget.state(), "failed to list local files", e), true); return; } + logger.trace("{} local file snapshot:", recoveryTarget, metadataSnapshot); final StartRecoveryRequest request = new StartRecoveryRequest(recoveryTarget.shardId(), recoveryTarget.sourceNode(), clusterService.localNode(), metadataSnapshot, recoveryTarget.state().getType(), recoveryTarget.recoveryId()); diff --git a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index 99591a01ba908..035d27096b98e 100644 --- a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -19,7 +19,6 @@ package org.elasticsearch.indices.store; -import org.apache.lucene.index.IndexCommit; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; @@ -123,21 +122,9 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException IndexService indexService = indicesService.indexService(shardId.getIndex()); if (indexService != null) { IndexShard indexShard = indexService.getShardOrNull(shardId.id()); - if (indexShard != null) { - final Store store = indexShard.store(); - IndexCommit snapshot = null; - store.incRef(); - try { - exists = true; - snapshot = indexShard.snapshotIndex(false); - return new StoreFilesMetaData(shardId, store.getMetadata(snapshot)); - } finally { - if (snapshot != null) { - indexShard.releaseSnapshot(snapshot); - } - store.decRef(); - } + exists = true; + return new StoreFilesMetaData(shardId, indexShard.snapshotStore()); } } // try and see if we an list unallocated diff --git a/core/src/main/java/org/elasticsearch/repositories/Repository.java b/core/src/main/java/org/elasticsearch/repositories/Repository.java index 11a060d73e817..0b1236cc62dc1 100644 --- a/core/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/core/src/main/java/org/elasticsearch/repositories/Repository.java @@ -23,14 +23,12 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.env.Environment; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.indices.recovery.RecoveryState; -import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.common.component.LifecycleComponent; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotShardFailure; @@ -173,7 +171,7 @@ interface Factory { /** * Creates a snapshot of the shard based on the index commit point. *

- * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex} method. + * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireIndexCommit} method. * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller. *

* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 7741ef1c0e6de..a5e7c37f3aa04 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -338,7 +338,7 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina try { // we flush first to make sure we get the latest writes snapshotted - IndexCommit snapshotIndexCommit = indexShard.snapshotIndex(true); + IndexCommit snapshotIndexCommit = indexShard.acquireIndexCommit(true); try { repository.snapshotShard(indexShard, snapshot.getSnapshotId(), snapshotIndexCommit, snapshotStatus); if (logger.isDebugEnabled()) { @@ -348,7 +348,7 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina TimeValue.timeValueMillis(snapshotStatus.time()), sb); } } finally { - indexShard.releaseSnapshot(snapshotIndexCommit); + indexShard.releaseIndexCommit(snapshotIndexCommit); } } catch (SnapshotFailedEngineException e) { throw e; From c1981e9d4d55b804b5ea6ad0661066f24436c289 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 28 Jul 2016 13:44:47 +0200 Subject: [PATCH 05/15] tests ands some fixing --- .../elasticsearch/index/shard/IndexShard.java | 9 +++ .../index/shard/IndexShardTests.java | 73 +++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 3a65b52615a87..86ff9203daa07 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -813,6 +813,15 @@ public void releaseIndexCommit(IndexCommit snapshot) throws IOException { } public Store.MetadataSnapshot snapshotStore() throws IOException { + synchronized (mutex) { + // if the engine is not running, we can access the store directly, but we need to make sure no one starts + // the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized. + // That can be done out of mutex, since the engine can be closed half way. + Engine engine = getEngineOrNull(); + if (engine == null) { + return store.getMetadata(null); + } + } IndexCommit indexCommit = null; store.incRef(); try { diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 95a705f8e27c0..9cf628060c339 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -23,6 +23,7 @@ import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TermQuery; @@ -152,6 +153,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; /** * Simple unit-test IndexShard related operations. 
@@ -472,6 +474,77 @@ public static void write(ShardStateMetaData shardStateMetaData, ShardStateMetaData.FORMAT.write(shardStateMetaData, shardPaths); } + public void testAcquireIndexCommit() throws IOException { + createIndex("test"); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService(resolveIndex("test")); + final IndexShard shard = test.getShardOrNull(0); + int numDocs = randomInt(20); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test", "type", "id_" + i).setSource("{}").get(); + } + final boolean flushFirst = randomBoolean(); + IndexCommit commit = shard.acquireIndexCommit(flushFirst); + int moreDocs = randomInt(20); + for (int i = 0; i < moreDocs; i++) { + client().prepareIndex("test", "type", "id_" + numDocs + i).setSource("{}").get(); + } + shard.flush(new FlushRequest("index")); + // check that we can still read the commit that we captured + try (IndexReader reader = DirectoryReader.open(commit)) { + assertThat(reader.numDocs(), equalTo(flushFirst ? numDocs : 0)); + } + shard.releaseIndexCommit(commit); + shard.flush(new FlushRequest("index").force(true)); + // check it's clean up + assertThat(DirectoryReader.listCommits(shard.store().directory()), hasSize(1)); + } + + /*** + * test one can snapshot the store at various lifecycle stages + * @throws IOException + */ + public void testSnapshotStore() throws IOException { + createIndex("test"); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService(resolveIndex("test")); + final IndexShard shard = test.getShardOrNull(0); + client().prepareIndex("test", "test", "0").setSource("{}").setRefreshPolicy(randomBoolean() ? 
IMMEDIATE : NONE).get(); + client().admin().indices().prepareFlush().get(); + ShardRouting routing = shard.routingEntry(); + test.removeShard(0, "b/c simon says so"); + routing = ShardRoutingHelper.reinit(routing); + IndexShard newShard = test.createShard(routing); + newShard.updateRoutingEntry(routing); + DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); + + Store.MetadataSnapshot snapshot = newShard.snapshotStore(); + assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); + + newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, + localNode)); + + snapshot = newShard.snapshotStore(); + assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); + + assertTrue(newShard.recoverFromStore()); + + snapshot = newShard.snapshotStore(); + assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); + + newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted()); + + snapshot = newShard.snapshotStore(); + assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); + + newShard.close("test", false); + + snapshot = newShard.snapshotStore(); + assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); + } + public void testDurableFlagHasEffect() { createIndex("test"); ensureGreen(); From 609bb3b9fac2e7cb8cb8100a6d37ce9a434af6aa Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 28 Jul 2016 13:49:55 +0200 Subject: [PATCH 06/15] fix shardow recovery --- .../indices/recovery/RecoveryTargetService.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java index 2638963af1763..9faf28bc9a612 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java @@ -167,7 +167,13 @@ private void doRecovery(final RecoveryTarget recoveryTarget) { logger.trace("collecting local files for {}", recoveryTarget); Store.MetadataSnapshot metadataSnapshot = null; try { - metadataSnapshot = recoveryTarget.store().getMetadataOrEmpty(); + if (recoveryTarget.indexShard().indexSettings().isOnSharedFilesystem()) { + // we are not going to copy any files, so don't bother listing files, potentially running + // into concurrency issues with the primary changing files underneath us. 
+ metadataSnapshot = Store.MetadataSnapshot.EMPTY; + } else { + metadataSnapshot = recoveryTarget.store().getMetadataOrEmpty(); + } } catch (IOException e) { logger.warn("error while listing local files, recover as if there are none", e); metadataSnapshot = Store.MetadataSnapshot.EMPTY; @@ -178,7 +184,7 @@ private void doRecovery(final RecoveryTarget recoveryTarget) { new RecoveryFailedException(recoveryTarget.state(), "failed to list local files", e), true); return; } - logger.trace("{} local file snapshot:", recoveryTarget, metadataSnapshot); + logger.trace("{} local file count: [{}]", recoveryTarget, metadataSnapshot.size()); final StartRecoveryRequest request = new StartRecoveryRequest(recoveryTarget.shardId(), recoveryTarget.sourceNode(), clusterService.localNode(), metadataSnapshot, recoveryTarget.state().getType(), recoveryTarget.recoveryId()); From 6c694f59780deed20f5a9ad13e476fc09aaac920 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 28 Jul 2016 13:58:17 +0200 Subject: [PATCH 07/15] move recovery target service to new indexShard.snapshotStore --- .../java/org/elasticsearch/index/shard/IndexShard.java | 7 +++++++ .../indices/recovery/RecoveryTargetService.java | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 86ff9203daa07..2f2fd969eccf9 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -812,6 +812,13 @@ public void releaseIndexCommit(IndexCommit snapshot) throws IOException { deletionPolicy.release(snapshot); } + /** + * gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard, + * without having to worry about the current state of the engine and concurrent flushes + * + * @return + * @throws IOException + */ public Store.MetadataSnapshot snapshotStore() throws IOException { synchronized (mutex) { // if the engine is not running, we can access the store directly, but we need to make sure no one starts diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java index 9faf28bc9a612..0650f553592a9 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java @@ -172,7 +172,7 @@ private void doRecovery(final RecoveryTarget recoveryTarget) { // into concurrency issues with the primary changing files underneath us. 
metadataSnapshot = Store.MetadataSnapshot.EMPTY; } else { - metadataSnapshot = recoveryTarget.store().getMetadataOrEmpty(); + metadataSnapshot = recoveryTarget.indexShard().snapshotStore(); } } catch (IOException e) { logger.warn("error while listing local files, recover as if there are none", e); From b5f75b40f68cd2b80b1133a26c7d7ed2b36f7a73 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 28 Jul 2016 13:59:50 +0200 Subject: [PATCH 08/15] sigh --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 2f2fd969eccf9..21adfac4dc3f3 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -815,9 +815,6 @@ public void releaseIndexCommit(IndexCommit snapshot) throws IOException { /** * gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard, * without having to worry about the current state of the engine and concurrent flushes - * - * @return - * @throws IOException */ public Store.MetadataSnapshot snapshotStore() throws IOException { synchronized (mutex) { From 852b5accd5ba891ebda4e0ffc3cf95d620d838d7 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 28 Jul 2016 14:03:07 +0200 Subject: [PATCH 09/15] sigh2 --- .../test/java/org/elasticsearch/index/shard/IndexShardTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 9cf628060c339..b32c61cd5d873 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -503,7 +503,6 @@ public void testAcquireIndexCommit() throws IOException { /*** * test one can snapshot the store at various lifecycle stages - * @throws IOException */ public void testSnapshotStore() throws IOException { createIndex("test"); From 7bb60859eaa9a386e9a4053310d59ab718a8c034 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 29 Jul 2016 08:47:36 +0200 Subject: [PATCH 10/15] reduce unsafe methods in Store --- .../elasticsearch/index/shard/IndexShard.java | 26 +++++++- .../org/elasticsearch/index/store/Store.java | 54 +++------------- .../blobstore/BlobStoreRepository.java | 61 ++++++++++--------- .../ESIndexLevelReplicationTestCase.java | 19 ++++-- .../elasticsearch/index/store/StoreTests.java | 30 ++++----- .../recovery/RecoverySourceHandlerTests.java | 8 +-- 6 files changed, 92 insertions(+), 106 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 21adfac4dc3f3..cb2a03b2c70cc 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -21,7 +21,11 @@ import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.index.CheckIndex; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; 
import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.Term; @@ -29,6 +33,7 @@ import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.UsageTrackingQueryCachingPolicy; import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Lock; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.ThreadInterruptedException; import org.elasticsearch.ElasticsearchException; @@ -116,10 +121,12 @@ import org.elasticsearch.search.suggest.completion2x.Completion090PostingsFormat; import org.elasticsearch.threadpool.ThreadPool; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintStream; import java.nio.channels.ClosedByInterruptException; import java.nio.charset.StandardCharsets; +import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; @@ -814,7 +821,15 @@ public void releaseIndexCommit(IndexCommit snapshot) throws IOException { /** * gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard, - * without having to worry about the current state of the engine and concurrent flushes + * without having to worry about the current state of the engine and concurrent flushes. + * + * @throws org.apache.lucene.index.IndexNotFoundException if no index is found in the current directory + * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an + * unexpected exception when opening the index reading the segments file. + * @throws IndexFormatTooOldException if the lucene index is too old to be opened. + * @throws IndexFormatTooNewException if the lucene index is too new to be opened. + * @throws FileNotFoundException if one or more files referenced by a commit are not present. + * @throws NoSuchFileException if one or more files referenced by a commit are not present. */ public Store.MetadataSnapshot snapshotStore() throws IOException { synchronized (mutex) { @@ -823,7 +838,12 @@ public Store.MetadataSnapshot snapshotStore() throws IOException { // That can be done out of mutex, since the engine can be closed half way. 
Engine engine = getEngineOrNull(); if (engine == null) { - return store.getMetadata(null); + store.incRef(); + try (Lock ignored = store.directory().obtainLock(IndexWriter.WRITE_LOCK_NAME)) { + return store.getMetadata(null); + } finally { + store.decRef(); + } } } IndexCommit indexCommit = null; @@ -1337,7 +1357,7 @@ private void doCheckIndex() throws IOException { if ("checksum".equals(checkIndexOnStartup)) { // physical verification only: verify all checksums for the latest commit IOException corrupt = null; - MetadataSnapshot metadata = store.getMetadata(); + MetadataSnapshot metadata = snapshotStore(); for (Map.Entry entry : metadata.asMap().entrySet()) { try { Store.checkIntegrity(entry.getValue(), store.directory()); diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index 98b7e388b587e..9460167dc9eb1 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -39,7 +39,6 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.Lock; -import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.SimpleFSDirectory; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.BytesRef; @@ -74,6 +73,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import java.io.Closeable; @@ -209,56 +209,16 @@ final void ensureOpen() { } } - /** - * Returns a new MetadataSnapshot for the latest commit in this store or - * an empty snapshot if no index exists or can not be opened. - * - * Note: the method is a simple wrapper around {@link #getMetadata()}, which tries to acquire the - * {@link IndexWriter#WRITE_LOCK_NAME} for the underlying directory not ensure no concurrent file changes are happening. - * - * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an - * unexpected exception when opening the index reading the segments file. - * @throws IndexFormatTooOldException if the lucene index is too old to be opened. - * @throws IndexFormatTooNewException if the lucene index is too new to be opened. - */ - public MetadataSnapshot getMetadataOrEmpty() throws IOException { - try { - return getMetadata(); - } catch (IndexNotFoundException ex) { - // that's fine - happens all the time no need to log - } catch (FileNotFoundException | NoSuchFileException | LockObtainFailedException ex) { - logger.info("Failed to open / find files while reading metadata snapshot", ex); - } - return MetadataSnapshot.EMPTY; - } - - /** - * Returns a new MetadataSnapshot for the latest commit in this store. - * - * * Note: the method tries to acquire the - * {@link IndexWriter#WRITE_LOCK_NAME} for the underlying directory not ensure no concurrent file changes are happening. - * - * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an - * unexpected exception when opening the index reading the segments file. - * @throws IndexFormatTooOldException if the lucene index is too old to be opened. - * @throws IndexFormatTooNewException if the lucene index is too new to be opened. 
- * @throws FileNotFoundException if one or more files referenced by a commit are not present. - * @throws NoSuchFileException if one or more files referenced by a commit are not present. - * @throws IndexNotFoundException if no index / valid commit-point can be found in this store - */ - public MetadataSnapshot getMetadata() throws IOException { - try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - return getMetadata(null); - } - } - /** * Returns a new MetadataSnapshot for the given commit. If the given commit is null * the latest commit point is used. * - * Note that unlike {@link #getMetadata()} and {@link #getMetadataOrEmpty()}, this method doesn't try - * to lock the directory write lock. The owner must verify it has the right to access the store and - * no concurrent file changes are happening. + * Note that this method requires the caller verify it has the right to access the store and + * no concurrent file changes are happening. If in doubt, you probably want to use one of the following: + * + * {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, ESLogger)} to read a meta data while locking + * {@link IndexShard#snapshotStore()} to safely read from an existing shard + * {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed * * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an * unexpected exception when opening the index reading the segments file. diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 06e2b8ff97a1a..05a0c1fd9dfd5 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -23,6 +23,7 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.index.IndexNotFoundException; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.SegmentInfos; @@ -40,25 +41,6 @@ import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Numbers; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.lucene.store.InputStreamIndexInput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.iterable.Iterables; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; -import org.elasticsearch.index.snapshots.IndexShardSnapshotException; -import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; -import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots; -import org.elasticsearch.index.snapshots.blobstore.RateLimitingInputStream; -import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; -import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles; -import org.elasticsearch.index.store.Store; 
-import org.elasticsearch.index.store.StoreFileMetaData; -import org.elasticsearch.indices.recovery.RecoveryState; -import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; @@ -68,26 +50,45 @@ import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.compress.NotXContentException; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; +import org.elasticsearch.index.snapshots.IndexShardSnapshotException; +import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots; +import org.elasticsearch.index.snapshots.blobstore.RateLimitingInputStream; +import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; +import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetaData; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryVerificationException; import org.elasticsearch.snapshots.SnapshotCreationException; import org.elasticsearch.snapshots.SnapshotException; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotShardFailure; @@ -1387,7 +1388,7 @@ protected InputStream openSlice(long slice) throws IOException { */ private class RestoreContext extends Context { - private final Store store; + private final IndexShard targetShard; private final RecoveryState recoveryState; @@ -1402,13 +1403,14 @@ private class RestoreContext extends Context { public RestoreContext(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) { super(snapshotId, version, shard.shardId(), snapshotShardId); this.recoveryState = recoveryState; - store = 
shard.store(); + this.targetShard = shard; } /** * Performs restore operation */ public void restore() throws IOException { + final Store store = targetShard.store(); store.incRef(); try { logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, metadata.name(), shardId); @@ -1433,12 +1435,15 @@ public void restore() throws IOException { } SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); - final Store.MetadataSnapshot recoveryTargetMetadata; + Store.MetadataSnapshot recoveryTargetMetadata = null; try { - recoveryTargetMetadata = store.getMetadataOrEmpty(); - } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) { - logger.warn("{} Can't read metadata from store", e, shardId); - throw new IndexShardRestoreFailedException(shardId, "Can't restore corrupted shard", e); + recoveryTargetMetadata = targetShard.snapshotStore(); + } catch (IndexNotFoundException e) { + // happens when restore to an empty shard, not a big deal + logger.trace("[{}] [{}] restoring from to an empty shard", shardId, snapshotId); + } catch (IOException e) { + logger.warn("{} Can't read metadata from store, will not reuse any local file while restoring", e, shardId); + recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY; } final List filesToRecover = new ArrayList<>(); @@ -1492,7 +1497,7 @@ public void restore() throws IOException { try { for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); - restoreFile(fileToRecover); + restoreFile(fileToRecover, store); } } catch (IOException ex) { throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex); @@ -1539,7 +1544,7 @@ public void restore() throws IOException { * * @param fileInfo file to be restored */ - private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) throws IOException { + private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final Store store) throws IOException { boolean success = false; try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) { diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 62a09584d8491..ab1f965b7a317 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.replication; import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexNotFoundException; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.store.AlreadyClosedException; @@ -29,8 +30,6 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; -import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest; -import org.elasticsearch.action.admin.indices.stats.IndexShardStats; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.TransportIndexAction; @@ -81,8 +80,6 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import 
org.elasticsearch.transport.TransportResponse; -import org.junit.After; -import org.junit.Before; import java.io.IOException; import java.nio.file.Files; @@ -302,7 +299,7 @@ public void recoverReplica(IndexShard replica, BiFunction 0L, e -> () -> {}, (int) ByteSizeUnit.MB.toKB(1), logger); recovery.recoverToTarget(); @@ -310,6 +307,18 @@ public void recoverReplica(IndexShard replica, BiFunction { + assertEquals(shardId, theLock.getShardId()); + assertEquals(lock, theLock); + count.incrementAndGet(); }); assertEquals(count.get(), 0); @@ -917,11 +913,7 @@ public void testUserDataRead() throws IOException { writer.commit(); writer.close(); Store.MetadataSnapshot metadata; - if (randomBoolean()) { - metadata = store.getMetadata(); - } else { - metadata = store.getMetadata(deletionPolicy.snapshot()); - } + metadata = store.getMetadata(randomBoolean() ? null : deletionPolicy.snapshot()); assertFalse(metadata.asMap().isEmpty()); // do not check for correct files, we have enough tests for that above assertThat(metadata.getCommitUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId)); @@ -982,7 +974,7 @@ public void testMarkCorruptedOnTruncatedSegmentsFile() throws IOException { try { if (randomBoolean()) { - store.getMetadata(); + store.getMetadata(null); } else { store.readLastCommittedSegmentsInfo(); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index b8706c2fef6c5..e0bb251f47539 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -99,7 +99,7 @@ public void testSendFiles() throws Throwable { writer.commit(); writer.close(); - Store.MetadataSnapshot metadata = store.getMetadata(); + Store.MetadataSnapshot metadata = store.getMetadata(null); List metas = new ArrayList<>(); for (StoreFileMetaData md : metadata) { metas.add(md); @@ -118,7 +118,7 @@ public void close() throws IOException { throw new RuntimeException(e); } }); - Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata(); + Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata(null); Store.RecoveryDiff recoveryDiff = targetStoreMetadata.recoveryDiff(metadata); assertEquals(metas.size(), recoveryDiff.identical.size()); assertEquals(0, recoveryDiff.different.size()); @@ -159,7 +159,7 @@ protected void failEngine(IOException cause) { writer.commit(); writer.close(); - Store.MetadataSnapshot metadata = store.getMetadata(); + Store.MetadataSnapshot metadata = store.getMetadata(null); List metas = new ArrayList<>(); for (StoreFileMetaData md : metadata) { metas.add(md); @@ -223,7 +223,7 @@ protected void failEngine(IOException cause) { writer.commit(); writer.close(); - Store.MetadataSnapshot metadata = store.getMetadata(); + Store.MetadataSnapshot metadata = store.getMetadata(null); List metas = new ArrayList<>(); for (StoreFileMetaData md : metadata) { metas.add(md); From a0556154a605ac7969f959595d3755d22bfa958d Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 29 Jul 2016 12:41:18 +0200 Subject: [PATCH 11/15] fix npe --- .../repositories/blobstore/BlobStoreRepository.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 
05a0c1fd9dfd5..d4a3246b6e41e 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1435,12 +1435,13 @@ public void restore() throws IOException { } SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); - Store.MetadataSnapshot recoveryTargetMetadata = null; + Store.MetadataSnapshot recoveryTargetMetadata; try { recoveryTargetMetadata = targetShard.snapshotStore(); } catch (IndexNotFoundException e) { // happens when restore to an empty shard, not a big deal logger.trace("[{}] [{}] restoring from to an empty shard", shardId, snapshotId); + recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY; } catch (IOException e) { logger.warn("{} Can't read metadata from store, will not reuse any local file while restoring", e, shardId); recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY; From ff9df4c0f526cff10469d434963377d87ed899f8 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 29 Jul 2016 16:14:05 +0200 Subject: [PATCH 12/15] to @mikemccand with love --- .../java/org/elasticsearch/index/shard/IndexShard.java | 4 ++-- .../main/java/org/elasticsearch/index/store/Store.java | 2 +- .../indices/recovery/RecoveryTargetService.java | 2 +- .../store/TransportNodesListShardStoreMetaData.java | 2 +- .../repositories/blobstore/BlobStoreRepository.java | 2 +- .../replication/ESIndexLevelReplicationTestCase.java | 2 +- .../org/elasticsearch/index/shard/IndexShardTests.java | 10 +++++----- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index cb2a03b2c70cc..122ff9172ce7c 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -831,7 +831,7 @@ public void releaseIndexCommit(IndexCommit snapshot) throws IOException { * @throws FileNotFoundException if one or more files referenced by a commit are not present. * @throws NoSuchFileException if one or more files referenced by a commit are not present. */ - public Store.MetadataSnapshot snapshotStore() throws IOException { + public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { synchronized (mutex) { // if the engine is not running, we can access the store directly, but we need to make sure no one starts // the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized. @@ -1357,7 +1357,7 @@ private void doCheckIndex() throws IOException { if ("checksum".equals(checkIndexOnStartup)) { // physical verification only: verify all checksums for the latest commit IOException corrupt = null; - MetadataSnapshot metadata = snapshotStore(); + MetadataSnapshot metadata = snapshotStoreMetadata(); for (Map.Entry entry : metadata.asMap().entrySet()) { try { Store.checkIntegrity(entry.getValue(), store.directory()); diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index 9460167dc9eb1..659b230edab01 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -217,7 +217,7 @@ final void ensureOpen() { * no concurrent file changes are happening. 
If in doubt, you probably want to use one of the following: * * {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, ESLogger)} to read a meta data while locking - * {@link IndexShard#snapshotStore()} to safely read from an existing shard + * {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard * {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed * * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java index 0650f553592a9..1a13504fb1ad7 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java @@ -172,7 +172,7 @@ private void doRecovery(final RecoveryTarget recoveryTarget) { // into concurrency issues with the primary changing files underneath us. metadataSnapshot = Store.MetadataSnapshot.EMPTY; } else { - metadataSnapshot = recoveryTarget.indexShard().snapshotStore(); + metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); } } catch (IOException e) { logger.warn("error while listing local files, recover as if there are none", e); diff --git a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index 035d27096b98e..341b0e57858b0 100644 --- a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -124,7 +124,7 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException IndexShard indexShard = indexService.getShardOrNull(shardId.id()); if (indexShard != null) { exists = true; - return new StoreFilesMetaData(shardId, indexShard.snapshotStore()); + return new StoreFilesMetaData(shardId, indexShard.snapshotStoreMetadata()); } } // try and see if we an list unallocated diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index d4a3246b6e41e..9e970c341c60e 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1437,7 +1437,7 @@ public void restore() throws IOException { SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); Store.MetadataSnapshot recoveryTargetMetadata; try { - recoveryTargetMetadata = targetShard.snapshotStore(); + recoveryTargetMetadata = targetShard.snapshotStoreMetadata(); } catch (IndexNotFoundException e) { // happens when restore to an empty shard, not a big deal logger.trace("[{}] [{}] restoring from to an empty shard", shardId, snapshotId); diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index ab1f965b7a317..bbed3ea5cc1f3 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ 
b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -309,7 +309,7 @@ public void recoverReplica(IndexShard replica, BiFunction Date: Tue, 2 Aug 2016 22:49:45 +0200 Subject: [PATCH 13/15] feedback --- .../elasticsearch/index/shard/IndexShard.java | 33 ++++++++++--------- .../gateway/RecoveryFromGatewayIT.java | 2 -- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 122ff9172ce7c..e88360aa5eae4 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -832,31 +832,32 @@ public void releaseIndexCommit(IndexCommit snapshot) throws IOException { * @throws NoSuchFileException if one or more files referenced by a commit are not present. */ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { - synchronized (mutex) { - // if the engine is not running, we can access the store directly, but we need to make sure no one starts - // the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized. - // That can be done out of mutex, since the engine can be closed half way. - Engine engine = getEngineOrNull(); - if (engine == null) { - store.incRef(); - try (Lock ignored = store.directory().obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - return store.getMetadata(null); - } finally { - store.decRef(); - } - } - } IndexCommit indexCommit = null; + Store.MetadataSnapshot result = null; store.incRef(); try { - indexCommit = deletionPolicy.snapshot(); - return store.getMetadata(indexCommit); + synchronized (mutex) { + // if the engine is not running, we can access the store directly, but we need to make sure no one starts + // the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized. + // That can be done out of mutex, since the engine can be closed half way. 
+ Engine engine = getEngineOrNull(); + if (engine == null) { + try (Lock ignored = store.directory().obtainLock(IndexWriter.WRITE_LOCK_NAME)) { + result = store.getMetadata(null); + } + } + } + if (result == null) { + indexCommit = deletionPolicy.snapshot(); + result = store.getMetadata(indexCommit); + } } finally { store.decRef(); if (indexCommit != null) { deletionPolicy.release(indexCommit); } } + return result; } /** diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 65b5c70d91399..59f01f56ce13e 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -45,7 +45,6 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.InternalTestCluster.RestartCallback; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.store.MockFSDirectoryService; import org.elasticsearch.test.store.MockFSIndexStore; @@ -388,7 +387,6 @@ public void doAfterNodes(int numNodes, Client client) throws Exception { assertThat(state.metaData().index("test").getAliases().get("test_alias").filter(), notNullValue()); } - @TestLogging("indices.recovery:TRACE") public void testReusePeerRecovery() throws Exception { final Settings settings = Settings.builder() .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) From a4b275de6593059fee14c63a8839f7c147f7a11e Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 2 Aug 2016 23:03:56 +0200 Subject: [PATCH 14/15] rewrite the right thing --- .../java/org/elasticsearch/index/shard/IndexShard.java | 10 +++------- .../replication/ESIndexLevelReplicationTestCase.java | 8 +++++--- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index e88360aa5eae4..1dff3ad8b9b13 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -833,7 +833,6 @@ public void releaseIndexCommit(IndexCommit snapshot) throws IOException { */ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { IndexCommit indexCommit = null; - Store.MetadataSnapshot result = null; store.incRef(); try { synchronized (mutex) { @@ -843,21 +842,18 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { Engine engine = getEngineOrNull(); if (engine == null) { try (Lock ignored = store.directory().obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - result = store.getMetadata(null); + return store.getMetadata(null); } } } - if (result == null) { - indexCommit = deletionPolicy.snapshot(); - result = store.getMetadata(indexCommit); - } + indexCommit = deletionPolicy.snapshot(); + return store.getMetadata(indexCommit); } finally { store.decRef(); if (indexCommit != null) { deletionPolicy.release(indexCommit); } } - return result; } /** diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 22f6b13b0d5bb..b52c8fe9bdb8b 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ 
b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -309,15 +309,17 @@ public void recoverReplica(IndexShard replica, BiFunction Date: Tue, 2 Aug 2016 23:05:31 +0200 Subject: [PATCH 15/15] add UOE to ShadowIndexshards --- .../java/org/elasticsearch/index/shard/ShadowIndexShard.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index e35c95ae1f0f6..45a471e1aa9fb 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -109,4 +109,9 @@ public TranslogStats translogStats() { public void addRefreshListener(Translog.Location location, Consumer listener) { throw new UnsupportedOperationException("Can't listen for a refresh on a shadow engine because it doesn't have a translog"); } + + @Override + public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { + throw new UnsupportedOperationException("can't snapshot the directory as the primary may change it underneath us"); + } }
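
Note on the locking pattern that patches 13/15 and 14/15 converge on: snapshotStoreMetadata() reads the store metadata either under the Lucene write lock (when no engine is running, so no writer can start mid-read) or from a commit pinned by the SnapshotDeletionPolicy (when an engine is running), and releases the pinned commit and the store reference in a finally block. A minimal, self-contained Java sketch of that pattern follows; it is illustrative only and not the Elasticsearch sources -- SafeMetadataReader, engineIsRunning(), readMetadata() and the refCount field are hypothetical stand-ins, while the Lucene types (Directory, Lock, IndexWriter.WRITE_LOCK_NAME, SnapshotDeletionPolicy, IndexCommit) are the real APIs the patches rely on.

    // Illustrative sketch only -- not the Elasticsearch sources.
    import org.apache.lucene.index.IndexCommit;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.SnapshotDeletionPolicy;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.Lock;

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicInteger;

    final class SafeMetadataReader {
        private final Directory directory;            // the shard's Lucene directory
        private final SnapshotDeletionPolicy policy;  // pins commits while they are read
        private final Object mutex = new Object();    // guards engine start/stop
        private final AtomicInteger refCount = new AtomicInteger(1); // stands in for Store#incRef/decRef

        SafeMetadataReader(Directory directory, SnapshotDeletionPolicy policy) {
            this.directory = directory;
            this.policy = policy;
        }

        /** Placeholder for "is an engine (IndexWriter) currently using the directory?". */
        private boolean engineIsRunning() {
            return false;
        }

        /** Hypothetical metadata read from the latest on-disk commit (null) or a pinned commit. */
        private String readMetadata(IndexCommit commit) throws IOException {
            return commit == null ? "latest on-disk commit" : commit.getSegmentsFileName();
        }

        String snapshotMetadata() throws IOException {
            IndexCommit commit = null;
            refCount.incrementAndGet();               // keep the store open while we read
            try {
                synchronized (mutex) {
                    if (engineIsRunning() == false) {
                        // No writer is running: hold the write lock so one cannot start
                        // while the last commit is read straight from disk.
                        try (Lock ignored = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
                            return readMetadata(null);
                        }
                    }
                }
                // A writer is running: pin its current commit via the deletion policy
                // so the commit's files cannot be deleted out from under the reader.
                commit = policy.snapshot();
                return readMetadata(commit);
            } finally {
                refCount.decrementAndGet();
                if (commit != null) {
                    policy.release(commit);
                }
            }
        }
    }

The design choice worth noting is that the write lock is taken only when no writer can be running; an active writer is never blocked, it is handled by pinning its current commit instead, and both paths release their resources in the same finally block.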