From be14968c44f09a0ae988c9768afeeddd114aac3e Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Mon, 14 Apr 2014 16:37:41 +0200
Subject: [PATCH] Ensure close is called under lock in the case of an engine
 failure

Until today we closed the engine without acquiring the write lock, since
most callers were still holding a read lock. This commit removes the code
that holds on to the read lock when failing the engine, which means we can
simply call #close().
---
 .../index/engine/internal/InternalEngine.java | 451 ++++++++----------
 1 file changed, 186 insertions(+), 265 deletions(-)

diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
index 56bd137ddf342..2ca3e5d22711a 100644
--- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
+++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
@@ -85,7 +85,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -118,7 +117,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
 
     private final CodecService codecService;
 
-    private final ReadWriteLock rwl = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
+    private final InternalLock readLock = new InternalLock(rwl.readLock());
+    private final InternalLock writeLock = new InternalLock(rwl.writeLock());
 
     private volatile IndexWriter indexWriter;
 
@@ -154,7 +155,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
     private volatile boolean failOnMergeFailure;
     private Throwable failedEngine = null;
-    private final Object failedEngineMutex = new Object();
+    private final Lock failEngineLock = new ReentrantLock();
     private final CopyOnWriteArrayList<FailedEngineListener> failedEngineListeners = new CopyOnWriteArrayList<>();
 
     private final AtomicLong translogIdGenerator = new AtomicLong();
@@ -208,15 +209,12 @@ public InternalEngine(ShardId shardId, @IndexSettings Settings indexSettings, Th
     @Override
     public void updateIndexingBufferSize(ByteSizeValue indexingBufferSize) {
         ByteSizeValue preValue = this.indexingBufferSize;
-        rwl.readLock().lock();
-        try {
+        try (InternalLock _ = readLock.acquire()) {
             this.indexingBufferSize = indexingBufferSize;
             IndexWriter indexWriter = this.indexWriter;
             if (indexWriter != null) {
                 indexWriter.getConfig().setRAMBufferSizeMB(this.indexingBufferSize.mbFrac());
             }
-        } finally {
-            rwl.readLock().unlock();
         }
         if (preValue.bytes() != indexingBufferSize.bytes()) {
             // its inactive, make sure we do a full flush in this case, since the memory
@@ -245,8 +243,7 @@ public void addFailedEngineListener(FailedEngineListener listener) {
 
     @Override
     public void start() throws EngineException {
-        rwl.writeLock().lock();
-        try {
+        try (InternalLock _ = writeLock.acquire()) {
             if (indexWriter != null) {
                 throw new EngineAlreadyStartedException(shardId);
             }
@@ -292,8 +289,6 @@ public void start() throws EngineException {
                 }
                 throw new EngineCreationFailureException(shardId, "failed to open reader on writer", e);
             }
-        } finally {
-            rwl.writeLock().unlock();
         }
     }
 
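The recurring change in the hunks above and below replaces the manual lock()/try/finally/unlock() pattern with a single try-with-resources acquire on an InternalLock. A minimal, self-contained sketch of that idiom follows; ScopedLock and its methods are illustrative names only, not part of the patch:

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Sketch: an AutoCloseable wrapper so a lock can be scoped with
    // try-with-resources instead of an explicit try/finally.
    final class ScopedLock implements AutoCloseable {
        private final Lock lock;

        ScopedLock(Lock lock) {
            this.lock = lock;
        }

        ScopedLock acquire() {
            lock.lock();      // block until the lock is held
            return this;      // returning 'this' enables try-with-resources
        }

        @Override
        public void close() {
            lock.unlock();    // released automatically when the try block exits
        }

        public static void main(String[] args) {
            ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
            ScopedLock readLock = new ScopedLock(rwl.readLock());
            // Equivalent to: rwl.readLock().lock(); try { ... } finally { rwl.readLock().unlock(); }
            try (ScopedLock ignored = readLock.acquire()) {
                System.out.println("critical section under read lock");
            }
        }
    }

Because the lock scope becomes a lexical block, an early return or an unexpected exception can no longer skip the unlock, which is what the try/finally blocks removed throughout this patch were guarding against by hand.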
@@ -314,8 +309,7 @@ public void enableGcDeletes(boolean enableGcDeletes) {
     }
 
     public GetResult get(Get get) throws EngineException {
-        rwl.readLock().lock();
-        try {
+        try (InternalLock _ = readLock.acquire()) {
             if (get.realtime()) {
                 VersionValue versionValue = versionMap.get(versionKey(get.uid()));
                 if (versionValue != null) {
@@ -369,16 +363,12 @@ public GetResult get(Get get) throws EngineException {
                 Releasables.close(searcher);
                 return GetResult.NOT_EXISTS;
             }
-
-        } finally {
-            rwl.readLock().unlock();
         }
     }
 
     @Override
     public void create(Create create) throws EngineException {
-        rwl.readLock().lock();
-        try {
+        try (InternalLock _ = readLock.acquire()) {
             IndexWriter writer = this.indexWriter;
             if (writer == null) {
                 throw new EngineClosedException(shardId, failedEngine);
@@ -387,18 +377,15 @@ public void create(Create create) throws EngineException {
             dirty = true;
             possibleMergeNeeded = true;
             flushNeeded = true;
-        } catch (IOException e) {
-            throw new CreateFailedEngineException(shardId, create, e);
-        } catch (OutOfMemoryError e) {
-            failEngine(e);
-            throw new CreateFailedEngineException(shardId, create, e);
-        } catch (IllegalStateException e) {
-            if (e.getMessage().contains("OutOfMemoryError")) {
-                failEngine(e);
-            }
-            throw new CreateFailedEngineException(shardId, create, e);
-        } finally {
-            rwl.readLock().unlock();
+        } catch (OutOfMemoryError | IllegalStateException | IOException t) {
+            maybeFailEngine(t);
+            throw new CreateFailedEngineException(shardId, create, t);
+        }
+    }
+
+    private void maybeFailEngine(Throwable t) {
+        if (t instanceof OutOfMemoryError || (t instanceof IllegalStateException && t.getMessage().contains("OutOfMemoryError"))) {
+            failEngine(t);
         }
     }
 
@@ -464,8 +451,7 @@ private void innerCreate(Create create, IndexWriter writer) throws IOException {
 
     @Override
     public void index(Index index) throws EngineException {
-        rwl.readLock().lock();
-        try {
+        try (InternalLock _ = readLock.acquire()) {
             IndexWriter writer = this.indexWriter;
             if (writer == null) {
                 throw new EngineClosedException(shardId, failedEngine);
@@ -475,18 +461,9 @@ public void index(Index index) throws EngineException {
             dirty = true;
             possibleMergeNeeded = true;
             flushNeeded = true;
-        } catch (IOException e) {
-            throw new IndexFailedEngineException(shardId, index, e);
-        } catch (OutOfMemoryError e) {
-            failEngine(e);
-            throw new IndexFailedEngineException(shardId, index, e);
-        } catch (IllegalStateException e) {
-            if (e.getMessage().contains("OutOfMemoryError")) {
-                failEngine(e);
-            }
-            throw new IndexFailedEngineException(shardId, index, e);
-        } finally {
-            rwl.readLock().unlock();
+        } catch (OutOfMemoryError | IllegalStateException | IOException t) {
+            maybeFailEngine(t);
+            throw new IndexFailedEngineException(shardId, index, t);
         }
     }
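The separate catch ladders that create(), index() and delete() used to carry collapse into one multi-catch that defers the fatal-or-not decision to maybeFailEngine(). A rough standalone sketch of the same shape, under assumed names (maybeFail, operation), with a null guard added around the message check as a defensive touch:

    import java.io.IOException;

    // Sketch of the consolidated error handling: one multi-catch,
    // one helper deciding whether the failure is fatal for the "engine".
    public class FailureHandlingSketch {

        // Fail only for OOM, or for an IllegalStateException that wraps an OOM message.
        static void maybeFail(Throwable t) {
            if (t instanceof OutOfMemoryError
                    || (t instanceof IllegalStateException && t.getMessage() != null
                            && t.getMessage().contains("OutOfMemoryError"))) {
                System.err.println("failing engine: " + t);
            }
        }

        static void operation() throws IOException {
            throw new IOException("simulated indexing failure");
        }

        public static void main(String[] args) {
            try {
                operation();
            } catch (OutOfMemoryError | IllegalStateException | IOException t) {
                maybeFail(t);                                        // single decision point
                throw new RuntimeException("operation failed", t);   // rethrow wrapped, as the patch does
            }
        }
    }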
@@ -546,8 +523,7 @@ private void innerIndex(Index index, IndexWriter writer) throws IOException {
 
     @Override
     public void delete(Delete delete) throws EngineException {
-        rwl.readLock().lock();
-        try {
+        try (InternalLock _ = readLock.acquire()) {
             IndexWriter writer = this.indexWriter;
             if (writer == null) {
                 throw new EngineClosedException(shardId, failedEngine);
@@ -556,18 +532,9 @@ public void delete(Delete delete) throws EngineException {
             dirty = true;
             possibleMergeNeeded = true;
             flushNeeded = true;
-        } catch (IOException e) {
-            throw new DeleteFailedEngineException(shardId, delete, e);
-        } catch (OutOfMemoryError e) {
-            failEngine(e);
-            throw new DeleteFailedEngineException(shardId, delete, e);
-        } catch (IllegalStateException e) {
-            if (e.getMessage().contains("OutOfMemoryError")) {
-                failEngine(e);
-            }
-            throw new DeleteFailedEngineException(shardId, delete, e);
-        } finally {
-            rwl.readLock().unlock();
+        } catch (OutOfMemoryError | IllegalStateException | IOException t) {
+            maybeFailEngine(t);
+            throw new DeleteFailedEngineException(shardId, delete, t);
         }
     }
 
@@ -620,8 +587,7 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
 
     @Override
     public void delete(DeleteByQuery delete) throws EngineException {
-        rwl.readLock().lock();
-        try {
+        try (InternalLock _ = readLock.acquire()) {
             IndexWriter writer = this.indexWriter;
             if (writer == null) {
                 throw new EngineClosedException(shardId);
@@ -643,10 +609,9 @@ public void delete(DeleteByQuery delete) throws EngineException {
             dirty = true;
             possibleMergeNeeded = true;
             flushNeeded = true;
-        } catch (IOException e) {
-            throw new DeleteByQueryFailedEngineException(shardId, delete, e);
-        } finally {
-            rwl.readLock().unlock();
+        } catch (Throwable t) {
+            maybeFailEngine(t);
+            throw new DeleteByQueryFailedEngineException(shardId, delete, t);
         }
         //TODO: This is heavy, since we refresh, but we really have to...
         refreshVersioningTable(System.currentTimeMillis());
@@ -703,48 +668,27 @@ public void refresh(Refresh refresh) throws EngineException {
         }
         // we obtain a read lock here, since we don't want a flush to happen while we are refreshing
         // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
-        rwl.readLock().lock();
-        try {
-            // this engine always acts as if waitForOperations=true
-            IndexWriter currentWriter = indexWriter;
-            if (currentWriter == null) {
-                throw new EngineClosedException(shardId, failedEngine);
-            }
-            try {
-                // maybeRefresh will only allow one refresh to execute, and the rest will "pass through",
-                // but, we want to make sure not to loose ant refresh calls, if one is taking time
-                synchronized (refreshMutex) {
-                    if (refreshNeeded() || refresh.force()) {
-                        // we set dirty to false, even though the refresh hasn't happened yet
-                        // as the refresh only holds for data indexed before it. Any data indexed during
-                        // the refresh will not be part of it and will set the dirty flag back to true
-                        dirty = false;
-                        boolean refreshed = searcherManager.maybeRefresh();
-                        assert refreshed : "failed to refresh even though refreshMutex was acquired";
-                    }
-                }
-            } catch (AlreadyClosedException e) {
-                // an index writer got replaced on us, ignore
-            } catch (OutOfMemoryError e) {
-                failEngine(e);
-                throw new RefreshFailedEngineException(shardId, e);
-            } catch (IllegalStateException e) {
-                if (e.getMessage().contains("OutOfMemoryError")) {
-                    failEngine(e);
-                }
-                throw new RefreshFailedEngineException(shardId, e);
-            } catch (Throwable e) {
-                if (indexWriter == null) {
-                    throw new EngineClosedException(shardId, failedEngine);
-                } else if (currentWriter != indexWriter) {
-                    // an index writer got replaced on us, ignore
-                } else {
-                    failEngine(e);
-                    throw new RefreshFailedEngineException(shardId, e);
+        try (InternalLock _ = readLock.acquire()) {
+            ensureOpen();
+            // maybeRefresh will only allow one refresh to execute, and the rest will "pass through",
+            // but, we want to make sure not to loose ant refresh calls, if one is taking time
+            synchronized (refreshMutex) {
+                if (refreshNeeded() || refresh.force()) {
+                    // we set dirty to false, even though the refresh hasn't happened yet
+                    // as the refresh only holds for data indexed before it. Any data indexed during
+                    // the refresh will not be part of it and will set the dirty flag back to true
+                    dirty = false;
+                    boolean refreshed = searcherManager.maybeRefresh();
+                    assert refreshed : "failed to refresh even though refreshMutex was acquired";
                 }
             }
-        } finally {
-            rwl.readLock().unlock();
+        } catch (AlreadyClosedException e) {
+            // an index writer got replaced on us, ignore
+        } catch (EngineClosedException e) {
+            throw e;
+        } catch (Throwable t) {
+            failEngine(t);
+            throw new RefreshFailedEngineException(shardId, t);
         }
     }
 
@@ -766,9 +710,7 @@ public void flush(Flush flush) throws EngineException {
         flushLock.lock();
         try {
             if (flush.type() == Flush.Type.NEW_WRITER) {
-                rwl.writeLock().lock();
-                try {
-                    ensureOpen();
+                try (InternalLock _ = writeLock.acquire()) {
                     if (onGoingRecoveries.get() > 0) {
                         throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
                     }
@@ -778,7 +720,7 @@ public void flush(Flush flush) throws EngineException {
                         // that's ok if the index writer failed and is in inconsistent state
                         // we will get an exception on a dirty operation, and will cause the shard
                         // to be allocated to a different node
-                        indexWriter.close(false);
+                        currentIndexWriter().close(false);
                         indexWriter = createWriter();
 
                         // commit on a just opened writer will commit even if there are no changes done to it
@@ -799,24 +741,13 @@ public void flush(Flush flush) throws EngineException {
                             logger.warn("Failed to close current SearcherManager", t);
                         }
                         refreshVersioningTable(threadPool.estimatedTimeInMillis());
-                    } catch (OutOfMemoryError e) {
-                        failEngine(e);
-                        throw new FlushFailedEngineException(shardId, e);
-                    } catch (IllegalStateException e) {
-                        if (e.getMessage().contains("OutOfMemoryError")) {
-                            failEngine(e);
-                        }
-                        throw new FlushFailedEngineException(shardId, e);
-                    } catch (Throwable e) {
-                        throw new FlushFailedEngineException(shardId, e);
+                    } catch (Throwable t) {
+                        throw new FlushFailedEngineException(shardId, t);
                     }
-                } finally {
-                    rwl.writeLock().unlock();
                 }
             } else if (flush.type() == Flush.Type.COMMIT_TRANSLOG) {
-                rwl.readLock().lock();
-                try {
-                    ensureOpen();
+                try (InternalLock _ = readLock.acquire()) {
+                    final IndexWriter indexWriter = currentIndexWriter();
                     if (onGoingRecoveries.get() > 0) {
                         throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
                     }
@@ -833,30 +764,18 @@ public void flush(Flush flush) throws EngineException {
                         // so items added to current will still be around for realtime get
                         // when tans overrides it
                         translog.makeTransientCurrent();
-                    } catch (OutOfMemoryError e) {
-                        translog.revertTransient();
-                        failEngine(e);
-                        throw new FlushFailedEngineException(shardId, e);
-                    } catch (IllegalStateException e) {
-                        if (e.getMessage().contains("OutOfMemoryError")) {
-                            failEngine(e);
-                        }
-                        throw new FlushFailedEngineException(shardId, e);
                     } catch (Throwable e) {
                         translog.revertTransient();
                         throw new FlushFailedEngineException(shardId, e);
                     }
                 }
-            } finally {
-                rwl.readLock().unlock();
             }
             } else if (flush.type() == Flush.Type.COMMIT) {
                 // note, its ok to just commit without cleaning the translog, its perfectly fine to replay a
                 // translog on an index that was opened on a committed point in time that is "in the future"
                 // of that translog
-                rwl.readLock().lock();
-                try {
-                    ensureOpen();
+                try (InternalLock _ = readLock.acquire()) {
+                    final IndexWriter indexWriter = currentIndexWriter();
                     // we allow to *just* commit if there is an ongoing recovery happening...
                     // its ok to use this, only a flush will cause a new translogId, and we are locked here from
                     // other flushes use flushLock
@@ -864,50 +783,50 @@ public void flush(Flush flush) throws EngineException {
                     long translogId = translog.currentId();
                     indexWriter.setCommitData(MapBuilder.newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map());
                     indexWriter.commit();
-                } catch (OutOfMemoryError e) {
-                    translog.revertTransient();
-                    failEngine(e);
-                    throw new FlushFailedEngineException(shardId, e);
-                } catch (IllegalStateException e) {
-                    if (e.getMessage().contains("OutOfMemoryError")) {
-                        failEngine(e);
-                    }
-                    throw new FlushFailedEngineException(shardId, e);
                 } catch (Throwable e) {
                     throw new FlushFailedEngineException(shardId, e);
                 }
-            } finally {
-                rwl.readLock().unlock();
             }
             } else {
                 throw new ElasticsearchIllegalStateException("flush type [" + flush.type() + "] not supported");
             }
 
             // reread the last committed segment infos
-            rwl.readLock().lock();
-            try {
+            try (InternalLock _ = readLock.acquire()) {
                 ensureOpen();
                 readLastCommittedSegmentsInfo();
             } catch (Throwable e) {
                 if (!closed) {
                     logger.warn("failed to read latest segment infos on flush", e);
                 }
-            } finally {
-                rwl.readLock().unlock();
             }
+        } catch (FlushFailedEngineException ex){
+            maybeFailEngine(ex.getCause());
+            throw ex;
         } finally {
             flushLock.unlock();
             flushing.decrementAndGet();
         }
     }
 
-
     private void ensureOpen() {
         if (indexWriter == null) {
             throw new EngineClosedException(shardId, failedEngine);
         }
     }
 
+    /**
+     * Returns the current index writer. This method will never return null
+     * @throws EngineClosedException if the engine is already closed
+     */
+    private IndexWriter currentIndexWriter() {
+        final IndexWriter writer = indexWriter;
+        if (writer == null) {
+            throw new EngineClosedException(shardId, failedEngine);
+        }
+        return writer;
+    }
+
     private void refreshVersioningTable(long time) {
         // we need to refresh in order to clear older version values
         refresh(new Refresh("version_table").force(true));
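Inside flush(), each branch now wraps any failure in FlushFailedEngineException, and the single outer catch added above inspects the cause exactly once (maybeFailEngine(ex.getCause())) before rethrowing, so the fail-the-engine decision lives in one place. A small sketch of that wrap-then-inspect layering, with hypothetical names:

    // Sketch: inner code wraps every failure at the point it happens,
    // the outer boundary decides whether the root cause is fatal.
    public class WrapAndInspectSketch {

        static class FlushFailedException extends RuntimeException {
            FlushFailedException(Throwable cause) {
                super("flush failed", cause);
            }
        }

        static void maybeFail(Throwable t) {
            if (t instanceof OutOfMemoryError) {
                System.err.println("failing engine: " + t);
            }
        }

        static void doFlush() {
            try {
                throw new IllegalStateException("simulated commit failure");
            } catch (Throwable t) {
                throw new FlushFailedException(t);   // wrap everything where it fails
            }
        }

        public static void main(String[] args) {
            try {
                doFlush();
            } catch (FlushFailedException ex) {
                maybeFail(ex.getCause());            // single place that looks at the root cause
                throw ex;
            }
        }
    }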
ElasticsearchIllegalStateException("The `force` flag can only be used if the merge policy is an instance of " - + ElasticsearchMergePolicy.class.getSimpleName() + ", got [" + indexWriter.getConfig().getMergePolicy().getClass().getName() + "]"); + + ElasticsearchMergePolicy.class.getSimpleName() + ", got [" + writer.getConfig().getMergePolicy().getClass().getName() + "]"); } /* @@ -986,34 +893,27 @@ public void optimize(Optimize optimize) throws EngineException { elasticsearchMergePolicy.setForce(true); } if (optimize.onlyExpungeDeletes()) { - Merges.forceMergeDeletes(indexWriter, false); + Merges.forceMergeDeletes(writer, false); } else if (optimize.maxNumSegments() <= 0) { - Merges.maybeMerge(indexWriter); + Merges.maybeMerge(writer); possibleMergeNeeded = false; } else { - Merges.forceMerge(indexWriter, optimize.maxNumSegments(), false); - } - } catch (OutOfMemoryError e) { - failEngine(e); - throw new OptimizeFailedEngineException(shardId, e); - } catch (IllegalStateException e) { - if (e.getMessage().contains("OutOfMemoryError")) { - failEngine(e); + Merges.forceMerge(writer, optimize.maxNumSegments(), false); } - throw new OptimizeFailedEngineException(shardId, e); - } catch (Throwable e) { - throw new OptimizeFailedEngineException(shardId, e); + } catch (Throwable t) { + maybeFailEngine(t); + throw new OptimizeFailedEngineException(shardId, t); } finally { if (elasticsearchMergePolicy != null) { elasticsearchMergePolicy.setForce(false); } - rwl.readLock().unlock(); optimizeMutex.set(false); } + } // wait for the merges outside of the read lock if (optimize.waitForMerge()) { - indexWriter.waitForMerges(); + currentIndexWriter().waitForMerges(); } if (optimize.flush()) { flush(new Flush().force(true).waitIfOngoing(true)); @@ -1023,15 +923,12 @@ public void optimize(Optimize optimize) throws EngineException { @Override public SnapshotIndexCommit snapshotIndex() throws EngineException { - rwl.readLock().lock(); - try { + try (InternalLock _ = readLock.acquire()) { flush(new Flush().type(Flush.Type.COMMIT).waitIfOngoing(true)); ensureOpen(); return deletionPolicy.snapshot(); } catch (IOException e) { throw new SnapshotFailedEngineException(shardId, e); - } finally { - rwl.readLock().unlock(); } } @@ -1039,14 +936,11 @@ public SnapshotIndexCommit snapshotIndex() throws EngineException { public void recover(RecoveryHandler recoveryHandler) throws EngineException { // take a write lock here so it won't happen while a flush is in progress // this means that next commits will not be allowed once the lock is released - rwl.writeLock().lock(); - try { + try (InternalLock _ = writeLock.acquire()) { if (closed) { throw new EngineClosedException(shardId); } onGoingRecoveries.startRecovery(); - } finally { - rwl.writeLock().unlock(); } SnapshotIndexCommit phase1Snapshot; @@ -1061,10 +955,7 @@ public void recover(RecoveryHandler recoveryHandler) throws EngineException { recoveryHandler.phase1(phase1Snapshot); } catch (Throwable e) { Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot); - if (closed) { - e = new EngineClosedException(shardId, e); - } - throw new RecoveryEngineException(shardId, 1, "Execution failed", e); + throw new RecoveryEngineException(shardId, 1, "Execution failed", wrapIfClosed(e)); } Translog.Snapshot phase2Snapshot; @@ -1072,23 +963,17 @@ public void recover(RecoveryHandler recoveryHandler) throws EngineException { phase2Snapshot = translog.snapshot(); } catch (Throwable e) { Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot); - if 
-                e = new EngineClosedException(shardId, e);
-            }
-            throw new RecoveryEngineException(shardId, 2, "Snapshot failed", e);
+            throw new RecoveryEngineException(shardId, 2, "Snapshot failed", wrapIfClosed(e));
         }
 
         try {
             recoveryHandler.phase2(phase2Snapshot);
         } catch (Throwable e) {
             Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot, phase2Snapshot);
-            if (closed) {
-                e = new EngineClosedException(shardId, e);
-            }
-            throw new RecoveryEngineException(shardId, 2, "Execution failed", e);
+            throw new RecoveryEngineException(shardId, 2, "Execution failed", wrapIfClosed(e));
         }
 
-        rwl.writeLock().lock();
+        writeLock.acquire();
         Translog.Snapshot phase3Snapshot = null;
         boolean success = false;
         try {
@@ -1096,12 +981,18 @@ public void recover(RecoveryHandler recoveryHandler) throws EngineException {
             recoveryHandler.phase3(phase3Snapshot);
             success = true;
         } catch (Throwable e) {
-            throw new RecoveryEngineException(shardId, 3, "Execution failed", e);
+            throw new RecoveryEngineException(shardId, 3, "Execution failed", wrapIfClosed(e));
         } finally {
-            Releasables.close(success, onGoingRecoveries);
-            rwl.writeLock().unlock();
-            Releasables.close(success, phase1Snapshot, phase2Snapshot, phase3Snapshot);
+            Releasables.close(success, onGoingRecoveries, writeLock, phase1Snapshot,
+                    phase2Snapshot, phase3Snapshot); // hmm why can't we use try-with here?
+        }
+    }
+
+    private Throwable wrapIfClosed(Throwable t) {
+        if (closed) {
+            return new EngineClosedException(shardId, t);
         }
+        return t;
     }
 
     private static long getReaderRamBytesUsed(AtomicReaderContext reader) {
@@ -1111,8 +1002,7 @@ private static long getReaderRamBytesUsed(AtomicReaderContext reader) {
 
     @Override
     public SegmentsStats segmentsStats() {
-        rwl.readLock().lock();
-        try {
+        try (InternalLock _ = readLock.acquire()) {
             ensureOpen();
             Searcher searcher = acquireSearcher("segments_stats");
             try {
@@ -1124,15 +1014,12 @@ public SegmentsStats segmentsStats() {
             } finally {
                 searcher.close();
             }
-        } finally {
-            rwl.readLock().unlock();
         }
     }
 
     @Override
     public List<Segment> segments() {
-        rwl.readLock().lock();
-        try {
+        try (InternalLock _ = readLock.acquire()) {
             ensureOpen();
             Map<String, Segment> segments = new HashMap<>();
 
@@ -1208,18 +1095,38 @@ public int compare(Segment o1, Segment o2) {
             }
 
             return Arrays.asList(segmentsArr);
-        } finally {
-            rwl.readLock().unlock();
         }
     }
 
     @Override
     public void close() throws ElasticsearchException {
-        rwl.writeLock().lock();
-        try {
-            innerClose();
-        } finally {
-            rwl.writeLock().unlock();
+        try (InternalLock _ = writeLock.acquire()) {
+            if (!closed) {
+                try {
+                    closed = true;
+                    indexSettingsService.removeListener(applySettings);
+                    this.versionMap.clear();
+                    this.failedEngineListeners.clear();
+                    try {
+                        IOUtils.close(searcherManager);
+                    } catch (Throwable t) {
+                        logger.warn("Failed to close SearcherManager", t);
+                    }
+                    // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed
+                    if (indexWriter != null) {
+                        try {
+                            indexWriter.rollback();
+                        } catch (AlreadyClosedException e) {
+                            // ignore
+                        }
+                    }
+                } catch (Throwable e) {
+                    logger.warn("failed to rollback writer on close", e);
+                } finally {
+                    indexWriter = null;
+                    store.decRef();
+                }
+            }
         }
     }
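With close() above now performing the full teardown under the write lock, failEngine() in the next hunk only records the failure, notifies listeners, and calls close(); the tryLock() on failEngineLock makes the failure path single-shot, and a losing thread just logs. A hedged sketch of that fail-once shape, with illustrative names only:

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    // Sketch: the first caller to win tryLock() records the failure and closes;
    // concurrent callers return immediately, so close() is never re-entered.
    public class FailOnceSketch {
        private final Lock failLock = new ReentrantLock();
        private volatile Throwable failure;

        void fail(Throwable t) {
            if (failLock.tryLock()) {          // only one thread gets to fail/close;
                try {                          // the lock is deliberately never released
                    if (failure == null) {
                        failure = t;
                        System.err.println("failed: " + t);
                    }
                } finally {
                    close();                   // closing happens exactly once, whatever happens above
                }
            } else {
                System.err.println("already being failed by another thread: " + t);
            }
        }

        void close() {
            System.err.println("teardown (done under the write lock in the real engine)");
        }

        public static void main(String[] args) {
            new FailOnceSketch().fail(new RuntimeException("simulated merge failure"));
        }
    }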
logger.warn("failed engine", failure); - failedEngine = failure; - for (FailedEngineListener listener : failedEngineListeners) { - listener.onFailedEngine(shardId, failure); - } - // TODO - should we acquire the writeLock here? - innerClose(); - } - } - - private void innerClose() { - if (!closed) { try { - closed = true; - indexSettingsService.removeListener(applySettings); - this.versionMap.clear(); - this.failedEngineListeners.clear(); - try { - IOUtils.close(searcherManager); - } catch (Throwable t) { - logger.warn("Failed to close SearcherManager", t); - } - // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed - if (indexWriter != null) { - try { - indexWriter.rollback(); - } catch (AlreadyClosedException e) { - // ignore - } + logger.warn("failed engine", failure); + failedEngine = failure; + for (FailedEngineListener listener : failedEngineListeners) { + listener.onFailedEngine(shardId, failure); } - } catch (Throwable e) { - logger.warn("failed to rollback writer on close", e); } finally { - indexWriter = null; - store.decRef(); + // close the engine whatever happens... + close(); } + + } else { + logger.debug("Tried to fail engine but could not acquire lock - engine should be failed by now", failure); } } @@ -1396,8 +1281,7 @@ public void onRefreshSettings(Settings settings) { !codecName.equals(InternalEngine.this.codecName) || failOnMergeFailure != InternalEngine.this.failOnMergeFailure || codecBloomLoad != codecService.isLoadBloomFilter()) { - rwl.readLock().lock(); - try { + try (InternalLock _ = readLock.acquire()) { if (indexConcurrency != InternalEngine.this.indexConcurrency) { logger.info("updating index.index_concurrency from [{}] to [{}]", InternalEngine.this.indexConcurrency, indexConcurrency); InternalEngine.this.indexConcurrency = indexConcurrency; @@ -1420,8 +1304,6 @@ public void onRefreshSettings(Settings settings) { // we need to flush in this case, to load/unload the bloom filters requiresFlushing = true; } - } finally { - rwl.readLock().unlock(); } if (requiresFlushing) { flush(new Flush().type(Flush.Type.NEW_WRITER)); @@ -1606,5 +1488,44 @@ public void close() throws ElasticsearchException { endRecovery(); } } + + private static final class InternalLock implements Releasable { + private final ThreadLocal lockIsHeld; + private final Lock lock; + + InternalLock(Lock lock) { + ThreadLocal tl = null; + assert (tl = new ThreadLocal<>()) != null; + lockIsHeld = tl; + this.lock = lock; + } + + @Override + public void close() { + lock.unlock(); + assert onAssertRelease(); + } + + InternalLock acquire() throws EngineException { + lock.lock(); + assert onAssertLock(); + return this; + } + + + protected boolean onAssertRelease() { + lockIsHeld.set(Boolean.FALSE); + return true; + } + protected boolean onAssertLock() { + lockIsHeld.remove(); + return true; + } + + boolean assertLockIsHeld() { + Boolean aBoolean = lockIsHeld.get(); + return aBoolean != null && aBoolean.booleanValue(); + } + } }