Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1097,15 +1097,21 @@ public IndexCommit acquireIndexCommit(final boolean flushFirst) throws EngineExc
}
}

@SuppressWarnings("finally")
private void failOnTragicEvent(AlreadyClosedException ex) {
// if we are already closed due to some tragic exception
// we need to fail the engine. it might have already been failed before
// but we are double-checking it's failed and closed
if (indexWriter.isOpen() == false && indexWriter.getTragicException() != null) {
final Exception tragedy = indexWriter.getTragicException() instanceof Exception ?
(Exception) indexWriter.getTragicException() :
new Exception(indexWriter.getTragicException());
failEngine("already closed by tragic event on the index writer", tragedy);
if (indexWriter.getTragicException() instanceof Error) {
try {
logger.error("tragic event in index writer", ex);
} finally {
throw (Error) indexWriter.getTragicException();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we still try to fail the engine here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, the node is going to tear itself down anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd feel better if we do since if there is a bug catching that error somehow we didn't fail the shard?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I see that means we gotta pass Throwable to failEngine hmm not sure here... ok I guess I am ok with not failing

} else {
failEngine("already closed by tragic event on the index writer", (Exception) indexWriter.getTragicException());
}
} else if (translog.isOpen() == false && translog.getTragicException() != null) {
failEngine("already closed by tragic event on the translog", translog.getTragicException());
} else if (failedEngine.get() == null) { // we are closed but the engine is not failed yet?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.filter.RegexFilter;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.Tokenizer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Field;
Expand Down Expand Up @@ -125,6 +127,7 @@
import org.junit.After;
import org.junit.Before;

import java.io.IOError;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
Expand Down Expand Up @@ -229,8 +232,12 @@ public void setUp() throws Exception {
}

public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode) {
return copy(config, openMode, config.getAnalyzer());
}

public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) {
return new EngineConfig(openMode, config.getShardId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(),
config.getStore(), config.getDeletionPolicy(), config.getMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
config.getStore(), config.getDeletionPolicy(), config.getMergePolicy(), analyzer, config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), config.getQueryCache(),
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(),
config.getMaxUnsafeAutoIdTimestamp());
Expand Down Expand Up @@ -2849,4 +2856,38 @@ public void afterRefresh(boolean didRefresh) throws IOException {
assertTrue(internalEngine.failedEngine.get() instanceof MockDirectoryWrapper.FakeIOException);
}
}

public void testTragicEventErrorBubblesUp() throws IOException {
engine.close();
final AtomicBoolean failWithFatalError = new AtomicBoolean(true);
final VirtualMachineError error = randomFrom(
new InternalError(),
new OutOfMemoryError(),
new StackOverflowError(),
new UnknownError());
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, new Analyzer() {
@Override
protected TokenStreamComponents createComponents(String fieldName) {
return new TokenStreamComponents(new Tokenizer() {
@Override
public boolean incrementToken() throws IOException {
if (failWithFatalError.get()) {
throw error;
} else {
throw new AssertionError("should not get to this point");
}
}
});
}
}));
final Document document = testDocument();
document.add(new TextField("value", "test", Field.Store.YES));
final ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
final Engine.Index first = new Engine.Index(newUid("1"), doc);
expectThrows(error.getClass(), () -> engine.index(first));
failWithFatalError.set(false);
expectThrows(error.getClass(), () -> engine.index(first));
assertNull(engine.failedEngine.get());
}

}