 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.util.CancellableThreads;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.core.internal.io.IOUtils;
-import org.elasticsearch.core.internal.io.Streams;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.RecoveryEngineException;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.RemoteTransportException;

-import java.io.BufferedOutputStream;
 import java.io.Closeable;
 import java.io.IOException;
-import java.io.OutputStream;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import java.util.stream.StreamSupport;

+import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
+
 /**
  * RecoverySourceHandler handles the three phases of shard recovery, which is
  * everything relating to copying the segment files as well as sending translog
@@ -96,17 +97,19 @@ public class RecoverySourceHandler {
     private final StartRecoveryRequest request;
     private final int chunkSizeInBytes;
     private final RecoveryTargetHandler recoveryTarget;
+    private final int maxConcurrentFileChunks;
     private final CancellableThreads cancellableThreads = new CancellableThreads();

-    public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget,
-                                 final StartRecoveryRequest request,
-                                 final int fileChunkSizeInBytes) {
+    public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget, final StartRecoveryRequest request,
+                                 final int fileChunkSizeInBytes, final int maxConcurrentFileChunks) {
         this.shard = shard;
         this.recoveryTarget = recoveryTarget;
         this.request = request;
         this.shardId = this.request.shardId().id();
         this.logger = Loggers.getLogger(getClass(), request.shardId(), "recover to " + request.targetNode().getName());
         this.chunkSizeInBytes = fileChunkSizeInBytes;
+        // if the target is on an old version, it won't be able to handle out-of-order file chunks.
+        this.maxConcurrentFileChunks = request.targetNode().getVersion().onOrAfter(Version.V_7_0_0) ? maxConcurrentFileChunks : 1;
     }

     public StartRecoveryRequest getRequest() {
@@ -407,10 +410,7 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer>
                 phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
             cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(
                 phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.get()));
-            // How many bytes we've copied since we last called RateLimiter.pause
-            final Function<StoreFileMetaData, OutputStream> outputStreamFactories =
-                md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogOps), chunkSizeInBytes);
-            sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
+            sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
             // Send the CLEAN_FILES request, which takes all of the files that
             // were transferred and renames them from their temporary file
             // names to the actual file names. It also writes checksums for
@@ -649,73 +649,72 @@ public String toString() {
             '}';
     }

-
-    final class RecoveryOutputStream extends OutputStream {
-        private final StoreFileMetaData md;
-        private final Supplier<Integer> translogOps;
-        private long position = 0;
-
-        RecoveryOutputStream(StoreFileMetaData md, Supplier<Integer> translogOps) {
-            this.md = md;
-            this.translogOps = translogOps;
-        }
-
-        @Override
-        public void write(int b) throws IOException {
-            throw new UnsupportedOperationException("we can't send single bytes over the wire");
+    void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) throws Exception {
+        ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
+        final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
+        final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
+        final byte[] buffer = new byte[chunkSizeInBytes];
+        for (final StoreFileMetaData md : files) {
+            if (error.get() != null) {
+                break;
+            }
+            try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
+                 InputStream in = new InputStreamIndexInput(indexInput, md.length())) {
+                long position = 0;
+                int bytesRead;
+                while ((bytesRead = in.read(buffer, 0, buffer.length)) != -1) {
+                    final BytesArray content = new BytesArray(buffer, 0, bytesRead);
+                    final boolean lastChunk = position + content.length() == md.length();
+                    final long requestSeqId = requestSeqIdTracker.generateSeqNo();
+                    cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqId - maxConcurrentFileChunks));
+                    cancellableThreads.checkForCancel();
+                    if (error.get() != null) {
+                        break;
+                    }
+                    final long requestFilePosition = position;
+                    cancellableThreads.executeIO(() ->
+                        recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.get(),
+                            ActionListener.wrap(
+                                r -> requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId),
+                                e -> {
+                                    error.compareAndSet(null, Tuple.tuple(md, e));
+                                    requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
+                                }
+                            )));
+                    position += content.length();
+                }
+            } catch (Exception e) {
+                error.compareAndSet(null, Tuple.tuple(md, e));
+                break;
+            }
         }
-
-        @Override
-        public void write(byte[] b, int offset, int length) throws IOException {
-            sendNextChunk(position, new BytesArray(b, offset, length), md.length() == position + length);
-            position += length;
-            assert md.length() >= position : "length: " + md.length() + " but positions was: " + position;
+        // When we terminate exceptionally, we don't wait for the outstanding requests as we don't use their results anyway.
+        // This allows us to end quickly and eliminate the complexity of handling requestSeqIds in case of error.
+        if (error.get() == null) {
+            cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo()));
         }
-
-        private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException {
-            // Actually send the file chunk to the target node, waiting for it to complete
-            cancellableThreads.executeIO(() ->
-                recoveryTarget.writeFileChunk(md, position, content, lastChunk, translogOps.get())
-            );
-            if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
-                throw new IndexShardClosedException(request.shardId());
-            }
+        if (error.get() != null) {
+            handleErrorOnSendFiles(store, error.get().v1(), error.get().v2());
         }
     }

-    void sendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) throws Exception {
-        store.incRef();
-        try {
-            ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
-            for (int i = 0; i < files.length; i++) {
-                final StoreFileMetaData md = files[i];
-                try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
-                    // it's fine that we are only having the indexInput in the try/with block. The copy methods handles
-                    // exceptions during close correctly and doesn't hide the original exception.
-                    Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStreamFactory.apply(md));
-                } catch (Exception e) {
-                    final IOException corruptIndexException;
-                    if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
-                        if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
-                            logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
-                            failEngine(corruptIndexException);
-                            throw corruptIndexException;
-                        } else { // corruption has happened on the way to replica
-                            RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but " +
-                                "checksums are ok", null);
-                            exception.addSuppressed(e);
-                            logger.warn(() -> new ParameterizedMessage(
-                                "{} Remote file corruption on node {}, recovering {}. local checksum OK",
-                                shardId, request.targetNode(), md), corruptIndexException);
-                            throw exception;
-                        }
-                    } else {
-                        throw e;
-                    }
-                }
+    private void handleErrorOnSendFiles(Store store, StoreFileMetaData md, Exception e) throws Exception {
+        final IOException corruptIndexException;
+        if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
+            if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
+                logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
+                failEngine(corruptIndexException);
+                throw corruptIndexException;
+            } else { // corruption has happened on the way to replica
+                RemoteTransportException exception = new RemoteTransportException(
+                    "File corruption occurred on recovery but checksums are ok", null);
+                exception.addSuppressed(e);
+                logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK",
+                    shardId, request.targetNode(), md), corruptIndexException);
+                throw exception;
             }
-        } finally {
-            store.decRef();
+        } else {
+            throw e;
         }
     }

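For readers following the new sendFiles() above: it throttles asynchronous chunk uploads by treating every writeFileChunk request as a sequence number on a LocalCheckpointTracker and, before issuing a new request, waiting until the request maxConcurrentFileChunks positions back has completed. The standalone sketch below illustrates the same bounded in-flight pattern using only java.util.concurrent, with a Semaphore standing in for the tracker; every class, method, and value name in it is invented for illustration and is not part of this change.

// Illustrative sketch only, not Elasticsearch code: at most maxConcurrentChunks sends
// are allowed in flight, mirroring the maxConcurrentFileChunks throttle in sendFiles().
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class ChunkSendThrottleDemo {

    public static void main(String[] args) throws Exception {
        final int maxConcurrentChunks = 5;   // plays the role of maxConcurrentFileChunks
        final int totalChunks = 50;          // pretend a file was split into 50 chunks
        final Semaphore inFlight = new Semaphore(maxConcurrentChunks);
        final ExecutorService transport = Executors.newFixedThreadPool(4); // stand-in for the async transport layer

        for (int chunk = 0; chunk < totalChunks; chunk++) {
            // blocks once maxConcurrentChunks sends are outstanding,
            // like waitForOpsToComplete(requestSeqId - maxConcurrentFileChunks)
            inFlight.acquire();
            final int chunkId = chunk;
            CompletableFuture
                .runAsync(() -> sendChunk(chunkId), transport)
                .whenComplete((r, e) -> inFlight.release()); // completion callback, like ActionListener.wrap(...)
        }
        // drain the tail of outstanding sends, like waitForOpsToComplete(getMaxSeqNo()) in the diff
        inFlight.acquire(maxConcurrentChunks);
        transport.shutdown();
        transport.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("all " + totalChunks + " chunks sent");
    }

    private static void sendChunk(int chunkId) {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(10, 50)); // simulate network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}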