1717import org .elasticsearch .search .aggregations .bucket .composite .CompositeAggregation ;
1818import org .elasticsearch .search .aggregations .bucket .composite .CompositeAggregationBuilder ;
1919import org .elasticsearch .search .aggregations .bucket .composite .CompositeValuesSourceBuilder ;
20- import org .elasticsearch .search .aggregations .metrics .NumericMetricsAggregation ;
2120import org .elasticsearch .search .builder .SearchSourceBuilder ;
2221import org .elasticsearch .xpack .core .indexing .AsyncTwoPhaseIndexer ;
2322import org .elasticsearch .xpack .core .indexing .IndexerState ;
2423import org .elasticsearch .xpack .core .indexing .IterationResult ;
2524
2625import java .io .IOException ;
2726import java .io .UncheckedIOException ;
27+ import java .util .Collection ;
2828import java .util .List ;
2929import java .util .Map ;
3030import java .util .concurrent .Executor ;
3131import java .util .concurrent .atomic .AtomicReference ;
3232import java .util .stream .Collectors ;
33+ import java .util .stream .Stream ;
3334
34- import static org .elasticsearch .xpack .core .ml .job .persistence .ElasticsearchMappings .DOC_TYPE ;
3535import static org .elasticsearch .common .xcontent .XContentFactory .jsonBuilder ;
36+ import static org .elasticsearch .xpack .core .ml .job .persistence .ElasticsearchMappings .DOC_TYPE ;
3637
3738public abstract class FeatureIndexBuilderIndexer extends AsyncTwoPhaseIndexer <Map <String , Object >, FeatureIndexBuilderJobStats > {
3839
@@ -58,36 +59,37 @@ protected void onStartJob(long now) {
5859 @ Override
5960 protected IterationResult <Map <String , Object >> doProcess (SearchResponse searchResponse ) {
6061 final CompositeAggregation agg = searchResponse .getAggregations ().get ("feature" );
61- return new IterationResult <>(processBuckets (agg ), agg .afterKey (), agg .getBuckets ().isEmpty ());
62+ return new IterationResult <>(processBucketsToIndexRequests (agg ).collect (Collectors .toList ()), agg .afterKey (),
63+ agg .getBuckets ().isEmpty ());
6264 }
6365
6466 /*
65- * Mocked demo case
67+ * Parses the result and creates a stream of indexable documents
6668 *
67- * TODO: replace with proper implementation
69+ * Implementation decisions:
70+ *
71+ * Extraction uses generic maps as intermediate exchange format in order to hook in ingest pipelines/processors
72+ * in later versions, see {@link IngestDocument).
6873 */
69- private List <IndexRequest > processBuckets (CompositeAggregation agg ) {
70- // for now only 1 source supported
71- String destinationFieldName = job .getConfig ().getSourceConfig ().getSources (). get ( 0 ). name ();
72- String aggName = job .getConfig ().getAggregationConfig ().getAggregatorFactories (). iterator (). next (). getName ();
74+ private Stream <IndexRequest > processBucketsToIndexRequests (CompositeAggregation agg ) {
75+ String indexName = job . getConfig (). getDestinationIndex ();
76+ List < CompositeValuesSourceBuilder <?>> sources = job .getConfig ().getSourceConfig ().getSources ();
77+ Collection < AggregationBuilder > aggregationBuilders = job .getConfig ().getAggregationConfig ().getAggregatorFactories ();
7378
74- return agg .getBuckets ().stream ().map (b -> {
75- NumericMetricsAggregation .SingleValue aggResult = b .getAggregations ().get (aggName );
79+ return AggregationResultUtils .extractCompositeAggregationResults (agg , sources , aggregationBuilders ).map (document -> {
7680 XContentBuilder builder ;
7781 try {
7882 builder = jsonBuilder ();
7983 builder .startObject ();
80- builder .field (destinationFieldName , b .getKey ().get (destinationFieldName ));
81- builder .field (aggName , aggResult .value ());
84+ builder .map (document );
8285 builder .endObject ();
8386 } catch (IOException e ) {
8487 throw new UncheckedIOException (e );
8588 }
8689
87- String indexName = job .getConfig ().getDestinationIndex ();
8890 IndexRequest request = new IndexRequest (indexName , DOC_TYPE ).source (builder );
8991 return request ;
90- }). collect ( Collectors . toList ()) ;
92+ });
9193 }
9294
9395 @ Override
0 commit comments