Skip to content

Commit 2ff19bc

Browse files
authored
Use Writeable for TransportReplAction derivatives (#40905)
Relates #34389, backport of #40894.
1 parent a8dbb07 commit 2ff19bc

32 files changed

+297
-288
lines changed

server/src/main/java/org/elasticsearch/action/DocWriteRequest.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -222,13 +222,9 @@ static DocWriteRequest<?> readDocumentRequest(StreamInput in) throws IOException
222222
byte type = in.readByte();
223223
DocWriteRequest<?> docWriteRequest;
224224
if (type == 0) {
225-
IndexRequest indexRequest = new IndexRequest();
226-
indexRequest.readFrom(in);
227-
docWriteRequest = indexRequest;
225+
docWriteRequest = new IndexRequest(in);
228226
} else if (type == 1) {
229-
DeleteRequest deleteRequest = new DeleteRequest();
230-
deleteRequest.readFrom(in);
231-
docWriteRequest = deleteRequest;
227+
docWriteRequest = new DeleteRequest(in);
232228
} else if (type == 2) {
233229
UpdateRequest updateRequest = new UpdateRequest();
234230
updateRequest.readFrom(in);

server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,11 @@ public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String all
136136

137137
public static class ShardRequest extends ReplicationRequest<ShardRequest> {
138138

139-
private ClusterBlock clusterBlock;
139+
private final ClusterBlock clusterBlock;
140140

141-
ShardRequest(){
141+
ShardRequest(StreamInput in) throws IOException {
142+
super(in);
143+
clusterBlock = new ClusterBlock(in);
142144
}
143145

144146
public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) {
@@ -153,9 +155,8 @@ public String toString() {
153155
}
154156

155157
@Override
156-
public void readFrom(final StreamInput in) throws IOException {
157-
super.readFrom(in);
158-
clusterBlock = new ClusterBlock(in);
158+
public void readFrom(final StreamInput in) {
159+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
159160
}
160161

161162
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ public FlushRequest(String... indices) {
5252
super(indices);
5353
}
5454

55+
public FlushRequest(StreamInput in) throws IOException {
56+
super(in);
57+
force = in.readBoolean();
58+
waitIfOngoing = in.readBoolean();
59+
}
60+
5561
/**
5662
* Returns {@code true} iff a flush should block
5763
* if a another flush operation is already running. Otherwise {@code false}
@@ -103,9 +109,7 @@ public void writeTo(StreamOutput out) throws IOException {
103109

104110
@Override
105111
public void readFrom(StreamInput in) throws IOException {
106-
super.readFrom(in);
107-
force = in.readBoolean();
108-
waitIfOngoing = in.readBoolean();
112+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
109113
}
110114

111115
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,17 @@
2929

3030
public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
3131

32-
private FlushRequest request = new FlushRequest();
32+
private final FlushRequest request;
3333

3434
public ShardFlushRequest(FlushRequest request, ShardId shardId) {
3535
super(shardId);
3636
this.request = request;
3737
this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default
3838
}
3939

40-
public ShardFlushRequest() {
40+
public ShardFlushRequest(StreamInput in) throws IOException {
41+
super(in);
42+
request = new FlushRequest(in);
4143
}
4244

4345
FlushRequest getRequest() {
@@ -46,8 +48,7 @@ FlushRequest getRequest() {
4648

4749
@Override
4850
public void readFrom(StreamInput in) throws IOException {
49-
super.readFrom(in);
50-
request.readFrom(in);
51+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
5152
}
5253

5354
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ protected PrimaryResult<ShardFlushRequest, ReplicationResponse> shardOperationOn
5555
IndexShard primary) {
5656
primary.flush(shardRequest.getRequest());
5757
logger.trace("{} flush request executed on primary", primary.shardId());
58-
return new PrimaryResult<ShardFlushRequest, ReplicationResponse>(shardRequest, new ReplicationResponse());
58+
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
5959
}
6060

6161
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
package org.elasticsearch.action.admin.indices.refresh;
2121

2222
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
23+
import org.elasticsearch.common.io.stream.StreamInput;
24+
25+
import java.io.IOException;
2326

2427
/**
2528
* A refresh request making all operations performed since the last refresh available for search. The (near) real-time
@@ -35,4 +38,8 @@ public class RefreshRequest extends BroadcastRequest<RefreshRequest> {
3538
public RefreshRequest(String... indices) {
3639
super(indices);
3740
}
41+
42+
public RefreshRequest(StreamInput in) throws IOException {
43+
super(in);
44+
}
3845
}

server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,14 @@ public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> {
3333

3434
private BulkItemRequest[] items;
3535

36-
public BulkShardRequest() {
36+
public BulkShardRequest(StreamInput in) throws IOException {
37+
super(in);
38+
items = new BulkItemRequest[in.readVInt()];
39+
for (int i = 0; i < items.length; i++) {
40+
if (in.readBoolean()) {
41+
items[i] = BulkItemRequest.readBulkItem(in);
42+
}
43+
}
3744
}
3845

3946
public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
@@ -60,7 +67,7 @@ public String[] indices() {
6067
indices.add(item.index());
6168
}
6269
}
63-
return indices.toArray(new String[indices.size()]);
70+
return indices.toArray(new String[0]);
6471
}
6572

6673
@Override
@@ -78,14 +85,8 @@ public void writeTo(StreamOutput out) throws IOException {
7885
}
7986

8087
@Override
81-
public void readFrom(StreamInput in) throws IOException {
82-
super.readFrom(in);
83-
items = new BulkItemRequest[in.readVInt()];
84-
for (int i = 0; i < items.length; i++) {
85-
if (in.readBoolean()) {
86-
items[i] = BulkItemRequest.readBulkItem(in);
87-
}
88-
}
88+
public void readFrom(StreamInput in) {
89+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
8990
}
9091

9192
@Override

server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,10 @@
2828
import org.elasticsearch.action.support.WriteResponse;
2929
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
3030
import org.elasticsearch.action.support.replication.ReplicationResponse;
31+
import org.elasticsearch.common.io.stream.Writeable;
3132
import org.elasticsearch.tasks.Task;
3233
import org.elasticsearch.transport.TransportService;
3334

34-
import java.util.function.Supplier;
35-
3635
/** use transport bulk action directly */
3736
@Deprecated
3837
public abstract class TransportSingleItemBulkWriteAction<
@@ -43,8 +42,8 @@ public abstract class TransportSingleItemBulkWriteAction<
4342
private final TransportBulkAction bulkAction;
4443

4544
protected TransportSingleItemBulkWriteAction(String actionName, TransportService transportService, ActionFilters actionFilters,
46-
Supplier<Request> request, TransportBulkAction bulkAction) {
47-
super(actionName, transportService, actionFilters, request);
45+
Writeable.Reader<Request> requestReader, TransportBulkAction bulkAction) {
46+
super(actionName, transportService, actionFilters, requestReader);
4847
this.bulkAction = bulkAction;
4948
}
5049

server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@
5353
public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
5454
implements DocWriteRequest<DeleteRequest>, CompositeIndicesRequest {
5555

56+
private static final ShardId NO_SHARD_ID = null;
57+
5658
// Set to null initially so we can know to override in bulk requests that have a default type.
5759
private String type;
5860
private String id;
@@ -63,14 +65,35 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
6365
private long ifSeqNo = UNASSIGNED_SEQ_NO;
6466
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
6567

68+
public DeleteRequest(StreamInput in) throws IOException {
69+
super(in);
70+
type = in.readString();
71+
id = in.readString();
72+
routing = in.readOptionalString();
73+
if (in.getVersion().before(Version.V_7_0_0)) {
74+
in.readOptionalString(); // _parent
75+
}
76+
version = in.readLong();
77+
versionType = VersionType.fromValue(in.readByte());
78+
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
79+
ifSeqNo = in.readZLong();
80+
ifPrimaryTerm = in.readVLong();
81+
} else {
82+
ifSeqNo = UNASSIGNED_SEQ_NO;
83+
ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
84+
}
85+
}
86+
6687
public DeleteRequest() {
88+
super(NO_SHARD_ID);
6789
}
6890

6991
/**
7092
* Constructs a new delete request against the specified index. The {@link #type(String)} and {@link #id(String)}
7193
* must be set.
7294
*/
7395
public DeleteRequest(String index) {
96+
super(NO_SHARD_ID);
7497
this.index = index;
7598
}
7699

@@ -85,6 +108,7 @@ public DeleteRequest(String index) {
85108
*/
86109
@Deprecated
87110
public DeleteRequest(String index, String type, String id) {
111+
super(NO_SHARD_ID);
88112
this.index = index;
89113
this.type = type;
90114
this.id = id;
@@ -97,6 +121,7 @@ public DeleteRequest(String index, String type, String id) {
97121
* @param id The id of the document
98122
*/
99123
public DeleteRequest(String index, String id) {
124+
super(NO_SHARD_ID);
100125
this.index = index;
101126
this.id = id;
102127
}
@@ -274,23 +299,8 @@ public OpType opType() {
274299
}
275300

276301
@Override
277-
public void readFrom(StreamInput in) throws IOException {
278-
super.readFrom(in);
279-
type = in.readString();
280-
id = in.readString();
281-
routing = in.readOptionalString();
282-
if (in.getVersion().before(Version.V_7_0_0)) {
283-
in.readOptionalString(); // _parent
284-
}
285-
version = in.readLong();
286-
versionType = VersionType.fromValue(in.readByte());
287-
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
288-
ifSeqNo = in.readZLong();
289-
ifPrimaryTerm = in.readVLong();
290-
} else {
291-
ifSeqNo = UNASSIGNED_SEQ_NO;
292-
ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
293-
}
302+
public void readFrom(StreamInput in) {
303+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
294304
}
295305

296306
@Override
@@ -321,14 +331,4 @@ public void writeTo(StreamOutput out) throws IOException {
321331
public String toString() {
322332
return "delete {[" + index + "][" + type() + "][" + id + "]}";
323333
}
324-
325-
/**
326-
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
327-
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
328-
* use because the DeleteRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set.
329-
*/
330-
@Override
331-
public DeleteRequest setShardId(ShardId shardId) {
332-
throw new UnsupportedOperationException("shard id should never be set on DeleteRequest");
333-
}
334334
}

0 commit comments

Comments
 (0)