 import org.elasticsearch.common.transport.NetworkExceptionHelper;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
-import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.LongConsumer;
@@ -69,7 +67,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
     static final int PROCESSOR_RETRY_LIMIT = 16;
     static final int DEFAULT_CONCURRENT_PROCESSORS = 1;
     static final long DEFAULT_MAX_TRANSLOG_BYTES = Long.MAX_VALUE;
-    private static final TimeValue RETRY_TIMEOUT = TimeValue.timeValueMillis(500);
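+    // Interval at which the ChunksCoordinator polls the leader shard for an advanced global checkpoint.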
+    private static final TimeValue CHECK_LEADER_GLOBAL_CHECKPOINT_INTERVAL = TimeValue.timeValueSeconds(3);
 
     private final Client client;
     private final ThreadPool threadPool;
@@ -130,58 +128,20 @@ protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask param
     void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params,
                  long followGlobalCheckPoint,
                  IndexMetadataVersionChecker imdVersionChecker) {
-        if (task.isRunning() == false) {
-            // TODO: need better cancellation control
-            return;
-        }
 
         final ShardId leaderShard = params.getLeaderShardId();
         final ShardId followerShard = params.getFollowShardId();
         fetchGlobalCheckpoint(leaderClient, leaderShard, leaderGlobalCheckPoint -> {
-            // TODO: check if both indices have the same history uuid
-            if (leaderGlobalCheckPoint == followGlobalCheckPoint) {
-                logger.debug("{} no write operations to fetch", followerShard);
-                retry(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker);
-            } else {
-                assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint +
-                    "] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
-                logger.debug("{} fetching write operations, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", followerShard,
-                    leaderGlobalCheckPoint, followGlobalCheckPoint);
-                Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME);
-                Consumer<Exception> handler = e -> {
-                    if (e == null) {
-                        task.updateProcessedGlobalCheckpoint(leaderGlobalCheckPoint);
-                        prepare(leaderClient, followerClient, task, params, leaderGlobalCheckPoint, imdVersionChecker);
-                    } else {
-                        task.markAsFailed(e);
-                    }
-                };
-                ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, ccrExecutor, imdVersionChecker,
-                    params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard,
-                    followerShard, handler);
-                coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint);
-                coordinator.start();
-            }
+            logger.debug("{} fetching write operations, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", followerShard,
+                leaderGlobalCheckPoint, followGlobalCheckPoint);
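+            // The coordinator polls the leader and retries on its own; the task supplies its failure,
+            // running-state, and checkpoint-update callbacks via method references.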
+            ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, threadPool, imdVersionChecker, params.getMaxChunkSize(),
+                params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, followerShard, task::markAsFailed,
+                task::isRunning, task::updateProcessedGlobalCheckpoint);
+            coordinator.start(followGlobalCheckPoint, leaderGlobalCheckPoint);
         }, task::markAsFailed);
     }
 
-    private void retry(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params,
-                       long followGlobalCheckPoint,
-                       IndexMetadataVersionChecker imdVersionChecker) {
-        threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
-            @Override
-            public void onFailure(Exception e) {
-                task.markAsFailed(e);
-            }
-
-            @Override
-            protected void doRun() throws Exception {
-                prepare(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker);
-            }
-        });
-    }
-
-    private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {
+    private static void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {
         client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> {
             IndexStats indexStats = r.getIndex(shardId.getIndexName());
             Optional<ShardStats> filteredShardStats = Arrays.stream(indexStats.getShards())
@@ -201,37 +161,53 @@ private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer
     static class ChunksCoordinator {
 
         private static final Logger LOGGER = Loggers.getLogger(ChunksCoordinator.class);
-
+
         private final Client followerClient;
         private final Client leaderClient;
-        private final Executor ccrExecutor;
+        private final ThreadPool threadPool;
         private final IndexMetadataVersionChecker imdVersionChecker;
 
         private final long batchSize;
-        private final int concurrentProcessors;
+        private final int maxConcurrentWorker;
         private final long processorMaxTranslogBytes;
         private final ShardId leaderShard;
         private final ShardId followerShard;
-        private final Consumer<Exception> handler;
-
-        private final CountDown countDown;
+        private final Consumer<Exception> failureHandler;
+        private final Supplier<Boolean> stateSupplier;
+        private final LongConsumer processedGlobalCheckpointUpdater;
+
+        private final AtomicInteger activeWorkers;
+        private final AtomicLong lastPolledGlobalCheckpoint;
+        private final AtomicLong lastProcessedGlobalCheckPoint;
         private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
-        private final AtomicReference<Exception> failureHolder = new AtomicReference<>();
 
-        ChunksCoordinator(Client followerClient, Client leaderClient, Executor ccrExecutor, IndexMetadataVersionChecker imdVersionChecker,
-                          long batchSize, int concurrentProcessors, long processorMaxTranslogBytes, ShardId leaderShard,
-                          ShardId followerShard, Consumer<Exception> handler) {
+        ChunksCoordinator(Client followerClient,
+                          Client leaderClient,
+                          ThreadPool threadPool,
+                          IndexMetadataVersionChecker imdVersionChecker,
+                          long batchSize,
+                          int maxConcurrentWorker,
+                          long processorMaxTranslogBytes,
+                          ShardId leaderShard,
+                          ShardId followerShard,
+                          Consumer<Exception> failureHandler,
+                          Supplier<Boolean> runningSuppler,
+                          LongConsumer processedGlobalCheckpointUpdater) {
            this.followerClient = followerClient;
             this.leaderClient = leaderClient;
-            this.ccrExecutor = ccrExecutor;
+            this.threadPool = threadPool;
             this.imdVersionChecker = imdVersionChecker;
             this.batchSize = batchSize;
-            this.concurrentProcessors = concurrentProcessors;
+            this.maxConcurrentWorker = maxConcurrentWorker;
             this.processorMaxTranslogBytes = processorMaxTranslogBytes;
             this.leaderShard = leaderShard;
             this.followerShard = followerShard;
-            this.handler = handler;
-            this.countDown = new CountDown(concurrentProcessors);
+            this.failureHandler = failureHandler;
+            this.stateSupplier = runningSuppler;
+            this.processedGlobalCheckpointUpdater = processedGlobalCheckpointUpdater;
+            this.activeWorkers = new AtomicInteger();
+            this.lastPolledGlobalCheckpoint = new AtomicLong();
+            this.lastProcessedGlobalCheckPoint = new AtomicLong();
         }
 
         void createChucks(long from, long to) {
@@ -242,56 +218,105 @@ void createChucks(long from, long to) {
             }
         }
 
-        void start() {
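+        // Periodically polls the leader's global checkpoint; when it has advanced past the last polled
+        // checkpoint, new chunks are queued, workers are (re)started, and another poll is scheduled.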
+        void update() {
+            schedule(() -> {
+                if (stateSupplier.get() == false) {
+                    return;
+                }
+
+                fetchGlobalCheckpoint(leaderClient, leaderShard, leaderGlobalCheckPoint -> {
+                    long followerGlobalCheckpoint = lastPolledGlobalCheckpoint.get();
+                    if (leaderGlobalCheckPoint != followerGlobalCheckpoint) {
+                        assert followerGlobalCheckpoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followerGlobalCheckpoint +
+                            "] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
+                        createChucks(lastPolledGlobalCheckpoint.get(), leaderGlobalCheckPoint);
+                        initiateChunkWorkers();
+                    } else {
+                        LOGGER.debug("{} no write operations to fetch", followerShard);
+                    }
+                    update();
+                }, failureHandler);
+            });
+        }
+
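+        // Seeds the chunk queue from the current follower and leader checkpoints, starts the
+        // chunk workers, and then begins the periodic update() polling loop.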
+        void start(long followerGlobalCheckpoint, long leaderGlobalCheckPoint) {
+            createChucks(followerGlobalCheckpoint, leaderGlobalCheckPoint);
+            lastPolledGlobalCheckpoint.set(leaderGlobalCheckPoint);
             LOGGER.debug("{} Start coordination of [{}] chunks with [{}] concurrent processors",
-                leaderShard, chunks.size(), concurrentProcessors);
-            for (int i = 0; i < concurrentProcessors; i++) {
-                ccrExecutor.execute(new AbstractRunnable() {
+                leaderShard, chunks.size(), maxConcurrentWorker);
+            initiateChunkWorkers();
+            update();
+        }
+
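+        // Starts as many additional chunk workers as needed to reach maxConcurrentWorker;
+        // each worker pulls chunks off the queue until it is empty.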
+        void initiateChunkWorkers() {
+            int workersToStart = maxConcurrentWorker - activeWorkers.get();
+            if (workersToStart == 0) {
+                LOGGER.debug("{} No new chunk workers were started", followerShard);
+                return;
+            }
+
+            LOGGER.debug("{} Starting [{}] new chunk workers", followerShard, workersToStart);
+            for (int i = 0; i < workersToStart; i++) {
+                threadPool.executor(Ccr.CCR_THREAD_POOL_NAME).execute(new AbstractRunnable() {
                     @Override
                     public void onFailure(Exception e) {
                         assert e != null;
-                        LOGGER.error(() -> new ParameterizedMessage("{} Failure starting processor", leaderShard), e);
-                        postProcessChuck(e);
+                        LOGGER.error(() -> new ParameterizedMessage("{} Failure starting processor", followerShard), e);
+                        failureHandler.accept(e);
                     }
-
+
                     @Override
                     protected void doRun() throws Exception {
                         processNextChunk();
                     }
                 });
+                activeWorkers.incrementAndGet();
             }
         }
 
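+        // Polls the next chunk off the queue and hands it to a ChunkProcessor; the worker exits
+        // when the queue is drained or the task is no longer running.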
         void processNextChunk() {
+            if (stateSupplier.get() == false) {
+                return;
+            }
+
             long[] chunk = chunks.poll();
             if (chunk == null) {
-                postProcessChuck(null);
+                int activeWorkers = this.activeWorkers.decrementAndGet();
+                LOGGER.debug("{} No more chunks to process, active workers [{}]", leaderShard, activeWorkers);
                 return;
             }
             LOGGER.debug("{} Processing chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
             Consumer<Exception> processorHandler = e -> {
                 if (e == null) {
                     LOGGER.debug("{} Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
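+                    // Advance the tracked checkpoint to this chunk's end if it is the highest completed so far,
+                    // and report the new processed global checkpoint back to the task.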
+                    if (lastPolledGlobalCheckpoint.updateAndGet(x -> x < chunk[1] ? chunk[1] : x) == chunk[1]) {
+                        processedGlobalCheckpointUpdater.accept(chunk[1]);
+                    }
                     processNextChunk();
                 } else {
                     LOGGER.error(() -> new ParameterizedMessage("{} Failure processing chunk [{}/{}]",
                         leaderShard, chunk[0], chunk[1]), e);
-                    postProcessChuck(e);
+                    failureHandler.accept(e);
                 }
             };
+            Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME);
             ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, chunks, ccrExecutor, imdVersionChecker,
                 leaderShard, followerShard, processorHandler);
             processor.start(chunk[0], chunk[1], processorMaxTranslogBytes);
         }
 
-        void postProcessChuck(Exception e) {
-            if (failureHolder.compareAndSet(null, e) == false) {
-                Exception firstFailure = failureHolder.get();
-                firstFailure.addSuppressed(e);
-            }
-            if (countDown.countDown()) {
-                handler.accept(failureHolder.get());
-            }
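+        // Schedules the given runnable on the CCR thread pool after the leader checkpoint poll interval,
+        // routing any scheduling failure to the coordinator's failure handler.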
+        void schedule(Runnable runnable) {
+            threadPool.schedule(CHECK_LEADER_GLOBAL_CHECKPOINT_INTERVAL, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
+                @Override
+                public void onFailure(Exception e) {
+                    failureHandler.accept(e);
+                }
+
+                @Override
+                protected void doRun() throws Exception {
+                    runnable.run();
+                }
+            });
         }
 
         Queue<long[]> getChunks() {