66package org .elasticsearch .xpack .ml .job .retention ;
77
88import org .elasticsearch .action .ActionListener ;
9- import org .elasticsearch .cluster .ClusterState ;
10- import org .elasticsearch .cluster .service .ClusterService ;
9+ import org .elasticsearch .client .Client ;
1110import org .elasticsearch .common .unit .TimeValue ;
1211import org .elasticsearch .index .query .BoolQueryBuilder ;
1312import org .elasticsearch .index .query .QueryBuilders ;
14- import org .elasticsearch .xpack .core .ml .MlMetadata ;
1513import org .elasticsearch .xpack .core .ml .job .config .Job ;
14+ import org .elasticsearch .xpack .core .ml .job .persistence .AnomalyDetectorsIndex ;
1615import org .elasticsearch .xpack .core .ml .job .results .Result ;
16+ import org .elasticsearch .xpack .ml .job .persistence .BatchedJobsIterator ;
1717import org .elasticsearch .xpack .ml .utils .VolatileCursorIterator ;
1818import org .joda .time .DateTime ;
1919import org .joda .time .chrono .ISOChronology ;
2020
21- import java .util .ArrayList ;
21+ import java .util .Deque ;
2222import java .util .Iterator ;
2323import java .util .List ;
24- import java .util .Objects ;
2524import java .util .concurrent .TimeUnit ;
25+ import java .util .stream .Collectors ;
2626
2727/**
2828 * Removes job data that expired with respect to their retention period.
3333 */
3434abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
3535
36- private final ClusterService clusterService ;
36+ private final Client client ;
3737
38- AbstractExpiredJobDataRemover (ClusterService clusterService ) {
39- this .clusterService = Objects . requireNonNull ( clusterService ) ;
38+ AbstractExpiredJobDataRemover (Client client ) {
39+ this .client = client ;
4040 }
4141
4242 @ Override
4343 public void remove (ActionListener <Boolean > listener ) {
4444 removeData (newJobIterator (), listener );
4545 }
4646
47- private void removeData (Iterator < Job > jobIterator , ActionListener <Boolean > listener ) {
47+ private void removeData (WrappedBatchedJobsIterator jobIterator , ActionListener <Boolean > listener ) {
4848 if (jobIterator .hasNext () == false ) {
4949 listener .onResponse (true );
5050 return ;
5151 }
5252 Job job = jobIterator .next ();
53+ if (job == null ) {
54+ // maybe null if the batched iterator search return no results
55+ listener .onResponse (true );
56+ return ;
57+ }
58+
5359 Long retentionDays = getRetentionDays (job );
5460 if (retentionDays == null ) {
5561 removeData (jobIterator , listener );
@@ -59,14 +65,9 @@ private void removeData(Iterator<Job> jobIterator, ActionListener<Boolean> liste
5965 removeDataBefore (job , cutoffEpochMs , ActionListener .wrap (response -> removeData (jobIterator , listener ), listener ::onFailure ));
6066 }
6167
62- private Iterator <Job > newJobIterator () {
63- ClusterState clusterState = clusterService .state ();
64- List <Job > jobs = new ArrayList <>(MlMetadata .getMlMetadata (clusterState ).getJobs ().values ());
65- return createVolatileCursorIterator (jobs );
66- }
67-
68- protected static <T > Iterator <T > createVolatileCursorIterator (List <T > items ) {
69- return new VolatileCursorIterator <T >(items );
68+ private WrappedBatchedJobsIterator newJobIterator () {
69+ BatchedJobsIterator jobsIterator = new BatchedJobsIterator (client , AnomalyDetectorsIndex .configIndexName ());
70+ return new WrappedBatchedJobsIterator (jobsIterator );
7071 }
7172
7273 private long calcCutoffEpochMs (long retentionDays ) {
@@ -87,4 +88,49 @@ protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs)
8788 .filter (QueryBuilders .termQuery (Job .ID .getPreferredName (), jobId ))
8889 .filter (QueryBuilders .rangeQuery (Result .TIMESTAMP .getPreferredName ()).lt (cutoffEpochMs ).format ("epoch_millis" ));
8990 }
91+
92+ /**
93+ * BatchedJobsIterator efficiently returns batches of jobs using a scroll
94+ * search but AbstractExpiredJobDataRemover works with one job at a time.
95+ * This class abstracts away the logic of pulling one job at a time from
96+ * multiple batches.
97+ */
98+ private class WrappedBatchedJobsIterator implements Iterator <Job > {
99+ private final BatchedJobsIterator batchedIterator ;
100+ private VolatileCursorIterator <Job > currentBatch ;
101+
102+ WrappedBatchedJobsIterator (BatchedJobsIterator batchedIterator ) {
103+ this .batchedIterator = batchedIterator ;
104+ }
105+
106+ @ Override
107+ public boolean hasNext () {
108+ return (currentBatch != null && currentBatch .hasNext ()) || batchedIterator .hasNext ();
109+ }
110+
111+ /**
112+ * Before BatchedJobsIterator has run a search it reports hasNext == true
113+ * but the first search may return no results. In that case null is return
114+ * and clients have to handle null.
115+ */
116+ @ Override
117+ public Job next () {
118+ if (currentBatch != null && currentBatch .hasNext ()) {
119+ return currentBatch .next ();
120+ }
121+
122+ // currentBatch is either null or all its elements have been iterated.
123+ // get the next currentBatch
124+ currentBatch = createBatchIteratorFromBatch (batchedIterator .next ());
125+
126+ // BatchedJobsIterator.hasNext maybe true if searching the first time
127+ // but no results are returned.
128+ return currentBatch .hasNext () ? currentBatch .next () : null ;
129+ }
130+
131+ private VolatileCursorIterator <Job > createBatchIteratorFromBatch (Deque <Job .Builder > builders ) {
132+ List <Job > jobs = builders .stream ().map (Job .Builder ::build ).collect (Collectors .toList ());
133+ return new VolatileCursorIterator <>(jobs );
134+ }
135+ }
90136}
0 commit comments