Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ public void forceMerge(boolean flush) throws IOException {
*
* @param flushFirst indicates whether the engine should flush before returning the snapshot
*/
public abstract IndexCommit snapshotIndex(boolean flushFirst) throws EngineException;
public abstract IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException;

/**
* fail engine due to some error. the engine will also be closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
}

@Override
public IndexCommit snapshotIndex(final boolean flushFirst) throws EngineException {
public IndexCommit acquireIndexCommit(final boolean flushFirst) throws EngineException {
// we have to flush outside of the readlock otherwise we might have a problem upgrading
// the lock to a write lock when we fail the engine in this operation
if (flushFirst) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void refresh(String source) throws EngineException {
}

@Override
public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
public IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException {
// Unconditionally rejected: flushFirst is never inspected and no index commit is ever handed out here.
throw new UnsupportedOperationException("Can not take snapshot from a shadow engine");
}

Expand Down
56 changes: 50 additions & 6 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,19 @@

import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Lock;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
import org.elasticsearch.ElasticsearchException;
Expand Down Expand Up @@ -116,10 +121,12 @@
import org.elasticsearch.search.suggest.completion2x.Completion090PostingsFormat;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
Expand Down Expand Up @@ -789,29 +796,66 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() {

/**
* Creates a new {@link IndexCommit} snapshot from the currently running engine. All resources referenced by this
* commit won't be freed until the commit / snapshot is released via {@link #releaseSnapshot(IndexCommit)}.
* commit won't be freed until the commit / snapshot is released via {@link #releaseIndexCommit(IndexCommit)}.
*
* @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
*/
public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
public IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException {
IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
// allowed state: delegate to the engine, passing flushFirst through unchanged
return getEngine().snapshotIndex(flushFirst);
return getEngine().acquireIndexCommit(flushFirst);
} else {
// every other shard state is rejected; the observed state is reported in the exception
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
}
}


/**
* Releases a snapshot taken from {@link #snapshotIndex(boolean)} this must be called to release the resources
* Releases a snapshot taken from {@link #acquireIndexCommit(boolean)}. This must be called to release the resources
* referenced by the given snapshot {@link IndexCommit}.
*/
public void releaseSnapshot(IndexCommit snapshot) throws IOException {
public void releaseIndexCommit(IndexCommit snapshot) throws IOException {
// hand the commit back to the shard's deletion policy; any IOException it raises propagates to the caller
deletionPolicy.release(snapshot);
}

/**
 * Gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call at any point in the
 * lifecycle of the index shard, without having to worry about the current state of the engine and concurrent flushes.
 * <p>
 * A reference to the store is held for the duration of the call. If no engine is present, the metadata of the latest
 * commit is read directly from the store while holding both the shard mutex and the {@link IndexWriter} write lock.
 * Otherwise a commit is pinned through the deletion policy, read, and released again before returning.
 *
 * @return the metadata snapshot of the latest commit (no engine) or of the commit pinned via the deletion policy
 * @throws org.apache.lucene.index.IndexNotFoundException if no index is found in the current directory
 * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
 *                               unexpected exception when opening the index reading the segments file.
 * @throws IndexFormatTooOldException if the lucene index is too old to be opened.
 * @throws IndexFormatTooNewException if the lucene index is too new to be opened.
 * @throws FileNotFoundException if one or more files referenced by a commit are not present.
 * @throws NoSuchFileException if one or more files referenced by a commit are not present.
 */
public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
    IndexCommit indexCommit = null;
    store.incRef();
    try {
        synchronized (mutex) {
            // if the engine is not running, we can access the store directly, but we need to make sure no one starts
            // the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized.
            // That can be done outside of the mutex, since the engine can be closed half way.
            Engine engine = getEngineOrNull();
            if (engine == null) {
                try (Lock ignored = store.directory().obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
                    return store.getMetadata(null);
                }
            }
        }
        indexCommit = deletionPolicy.snapshot();
        return store.getMetadata(indexCommit);
    } finally {
        // Release in reverse order of acquisition: first the pinned commit (while we still hold a store reference),
        // then the store reference itself. The nested finally guarantees the store reference is dropped even if
        // releasing the commit fails, and that a failure while dropping it cannot leave the commit pinned.
        try {
            if (indexCommit != null) {
                deletionPolicy.release(indexCommit);
            }
        } finally {
            store.decRef();
        }
    }
}

/**
* Fails the shard and marks the shard store as corrupted if
* <code>e</code> is caused by index corruption
Expand Down Expand Up @@ -1310,7 +1354,7 @@ private void doCheckIndex() throws IOException {
if ("checksum".equals(checkIndexOnStartup)) {
// physical verification only: verify all checksums for the latest commit
IOException corrupt = null;
MetadataSnapshot metadata = store.getMetadata();
MetadataSnapshot metadata = snapshotStoreMetadata();
for (Map.Entry<String, StoreFileMetaData> entry : metadata.asMap().entrySet()) {
try {
Store.checkIntegrity(entry.getValue(), store.directory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.NoLockFactory;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -52,7 +50,7 @@ public LocalShardSnapshot(IndexShard shard) {
store.incRef();
boolean success = false;
try {
indexCommit = shard.snapshotIndex(true);
indexCommit = shard.acquireIndexCommit(true);
success = true;
} finally {
if (success == false) {
Expand Down Expand Up @@ -120,7 +118,7 @@ public void close() throws IOException {
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try {
shard.releaseSnapshot(indexCommit);
shard.releaseIndexCommit(indexCommit);
} finally {
store.decRef();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,9 @@ public TranslogStats translogStats() {
public void addRefreshListener(Translog.Location location, Consumer<Boolean> listener) {
// Always rejected: neither argument is used, and the message gives the missing translog as the reason.
throw new UnsupportedOperationException("Can't listen for a refresh on a shadow engine because it doesn't have a translog");
}

/**
* Not supported by this override: every call fails with an {@link UnsupportedOperationException} whose message explains
* that the primary may change the directory underneath this shard.
*
* @throws UnsupportedOperationException always
*/
@Override
public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
throw new UnsupportedOperationException("can't snapshot the directory as the primary may change it underneath us");
}
}
45 changes: 9 additions & 36 deletions core/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;

import java.io.Closeable;
Expand Down Expand Up @@ -208,45 +209,17 @@ final void ensureOpen() {
}
}

/**
 * Reads the {@link MetadataSnapshot} of the latest commit in this store, falling back to
 * {@link MetadataSnapshot#EMPTY} when no index exists or files needed to read it cannot be found.
 *
 * @return the snapshot of the latest commit, or {@link MetadataSnapshot#EMPTY} in the fallback cases above
 * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
 *                               unexpected exception when opening the index reading the segments file.
 * @throws IndexFormatTooOldException if the lucene index is too old to be opened.
 * @throws IndexFormatTooNewException if the lucene index is too new to be opened.
 */
public MetadataSnapshot getMetadataOrEmpty() throws IOException {
    try {
        // null asks for the latest commit
        return getMetadata(null);
    } catch (IndexNotFoundException notFound) {
        // that's fine - happens all the time no need to log
        return MetadataSnapshot.EMPTY;
    } catch (FileNotFoundException | NoSuchFileException missingFile) {
        logger.info("Failed to open / find files while reading metadata snapshot");
        return MetadataSnapshot.EMPTY;
    }
}

/**
* Returns a new MetadataSnapshot for the latest commit in this store. Convenience overload that forwards to the
* commit-taking variant with a <code>null</code> commit.
*
* @return the metadata snapshot of the latest commit
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
* unexpected exception when opening the index reading the segments file.
* @throws IndexFormatTooOldException if the lucene index is too old to be opened.
* @throws IndexFormatTooNewException if the lucene index is too new to be opened.
* @throws FileNotFoundException if one or more files referenced by a commit are not present.
* @throws NoSuchFileException if one or more files referenced by a commit are not present.
* @throws IndexNotFoundException if no index / valid commit-point can be found in this store
*/
public MetadataSnapshot getMetadata() throws IOException {
// a null commit selects the latest commit point
return getMetadata(null);
}

/**
* Returns a new MetadataSnapshot for the given commit. If the given commit is <code>null</code>
* the latest commit point is used.
*
* Note that this method requires the caller verify it has the right to access the store and
* no concurrent file changes are happening. If in doubt, you probably want to use one of the following:
*
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, ESLogger)} to read metadata while locking
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
* {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
*
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
* unexpected exception when opening the index reading the segments file.
* @throws IndexFormatTooOldException if the lucene index is too old to be opened.
Expand Down Expand Up @@ -634,7 +607,7 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetaData) thr
// ignore, we don't really care, will get deleted later on
}
}
final Store.MetadataSnapshot metadataOrEmpty = getMetadata();
final Store.MetadataSnapshot metadataOrEmpty = getMetadata(null);
verifyAfterCleanup(sourceMetaData, metadataOrEmpty);
} finally {
metadataLock.writeLock().unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration());
final IndexCommit phase1Snapshot;
try {
phase1Snapshot = shard.snapshotIndex(false);
phase1Snapshot = shard.acquireIndexCommit(false);
} catch (Exception e) {
IOUtils.closeWhileHandlingException(translogView);
throw new RecoveryEngineException(shard.shardId(), 1, "Snapshot failed", e);
Expand All @@ -139,7 +139,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
} finally {
try {
shard.releaseSnapshot(phase1Snapshot);
shard.releaseIndexCommit(phase1Snapshot);
} catch (IOException ex) {
logger.warn("releasing snapshot caused exception", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,13 @@ private void doRecovery(final RecoveryTarget recoveryTarget) {
logger.trace("collecting local files for {}", recoveryTarget);
Store.MetadataSnapshot metadataSnapshot = null;
try {
metadataSnapshot = recoveryTarget.store().getMetadataOrEmpty();
if (recoveryTarget.indexShard().indexSettings().isOnSharedFilesystem()) {
// we are not going to copy any files, so don't bother listing files, potentially running
// into concurrency issues with the primary changing files underneath us.
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
} else {
metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
}
} catch (IOException e) {
logger.warn("error while listing local files, recover as if there are none", e);
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
Expand All @@ -178,6 +184,7 @@ private void doRecovery(final RecoveryTarget recoveryTarget) {
new RecoveryFailedException(recoveryTarget.state(), "failed to list local files", e), true);
return;
}
logger.trace("{} local file count: [{}]", recoveryTarget, metadataSnapshot.size());
final StartRecoveryRequest request = new StartRecoveryRequest(recoveryTarget.shardId(), recoveryTarget.sourceNode(),
clusterService.localNode(),
metadataSnapshot, recoveryTarget.state().getType(), recoveryTarget.recoveryId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,8 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException
if (indexService != null) {
IndexShard indexShard = indexService.getShardOrNull(shardId.id());
if (indexShard != null) {
final Store store = indexShard.store();
store.incRef();
try {
exists = true;
return new StoreFilesMetaData(shardId, store.getMetadataOrEmpty());
} finally {
store.decRef();
}
exists = true;
return new StoreFilesMetaData(shardId, indexShard.snapshotStoreMetadata());
}
}
// try and see if we can list unallocated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;

Expand Down Expand Up @@ -174,7 +172,7 @@ interface Factory {
/**
* Creates a snapshot of the shard based on the index commit point.
* <p>
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex} method.
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireIndexCommit} method.
* Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
Expand Down
Loading