@@ -415,16 +415,16 @@ void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps
415415 phase1FileNames .size (), new ByteSizeValue (totalSizeInBytes ),
416416 phase1ExistingFileNames .size (), new ByteSizeValue (existingTotalSizeInBytes ));
417417 final StepListener <Void > sendFileInfoStep = new StepListener <>();
418- final StepListener <Void > sendFileChunkStep = new StepListener <>();
418+ final StepListener <Void > sendFilesStep = new StepListener <>();
419419 final StepListener <Void > cleanFilesStep = new StepListener <>();
420420 cancellableThreads .execute (() ->
421421 recoveryTarget .receiveFileInfo (phase1FileNames , phase1FileSizes , phase1ExistingFileNames ,
422422 phase1ExistingFileSizes , translogOps .getAsInt (), sendFileInfoStep ));
423423
424424 sendFileInfoStep .whenComplete (r ->
425- sendFiles (store , phase1Files .toArray (new StoreFileMetaData [0 ]), translogOps , sendFileChunkStep ), listener ::onFailure );
425+ sendFiles (store , phase1Files .toArray (new StoreFileMetaData [0 ]), translogOps , sendFilesStep ), listener ::onFailure );
426426
427- sendFileChunkStep .whenComplete (r ->
427+ sendFilesStep .whenComplete (r ->
428428 cleanFiles (store , recoverySourceMetadata , translogOps , globalCheckpoint , cleanFilesStep ), listener ::onFailure );
429429
430430 final long totalSize = totalSizeInBytes ;
@@ -726,10 +726,20 @@ protected void doRun() throws Exception {
726726 assert Transports .assertNotTransportThread (RecoverySourceHandler .this + "[send file chunk]" );
727727 while (true ) {
728728 assert semaphore .availablePermits () == 0 ;
729+ cancellableThreads .checkForCancel ();
729730 if (error .get () != null ) {
730731 handleErrorOnSendFiles (store , error .get ().v2 (), new StoreFileMetaData []{error .get ().v1 ()});
731732 throw error .get ().v2 ();
732733 }
734+ if (canSendMore () == false ) {
735+ semaphore .release ();
736+ // Here we have to retry before abort to avoid a race situation where the other threads have flipped `canSendMore`
737+ // condition but they are not going to resume the sending process because this thread still holds the semaphore.
738+ final boolean changed = canSendMore () || error .get () != null ;
739+ if (changed == false || semaphore .tryAcquire () == false ) {
740+ break ;
741+ }
742+ }
733743 final FileChunk chunk = readNextChunk ();
734744 if (chunk == null ) {
735745 semaphore .release (); // allow other threads respond if we are not done yet.
@@ -757,15 +767,6 @@ protected void doRun() throws Exception {
757767 })
758768 )
759769 );
760- if (canSendMore () == false ) {
761- semaphore .release ();
762- // Here we have to retry before abort to avoid a race situation where the other threads have flipped `canSendMore`
763- // condition but they are not going to resume the sending process because this thread still holds the semaphore.
764- final boolean changed = canSendMore () || error .get () != null ;
765- if (changed == false || semaphore .tryAcquire () == false ) {
766- break ;
767- }
768- }
769770 }
770771 }
771772
0 commit comments