From b357de444e3eb9da3adcf03f30a183b8b681b3b1 Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Thu, 28 Sep 2023 11:11:08 +0100 Subject: [PATCH 1/2] A dropped IndexRequest should return an IndexResponse, not an UpdateResponse --- .../elasticsearch/action/bulk/TransportBulkAction.java | 4 ++-- .../org/elasticsearch/action/index/IndexResponse.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 13d10be86bd68..ebed3f9ac793f 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -26,13 +26,13 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.ingest.IngestActionForwarder; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; @@ -936,7 +936,7 @@ synchronized void markItemAsDropped(int slot) { BulkItemResponse.success( slot, indexRequest.opType(), - new UpdateResponse( + new IndexResponse( new ShardId(indexRequest.index(), IndexMetadata.INDEX_UUID_NA_VALUE, 0), id, SequenceNumbers.UNASSIGNED_SEQ_NO, diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexResponse.java b/server/src/main/java/org/elasticsearch/action/index/IndexResponse.java index fe631f53c975c..3d2bad64e114d 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexResponse.java @@ -39,12 +39,12 @@ public IndexResponse(ShardId shardId, String id, long seqNo, long primaryTerm, l this(shardId, id, seqNo, primaryTerm, version, created ? Result.CREATED : Result.UPDATED); } - private IndexResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, Result result) { - super(shardId, id, seqNo, primaryTerm, version, assertCreatedOrUpdated(result)); + public IndexResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, Result result) { + super(shardId, id, seqNo, primaryTerm, version, assertCreatedOrUpdatedOrNoop(result)); } - private static Result assertCreatedOrUpdated(Result result) { - assert result == Result.CREATED || result == Result.UPDATED; + private static Result assertCreatedOrUpdatedOrNoop(Result result) { + assert result == Result.CREATED || result == Result.UPDATED || result == Result.NOOP; return result; } From 1cab3bac21ca52352b222b39415f9992c1769f31 Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Thu, 28 Sep 2023 11:51:46 +0100 Subject: [PATCH 2/2] Need to set shard info to avoid an NPE --- .../action/bulk/TransportBulkAction.java | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index ebed3f9ac793f..d1d353e439ae9 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -932,20 +932,16 @@ synchronized void markItemAsDropped(int slot) { IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); failedSlots.set(slot); final String id = indexRequest.id() == null ? DROPPED_ITEM_WITH_AUTO_GENERATED_ID : indexRequest.id(); - itemResponses.add( - BulkItemResponse.success( - slot, - indexRequest.opType(), - new IndexResponse( - new ShardId(indexRequest.index(), IndexMetadata.INDEX_UUID_NA_VALUE, 0), - id, - SequenceNumbers.UNASSIGNED_SEQ_NO, - SequenceNumbers.UNASSIGNED_PRIMARY_TERM, - indexRequest.version(), - DocWriteResponse.Result.NOOP - ) - ) + IndexResponse ir = new IndexResponse( + new ShardId(indexRequest.index(), IndexMetadata.INDEX_UUID_NA_VALUE, 0), + id, + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + indexRequest.version(), + DocWriteResponse.Result.NOOP ); + ir.setShardInfo(new ReplicationResponse.ShardInfo(0, 0)); + itemResponses.add(BulkItemResponse.success(slot, indexRequest.opType(), ir)); } synchronized void markItemAsFailed(int slot, Exception e) {