 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.index.IndexCommit;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.support.ListenerTimeouts;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
 import org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException;
 import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
 import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetaData;
+import org.elasticsearch.indices.recovery.MultiFileTransfer;
 import org.elasticsearch.indices.recovery.MultiFileWriter;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.repositories.IndexId;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.LongConsumer;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;

 import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
-import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
 import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId;
 import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncAddRetentionLease;
 import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncRenewRetentionLease;
@@ -477,97 +475,82 @@ void restoreFiles(Store store) {
         }

         @Override
-        protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws IOException {
+        protected void restoreFiles(List<FileInfo> filesToRecover, Store store) {
             logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
+            final PlainActionFuture<Void> restoreFilesFuture = new PlainActionFuture<>();
+            final List<StoreFileMetaData> mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList());
+            final MultiFileTransfer<FileChunk> multiFileTransfer = new MultiFileTransfer<FileChunk>(
+                logger, threadPool.getThreadContext(), restoreFilesFuture, ccrSettings.getMaxConcurrentFileChunks(), mds) {

-            try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {
-            })) {
-                final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
-                final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
+                final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {});
+                long offset = 0;

-                for (FileInfo fileInfo : filesToRecover) {
-                    final long fileLength = fileInfo.length();
-                    long offset = 0;
-                    while (offset < fileLength && error.get() == null) {
-                        final long requestSeqId = requestSeqIdTracker.generateSeqNo();
-                        try {
-                            requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqId - ccrSettings.getMaxConcurrentFileChunks());
-
-                            if (error.get() != null) {
-                                requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
-                                break;
-                            }
-
-                            final int bytesRequested = Math.toIntExact(
-                                Math.min(ccrSettings.getChunkSize().getBytes(), fileLength - offset));
-                            offset += bytesRequested;
-
-                            final GetCcrRestoreFileChunkRequest request =
-                                new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileInfo.name(), bytesRequested);
-                            logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId,
-                                fileInfo.name(), offset, bytesRequested);
-
-                            TimeValue timeout = ccrSettings.getRecoveryActionTimeout();
-                            ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> listener =
-                                ListenerTimeouts.wrapWithTimeout(threadPool, ActionListener.wrap(
-                                    r -> threadPool.generic().execute(new AbstractRunnable() {
-                                        @Override
-                                        public void onFailure(Exception e) {
-                                            error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
-                                            requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
-                                        }
-
-                                        @Override
-                                        protected void doRun() throws Exception {
-                                            final int actualChunkSize = r.getChunk().length();
-                                            logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", shardId,
-                                                snapshotId, fileInfo.name(), r.getOffset(), actualChunkSize);
-                                            final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
-                                            throttleListener.accept(nanosPaused);
-                                            final boolean lastChunk = r.getOffset() + actualChunkSize >= fileLength;
-                                            multiFileWriter.writeFileChunk(fileInfo.metadata(), r.getOffset(), r.getChunk(), lastChunk);
-                                            requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
-                                        }
-                                    }),
-                                    e -> {
-                                        error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
-                                        requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
-                                    }
-                                ), timeout, ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME);
-                            remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, listener);
-                        } catch (Exception e) {
-                            error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
-                            requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
-                        }
-                    }
+                @Override
+                protected void onNewFile(StoreFileMetaData md) {
+                    offset = 0;
                 }

-                try {
-                    requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqIdTracker.getMaxSeqNo());
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    throw new ElasticsearchException(e);
+                @Override
+                protected FileChunk nextChunkRequest(StoreFileMetaData md) {
+                    final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(), md.length() - offset));
+                    offset += bytesRequested;
+                    return new FileChunk(md, bytesRequested, offset == md.length());
                 }
-                if (error.get() != null) {
-                    handleError(store, error.get().v2());
+
+                @Override
+                protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
+                    final ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> threadedListener
+                        = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, ActionListener.wrap(
+                        r -> {
+                            writeFileChunk(request.md, r);
+                            listener.onResponse(null);
+                        }, listener::onFailure), false);
+
+                    remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE,
+                        new GetCcrRestoreFileChunkRequest(node, sessionUUID, request.md.name(), request.bytesRequested),
+                        ListenerTimeouts.wrapWithTimeout(threadPool, threadedListener, ccrSettings.getRecoveryActionTimeout(),
+                            ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME));
                 }
-            }

-            logger.trace("[{}] completed CCR restore", shardId);
-        }
+                private void writeFileChunk(StoreFileMetaData md,
+                                            GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse r) throws Exception {
+                    final int actualChunkSize = r.getChunk().length();
+                    logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}",
+                        shardId, snapshotId, md.name(), r.getOffset(), actualChunkSize);
+                    final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
+                    throttleListener.accept(nanosPaused);
+                    multiFileWriter.incRef();
+                    try (Releasable ignored = multiFileWriter::decRef) {
+                        final boolean lastChunk = r.getOffset() + actualChunkSize >= md.length();
+                        multiFileWriter.writeFileChunk(md, r.getOffset(), r.getChunk(), lastChunk);
+                    } catch (Exception e) {
+                        handleError(md, e);
+                        throw e;
+                    }
+                }
+
+                @Override
+                protected void handleError(StoreFileMetaData md, Exception e) throws Exception {
+                    final IOException corruptIndexException;
+                    if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
+                        try {
+                            store.markStoreCorrupted(corruptIndexException);
+                        } catch (IOException ioe) {
+                            logger.warn("store cannot be marked as corrupted", e);
+                        }
+                        throw corruptIndexException;
+                    }
+                    throw e;
+                }

-        private void handleError(Store store, Exception e) throws IOException {
-            final IOException corruptIndexException;
-            if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
-                try {
-                    store.markStoreCorrupted(corruptIndexException);
-                } catch (IOException ioe) {
-                    logger.warn("store cannot be marked as corrupted", e);
+                @Override
+                public void close() {
+                    multiFileWriter.close();
                 }
-                throw corruptIndexException;
-            } else {
-                ExceptionsHelper.reThrowIfNotNull(e);
-            }
+            };
+            multiFileTransfer.start();
+            restoreFilesFuture.actionGet();
+            logger.trace("[{}] completed CCR restore", shardId);
         }

         @Override
@@ -576,5 +559,22 @@ public void close() {
             ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response =
                 remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
         }
+
+        private static class FileChunk implements MultiFileTransfer.ChunkRequest {
+            final StoreFileMetaData md;
+            final int bytesRequested;
+            final boolean lastChunk;
+
+            FileChunk(StoreFileMetaData md, int bytesRequested, boolean lastChunk) {
+                this.md = md;
+                this.bytesRequested = bytesRequested;
+                this.lastChunk = lastChunk;
+            }
+
+            @Override
+            public boolean lastChunk() {
+                return lastChunk;
+            }
+        }
     }
 }
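
Side note on the chunking logic introduced above: nextChunkRequest() splits each file into requests of at most ccrSettings.getChunkSize() bytes and flags the request that reaches the end of the file as the last chunk. The following is a minimal, dependency-free sketch of that arithmetic only; the ChunkingSketch and Chunk names are illustrative and not part of the Elasticsearch codebase or of this patch.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the per-file chunking arithmetic used by nextChunkRequest():
// bytesRequested = min(chunkSize, remaining), and the chunk whose end reaches the
// file length is marked as the last one. Names here are hypothetical.
public class ChunkingSketch {

    static final class Chunk {
        final String fileName;
        final long offset;       // where this chunk starts in the file
        final int length;        // bytes requested for this chunk
        final boolean lastChunk; // true for the final chunk of the file

        Chunk(String fileName, long offset, int length, boolean lastChunk) {
            this.fileName = fileName;
            this.offset = offset;
            this.length = length;
            this.lastChunk = lastChunk;
        }
    }

    // Split a file of fileLength bytes into requests of at most chunkSize bytes each.
    static List<Chunk> chunks(String fileName, long fileLength, long chunkSize) {
        final List<Chunk> result = new ArrayList<>();
        long offset = 0;
        while (offset < fileLength) {
            final int bytesRequested = Math.toIntExact(Math.min(chunkSize, fileLength - offset));
            final long chunkOffset = offset;
            offset += bytesRequested;
            result.add(new Chunk(fileName, chunkOffset, bytesRequested, offset == fileLength));
        }
        return result;
    }

    public static void main(String[] args) {
        // A 2,500,000-byte file with a 1 MiB chunk size yields two full chunks
        // and a final 402,848-byte chunk flagged as last.
        for (Chunk c : chunks("_0.cfs", 2_500_000L, 1_048_576L)) {
            System.out.println(c.fileName + " offset=" + c.offset + " length=" + c.length + " last=" + c.lastChunk);
        }
    }
}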