1414import org .apache .lucene .util .CollectionUtil ;
1515import org .elasticsearch .ElasticsearchException ;
1616import org .elasticsearch .action .ActionListener ;
17- import org .elasticsearch .action .ActionRunnable ;
1817import org .elasticsearch .action .StepListener ;
1918import org .elasticsearch .action .admin .cluster .repositories .get .TransportGetRepositoriesAction ;
2019import org .elasticsearch .action .support .ActionFilters ;
3433import org .elasticsearch .repositories .RepositoriesService ;
3534import org .elasticsearch .repositories .Repository ;
3635import org .elasticsearch .repositories .RepositoryData ;
36+ import org .elasticsearch .repositories .RepositoryMissingException ;
3737import org .elasticsearch .snapshots .SnapshotException ;
3838import org .elasticsearch .snapshots .SnapshotId ;
3939import org .elasticsearch .snapshots .SnapshotInfo ;
4646import org .elasticsearch .transport .TransportService ;
4747
4848import java .util .ArrayList ;
49+ import java .util .Collection ;
4950import java .util .Collections ;
5051import java .util .HashMap ;
5152import java .util .HashSet ;
5253import java .util .List ;
5354import java .util .Map ;
5455import java .util .Set ;
56+ import java .util .concurrent .BlockingQueue ;
57+ import java .util .concurrent .LinkedBlockingQueue ;
5558import java .util .stream .Collectors ;
5659
5760import static java .util .Collections .unmodifiableList ;
@@ -211,8 +214,7 @@ private void loadSnapshotInfos(SnapshotsInProgress snapshotsInProgress, String r
211214 }
212215
213216 if (verbose ) {
214- threadPool .generic ().execute (ActionRunnable .supply (
215- listener , () -> snapshots (snapshotsInProgress , repo , new ArrayList <>(toResolve ), ignoreUnavailable , task )));
217+ snapshots (snapshotsInProgress , repo , toResolve , ignoreUnavailable , task , listener );
216218 } else {
217219 final List <SnapshotInfo > snapshotInfos ;
218220 if (repositoryData != null ) {
@@ -235,12 +237,16 @@ private void loadSnapshotInfos(SnapshotsInProgress snapshotsInProgress, String r
235237 * @param snapshotIds snapshots for which to fetch snapshot information
236238 * @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning,
237239 * if false, they will throw an error
238- * @return list of snapshots
239240 */
240- private List <SnapshotInfo > snapshots (SnapshotsInProgress snapshotsInProgress , String repositoryName ,
241- List <SnapshotId > snapshotIds , boolean ignoreUnavailable , CancellableTask task ) {
241+ private void snapshots (SnapshotsInProgress snapshotsInProgress ,
242+ String repositoryName ,
243+ Collection <SnapshotId > snapshotIds ,
244+ boolean ignoreUnavailable ,
245+ CancellableTask task ,
246+ ActionListener <List <SnapshotInfo >> listener ) {
242247 if (task .isCancelled ()) {
243- throw new TaskCancelledException ("task cancelled" );
248+ listener .onFailure (new TaskCancelledException ("task cancelled" ));
249+ return ;
244250 }
245251 final Set <SnapshotInfo > snapshotSet = new HashSet <>();
246252 final Set <SnapshotId > snapshotIdsToIterate = new HashSet <>(snapshotIds );
@@ -252,28 +258,88 @@ private List<SnapshotInfo> snapshots(SnapshotsInProgress snapshotsInProgress, St
252258 snapshotSet .add (new SnapshotInfo (entry ));
253259 }
254260 }
255- // then, look in the repository
256- final Repository repository = repositoriesService .repository (repositoryName );
257- for (SnapshotId snapshotId : snapshotIdsToIterate ) {
261+ // then, look in the repository if there's any matching snapshots left
262+ final List <SnapshotInfo > snapshotInfos ;
263+ if (snapshotIdsToIterate .isEmpty ()) {
264+ snapshotInfos = Collections .emptyList ();
265+ } else {
266+ snapshotInfos = Collections .synchronizedList (new ArrayList <>());
267+ }
268+ final ActionListener <Collection <Void >> allDoneListener = listener .delegateFailure ((l , v ) -> {
269+ final ArrayList <SnapshotInfo > snapshotList = new ArrayList <>(snapshotInfos );
270+ snapshotList .addAll (snapshotSet );
271+ CollectionUtil .timSort (snapshotList );
272+ listener .onResponse (unmodifiableList (snapshotList ));
273+ });
274+ if (snapshotIdsToIterate .isEmpty ()) {
275+ allDoneListener .onResponse (Collections .emptyList ());
276+ return ;
277+ }
278+ // put snapshot info downloads into a task queue instead of pushing them all into the queue to not completely monopolize the
279+ // snapshot meta pool for a single request
280+ final int workers = Math .min (threadPool .info (ThreadPool .Names .SNAPSHOT_META ).getMax (), snapshotIdsToIterate .size ());
281+ final BlockingQueue <SnapshotId > queue = new LinkedBlockingQueue <>(snapshotIdsToIterate );
282+ final ActionListener <Void > workerDoneListener = new GroupedActionListener <>(allDoneListener , workers ).delegateResponse ((l , e ) -> {
283+ queue .clear (); // Stop fetching the remaining snapshots once we've failed fetching one since the response is an error response
284+ // anyway in this case
285+ l .onFailure (e );
286+ });
287+ final Repository repository ;
288+ try {
289+ repository = repositoriesService .repository (repositoryName );
290+ } catch (RepositoryMissingException e ) {
291+ listener .onFailure (e );
292+ return ;
293+ }
294+ for (int i = 0 ; i < workers ; i ++) {
295+ getOneSnapshotInfo (
296+ ignoreUnavailable ,
297+ repository ,
298+ queue ,
299+ snapshotInfos ,
300+ task ,
301+ workerDoneListener
302+ );
303+ }
304+ }
305+
306+ /**
307+ * Tries to poll a {@link SnapshotId} to load {@link SnapshotInfo} for from the given {@code queue}. If it finds one in the queue,
308+ * loads the snapshot info from the repository and adds it to the given {@code snapshotInfos} collection, then invokes itself again to
309+ * try and poll another task from the queue.
310+ * If the queue is empty resolves {@code} listener.
311+ */
312+ private void getOneSnapshotInfo (boolean ignoreUnavailable ,
313+ Repository repository ,
314+ BlockingQueue <SnapshotId > queue ,
315+ Collection <SnapshotInfo > snapshotInfos ,
316+ CancellableTask task ,
317+ ActionListener <Void > listener ) {
318+ final SnapshotId snapshotId = queue .poll ();
319+ if (snapshotId == null ) {
320+ listener .onResponse (null );
321+ return ;
322+ }
323+ threadPool .executor (ThreadPool .Names .SNAPSHOT_META ).execute (() -> {
258324 if (task .isCancelled ()) {
259- throw new TaskCancelledException ("task cancelled" );
325+ listener .onFailure (new TaskCancelledException ("task cancelled" ));
326+ return ;
260327 }
261328 try {
262- snapshotSet .add (repository .getSnapshotInfo (snapshotId ));
329+ snapshotInfos .add (repository .getSnapshotInfo (snapshotId ));
263330 } catch (Exception ex ) {
264331 if (ignoreUnavailable ) {
265332 logger .warn (() -> new ParameterizedMessage ("failed to get snapshot [{}]" , snapshotId ), ex );
266333 } else {
267- if (ex instanceof SnapshotException ) {
268- throw ex ;
269- }
270- throw new SnapshotException (repositoryName , snapshotId , "Snapshot could not be read" , ex );
334+ listener .onFailure (
335+ ex instanceof SnapshotException
336+ ? ex
337+ : new SnapshotException (repository .getMetadata ().name (), snapshotId , "Snapshot could not be read" , ex )
338+ );
271339 }
272340 }
273- }
274- final ArrayList <SnapshotInfo > snapshotList = new ArrayList <>(snapshotSet );
275- CollectionUtil .timSort (snapshotList );
276- return unmodifiableList (snapshotList );
341+ getOneSnapshotInfo (ignoreUnavailable , repository , queue , snapshotInfos , task , listener );
342+ });
277343 }
278344
279345 private boolean isAllSnapshots (String [] snapshots ) {
0 commit comments