Skip to content

Commit 6f9c9ab

Browse files
authored
[ingest] geo-ip performance improvements (#33029)
Re-implement the cache to avoid jackson JSON de-serialization for every IP lookup. The built in maxmind cache caches JsonNode objects. This requires de-serialization for every lookup, even if the object is found in cache. Profiling shows that is very expensive (CPU). The cache will now consist of the fully de-serialized objects. Profiling shows that the new footprint for the CityDB is ~6KB per cache entry. This may result in ~6MB increase with the 1000 entry default. The performance has been measured up to 40% faster on a modern 4 core/8 thread CPU for an ingest (minimal indexing) workflow. Further, the since prior implementation cached the JsonNode objects, and there is not a 1:1 relationship between an IP lookup / JsonNode object, the default cache size was most likely too small to be very effective. While this change does not change the 1000 default cache size, it will now cache more since there is now a 1:1 relationship between an IP lookup and value in the cache.
1 parent ef1066d commit 6f9c9ab

File tree

7 files changed

+230
-170
lines changed

7 files changed

+230
-170
lines changed

plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java

Lines changed: 0 additions & 46 deletions
This file was deleted.

plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.ingest.AbstractProcessor;
3737
import org.elasticsearch.ingest.IngestDocument;
3838
import org.elasticsearch.ingest.Processor;
39+
import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache;
3940

4041
import java.net.InetAddress;
4142
import java.security.AccessController;
@@ -66,14 +67,18 @@ public final class GeoIpProcessor extends AbstractProcessor {
6667
private final DatabaseReader dbReader;
6768
private final Set<Property> properties;
6869
private final boolean ignoreMissing;
70+
private final GeoIpCache cache;
6971

70-
GeoIpProcessor(String tag, String field, DatabaseReader dbReader, String targetField, Set<Property> properties, boolean ignoreMissing) {
72+
73+
GeoIpProcessor(String tag, String field, DatabaseReader dbReader, String targetField, Set<Property> properties, boolean ignoreMissing,
74+
GeoIpCache cache) {
7175
super(tag);
7276
this.field = field;
7377
this.targetField = targetField;
7478
this.dbReader = dbReader;
7579
this.properties = properties;
7680
this.ignoreMissing = ignoreMissing;
81+
this.cache = cache;
7782
}
7883

7984
boolean isIgnoreMissing() {
@@ -146,15 +151,16 @@ Set<Property> getProperties() {
146151

147152
private Map<String, Object> retrieveCityGeoData(InetAddress ipAddress) {
148153
SpecialPermission.check();
149-
CityResponse response = AccessController.doPrivileged((PrivilegedAction<CityResponse>) () -> {
150-
try {
151-
return dbReader.city(ipAddress);
152-
} catch (AddressNotFoundException e) {
153-
throw new AddressNotFoundRuntimeException(e);
154-
} catch (Exception e) {
155-
throw new RuntimeException(e);
156-
}
157-
});
154+
CityResponse response = AccessController.doPrivileged((PrivilegedAction<CityResponse>) () ->
155+
cache.putIfAbsent(ipAddress, CityResponse.class, ip -> {
156+
try {
157+
return dbReader.city(ip);
158+
} catch (AddressNotFoundException e) {
159+
throw new AddressNotFoundRuntimeException(e);
160+
} catch (Exception e) {
161+
throw new RuntimeException(e);
162+
}
163+
}));
158164

159165
Country country = response.getCountry();
160166
City city = response.getCity();
@@ -231,15 +237,16 @@ private Map<String, Object> retrieveCityGeoData(InetAddress ipAddress) {
231237

232238
private Map<String, Object> retrieveCountryGeoData(InetAddress ipAddress) {
233239
SpecialPermission.check();
234-
CountryResponse response = AccessController.doPrivileged((PrivilegedAction<CountryResponse>) () -> {
235-
try {
236-
return dbReader.country(ipAddress);
237-
} catch (AddressNotFoundException e) {
238-
throw new AddressNotFoundRuntimeException(e);
239-
} catch (Exception e) {
240-
throw new RuntimeException(e);
241-
}
242-
});
240+
CountryResponse response = AccessController.doPrivileged((PrivilegedAction<CountryResponse>) () ->
241+
cache.putIfAbsent(ipAddress, CountryResponse.class, ip -> {
242+
try {
243+
return dbReader.country(ip);
244+
} catch (AddressNotFoundException e) {
245+
throw new AddressNotFoundRuntimeException(e);
246+
} catch (Exception e) {
247+
throw new RuntimeException(e);
248+
}
249+
}));
243250

244251
Country country = response.getCountry();
245252
Continent continent = response.getContinent();
@@ -275,15 +282,16 @@ private Map<String, Object> retrieveCountryGeoData(InetAddress ipAddress) {
275282

276283
private Map<String, Object> retrieveAsnGeoData(InetAddress ipAddress) {
277284
SpecialPermission.check();
278-
AsnResponse response = AccessController.doPrivileged((PrivilegedAction<AsnResponse>) () -> {
279-
try {
280-
return dbReader.asn(ipAddress);
281-
} catch (AddressNotFoundException e) {
282-
throw new AddressNotFoundRuntimeException(e);
283-
} catch (Exception e) {
284-
throw new RuntimeException(e);
285-
}
286-
});
285+
AsnResponse response = AccessController.doPrivileged((PrivilegedAction<AsnResponse>) () ->
286+
cache.putIfAbsent(ipAddress, AsnResponse.class, ip -> {
287+
try {
288+
return dbReader.asn(ip);
289+
} catch (AddressNotFoundException e) {
290+
throw new AddressNotFoundRuntimeException(e);
291+
} catch (Exception e) {
292+
throw new RuntimeException(e);
293+
}
294+
}));
287295

288296
Integer asn = response.getAutonomousSystemNumber();
289297
String organization_name = response.getAutonomousSystemOrganization();
@@ -322,9 +330,11 @@ public static final class Factory implements Processor.Factory {
322330
);
323331

324332
private final Map<String, DatabaseReaderLazyLoader> databaseReaders;
333+
private final GeoIpCache cache;
325334

326-
public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders) {
335+
public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders, GeoIpCache cache) {
327336
this.databaseReaders = databaseReaders;
337+
this.cache = cache;
328338
}
329339

330340
@Override
@@ -368,14 +378,15 @@ public GeoIpProcessor create(Map<String, Processor.Factory> registry, String pro
368378
}
369379
}
370380

371-
return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, properties, ignoreMissing);
381+
return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, properties, ignoreMissing, cache);
372382
}
373383
}
374384

375385
// Geoip2's AddressNotFoundException is checked and due to the fact that we need run their code
376386
// inside a PrivilegedAction code block, we are forced to catch any checked exception and rethrow
377387
// it with an unchecked exception.
378-
private static final class AddressNotFoundRuntimeException extends RuntimeException {
388+
//package private for testing
389+
static final class AddressNotFoundRuntimeException extends RuntimeException {
379390

380391
AddressNotFoundRuntimeException(Throwable cause) {
381392
super(cause);

plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java

Lines changed: 83 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,20 @@
2323
import com.maxmind.db.NodeCache;
2424
import com.maxmind.db.Reader;
2525
import com.maxmind.geoip2.DatabaseReader;
26-
import org.elasticsearch.core.internal.io.IOUtils;
26+
import com.maxmind.geoip2.model.AbstractResponse;
2727
import org.elasticsearch.common.Booleans;
2828
import org.elasticsearch.common.SuppressForbidden;
29+
import org.elasticsearch.common.cache.Cache;
30+
import org.elasticsearch.common.cache.CacheBuilder;
2931
import org.elasticsearch.common.settings.Setting;
32+
import org.elasticsearch.core.internal.io.IOUtils;
3033
import org.elasticsearch.ingest.Processor;
3134
import org.elasticsearch.plugins.IngestPlugin;
3235
import org.elasticsearch.plugins.Plugin;
3336

3437
import java.io.Closeable;
3538
import java.io.IOException;
39+
import java.net.InetAddress;
3640
import java.nio.file.Files;
3741
import java.nio.file.Path;
3842
import java.nio.file.PathMatcher;
@@ -42,6 +46,8 @@
4246
import java.util.Iterator;
4347
import java.util.List;
4448
import java.util.Map;
49+
import java.util.Objects;
50+
import java.util.function.Function;
4551
import java.util.stream.Stream;
4652

4753
public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable {
@@ -61,24 +67,18 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
6167
throw new IllegalStateException("getProcessors called twice for geoip plugin!!");
6268
}
6369
Path geoIpConfigDirectory = parameters.env.configFile().resolve("ingest-geoip");
64-
NodeCache cache;
6570
long cacheSize = CACHE_SIZE.get(parameters.env.settings());
66-
if (cacheSize > 0) {
67-
cache = new GeoIpCache(cacheSize);
68-
} else {
69-
cache = NoCache.getInstance();
70-
}
7171
try {
72-
databaseReaders = loadDatabaseReaders(geoIpConfigDirectory, cache);
72+
databaseReaders = loadDatabaseReaders(geoIpConfigDirectory);
7373
} catch (IOException e) {
7474
throw new RuntimeException(e);
7575
}
76-
return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders));
76+
return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(cacheSize)));
7777
}
7878

79-
static Map<String, DatabaseReaderLazyLoader> loadDatabaseReaders(Path geoIpConfigDirectory, NodeCache cache) throws IOException {
79+
static Map<String, DatabaseReaderLazyLoader> loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException {
8080
if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) {
81-
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
81+
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
8282
}
8383
boolean loadDatabaseOnHeap = Booleans.parseBoolean(System.getProperty("es.geoip.load_db_on_heap", "false"));
8484
Map<String, DatabaseReaderLazyLoader> databaseReaders = new HashMap<>();
@@ -92,7 +92,7 @@ static Map<String, DatabaseReaderLazyLoader> loadDatabaseReaders(Path geoIpConfi
9292
String databaseFileName = databasePath.getFileName().toString();
9393
DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader(databaseFileName,
9494
() -> {
95-
DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(cache);
95+
DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(NoCache.getInstance());
9696
if (loadDatabaseOnHeap) {
9797
builder.fileMode(Reader.FileMode.MEMORY);
9898
} else {
@@ -119,4 +119,75 @@ public void close() throws IOException {
119119
}
120120
}
121121

122+
/**
123+
* The in-memory cache for the geoip data. There should only be 1 instance of this class..
124+
* This cache differs from the maxmind's {@link NodeCache} such that this cache stores the deserialized Json objects to avoid the
125+
* cost of deserialization for each lookup (cached or not). This comes at slight expense of higher memory usage, but significant
126+
* reduction of CPU usage.
127+
*/
128+
static class GeoIpCache {
129+
private final Cache<CacheKey, AbstractResponse> cache;
130+
131+
//package private for testing
132+
GeoIpCache(long maxSize) {
133+
if (maxSize < 0) {
134+
throw new IllegalArgumentException("geoip max cache size must be 0 or greater");
135+
}
136+
this.cache = CacheBuilder.<CacheKey, AbstractResponse>builder().setMaximumWeight(maxSize).build();
137+
}
138+
139+
<T extends AbstractResponse> T putIfAbsent(InetAddress ip, Class<T> responseType,
140+
Function<InetAddress, AbstractResponse> retrieveFunction) {
141+
142+
//can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader)
143+
CacheKey<T> cacheKey = new CacheKey<>(ip, responseType);
144+
//intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
145+
AbstractResponse response = cache.get(cacheKey);
146+
if (response == null) {
147+
response = retrieveFunction.apply(ip);
148+
cache.put(cacheKey, response);
149+
}
150+
return responseType.cast(response);
151+
}
152+
153+
//only useful for testing
154+
<T extends AbstractResponse> T get(InetAddress ip, Class<T> responseType) {
155+
CacheKey<T> cacheKey = new CacheKey<>(ip, responseType);
156+
return responseType.cast(cache.get(cacheKey));
157+
}
158+
159+
/**
160+
* The key to use for the cache. Since this cache can span multiple geoip processors that all use different databases, the response
161+
* type is needed to be included in the cache key. For example, if we only used the IP address as the key the City and ASN the same
162+
* IP may be in both with different values and we need to cache both. The response type scopes the IP to the correct database
163+
* provides a means to safely cast the return objects.
164+
* @param <T> The AbstractResponse type used to scope the key and cast the result.
165+
*/
166+
private static class CacheKey<T extends AbstractResponse> {
167+
168+
private final InetAddress ip;
169+
private final Class<T> responseType;
170+
171+
private CacheKey(InetAddress ip, Class<T> responseType) {
172+
this.ip = ip;
173+
this.responseType = responseType;
174+
}
175+
176+
//generated
177+
@Override
178+
public boolean equals(Object o) {
179+
if (this == o) return true;
180+
if (o == null || getClass() != o.getClass()) return false;
181+
CacheKey<?> cacheKey = (CacheKey<?>) o;
182+
return Objects.equals(ip, cacheKey.ip) &&
183+
Objects.equals(responseType, cacheKey.responseType);
184+
}
185+
186+
//generated
187+
@Override
188+
public int hashCode() {
189+
return Objects.hash(ip, responseType);
190+
}
191+
}
192+
}
122193
}

plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpCacheTests.java

Lines changed: 0 additions & 51 deletions
This file was deleted.

0 commit comments

Comments
 (0)