Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache;

import java.net.InetAddress;
import java.security.AccessController;
Expand Down Expand Up @@ -66,14 +67,18 @@ public final class GeoIpProcessor extends AbstractProcessor {
private final DatabaseReader dbReader;
private final Set<Property> properties;
private final boolean ignoreMissing;
private final GeoIpCache cache;

GeoIpProcessor(String tag, String field, DatabaseReader dbReader, String targetField, Set<Property> properties, boolean ignoreMissing) {

GeoIpProcessor(String tag, String field, DatabaseReader dbReader, String targetField, Set<Property> properties, boolean ignoreMissing,
GeoIpCache cache) {
super(tag);
this.field = field;
this.targetField = targetField;
this.dbReader = dbReader;
this.properties = properties;
this.ignoreMissing = ignoreMissing;
this.cache = cache;
}

boolean isIgnoreMissing() {
Expand Down Expand Up @@ -146,15 +151,16 @@ Set<Property> getProperties() {

private Map<String, Object> retrieveCityGeoData(InetAddress ipAddress) {
SpecialPermission.check();
CityResponse response = AccessController.doPrivileged((PrivilegedAction<CityResponse>) () -> {
try {
return dbReader.city(ipAddress);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
CityResponse response = AccessController.doPrivileged((PrivilegedAction<CityResponse>) () ->
cache.putIfAbsent(ipAddress, CityResponse.class, ip -> {
try {
return dbReader.city(ip);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}));

Country country = response.getCountry();
City city = response.getCity();
Expand Down Expand Up @@ -231,15 +237,16 @@ private Map<String, Object> retrieveCityGeoData(InetAddress ipAddress) {

private Map<String, Object> retrieveCountryGeoData(InetAddress ipAddress) {
SpecialPermission.check();
CountryResponse response = AccessController.doPrivileged((PrivilegedAction<CountryResponse>) () -> {
try {
return dbReader.country(ipAddress);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
CountryResponse response = AccessController.doPrivileged((PrivilegedAction<CountryResponse>) () ->
cache.putIfAbsent(ipAddress, CountryResponse.class, ip -> {
try {
return dbReader.country(ip);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}));

Country country = response.getCountry();
Continent continent = response.getContinent();
Expand Down Expand Up @@ -275,15 +282,16 @@ private Map<String, Object> retrieveCountryGeoData(InetAddress ipAddress) {

private Map<String, Object> retrieveAsnGeoData(InetAddress ipAddress) {
SpecialPermission.check();
AsnResponse response = AccessController.doPrivileged((PrivilegedAction<AsnResponse>) () -> {
try {
return dbReader.asn(ipAddress);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
AsnResponse response = AccessController.doPrivileged((PrivilegedAction<AsnResponse>) () ->
cache.putIfAbsent(ipAddress, AsnResponse.class, ip -> {
try {
return dbReader.asn(ip);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}));

Integer asn = response.getAutonomousSystemNumber();
String organization_name = response.getAutonomousSystemOrganization();
Expand Down Expand Up @@ -322,9 +330,11 @@ public static final class Factory implements Processor.Factory {
);

private final Map<String, DatabaseReaderLazyLoader> databaseReaders;
private final GeoIpCache cache;

public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders) {
public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders, GeoIpCache cache) {
this.databaseReaders = databaseReaders;
this.cache = cache;
}

@Override
Expand Down Expand Up @@ -368,14 +378,15 @@ public GeoIpProcessor create(Map<String, Processor.Factory> registry, String pro
}
}

return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, properties, ignoreMissing);
return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, properties, ignoreMissing, cache);
}
}

// Geoip2's AddressNotFoundException is checked and due to the fact that we need run their code
// inside a PrivilegedAction code block, we are forced to catch any checked exception and rethrow
// it with an unchecked exception.
private static final class AddressNotFoundRuntimeException extends RuntimeException {
//package private for testing
static final class AddressNotFoundRuntimeException extends RuntimeException {

AddressNotFoundRuntimeException(Throwable cause) {
super(cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,20 @@
import com.maxmind.db.NodeCache;
import com.maxmind.db.Reader;
import com.maxmind.geoip2.DatabaseReader;
import org.elasticsearch.core.internal.io.IOUtils;
import com.maxmind.geoip2.model.AbstractResponse;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
Expand All @@ -42,6 +46,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Stream;

public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable {
Expand All @@ -61,24 +67,18 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
throw new IllegalStateException("getProcessors called twice for geoip plugin!!");
}
Path geoIpConfigDirectory = parameters.env.configFile().resolve("ingest-geoip");
NodeCache cache;
long cacheSize = CACHE_SIZE.get(parameters.env.settings());
if (cacheSize > 0) {
cache = new GeoIpCache(cacheSize);
} else {
cache = NoCache.getInstance();
}
try {
databaseReaders = loadDatabaseReaders(geoIpConfigDirectory, cache);
databaseReaders = loadDatabaseReaders(geoIpConfigDirectory);
} catch (IOException e) {
throw new RuntimeException(e);
}
return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders));
return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(cacheSize)));
}

static Map<String, DatabaseReaderLazyLoader> loadDatabaseReaders(Path geoIpConfigDirectory, NodeCache cache) throws IOException {
static Map<String, DatabaseReaderLazyLoader> loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException {
if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) {
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
}
boolean loadDatabaseOnHeap = Booleans.parseBoolean(System.getProperty("es.geoip.load_db_on_heap", "false"));
Map<String, DatabaseReaderLazyLoader> databaseReaders = new HashMap<>();
Expand All @@ -92,7 +92,7 @@ static Map<String, DatabaseReaderLazyLoader> loadDatabaseReaders(Path geoIpConfi
String databaseFileName = databasePath.getFileName().toString();
DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader(databaseFileName,
() -> {
DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(cache);
DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(NoCache.getInstance());
if (loadDatabaseOnHeap) {
builder.fileMode(Reader.FileMode.MEMORY);
} else {
Expand All @@ -119,4 +119,75 @@ public void close() throws IOException {
}
}

/**
* The in-memory cache for the geoip data. There should only be 1 instance of this class..
* This cache differs from the maxmind's {@link NodeCache} such that this cache stores the deserialized Json objects to avoid the
* cost of deserialization for each lookup (cached or not). This comes at slight expense of higher memory usage, but significant
* reduction of CPU usage.
*/
static class GeoIpCache {
private final Cache<CacheKey, AbstractResponse> cache;

//package private for testing
GeoIpCache(long maxSize) {
if (maxSize < 0) {
throw new IllegalArgumentException("geoip max cache size must be 0 or greater");
}
this.cache = CacheBuilder.<CacheKey, AbstractResponse>builder().setMaximumWeight(maxSize).build();
}

<T extends AbstractResponse> T putIfAbsent(InetAddress ip, Class<T> responseType,
Function<InetAddress, AbstractResponse> retrieveFunction) {

//can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader)
CacheKey<T> cacheKey = new CacheKey<>(ip, responseType);
//intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
AbstractResponse response = cache.get(cacheKey);
if (response == null) {
response = retrieveFunction.apply(ip);
cache.put(cacheKey, response);
}
return responseType.cast(response);
}

//only useful for testing
<T extends AbstractResponse> T get(InetAddress ip, Class<T> responseType) {
CacheKey<T> cacheKey = new CacheKey<>(ip, responseType);
return responseType.cast(cache.get(cacheKey));
}

/**
* The key to use for the cache. Since this cache can span multiple geoip processors that all use different databases, the response
* type is needed to be included in the cache key. For example, if we only used the IP address as the key the City and ASN the same
* IP may be in both with different values and we need to cache both. The response type scopes the IP to the correct database
* provides a means to safely cast the return objects.
* @param <T> The AbstractResponse type used to scope the key and cast the result.
*/
private static class CacheKey<T extends AbstractResponse> {

private final InetAddress ip;
private final Class<T> responseType;

private CacheKey(InetAddress ip, Class<T> responseType) {
this.ip = ip;
this.responseType = responseType;
}

//generated
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CacheKey<?> cacheKey = (CacheKey<?>) o;
return Objects.equals(ip, cacheKey.ip) &&
Objects.equals(responseType, cacheKey.responseType);
}

//generated
@Override
public int hashCode() {
return Objects.hash(ip, responseType);
}
}
}
}

This file was deleted.

Loading