
Commit 9dd12bb

Simplify write failure handling (backport of #19105) (#22778)
* Simplify write failure handling (backport of #19105)

Currently, any write (e.g. `index`, `delete`) operation failure can be categorized as:
- request failure (e.g. analysis, parsing error, version conflict)
- transient operation failure (e.g. due to shard initializing, relocation)
- environment failure (e.g. out of disk, corruption, lucene tragic event)

The main motivation of the PR is to handle these failure types appropriately for a write request. Each failure type needs to be handled differently:
- request failure (being request specific) should be replicated and then failed
- transient failure should be retried (eventually succeeding)
- environment failure (persistent primary shard failure) should fail the request immediately

Currently, transient operation failures are retried in the replication action, but no distinction is made between request and environment failures: both fail the write request immediately. In this PR, we distinguish between request and environment failures for a write operation. In case of environment failures, the exception is bubbled up, failing the request; in case of request failures, the exception is captured and replication continues (we skip performing the operation on replicas when such failures occur on the primary). Transient operation failures are bubbled up to be retried by the replication operation, as before.

* incorporate feedback
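
As a rough guide to the new behavior (a minimal sketch only; `Engine.IndexResult`, `hasFailure()`, and `getFailure()` appear in this commit, but the surrounding snippet is illustrative and not part of the diff):

    // Sketch: how a primary-side write outcome is interpreted under this change.
    Engine.IndexResult result = executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction);
    if (result.hasFailure()) {
        // request failure (e.g. mapper parsing error, version conflict): the exception is
        // captured in the result, no operation is sent to the replicas, and the request
        // is reported as failed in the response
        Exception failure = result.getFailure();
    } else {
        // success: the version and translog location are propagated for replication
    }
    // environment failures (e.g. a tragic Lucene event) and transient failures (shard
    // initializing or relocating) are not captured in the result; they are thrown, either
    // failing the request immediately or being retried by the replication action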
1 parent 07de55d commit 9dd12bb

27 files changed (+1132, -892 lines changed)

core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 221 additions & 145 deletions
Large diffs are not rendered by default.

core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java

Lines changed: 28 additions & 22 deletions
@@ -40,7 +40,6 @@
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.translog.Translog.Location;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -49,7 +48,7 @@
 /**
  * Performs the delete operation.
  */
-public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, DeleteResponse> {
+public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, DeleteRequest, DeleteResponse> {
 
     private final AutoCreateIndex autoCreateIndex;
     private final TransportCreateIndexAction createIndexAction;
@@ -61,7 +60,7 @@ public TransportDeleteAction(Settings settings, TransportService transportServic
                                  IndexNameExpressionResolver indexNameExpressionResolver,
                                  AutoCreateIndex autoCreateIndex) {
         super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
-            indexNameExpressionResolver, DeleteRequest::new, ThreadPool.Names.INDEX);
+            indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX);
         this.createIndexAction = createIndexAction;
         this.autoCreateIndex = autoCreateIndex;
     }
@@ -70,7 +69,11 @@ public TransportDeleteAction(Settings settings, TransportService transportServic
     protected void doExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
         ClusterState state = clusterService.state();
         if (autoCreateIndex.shouldAutoCreate(request.index(), state)) {
-            createIndexAction.execute(task, new CreateIndexRequest().index(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
+            CreateIndexRequest createIndexRequest = new CreateIndexRequest()
+                .index(request.index())
+                .cause("auto(delete api)")
+                .masterNodeTimeout(request.timeout());
+            createIndexAction.execute(task, createIndexRequest, new ActionListener<CreateIndexResponse>() {
                 @Override
                 public void onResponse(CreateIndexResponse result) {
                     innerExecute(task, request, listener);
@@ -119,30 +122,33 @@ protected DeleteResponse newResponseInstance() {
     }
 
     @Override
-    protected WriteResult<DeleteResponse> onPrimaryShard(DeleteRequest request, IndexShard indexShard) {
-        return executeDeleteRequestOnPrimary(request, indexShard);
+    protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, IndexShard primary) throws Exception {
+        final Engine.DeleteResult result = executeDeleteRequestOnPrimary(request, primary);
+        final DeleteResponse response = result.hasFailure() ? null :
+            new DeleteResponse(primary.shardId(), request.type(), request.id(), result.getVersion(), result.isFound());
+        return new WritePrimaryResult(request, response, result.getTranslogLocation(), result.getFailure(), primary);
     }
 
     @Override
-    protected Location onReplicaShard(DeleteRequest request, IndexShard indexShard) {
-        return executeDeleteRequestOnReplica(request, indexShard).getTranslogLocation();
+    protected WriteReplicaResult shardOperationOnReplica(DeleteRequest request, IndexShard replica) throws Exception {
+        final Engine.DeleteResult result = executeDeleteRequestOnReplica(request, replica);
+        return new WriteReplicaResult(request, result.getTranslogLocation(), result.getFailure(), replica);
     }
 
-    public static WriteResult<DeleteResponse> executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard indexShard) {
-        Engine.Delete delete = indexShard.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
-        indexShard.delete(delete);
-        // update the request with the version so it will go to the replicas
-        request.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
-        request.version(delete.version());
-
-        assert request.versionType().validateVersionForWrites(request.version());
-        DeleteResponse response = new DeleteResponse(indexShard.shardId(), request.type(), request.id(), delete.version(), delete.found());
-        return new WriteResult<>(response, delete.getTranslogLocation());
+    public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) {
+        Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
+        Engine.DeleteResult result = primary.delete(delete);
+        if (result.hasFailure() == false) {
+            // update the request with the version so it will go to the replicas
+            request.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
+            request.version(result.getVersion());
+            assert request.versionType().validateVersionForWrites(request.version());
+        }
+        return result;
     }
 
-    public static Engine.Delete executeDeleteRequestOnReplica(DeleteRequest request, IndexShard indexShard) {
-        Engine.Delete delete = indexShard.prepareDeleteOnReplica(request.type(), request.id(), request.version(), request.versionType());
-        indexShard.delete(delete);
-        return delete;
+    public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
+        Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(), request.version(), request.versionType());
+        return replica.delete(delete);
     }
 }
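
A hedged usage sketch of the new static delete helpers above (the caller shown here is hypothetical; only methods visible in this diff, such as `hasFailure()`, `getFailure()`, `getVersion()`, and `isFound()`, are used):

    // Hypothetical caller: failures are now carried in the Engine.DeleteResult instead of
    // being thrown, so the caller decides whether to build a response or record the failure.
    Engine.DeleteResult result = TransportDeleteAction.executeDeleteRequestOnPrimary(request, primary);
    if (result.hasFailure()) {
        Exception failure = result.getFailure();   // e.g. a version conflict on the primary
        // the write is reported as failed, but no exception aborts the replication flow
    } else {
        DeleteResponse response = new DeleteResponse(
            primary.shardId(), request.type(), request.id(), result.getVersion(), result.isFound());
    }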

core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

Lines changed: 53 additions & 30 deletions
@@ -44,11 +44,11 @@
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.mapper.MapperParsingException;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.translog.Translog.Location;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.tasks.Task;
@@ -65,7 +65,7 @@
  * <li><b>allowIdGeneration</b>: If the id is set not, should it be generated. Defaults to <tt>true</tt>.
  * </ul>
  */
-public class TransportIndexAction extends TransportWriteAction<IndexRequest, IndexResponse> {
+public class TransportIndexAction extends TransportWriteAction<IndexRequest, IndexRequest, IndexResponse> {
 
     private final AutoCreateIndex autoCreateIndex;
     private final boolean allowIdGeneration;
@@ -83,7 +83,7 @@ public TransportIndexAction(Settings settings, TransportService transportService
                                 MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters,
                                 IndexNameExpressionResolver indexNameExpressionResolver, AutoCreateIndex autoCreateIndex) {
         super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
-            actionFilters, indexNameExpressionResolver, IndexRequest::new, ThreadPool.Names.INDEX);
+            actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX);
         this.mappingUpdatedAction = mappingUpdatedAction;
         this.createIndexAction = createIndexAction;
         this.autoCreateIndex = autoCreateIndex;
@@ -162,65 +162,88 @@ protected IndexResponse newResponseInstance() {
     }
 
     @Override
-    protected WriteResult<IndexResponse> onPrimaryShard(IndexRequest request, IndexShard indexShard) throws Exception {
-        return executeIndexRequestOnPrimary(request, indexShard, mappingUpdatedAction);
+    protected WritePrimaryResult shardOperationOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
+        final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction);
+        final IndexResponse response = indexResult.hasFailure() ? null :
+            new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getVersion(),
+                indexResult.isCreated());
+        return new WritePrimaryResult(request, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary);
     }
 
     @Override
-    protected Location onReplicaShard(IndexRequest request, IndexShard indexShard) {
-        return executeIndexRequestOnReplica(request, indexShard).getTranslogLocation();
+    protected WriteReplicaResult shardOperationOnReplica(IndexRequest request, IndexShard replica) throws Exception {
+        final Engine.IndexResult indexResult = executeIndexRequestOnReplica(request, replica);
+        return new WriteReplicaResult(request, indexResult.getTranslogLocation(), indexResult.getFailure(), replica);
    }
 
     /**
      * Execute the given {@link IndexRequest} on a replica shard, throwing a
      * {@link RetryOnReplicaException} if the operation needs to be re-tried.
      */
-    public static Engine.Index executeIndexRequestOnReplica(IndexRequest request, IndexShard indexShard) {
-        final ShardId shardId = indexShard.shardId();
+    public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) {
+        final ShardId shardId = replica.shardId();
         SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source())
             .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
 
-        final Engine.Index operation = indexShard.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
+        final Engine.Index operation;
+        try {
+            operation = replica.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
+        } catch (MapperParsingException e) {
+            return new Engine.IndexResult(e, request.version());
+        }
         Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
         if (update != null) {
             throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
         }
-        indexShard.index(operation);
-        return operation;
+        return replica.index(operation);
     }
 
     /** Utility method to prepare an index operation on primary shards */
-    public static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard indexShard) {
+    static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) {
        SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source())
            .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
-        return indexShard.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
+        return primary.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
     }
 
-    public static WriteResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard,
+    public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
             MappingUpdatedAction mappingUpdatedAction) throws Exception {
-        Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard);
+        Engine.Index operation;
+        try {
+            operation = prepareIndexOperationOnPrimary(request, primary);
+        } catch (MapperParsingException | IllegalArgumentException e) {
+            return new Engine.IndexResult(e, request.version());
+        }
         Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
-        final ShardId shardId = indexShard.shardId();
+        final ShardId shardId = primary.shardId();
         if (update != null) {
-            mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
-            operation = prepareIndexOperationOnPrimary(request, indexShard);
+            // can throw timeout exception when updating mappings or ISE for attempting to update default mappings
+            // which are bubbled up
+            try {
+                mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
+            } catch (IllegalArgumentException e) {
+                // throws IAE on conflicts merging dynamic mappings
+                return new Engine.IndexResult(e, request.version());
+            }
+            try {
+                operation = prepareIndexOperationOnPrimary(request, primary);
+            } catch (MapperParsingException | IllegalArgumentException e) {
+                return new Engine.IndexResult(e, request.version());
+            }
             update = operation.parsedDoc().dynamicMappingsUpdate();
             if (update != null) {
                 throw new ReplicationOperation.RetryOnPrimaryException(shardId,
                     "Dynamic mappings are not available on the node that holds the primary yet");
             }
         }
-        indexShard.index(operation);
-
-        // update the version on request so it will happen on the replicas
-        final long version = operation.version();
-        request.version(version);
-        request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
-
-        assert request.versionType().validateVersionForWrites(request.version());
-
-        IndexResponse response = new IndexResponse(shardId, request.type(), request.id(), request.version(), operation.isCreated());
-        return new WriteResult<>(response, operation.getTranslogLocation());
+        Engine.IndexResult result = primary.index(operation);
+        if (result.hasFailure() == false) {
+            // update the version on request so it will happen on the replicas
+            final long version = result.getVersion();
+            request.version(version);
+            request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
+            assert request.versionType().validateVersionForWrites(request.version());
+        }
+        return result;
     }
 
     private void processIngestIndexRequest(Task task, IndexRequest indexRequest, ActionListener listener) {
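
To summarize the failure routing implemented by `executeIndexRequestOnPrimary` above (a plain restatement of the control flow visible in this diff, not additional code from the commit):

    // MapperParsingException / IllegalArgumentException while preparing the operation
    //     -> captured as new Engine.IndexResult(e, request.version()); no replica operation
    // IllegalArgumentException from updateMappingOnMaster (conflict merging dynamic mappings)
    //     -> captured the same way
    // mapping-update timeout, or ISE for attempting to update default mappings
    //     -> bubbled up to the caller
    // dynamic mapping update still missing after retrying the preparation
    //     -> ReplicationOperation.RetryOnPrimaryException (transient, retried)
    // failure inside primary.index(operation)
    //     -> carried inside the returned Engine.IndexResult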

core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 20 additions & 13 deletions
@@ -32,6 +32,7 @@
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
@@ -112,21 +113,23 @@ public void execute() throws Exception {
         pendingActions.incrementAndGet();
         primaryResult = primary.perform(request);
         final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
-        assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
-        if (logger.isTraceEnabled()) {
-            logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
-        }
+        if (replicaRequest != null) {
+            assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
+            if (logger.isTraceEnabled()) {
+                logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
+            }
 
-        // we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
-        // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
-        // to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
-        ClusterState clusterState = clusterStateSupplier.get();
-        final List<ShardRouting> shards = getShards(primaryId, clusterState);
-        Set<String> inSyncAllocationIds = getInSyncAllocationIds(primaryId, clusterState);
+            // we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
+            // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
+            // to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
+            ClusterState clusterState = clusterStateSupplier.get();
+            final List<ShardRouting> shards = getShards(primaryId, clusterState);
+            Set<String> inSyncAllocationIds = getInSyncAllocationIds(primaryId, clusterState);
 
-        markUnavailableShardsAsStale(replicaRequest, inSyncAllocationIds, shards);
+            markUnavailableShardsAsStale(replicaRequest, inSyncAllocationIds, shards);
 
-        performOnReplicas(replicaRequest, shards);
+            performOnReplicas(replicaRequest, shards);
+        }
 
         successfulShards.incrementAndGet();
         decPendingAndFinishIfNeeded();
@@ -419,7 +422,11 @@ public RetryOnPrimaryException(StreamInput in) throws IOException {
 
     public interface PrimaryResult<R extends ReplicationRequest<R>> {
 
-        R replicaRequest();
+        /**
+         * @return null if no operation needs to be sent to a replica
+         * (for example when the operation failed on the primary due to a parsing exception)
+         */
+        @Nullable R replicaRequest();
 
         void setShardInfo(ReplicationResponse.ShardInfo shardInfo);
     }
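
For context, a minimal sketch of a `PrimaryResult` implementation that exercises the new nullable contract (the class below is illustrative and is not the `WritePrimaryResult` added by this commit):

    // Illustrative only: returning null from replicaRequest() makes ReplicationOperation.execute()
    // skip markUnavailableShardsAsStale(...) and performOnReplicas(...) entirely.
    class SketchPrimaryResult implements ReplicationOperation.PrimaryResult<IndexRequest> {
        private final IndexRequest replicaRequest;   // null when the primary captured a request failure

        SketchPrimaryResult(@Nullable IndexRequest replicaRequest) {
            this.replicaRequest = replicaRequest;
        }

        @Override
        @Nullable
        public IndexRequest replicaRequest() {
            return replicaRequest;
        }

        @Override
        public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) {
            // record shard info for the final response (details omitted in this sketch)
        }
    }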
