Skip to content

Commit 35e24ba

Browse files
committed
[CCR] Change FollowIndexAction.Request class to be more user friendly (#33810)
Instead of having one constructor that accepts all arguments, all parameters should be provided via setters. Only leader and follower index are required arguments. This makes using this class in tests and transport client easier.
1 parent 4480718 commit 35e24ba

File tree

10 files changed

+267
-190
lines changed

10 files changed

+267
-190
lines changed

x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,13 +199,13 @@ private static void refresh(String index) throws IOException {
199199

200200
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
201201
final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow");
202-
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
202+
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
203203
assertOK(client().performRequest(request));
204204
}
205205

206206
private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
207207
final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow");
208-
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
208+
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
209209
assertOK(client().performRequest(request));
210210
}
211211

x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,13 @@ private static void refresh(String index) throws IOException {
141141

142142
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
143143
final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow");
144-
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
144+
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
145145
assertOK(client().performRequest(request));
146146
}
147147

148148
private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
149149
final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow");
150-
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
150+
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
151151
assertOK(client().performRequest(request));
152152
}
153153

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -297,12 +297,16 @@ private void followLeaderIndex(String clusterAlias, Index indexToFollow,
297297

298298
String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName :
299299
clusterAlias + ":" + leaderIndexName;
300-
FollowIndexAction.Request request =
301-
new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName,
302-
pattern.getMaxBatchOperationCount(), pattern.getMaxConcurrentReadBatches(),
303-
pattern.getMaxOperationSizeInBytes(), pattern.getMaxConcurrentWriteBatches(),
304-
pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(),
305-
pattern.getIdleShardRetryDelay());
300+
FollowIndexAction.Request request = new FollowIndexAction.Request();
301+
request.setLeaderIndex(leaderIndexNameWithClusterAliasPrefix);
302+
request.setFollowerIndex(followIndexName);
303+
request.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount());
304+
request.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches());
305+
request.setMaxOperationSizeInBytes(pattern.getMaxOperationSizeInBytes());
306+
request.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches());
307+
request.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
308+
request.setMaxRetryDelay(pattern.getMaxRetryDelay());
309+
request.setPollTimeout(pattern.getIdleShardRetryDelay());
306310

307311
// Execute if the create and follow api call succeeds:
308312
Runnable successHandler = () -> {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.elasticsearch.indices.IndicesService;
3535
import org.elasticsearch.threadpool.ThreadPool;
3636
import org.elasticsearch.transport.TransportService;
37-
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
3837

3938
import java.io.IOException;
4039
import java.util.ArrayList;
@@ -71,8 +70,8 @@ public static class Request extends SingleShardRequest<Request> {
7170
private int maxOperationCount;
7271
private ShardId shardId;
7372
private String expectedHistoryUUID;
74-
private TimeValue pollTimeout = FollowIndexAction.DEFAULT_POLL_TIMEOUT;
75-
private long maxOperationSizeInBytes = FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
73+
private TimeValue pollTimeout = TransportFollowIndexAction.DEFAULT_POLL_TIMEOUT;
74+
private long maxOperationSizeInBytes = TransportFollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
7675

7776
public Request(ShardId shardId, String expectedHistoryUUID) {
7877
super(shardId.getIndexName());

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java

Lines changed: 74 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.common.inject.Inject;
2121
import org.elasticsearch.common.settings.Setting;
2222
import org.elasticsearch.common.settings.Settings;
23+
import org.elasticsearch.common.unit.TimeValue;
2324
import org.elasticsearch.index.IndexNotFoundException;
2425
import org.elasticsearch.index.IndexSettings;
2526
import org.elasticsearch.index.IndexingSlowLog;
@@ -55,6 +56,14 @@
5556

5657
public class TransportFollowIndexAction extends HandledTransportAction<FollowIndexAction.Request, AcknowledgedResponse> {
5758

59+
static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE;
60+
private static final TimeValue DEFAULT_MAX_RETRY_DELAY = new TimeValue(500);
61+
private static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 1;
62+
private static final int DEFAULT_MAX_WRITE_BUFFER_SIZE = 10240;
63+
private static final int DEFAULT_MAX_BATCH_OPERATION_COUNT = 1024;
64+
private static final int DEFAULT_MAX_CONCURRENT_READ_BATCHES = 1;
65+
static final TimeValue DEFAULT_POLL_TIMEOUT = TimeValue.timeValueMinutes(1);
66+
5867
private final Client client;
5968
private final ThreadPool threadPool;
6069
private final ClusterService clusterService;
@@ -185,19 +194,8 @@ void start(
185194
String[] recordedLeaderShardHistoryUUIDs = extractIndexShardHistoryUUIDs(ccrIndexMetadata);
186195
String recordedLeaderShardHistoryUUID = recordedLeaderShardHistoryUUIDs[shardId];
187196

188-
ShardFollowTask shardFollowTask = new ShardFollowTask(
189-
clusterNameAlias,
190-
new ShardId(followIndexMetadata.getIndex(), shardId),
191-
new ShardId(leaderIndexMetadata.getIndex(), shardId),
192-
request.getMaxBatchOperationCount(),
193-
request.getMaxConcurrentReadBatches(),
194-
request.getMaxOperationSizeInBytes(),
195-
request.getMaxConcurrentWriteBatches(),
196-
request.getMaxWriteBufferSize(),
197-
request.getMaxRetryDelay(),
198-
request.getPollTimeout(),
199-
recordedLeaderShardHistoryUUID,
200-
filteredHeaders);
197+
final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request,
198+
leaderIndexMetadata, followIndexMetadata, recordedLeaderShardHistoryUUID, filteredHeaders);
201199
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
202200
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
203201
@Override
@@ -305,6 +303,69 @@ static void validate(
305303
followerMapperService.merge(leaderIndex, MapperService.MergeReason.MAPPING_RECOVERY, true);
306304
}
307305

306+
private static ShardFollowTask createShardFollowTask(
307+
int shardId,
308+
String clusterAliasName,
309+
FollowIndexAction.Request request,
310+
IndexMetaData leaderIndexMetadata,
311+
IndexMetaData followIndexMetadata,
312+
String recordedLeaderShardHistoryUUID,
313+
Map<String, String> filteredHeaders
314+
) {
315+
int maxBatchOperationCount;
316+
if (request.getMaxBatchOperationCount() != null) {
317+
maxBatchOperationCount = request.getMaxBatchOperationCount();
318+
} else {
319+
maxBatchOperationCount = DEFAULT_MAX_BATCH_OPERATION_COUNT;
320+
}
321+
322+
int maxConcurrentReadBatches;
323+
if (request.getMaxConcurrentReadBatches() != null){
324+
maxConcurrentReadBatches = request.getMaxConcurrentReadBatches();
325+
} else {
326+
maxConcurrentReadBatches = DEFAULT_MAX_CONCURRENT_READ_BATCHES;
327+
}
328+
329+
long maxOperationSizeInBytes;
330+
if (request.getMaxOperationSizeInBytes() != null) {
331+
maxOperationSizeInBytes = request.getMaxOperationSizeInBytes();
332+
} else {
333+
maxOperationSizeInBytes = DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
334+
}
335+
336+
int maxConcurrentWriteBatches;
337+
if (request.getMaxConcurrentWriteBatches() != null) {
338+
maxConcurrentWriteBatches = request.getMaxConcurrentWriteBatches();
339+
} else {
340+
maxConcurrentWriteBatches = DEFAULT_MAX_CONCURRENT_WRITE_BATCHES;
341+
}
342+
343+
int maxWriteBufferSize;
344+
if (request.getMaxWriteBufferSize() != null) {
345+
maxWriteBufferSize = request.getMaxWriteBufferSize();
346+
} else {
347+
maxWriteBufferSize = DEFAULT_MAX_WRITE_BUFFER_SIZE;
348+
}
349+
350+
TimeValue maxRetryDelay = request.getMaxRetryDelay() == null ? DEFAULT_MAX_RETRY_DELAY : request.getMaxRetryDelay();
351+
TimeValue pollTimeout = request.getPollTimeout() == null ? DEFAULT_POLL_TIMEOUT : request.getPollTimeout();
352+
353+
return new ShardFollowTask(
354+
clusterAliasName,
355+
new ShardId(followIndexMetadata.getIndex(), shardId),
356+
new ShardId(leaderIndexMetadata.getIndex(), shardId),
357+
maxBatchOperationCount,
358+
maxConcurrentReadBatches,
359+
maxOperationSizeInBytes,
360+
maxConcurrentWriteBatches,
361+
maxWriteBufferSize,
362+
maxRetryDelay,
363+
pollTimeout,
364+
recordedLeaderShardHistoryUUID,
365+
filteredHeaders
366+
);
367+
}
368+
308369
private static String[] extractIndexShardHistoryUUIDs(Map<String, String> ccrIndexMetaData) {
309370
String historyUUIDs = ccrIndexMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS);
310371
return historyUUIDs.split(",");

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -192,16 +192,12 @@ private void assertNonCompliantLicense(final Exception e) {
192192
}
193193

194194
private FollowIndexAction.Request getFollowRequest() {
195-
return new FollowIndexAction.Request(
196-
"leader",
197-
"follower",
198-
FollowIndexAction.DEFAULT_MAX_BATCH_OPERATION_COUNT,
199-
FollowIndexAction.DEFAULT_MAX_CONCURRENT_READ_BATCHES,
200-
FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES,
201-
FollowIndexAction.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES,
202-
FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE,
203-
TimeValue.timeValueMillis(10),
204-
TimeValue.timeValueMillis(10));
195+
FollowIndexAction.Request request = new FollowIndexAction.Request();
196+
request.setLeaderIndex("leader");
197+
request.setFollowerIndex("follower");
198+
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
199+
request.setPollTimeout(TimeValue.timeValueMillis(10));
200+
return request;
205201
}
206202

207203
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -318,9 +318,11 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
318318
long numDocsIndexed = Math.min(3000 * 2, randomLongBetween(maxReadSize, maxReadSize * 10));
319319
atLeastDocsIndexed("index1", numDocsIndexed / 3);
320320

321-
final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", maxReadSize,
322-
randomIntBetween(2, 10), Long.MAX_VALUE, randomIntBetween(2, 10),
323-
randomIntBetween(1024, 10240), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
321+
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
322+
followRequest.setMaxBatchOperationCount(maxReadSize);
323+
followRequest.setMaxConcurrentReadBatches(randomIntBetween(2, 10));
324+
followRequest.setMaxConcurrentWriteBatches(randomIntBetween(2, 10));
325+
followRequest.setMaxWriteBufferSize(randomIntBetween(1024, 10240));
324326
CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
325327
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
326328

@@ -357,9 +359,10 @@ public void testFollowIndexAndCloseNode() throws Exception {
357359
});
358360
thread.start();
359361

360-
final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", randomIntBetween(32, 2048),
361-
randomIntBetween(2, 10), Long.MAX_VALUE, randomIntBetween(2, 10),
362-
FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
362+
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
363+
followRequest.setMaxBatchOperationCount(randomIntBetween(32, 2048));
364+
followRequest.setMaxConcurrentReadBatches(randomIntBetween(2, 10));
365+
followRequest.setMaxConcurrentWriteBatches(randomIntBetween(2, 10));
363366
client().execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest)).get();
364367

365368
long maxNumDocsReplicated = Math.min(1000, randomLongBetween(followRequest.getMaxBatchOperationCount(),
@@ -446,7 +449,7 @@ public void testFollowNonExistentIndex() throws Exception {
446449
.actionGet());
447450
}
448451

449-
public void testFollowIndex_lowMaxTranslogBytes() throws Exception {
452+
public void testFollowIndexMaxOperationSizeInBytes() throws Exception {
450453
final String leaderIndexSettings = getIndexSettings(1, between(0, 1),
451454
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
452455
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
@@ -459,8 +462,8 @@ public void testFollowIndex_lowMaxTranslogBytes() throws Exception {
459462
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
460463
}
461464

462-
final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", 1024, 1, 1024L,
463-
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
465+
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
466+
followRequest.setMaxOperationSizeInBytes(1L);
464467
final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
465468
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
466469

@@ -488,25 +491,21 @@ public void testDontFollowTheWrongIndex() throws Exception {
488491
assertAcked(client().admin().indices().prepareCreate("index3").setSource(leaderIndexSettings, XContentType.JSON));
489492
ensureGreen("index3");
490493

491-
FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", 1024, 1, 1024L,
492-
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
494+
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
493495
CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
494496
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
495497

496-
followRequest = new FollowIndexAction.Request("index3", "index4", 1024, 1, 1024L,
497-
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
498+
followRequest = createFollowRequest("index3", "index4");
498499
createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
499500
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
500501
unfollowIndex("index2", "index4");
501502

502-
FollowIndexAction.Request wrongRequest1 = new FollowIndexAction.Request("index1", "index4", 1024, 1, 1024L,
503-
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
503+
FollowIndexAction.Request wrongRequest1 = createFollowRequest("index1", "index4");
504504
Exception e = expectThrows(IllegalArgumentException.class,
505505
() -> client().execute(FollowIndexAction.INSTANCE, wrongRequest1).actionGet());
506506
assertThat(e.getMessage(), containsString("follow index [index4] should reference"));
507507

508-
FollowIndexAction.Request wrongRequest2 = new FollowIndexAction.Request("index3", "index2", 1024, 1, 1024L,
509-
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
508+
FollowIndexAction.Request wrongRequest2 = createFollowRequest("index3", "index2");
510509
e = expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, wrongRequest2).actionGet());
511510
assertThat(e.getMessage(), containsString("follow index [index2] should reference"));
512511
}
@@ -715,10 +714,12 @@ private void assertSameDocCount(String index1, String index2) throws Exception {
715714
}, 60, TimeUnit.SECONDS);
716715
}
717716

718-
public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followIndex) {
719-
return new FollowIndexAction.Request(leaderIndex, followIndex, FollowIndexAction.DEFAULT_MAX_BATCH_OPERATION_COUNT,
720-
FollowIndexAction.DEFAULT_MAX_CONCURRENT_READ_BATCHES, FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES,
721-
FollowIndexAction.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES, FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE,
722-
TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10));
717+
public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followerIndex) {
718+
FollowIndexAction.Request request = new FollowIndexAction.Request();
719+
request.setLeaderIndex(leaderIndex);
720+
request.setFollowerIndex(followerIndex);
721+
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
722+
request.setPollTimeout(TimeValue.timeValueMillis(10));
723+
return request;
723724
}
724725
}

0 commit comments

Comments
 (0)