true if the index should first be flushed to disk / a low level lucene commit should be executed
*/
- public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
+ public IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException {
IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
- return getEngine().snapshotIndex(flushFirst);
+ return getEngine().acquireIndexCommit(flushFirst);
} else {
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
}
@@ -805,13 +812,50 @@ public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
/**
- * Releases a snapshot taken from {@link #snapshotIndex(boolean)} this must be called to release the resources
+ * Releases a snapshot taken from {@link #acquireIndexCommit(boolean)} this must be called to release the resources
* referenced by the given snapshot {@link IndexCommit}.
*/
- public void releaseSnapshot(IndexCommit snapshot) throws IOException {
+ public void releaseIndexCommit(IndexCommit snapshot) throws IOException {
deletionPolicy.release(snapshot);
}
+ /**
+ * gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard,
+ * without having to worry about the current state of the engine and concurrent flushes.
+ *
+ * @throws org.apache.lucene.index.IndexNotFoundException if no index is found in the current directory
+ * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
+ * unexpected exception when opening the index reading the segments file.
+ * @throws IndexFormatTooOldException if the lucene index is too old to be opened.
+ * @throws IndexFormatTooNewException if the lucene index is too new to be opened.
+ * @throws FileNotFoundException if one or more files referenced by a commit are not present.
+ * @throws NoSuchFileException if one or more files referenced by a commit are not present.
+ */
+ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
+ IndexCommit indexCommit = null;
+ store.incRef();
+ try {
+ synchronized (mutex) {
+ // if the engine is not running, we can access the store directly, but we need to make sure no one starts
+ // the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized.
+ // That can be done out of mutex, since the engine can be closed half way.
+ Engine engine = getEngineOrNull();
+ if (engine == null) {
+ try (Lock ignored = store.directory().obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
+ return store.getMetadata(null);
+ }
+ }
+ }
+ indexCommit = deletionPolicy.snapshot();
+ return store.getMetadata(indexCommit);
+ } finally {
+ store.decRef();
+ if (indexCommit != null) {
+ deletionPolicy.release(indexCommit);
+ }
+ }
+ }
+
/**
* Fails the shard and marks the shard store as corrupted if
* e is caused by index corruption
@@ -1310,7 +1354,7 @@ private void doCheckIndex() throws IOException {
if ("checksum".equals(checkIndexOnStartup)) {
// physical verification only: verify all checksums for the latest commit
IOException corrupt = null;
- MetadataSnapshot metadata = store.getMetadata();
+ MetadataSnapshot metadata = snapshotStoreMetadata();
for (Map.Entrynull
* the latest commit point is used.
*
+ * Note that this method requires the caller verify it has the right to access the store and
+ * no concurrent file changes are happening. If in doubt, you probably want to use one of the following:
+ *
+ * {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, ESLogger)} to read a meta data while locking
+ * {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
+ * {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
+ *
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
* unexpected exception when opening the index reading the segments file.
* @throws IndexFormatTooOldException if the lucene index is too old to be opened.
@@ -634,7 +607,7 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetaData) thr
// ignore, we don't really care, will get deleted later on
}
}
- final Store.MetadataSnapshot metadataOrEmpty = getMetadata();
+ final Store.MetadataSnapshot metadataOrEmpty = getMetadata(null);
verifyAfterCleanup(sourceMetaData, metadataOrEmpty);
} finally {
metadataLock.writeLock().unlock();
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 7d201a3ca7855..b226af7858e42 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -127,7 +127,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration());
final IndexCommit phase1Snapshot;
try {
- phase1Snapshot = shard.snapshotIndex(false);
+ phase1Snapshot = shard.acquireIndexCommit(false);
} catch (Exception e) {
IOUtils.closeWhileHandlingException(translogView);
throw new RecoveryEngineException(shard.shardId(), 1, "Snapshot failed", e);
@@ -139,7 +139,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
} finally {
try {
- shard.releaseSnapshot(phase1Snapshot);
+ shard.releaseIndexCommit(phase1Snapshot);
} catch (IOException ex) {
logger.warn("releasing snapshot caused exception", ex);
}
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java
index a23b8060b1741..1a13504fb1ad7 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java
@@ -167,7 +167,13 @@ private void doRecovery(final RecoveryTarget recoveryTarget) {
logger.trace("collecting local files for {}", recoveryTarget);
Store.MetadataSnapshot metadataSnapshot = null;
try {
- metadataSnapshot = recoveryTarget.store().getMetadataOrEmpty();
+ if (recoveryTarget.indexShard().indexSettings().isOnSharedFilesystem()) {
+ // we are not going to copy any files, so don't bother listing files, potentially running
+ // into concurrency issues with the primary changing files underneath us.
+ metadataSnapshot = Store.MetadataSnapshot.EMPTY;
+ } else {
+ metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
+ }
} catch (IOException e) {
logger.warn("error while listing local files, recover as if there are none", e);
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
@@ -178,6 +184,7 @@ private void doRecovery(final RecoveryTarget recoveryTarget) {
new RecoveryFailedException(recoveryTarget.state(), "failed to list local files", e), true);
return;
}
+ logger.trace("{} local file count: [{}]", recoveryTarget, metadataSnapshot.size());
final StartRecoveryRequest request = new StartRecoveryRequest(recoveryTarget.shardId(), recoveryTarget.sourceNode(),
clusterService.localNode(),
metadataSnapshot, recoveryTarget.state().getType(), recoveryTarget.recoveryId());
diff --git a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java
index cdc95e1895ac0..341b0e57858b0 100644
--- a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java
+++ b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java
@@ -123,14 +123,8 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException
if (indexService != null) {
IndexShard indexShard = indexService.getShardOrNull(shardId.id());
if (indexShard != null) {
- final Store store = indexShard.store();
- store.incRef();
- try {
- exists = true;
- return new StoreFilesMetaData(shardId, store.getMetadataOrEmpty());
- } finally {
- store.decRef();
- }
+ exists = true;
+ return new StoreFilesMetaData(shardId, indexShard.snapshotStoreMetadata());
}
}
// try and see if we an list unallocated
diff --git a/core/src/main/java/org/elasticsearch/repositories/Repository.java b/core/src/main/java/org/elasticsearch/repositories/Repository.java
index 544f757737ca8..b1f534bf684cd 100644
--- a/core/src/main/java/org/elasticsearch/repositories/Repository.java
+++ b/core/src/main/java/org/elasticsearch/repositories/Repository.java
@@ -23,14 +23,12 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.env.Environment;
-import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.indices.recovery.RecoveryState;
-import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.common.component.LifecycleComponent;
+import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
+import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
@@ -174,7 +172,7 @@ interface Factory {
/**
* Creates a snapshot of the shard based on the index commit point.
* - * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex} method. + * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireIndexCommit} method. * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller. *
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index efc3c2cfe2112..2bb92dd0c2353 100644
--- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -23,6 +23,7 @@
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
+import org.apache.lucene.index.IndexNotFoundException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
@@ -40,29 +41,6 @@
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Numbers;
-import org.elasticsearch.common.collect.Tuple;
-import org.elasticsearch.common.lucene.Lucene;
-import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.iterable.Iterables;
-import org.elasticsearch.common.util.set.Sets;
-import org.elasticsearch.common.xcontent.ToXContent;
-import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
-import org.elasticsearch.index.snapshots.IndexShardSnapshotException;
-import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
-import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
-import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
-import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots;
-import org.elasticsearch.index.snapshots.blobstore.RateLimitingInputStream;
-import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream;
-import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
-import org.elasticsearch.index.store.Store;
-import org.elasticsearch.index.store.StoreFileMetaData;
-import org.elasticsearch.indices.recovery.RecoveryState;
-import org.elasticsearch.repositories.IndexId;
-import org.elasticsearch.repositories.RepositoryData;
-import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
@@ -72,26 +50,49 @@
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.metrics.CounterMetric;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.iterable.Iterables;
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
+import org.elasticsearch.index.snapshots.IndexShardSnapshotException;
+import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
+import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
+import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
+import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots;
+import org.elasticsearch.index.snapshots.blobstore.RateLimitingInputStream;
+import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream;
+import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.store.StoreFileMetaData;
+import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.SnapshotCreationException;
import org.elasticsearch.snapshots.SnapshotException;
+import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotShardFailure;
@@ -1444,7 +1445,7 @@ protected InputStream openSlice(long slice) throws IOException {
*/
private class RestoreContext extends Context {
- private final Store store;
+ private final IndexShard targetShard;
private final RecoveryState recoveryState;
@@ -1460,13 +1461,14 @@ private class RestoreContext extends Context {
public RestoreContext(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
super(snapshotId, version, indexId, shard.shardId(), snapshotShardId);
this.recoveryState = recoveryState;
- store = shard.store();
+ this.targetShard = shard;
}
/**
* Performs restore operation
*/
public void restore() throws IOException {
+ final Store store = targetShard.store();
store.incRef();
try {
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, metadata.name(), shardId);
@@ -1491,12 +1493,16 @@ public void restore() throws IOException {
}
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
- final Store.MetadataSnapshot recoveryTargetMetadata;
+ Store.MetadataSnapshot recoveryTargetMetadata;
try {
- recoveryTargetMetadata = store.getMetadataOrEmpty();
- } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
- logger.warn("{} Can't read metadata from store", e, shardId);
- throw new IndexShardRestoreFailedException(shardId, "Can't restore corrupted shard", e);
+ recoveryTargetMetadata = targetShard.snapshotStoreMetadata();
+ } catch (IndexNotFoundException e) {
+ // happens when restore to an empty shard, not a big deal
+ logger.trace("[{}] [{}] restoring from to an empty shard", shardId, snapshotId);
+ recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
+ } catch (IOException e) {
+ logger.warn("{} Can't read metadata from store, will not reuse any local file while restoring", e, shardId);
+ recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
}
final List