From 2875dd4230e3e8e0ffa7aea98a0882066ff14df5 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 17 Apr 2014 11:12:36 +0200 Subject: [PATCH 1/2] Fail replica shards locally upon failures When a replication operation (index/delete/update) fails to be executed properly, we fail the replica and allow the master to allocate a new copy of it. At the moment, the node hosting the primary shard is responsible for notifying the master of a failed replica. However, if the replica shard is initializing (`POST_RECOVERY` state), we have a race condition between the failed shard message and moving the shard into the `STARTED` state. If the latter happens first, the master will fail to resolve the failed shard message. This PR builds on #5800 and fails the engine of the replica shard if a replication operation fails. This protects us against the above as the shard will reject the `STARTED` command from the master. It also makes us more resilient to other race conditions in this area. --- ...nsportIndexReplicationOperationAction.java | 3 +- ...portIndicesReplicationOperationAction.java | 3 +- ...nsportShardReplicationOperationAction.java | 60 ++++++++------ .../elasticsearch/index/engine/Engine.java | 5 +- .../index/engine/internal/InternalEngine.java | 82 +++++++++++-------- .../index/shard/service/IndexShard.java | 2 + .../shard/service/InternalIndexShard.java | 6 ++ .../cluster/IndicesClusterStateService.java | 17 ++-- 8 files changed, 108 insertions(+), 70 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java index 6184592dcaa10..d11f47690066e 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java @@ -22,7 +22,6 @@ import 
com.google.common.collect.Lists; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.DefaultShardOperationFailedException; @@ -46,7 +45,7 @@ /** * */ -public abstract class TransportIndexReplicationOperationAction +public abstract class TransportIndexReplicationOperationAction extends TransportAction { protected final ClusterService clusterService; diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java index e4b5ce3d1dad0..4d38084b0857e 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java @@ -21,7 +21,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.cluster.ClusterService; @@ -42,7 +41,7 @@ /** */ public abstract class TransportIndicesReplicationOperationAction + ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse> extends TransportAction { protected final ClusterService clusterService; diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index caffb6bf9af8d..95d9592fc7e80 100644 --- 
a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -22,7 +22,10 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.*; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.UnavailableShardsException; +import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -44,6 +47,8 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; @@ -54,11 +59,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static org.elasticsearch.ExceptionsHelper.detailedMessage; - /** */ -public abstract class TransportShardReplicationOperationAction extends TransportAction { +public abstract class TransportShardReplicationOperationAction extends TransportAction { protected final TransportService transportService; protected final ClusterService clusterService; @@ -242,7 +245,12 @@ public boolean isForceExecution() { @Override public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception { - shardOperationOnReplica(request); + try { + 
shardOperationOnReplica(request); + } catch (Throwable t) { + failReplicaIfNeeded(request.request.index(), request.shardId, t); + throw t; + } channel.sendResponse(TransportResponse.Empty.INSTANCE); } } @@ -700,7 +708,7 @@ void performOnReplica(final PrimaryResponse response, final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId().id(), response.replicaRequest()); if (!nodeId.equals(clusterState.nodes().localNodeId())) { - DiscoveryNode node = clusterState.nodes().get(nodeId); + final DiscoveryNode node = clusterState.nodes().get(nodeId); transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleResponse(TransportResponse.Empty vResponse) { @@ -710,9 +718,9 @@ public void handleResponse(TransportResponse.Empty vResponse) { @Override public void handleException(TransportException exp) { if (!ignoreReplicaException(exp.unwrapCause())) { - logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), exp); + logger.warn("Failed to perform " + transportAction + " on remote replica " + node + shardIt.shardId(), exp); shardStateAction.shardFailed(shard, indexMetaData.getUUID(), - "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(exp) + "]"); + "Failed to perform [" + transportAction + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]"); } finishIfPossible(); } @@ -733,11 +741,7 @@ public void run() { try { shardOperationOnReplica(shardRequest); } catch (Throwable e) { - if (!ignoreReplicaException(e)) { - logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e); - shardStateAction.shardFailed(shard, indexMetaData.getUUID(), - "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]"); - } + failReplicaIfNeeded(shard.index(), shard.id(), e); } if 
(counter.decrementAndGet() == 0) { listener.onResponse(response.response()); @@ -751,11 +755,7 @@ public boolean isForceExecution() { } }); } catch (Throwable e) { - if (!ignoreReplicaException(e)) { - logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e); - shardStateAction.shardFailed(shard, indexMetaData.getUUID(), - "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]"); - } + failReplicaIfNeeded(shard.index(), shard.id(), e); // we want to decrement the counter here, in teh failure handling, cause we got rejected // from executing on the thread pool if (counter.decrementAndGet() == 0) { @@ -766,11 +766,7 @@ public boolean isForceExecution() { try { shardOperationOnReplica(shardRequest); } catch (Throwable e) { - if (!ignoreReplicaException(e)) { - logger.warn("Failed to perform " + transportAction + " on replica" + shardIt.shardId(), e); - shardStateAction.shardFailed(shard, indexMetaData.getUUID(), - "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]"); - } + failReplicaIfNeeded(shard.index(), shard.id(), e); } if (counter.decrementAndGet() == 0) { listener.onResponse(response.response()); @@ -778,6 +774,24 @@ public boolean isForceExecution() { } } } + + } + + private void failReplicaIfNeeded(String index, int shardId, Throwable t) { + if (!ignoreReplicaException(t)) { + logger.warn("Failed to perform " + transportAction + " on replica [" + index + "][" + shardId + "]. 
failing shard.", t); + IndexService indexService = indicesService.indexService(index); + if (indexService == null) { + logger.debug("ignoring failed replica [{}][{}] because index was already removed.", index, shardId); + return; + } + IndexShard indexShard = indexService.shard(shardId); + if (indexShard == null) { + logger.debug("ignoring failed replica [{}][{}] because index was already removed.", index, shardId); + return; + } + indexShard.failShard(transportAction + " failed", t); + } } public static class PrimaryResponse { diff --git a/src/main/java/org/elasticsearch/index/engine/Engine.java b/src/main/java/org/elasticsearch/index/engine/Engine.java index 3b4c0422271fe..ea88130a699fb 100644 --- a/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -134,8 +134,11 @@ public interface Engine extends IndexShardComponent, CloseableComponent { void recover(RecoveryHandler recoveryHandler) throws EngineException; + /** fail engine due to some error. the engine will also be closed. 
*/ + void failEngine(String reason, @Nullable Throwable failure); + static interface FailedEngineListener { - void onFailedEngine(ShardId shardId, Throwable t); + void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t); } /** diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index 2ca3e5d22711a..e803081c41032 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -209,7 +209,7 @@ public InternalEngine(ShardId shardId, @IndexSettings Settings indexSettings, Th @Override public void updateIndexingBufferSize(ByteSizeValue indexingBufferSize) { ByteSizeValue preValue = this.indexingBufferSize; - try (InternalLock _ = readLock.acquire()) { + try (InternalLock _ = readLock.acquire()) { this.indexingBufferSize = indexingBufferSize; IndexWriter indexWriter = this.indexWriter; if (indexWriter != null) { @@ -309,7 +309,7 @@ public void enableGcDeletes(boolean enableGcDeletes) { } public GetResult get(Get get) throws EngineException { - try (InternalLock _ = readLock.acquire()) { + try (InternalLock _ = readLock.acquire()) { if (get.realtime()) { VersionValue versionValue = versionMap.get(versionKey(get.uid())); if (versionValue != null) { @@ -368,7 +368,7 @@ public GetResult get(Get get) throws EngineException { @Override public void create(Create create) throws EngineException { - try (InternalLock _ = readLock.acquire()) { + try (InternalLock _ = readLock.acquire()) { IndexWriter writer = this.indexWriter; if (writer == null) { throw new EngineClosedException(shardId, failedEngine); @@ -378,14 +378,14 @@ public void create(Create create) throws EngineException { possibleMergeNeeded = true; flushNeeded = true; } catch (OutOfMemoryError | IllegalStateException | IOException t) { - maybeFailEngine(t); - throw new 
CreateFailedEngineException(shardId, create, t); + maybeFailEngine(t); + throw new CreateFailedEngineException(shardId, create, t); } } private void maybeFailEngine(Throwable t) { if (t instanceof OutOfMemoryError || (t instanceof IllegalStateException && t.getMessage().contains("OutOfMemoryError"))) { - failEngine(t); + failEngine("out of memory", t); } } @@ -451,7 +451,7 @@ private void innerCreate(Create create, IndexWriter writer) throws IOException { @Override public void index(Index index) throws EngineException { - try (InternalLock _ = readLock.acquire()) { + try (InternalLock _ = readLock.acquire()) { IndexWriter writer = this.indexWriter; if (writer == null) { throw new EngineClosedException(shardId, failedEngine); @@ -523,7 +523,7 @@ private void innerIndex(Index index, IndexWriter writer) throws IOException { @Override public void delete(Delete delete) throws EngineException { - try (InternalLock _ = readLock.acquire()) { + try (InternalLock _ = readLock.acquire()) { IndexWriter writer = this.indexWriter; if (writer == null) { throw new EngineClosedException(shardId, failedEngine); @@ -587,7 +587,7 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException { @Override public void delete(DeleteByQuery delete) throws EngineException { - try (InternalLock _ = readLock.acquire()) { + try (InternalLock _ = readLock.acquire()) { IndexWriter writer = this.indexWriter; if (writer == null) { throw new EngineClosedException(shardId); @@ -645,8 +645,8 @@ public boolean refreshNeeded() { return dirty || !searcherManager.isSearcherCurrent(); } catch (IOException e) { logger.error("failed to access searcher manager", e); - failEngine(e); - throw new EngineException(shardId, "failed to access searcher manager",e); + failEngine("failed to access searcher manager", e); + throw new EngineException(shardId, "failed to access searcher manager", e); } } @@ -668,7 +668,7 @@ public void refresh(Refresh refresh) throws EngineException { } // we obtain a 
read lock here, since we don't want a flush to happen while we are refreshing // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) - try (InternalLock _ = readLock.acquire()) { + try (InternalLock _ = readLock.acquire()) { ensureOpen(); // maybeRefresh will only allow one refresh to execute, and the rest will "pass through", // but, we want to make sure not to loose ant refresh calls, if one is taking time @@ -687,7 +687,7 @@ public void refresh(Refresh refresh) throws EngineException { } catch (EngineClosedException e) { throw e; } catch (Throwable t) { - failEngine(t); + failEngine("refresh failed", t); throw new RefreshFailedEngineException(shardId, t); } } @@ -746,7 +746,7 @@ public void flush(Flush flush) throws EngineException { } } } else if (flush.type() == Flush.Type.COMMIT_TRANSLOG) { - try (InternalLock _ = readLock.acquire()) { + try (InternalLock _ = readLock.acquire()) { final IndexWriter indexWriter = currentIndexWriter(); if (onGoingRecoveries.get() > 0) { throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed"); @@ -774,7 +774,7 @@ public void flush(Flush flush) throws EngineException { // note, its ok to just commit without cleaning the translog, its perfectly fine to replay a // translog on an index that was opened on a committed point in time that is "in the future" // of that translog - try (InternalLock _ = readLock.acquire()) { + try (InternalLock _ = readLock.acquire()) { final IndexWriter indexWriter = currentIndexWriter(); // we allow to *just* commit if there is an ongoing recovery happening... 
// its ok to use this, only a flush will cause a new translogId, and we are locked here from @@ -792,7 +792,7 @@ public void flush(Flush flush) throws EngineException { } // reread the last committed segment infos - try (InternalLock _ = readLock.acquire()) { + try (InternalLock _ = readLock.acquire()) { ensureOpen(); readLastCommittedSegmentsInfo(); } catch (Throwable e) { @@ -801,14 +801,15 @@ public void flush(Flush flush) throws EngineException { } } - } catch (FlushFailedEngineException ex){ - maybeFailEngine(ex.getCause()); - throw ex; + } catch (FlushFailedEngineException ex) { + maybeFailEngine(ex.getCause()); + throw ex; } finally { flushLock.unlock(); flushing.decrementAndGet(); } } + private void ensureOpen() { if (indexWriter == null) { throw new EngineClosedException(shardId, failedEngine); @@ -817,6 +818,7 @@ private void ensureOpen() { /** * Returns the current index writer. This method will never return null + * * @throws EngineClosedException if the engine is already closed */ private IndexWriter currentIndexWriter() { @@ -857,7 +859,7 @@ public void maybeMerge() throws EngineException { return; } possibleMergeNeeded = false; - try (InternalLock _ = readLock.acquire()) { + try (InternalLock _ = readLock.acquire()) { Merges.maybeMerge(currentIndexWriter()); } catch (Throwable t) { maybeFailEngine(t); @@ -872,7 +874,7 @@ public void optimize(Optimize optimize) throws EngineException { } if (optimizeMutex.compareAndSet(false, true)) { ElasticsearchMergePolicy elasticsearchMergePolicy = null; - try (InternalLock _ = readLock.acquire()) { + try (InternalLock _ = readLock.acquire()) { final IndexWriter writer = currentIndexWriter(); if (writer.getConfig().getMergePolicy() instanceof ElasticsearchMergePolicy) { @@ -923,7 +925,7 @@ public void optimize(Optimize optimize) throws EngineException { @Override public SnapshotIndexCommit snapshotIndex() throws EngineException { - try (InternalLock _ = readLock.acquire()) { + try (InternalLock _ = 
readLock.acquire()) { flush(new Flush().type(Flush.Type.COMMIT).waitIfOngoing(true)); ensureOpen(); return deletionPolicy.snapshot(); @@ -1002,7 +1004,7 @@ private static long getReaderRamBytesUsed(AtomicReaderContext reader) { @Override public SegmentsStats segmentsStats() { - try (InternalLock _ = readLock.acquire()) { + try (InternalLock _ = readLock.acquire()) { ensureOpen(); Searcher searcher = acquireSearcher("segments_stats"); try { @@ -1019,7 +1021,7 @@ public SegmentsStats segmentsStats() { @Override public List segments() { - try (InternalLock _ = readLock.acquire()) { + try (InternalLock _ = readLock.acquire()) { ensureOpen(); Map segments = new HashMap<>(); @@ -1133,21 +1135,27 @@ public void close() throws ElasticsearchException { class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener { @Override public void onFailedMerge(MergePolicy.MergeException e) { - failEngine(e); + failEngine("merge exception", e); } } - private void failEngine(Throwable failure) { + @Override + public void failEngine(String reason, @Nullable Throwable failure) { if (failEngineLock.tryLock()) { assert !readLock.assertLockIsHeld() : "readLock is held by a thread that tries to fail the engine"; if (failedEngine != null) { return; } try { - logger.warn("failed engine", failure); - failedEngine = failure; + logger.warn("failed engine [{}]", reason, failure); + // we must set a failure exception, generate one if not supplied + if (failure == null) { + failedEngine = new EngineException(shardId(), reason); + } else { + failedEngine = failure; + } for (FailedEngineListener listener : failedEngineListeners) { - listener.onFailedEngine(shardId, failure); + listener.onFailedEngine(shardId, reason, failure); } } finally { // close the engine whatever happens... 
@@ -1155,7 +1163,7 @@ private void failEngine(Throwable failure) { } } else { - logger.debug("Tried to fail engine but could not acquire lock - engine should be failed by now", failure); + logger.debug("Tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure); } } @@ -1229,7 +1237,9 @@ public void warm(AtomicReader reader) throws IOException { assert isMergedSegment(reader); final Engine.Searcher searcher = new SimpleSearcher("warmer", new IndexSearcher(reader)); final IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, searcher); - if (warmer != null) warmer.warm(context); + if (warmer != null) { + warmer.warm(context); + } } catch (Throwable t) { // Don't fail a merge if the warm-up failed if (!closed) { @@ -1281,7 +1291,7 @@ public void onRefreshSettings(Settings settings) { !codecName.equals(InternalEngine.this.codecName) || failOnMergeFailure != InternalEngine.this.failOnMergeFailure || codecBloomLoad != codecService.isLoadBloomFilter()) { - try (InternalLock _ = readLock.acquire()) { + try (InternalLock _ = readLock.acquire()) { if (indexConcurrency != InternalEngine.this.indexConcurrency) { logger.info("updating index.index_concurrency from [{}] to [{}]", InternalEngine.this.indexConcurrency, indexConcurrency); InternalEngine.this.indexConcurrency = indexConcurrency; @@ -1488,11 +1498,11 @@ public void close() throws ElasticsearchException { endRecovery(); } } - + private static final class InternalLock implements Releasable { private final ThreadLocal lockIsHeld; private final Lock lock; - + InternalLock(Lock lock) { ThreadLocal tl = null; assert (tl = new ThreadLocal<>()) != null; @@ -1511,8 +1521,8 @@ InternalLock acquire() throws EngineException { assert onAssertLock(); return this; } - - + + protected boolean onAssertRelease() { lockIsHeld.set(Boolean.FALSE); return true; diff --git a/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java 
b/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java index 7da4ca35bbb9d..edc35d2a4f646 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java @@ -156,6 +156,8 @@ public interface IndexShard extends IndexShardComponent { void recover(Engine.RecoveryHandler recoveryHandler) throws EngineException; + void failShard(String reason, @Nullable Throwable e); + Engine.Searcher acquireSearcher(String source); Engine.Searcher acquireSearcher(String source, Mode mode); diff --git a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 2aa1684588fe4..03ba4b9a49f85 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -630,6 +630,12 @@ public void recover(Engine.RecoveryHandler recoveryHandler) throws EngineExcepti engine.recover(recoveryHandler); } + @Override + public void failShard(String reason, @Nullable Throwable e) { + // fail the engine. This will cause this shard to also be removed from the node's index service. 
+ engine.failEngine(reason, e); + } + @Override public Engine.Searcher acquireSearcher(String source) { return acquireSearcher(source, Mode.READ); diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index c7c140f77f4b0..13a89c10273fa 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -37,6 +37,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.compress.CompressedString; @@ -799,7 +800,7 @@ private void removeIndex(String index, String reason) { private class FailedEngineHandler implements Engine.FailedEngineListener { @Override - public void onFailedEngine(final ShardId shardId, final Throwable failure) { + public void onFailedEngine(final ShardId shardId, final String reason, final @Nullable Throwable failure) { ShardRouting shardRouting = null; final IndexService indexService = indicesService.indexService(shardId.index().name()); if (indexService != null) { @@ -809,29 +810,33 @@ public void onFailedEngine(final ShardId shardId, final Throwable failure) { } } if (shardRouting == null) { - logger.warn("[{}][{}] engine failed, but can't find index shard", shardId.index().name(), shardId.id()); + logger.warn("[{}][{}] engine failed, but can't find index shard. failure reason: [{}]", + shardId.index().name(), shardId.id(), reason); return; } final ShardRouting fShardRouting = shardRouting; final String indexUUID = indexService.indexUUID(); // we know indexService is not null here. 
+ final String failureMessage = "engine failure, message [" + reason + "]" + + (failure == null ? "" : "[" + detailedMessage(failure) + "]"); threadPool.generic().execute(new Runnable() { @Override public void run() { synchronized (mutex) { if (indexService.hasShard(shardId.id())) { try { - indexService.removeShard(shardId.id(), "engine failure [" + ExceptionsHelper.detailedMessage(failure) + "]"); + + indexService.removeShard(shardId.id(), failureMessage); } catch (IndexShardMissingException e) { // the node got closed on us, ignore it } catch (Throwable e1) { - logger.warn("[{}][{}] failed to delete shard after failed engine", e1, indexService.index().name(), shardId.id()); + logger.warn("[{}][{}] failed to delete shard after failed engine ([{}])", e1, indexService.index().name(), shardId.id(), reason); } } try { failedShards.put(fShardRouting.shardId(), new FailedShard(fShardRouting.version())); - shardStateAction.shardFailed(fShardRouting, indexUUID, "engine failure, message [" + detailedMessage(failure) + "]"); + shardStateAction.shardFailed(fShardRouting, indexUUID, failureMessage); } catch (Throwable e1) { - logger.warn("[{}][{}] failed to mark shard as failed after a failed engine", e1, indexService.index().name(), shardId.id()); + logger.warn("[{}][{}] failed to mark shard as failed after a failed engine ([{}])", e1, indexService.index().name(), shardId.id(), reason); } } } From 40ff44083c509cbdde3d7ebb7e97d723aec70707 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 18 Apr 2014 09:18:56 +0200 Subject: [PATCH 2/2] removed duplicate logging message and made sure the information that it contained is passed on --- .../replication/TransportShardReplicationOperationAction.java | 3 +-- .../elasticsearch/index/engine/internal/InternalEngine.java | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java 
b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index 95d9592fc7e80..8f337e0339e0d 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -779,7 +779,6 @@ public boolean isForceExecution() { private void failReplicaIfNeeded(String index, int shardId, Throwable t) { if (!ignoreReplicaException(t)) { - logger.warn("Failed to perform " + transportAction + " on replica [" + index + "][" + shardId + "]. failing shard.", t); IndexService indexService = indicesService.indexService(index); if (indexService == null) { logger.debug("ignoring failed replica [{}][{}] because index was already removed.", index, shardId); @@ -790,7 +789,7 @@ private void failReplicaIfNeeded(String index, int shardId, Throwable t) { logger.debug("ignoring failed replica [{}][{}] because index was already removed.", index, shardId); return; } - indexShard.failShard(transportAction + " failed", t); + indexShard.failShard(transportAction + " failed on replica", t); } } diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index e803081c41032..07932be353146 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -1144,6 +1144,7 @@ public void failEngine(String reason, @Nullable Throwable failure) { if (failEngineLock.tryLock()) { assert !readLock.assertLockIsHeld() : "readLock is held by a thread that tries to fail the engine"; if (failedEngine != null) { + logger.debug("tried to fail engine but engine is already failed. ignoring. 
[{}]", reason, failure); return; } try { @@ -1163,7 +1164,7 @@ public void failEngine(String reason, @Nullable Throwable failure) { } } else { - logger.debug("Tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure); + logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure); } }