Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4616ea0
GeoIP database downloader
probakowski Feb 3, 2021
135c91b
delete old chunks
probakowski Feb 3, 2021
cc945c0
remove unneeded changes
probakowski Feb 4, 2021
d1d1406
fix forbidden apis
probakowski Feb 4, 2021
fd5eb8f
fix test
probakowski Feb 4, 2021
1fd0506
endpoint setting static
probakowski Feb 4, 2021
1948c6b
endpoint setting static
probakowski Feb 4, 2021
b26a088
disable downloader in all testclusters
probakowski Feb 4, 2021
ba28d0f
Merge branch 'master' into geoipv2
probakowski Feb 4, 2021
b300312
disable downloader in tests
probakowski Feb 4, 2021
e66cd87
disable downloader in tests
probakowski Feb 5, 2021
1b8a46c
add feature flag for geoipv2 downloader
probakowski Feb 9, 2021
5d58fc3
Merge branch 'master' into geoipv2
probakowski Feb 9, 2021
5720a6e
remove unneeded changes
probakowski Feb 9, 2021
3d4fef9
tests
probakowski Feb 10, 2021
5fec318
register setting based on flag
probakowski Feb 10, 2021
0f6e402
move local http server to fixture
probakowski Feb 11, 2021
ca23c99
Merge branch 'master' into geoipv2
elasticmachine Feb 11, 2021
ff472aa
fix setting
probakowski Feb 11, 2021
04fa17e
Task
probakowski Feb 15, 2021
385ba61
Merge branch 'master' into geoipv2
probakowski Feb 16, 2021
a6a709a
move download to allocated task
probakowski Feb 16, 2021
770a2fc
additional check
probakowski Feb 16, 2021
39fbc7a
remove unneeded origins
probakowski Feb 16, 2021
d635af4
unused import
probakowski Feb 16, 2021
16af3b4
comment
probakowski Feb 16, 2021
e44dedb
cleanup
probakowski Feb 17, 2021
6727338
review comments
probakowski Feb 17, 2021
d5060fb
serialization test
probakowski Feb 17, 2021
356bde7
md5 verification
probakowski Feb 17, 2021
dcb3d67
unused imports
probakowski Feb 17, 2021
411e422
Merge branch 'master' into geoipv2
elasticmachine Feb 17, 2021
042b631
review comment
probakowski Feb 19, 2021
4c0f157
add javadocs
probakowski Feb 23, 2021
bccc426
Merge branch 'master' into geoipv2
elasticmachine Feb 23, 2021
09c90f9
Merge branch 'master' into geoipv2
probakowski Feb 23, 2021
d3e5aee
remove unused imports
probakowski Feb 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 43 additions & 15 deletions modules/ingest-geoip/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import org.apache.tools.ant.taskdefs.condition.Os
apply plugin: 'elasticsearch.yaml-rest-test'
apply plugin: 'elasticsearch.internal-cluster-test'

final Project fixture = project(':test:fixtures:geoip-fixture')

esplugin {
description 'Ingest processor that uses lookup geo data based on IP adresses using the MaxMind geo database'
classname 'org.elasticsearch.ingest.geoip.IngestGeoIpPlugin'
Expand All @@ -24,6 +26,7 @@ dependencies {
api('com.maxmind.db:maxmind-db:1.3.1')

testImplementation 'org.elasticsearch:geolite2-databases:20191119'
internalClusterTestImplementation project(path: ":modules:reindex")
}

restResources {
Expand All @@ -32,6 +35,31 @@ restResources {
}
}

// The local geoip fixture is used unless the environment variable
// "geoip_use_service" is set to the string "true".
def useFixture = System.getenv("geoip_use_service") != "true"

if (useFixture) {
// Wire the geoip fixture project into this build's test fixtures.
apply plugin: 'elasticsearch.test.fixtures'
testFixtures.useFixture(fixture.path, 'geoip-fixture')

// Helper task that internalClusterTest depends on when the fixture is in use:
// it passes the fixture's address to the tests as the "geoip_endpoint"
// system property.
task "beforeInternalClusterTest" {
dependsOn ':test:fixtures:geoip-fixture:postProcessFixture'
doLast {
// Read at execution time, after postProcessFixture has run (see dependsOn above).
// NOTE(review): presumably the host port mapped to the fixture's TCP port 80 - confirm.
int ephemeralPort = fixture.postProcessFixture.ext."test.fixtures.geoip-fixture.tcp.80"
assert ephemeralPort > 0
internalClusterTest {
// NOTE(review): nonInputProperties presumably keeps the changing port out of
// the task's inputs - confirm against the build plugin.
nonInputProperties.systemProperty "geoip_endpoint", "http://127.0.0.1:" + ephemeralPort
}
}
}
}

// Configuration of the internal cluster tests for this module.
internalClusterTest {
// Always passed to the test JVM as a system property.
// NOTE(review): presumably switches on the geoip v2 downloader feature - confirm.
systemProperty "es.geoip_v2_feature_flag_enabled", "true"
if (useFixture) {
// Only needed with the local fixture: that task supplies the "geoip_endpoint" property.
dependsOn "beforeInternalClusterTest"
}
}

tasks.register("copyDefaultGeoIp2DatabaseFiles", Copy) {
from { zipTree(configurations.testCompileClasspath.files.find { it.name.contains('geolite2-databases') }) }
into "${project.buildDir}/ingest-geoip"
Expand All @@ -47,21 +75,21 @@ tasks.named("bundlePlugin").configure {

tasks.named("thirdPartyAudit").configure {
ignoreMissingClasses(
// geoip WebServiceClient needs apache http client, but we're not using WebServiceClient:
'org.apache.http.HttpEntity',
'org.apache.http.HttpHost',
'org.apache.http.HttpResponse',
'org.apache.http.StatusLine',
'org.apache.http.auth.UsernamePasswordCredentials',
'org.apache.http.client.config.RequestConfig$Builder',
'org.apache.http.client.config.RequestConfig',
'org.apache.http.client.methods.CloseableHttpResponse',
'org.apache.http.client.methods.HttpGet',
'org.apache.http.client.utils.URIBuilder',
'org.apache.http.impl.auth.BasicScheme',
'org.apache.http.impl.client.CloseableHttpClient',
'org.apache.http.impl.client.HttpClientBuilder',
'org.apache.http.util.EntityUtils'
// geoip WebServiceClient needs apache http client, but we're not using WebServiceClient:
'org.apache.http.HttpEntity',
'org.apache.http.HttpHost',
'org.apache.http.HttpResponse',
'org.apache.http.StatusLine',
'org.apache.http.auth.UsernamePasswordCredentials',
'org.apache.http.client.config.RequestConfig$Builder',
'org.apache.http.client.config.RequestConfig',
'org.apache.http.client.methods.CloseableHttpResponse',
'org.apache.http.client.methods.HttpGet',
'org.apache.http.client.utils.URIBuilder',
'org.apache.http.impl.auth.BasicScheme',
'org.apache.http.impl.client.CloseableHttpClient',
'org.apache.http.impl.client.HttpClientBuilder',
'org.apache.http.util.EntityUtils'
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.geoip;

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.StreamsUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public abstract class AbstractGeoIpIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(IngestGeoIpPlugin.class, IngestGeoIpSettingsPlugin.class);
}

@Override
protected Settings nodeSettings(final int nodeOrdinal) {
final Path databasePath = createTempDir();
try {
Files.createDirectories(databasePath);
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")),
databasePath.resolve("GeoLite2-City.mmdb"));
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")),
databasePath.resolve("GeoLite2-Country.mmdb"));
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")),
databasePath.resolve("GeoLite2-ASN.mmdb"));
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
return Settings.builder()
.put("ingest.geoip.database_path", databasePath)
.put(super.nodeSettings(nodeOrdinal))
.build();
}

public static class IngestGeoIpSettingsPlugin extends Plugin {

@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.geoip;

import com.maxmind.geoip2.DatabaseReader;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;

import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;

import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
import static java.nio.file.StandardOpenOption.WRITE;

/**
 * Integration test for the geoip database downloader. The downloader is
 * disabled in the node settings, enabled through a persistent cluster setting
 * inside the test, and the chunks it writes to
 * {@link GeoIpDownloader#DATABASES_INDEX} are then reassembled and opened with
 * the MaxMind reader.
 */
@ClusterScope(scope = Scope.TEST, maxNumDataNodes = 1)
public class GeoIpDownloaderIT extends AbstractGeoIpIT {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        // IngestGeoIpSettingsPlugin is declared on the superclass; use it directly rather
        // than going through an unrelated sibling test class.
        return Arrays.asList(ReindexPlugin.class, IngestGeoIpPlugin.class, IngestGeoIpSettingsPlugin.class);
    }

    /**
     * Starts nodes with the downloader disabled. When the {@code geoip_endpoint}
     * system property is set, it is used as the downloader endpoint.
     */
    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        Settings.Builder settings = Settings.builder()
            .put(super.nodeSettings(nodeOrdinal))
            .put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), false);
        String endpoint = System.getProperty("geoip_endpoint");
        if (endpoint != null) {
            settings.put(GeoIpDownloader.ENDPOINT_SETTING.getKey(), endpoint);
        }
        return settings.build();
    }

    /**
     * Enables the downloader, waits (up to two minutes) until the task state lists
     * all three databases, then verifies for each database that every chunk in the
     * recorded chunk range is searchable and that the concatenated, gunzipped
     * chunks form a database the MaxMind reader can open.
     */
    public void testGeoIpDatabasesDownload() throws Exception {
        ClusterUpdateSettingsResponse settingsResponse = client().admin().cluster()
            .prepareUpdateSettings()
            .setPersistentSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true))
            .get();
        assertTrue(settingsResponse.isAcknowledged());
        assertBusy(() -> {
            PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> task = getTask();
            assertNotNull(task);
            GeoIpTaskState state = (GeoIpTaskState) task.getState();
            assertNotNull(state);
            assertEquals(Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"), state.getDatabases().keySet());
        }, 2, TimeUnit.MINUTES);

        GeoIpTaskState state = (GeoIpTaskState) getTask().getState();
        for (String id : List.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb")) {
            assertBusy(() -> {
                GeoIpTaskState.Metadata metadata = state.get(id);
                BoolQueryBuilder queryBuilder = new BoolQueryBuilder()
                    .filter(new MatchQueryBuilder("name", id))
                    .filter(new RangeQueryBuilder("chunk")
                        .from(metadata.getFirstChunk())
                        .to(metadata.getLastChunk(), true));
                // Chunk range is inclusive on both ends.
                int size = metadata.getLastChunk() - metadata.getFirstChunk() + 1;
                SearchResponse res = client().prepareSearch(GeoIpDownloader.DATABASES_INDEX)
                    .setSize(size)
                    .setQuery(queryBuilder)
                    .addSort("chunk", SortOrder.ASC)
                    .get();
                TotalHits totalHits = res.getHits().getTotalHits();
                assertEquals(TotalHits.Relation.EQUAL_TO, totalHits.relation);
                assertEquals(size, totalHits.value);
                assertEquals(size, res.getHits().getHits().length);

                List<byte[]> data = new ArrayList<>();

                for (SearchHit hit : res.getHits().getHits()) {
                    data.add((byte[]) hit.getSourceAsMap().get("data"));
                }

                Path tempFile = createTempFile();
                // Both streams are closed here; the gzip stream in particular must be
                // closed so its inflater is released on every assertBusy retry.
                try (
                    InputStream stream = new GZIPInputStream(new MultiByteArrayInputStream(data));
                    OutputStream os = new BufferedOutputStream(Files.newOutputStream(tempFile, TRUNCATE_EXISTING, WRITE, CREATE))
                ) {
                    stream.transferTo(os);
                }

                parseDatabase(tempFile);
            });
        }
    }

    /**
     * Opens the given file with the MaxMind reader and checks that metadata is available.
     */
    @SuppressForbidden(reason = "Maxmind API requires java.io.File")
    private void parseDatabase(Path tempFile) throws IOException {
        try (DatabaseReader databaseReader = new DatabaseReader.Builder(tempFile.toFile()).build()) {
            assertNotNull(databaseReader.getMetadata());
        }
    }

    /**
     * Looks up the downloader's persistent task in the current cluster state.
     */
    private PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> getTask() {
        return PersistentTasksCustomMetadata.getTaskWithId(clusterService().state(), GeoIpDownloader.GEOIP_DOWNLOADER);
    }

    /**
     * Input stream that presents a list of byte arrays as one continuous stream,
     * reading the arrays in list order.
     */
    private static class MultiByteArrayInputStream extends InputStream {

        private final Iterator<byte[]> data;
        private ByteArrayInputStream current;

        private MultiByteArrayInputStream(List<byte[]> data) {
            this.data = data.iterator();
        }

        /**
         * Makes sure {@link #current} points at a chunk, opening the next one if needed.
         *
         * @return {@code false} when there is no open chunk and no chunk left to open
         */
        private boolean ensureCurrent() {
            if (current == null) {
                if (data.hasNext() == false) {
                    return false;
                }
                current = new ByteArrayInputStream(data.next());
            }
            return true;
        }

        @Override
        public int read() {
            // Loop (rather than recurse) over exhausted or empty chunks.
            while (ensureCurrent()) {
                int read = current.read();
                if (read != -1) {
                    return read;
                }
                current = null;
            }
            return -1;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            // InputStream contract: a zero-length read returns 0, never -1.
            if (len == 0) {
                return 0;
            }
            while (ensureCurrent()) {
                int read = current.read(b, off, len);
                if (read != -1) {
                    return read;
                }
                current = null;
            }
            return -1;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,69 +13,27 @@
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.NodeRoles;
import org.elasticsearch.test.StreamsUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.test.NodeRoles.nonIngestNode;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class GeoIpProcessorNonIngestNodeIT extends ESIntegTestCase {

public static class IngestGeoIpSettingsPlugin extends Plugin {

@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope));
}
}
public class GeoIpProcessorNonIngestNodeIT extends AbstractGeoIpIT {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(IngestGeoIpPlugin.class, IngestGeoIpSettingsPlugin.class);
}

@Override
protected Settings nodeSettings(final int nodeOrdinal) {
final Path databasePath = createTempDir();
try {
Files.createDirectories(databasePath);
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")),
databasePath.resolve("GeoLite2-City.mmdb"));
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")),
databasePath.resolve("GeoLite2-Country.mmdb"));
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")),
databasePath.resolve("GeoLite2-ASN.mmdb"));
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
return Settings.builder()
.put("ingest.geoip.database_path", databasePath)
.put(nonIngestNode())
.put(super.nodeSettings(nodeOrdinal))
.build();
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(nonIngestNode()).build();
}

/**
Expand Down
Loading