Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,12 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -391,8 +389,6 @@ public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardReq
Exception failure = operationResult.getFailure();
assert failure instanceof VersionConflictEngineException
|| failure instanceof MapperParsingException
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

|| failure instanceof EngineClosedException
|| failure instanceof IndexShardClosedException
: "expected any one of [version conflict, mapper parsing, engine closed, index shard closed]" +
" failures. got " + failure;
if (!TransportActions.isShardNotAvailableException(failure)) {
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
Expand Down Expand Up @@ -675,7 +674,7 @@ private void maybeFSyncTranslogs() {
if (translog.syncNeeded()) {
translog.sync();
}
} catch (EngineClosedException | AlreadyClosedException ex) {
} catch (AlreadyClosedException ex) {
// fine - continue;
} catch (IOException e) {
logger.warn("failed to sync translog", e);
Expand Down Expand Up @@ -723,7 +722,7 @@ private void maybeUpdateGlobalCheckpoints() {
case STARTED:
try {
shard.updateGlobalCheckpointOnPrimary();
} catch (EngineClosedException | AlreadyClosedException ex) {
} catch (AlreadyClosedException ex) {
// fine - continue, the shard was concurrently closed on us.
}
continue;
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ public final Searcher acquireSearcher(String source) throws EngineException {
manager.release(searcher);
}
}
} catch (EngineClosedException ex) {
} catch (AlreadyClosedException ex) {
throw ex;
} catch (Exception ex) {
ensureOpen(); // throw EngineCloseException here if we are already closed
Expand All @@ -530,7 +530,7 @@ public final Searcher acquireSearcher(String source) throws EngineException {

protected void ensureOpen() {
if (isClosed.get()) {
throw new EngineClosedException(shardId, failedEngine.get());
throw new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
}
}

Expand Down Expand Up @@ -1017,6 +1017,7 @@ public static class Index extends Operation {
public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin,
long startTime, long autoGeneratedIdTimestamp, boolean isRetry) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert uid.bytes().equals(doc.uid()) : "term uid " + uid + " doesn't match doc uid " + doc.uid();
this.doc = doc;
this.isRetry = isRetry;
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
Expand Down Expand Up @@ -1282,7 +1283,7 @@ public void flushAndClose() throws IOException {
logger.debug("flushing shard on close - this might take some time to sync files to disk");
try {
flush(); // TODO we might force a flush in the future since we have the write lock already even though recoveries are running.
} catch (EngineClosedException ex) {
} catch (AlreadyClosedException ex) {
logger.debug("engine already closed - skipping flushAndClose");
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
*
*
*/
@Deprecated
public class EngineClosedException extends IndexShardClosedException {

public EngineClosedException(ShardId shardId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,10 @@ final Exception checkIfDocumentFailureOrThrow(final Operation operation, final E
// and set the error in operation.setFailure. In case of environment related errors, the failure
// is bubbled up
isDocumentFailure = maybeFailEngine(operation.operationType().getLowercase(), failure) == false;
if (failure instanceof AlreadyClosedException) {
// ensureOpen throws AlreadyClosedException which is not a document level issue
isDocumentFailure = false;
}
} catch (Exception inner) {
// we failed checking whether the failure can fail the engine, treat it as a persistent engine failure
isDocumentFailure = false;
Expand Down Expand Up @@ -901,8 +905,6 @@ public void refresh(String source) throws EngineException {
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
} catch (EngineClosedException e) {
throw e;
} catch (Exception e) {
try {
failEngine("refresh failed", e);
Expand Down Expand Up @@ -949,8 +951,6 @@ public void writeIndexingBuffer() throws EngineException {
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
} catch (EngineClosedException e) {
throw e;
} catch (Exception e) {
try {
failEngine("writeIndexingBuffer failed", e);
Expand Down Expand Up @@ -1129,7 +1129,7 @@ private void pruneDeletedTombstones() {

@Override
public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, EngineClosedException, IOException {
final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, IOException {
/*
* We do NOT acquire the readlock here since we are waiting on the merges to finish
* that's fine since the IW.rollback should stop all the threads and trigger an IOException
Expand Down Expand Up @@ -1215,7 +1215,8 @@ public IndexCommit acquireIndexCommit(final boolean flushFirst) throws EngineExc
}

@SuppressWarnings("finally")
private void failOnTragicEvent(AlreadyClosedException ex) {
private boolean failOnTragicEvent(AlreadyClosedException ex) {
final boolean engineFailed;
// if we are already closed due to some tragic exception
// we need to fail the engine. it might have already been failed before
// but we are double-checking it's failed and closed
Expand All @@ -1228,14 +1229,19 @@ private void failOnTragicEvent(AlreadyClosedException ex) {
}
} else {
failEngine("already closed by tragic event on the index writer", (Exception) indexWriter.getTragicException());
engineFailed = true;
}
} else if (translog.isOpen() == false && translog.getTragicException() != null) {
failEngine("already closed by tragic event on the translog", translog.getTragicException());
} else if (failedEngine.get() == null) { // we are closed but the engine is not failed yet?
engineFailed = true;
} else if (failedEngine.get() == null && isClosed.get() == false) { // we are closed but the engine is not failed yet?
// this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by
// a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error
throw new AssertionError("Unexpected AlreadyClosedException", ex);
} else {
engineFailed = false;
}
return engineFailed;
}

@Override
Expand All @@ -1248,8 +1254,7 @@ protected boolean maybeFailEngine(String source, Exception e) {
// exception that should only be thrown in a tragic event. we pass on the checks to failOnTragicEvent which will
// throw and AssertionError if the tragic event condition is not met.
if (e instanceof AlreadyClosedException) {
failOnTragicEvent((AlreadyClosedException)e);
return true;
return failOnTragicEvent((AlreadyClosedException)e);
} else if (e != null &&
((indexWriter.isOpen() == false && indexWriter.getTragicException() == e)
|| (translog.isOpen() == false && translog.getTragicException() == e))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,6 @@ public void refresh(String source) throws EngineException {
ensureOpen();
searcherManager.maybeRefreshBlocking();
} catch (AlreadyClosedException e) {
// This means there's a bug somewhere: don't suppress it
throw new AssertionError(e);
} catch (EngineClosedException e) {
throw e;
} catch (Exception e) {
try {
Expand Down
20 changes: 9 additions & 11 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineFactory;
Expand Down Expand Up @@ -622,7 +621,7 @@ public Engine.GetResult get(Engine.Get get) {
}

/**
* Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}.
* Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link AlreadyClosedException}.
*/
public void refresh(String source) {
verifyNotClosed();
Expand Down Expand Up @@ -1265,7 +1264,7 @@ boolean shouldFlush() {
try {
Translog translog = engine.getTranslog();
return translog.sizeInBytes() > indexSettings.getFlushThresholdSize().getBytes();
} catch (AlreadyClosedException | EngineClosedException ex) {
} catch (AlreadyClosedException ex) {
// that's fine we are already close - no need to flush
}
}
Expand Down Expand Up @@ -1304,21 +1303,21 @@ public IndexEventListener getIndexEventListener() {
public void activateThrottling() {
try {
getEngine().activateThrottling();
} catch (EngineClosedException ex) {
} catch (AlreadyClosedException ex) {
// ignore
}
}

public void deactivateThrottling() {
try {
getEngine().deactivateThrottling();
} catch (EngineClosedException ex) {
} catch (AlreadyClosedException ex) {
// ignore
}
}

private void handleRefreshException(Exception e) {
if (e instanceof EngineClosedException) {
if (e instanceof AlreadyClosedException) {
// ignore
} else if (e instanceof RefreshFailedEngineException) {
RefreshFailedEngineException rfee = (RefreshFailedEngineException) e;
Expand Down Expand Up @@ -1530,7 +1529,7 @@ private void doCheckIndex() throws IOException {
Engine getEngine() {
Engine engine = getEngineOrNull();
if (engine == null) {
throw new EngineClosedException(shardId);
throw new AlreadyClosedException("engine is closed");
}
return engine;
}
Expand Down Expand Up @@ -1667,7 +1666,7 @@ public void onFailedEngine(String reason, @Nullable Exception failure) {
private Engine createNewEngine(EngineConfig config) {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new EngineClosedException(shardId);
throw new AlreadyClosedException(shardId + " can't create engine - shard is closed");
}
assert this.currentEngineReference.get() == null;
Engine engine = newEngine(config);
Expand Down Expand Up @@ -1769,7 +1768,7 @@ protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candida
try {
final Engine engine = getEngine();
engine.getTranslog().ensureSynced(candidates.stream().map(Tuple::v1));
} catch (EngineClosedException ex) {
} catch (AlreadyClosedException ex) {
// that's fine since we already synced everything on engine close - this also is conform with the methods
// documentation
} catch (IOException ex) { // if this fails we are in deep shit - fail the request
Expand Down Expand Up @@ -1884,8 +1883,7 @@ EngineFactory getEngineFactory() {
* refresh listeners.
* Otherwise <code>false</code>.
*
* @throws EngineClosedException if the engine is already closed
* @throws AlreadyClosedException if the internal indexwriter in the engine is already closed
* @throws AlreadyClosedException if the engine or internal indexwriter in the engine is already closed
*/
public boolean isRefreshNeeded() {
return getEngine().refreshNeeded() || (refreshListeners != null && refreshListeners.refreshNeeded());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
Expand All @@ -30,7 +31,6 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexingOperationListener;
Expand Down Expand Up @@ -384,7 +384,7 @@ private void runUnlocked() {
protected void checkIdle(IndexShard shard, long inactiveTimeNS) {
try {
shard.checkIdle(inactiveTimeNS);
} catch (EngineClosedException e) {
} catch (AlreadyClosedException e) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("ignore exception while checking if shard {} is inactive", shard.shardId()), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.support.replication;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
Expand Down Expand Up @@ -55,7 +56,6 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState;
Expand Down Expand Up @@ -431,12 +431,12 @@ public void testStalePrimaryShardOnReroute() throws InterruptedException {
}
}

private ElasticsearchException randomRetryPrimaryException(ShardId shardId) {
private Exception randomRetryPrimaryException(ShardId shardId) {
return randomFrom(
new ShardNotFoundException(shardId),
new IndexNotFoundException(shardId.getIndex()),
new IndexShardClosedException(shardId),
new EngineClosedException(shardId),
new AlreadyClosedException(shardId + " primary is closed"),
new ReplicationOperation.RetryOnPrimaryException(shardId, "hello")
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngineTests;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexingOperationListener;
Expand Down Expand Up @@ -247,7 +249,8 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
assertEquals(IndexingSlowLog.class, indexService.getIndexOperationListeners().get(0).getClass());
assertSame(listener, indexService.getIndexOperationListeners().get(1));

Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
ParsedDocument doc = InternalEngineTests.createParsedDoc("1", "test", null);
Engine.Index index = new Engine.Index(new Term("_uid", doc.uid()), doc);
ShardId shardId = new ShardId(new Index("foo", "bar"), 0);
for (IndexingOperationListener l : indexService.getIndexOperationListeners()) {
l.preIndex(shardId, index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.index;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteResponse;
Expand All @@ -34,7 +33,6 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.common.Priority;
Expand All @@ -58,9 +56,6 @@
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -91,7 +86,6 @@
/**
* Tests for indices that use shadow replicas and a shared filesystem
*/
@LuceneTestCase.AwaitsFix(bugUrl = "fix this fails intermittently")
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IndexWithShadowReplicasIT extends ESIntegTestCase {

Expand Down Expand Up @@ -459,7 +453,6 @@ public void run() {
assertHitCount(resp, numPhase1Docs + numPhase2Docs);
}

@AwaitsFix(bugUrl = "uncaught exception")
public void testPrimaryRelocationWhereRecoveryFails() throws Exception {
Path dataPath = createTempDir();
Settings nodeSettings = Settings.builder()
Expand Down
Loading