3131import org .apache .lucene .util .ArrayUtil ;
3232import org .elasticsearch .ExceptionsHelper ;
3333import org .elasticsearch .action .ActionListener ;
34- import org .elasticsearch .action .ActionRunnable ;
3534import org .elasticsearch .action .StepListener ;
3635import org .elasticsearch .cluster .routing .IndexShardRoutingTable ;
3736import org .elasticsearch .cluster .routing .ShardRouting ;
4746import org .elasticsearch .common .unit .ByteSizeValue ;
4847import org .elasticsearch .common .unit .TimeValue ;
4948import org .elasticsearch .common .util .CancellableThreads ;
50- import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
49+ import org .elasticsearch .common .util .concurrent .AsyncIOProcessor ;
5150import org .elasticsearch .common .util .concurrent .FutureUtils ;
5251import org .elasticsearch .core .internal .io .IOUtils ;
5352import org .elasticsearch .index .engine .Engine ;
6867
6968import java .io .Closeable ;
7069import java .io .IOException ;
70+ import java .util .ArrayDeque ;
7171import java .util .ArrayList ;
7272import java .util .Arrays ;
7373import java .util .Collections ;
7979import java .util .Objects ;
8080import java .util .concurrent .CompletableFuture ;
8181import java .util .concurrent .CopyOnWriteArrayList ;
82- import java .util .concurrent .Executor ;
83- import java .util .concurrent .Semaphore ;
82+ import java .util .concurrent .atomic .AtomicBoolean ;
8483import java .util .concurrent .atomic .AtomicInteger ;
85- import java .util .concurrent .atomic .AtomicReference ;
8684import java .util .function .Consumer ;
8785import java .util .function .IntSupplier ;
8886import java .util .stream .StreamSupport ;
@@ -113,12 +111,10 @@ public class RecoverySourceHandler {
113111 private final int maxConcurrentFileChunks ;
114112 private final CancellableThreads cancellableThreads = new CancellableThreads ();
115113 private final List <Closeable > resources = new CopyOnWriteArrayList <>();
116- private final Executor sendFileExecutor ;
117114
118115 public RecoverySourceHandler (IndexShard shard , RecoveryTargetHandler recoveryTarget , StartRecoveryRequest request ,
119- Executor sendFileExecutor , int fileChunkSizeInBytes , int maxConcurrentFileChunks ) {
116+ int fileChunkSizeInBytes , int maxConcurrentFileChunks ) {
120117 this .shard = shard ;
121- this .sendFileExecutor = sendFileExecutor ;
122118 this .recoveryTarget = recoveryTarget ;
123119 this .request = request ;
124120 this .shardId = this .request .shardId ().id ();
@@ -701,77 +697,81 @@ public String toString() {
701697 * one of the networking threads which receive/handle the acknowledgments of the current pending file chunk requests. This process will
702698 * continue until all chunks are sent and acknowledged.
703699 */
704- private class MultiFileSender extends ActionRunnable < Void > implements Closeable {
700+ private class MultiFileSender extends AsyncIOProcessor < FileChunkResponse > implements Closeable {
705701 private final Store store ;
706702 private final IntSupplier translogOps ;
703+ private final AtomicBoolean done = new AtomicBoolean (false );
704+ private final ActionListener <Void > listener ;
707705 private final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker (NO_OPS_PERFORMED , NO_OPS_PERFORMED );
708- private final Semaphore semaphore = new Semaphore (0 );
709706 private final Iterator <StoreFileMetaData > remainingFiles ;
710707 private StoreFileMetaData currentFile ;
711708 private InputStreamIndexInput currentInput = null ;
712709 private long currentChunkPosition = 0 ;
713- private final Deque <byte []> recycledBuffers = ConcurrentCollections . newDeque ();
714- private final AtomicReference < Tuple < StoreFileMetaData , Exception >> error = new AtomicReference <>( );
710+ private final Deque <byte []> recycledBuffers = new ArrayDeque <> ();
711+ private final FileChunkResponse INITIAL_RESPONSE = new FileChunkResponse ( SequenceNumbers . UNASSIGNED_SEQ_NO , null , null );
715712
716713 MultiFileSender (Store store , IntSupplier translogOps , StoreFileMetaData [] files , ActionListener <Void > listener ) {
717- super (ActionListener . notifyOnce ( listener ));
714+ super (logger , maxConcurrentFileChunks * 2 , shard . getThreadPool (). getThreadContext ( ));
718715 this .store = store ;
719716 this .translogOps = translogOps ;
720717 this .remainingFiles = Arrays .asList (files ).iterator ();
718+ this .listener = ActionListener .wrap (
719+ r -> {
720+ if (done .compareAndSet (false , true )) {
721+ listener .onResponse (r );
722+ }
723+ },
724+ e -> {
725+ if (done .compareAndSet (false , true )) {
726+ listener .onFailure (e );
727+ }
728+ });
729+ }
730+
731+ void start () {
732+ put (INITIAL_RESPONSE , e -> {});
721733 }
722734
723735 @ Override
724- protected void doRun () throws Exception {
725- assert ThreadPool .assertCurrentMethodIsNotCalledRecursively ();
726- assert Transports .assertNotTransportThread (RecoverySourceHandler .this + "[send file chunk]" );
727- while (true ) {
728- assert semaphore .availablePermits () == 0 ;
729- cancellableThreads .checkForCancel ();
730- if (canSendMore () == false ) {
731- semaphore .release ();
732- // Here we have to retry before abort to avoid a race situation where the other threads have flipped `canSendMore`
733- // condition but they are not going to resume the sending process because this thread still holds the semaphore.
734- final boolean changed = canSendMore () || error .get () != null ;
735- if (changed == false || semaphore .tryAcquire () == false ) {
736- break ;
736+ protected void write (List <Tuple <FileChunkResponse , Consumer <Exception >>> responses ) {
737+ assert Transports .assertNotTransportThread (RecoverySourceHandler .this + "[send file chunks]" );
738+ if (done .get ()) {
739+ return ;
740+ }
741+ try {
742+ for (Tuple <FileChunkResponse , Consumer <Exception >> response : responses ) {
743+ if (response .v1 () == INITIAL_RESPONSE ) {
744+ continue ; // not an actual response, a marker to initialize the sending process.
745+ }
746+ requestSeqIdTracker .markSeqNoAsProcessed (response .v1 ().seqNo );
747+ response .v1 ().chunk .close ();
748+ if (response .v1 ().failure != null ) {
749+ handleErrorOnSendFiles (store , response .v1 ().failure , new StoreFileMetaData []{response .v1 ().chunk .md });
750+ throw response .v1 ().failure ;
737751 }
738752 }
739- if (error .get () != null ) {
740- handleErrorOnSendFiles (store , error .get ().v2 (), new StoreFileMetaData []{error .get ().v1 ()});
741- throw error .get ().v2 ();
742- }
743- final FileChunk chunk = readNextChunk ();
744- if (chunk == null ) {
745- semaphore .release (); // allow other threads respond if we are not done yet.
746- if (requestSeqIdTracker .getMaxSeqNo () == requestSeqIdTracker .getProcessedCheckpoint () && semaphore .tryAcquire ()) {
747- listener .onResponse (null );
753+ while (requestSeqIdTracker .getMaxSeqNo () - requestSeqIdTracker .getProcessedCheckpoint () < maxConcurrentFileChunks ) {
754+ cancellableThreads .checkForCancel ();
755+ final FileChunk chunk = readNextChunk ();
756+ if (chunk == null ) {
757+ if (requestSeqIdTracker .getProcessedCheckpoint () == requestSeqIdTracker .getMaxSeqNo ()) {
758+ listener .onResponse (null );
759+ }
760+ return ;
748761 }
749- break ;
762+ final long requestSeqId = requestSeqIdTracker .generateSeqNo ();
763+ cancellableThreads .execute (() -> recoveryTarget .writeFileChunk (chunk .md , chunk .position , chunk .content , chunk .lastChunk ,
764+ translogOps .getAsInt (), ActionListener .wrap (
765+ r -> this .put (new FileChunkResponse (requestSeqId , chunk , null ), ignored -> {}),
766+ e -> this .put (new FileChunkResponse (requestSeqId , chunk , e ), ignored -> {})
767+ )));
750768 }
751- final long requestSeqId = requestSeqIdTracker .generateSeqNo ();
752- cancellableThreads .execute (() ->
753- recoveryTarget .writeFileChunk (chunk .md , chunk .position , chunk .content , chunk .lastChunk , translogOps .getAsInt (),
754- ActionListener .wrap (
755- r -> {
756- chunk .close (); // release the buffer so we can reuse to reduce allocation
757- requestSeqIdTracker .markSeqNoAsProcessed (requestSeqId );
758- if (canSendMore () && semaphore .tryAcquire ()) {
759- sendFileExecutor .execute (this ); // fork off from the network thread
760- }
761- },
762- e -> {
763- if (error .compareAndSet (null , Tuple .tuple (chunk .md , e )) && semaphore .tryAcquire ()) {
764- // have to fork as handleErrorOnSendFiles can read file which should not happen on the network thread.
765- sendFileExecutor .execute (this );
766- }
767- })
768- )
769- );
769+ } catch (Exception e ) {
770+ listener .onFailure (e );
770771 }
771772 }
772773
773- FileChunk readNextChunk () throws Exception {
774- assert semaphore .availablePermits () == 0 ;
774+ private FileChunk readNextChunk () throws Exception {
775775 try {
776776 if (currentInput == null ) {
777777 if (remainingFiles .hasNext () == false ) {
@@ -808,13 +808,8 @@ public void close() throws IOException {
808808 }
809809 }
810810
811- boolean canSendMore () {
812- return requestSeqIdTracker .getMaxSeqNo () - requestSeqIdTracker .getProcessedCheckpoint () < maxConcurrentFileChunks ;
813- }
814-
815811 @ Override
816812 public void close () throws IOException {
817- assert semaphore .availablePermits () == 0 ;
818813 IOUtils .close (recycledBuffers ::clear , currentInput , () -> currentInput = null );
819814 }
820815 }
@@ -840,6 +835,18 @@ public void close() {
840835 }
841836 }
842837
838+ private static class FileChunkResponse {
839+ final long seqNo ;
840+ final FileChunk chunk ;
841+ final Exception failure ;
842+
843+ FileChunkResponse (long seqNo , FileChunk chunk , Exception failure ) {
844+ this .seqNo = seqNo ;
845+ this .chunk = chunk ;
846+ this .failure = failure ;
847+ }
848+ }
849+
843850 void sendFiles (Store store , StoreFileMetaData [] files , IntSupplier translogOps , ActionListener <Void > listener ) {
844851 ArrayUtil .timSort (files , Comparator .comparingLong (StoreFileMetaData ::length )); // send smallest first
845852 StepListener <Void > wrappedListener = new StepListener <>();
@@ -852,7 +859,7 @@ void sendFiles(Store store, StoreFileMetaData[] files, IntSupplier translogOps,
852859 listener .onFailure (e );
853860 });
854861 resources .add (multiFileSender );
855- multiFileSender .run ();
862+ multiFileSender .start ();
856863 }
857864
858865 private void cleanFiles (Store store , Store .MetadataSnapshot sourceMetadata , IntSupplier translogOps ,
0 commit comments