diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java deleted file mode 100644 index 83a3374b504dd..0000000000000 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.ingest.geoip; - -import com.fasterxml.jackson.databind.JsonNode; -import com.maxmind.db.NodeCache; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.cache.Cache; -import org.elasticsearch.common.cache.CacheBuilder; - -import java.io.IOException; -import java.util.concurrent.ExecutionException; - -final class GeoIpCache implements NodeCache { - private final Cache cache; - - GeoIpCache(long maxSize) { - this.cache = CacheBuilder.builder().setMaximumWeight(maxSize).build(); - } - - @Override - public JsonNode get(int key, Loader loader) throws IOException { - try { - return cache.computeIfAbsent(key, loader::load); - } catch (ExecutionException e) { - Throwable cause = e.getCause() != null ? e.getCause() : e; - throw new ElasticsearchException(cause); - } - } -} diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index b5dbf5a7f34de..a0be7557a5a8a 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -36,6 +36,7 @@ import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; +import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache; import java.net.InetAddress; import java.security.AccessController; @@ -66,14 +67,18 @@ public final class GeoIpProcessor extends AbstractProcessor { private final DatabaseReader dbReader; private final Set properties; private final boolean ignoreMissing; + private final GeoIpCache cache; - GeoIpProcessor(String tag, String field, DatabaseReader dbReader, String targetField, Set properties, boolean ignoreMissing) { + + GeoIpProcessor(String tag, String field, DatabaseReader dbReader, String targetField, Set properties, boolean ignoreMissing, + GeoIpCache cache) { super(tag); this.field = field; this.targetField = targetField; this.dbReader = dbReader; this.properties = properties; this.ignoreMissing = ignoreMissing; + this.cache = cache; } boolean isIgnoreMissing() { @@ -146,15 +151,16 @@ Set getProperties() { private Map retrieveCityGeoData(InetAddress ipAddress) { SpecialPermission.check(); - CityResponse response = AccessController.doPrivileged((PrivilegedAction) () -> { - try { - return dbReader.city(ipAddress); - } catch (AddressNotFoundException e) { - throw new AddressNotFoundRuntimeException(e); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + CityResponse response = AccessController.doPrivileged((PrivilegedAction) () -> + cache.putIfAbsent(ipAddress, CityResponse.class, ip -> { + try { + return dbReader.city(ip); + } catch (AddressNotFoundException e) { + throw new AddressNotFoundRuntimeException(e); + } catch (Exception e) { + throw new RuntimeException(e); + } + })); Country country = response.getCountry(); City city = response.getCity(); @@ -231,15 +237,16 @@ private Map retrieveCityGeoData(InetAddress ipAddress) { private Map retrieveCountryGeoData(InetAddress ipAddress) { SpecialPermission.check(); - CountryResponse response = AccessController.doPrivileged((PrivilegedAction) () -> { - try { - return dbReader.country(ipAddress); - } catch (AddressNotFoundException e) { - throw new AddressNotFoundRuntimeException(e); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + CountryResponse response = AccessController.doPrivileged((PrivilegedAction) () -> + cache.putIfAbsent(ipAddress, CountryResponse.class, ip -> { + try { + return dbReader.country(ip); + } catch (AddressNotFoundException e) { + throw new AddressNotFoundRuntimeException(e); + } catch (Exception e) { + throw new RuntimeException(e); + } + })); Country country = response.getCountry(); Continent continent = response.getContinent(); @@ -275,15 +282,16 @@ private Map retrieveCountryGeoData(InetAddress ipAddress) { private Map retrieveAsnGeoData(InetAddress ipAddress) { SpecialPermission.check(); - AsnResponse response = AccessController.doPrivileged((PrivilegedAction) () -> { - try { - return dbReader.asn(ipAddress); - } catch (AddressNotFoundException e) { - throw new AddressNotFoundRuntimeException(e); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + AsnResponse response = AccessController.doPrivileged((PrivilegedAction) () -> + cache.putIfAbsent(ipAddress, AsnResponse.class, ip -> { + try { + return dbReader.asn(ip); + } catch (AddressNotFoundException e) { + throw new AddressNotFoundRuntimeException(e); + } catch (Exception e) { + throw new RuntimeException(e); + } + })); Integer asn = response.getAutonomousSystemNumber(); String organization_name = response.getAutonomousSystemOrganization(); @@ -322,9 +330,11 @@ public static final class Factory implements Processor.Factory { ); private final Map databaseReaders; + private final GeoIpCache cache; - public Factory(Map databaseReaders) { + public Factory(Map databaseReaders, GeoIpCache cache) { this.databaseReaders = databaseReaders; + this.cache = cache; } @Override @@ -368,14 +378,15 @@ public GeoIpProcessor create(Map registry, String pro } } - return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, properties, ignoreMissing); + return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, properties, ignoreMissing, cache); } } // Geoip2's AddressNotFoundException is checked and due to the fact that we need run their code // inside a PrivilegedAction code block, we are forced to catch any checked exception and rethrow // it with an unchecked exception. - private static final class AddressNotFoundRuntimeException extends RuntimeException { + //package private for testing + static final class AddressNotFoundRuntimeException extends RuntimeException { AddressNotFoundRuntimeException(Throwable cause) { super(cause); diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index c9c742d178980..95e20f340b5ae 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -23,16 +23,20 @@ import com.maxmind.db.NodeCache; import com.maxmind.db.Reader; import com.maxmind.geoip2.DatabaseReader; -import org.elasticsearch.core.internal.io.IOUtils; +import com.maxmind.geoip2.model.AbstractResponse; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.cache.Cache; +import org.elasticsearch.common.cache.CacheBuilder; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.Plugin; import java.io.Closeable; import java.io.IOException; +import java.net.InetAddress; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.PathMatcher; @@ -42,6 +46,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.function.Function; import java.util.stream.Stream; public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable { @@ -61,24 +67,18 @@ public Map getProcessors(Processor.Parameters paramet throw new IllegalStateException("getProcessors called twice for geoip plugin!!"); } Path geoIpConfigDirectory = parameters.env.configFile().resolve("ingest-geoip"); - NodeCache cache; long cacheSize = CACHE_SIZE.get(parameters.env.settings()); - if (cacheSize > 0) { - cache = new GeoIpCache(cacheSize); - } else { - cache = NoCache.getInstance(); - } try { - databaseReaders = loadDatabaseReaders(geoIpConfigDirectory, cache); + databaseReaders = loadDatabaseReaders(geoIpConfigDirectory); } catch (IOException e) { throw new RuntimeException(e); } - return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders)); + return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(cacheSize))); } - static Map loadDatabaseReaders(Path geoIpConfigDirectory, NodeCache cache) throws IOException { + static Map loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException { if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) { - throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist"); + throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist"); } boolean loadDatabaseOnHeap = Booleans.parseBoolean(System.getProperty("es.geoip.load_db_on_heap", "false")); Map databaseReaders = new HashMap<>(); @@ -92,7 +92,7 @@ static Map loadDatabaseReaders(Path geoIpConfi String databaseFileName = databasePath.getFileName().toString(); DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader(databaseFileName, () -> { - DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(cache); + DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(NoCache.getInstance()); if (loadDatabaseOnHeap) { builder.fileMode(Reader.FileMode.MEMORY); } else { @@ -119,4 +119,75 @@ public void close() throws IOException { } } + /** + * The in-memory cache for the geoip data. There should only be 1 instance of this class.. + * This cache differs from the maxmind's {@link NodeCache} such that this cache stores the deserialized Json objects to avoid the + * cost of deserialization for each lookup (cached or not). This comes at slight expense of higher memory usage, but significant + * reduction of CPU usage. + */ + static class GeoIpCache { + private final Cache cache; + + //package private for testing + GeoIpCache(long maxSize) { + if (maxSize < 0) { + throw new IllegalArgumentException("geoip max cache size must be 0 or greater"); + } + this.cache = CacheBuilder.builder().setMaximumWeight(maxSize).build(); + } + + T putIfAbsent(InetAddress ip, Class responseType, + Function retrieveFunction) { + + //can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader) + CacheKey cacheKey = new CacheKey<>(ip, responseType); + //intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition. + AbstractResponse response = cache.get(cacheKey); + if (response == null) { + response = retrieveFunction.apply(ip); + cache.put(cacheKey, response); + } + return responseType.cast(response); + } + + //only useful for testing + T get(InetAddress ip, Class responseType) { + CacheKey cacheKey = new CacheKey<>(ip, responseType); + return responseType.cast(cache.get(cacheKey)); + } + + /** + * The key to use for the cache. Since this cache can span multiple geoip processors that all use different databases, the response + * type is needed to be included in the cache key. For example, if we only used the IP address as the key the City and ASN the same + * IP may be in both with different values and we need to cache both. The response type scopes the IP to the correct database + * provides a means to safely cast the return objects. + * @param The AbstractResponse type used to scope the key and cast the result. + */ + private static class CacheKey { + + private final InetAddress ip; + private final Class responseType; + + private CacheKey(InetAddress ip, Class responseType) { + this.ip = ip; + this.responseType = responseType; + } + + //generated + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CacheKey cacheKey = (CacheKey) o; + return Objects.equals(ip, cacheKey.ip) && + Objects.equals(responseType, cacheKey.responseType); + } + + //generated + @Override + public int hashCode() { + return Objects.hash(ip, responseType); + } + } + } } diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpCacheTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpCacheTests.java deleted file mode 100644 index 71cab99115fc7..0000000000000 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpCacheTests.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.ingest.geoip; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.IntNode; -import com.maxmind.db.NodeCache; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.test.ESTestCase; - -public class GeoIpCacheTests extends ESTestCase { - public void testCachesAndEvictsResults() throws Exception { - GeoIpCache cache = new GeoIpCache(1); - final NodeCache.Loader loader = key -> new IntNode(key); - - JsonNode jsonNode1 = cache.get(1, loader); - assertSame(jsonNode1, cache.get(1, loader)); - - // evict old key by adding another value - cache.get(2, loader); - - assertNotSame(jsonNode1, cache.get(1, loader)); - } - - public void testThrowsElasticsearchException() throws Exception { - GeoIpCache cache = new GeoIpCache(1); - NodeCache.Loader loader = (int key) -> { - throw new IllegalArgumentException("Illegal key"); - }; - ElasticsearchException ex = expectThrows(ElasticsearchException.class, () -> cache.get(1, loader)); - assertTrue("Expected cause to be of type IllegalArgumentException but was [" + ex.getCause().getClass() + "]", - ex.getCause() instanceof IllegalArgumentException); - assertEquals("Illegal key", ex.getCause().getMessage()); - } -} diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index 7a5d6f5808f76..316cfbc152c57 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -20,11 +20,10 @@ package org.elasticsearch.ingest.geoip; import com.carrotsearch.randomizedtesting.generators.RandomPicks; -import com.maxmind.db.NoCache; -import com.maxmind.db.NodeCache; import org.apache.lucene.util.Constants; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Randomness; +import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.StreamsUtils; import org.junit.AfterClass; @@ -69,8 +68,7 @@ public static void loadDatabaseReaders() throws IOException { Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")), geoIpConfigDir.resolve("GeoLite2-ASN.mmdb")); - NodeCache cache = randomFrom(NoCache.getInstance(), new GeoIpCache(randomNonNegativeLong())); - databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir, cache); + databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir); } @AfterClass @@ -92,7 +90,7 @@ public void testBuildDefaults() throws Exception { // This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected. // As a consequence, the corresponding file appears to be still in use and Windows cannot delete it. assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); Map config = new HashMap<>(); config.put("field", "_field"); @@ -111,7 +109,7 @@ public void testSetIgnoreMissing() throws Exception { // This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected. // As a consequence, the corresponding file appears to be still in use and Windows cannot delete it. assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); Map config = new HashMap<>(); config.put("field", "_field"); @@ -131,7 +129,7 @@ public void testCountryBuildDefaults() throws Exception { // This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected. // As a consequence, the corresponding file appears to be still in use and Windows cannot delete it. assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); Map config = new HashMap<>(); config.put("field", "_field"); @@ -152,7 +150,7 @@ public void testAsnBuildDefaults() throws Exception { // This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected. // As a consequence, the corresponding file appears to be still in use and Windows cannot delete it. assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); Map config = new HashMap<>(); config.put("field", "_field"); @@ -173,7 +171,7 @@ public void testBuildTargetField() throws Exception { // This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected. // As a consequence, the corresponding file appears to be still in use and Windows cannot delete it. assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); Map config = new HashMap<>(); config.put("field", "_field"); config.put("target_field", "_field"); @@ -187,7 +185,7 @@ public void testBuildDbFile() throws Exception { // This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected. // As a consequence, the corresponding file appears to be still in use and Windows cannot delete it. assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-Country.mmdb"); @@ -203,7 +201,7 @@ public void testBuildWithCountryDbAndAsnFields() throws Exception { // This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected. // As a consequence, the corresponding file appears to be still in use and Windows cannot delete it. assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-Country.mmdb"); @@ -220,7 +218,7 @@ public void testBuildWithAsnDbAndCityFields() throws Exception { // This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected. // As a consequence, the corresponding file appears to be still in use and Windows cannot delete it. assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-ASN.mmdb"); @@ -237,7 +235,7 @@ public void testBuildNonExistingDbFile() throws Exception { // This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected. // As a consequence, the corresponding file appears to be still in use and Windows cannot delete it. assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); Map config = new HashMap<>(); config.put("field", "_field"); @@ -250,7 +248,7 @@ public void testBuildFields() throws Exception { // This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected. // As a consequence, the corresponding file appears to be still in use and Windows cannot delete it. assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); Set properties = EnumSet.noneOf(GeoIpProcessor.Property.class); List fieldNames = new ArrayList<>(); @@ -277,7 +275,7 @@ public void testBuildIllegalFieldOption() throws Exception { // This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected. // As a consequence, the corresponding file appears to be still in use and Windows cannot delete it. assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); Map config1 = new HashMap<>(); config1.put("field", "_field"); @@ -311,8 +309,8 @@ public void testLazyLoading() throws Exception { // database readers used at class level are reused between tests. (we want to keep that otherwise running this // test will take roughly 4 times more time) Map databaseReaders = - IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir, NoCache.getInstance()); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); + IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) { assertNull(lazyLoader.databaseReader.get()); } diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java index 4c04d4e340a71..4da680f186e5b 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java @@ -20,8 +20,9 @@ package org.elasticsearch.ingest.geoip; import com.maxmind.geoip2.DatabaseReader; -import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.RandomDocumentPicks; +import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache; import org.elasticsearch.test.ESTestCase; import java.io.InputStream; @@ -40,7 +41,8 @@ public class GeoIpProcessorTests extends ESTestCase { public void testCity() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false); + new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); Map document = new HashMap<>(); document.put("source_field", "8.8.8.8"); @@ -64,7 +66,8 @@ public void testCity() throws Exception { public void testNullValueWithIgnoreMissing() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true); + new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, + new GeoIpCache(1000)); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("source_field", null)); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); @@ -75,7 +78,8 @@ public void testNullValueWithIgnoreMissing() throws Exception { public void testNonExistentWithIgnoreMissing() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true); + new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, + new GeoIpCache(1000)); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); processor.execute(ingestDocument); @@ -85,7 +89,8 @@ public void testNonExistentWithIgnoreMissing() throws Exception { public void testNullWithoutIgnoreMissing() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false); + new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("source_field", null)); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); @@ -96,7 +101,8 @@ public void testNullWithoutIgnoreMissing() throws Exception { public void testNonExistentWithoutIgnoreMissing() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false); + new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); Exception exception = expectThrows(Exception.class, () -> processor.execute(ingestDocument)); @@ -106,7 +112,8 @@ public void testNonExistentWithoutIgnoreMissing() throws Exception { public void testCity_withIpV6() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false); + new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); String address = "2602:306:33d3:8000::3257:9652"; Map document = new HashMap<>(); @@ -135,7 +142,8 @@ public void testCity_withIpV6() throws Exception { public void testCityWithMissingLocation() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false); + new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); Map document = new HashMap<>(); document.put("source_field", "80.231.5.0"); @@ -152,7 +160,8 @@ public void testCityWithMissingLocation() throws Exception { public void testCountry() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-Country.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false); + new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); Map document = new HashMap<>(); document.put("source_field", "82.170.213.79"); @@ -172,7 +181,8 @@ public void testCountry() throws Exception { public void testCountryWithMissingLocation() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-Country.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false); + new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); Map document = new HashMap<>(); document.put("source_field", "80.231.5.0"); @@ -190,7 +200,8 @@ public void testAsn() throws Exception { String ip = "82.170.213.79"; InputStream database = getDatabaseFileInputStream("/GeoLite2-ASN.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false); + new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); Map document = new HashMap<>(); document.put("source_field", ip); @@ -209,7 +220,8 @@ public void testAsn() throws Exception { public void testAddressIsNotInTheDatabase() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false); + new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); Map document = new HashMap<>(); document.put("source_field", "127.0.0.1"); @@ -222,7 +234,8 @@ public void testAddressIsNotInTheDatabase() throws Exception { public void testInvalid() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false); + new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); Map document = new HashMap<>(); document.put("source_field", "www.google.com"); diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/IngestGeoIpPluginTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/IngestGeoIpPluginTests.java new file mode 100644 index 0000000000000..884056bb0be8b --- /dev/null +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/IngestGeoIpPluginTests.java @@ -0,0 +1,64 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.geoip; + +import com.maxmind.geoip2.model.AbstractResponse; +import org.elasticsearch.common.network.InetAddresses; +import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache; +import org.elasticsearch.test.ESTestCase; + +import static org.mockito.Mockito.mock; + +public class IngestGeoIpPluginTests extends ESTestCase { + + public void testCachesAndEvictsResults() { + GeoIpCache cache = new GeoIpCache(1); + AbstractResponse response1 = mock(AbstractResponse.class); + AbstractResponse response2 = mock(AbstractResponse.class); + + //add a key + AbstractResponse cachedResponse = cache.putIfAbsent(InetAddresses.forString("127.0.0.1"), AbstractResponse.class, ip -> response1); + assertSame(cachedResponse, response1); + assertSame(cachedResponse, cache.putIfAbsent(InetAddresses.forString("127.0.0.1"), AbstractResponse.class, ip -> response1)); + assertSame(cachedResponse, cache.get(InetAddresses.forString("127.0.0.1"), AbstractResponse.class)); + + + // evict old key by adding another value + cachedResponse = cache.putIfAbsent(InetAddresses.forString("127.0.0.2"), AbstractResponse.class, ip -> response2); + assertSame(cachedResponse, response2); + assertSame(cachedResponse, cache.putIfAbsent(InetAddresses.forString("127.0.0.2"), AbstractResponse.class, ip -> response2)); + assertSame(cachedResponse, cache.get(InetAddresses.forString("127.0.0.2"), AbstractResponse.class)); + + assertNotSame(response1, cache.get(InetAddresses.forString("127.0.0.1"), AbstractResponse.class)); + } + + public void testThrowsFunctionsException() { + GeoIpCache cache = new GeoIpCache(1); + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, + () -> cache.putIfAbsent(InetAddresses.forString("127.0.0.1"), AbstractResponse.class, + ip -> { throw new IllegalArgumentException("bad"); })); + assertEquals("bad", ex.getMessage()); + } + + public void testInvalidInit() { + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new GeoIpCache(-1)); + assertEquals("geoip max cache size must be 0 or greater", ex.getMessage()); + } +}