 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.index.IndexCommit;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.support.ListenerTimeouts;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
 import org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException;
 import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
 import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetaData;
+import org.elasticsearch.indices.recovery.MultiFileTransfer;
 import org.elasticsearch.indices.recovery.MultiFileWriter;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.repositories.IndexId;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.LongConsumer;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;

 import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
-import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
 import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId;
 import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncAddRetentionLease;
 import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncRenewRetentionLease;
@@ -473,97 +471,82 @@ void restoreFiles(Store store) {
         }

         @Override
-        protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws IOException {
+        protected void restoreFiles(List<FileInfo> filesToRecover, Store store) {
             logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
+            final PlainActionFuture<Void> restoreFilesFuture = new PlainActionFuture<>();
+            final List<StoreFileMetaData> mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList());
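+            // transfer the files with a MultiFileTransfer that keeps up to ccrSettings.getMaxConcurrentFileChunks()
+            // chunk requests in flight and completes restoreFilesFuture once every file has been received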
+            final MultiFileTransfer<FileChunk> multiFileTransfer = new MultiFileTransfer<>(
+                logger, threadPool.getThreadContext(), restoreFilesFuture, ccrSettings.getMaxConcurrentFileChunks(), mds) {

-            try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {
-            })) {
-                final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
-                final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
+                final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {});
+                long offset = 0;

-                for (FileInfo fileInfo : filesToRecover) {
-                    final long fileLength = fileInfo.length();
-                    long offset = 0;
-                    while (offset < fileLength && error.get() == null) {
-                        final long requestSeqId = requestSeqIdTracker.generateSeqNo();
-                        try {
-                            requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqId - ccrSettings.getMaxConcurrentFileChunks());
-
-                            if (error.get() != null) {
-                                requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
-                                break;
-                            }
-
-                            final int bytesRequested = Math.toIntExact(
-                                Math.min(ccrSettings.getChunkSize().getBytes(), fileLength - offset));
-                            offset += bytesRequested;
-
-                            final GetCcrRestoreFileChunkRequest request =
-                                new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileInfo.name(), bytesRequested);
-                            logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId,
-                                fileInfo.name(), offset, bytesRequested);
-
-                            TimeValue timeout = ccrSettings.getRecoveryActionTimeout();
-                            ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> listener =
-                                ListenerTimeouts.wrapWithTimeout(threadPool, ActionListener.wrap(
-                                    r -> threadPool.generic().execute(new AbstractRunnable() {
-                                        @Override
-                                        public void onFailure(Exception e) {
-                                            error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
-                                            requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
-                                        }
-
-                                        @Override
-                                        protected void doRun() throws Exception {
-                                            final int actualChunkSize = r.getChunk().length();
-                                            logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", shardId,
-                                                snapshotId, fileInfo.name(), r.getOffset(), actualChunkSize);
-                                            final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
-                                            throttleListener.accept(nanosPaused);
-                                            final boolean lastChunk = r.getOffset() + actualChunkSize >= fileLength;
-                                            multiFileWriter.writeFileChunk(fileInfo.metadata(), r.getOffset(), r.getChunk(), lastChunk);
-                                            requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
-                                        }
-                                    }),
-                                    e -> {
-                                        error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
-                                        requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
-                                    }
-                                ), timeout, ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME);
-                            remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, listener);
-                        } catch (Exception e) {
-                            error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
-                            requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
-                        }
-                    }
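+                // the transfer is moving on to the next file: restart the chunk offset for it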
+                @Override
+                protected void onNewFile(StoreFileMetaData md) {
+                    offset = 0;
                 }

-                try {
-                    requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqIdTracker.getMaxSeqNo());
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    throw new ElasticsearchException(e);
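+                // describe the next chunk of the current file: at most the configured chunk size,
+                // marked as the last chunk once the offset reaches the end of the file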
+                @Override
+                protected FileChunk nextChunkRequest(StoreFileMetaData md) {
+                    final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(), md.length() - offset));
+                    offset += bytesRequested;
+                    return new FileChunk(md, bytesRequested, offset == md.length());
                 }
-                if (error.get() != null) {
-                    handleError(store, error.get().v2());
+
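+                // fetch one chunk from the remote cluster; the response is handled on the GENERIC pool
+                // and the request fails if it exceeds the recovery action timeout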
+                @Override
+                protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
+                    final ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> threadedListener
+                        = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, ActionListener.wrap(
+                            r -> {
+                                writeFileChunk(request.md, r);
+                                listener.onResponse(null);
+                            }, listener::onFailure), false);

+                    remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE,
+                        new GetCcrRestoreFileChunkRequest(node, sessionUUID, request.md.name(), request.bytesRequested),
+                        ListenerTimeouts.wrapWithTimeout(threadPool, threadedListener, ccrSettings.getRecoveryActionTimeout(),
+                            ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME));
                 }
-            }

-            logger.trace("[{}] completed CCR restore", shardId);
-        }
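+                // rate-limit the received chunk, record the time spent throttled, and write it to disk through the shared MultiFileWriter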
+                private void writeFileChunk(StoreFileMetaData md,
+                                            GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse r) throws Exception {
+                    final int actualChunkSize = r.getChunk().length();
+                    logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}",
+                        shardId, snapshotId, md.name(), r.getOffset(), actualChunkSize);
+                    final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
+                    throttleListener.accept(nanosPaused);
+                    multiFileWriter.incRef();
+                    try (Releasable ignored = multiFileWriter::decRef) {
+                        final boolean lastChunk = r.getOffset() + actualChunkSize >= md.length();
+                        multiFileWriter.writeFileChunk(md, r.getOffset(), r.getChunk(), lastChunk);
+                    } catch (Exception e) {
+                        handleError(md, e);
+                        throw e;
+                    }
+                }
+
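+                // if the failure indicates index corruption, mark the local store as corrupted before rethrowing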
+                @Override
+                protected void handleError(StoreFileMetaData md, Exception e) throws Exception {
+                    final IOException corruptIndexException;
+                    if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
+                        try {
+                            store.markStoreCorrupted(corruptIndexException);
+                        } catch (IOException ioe) {
+                            logger.warn("store cannot be marked as corrupted", e);
+                        }
+                        throw corruptIndexException;
+                    }
+                    throw e;
+                }

-        private void handleError(Store store, Exception e) throws IOException {
-            final IOException corruptIndexException;
-            if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
-                try {
-                    store.markStoreCorrupted(corruptIndexException);
-                } catch (IOException ioe) {
-                    logger.warn("store cannot be marked as corrupted", e);
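+                // release the MultiFileWriter once the whole transfer has finished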
+                @Override
+                public void close() {
+                    multiFileWriter.close();
                 }
-                throw corruptIndexException;
-            } else {
-                ExceptionsHelper.reThrowIfNotNull(e);
-            }
+            };
+            multiFileTransfer.start();
+            restoreFilesFuture.actionGet();
+            logger.trace("[{}] completed CCR restore", shardId);
         }

         @Override
@@ -572,5 +555,22 @@ public void close() {
             ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response =
                 remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
         }
+
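+        // a single chunk request: the file it belongs to, how many bytes to ask for, and whether it is the final chunk of that file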
+        private static class FileChunk implements MultiFileTransfer.ChunkRequest {
+            final StoreFileMetaData md;
+            final int bytesRequested;
+            final boolean lastChunk;
+
+            FileChunk(StoreFileMetaData md, int bytesRequested, boolean lastChunk) {
+                this.md = md;
+                this.bytesRequested = bytesRequested;
+                this.lastChunk = lastChunk;
+            }
+
+            @Override
+            public boolean lastChunk() {
+                return lastChunk;
+            }
+        }
     }
 }