Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
Expand Down Expand Up @@ -109,6 +110,18 @@ public void cleanUp() throws Exception {
assertThat(state.getDatabases(), anEmptyMap());
}
});
assertBusy(() -> {
GeoIpDownloaderStatsAction.Response response =
client().execute(GeoIpDownloaderStatsAction.INSTANCE, new GeoIpDownloaderStatsAction.Request()).actionGet();
assertThat(response.getStats().getDatabasesCount(), equalTo(0));
assertThat(response.getNodes(), not(empty()));
for (GeoIpDownloaderStatsAction.NodeResponse nodeResponse : response.getNodes()) {
assertThat(nodeResponse.getConfigDatabases(), empty());
assertThat(nodeResponse.getDatabases(), empty());
assertThat(nodeResponse.getFilesInTemp().stream().filter(s -> s.endsWith(".txt") == false).collect(Collectors.toList()),
empty());
}
});
assertBusy(() -> {
List<Path> geoIpTmpDirs = getGeoIpTmpDirs();
for (Path geoIpTmpDir : geoIpTmpDirs) {
Expand Down Expand Up @@ -263,7 +276,6 @@ public void testGeoIpDatabasesDownload() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/79074")
@TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972")
public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
assumeTrue("only test with fixture to have stable results", ENDPOINT != null);
Expand Down Expand Up @@ -310,9 +322,9 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
}
}
});
deleteDatabasesInConfigDirectory();
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/79074")
public void testStartWithNoDatabases() throws Exception {
assumeTrue("only test with fixture to have stable results", ENDPOINT != null);
putPipeline();
Expand All @@ -330,31 +342,24 @@ public void testStartWithNoDatabases() throws Exception {
// Enable downloader:
Settings.Builder settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true);
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));
{
assertBusy(() -> {
SimulateDocumentBaseResult result = simulatePipeline();
assertThat(result.getFailure(), nullValue());
assertThat(result.getIngestDocument(), notNullValue());
Map<?, ?> source = result.getIngestDocument().getSourceAndMetadata();
assertThat(source, not(hasKey("tags")));
assertThat(source, hasKey("ip-city"));
assertThat(source, hasKey("ip-asn"));
assertThat(source, hasKey("ip-country"));

assertThat(((Map<?, ?>) source.get("ip-city")).get("city_name"), equalTo("Linköping"));
assertThat(((Map<?, ?>) source.get("ip-asn")).get("organization_name"), equalTo("Bredband2 AB"));
assertThat(((Map<?, ?>) source.get("ip-country")).get("country_name"), equalTo("Sweden"));
});
}
verifyUpdatedDatabase();
}

private void verifyUpdatedDatabase() throws Exception {
assertBusy(() -> {
SimulateDocumentBaseResult result = simulatePipeline();
assertThat(result.getFailure(), nullValue());
assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Linköping"));
assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB"));
assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden"));
assertThat(result.getIngestDocument(), notNullValue());

Map<?, ?> source = result.getIngestDocument().getSourceAndMetadata();
assertThat(source, not(hasKey("tags")));
assertThat(source, hasKey("ip-city"));
assertThat(source, hasKey("ip-asn"));
assertThat(source, hasKey("ip-country"));

assertThat(((Map<?, ?>) source.get("ip-city")).get("city_name"), equalTo("Linköping"));
assertThat(((Map<?, ?>) source.get("ip-asn")).get("organization_name"), equalTo("Bredband2 AB"));
assertThat(((Map<?, ?>) source.get("ip-country")).get("country_name"), equalTo("Sweden"));
});
}

Expand Down Expand Up @@ -487,6 +492,29 @@ private void setupDatabasesInConfigDirectory() throws Exception {
});
}

private void deleteDatabasesInConfigDirectory() throws Exception {
StreamSupport.stream(internalCluster().getInstances(Environment.class).spliterator(), false)
.map(Environment::configFile)
.map(path -> path.resolve("ingest-geoip"))
.distinct()
.forEach(path -> {
try {
IOUtils.rm(path);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});

assertBusy(() -> {
GeoIpDownloaderStatsAction.Response response =
client().execute(GeoIpDownloaderStatsAction.INSTANCE, new GeoIpDownloaderStatsAction.Request()).actionGet();
assertThat(response.getNodes(), not(empty()));
for (GeoIpDownloaderStatsAction.NodeResponse nodeResponse : response.getNodes()) {
assertThat(nodeResponse.getConfigDatabases(), empty());
}
});
}

@SuppressForbidden(reason = "Maxmind API requires java.io.File")
private void parseDatabase(Path tempFile) throws IOException {
try (DatabaseReader databaseReader = new DatabaseReader.Builder(tempFile.toFile()).build()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ public final class GeoIpProcessor extends AbstractProcessor {
private final Set<Property> properties;
private final boolean ignoreMissing;
private final boolean firstOnly;
private final String databaseFile;

/**
* Construct a geo-IP processor.
*
* @param tag the processor tag
* @param tag the processor tag
* @param description the processor description
* @param field the source field to geo-IP map
* @param supplier a supplier of a geo-IP database reader; ideally this is lazily-loaded once on first use
Expand All @@ -83,6 +83,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
* @param properties the properties; ideally this is lazily-loaded once on first use
* @param ignoreMissing true if documents with a missing value for the field should be ignored
* @param firstOnly true if only first result should be returned in case of array
* @param databaseFile
*/
GeoIpProcessor(
final String tag,
Expand All @@ -93,7 +94,8 @@ public final class GeoIpProcessor extends AbstractProcessor {
final String targetField,
final Set<Property> properties,
final boolean ignoreMissing,
final boolean firstOnly) {
final boolean firstOnly,
final String databaseFile) {
super(tag, description);
this.field = field;
this.isValid = isValid;
Expand All @@ -102,6 +104,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
this.properties = properties;
this.ignoreMissing = ignoreMissing;
this.firstOnly = firstOnly;
this.databaseFile = databaseFile;
}

boolean isIgnoreMissing() {
Expand All @@ -121,8 +124,14 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract geoip information.");
}

DatabaseReaderLazyLoader lazyLoader = this.supplier.get();
if (lazyLoader == null) {
tag(ingestDocument, databaseFile);
return ingestDocument;
}

if (ip instanceof String) {
Map<String, Object> geoData = getGeoData((String) ip);
Map<String, Object> geoData = getGeoData(lazyLoader, (String) ip);
if (geoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, geoData);
}
Expand All @@ -133,7 +142,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException
if (ipAddr instanceof String == false) {
throw new IllegalArgumentException("array in field [" + field + "] should only contain strings");
}
Map<String, Object> geoData = getGeoData((String) ipAddr);
Map<String, Object> geoData = getGeoData(lazyLoader, (String) ipAddr);
if (geoData.isEmpty()) {
geoDataList.add(null);
continue;
Expand All @@ -154,8 +163,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException
return ingestDocument;
}

private Map<String, Object> getGeoData(String ip) throws IOException {
DatabaseReaderLazyLoader lazyLoader = this.supplier.get();
private Map<String, Object> getGeoData(DatabaseReaderLazyLoader lazyLoader, String ip) throws IOException {
try {
final String databaseType = lazyLoader.getDatabaseType();
final InetAddress ipAddress = InetAddresses.forString(ip);
Expand Down Expand Up @@ -431,7 +439,9 @@ public Processor create(
}
CheckedSupplier<DatabaseReaderLazyLoader, IOException> supplier = () -> {
DatabaseReaderLazyLoader loader = databaseRegistry.getDatabase(databaseFile);
if (loader == null) {
if (loader == null && databaseRegistry.getAvailableDatabases().isEmpty() == false) {
return null;
} else if (loader == null) {
throw new ResourceNotFoundException("database file [" + databaseFile + "] doesn't exist");
}
// Only check whether the suffix has changed and not the entire database type.
Expand Down Expand Up @@ -467,7 +477,7 @@ public Processor create(
return valid;
};
return new GeoIpProcessor(processorTag, description, ipField, supplier, isValid, targetField, properties, ignoreMissing,
firstOnly);
firstOnly, databaseFile);
}
}

Expand Down Expand Up @@ -543,7 +553,7 @@ static class DatabaseUnavailableProcessor extends AbstractProcessor {

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
ingestDocument.appendFieldValue("tags", "_geoip_database_unavailable_" + databaseName, true);
tag(ingestDocument, databaseName);
return ingestDocument;
}

Expand All @@ -556,4 +566,8 @@ public String getDatabaseName() {
return databaseName;
}
}

private static void tag(IngestDocument ingestDocument, String databaseName) {
ingestDocument.appendFieldValue("tags", "_geoip_database_unavailable_" + databaseName, true);
}
}
Loading