From 2ca459502f8e8447a329beb8ab906d9bc803207d Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Tue, 6 Jul 2021 07:54:14 +0200 Subject: [PATCH 1/2] Fix GeoIpProcessor when there's no updated db (#74944) This change fixes problem with GeoIpProcessor when there's GeoIpTaskState present in the cluster state but there's no database matching the one used by the processor. It can happen when there are some but not all databases already updated. --- .../ingest/geoip/GeoIpProcessor.java | 4 +++- .../ingest/geoip/IngestGeoIpPlugin.java | 3 ++- .../geoip/GeoIpProcessorFactoryTests.java | 23 +++++++++++++++++++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 61b4ad367c146..06246deb2bf6b 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -436,7 +436,9 @@ public GeoIpProcessor create( return true; } GeoIpTaskState state = (GeoIpTaskState) task.getState(); - return state.getDatabases().get(databaseFile).isValid(currentState.metadata().settings()); + GeoIpTaskState.Metadata metadata = state.getDatabases().get(databaseFile); + // we never remove metadata from cluster state, if metadata is null we deal with built-in database, which is always valid + return metadata == null || metadata.isValid(currentState.metadata().settings()); }; return new GeoIpProcessor(processorTag, description, ipField, supplier, isValid, targetField, properties, ignoreMissing, firstOnly); diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index bff1c5c4a0089..79b747dcf17ae 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -64,6 +64,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; +import static org.elasticsearch.ingest.IngestService.INGEST_ORIGIN; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; @@ -169,7 +170,7 @@ public Collection getSystemIndexDescriptors(Settings sett .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1") .build()) - .setOrigin("geoip") + .setOrigin(INGEST_ORIGIN) .setVersionMetaKey("version") .setPrimaryIndex(DATABASES_INDEX) .setNetNew() diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index f7837892e9a8c..d5f3b0f1be4fe 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -9,10 +9,12 @@ package org.elasticsearch.ingest.geoip; import com.carrotsearch.randomizedtesting.generators.RandomPicks; + import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Settings; @@ -20,6 +22,7 @@ import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.RandomDocumentPicks; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.StreamsUtils; import org.elasticsearch.threadpool.TestThreadPool; @@ -367,6 +370,26 @@ public void testFallbackUsingDefaultDatabases() throws Exception { } } + public void testDefaultDatabaseWithTaskPresent() throws Exception { + PersistentTasksCustomMetadata tasks = PersistentTasksCustomMetadata.builder() + .addTask(GeoIpDownloader.GEOIP_DOWNLOADER, GeoIpDownloader.GEOIP_DOWNLOADER, null, null) + .updateTaskState(GeoIpDownloader.GEOIP_DOWNLOADER, GeoIpTaskState.EMPTY) + .build(); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .metadata(Metadata.builder().putCustom(PersistentTasksCustomMetadata.TYPE, tasks)) + .build(); + when(clusterService.state()).thenReturn(clusterState); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); + + Map config = new HashMap<>(); + config.put("field", "_field"); + String processorTag = randomAlphaOfLength(10); + + GeoIpProcessor processor = factory.create(null, processorTag, null, config); + + processor.execute(RandomDocumentPicks.randomIngestDocument(random(), Map.of("_field", "89.160.20.128"))); + } + public void testFallbackUsingDefaultDatabasesWhileIngesting() throws Exception { copyDatabaseFile(geoipTmpDir, "GeoLite2-City-Test.mmdb"); GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); From e681f79e5deea8a44a1cad4feea2a5a8702c34a1 Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Tue, 6 Jul 2021 09:32:43 +0200 Subject: [PATCH 2/2] fix compilation --- .../elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index d5f3b0f1be4fe..3b6a58fd96443 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -387,7 +387,7 @@ public void testDefaultDatabaseWithTaskPresent() throws Exception { GeoIpProcessor processor = factory.create(null, processorTag, null, config); - processor.execute(RandomDocumentPicks.randomIngestDocument(random(), Map.of("_field", "89.160.20.128"))); + processor.execute(RandomDocumentPicks.randomIngestDocument(random(), org.elasticsearch.core.Map.of("_field", "89.160.20.128"))); } public void testFallbackUsingDefaultDatabasesWhileIngesting() throws Exception {