Skip to content

Commit ea84b96

Browse files
authored
Remove test-only customisation from TransReplAct (#40863)
The `getIndexShard()` and `sendReplicaRequest()` methods in TransportReplicationAction are effectively only used to customise some behaviour in tests. However there are other ways to do this that do not cause such an obstacle to separating the TransportReplicationAction into its two halves (see #40706). This commit removes these customisation points and injects the test-only behaviour using other techniques.
1 parent 6562c62 commit ea84b96

File tree

5 files changed

+108
-109
lines changed

5 files changed

+108
-109
lines changed

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.elasticsearch.cluster.action.shard.ShardStateAction;
2929
import org.elasticsearch.cluster.block.ClusterBlockLevel;
3030
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
31-
import org.elasticsearch.cluster.node.DiscoveryNode;
3231
import org.elasticsearch.cluster.routing.ShardRouting;
3332
import org.elasticsearch.cluster.service.ClusterService;
3433
import org.elasticsearch.common.inject.Inject;
@@ -73,14 +72,6 @@ protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
7372
return new ResyncActionReplicasProxy(primaryTerm);
7473
}
7574

76-
@Override
77-
protected void sendReplicaRequest(
78-
final ConcreteReplicaRequest<ResyncReplicationRequest> replicaRequest,
79-
final DiscoveryNode node,
80-
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
81-
super.sendReplicaRequest(replicaRequest, node, listener);
82-
}
83-
8475
@Override
8576
protected ClusterBlockLevel globalBlockLevel() {
8677
// resync should never be blocked because it's an internal action

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,7 @@ public void onFailure(Exception e) {
618618
}
619619
}
620620

621-
protected IndexShard getIndexShard(final ShardId shardId) {
621+
private IndexShard getIndexShard(final ShardId shardId) {
622622
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
623623
return indexService.getShard(shardId.id());
624624
}
@@ -1044,7 +1044,12 @@ public void performOn(
10441044
}
10451045
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest = new ConcreteReplicaRequest<>(
10461046
request, replica.allocationId().getId(), primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
1047-
sendReplicaRequest(replicaRequest, node, listener);
1047+
final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(listener, in -> {
1048+
ReplicaResponse replicaResponse = new ReplicaResponse();
1049+
replicaResponse.readFrom(in);
1050+
return replicaResponse;
1051+
});
1052+
transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler);
10481053
}
10491054

10501055
@Override
@@ -1066,25 +1071,6 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, A
10661071
}
10671072
}
10681073

1069-
/**
1070-
* Sends the specified replica request to the specified node.
1071-
*
1072-
* @param replicaRequest the replica request
1073-
* @param node the node to send the request to
1074-
* @param listener callback for handling the response or failure
1075-
*/
1076-
protected void sendReplicaRequest(
1077-
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest,
1078-
final DiscoveryNode node,
1079-
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
1080-
final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(listener, in -> {
1081-
ReplicaResponse replicaResponse = new ReplicaResponse();
1082-
replicaResponse.readFrom(in);
1083-
return replicaResponse;
1084-
});
1085-
transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler);
1086-
}
1087-
10881074
/** a wrapper class to encapsulate a request when being sent to a specific allocation id **/
10891075
public static class ConcreteShardRequest<R extends TransportRequest> extends TransportRequest {
10901076

server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,13 @@
2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.apache.lucene.store.AlreadyClosedException;
2424
import org.elasticsearch.ExceptionsHelper;
25-
import org.elasticsearch.Version;
2625
import org.elasticsearch.action.ActionListener;
2726
import org.elasticsearch.action.support.ActionFilters;
28-
import org.elasticsearch.action.support.replication.ReplicationOperation;
2927
import org.elasticsearch.action.support.replication.ReplicationRequest;
3028
import org.elasticsearch.action.support.replication.ReplicationResponse;
3129
import org.elasticsearch.action.support.replication.TransportReplicationAction;
3230
import org.elasticsearch.cluster.action.shard.ShardStateAction;
3331
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
34-
import org.elasticsearch.cluster.node.DiscoveryNode;
3532
import org.elasticsearch.cluster.service.ClusterService;
3633
import org.elasticsearch.common.inject.Inject;
3734
import org.elasticsearch.common.settings.Settings;
@@ -103,19 +100,6 @@ protected ReplicationResponse newResponseInstance() {
103100
return new ReplicationResponse();
104101
}
105102

106-
@Override
107-
protected void sendReplicaRequest(
108-
final ConcreteReplicaRequest<Request> replicaRequest,
109-
final DiscoveryNode node,
110-
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
111-
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
112-
super.sendReplicaRequest(replicaRequest, node, listener);
113-
} else {
114-
final long pre60NodeCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT;
115-
listener.onResponse(new ReplicaResponse(pre60NodeCheckpoint, pre60NodeCheckpoint));
116-
}
117-
}
118-
119103
@Override
120104
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
121105
final Request request, final IndexShard indexShard) throws Exception {

server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -757,7 +757,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException {
757757
assertEquals(0, shardFailedRequests.length);
758758
}
759759

760-
public void testSeqNoIsSetOnPrimary() throws Exception {
760+
public void testSeqNoIsSetOnPrimary() {
761761
final String index = "test";
762762
final ShardId shardId = new ShardId(index, "_na_", 0);
763763
// we use one replica to check the primary term was set on the operation and sent to the replica
@@ -788,14 +788,14 @@ public void testSeqNoIsSetOnPrimary() throws Exception {
788788
return null;
789789
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject());
790790

791-
TestAction action =
792-
new TestAction(Settings.EMPTY, "internal:testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction,
793-
threadPool) {
794-
@Override
795-
protected IndexShard getIndexShard(ShardId shardId) {
796-
return shard;
797-
}
798-
};
791+
final IndexService indexService = mock(IndexService.class);
792+
when(indexService.getShard(shard.shardId().id())).thenReturn(shard);
793+
794+
final IndicesService indicesService = mock(IndicesService.class);
795+
when(indicesService.indexServiceSafe(shard.shardId().getIndex())).thenReturn(indexService);
796+
797+
TestAction action = new TestAction(Settings.EMPTY, "internal:testSeqNoIsSetOnPrimary", transportService, clusterService,
798+
shardStateAction, threadPool, indicesService);
799799

800800
action.handlePrimaryRequest(concreteShardRequest, createTransportChannel(listener), null);
801801
CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests();
@@ -1207,11 +1207,16 @@ static class TestResponse extends ReplicationResponse {
12071207

12081208
private class TestAction extends TransportReplicationAction<Request, Request, TestResponse> {
12091209

1210-
12111210
TestAction(Settings settings, String actionName, TransportService transportService,
12121211
ClusterService clusterService, ShardStateAction shardStateAction,
12131212
ThreadPool threadPool) {
1214-
super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool,
1213+
this(settings, actionName, transportService, clusterService, shardStateAction, threadPool, mockIndicesService(clusterService));
1214+
}
1215+
1216+
TestAction(Settings settings, String actionName, TransportService transportService,
1217+
ClusterService clusterService, ShardStateAction shardStateAction,
1218+
ThreadPool threadPool, IndicesService indicesService) {
1219+
super(settings, actionName, transportService, clusterService, indicesService, threadPool,
12151220
shardStateAction,
12161221
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
12171222
Request::new, Request::new, ThreadPool.Names.SAME);
@@ -1241,7 +1246,7 @@ protected boolean resolveIndex() {
12411246
}
12421247
}
12431248

1244-
final IndicesService mockIndicesService(ClusterService clusterService) {
1249+
private IndicesService mockIndicesService(ClusterService clusterService) {
12451250
final IndicesService indicesService = mock(IndicesService.class);
12461251
when(indicesService.indexServiceSafe(any(Index.class))).then(invocation -> {
12471252
Index index = (Index)invocation.getArguments()[0];
@@ -1261,7 +1266,7 @@ final IndicesService mockIndicesService(ClusterService clusterService) {
12611266
return indicesService;
12621267
}
12631268

1264-
final IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterService clusterService) {
1269+
private IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterService clusterService) {
12651270
final IndexService indexService = mock(IndexService.class);
12661271
when(indexService.getShard(anyInt())).then(invocation -> {
12671272
int shard = (Integer) invocation.getArguments()[0];

0 commit comments

Comments
 (0)