1414import org .elasticsearch .test .ESTestCase ;
1515import org .elasticsearch .threadpool .ThreadPool ;
1616import org .elasticsearch .xpack .ccr .Ccr ;
17- import org .elasticsearch .xpack .ccr .action .ShardFollowTasksExecutor .ChunkProcessor ;
17+ import org .elasticsearch .xpack .ccr .action .ShardFollowTasksExecutor .ChunkWorker ;
1818import org .elasticsearch .xpack .ccr .action .ShardFollowTasksExecutor .ChunksCoordinator ;
1919import org .elasticsearch .xpack .ccr .action .ShardFollowTasksExecutor .IndexMetadataVersionChecker ;
2020import org .elasticsearch .xpack .ccr .action .bulk .BulkShardOperationsAction ;
3232import java .util .concurrent .atomic .AtomicBoolean ;
3333import java .util .concurrent .atomic .AtomicReference ;
3434import java .util .function .BiConsumer ;
35+ import java .util .function .BiConsumer ;
3536import java .util .function .Consumer ;
3637import java .util .function .LongConsumer ;
3738
@@ -122,7 +123,8 @@ public void testCoordinator() throws Exception {
122123 IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker (leaderShardId .getIndex (),
123124 followShardId .getIndex (), client , client );
124125 ChunksCoordinator coordinator = new ChunksCoordinator (client , client , threadPool , checker , batchSize ,
125- concurrentProcessors , Long .MAX_VALUE , leaderShardId , followShardId , handler , () -> true , value -> {});
126+ concurrentProcessors , Long .MAX_VALUE , leaderShardId , followShardId , handler ,
127+ () -> true , value -> {});
126128
127129 int numberOfOps = randomIntBetween (batchSize , batchSize * 20 );
128130 long from = randomInt (1000 );
@@ -163,8 +165,9 @@ public void testCoordinator_failure() throws Exception {
163165 assertThat (e , sameInstance (expectedException ));
164166 };
165167 IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker (leaderShardId .getIndex (),
166- followShardId .getIndex (), client , client );ChunksCoordinator coordinator =
167- new ChunksCoordinator (client , client , threadPool , checker ,10 , 1 , Long .MAX_VALUE , leaderShardId , followShardId , handler , () -> true , value -> {});
168+ followShardId .getIndex (), client , client );
169+ ChunksCoordinator coordinator = new ChunksCoordinator (client , client , threadPool , checker ,10 , 1 , Long .MAX_VALUE ,
170+ leaderShardId , followShardId , handler , () -> true , value -> {});
168171 coordinator .start (0 , 20 );
169172
170173 assertThat (coordinator .getChunks ().size (), equalTo (1 ));
@@ -209,7 +212,7 @@ public void testCoordinator_concurrent() throws Exception {
209212 assertThat (calledOnceChecker .get (), is (false ));
210213 }
211214
212- public void testChunkProcessor () {
215+ public void testChunkWorker () {
213216 Client client = createClientMock ();
214217 Queue <long []> chunks = new LinkedList <>();
215218 mockShardChangesApiCall (client );
@@ -223,14 +226,14 @@ public void testChunkProcessor() {
223226 boolean [] invoked = new boolean [1 ];
224227 Exception [] exception = new Exception [1 ];
225228 Consumer <Exception > handler = e -> {invoked [0 ] = true ;exception [0 ] = e ;};
226- ChunkProcessor chunkProcessor = new ChunkProcessor (client , client , chunks , ccrExecutor , checker , leaderShardId ,
229+ ChunkWorker chunkWorker = new ChunkWorker (client , client , chunks , ccrExecutor , checker , leaderShardId ,
227230 followShardId , handler );
228- chunkProcessor .start (0 , 10 , Long .MAX_VALUE );
231+ chunkWorker .start (0 , 10 , Long .MAX_VALUE );
229232 assertThat (invoked [0 ], is (true ));
230233 assertThat (exception [0 ], nullValue ());
231234 }
232235
233- public void testChunkProcessorRetry () {
236+ public void testChunkWorkerRetry () {
234237 Client client = createClientMock ();
235238 Queue <long []> chunks = new LinkedList <>();
236239 mockBulkShardOperationsApiCall (client );
@@ -246,15 +249,15 @@ public void testChunkProcessorRetry() {
246249 boolean [] invoked = new boolean [1 ];
247250 Exception [] exception = new Exception [1 ];
248251 Consumer <Exception > handler = e -> {invoked [0 ] = true ;exception [0 ] = e ;};
249- ChunkProcessor chunkProcessor = new ChunkProcessor (client , client , chunks , ccrExecutor , checker , leaderShardId ,
252+ ChunkWorker chunkWorker = new ChunkWorker (client , client , chunks , ccrExecutor , checker , leaderShardId ,
250253 followShardId , handler );
251- chunkProcessor .start (0 , 10 , Long .MAX_VALUE );
254+ chunkWorker .start (0 , 10 , Long .MAX_VALUE );
252255 assertThat (invoked [0 ], is (true ));
253256 assertThat (exception [0 ], nullValue ());
254- assertThat (chunkProcessor .retryCounter .get (), equalTo (testRetryLimit + 1 ));
257+ assertThat (chunkWorker .retryCounter .get (), equalTo (testRetryLimit + 1 ));
255258 }
256259
257- public void testChunkProcessorRetryTooManyTimes () {
260+ public void testChunkWorkerRetryTooManyTimes () {
258261 Client client = createClientMock ();
259262 Queue <long []> chunks = new LinkedList <>();
260263 mockBulkShardOperationsApiCall (client );
@@ -270,17 +273,17 @@ public void testChunkProcessorRetryTooManyTimes() {
270273 boolean [] invoked = new boolean [1 ];
271274 Exception [] exception = new Exception [1 ];
272275 Consumer <Exception > handler = e -> {invoked [0 ] = true ;exception [0 ] = e ;};
273- ChunkProcessor chunkProcessor = new ChunkProcessor (client , client , chunks , ccrExecutor , checker , leaderShardId ,
276+ ChunkWorker chunkWorker = new ChunkWorker (client , client , chunks , ccrExecutor , checker , leaderShardId ,
274277 followShardId , handler );
275- chunkProcessor .start (0 , 10 , Long .MAX_VALUE );
278+ chunkWorker .start (0 , 10 , Long .MAX_VALUE );
276279 assertThat (invoked [0 ], is (true ));
277280 assertThat (exception [0 ], notNullValue ());
278281 assertThat (exception [0 ].getMessage (), equalTo ("retrying failed [17] times, aborting..." ));
279282 assertThat (exception [0 ].getCause ().getMessage (), equalTo ("connection exception" ));
280- assertThat (chunkProcessor .retryCounter .get (), equalTo (testRetryLimit ));
283+ assertThat (chunkWorker .retryCounter .get (), equalTo (testRetryLimit ));
281284 }
282285
283- public void testChunkProcessorNoneRetryableError () {
286+ public void testChunkWorkerNoneRetryableError () {
284287 Client client = createClientMock ();
285288 Queue <long []> chunks = new LinkedList <>();
286289 mockBulkShardOperationsApiCall (client );
@@ -295,16 +298,16 @@ public void testChunkProcessorNoneRetryableError() {
295298 boolean [] invoked = new boolean [1 ];
296299 Exception [] exception = new Exception [1 ];
297300 Consumer <Exception > handler = e -> {invoked [0 ] = true ;exception [0 ] = e ;};
298- ChunkProcessor chunkProcessor = new ChunkProcessor (client , client , chunks , ccrExecutor , checker , leaderShardId ,
301+ ChunkWorker chunkWorker = new ChunkWorker (client , client , chunks , ccrExecutor , checker , leaderShardId ,
299302 followShardId , handler );
300- chunkProcessor .start (0 , 10 , Long .MAX_VALUE );
303+ chunkWorker .start (0 , 10 , Long .MAX_VALUE );
301304 assertThat (invoked [0 ], is (true ));
302305 assertThat (exception [0 ], notNullValue ());
303306 assertThat (exception [0 ].getMessage (), equalTo ("unexpected" ));
304- assertThat (chunkProcessor .retryCounter .get (), equalTo (0 ));
307+ assertThat (chunkWorker .retryCounter .get (), equalTo (0 ));
305308 }
306309
307- public void testChunkProcessorExceedMaxTranslogsBytes () {
310+ public void testChunkWorkerExceedMaxTranslogsBytes () {
308311 long from = 0 ;
309312 long to = 20 ;
310313 long actualTo = 10 ;
@@ -335,9 +338,9 @@ public void testChunkProcessorExceedMaxTranslogsBytes() {
335338 Exception [] exception = new Exception [1 ];
336339 Consumer <Exception > handler = e -> {invoked [0 ] = true ;exception [0 ] = e ;};
337340 BiConsumer <Long , Consumer <Exception >> versionChecker = (indexVersiuon , consumer ) -> consumer .accept (null );
338- ChunkProcessor chunkProcessor =
339- new ChunkProcessor ( client , client , chunks , ccrExecutor , versionChecker , leaderShardId , followShardId , handler );
340- chunkProcessor .start (from , to , Long .MAX_VALUE );
341+ ChunkWorker chunkWorker = new ChunkWorker ( client , client , chunks , ccrExecutor , versionChecker , leaderShardId ,
342+ followShardId , handler );
343+ chunkWorker .start (from , to , Long .MAX_VALUE );
341344 assertThat (invoked [0 ], is (true ));
342345 assertThat (exception [0 ], nullValue ());
343346 assertThat (chunks .size (), equalTo (1 ));
0 commit comments