Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -862,13 +862,17 @@ public void forceMerge(boolean flush) throws IOException {
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException, IOException;

/**
* Snapshots the index and returns a handle to it. If needed will try and "commit" the
* Snapshots the most recent index and returns a handle to it. If needed will try and "commit" the
* lucene index to make sure we have a "fresh" copy of the files to snapshot.
*
* @param safeCommit indicates whether the engine should acquire the most recent safe commit, or the most recent commit.
* @param flushFirst indicates whether the engine should flush before returning the snapshot
*/
public abstract IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException;
public abstract IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException;

/**
* Snapshots the most recent safe index commit from the engine.
*/
public abstract IndexCommitRef acquireSafeIndexCommit() throws EngineException;

/**
* fail engine due to some error. the engine will also be closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1687,16 +1687,22 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
}

@Override
public IndexCommitRef acquireIndexCommit(final boolean safeCommit, final boolean flushFirst) throws EngineException {
public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws EngineException {
// we have to flush outside of the readlock otherwise we might have a problem upgrading
// the to a write lock when we fail the engine in this operation
if (flushFirst) {
logger.trace("start flush for snapshot");
flush(false, true);
logger.trace("finish flush for snapshot");
}
final IndexCommit snapshotCommit = combinedDeletionPolicy.acquireIndexCommit(safeCommit);
return new Engine.IndexCommitRef(snapshotCommit, () -> combinedDeletionPolicy.releaseCommit(snapshotCommit));
final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false);
return new Engine.IndexCommitRef(lastCommit, () -> combinedDeletionPolicy.releaseCommit(lastCommit));
}

@Override
public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true);
return new Engine.IndexCommitRef(safeCommit, () -> combinedDeletionPolicy.releaseCommit(safeCommit));
}

private boolean failOnTragicEvent(AlreadyClosedException ex) {
Expand Down
24 changes: 18 additions & 6 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1083,22 +1083,34 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() {
}

/**
* Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this
* Creates a new {@link IndexCommit} snapshot from the currently running engine. All resources referenced by this
* commit won't be freed until the commit / snapshot is closed.
*
* @param safeCommit <code>true</code> capture the most recent safe commit point; otherwise the most recent commit point.
* @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
*/
public Engine.IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException {
IndexShardState state = this.state; // one time volatile read
public Engine.IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException {
final IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
return getEngine().acquireIndexCommit(safeCommit, flushFirst);
return getEngine().acquireLastIndexCommit(flushFirst);
} else {
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
}
}

/**
* Snapshots the most recent safe index commit from the currently running engine.
* All index files referenced by this index commit won't be freed until the commit/snapshot is closed.
*/
public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException {
final IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
return getEngine().acquireSafeIndexCommit();
} else {
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
}
}

/**
* gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard,
Expand Down Expand Up @@ -1127,7 +1139,7 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
return store.getMetadata(null, true);
}
}
indexCommit = engine.acquireIndexCommit(false, false);
indexCommit = engine.acquireLastIndexCommit(false);
return store.getMetadata(indexCommit.getIndexCommit());
} finally {
store.decRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ final class LocalShardSnapshot implements Closeable {
store.incRef();
boolean success = false;
try {
indexCommit = shard.acquireIndexCommit(false, true);
indexCommit = shard.acquireLastIndexCommit(true);
success = true;
} finally {
if (success == false) {
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ final void ensureOpen() {
*
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
* {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
* {@link IndexShard#acquireLastIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
* @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
* directory
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
Expand All @@ -262,7 +262,7 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
*
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
* {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
* {@link IndexShard#acquireLastIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
*
* @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
* directory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
} else {
final Engine.IndexCommitRef phase1Snapshot;
try {
phase1Snapshot = shard.acquireIndexCommit(true, false);
phase1Snapshot = shard.acquireSafeIndexCommit();
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long
/**
* Creates a snapshot of the shard based on the index commit point.
* <p>
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireIndexCommit} method.
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireLastIndexCommit} method.
* Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina
final Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository());
try {
// we flush first to make sure we get the latest writes snapshotted
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(false, true)) {
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireLastIndexCommit(true)) {
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2118,7 +2118,7 @@ public void testConcurrentWritesAndCommits() throws Exception {
boolean doneIndexing;
do {
doneIndexing = doneLatch.await(sleepTime, TimeUnit.MILLISECONDS);
commits.add(engine.acquireIndexCommit(false, true));
commits.add(engine.acquireLastIndexCommit(true));
if (commits.size() > commitLimit) { // don't keep on piling up too many commits
IOUtils.close(commits.remove(randomIntBetween(0, commits.size()-1)));
// we increase the wait time to make sure we eventually if things are slow wait for threads to finish.
Expand Down Expand Up @@ -4343,19 +4343,24 @@ public void testAcquireIndexCommit() throws Exception {
}
final boolean flushFirst = randomBoolean();
final boolean safeCommit = randomBoolean();
Engine.IndexCommitRef commit = engine.acquireIndexCommit(safeCommit, flushFirst);
final Engine.IndexCommitRef snapshot;
if (safeCommit) {
snapshot = engine.acquireSafeIndexCommit();
} else {
snapshot = engine.acquireLastIndexCommit(flushFirst);
}
int moreDocs = between(1, 20);
for (int i = 0; i < moreDocs; i++) {
index(engine, numDocs + i);
}
globalCheckpoint.set(numDocs + moreDocs - 1);
engine.flush();
// check that we can still read the commit that we captured
try (IndexReader reader = DirectoryReader.open(commit.getIndexCommit())) {
try (IndexReader reader = DirectoryReader.open(snapshot.getIndexCommit())) {
assertThat(reader.numDocs(), equalTo(flushFirst && (safeCommit == false || inSync) ? numDocs : 0));
}
assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(2));
commit.close();
snapshot.close();
// check it's clean up
engine.flush(true, true);
assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
when(shard.segmentStats(anyBoolean())).thenReturn(mock(SegmentsStats.class));
when(shard.state()).thenReturn(IndexShardState.RELOCATED);
when(shard.acquireIndexCommit(anyBoolean(), anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class));
when(shard.acquireSafeIndexCommit()).thenReturn(mock(Engine.IndexCommitRef.class));
doAnswer(invocation -> {
((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> {});
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ protected void snapshotShard(final IndexShard shard,
final Snapshot snapshot,
final Repository repository) throws IOException {
final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
try (Engine.IndexCommitRef indexCommitRef = shard.acquireIndexCommit(false, true)) {
try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) {
Index index = shard.shardId().getIndex();
IndexId indexId = new IndexId(index.getName(), index.getUUID());

Expand Down