Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/plugins/ingest-geoip.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,14 @@ Which returns:
}
--------------------------------------------------
// TESTRESPONSE

[[ingest-geoip-settings]]
===== Node Settings

The geoip processor supports the following setting:

`ingest.geoip.cache_size`::
Copy link
Contributor

@dadoonet dadoonet Dec 16, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also simply that and have one single setting actually. If ingest.geoip.cache_size == 0, then use a NoCache instance. Otherwise use the one you created.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, neat idea.


The maximum number of results that should be cached. Defaults to `1000`.

Note that these settings are node settings and apply to all geoip processors, i.e. there is one cache for all defined geoip processors.
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.geoip;

import com.fasterxml.jackson.databind.JsonNode;
import com.maxmind.db.NodeCache;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

final class GeoIpCache implements NodeCache {
private final Cache<Integer, JsonNode> cache;

GeoIpCache(long maxSize) {
this.cache = CacheBuilder.<Integer, JsonNode>builder().setMaximumWeight(maxSize).build();
}

@Override
public JsonNode get(int key, Loader loader) throws IOException {
try {
return cache.computeIfAbsent(key, loader::load);
} catch (ExecutionException e) {
Throwable cause = e.getCause() != null ? e.getCause() : e;
throw new ElasticsearchException(cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,38 +26,57 @@
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;

import com.maxmind.db.NoCache;
import com.maxmind.db.NodeCache;
import com.maxmind.geoip2.DatabaseReader;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;

public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable {
public static final Setting<Long> CACHE_SIZE =
Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope);

private Map<String, DatabaseReader> databaseReaders;

@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(CACHE_SIZE);
}

@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
if (databaseReaders != null) {
throw new IllegalStateException("getProcessors called twice for geoip plugin!!");
}
Path geoIpConfigDirectory = parameters.env.configFile().resolve("ingest-geoip");
NodeCache cache;
long cacheSize = CACHE_SIZE.get(parameters.env.settings());
if (cacheSize > 0) {
cache = new GeoIpCache(cacheSize);
} else {
cache = NoCache.getInstance();
}
try {
databaseReaders = loadDatabaseReaders(geoIpConfigDirectory);
databaseReaders = loadDatabaseReaders(geoIpConfigDirectory, cache);
} catch (IOException e) {
throw new RuntimeException(e);
}
return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders));
}

static Map<String, DatabaseReader> loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException {
static Map<String, DatabaseReader> loadDatabaseReaders(Path geoIpConfigDirectory, NodeCache cache) throws IOException {
if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) {
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
}
Expand All @@ -71,7 +90,8 @@ static Map<String, DatabaseReader> loadDatabaseReaders(Path geoIpConfigDirectory
Path databasePath = iterator.next();
if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) {
try (InputStream inputStream = new GZIPInputStream(Files.newInputStream(databasePath, StandardOpenOption.READ))) {
databaseReaders.put(databasePath.getFileName().toString(), new DatabaseReader.Builder(inputStream).build());
databaseReaders.put(databasePath.getFileName().toString(),
new DatabaseReader.Builder(inputStream).withCache(cache).build());
}
}
}
Expand All @@ -85,4 +105,5 @@ public void close() throws IOException {
IOUtils.close(databaseReaders.values());
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.geoip;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.maxmind.db.NodeCache;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.test.ESTestCase;

public class GeoIpCacheTests extends ESTestCase {
public void testCachesAndEvictsResults() throws Exception {
GeoIpCache cache = new GeoIpCache(1);
final NodeCache.Loader loader = key -> new IntNode(key);

JsonNode jsonNode1 = cache.get(1, loader);
assertSame(jsonNode1, cache.get(1, loader));

// evict old key by adding another value
cache.get(2, loader);

assertNotSame(jsonNode1, cache.get(1, loader));
}

public void testThrowsElasticsearchException() throws Exception {
GeoIpCache cache = new GeoIpCache(1);
NodeCache.Loader loader = (int key) -> {
throw new IllegalArgumentException("Illegal key");
};
ElasticsearchException ex = expectThrows(ElasticsearchException.class, () -> cache.get(1, loader));
assertTrue("Expected cause to be of type IllegalArgumentException but was [" + ex.getCause().getClass() + "]",
ex.getCause() instanceof IllegalArgumentException);
assertEquals("Illegal key", ex.getCause().getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch.ingest.geoip;

import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.maxmind.db.NoCache;
import com.maxmind.db.NodeCache;
import com.maxmind.geoip2.DatabaseReader;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Randomness;
Expand Down Expand Up @@ -57,7 +59,9 @@ public static void loadDatabaseReaders() throws IOException {
geoIpConfigDir.resolve("GeoLite2-City.mmdb.gz"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb.gz")),
geoIpConfigDir.resolve("GeoLite2-Country.mmdb.gz"));
databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir);

NodeCache cache = randomFrom(NoCache.getInstance(), new GeoIpCache(randomPositiveLong()));
databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir, cache);
}

@AfterClass
Expand Down