From 94be8374eacb9ee164a6be40559cfa385474603f Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Fri, 16 Dec 2016 16:03:33 +0100 Subject: [PATCH 1/4] Cache results of geoip lookups With this commit, we introduce a cache to the geoip ingest processor. The cache is disabled by default but can be enabled by setting `ingest.geoip.cache_enabled` to `true`. The cache size is controlled by the setting `ingest.geoip.cache_size`. Closes #22074 --- docs/plugins/ingest-geoip.asciidoc | 18 +++++++ .../ingest/geoip/GeoIpCache.java | 46 +++++++++++++++++ .../ingest/geoip/IngestGeoIpPlugin.java | 30 +++++++++-- .../ingest/geoip/GeoIpCacheTests.java | 51 +++++++++++++++++++ .../geoip/GeoIpProcessorFactoryTests.java | 15 +++++- 5 files changed, 156 insertions(+), 4 deletions(-) create mode 100644 plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java create mode 100644 plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpCacheTests.java diff --git a/docs/plugins/ingest-geoip.asciidoc b/docs/plugins/ingest-geoip.asciidoc index 0481ad40ab653..c55d81db92011 100644 --- a/docs/plugins/ingest-geoip.asciidoc +++ b/docs/plugins/ingest-geoip.asciidoc @@ -203,3 +203,21 @@ Which returns: } -------------------------------------------------- // TESTRESPONSE + +[[ingest-geoip-settings]] +===== Settings + +The geoip processor can cache results. Caching is disabled by default but can be enabled with the +setting `ingest.geoip.cache_enabled`. The cache size is controlled by . + +The geoip processor supports the following settings: + +`ingest.geoip.cache_enabled`:: + + Whether to enable caching of results. Defaults to `false`. + +`ingest.geoip.cache_size`:: + + The maximum number of results that should be cached. Defaults to `1000`. + +Note that these settings apply to all geoip processors, i.e. there is one cache for all defined processors. 
\ No newline at end of file diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java new file mode 100644 index 0000000000000..83a3374b504dd --- /dev/null +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java @@ -0,0 +1,46 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.ingest.geoip; + +import com.fasterxml.jackson.databind.JsonNode; +import com.maxmind.db.NodeCache; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.cache.Cache; +import org.elasticsearch.common.cache.CacheBuilder; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; + +final class GeoIpCache implements NodeCache { + private final Cache cache; + + GeoIpCache(long maxSize) { + this.cache = CacheBuilder.builder().setMaximumWeight(maxSize).build(); + } + + @Override + public JsonNode get(int key, Loader loader) throws IOException { + try { + return cache.computeIfAbsent(key, loader::load); + } catch (ExecutionException e) { + Throwable cause = e.getCause() != null ? 
e.getCause() : e; + throw new ElasticsearchException(cause); + } + } +} diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 6d5af71aa5bdf..1c2426549d7e7 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -26,23 +26,37 @@ import java.nio.file.Path; import java.nio.file.PathMatcher; import java.nio.file.StandardOpenOption; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.stream.Stream; import java.util.zip.GZIPInputStream; +import com.maxmind.db.NoCache; +import com.maxmind.db.NodeCache; import com.maxmind.geoip2.DatabaseReader; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.Plugin; public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable { + public static final Setting CACHE_ENABLED = + Setting.boolSetting("ingest.geoip.cache_enabled", false, Setting.Property.NodeScope); + public static final Setting CACHE_SIZE = + Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope); private Map databaseReaders; + @Override + public List> getSettings() { + return Arrays.asList(CACHE_ENABLED, CACHE_SIZE); + } + @Override public Map getProcessors(Processor.Parameters parameters) { if (databaseReaders != null) { @@ -50,18 +64,26 @@ public Map getProcessors(Processor.Parameters paramet } Path geoIpConfigDirectory = parameters.env.configFile().resolve("ingest-geoip"); try { - databaseReaders = loadDatabaseReaders(geoIpConfigDirectory); + databaseReaders = 
loadDatabaseReaders(parameters, geoIpConfigDirectory); } catch (IOException e) { throw new RuntimeException(e); } return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders)); } - static Map loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException { + static Map loadDatabaseReaders(Processor.Parameters parameters, Path geoIpConfigDirectory) throws IOException { if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) { throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist"); } + NodeCache cache; + if (CACHE_ENABLED.get(parameters.env.settings())) { + long cacheSize = CACHE_SIZE.get(parameters.env.settings()); + cache = new GeoIpCache(cacheSize); + } else { + cache = NoCache.getInstance(); + } + Map databaseReaders = new HashMap<>(); try (Stream databaseFiles = Files.list(geoIpConfigDirectory)) { PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb.gz"); @@ -71,7 +93,8 @@ static Map loadDatabaseReaders(Path geoIpConfigDirectory Path databasePath = iterator.next(); if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) { try (InputStream inputStream = new GZIPInputStream(Files.newInputStream(databasePath, StandardOpenOption.READ))) { - databaseReaders.put(databasePath.getFileName().toString(), new DatabaseReader.Builder(inputStream).build()); + databaseReaders.put(databasePath.getFileName().toString(), + new DatabaseReader.Builder(inputStream).withCache(cache).build()); } } } @@ -85,4 +108,5 @@ public void close() throws IOException { IOUtils.close(databaseReaders.values()); } } + } diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpCacheTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpCacheTests.java new file mode 100644 index 0000000000000..71cab99115fc7 --- /dev/null +++ 
b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpCacheTests.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.ingest.geoip; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.IntNode; +import com.maxmind.db.NodeCache; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.test.ESTestCase; + +public class GeoIpCacheTests extends ESTestCase { + public void testCachesAndEvictsResults() throws Exception { + GeoIpCache cache = new GeoIpCache(1); + final NodeCache.Loader loader = key -> new IntNode(key); + + JsonNode jsonNode1 = cache.get(1, loader); + assertSame(jsonNode1, cache.get(1, loader)); + + // evict old key by adding another value + cache.get(2, loader); + + assertNotSame(jsonNode1, cache.get(1, loader)); + } + + public void testThrowsElasticsearchException() throws Exception { + GeoIpCache cache = new GeoIpCache(1); + NodeCache.Loader loader = (int key) -> { + throw new IllegalArgumentException("Illegal key"); + }; + ElasticsearchException ex = expectThrows(ElasticsearchException.class, () -> cache.get(1, loader)); + assertTrue("Expected cause to be of type IllegalArgumentException but 
was [" + ex.getCause().getClass() + "]", + ex.getCause() instanceof IllegalArgumentException); + assertEquals("Illegal key", ex.getCause().getMessage()); + } +} diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index ec4db09cd960f..418c8cf885623 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -23,6 +23,10 @@ import com.maxmind.geoip2.DatabaseReader; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.env.Environment; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.StreamsUtils; import org.junit.AfterClass; @@ -57,7 +61,16 @@ public static void loadDatabaseReaders() throws IOException { geoIpConfigDir.resolve("GeoLite2-City.mmdb.gz")); Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb.gz")), geoIpConfigDir.resolve("GeoLite2-Country.mmdb.gz")); - databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir); + + Settings settings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) + .put(IngestGeoIpPlugin.CACHE_ENABLED.getKey(), randomBoolean()) + .put(IngestGeoIpPlugin.CACHE_SIZE.getKey(), randomPositiveLong()) + .build(); + + databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders( + new Processor.Parameters(new Environment(settings), null, null, null, + new ThreadContext(settings)), geoIpConfigDir); } @AfterClass From 43df574bea9d177b09ddf3983f40c047a4163dea Mon Sep 17 00:00:00 2001 From: Daniel 
Mitterdorfer Date: Fri, 16 Dec 2016 16:08:14 +0100 Subject: [PATCH 2/4] Remove outdated docs --- docs/plugins/ingest-geoip.asciidoc | 3 --- 1 file changed, 3 deletions(-) diff --git a/docs/plugins/ingest-geoip.asciidoc b/docs/plugins/ingest-geoip.asciidoc index c55d81db92011..61edba359a776 100644 --- a/docs/plugins/ingest-geoip.asciidoc +++ b/docs/plugins/ingest-geoip.asciidoc @@ -207,9 +207,6 @@ Which returns: [[ingest-geoip-settings]] ===== Settings -The geoip processor can cache results. Caching is disabled by default but can be enabled with the -setting `ingest.geoip.cache_enabled`. The cache size is controlled by . - The geoip processor supports the following settings: `ingest.geoip.cache_enabled`:: From c77e21616bc7e50e66140bd25177a8f6fedc656d Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Mon, 19 Dec 2016 09:52:16 +0100 Subject: [PATCH 3/4] Address review comments --- docs/plugins/ingest-geoip.asciidoc | 10 +++------ .../ingest/geoip/IngestGeoIpPlugin.java | 21 +++++++++---------- .../geoip/GeoIpProcessorFactoryTests.java | 17 ++++----------- 3 files changed, 17 insertions(+), 31 deletions(-) diff --git a/docs/plugins/ingest-geoip.asciidoc b/docs/plugins/ingest-geoip.asciidoc index 61edba359a776..95e7a0442a454 100644 --- a/docs/plugins/ingest-geoip.asciidoc +++ b/docs/plugins/ingest-geoip.asciidoc @@ -205,16 +205,12 @@ Which returns: // TESTRESPONSE [[ingest-geoip-settings]] -===== Settings +===== Node Settings -The geoip processor supports the following settings: - -`ingest.geoip.cache_enabled`:: - - Whether to enable caching of results. Defaults to `false`. +The geoip processor supports the following setting: `ingest.geoip.cache_size`:: The maximum number of results that should be cached. Defaults to `1000`. -Note that these settings apply to all geoip processors, i.e. there is one cache for all defined processors. \ No newline at end of file +Note that this setting is a node setting and applies to all geoip processors, i.e. 
there is one cache for all defined geoip processors. \ No newline at end of file diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 1c2426549d7e7..6667ae30521d5 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -54,7 +54,7 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable @Override public List> getSettings() { - return Arrays.asList(CACHE_ENABLED, CACHE_SIZE); + return Arrays.asList(CACHE_SIZE); } @Override @@ -63,27 +63,26 @@ public Map getProcessors(Processor.Parameters paramet throw new IllegalStateException("getProcessors called twice for geoip plugin!!"); } Path geoIpConfigDirectory = parameters.env.configFile().resolve("ingest-geoip"); + NodeCache cache; + long cacheSize = CACHE_SIZE.get(parameters.env.settings()); + if (cacheSize > 0) { + cache = new GeoIpCache(cacheSize); + } else { + cache = NoCache.getInstance(); + } try { - databaseReaders = loadDatabaseReaders(parameters, geoIpConfigDirectory); + databaseReaders = loadDatabaseReaders(geoIpConfigDirectory, cache); } catch (IOException e) { throw new RuntimeException(e); } return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders)); } - static Map loadDatabaseReaders(Processor.Parameters parameters, Path geoIpConfigDirectory) throws IOException { + static Map loadDatabaseReaders(Path geoIpConfigDirectory, NodeCache cache) throws IOException { if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) { throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist"); } - NodeCache cache; - if (CACHE_ENABLED.get(parameters.env.settings())) { - long 
cacheSize = CACHE_SIZE.get(parameters.env.settings()); - cache = new GeoIpCache(cacheSize); - } else { - cache = NoCache.getInstance(); - } - Map databaseReaders = new HashMap<>(); try (Stream databaseFiles = Files.list(geoIpConfigDirectory)) { PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb.gz"); diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index 418c8cf885623..162137b5f3cd8 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -20,13 +20,11 @@ package org.elasticsearch.ingest.geoip; import com.carrotsearch.randomizedtesting.generators.RandomPicks; +import com.maxmind.db.NoCache; +import com.maxmind.db.NodeCache; import com.maxmind.geoip2.DatabaseReader; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Randomness; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.env.Environment; -import org.elasticsearch.ingest.Processor; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.StreamsUtils; import org.junit.AfterClass; @@ -62,15 +60,8 @@ public static void loadDatabaseReaders() throws IOException { Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb.gz")), geoIpConfigDir.resolve("GeoLite2-Country.mmdb.gz")); - Settings settings = Settings.builder() - .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) - .put(IngestGeoIpPlugin.CACHE_ENABLED.getKey(), randomBoolean()) - .put(IngestGeoIpPlugin.CACHE_SIZE.getKey(), randomPositiveLong()) - .build(); - - databaseReaders = 
IngestGeoIpPlugin.loadDatabaseReaders( - new Processor.Parameters(new Environment(settings), null, null, null, - new ThreadContext(settings)), geoIpConfigDir); + NodeCache cache = randomFrom(NoCache.getInstance(), new GeoIpCache(randomPositiveLong())); + databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir, cache); } @AfterClass From 61ac9e356411a1c1bdb3b4f0711f7b75ead691f7 Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Mon, 19 Dec 2016 09:53:22 +0100 Subject: [PATCH 4/4] Remove CACHE_ENABLED setting --- .../java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 6667ae30521d5..4e5cc5c023702 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -45,8 +45,6 @@ import org.elasticsearch.plugins.Plugin; public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable { - public static final Setting CACHE_ENABLED = - Setting.boolSetting("ingest.geoip.cache_enabled", false, Setting.Property.NodeScope); public static final Setting CACHE_SIZE = Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope);