Skip to content

Commit 5e18333

Browse files
authored
Improve GeoIpDownloaderIT test suite and improve geoip failure tagging. (#79131)
In the case a database couldn't be loaded, the geoip processor factory checks whether any databases are available and then returns a processor implementation that tags documents with the fact that required database wasn't available. The GeoIpProcessor itself also loads the database, but in case a database can't be loaded then it always fails with resource missing exception. The GeoIpProcessor is modified in this change to also check whether any database is available and in that case tag documents instead of failing. GeoIpDownloaderIT improvements: * The `testUseGeoIpProcessorWithDownloadedDBs()` was adding databases to config dirs, but not cleaning it up. Which broke assumptions in others in this suite, because the test cluster is reused. * Use the geoip stats api after each test to wait for a clean state, which means wait for database downloader to be disabled and all database files to be removed on all ingest nodes. * Don't use `IngestDocument#getFieldValue(...)` in test code surrounded by `assertBusy(...)`. If a field isn't there an illegal state exception is thrown, which isn't caught by `assertBusy(...)`. Only assertion errors are handled. Closes #79074
1 parent bf15ccc commit 5e18333

File tree

3 files changed

+109
-67
lines changed

3 files changed

+109
-67
lines changed

modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.cluster.node.DiscoveryNode;
2020
import org.elasticsearch.common.bytes.BytesReference;
2121
import org.elasticsearch.common.settings.Settings;
22+
import org.elasticsearch.core.internal.io.IOUtils;
2223
import org.elasticsearch.xcontent.XContentBuilder;
2324
import org.elasticsearch.xcontent.XContentType;
2425
import org.elasticsearch.xcontent.json.JsonXContent;
@@ -109,6 +110,18 @@ public void cleanUp() throws Exception {
109110
assertThat(state.getDatabases(), anEmptyMap());
110111
}
111112
});
113+
assertBusy(() -> {
114+
GeoIpDownloaderStatsAction.Response response =
115+
client().execute(GeoIpDownloaderStatsAction.INSTANCE, new GeoIpDownloaderStatsAction.Request()).actionGet();
116+
assertThat(response.getStats().getDatabasesCount(), equalTo(0));
117+
assertThat(response.getNodes(), not(empty()));
118+
for (GeoIpDownloaderStatsAction.NodeResponse nodeResponse : response.getNodes()) {
119+
assertThat(nodeResponse.getConfigDatabases(), empty());
120+
assertThat(nodeResponse.getDatabases(), empty());
121+
assertThat(nodeResponse.getFilesInTemp().stream().filter(s -> s.endsWith(".txt") == false).collect(Collectors.toList()),
122+
empty());
123+
}
124+
});
112125
assertBusy(() -> {
113126
List<Path> geoIpTmpDirs = getGeoIpTmpDirs();
114127
for (Path geoIpTmpDir : geoIpTmpDirs) {
@@ -263,7 +276,6 @@ public void testGeoIpDatabasesDownload() throws Exception {
263276
}
264277
}
265278

266-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/79074")
267279
@TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972")
268280
public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
269281
assumeTrue("only test with fixture to have stable results", ENDPOINT != null);
@@ -310,9 +322,9 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
310322
}
311323
}
312324
});
325+
deleteDatabasesInConfigDirectory();
313326
}
314327

315-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/79074")
316328
public void testStartWithNoDatabases() throws Exception {
317329
assumeTrue("only test with fixture to have stable results", ENDPOINT != null);
318330
putPipeline();
@@ -330,31 +342,24 @@ public void testStartWithNoDatabases() throws Exception {
330342
// Enable downloader:
331343
Settings.Builder settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true);
332344
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));
333-
{
334-
assertBusy(() -> {
335-
SimulateDocumentBaseResult result = simulatePipeline();
336-
assertThat(result.getFailure(), nullValue());
337-
assertThat(result.getIngestDocument(), notNullValue());
338-
Map<?, ?> source = result.getIngestDocument().getSourceAndMetadata();
339-
assertThat(source, not(hasKey("tags")));
340-
assertThat(source, hasKey("ip-city"));
341-
assertThat(source, hasKey("ip-asn"));
342-
assertThat(source, hasKey("ip-country"));
343-
344-
assertThat(((Map<?, ?>) source.get("ip-city")).get("city_name"), equalTo("Linköping"));
345-
assertThat(((Map<?, ?>) source.get("ip-asn")).get("organization_name"), equalTo("Bredband2 AB"));
346-
assertThat(((Map<?, ?>) source.get("ip-country")).get("country_name"), equalTo("Sweden"));
347-
});
348-
}
345+
verifyUpdatedDatabase();
349346
}
350347

351348
private void verifyUpdatedDatabase() throws Exception {
352349
assertBusy(() -> {
353350
SimulateDocumentBaseResult result = simulatePipeline();
354351
assertThat(result.getFailure(), nullValue());
355-
assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Linköping"));
356-
assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB"));
357-
assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden"));
352+
assertThat(result.getIngestDocument(), notNullValue());
353+
354+
Map<?, ?> source = result.getIngestDocument().getSourceAndMetadata();
355+
assertThat(source, not(hasKey("tags")));
356+
assertThat(source, hasKey("ip-city"));
357+
assertThat(source, hasKey("ip-asn"));
358+
assertThat(source, hasKey("ip-country"));
359+
360+
assertThat(((Map<?, ?>) source.get("ip-city")).get("city_name"), equalTo("Linköping"));
361+
assertThat(((Map<?, ?>) source.get("ip-asn")).get("organization_name"), equalTo("Bredband2 AB"));
362+
assertThat(((Map<?, ?>) source.get("ip-country")).get("country_name"), equalTo("Sweden"));
358363
});
359364
}
360365

@@ -487,6 +492,29 @@ private void setupDatabasesInConfigDirectory() throws Exception {
487492
});
488493
}
489494

495+
private void deleteDatabasesInConfigDirectory() throws Exception {
496+
StreamSupport.stream(internalCluster().getInstances(Environment.class).spliterator(), false)
497+
.map(Environment::configFile)
498+
.map(path -> path.resolve("ingest-geoip"))
499+
.distinct()
500+
.forEach(path -> {
501+
try {
502+
IOUtils.rm(path);
503+
} catch (IOException e) {
504+
throw new UncheckedIOException(e);
505+
}
506+
});
507+
508+
assertBusy(() -> {
509+
GeoIpDownloaderStatsAction.Response response =
510+
client().execute(GeoIpDownloaderStatsAction.INSTANCE, new GeoIpDownloaderStatsAction.Request()).actionGet();
511+
assertThat(response.getNodes(), not(empty()));
512+
for (GeoIpDownloaderStatsAction.NodeResponse nodeResponse : response.getNodes()) {
513+
assertThat(nodeResponse.getConfigDatabases(), empty());
514+
}
515+
});
516+
}
517+
490518
@SuppressForbidden(reason = "Maxmind API requires java.io.File")
491519
private void parseDatabase(Path tempFile) throws IOException {
492520
try (DatabaseReader databaseReader = new DatabaseReader.Builder(tempFile.toFile()).build()) {

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,11 @@ public final class GeoIpProcessor extends AbstractProcessor {
7070
private final Set<Property> properties;
7171
private final boolean ignoreMissing;
7272
private final boolean firstOnly;
73+
private final String databaseFile;
7374

7475
/**
7576
* Construct a geo-IP processor.
76-
*
77-
* @param tag the processor tag
77+
* @param tag the processor tag
7878
* @param description the processor description
7979
* @param field the source field to geo-IP map
8080
* @param supplier a supplier of a geo-IP database reader; ideally this is lazily-loaded once on first use
@@ -83,6 +83,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
8383
* @param properties the properties; ideally this is lazily-loaded once on first use
8484
* @param ignoreMissing true if documents with a missing value for the field should be ignored
8585
* @param firstOnly true if only first result should be returned in case of array
86+
* @param databaseFile
8687
*/
8788
GeoIpProcessor(
8889
final String tag,
@@ -93,7 +94,8 @@ public final class GeoIpProcessor extends AbstractProcessor {
9394
final String targetField,
9495
final Set<Property> properties,
9596
final boolean ignoreMissing,
96-
final boolean firstOnly) {
97+
final boolean firstOnly,
98+
final String databaseFile) {
9799
super(tag, description);
98100
this.field = field;
99101
this.isValid = isValid;
@@ -102,6 +104,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
102104
this.properties = properties;
103105
this.ignoreMissing = ignoreMissing;
104106
this.firstOnly = firstOnly;
107+
this.databaseFile = databaseFile;
105108
}
106109

107110
boolean isIgnoreMissing() {
@@ -121,8 +124,14 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException
121124
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract geoip information.");
122125
}
123126

127+
DatabaseReaderLazyLoader lazyLoader = this.supplier.get();
128+
if (lazyLoader == null) {
129+
tag(ingestDocument, databaseFile);
130+
return ingestDocument;
131+
}
132+
124133
if (ip instanceof String) {
125-
Map<String, Object> geoData = getGeoData((String) ip);
134+
Map<String, Object> geoData = getGeoData(lazyLoader, (String) ip);
126135
if (geoData.isEmpty() == false) {
127136
ingestDocument.setFieldValue(targetField, geoData);
128137
}
@@ -133,7 +142,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException
133142
if (ipAddr instanceof String == false) {
134143
throw new IllegalArgumentException("array in field [" + field + "] should only contain strings");
135144
}
136-
Map<String, Object> geoData = getGeoData((String) ipAddr);
145+
Map<String, Object> geoData = getGeoData(lazyLoader, (String) ipAddr);
137146
if (geoData.isEmpty()) {
138147
geoDataList.add(null);
139148
continue;
@@ -154,8 +163,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException
154163
return ingestDocument;
155164
}
156165

157-
private Map<String, Object> getGeoData(String ip) throws IOException {
158-
DatabaseReaderLazyLoader lazyLoader = this.supplier.get();
166+
private Map<String, Object> getGeoData(DatabaseReaderLazyLoader lazyLoader, String ip) throws IOException {
159167
try {
160168
final String databaseType = lazyLoader.getDatabaseType();
161169
final InetAddress ipAddress = InetAddresses.forString(ip);
@@ -431,7 +439,9 @@ public Processor create(
431439
}
432440
CheckedSupplier<DatabaseReaderLazyLoader, IOException> supplier = () -> {
433441
DatabaseReaderLazyLoader loader = databaseRegistry.getDatabase(databaseFile);
434-
if (loader == null) {
442+
if (loader == null && databaseRegistry.getAvailableDatabases().isEmpty() == false) {
443+
return null;
444+
} else if (loader == null) {
435445
throw new ResourceNotFoundException("database file [" + databaseFile + "] doesn't exist");
436446
}
437447
// Only check whether the suffix has changed and not the entire database type.
@@ -467,7 +477,7 @@ public Processor create(
467477
return valid;
468478
};
469479
return new GeoIpProcessor(processorTag, description, ipField, supplier, isValid, targetField, properties, ignoreMissing,
470-
firstOnly);
480+
firstOnly, databaseFile);
471481
}
472482
}
473483

@@ -543,7 +553,7 @@ static class DatabaseUnavailableProcessor extends AbstractProcessor {
543553

544554
@Override
545555
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
546-
ingestDocument.appendFieldValue("tags", "_geoip_database_unavailable_" + databaseName, true);
556+
tag(ingestDocument, databaseName);
547557
return ingestDocument;
548558
}
549559

@@ -556,4 +566,8 @@ public String getDatabaseName() {
556566
return databaseName;
557567
}
558568
}
569+
570+
private static void tag(IngestDocument ingestDocument, String databaseName) {
571+
ingestDocument.appendFieldValue("tags", "_geoip_database_unavailable_" + databaseName, true);
572+
}
559573
}

0 commit comments

Comments
 (0)