diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
index 56bd137ddf342..2ca3e5d22711a 100644
--- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
+++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
@@ -85,7 +85,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -118,7 +117,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
 
     private final CodecService codecService;
 
-    private final ReadWriteLock rwl = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
+    private final InternalLock readLock = new InternalLock(rwl.readLock());
+    private final InternalLock writeLock = new InternalLock(rwl.writeLock());
 
     private volatile IndexWriter indexWriter;
 
@@ -154,7 +155,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
     private volatile boolean failOnMergeFailure;
     private Throwable failedEngine = null;
-    private final Object failedEngineMutex = new Object();
+    private final Lock failEngineLock = new ReentrantLock();
     private final CopyOnWriteArrayList<FailedEngineListener> failedEngineListeners = new CopyOnWriteArrayList<>();
 
     private final AtomicLong translogIdGenerator = new AtomicLong();
@@ -208,15 +209,12 @@ public InternalEngine(ShardId shardId, @IndexSettings Settings indexSettings, Th
     @Override
     public void updateIndexingBufferSize(ByteSizeValue indexingBufferSize) {
         ByteSizeValue preValue = this.indexingBufferSize;
-        rwl.readLock().lock();
-        try {
+        try (InternalLock _ = readLock.acquire()) {
             this.indexingBufferSize = indexingBufferSize;
             IndexWriter indexWriter = this.indexWriter;
             if (indexWriter != null) {
                 indexWriter.getConfig().setRAMBufferSizeMB(this.indexingBufferSize.mbFrac());
             }
-        } finally {
-            rwl.readLock().unlock();
         }
         if (preValue.bytes() != indexingBufferSize.bytes()) {
             // its inactive, make sure we do a full flush in this case, since the memory
@@ -245,8 +243,7 @@ public void addFailedEngineListener(FailedEngineListener listener) {
 
     @Override
     public void start() throws EngineException {
-        rwl.writeLock().lock();
-        try {
+        try (InternalLock _ = writeLock.acquire()) {
             if (indexWriter != null) {
                 throw new EngineAlreadyStartedException(shardId);
             }
@@ -292,8 +289,6 @@ public void start() throws EngineException {
             }
             throw new EngineCreationFailureException(shardId, "failed to open reader on writer", e);
         }
-        } finally {
-            rwl.writeLock().unlock();
         }
     }
 
@@ -314,8 +309,7 @@ public void enableGcDeletes(boolean enableGcDeletes) {
     }
 
     public GetResult get(Get get) throws EngineException {
-        rwl.readLock().lock();
-        try {
+        try (InternalLock _ = readLock.acquire()) {
             if (get.realtime()) {
                 VersionValue versionValue = versionMap.get(versionKey(get.uid()));
                 if (versionValue != null) {
@@ -369,16 +363,12 @@ public GetResult get(Get get) throws EngineException {
                 Releasables.close(searcher);
                 return GetResult.NOT_EXISTS;
             }
-
-        } finally {
-            rwl.readLock().unlock();
         }
     }
 
     @Override
     public void create(Create create) throws EngineException {
-        rwl.readLock().lock();
-        try {
+        try (InternalLock _ = readLock.acquire()) {
            IndexWriter writer = this.indexWriter;
            if (writer == null) {
                throw new EngineClosedException(shardId, failedEngine);
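The heart of this change is visible in these first hunks: the bare `ReadWriteLock` is wrapped in `InternalLock`, a `Releasable` whose `acquire()` returns `this`, so every `lock()/try/finally/unlock()` dance collapses into a try-with-resources block. A minimal standalone sketch of the idiom (illustrative names, not the Elasticsearch API; `AutoCloseable` stands in for `Releasable`):

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of the Releasable-lock idiom this patch introduces.
final class LockHandle implements AutoCloseable {
    private final Lock lock;

    LockHandle(Lock lock) {
        this.lock = lock;
    }

    // Acquire and return this handle so it can be the try-with-resources target.
    LockHandle acquire() {
        lock.lock();
        return this;
    }

    @Override
    public void close() {
        lock.unlock(); // runs on every exit path, normal or exceptional
    }

    public static void main(String[] args) {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        LockHandle readLock = new LockHandle(rwl.readLock());
        try (LockHandle ignored = readLock.acquire()) {
            System.out.println("held: " + rwl.getReadLockCount()); // held: 1
        } // unlocked here, replacing the old finally { rwl.readLock().unlock(); }
        System.out.println("held: " + rwl.getReadLockCount()); // held: 0
    }
}
```

Note that the patch names the resource variable `_`, which is a legal (if discouraged) identifier on the Java 7 toolchain this code targets; it became a compile error in Java 9.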
@@ -387,18 +377,15 @@ public void create(Create create) throws EngineException {
             dirty = true;
             possibleMergeNeeded = true;
             flushNeeded = true;
-        } catch (IOException e) {
-            throw new CreateFailedEngineException(shardId, create, e);
-        } catch (OutOfMemoryError e) {
-            failEngine(e);
-            throw new CreateFailedEngineException(shardId, create, e);
-        } catch (IllegalStateException e) {
-            if (e.getMessage().contains("OutOfMemoryError")) {
-                failEngine(e);
-            }
-            throw new CreateFailedEngineException(shardId, create, e);
-        } finally {
-            rwl.readLock().unlock();
+        } catch (OutOfMemoryError | IllegalStateException | IOException t) {
+            maybeFailEngine(t);
+            throw new CreateFailedEngineException(shardId, create, t);
+        }
+    }
+
+    private void maybeFailEngine(Throwable t) {
+        if (t instanceof OutOfMemoryError || (t instanceof IllegalStateException && t.getMessage().contains("OutOfMemoryError"))) {
+            failEngine(t);
         }
     }
 
@@ -464,8 +451,7 @@ private void innerCreate(Create create, IndexWriter writer) throws IOException {
 
     @Override
     public void index(Index index) throws EngineException {
-        rwl.readLock().lock();
-        try {
+        try (InternalLock _ = readLock.acquire()) {
             IndexWriter writer = this.indexWriter;
             if (writer == null) {
                 throw new EngineClosedException(shardId, failedEngine);
@@ -475,18 +461,9 @@ public void index(Index index) throws EngineException {
             dirty = true;
             possibleMergeNeeded = true;
             flushNeeded = true;
-        } catch (IOException e) {
-            throw new IndexFailedEngineException(shardId, index, e);
-        } catch (OutOfMemoryError e) {
-            failEngine(e);
-            throw new IndexFailedEngineException(shardId, index, e);
-        } catch (IllegalStateException e) {
-            if (e.getMessage().contains("OutOfMemoryError")) {
-                failEngine(e);
-            }
-            throw new IndexFailedEngineException(shardId, index, e);
-        } finally {
-            rwl.readLock().unlock();
+        } catch (OutOfMemoryError | IllegalStateException | IOException t) {
+            maybeFailEngine(t);
+            throw new IndexFailedEngineException(shardId, index, t);
         }
     }
 
@@ -546,8 +523,7 @@ private void innerIndex(Index index, IndexWriter writer) throws IOException {
 
     @Override
     public void delete(Delete delete) throws EngineException {
-        rwl.readLock().lock();
-        try {
+        try (InternalLock _ = readLock.acquire()) {
             IndexWriter writer = this.indexWriter;
             if (writer == null) {
                 throw new EngineClosedException(shardId, failedEngine);
@@ -556,18 +532,9 @@ public void delete(Delete delete) throws EngineException {
             dirty = true;
             possibleMergeNeeded = true;
             flushNeeded = true;
-        } catch (IOException e) {
-            throw new DeleteFailedEngineException(shardId, delete, e);
-        } catch (OutOfMemoryError e) {
-            failEngine(e);
-            throw new DeleteFailedEngineException(shardId, delete, e);
-        } catch (IllegalStateException e) {
-            if (e.getMessage().contains("OutOfMemoryError")) {
-                failEngine(e);
-            }
-            throw new DeleteFailedEngineException(shardId, delete, e);
-        } finally {
-            rwl.readLock().unlock();
+        } catch (OutOfMemoryError | IllegalStateException | IOException t) {
+            maybeFailEngine(t);
+            throw new DeleteFailedEngineException(shardId, delete, t);
         }
     }
 
@@ -620,8 +587,7 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
 
     @Override
     public void delete(DeleteByQuery delete) throws EngineException {
-        rwl.readLock().lock();
-        try {
+        try (InternalLock _ = readLock.acquire()) {
             IndexWriter writer = this.indexWriter;
             if (writer == null) {
                 throw new EngineClosedException(shardId);
             }
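Each of the `create`/`index`/`delete` paths previously repeated the same three catch blocks; they collapse into one multi-catch that defers to the new `maybeFailEngine`, which treats an `OutOfMemoryError` as fatal whether it arrives directly or wrapped in an `IllegalStateException` (the form in which the Lucene `IndexWriter` of this era reported that it had previously hit an OOM). A standalone sketch of that predicate:

```java
// Sketch of the check centralized in maybeFailEngine(); the message sniffing
// mirrors how IndexWriter surfaced an earlier OOM as an IllegalStateException.
public final class FailureCheck {
    static boolean shouldFailEngine(Throwable t) {
        return t instanceof OutOfMemoryError
                || (t instanceof IllegalStateException
                        && t.getMessage() != null // guard: messages may be null
                        && t.getMessage().contains("OutOfMemoryError"));
    }

    public static void main(String[] args) {
        System.out.println(shouldFailEngine(new OutOfMemoryError("oom")));          // true
        System.out.println(shouldFailEngine(new IllegalStateException(
                "this writer hit an OutOfMemoryError; cannot commit")));            // true
        System.out.println(shouldFailEngine(new IllegalStateException("closed")));  // false
        System.out.println(shouldFailEngine(new RuntimeException("disk full")));    // false
    }
}
```

One caveat worth flagging: unlike the sketch, the patched `maybeFailEngine` calls `t.getMessage().contains(...)` without a null check, so an `IllegalStateException` constructed without a message would turn into a `NullPointerException` inside the error handler.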
@@ -643,10 +609,9 @@ public void delete(DeleteByQuery delete) throws EngineException {
             dirty = true;
             possibleMergeNeeded = true;
             flushNeeded = true;
-        } catch (IOException e) {
-            throw new DeleteByQueryFailedEngineException(shardId, delete, e);
-        } finally {
-            rwl.readLock().unlock();
+        } catch (Throwable t) {
+            maybeFailEngine(t);
+            throw new DeleteByQueryFailedEngineException(shardId, delete, t);
         }
         //TODO: This is heavy, since we refresh, but we really have to...
         refreshVersioningTable(System.currentTimeMillis());
@@ -703,48 +668,27 @@ public void refresh(Refresh refresh) throws EngineException {
         }
         // we obtain a read lock here, since we don't want a flush to happen while we are refreshing
         // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
-        rwl.readLock().lock();
-        try {
-            // this engine always acts as if waitForOperations=true
-            IndexWriter currentWriter = indexWriter;
-            if (currentWriter == null) {
-                throw new EngineClosedException(shardId, failedEngine);
-            }
-            try {
-                // maybeRefresh will only allow one refresh to execute, and the rest will "pass through",
-                // but, we want to make sure not to loose ant refresh calls, if one is taking time
-                synchronized (refreshMutex) {
-                    if (refreshNeeded() || refresh.force()) {
-                        // we set dirty to false, even though the refresh hasn't happened yet
-                        // as the refresh only holds for data indexed before it. Any data indexed during
-                        // the refresh will not be part of it and will set the dirty flag back to true
-                        dirty = false;
-                        boolean refreshed = searcherManager.maybeRefresh();
-                        assert refreshed : "failed to refresh even though refreshMutex was acquired";
-                    }
-                }
-            } catch (AlreadyClosedException e) {
-                // an index writer got replaced on us, ignore
-            } catch (OutOfMemoryError e) {
-                failEngine(e);
-                throw new RefreshFailedEngineException(shardId, e);
-            } catch (IllegalStateException e) {
-                if (e.getMessage().contains("OutOfMemoryError")) {
-                    failEngine(e);
-                }
-                throw new RefreshFailedEngineException(shardId, e);
-            } catch (Throwable e) {
-                if (indexWriter == null) {
-                    throw new EngineClosedException(shardId, failedEngine);
-                } else if (currentWriter != indexWriter) {
-                    // an index writer got replaced on us, ignore
-                } else {
-                    failEngine(e);
-                    throw new RefreshFailedEngineException(shardId, e);
+        try (InternalLock _ = readLock.acquire()) {
+            ensureOpen();
+            // maybeRefresh will only allow one refresh to execute, and the rest will "pass through",
+            // but, we want to make sure not to lose any refresh calls, if one is taking time
+            synchronized (refreshMutex) {
+                if (refreshNeeded() || refresh.force()) {
+                    // we set dirty to false, even though the refresh hasn't happened yet
+                    // as the refresh only holds for data indexed before it. Any data indexed during
+                    // the refresh will not be part of it and will set the dirty flag back to true
+                    dirty = false;
+                    boolean refreshed = searcherManager.maybeRefresh();
+                    assert refreshed : "failed to refresh even though refreshMutex was acquired";
                 }
             }
-        } finally {
-            rwl.readLock().unlock();
+        } catch (AlreadyClosedException e) {
+            // an index writer got replaced on us, ignore
+        } catch (EngineClosedException e) {
+            throw e;
+        } catch (Throwable t) {
+            failEngine(t);
+            throw new RefreshFailedEngineException(shardId, t);
         }
     }
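The rewritten `refresh` keeps the pre-existing contract: `dirty` is cleared before `SearcherManager.maybeRefresh()` runs, so writes that land mid-refresh re-mark the engine dirty rather than being silently absorbed. A stripped-down sketch of that flag discipline (a counter stands in for the actual Lucene refresh):

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the dirty-flag ordering in refresh(): clear the flag *before*
// refreshing, so concurrent writes set it back and trigger the next refresh.
final class RefreshGate {
    private final Object refreshMutex = new Object();
    private volatile boolean dirty = false;
    final AtomicLong refreshes = new AtomicLong();

    void markDirty() {
        dirty = true; // called by indexing operations
    }

    void refresh(boolean force) {
        synchronized (refreshMutex) {
            if (dirty || force) {
                dirty = false; // before the refresh, not after
                refreshes.incrementAndGet(); // stands in for searcherManager.maybeRefresh()
            }
        }
    }

    public static void main(String[] args) {
        RefreshGate gate = new RefreshGate();
        gate.refresh(false); // nothing dirty: skipped
        gate.markDirty();
        gate.refresh(false); // performs a refresh
        gate.refresh(false); // flag already cleared: skipped
        System.out.println("refreshes = " + gate.refreshes.get()); // refreshes = 1
    }
}
```

The behavioral change in the hunk itself is in the error handling: the old code tried to distinguish a swapped-out writer from a genuine failure by comparing `currentWriter != indexWriter`; the new code lets `AlreadyClosedException` and `EngineClosedException` pass as benign and fails the engine on anything else.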
@@ -766,9 +710,7 @@ public void flush(Flush flush) throws EngineException {
         flushLock.lock();
         try {
             if (flush.type() == Flush.Type.NEW_WRITER) {
-                rwl.writeLock().lock();
-                try {
-                    ensureOpen();
+                try (InternalLock _ = writeLock.acquire()) {
                     if (onGoingRecoveries.get() > 0) {
                         throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
                     }
@@ -778,7 +720,7 @@ public void flush(Flush flush) throws EngineException {
                     // that's ok if the index writer failed and is in inconsistent state
                     // we will get an exception on a dirty operation, and will cause the shard
                     // to be allocated to a different node
-                    indexWriter.close(false);
+                    currentIndexWriter().close(false);
                     indexWriter = createWriter();
 
                     // commit on a just opened writer will commit even if there are no changes done to it
@@ -799,24 +741,13 @@ public void flush(Flush flush) throws EngineException {
                         logger.warn("Failed to close current SearcherManager", t);
                     }
                     refreshVersioningTable(threadPool.estimatedTimeInMillis());
-                } catch (OutOfMemoryError e) {
-                    failEngine(e);
-                    throw new FlushFailedEngineException(shardId, e);
-                } catch (IllegalStateException e) {
-                    if (e.getMessage().contains("OutOfMemoryError")) {
-                        failEngine(e);
-                    }
-                    throw new FlushFailedEngineException(shardId, e);
-                } catch (Throwable e) {
-                    throw new FlushFailedEngineException(shardId, e);
+                } catch (Throwable t) {
+                    throw new FlushFailedEngineException(shardId, t);
                 }
-                } finally {
-                    rwl.writeLock().unlock();
                 }
             } else if (flush.type() == Flush.Type.COMMIT_TRANSLOG) {
-                rwl.readLock().lock();
-                try {
-                    ensureOpen();
+                try (InternalLock _ = readLock.acquire()) {
+                    final IndexWriter indexWriter = currentIndexWriter();
                     if (onGoingRecoveries.get() > 0) {
                         throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
                     }
@@ -833,30 +764,18 @@ public void flush(Flush flush) throws EngineException {
                     // so items added to current will still be around for realtime get
                     // when tans overrides it
                     translog.makeTransientCurrent();
-                } catch (OutOfMemoryError e) {
-                    translog.revertTransient();
-                    failEngine(e);
-                    throw new FlushFailedEngineException(shardId, e);
-                } catch (IllegalStateException e) {
-                    if (e.getMessage().contains("OutOfMemoryError")) {
-                        failEngine(e);
-                    }
-                    throw new FlushFailedEngineException(shardId, e);
                 } catch (Throwable e) {
                     translog.revertTransient();
                     throw new FlushFailedEngineException(shardId, e);
                 }
                 }
-                } finally {
-                    rwl.readLock().unlock();
-                }
             } else if (flush.type() == Flush.Type.COMMIT) {
                 // note, its ok to just commit without cleaning the translog, its perfectly fine to replay a
                 // translog on an index that was opened on a committed point in time that is "in the future"
                 // of that translog
-                rwl.readLock().lock();
-                try {
-                    ensureOpen();
+                try (InternalLock _ = readLock.acquire()) {
+                    final IndexWriter indexWriter = currentIndexWriter();
                     // we allow to *just* commit if there is an ongoing recovery happening...
                     // its ok to use this, only a flush will cause a new translogId, and we are locked here from
                     // other flushes use flushLock
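`currentIndexWriter()` (defined a few hunks below) replaces the scattered `ensureOpen()` + raw field reads: it reads the volatile `indexWriter` field once into a local, so the null check and the subsequent use cannot see two different writers even if `close()` runs concurrently. The pattern in isolation:

```java
// Sketch of the single-volatile-read pattern behind currentIndexWriter().
// Re-reading the field after the null check would race with a concurrent
// close(); the local variable pins one consistent snapshot.
final class Holder {
    private volatile AutoCloseable resource = () -> System.out.println("closed");

    AutoCloseable current() {
        final AutoCloseable r = resource; // one volatile read
        if (r == null) {
            throw new IllegalStateException("already closed");
        }
        return r; // callers work against this stable snapshot
    }

    void shutdown() {
        resource = null; // may happen between another thread's check and use
    }

    public static void main(String[] args) throws Exception {
        Holder h = new Holder();
        h.current().close();   // prints "closed"
        h.shutdown();
        try {
            h.current();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // already closed
        }
    }
}
```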
@@ -864,50 +783,50 @@ public void flush(Flush flush) throws EngineException {
                     long translogId = translog.currentId();
                     indexWriter.setCommitData(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map());
                     indexWriter.commit();
-                } catch (OutOfMemoryError e) {
-                    translog.revertTransient();
-                    failEngine(e);
-                    throw new FlushFailedEngineException(shardId, e);
-                } catch (IllegalStateException e) {
-                    if (e.getMessage().contains("OutOfMemoryError")) {
-                        failEngine(e);
-                    }
-                    throw new FlushFailedEngineException(shardId, e);
                 } catch (Throwable e) {
                     throw new FlushFailedEngineException(shardId, e);
                 }
-                } finally {
-                    rwl.readLock().unlock();
                 }
             } else {
                 throw new ElasticsearchIllegalStateException("flush type [" + flush.type() + "] not supported");
             }
 
             // reread the last committed segment infos
-            rwl.readLock().lock();
-            try {
+            try (InternalLock _ = readLock.acquire()) {
                 ensureOpen();
                 readLastCommittedSegmentsInfo();
             } catch (Throwable e) {
                 if (!closed) {
                     logger.warn("failed to read latest segment infos on flush", e);
                 }
-            } finally {
-                rwl.readLock().unlock();
             }
+        } catch (FlushFailedEngineException ex) {
+            maybeFailEngine(ex.getCause());
+            throw ex;
         } finally {
             flushLock.unlock();
             flushing.decrementAndGet();
         }
     }
 
-
     private void ensureOpen() {
         if (indexWriter == null) {
             throw new EngineClosedException(shardId, failedEngine);
         }
     }
 
+    /**
+     * Returns the current index writer. This method will never return null.
+     *
+     * @throws EngineClosedException if the engine is already closed
+     */
+    private IndexWriter currentIndexWriter() {
+        final IndexWriter writer = indexWriter;
+        if (writer == null) {
+            throw new EngineClosedException(shardId, failedEngine);
+        }
+        return writer;
+    }
+
     private void refreshVersioningTable(long time) {
         // we need to refresh in order to clear older version values
         refresh(new Refresh("version_table").force(true));
@@ -938,22 +857,11 @@ public void maybeMerge() throws EngineException {
             return;
         }
         possibleMergeNeeded = false;
-        rwl.readLock().lock();
-        try {
-            ensureOpen();
-            Merges.maybeMerge(indexWriter);
-        } catch (OutOfMemoryError e) {
-            failEngine(e);
-            throw new OptimizeFailedEngineException(shardId, e);
-        } catch (IllegalStateException e) {
-            if (e.getMessage().contains("OutOfMemoryError")) {
-                failEngine(e);
-            }
-            throw new OptimizeFailedEngineException(shardId, e);
-        } catch (Throwable e) {
-            throw new OptimizeFailedEngineException(shardId, e);
-        } finally {
-            rwl.readLock().unlock();
+        try (InternalLock _ = readLock.acquire()) {
+            Merges.maybeMerge(currentIndexWriter());
+        } catch (Throwable t) {
+            maybeFailEngine(t);
+            throw new OptimizeFailedEngineException(shardId, t);
        }
    }
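Note how `flush()`'s error handling got restructured: each branch now wraps any failure in `FlushFailedEngineException` with the original throwable preserved as the cause, and a single outer `catch` inspects that cause once via `maybeFailEngine` before rethrowing. A compact sketch of the wrap-once, inspect-at-the-boundary flow (hypothetical exception class, standing in for the Elasticsearch one):

```java
// Sketch of flush()'s exception funnel: inner branches wrap, the outer
// boundary inspects the preserved cause exactly once, then rethrows.
public final class FunnelDemo {
    static final class FlushFailedException extends RuntimeException {
        FlushFailedException(Throwable cause) {
            super("flush failed", cause);
        }
    }

    static boolean isFatal(Throwable t) {
        return t instanceof OutOfMemoryError; // simplified stand-in for maybeFailEngine
    }

    static void flushBranch() {
        try {
            throw new OutOfMemoryError("simulated");
        } catch (Throwable t) {
            throw new FlushFailedException(t); // wrap, preserving the cause
        }
    }

    public static void main(String[] args) {
        try {
            flushBranch();
        } catch (FlushFailedException ex) {
            System.out.println("fatal cause: " + isFatal(ex.getCause())); // fatal cause: true
            // the patch would fail the engine here and then rethrow: throw ex;
        }
    }
}
```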
@@ -964,16 +872,15 @@ public void optimize(Optimize optimize) throws EngineException {
         }
         if (optimizeMutex.compareAndSet(false, true)) {
             ElasticsearchMergePolicy elasticsearchMergePolicy = null;
-            rwl.readLock().lock();
-            try {
-                ensureOpen();
+            try (InternalLock _ = readLock.acquire()) {
+                final IndexWriter writer = currentIndexWriter();
 
-                if (indexWriter.getConfig().getMergePolicy() instanceof ElasticsearchMergePolicy) {
-                    elasticsearchMergePolicy = (ElasticsearchMergePolicy) indexWriter.getConfig().getMergePolicy();
+                if (writer.getConfig().getMergePolicy() instanceof ElasticsearchMergePolicy) {
+                    elasticsearchMergePolicy = (ElasticsearchMergePolicy) writer.getConfig().getMergePolicy();
                 }
                 if (optimize.force() && elasticsearchMergePolicy == null) {
                     throw new ElasticsearchIllegalStateException("The `force` flag can only be used if the merge policy is an instance of "
-                            + ElasticsearchMergePolicy.class.getSimpleName() + ", got [" + indexWriter.getConfig().getMergePolicy().getClass().getName() + "]");
+                            + ElasticsearchMergePolicy.class.getSimpleName() + ", got [" + writer.getConfig().getMergePolicy().getClass().getName() + "]");
                 }
 
                 /*
@@ -986,34 +893,27 @@ public void optimize(Optimize optimize) throws EngineException {
                     elasticsearchMergePolicy.setForce(true);
                 }
                 if (optimize.onlyExpungeDeletes()) {
-                    Merges.forceMergeDeletes(indexWriter, false);
+                    Merges.forceMergeDeletes(writer, false);
                 } else if (optimize.maxNumSegments() <= 0) {
-                    Merges.maybeMerge(indexWriter);
+                    Merges.maybeMerge(writer);
                     possibleMergeNeeded = false;
                 } else {
-                    Merges.forceMerge(indexWriter, optimize.maxNumSegments(), false);
-                }
-            } catch (OutOfMemoryError e) {
-                failEngine(e);
-                throw new OptimizeFailedEngineException(shardId, e);
-            } catch (IllegalStateException e) {
-                if (e.getMessage().contains("OutOfMemoryError")) {
-                    failEngine(e);
+                    Merges.forceMerge(writer, optimize.maxNumSegments(), false);
                 }
-                throw new OptimizeFailedEngineException(shardId, e);
-            } catch (Throwable e) {
-                throw new OptimizeFailedEngineException(shardId, e);
+            } catch (Throwable t) {
+                maybeFailEngine(t);
+                throw new OptimizeFailedEngineException(shardId, t);
             } finally {
                 if (elasticsearchMergePolicy != null) {
                     elasticsearchMergePolicy.setForce(false);
                 }
-                rwl.readLock().unlock();
                 optimizeMutex.set(false);
             }
+        }
 
         // wait for the merges outside of the read lock
         if (optimize.waitForMerge()) {
-            indexWriter.waitForMerges();
+            currentIndexWriter().waitForMerges();
         }
         if (optimize.flush()) {
             flush(new Flush().force(true).waitIfOngoing(true));
@@ -1023,15 +923,12 @@ public void optimize(Optimize optimize) throws EngineException {
 
     @Override
     public SnapshotIndexCommit snapshotIndex() throws EngineException {
-        rwl.readLock().lock();
-        try {
+        try (InternalLock _ = readLock.acquire()) {
             flush(new Flush().type(Flush.Type.COMMIT).waitIfOngoing(true));
             ensureOpen();
             return deletionPolicy.snapshot();
         } catch (IOException e) {
             throw new SnapshotFailedEngineException(shardId, e);
-        } finally {
-            rwl.readLock().unlock();
         }
     }
 
@@ -1039,14 +936,11 @@ public SnapshotIndexCommit snapshotIndex() throws EngineException {
     public void recover(RecoveryHandler recoveryHandler) throws EngineException {
         // take a write lock here so it won't happen while a flush is in progress
         // this means that next commits will not be allowed once the lock is released
-        rwl.writeLock().lock();
-        try {
+        try (InternalLock _ = writeLock.acquire()) {
             if (closed) {
                 throw new EngineClosedException(shardId);
             }
             onGoingRecoveries.startRecovery();
-        } finally {
-            rwl.writeLock().unlock();
         }
 
         SnapshotIndexCommit phase1Snapshot;
@@ -1061,10 +955,7 @@ public void recover(RecoveryHandler recoveryHandler) throws EngineException {
             recoveryHandler.phase1(phase1Snapshot);
         } catch (Throwable e) {
             Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot);
-            if (closed) {
-                e = new EngineClosedException(shardId, e);
-            }
-            throw new RecoveryEngineException(shardId, 1, "Execution failed", e);
+            throw new RecoveryEngineException(shardId, 1, "Execution failed", wrapIfClosed(e));
         }
 
         Translog.Snapshot phase2Snapshot;
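`wrapIfClosed` (defined just below) replaces the three copies of the `if (closed) { e = new EngineClosedException(shardId, e); }` pattern: if the engine was concurrently closed, the low-level failure is re-labeled so callers see the real reason the recovery phase died, with the original error kept as the cause. In isolation:

```java
// Sketch of the wrapIfClosed() translation: when a shutdown races with a
// recovery phase, report "engine closed" with the low-level error as cause.
public final class WrapDemo {
    private static volatile boolean closed = false;

    static Throwable wrapIfClosed(Throwable t) {
        if (closed) {
            return new IllegalStateException("engine is closed", t); // stand-in for EngineClosedException
        }
        return t;
    }

    public static void main(String[] args) {
        Throwable io = new java.io.IOException("stream truncated");
        System.out.println(wrapIfClosed(io));   // the IOException itself
        closed = true;
        Throwable wrapped = wrapIfClosed(io);
        System.out.println(wrapped);            // engine is closed
        System.out.println(wrapped.getCause()); // stream truncated
    }
}
```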
@@ -1072,23 +963,17 @@ public void recover(RecoveryHandler recoveryHandler) throws EngineException {
             phase2Snapshot = translog.snapshot();
         } catch (Throwable e) {
             Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot);
-            if (closed) {
-                e = new EngineClosedException(shardId, e);
-            }
-            throw new RecoveryEngineException(shardId, 2, "Snapshot failed", e);
+            throw new RecoveryEngineException(shardId, 2, "Snapshot failed", wrapIfClosed(e));
         }
 
         try {
             recoveryHandler.phase2(phase2Snapshot);
         } catch (Throwable e) {
             Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot, phase2Snapshot);
-            if (closed) {
-                e = new EngineClosedException(shardId, e);
-            }
-            throw new RecoveryEngineException(shardId, 2, "Execution failed", e);
+            throw new RecoveryEngineException(shardId, 2, "Execution failed", wrapIfClosed(e));
         }
 
-        rwl.writeLock().lock();
+        writeLock.acquire();
         Translog.Snapshot phase3Snapshot = null;
         boolean success = false;
         try {
@@ -1096,12 +981,18 @@ public void recover(RecoveryHandler recoveryHandler) throws EngineException {
             recoveryHandler.phase3(phase3Snapshot);
             success = true;
         } catch (Throwable e) {
-            throw new RecoveryEngineException(shardId, 3, "Execution failed", e);
+            throw new RecoveryEngineException(shardId, 3, "Execution failed", wrapIfClosed(e));
         } finally {
-            Releasables.close(success, onGoingRecoveries);
-            rwl.writeLock().unlock();
-            Releasables.close(success, phase1Snapshot, phase2Snapshot, phase3Snapshot);
+            Releasables.close(success, onGoingRecoveries, writeLock, phase1Snapshot,
+                    phase2Snapshot, phase3Snapshot); // hmm why can't we use try-with here?
+        }
+    }
+
+    private Throwable wrapIfClosed(Throwable t) {
+        if (closed) {
+            return new EngineClosedException(shardId, t);
         }
+        return t;
     }
 
     private static long getReaderRamBytesUsed(AtomicReaderContext reader) {
@@ -1111,8 +1002,7 @@ private static long getReaderRamBytesUsed(AtomicReaderContext reader) {
 
     @Override
     public SegmentsStats segmentsStats() {
-        rwl.readLock().lock();
-        try {
+        try (InternalLock _ = readLock.acquire()) {
             ensureOpen();
             Searcher searcher = acquireSearcher("segments_stats");
             try {
@@ -1124,15 +1014,12 @@ public SegmentsStats segmentsStats() {
             } finally {
                 searcher.close();
             }
-        } finally {
-            rwl.readLock().unlock();
         }
     }
 
     @Override
     public List<Segment> segments() {
-        rwl.readLock().lock();
-        try {
+        try (InternalLock _ = readLock.acquire()) {
             ensureOpen();
             Map<String, Segment> segments = new HashMap<>();
@@ -1208,18 +1095,38 @@ public int compare(Segment o1, Segment o2) {
         }
 
         return Arrays.asList(segmentsArr);
-        } finally {
-            rwl.readLock().unlock();
         }
     }
 
     @Override
     public void close() throws ElasticsearchException {
-        rwl.writeLock().lock();
-        try {
-            innerClose();
-        } finally {
-            rwl.writeLock().unlock();
+        try (InternalLock _ = writeLock.acquire()) {
+            if (!closed) {
+                try {
+                    closed = true;
+                    indexSettingsService.removeListener(applySettings);
+                    this.versionMap.clear();
+                    this.failedEngineListeners.clear();
+                    try {
+                        IOUtils.close(searcherManager);
+                    } catch (Throwable t) {
+                        logger.warn("Failed to close SearcherManager", t);
+                    }
+                    // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed
+                    if (indexWriter != null) {
+                        try {
+                            indexWriter.rollback();
+                        } catch (AlreadyClosedException e) {
+                            // ignore
+                        }
+                    }
+                } catch (Throwable e) {
+                    logger.warn("failed to rollback writer on close", e);
+                } finally {
+                    indexWriter = null;
+                    store.decRef();
+                }
+            }
         }
     }
 
@@ -1231,46 +1138,24 @@ public void onFailedMerge(MergePolicy.MergeException e) {
     }
 
     private void failEngine(Throwable failure) {
-        synchronized (failedEngineMutex) {
+        if (failEngineLock.tryLock()) {
+            assert !readLock.assertLockIsHeld() : "readLock is held by a thread that tries to fail the engine";
             if (failedEngine != null) {
                 return;
             }
-            logger.warn("failed engine", failure);
-            failedEngine = failure;
-            for (FailedEngineListener listener : failedEngineListeners) {
-                listener.onFailedEngine(shardId, failure);
-            }
-            // TODO - should we acquire the writeLock here?
-            innerClose();
-        }
-    }
-
-    private void innerClose() {
-        if (!closed) {
             try {
-                closed = true;
-                indexSettingsService.removeListener(applySettings);
-                this.versionMap.clear();
-                this.failedEngineListeners.clear();
-                try {
-                    IOUtils.close(searcherManager);
-                } catch (Throwable t) {
-                    logger.warn("Failed to close SearcherManager", t);
-                }
-                // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed
-                if (indexWriter != null) {
-                    try {
-                        indexWriter.rollback();
-                    } catch (AlreadyClosedException e) {
-                        // ignore
-                    }
+                logger.warn("failed engine", failure);
+                failedEngine = failure;
+                for (FailedEngineListener listener : failedEngineListeners) {
+                    listener.onFailedEngine(shardId, failure);
                 }
-            } catch (Throwable e) {
-                logger.warn("failed to rollback writer on close", e);
             } finally {
-                indexWriter = null;
-                store.decRef();
+                // close the engine whatever happens...
+                close();
             }
+
+        } else {
+            logger.debug("Tried to fail engine but could not acquire lock - engine should be failed by now", failure);
         }
     }
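The `failEngine` rewrite above swaps the mutex for `tryLock()`: failing the engine becomes first-caller-wins and non-blocking, so a thread that observes a failure while another thread is already tearing the engine down logs and moves on instead of queueing behind the teardown. Note the lock is deliberately never unlocked: once failed, always failed. A standalone sketch of the latch:

```java
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the first-caller-wins failure latch behind failEngine().
// The lock is intentionally never released: a failed engine stays failed.
final class FailureLatch {
    private final ReentrantLock failLock = new ReentrantLock();
    private volatile Throwable failure;

    void fail(Throwable t) {
        if (failLock.tryLock()) { // non-blocking: never wait on another failer
            if (failure == null) {
                failure = t;
                System.out.println("failing: " + t.getMessage());
                // the real code notifies listeners, then closes the engine
            }
        } else {
            System.out.println("already failing elsewhere, dropped: " + t.getMessage());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FailureLatch latch = new FailureLatch();
        Thread first = new Thread(() -> latch.fail(new RuntimeException("first")));
        first.start();
        first.join(); // the lock stays held by the (now finished) first thread
        latch.fail(new RuntimeException("second")); // tryLock fails: dropped
    }
}
```

One subtlety: `ReentrantLock.tryLock()` succeeds for the thread that already owns the lock, so the `failedEngine != null` guard in the patch, not the lock itself, is what makes a re-entrant `failEngine` call idempotent.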
logger.warn("failed engine", failure); - failedEngine = failure; - for (FailedEngineListener listener : failedEngineListeners) { - listener.onFailedEngine(shardId, failure); - } - // TODO - should we acquire the writeLock here? - innerClose(); - } - } - - private void innerClose() { - if (!closed) { try { - closed = true; - indexSettingsService.removeListener(applySettings); - this.versionMap.clear(); - this.failedEngineListeners.clear(); - try { - IOUtils.close(searcherManager); - } catch (Throwable t) { - logger.warn("Failed to close SearcherManager", t); - } - // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed - if (indexWriter != null) { - try { - indexWriter.rollback(); - } catch (AlreadyClosedException e) { - // ignore - } + logger.warn("failed engine", failure); + failedEngine = failure; + for (FailedEngineListener listener : failedEngineListeners) { + listener.onFailedEngine(shardId, failure); } - } catch (Throwable e) { - logger.warn("failed to rollback writer on close", e); } finally { - indexWriter = null; - store.decRef(); + // close the engine whatever happens... + close(); } + + } else { + logger.debug("Tried to fail engine but could not acquire lock - engine should be failed by now", failure); } } @@ -1396,8 +1281,7 @@ public void onRefreshSettings(Settings settings) { !codecName.equals(InternalEngine.this.codecName) || failOnMergeFailure != InternalEngine.this.failOnMergeFailure || codecBloomLoad != codecService.isLoadBloomFilter()) { - rwl.readLock().lock(); - try { + try (InternalLock _ = readLock.acquire()) { if (indexConcurrency != InternalEngine.this.indexConcurrency) { logger.info("updating index.index_concurrency from [{}] to [{}]", InternalEngine.this.indexConcurrency, indexConcurrency); InternalEngine.this.indexConcurrency = indexConcurrency; @@ -1420,8 +1304,6 @@ public void onRefreshSettings(Settings settings) { // we need to flush in this case, to load/unload the bloom filters requiresFlushing = true; } - } finally { - rwl.readLock().unlock(); } if (requiresFlushing) { flush(new Flush().type(Flush.Type.NEW_WRITER)); @@ -1606,5 +1488,44 @@ public void close() throws ElasticsearchException { endRecovery(); } } + + private static final class InternalLock implements Releasable { + private final ThreadLocal lockIsHeld; + private final Lock lock; + + InternalLock(Lock lock) { + ThreadLocal tl = null; + assert (tl = new ThreadLocal<>()) != null; + lockIsHeld = tl; + this.lock = lock; + } + + @Override + public void close() { + lock.unlock(); + assert onAssertRelease(); + } + + InternalLock acquire() throws EngineException { + lock.lock(); + assert onAssertLock(); + return this; + } + + + protected boolean onAssertRelease() { + lockIsHeld.set(Boolean.FALSE); + return true; + } + protected boolean onAssertLock() { + lockIsHeld.remove(); + return true; + } + + boolean assertLockIsHeld() { + Boolean aBoolean = lockIsHeld.get(); + return aBoolean != null && aBoolean.booleanValue(); + } + } }