From fdf1e50c9abc77db520252e571b8ed3d8297ae5b Mon Sep 17 00:00:00 2001 From: James Baiera Date: Wed, 8 May 2019 15:48:09 -0400 Subject: [PATCH] Add step to forcemerge enrich index after reindex --- .../xpack/enrich/EnrichPolicyRunner.java | 23 ++++++++-- .../xpack/enrich/EnrichPolicyRunnerTests.java | 42 +++++++++++++++++++ 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index 3447d83c90d72..a86ccab979401 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -21,6 +21,8 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; @@ -105,14 +107,14 @@ private void validateMappings(final GetIndexResponse getIndexResponse) { logger.debug("Policy [{}]: Validating [{}] source mappings", policyName, sourceIndices); for (String sourceIndex : sourceIndices) { Map mapping = getMappings(getIndexResponse, sourceIndex); - Set properties = ((Map) mapping.get("properties")).keySet(); + Map properties = ((Map) mapping.get("properties")); if (properties == null) { listener.onFailure( new ElasticsearchException( "Enrich policy execution for [{}] failed. Could not read mapping for source [{}] included by pattern [{}]", policyName, sourceIndex, policy.getIndices())); } - if (properties.contains(policy.getEnrichKey()) == false) { + if (properties.containsKey(policy.getEnrichKey()) == false) { listener.onFailure( new ElasticsearchException( "Enrich policy execution for [{}] failed. Could not locate enrich key field [{}] on mapping for index [{}]", @@ -204,7 +206,7 @@ public void onResponse(BulkByScrollResponse bulkByScrollResponse) { } else { logger.info("Policy [{}]: Transferred [{}] documents to enrich index [{}]", policyName, bulkByScrollResponse.getCreated(), destinationIndexName); - refreshEnrichIndex(destinationIndexName); + forceMergeEnrichIndex(destinationIndexName); } } @@ -215,6 +217,21 @@ public void onFailure(Exception e) { }); } + private void forceMergeEnrichIndex(final String destinationIndexName) { + logger.debug("Policy [{}]: Force merging newly created enrich index [{}]", policyName, destinationIndexName); + client.admin().indices().forceMerge(new ForceMergeRequest(destinationIndexName).maxNumSegments(1), new ActionListener<>() { + @Override + public void onResponse(ForceMergeResponse forceMergeResponse) { + refreshEnrichIndex(destinationIndexName); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + private void refreshEnrichIndex(final String destinationIndexName) { logger.debug("Policy [{}]: Refreshing newly created enrich index [{}]", policyName, destinationIndexName); client.admin().indices().refresh(new RefreshRequest(destinationIndexName), new ActionListener<>() { diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java index b3dc8a7a5a9b8..a4ff936885da0 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java @@ -16,6 +16,11 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; +import org.elasticsearch.action.admin.indices.segments.IndexSegments; +import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.elasticsearch.action.admin.indices.segments.ShardSegments; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; @@ -25,6 +30,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.ReindexPlugin; import org.elasticsearch.plugins.Plugin; @@ -117,6 +123,7 @@ public void onFailure(Exception e) { throw exception.get(); } + // Validate Index definition String createdEnrichIndex = ".enrich-test1-" + createTime; GetIndexResponse enrichIndex = client().admin().indices() .getIndex(new GetIndexRequest().indices(".enrich-test1")).get(); @@ -125,6 +132,8 @@ public void onFailure(Exception e) { Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); assertNotNull(settings); assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + + // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap(); assertThat(mapping.get("dynamic"), is("false")); Map properties = (Map) mapping.get("properties"); @@ -135,6 +144,7 @@ public void onFailure(Exception e) { assertThat(field1.get("type"), is(equalTo("keyword"))); assertThat(field1.get("doc_values"), is(false)); + // Validate document structure SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1") .source(SearchSourceBuilder.searchSource() @@ -147,6 +157,20 @@ public void onFailure(Exception e) { assertThat(enrichDocument.get("field1"), is(equalTo("value1"))); assertThat(enrichDocument.get("field2"), is(equalTo(2))); assertThat(enrichDocument.get("field5"), is(equalTo("value5"))); + + // Validate segments + IndicesSegmentResponse indicesSegmentResponse = client().admin().indices() + .segments(new IndicesSegmentsRequest(createdEnrichIndex)).get(); + IndexSegments indexSegments = indicesSegmentResponse.getIndices().get(createdEnrichIndex); + assertNotNull(indexSegments); + assertThat(indexSegments.getShards().size(), is(equalTo(1))); + IndexShardSegments shardSegments = indexSegments.getShards().get(0); + assertNotNull(shardSegments); + assertThat(shardSegments.getShards().length, is(equalTo(1))); + ShardSegments shard = shardSegments.getShards()[0]; + assertThat(shard.getSegments().size(), is(equalTo(1))); + Segment segment = shard.getSegments().iterator().next(); + assertThat(segment.getNumDocs(), is(equalTo(1))); } public void testRunnerMultiSource() throws Exception { @@ -233,6 +257,7 @@ public void onFailure(Exception e) { throw exception.get(); } + // Validate Index definition String createdEnrichIndex = ".enrich-test1-" + createTime; GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); assertThat(enrichIndex.getIndices().length, equalTo(1)); @@ -240,6 +265,8 @@ public void onFailure(Exception e) { Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); assertNotNull(settings); assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + + // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap(); assertThat(mapping.get("dynamic"), is("false")); Map properties = (Map) mapping.get("properties"); @@ -250,6 +277,7 @@ public void onFailure(Exception e) { assertThat(field1.get("type"), is(equalTo("keyword"))); assertThat(field1.get("doc_values"), is(false)); + // Validate document structure SearchResponse enrichSearchResponse = client().search( new SearchRequest(".enrich-test1") .source(SearchSourceBuilder.searchSource() @@ -262,5 +290,19 @@ public void onFailure(Exception e) { assertThat(enrichDocument.get("field1"), is(equalTo("value1"))); assertThat(enrichDocument.get("field2"), is(equalTo(2))); assertThat(enrichDocument.get("field5"), is(equalTo("value5"))); + + // Validate segments + IndicesSegmentResponse indicesSegmentResponse = client().admin().indices() + .segments(new IndicesSegmentsRequest(createdEnrichIndex)).get(); + IndexSegments indexSegments = indicesSegmentResponse.getIndices().get(createdEnrichIndex); + assertNotNull(indexSegments); + assertThat(indexSegments.getShards().size(), is(equalTo(1))); + IndexShardSegments shardSegments = indexSegments.getShards().get(0); + assertNotNull(shardSegments); + assertThat(shardSegments.getShards().length, is(equalTo(1))); + ShardSegments shard = shardSegments.getShards()[0]; + assertThat(shard.getSegments().size(), is(equalTo(1))); + Segment segment = shard.getSegments().iterator().next(); + assertThat(segment.getNumDocs(), is(equalTo(3))); } }