From e0d13fefcf31a8a9fede25b91cb3e213a2de31ae Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 23 May 2018 10:37:01 +0200 Subject: [PATCH 1/3] Send client headers from TransportClient This change adds a simple header to the transport client that is present on the servers thread context that ensures we can detect if a transport client talks to the server in a specific request. This change also adds a header for xpack to detect if the client has xpack installed. --- .../client/transport/TransportClient.java | 4 +++- .../elasticsearch/transport/TcpTransport.java | 1 + .../client/transport/TransportClientTests.java | 17 +++++++++++++++++ .../xpack/core/XPackClientPlugin.java | 2 ++ 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java index fd56186fd9d4f..2aeea08d1a575 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -19,6 +19,7 @@ package org.elasticsearch.client.transport; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; @@ -129,7 +130,8 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build(); } final PluginsService pluginsService = newPluginService(providedSettings, plugins); - final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).build(); + final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).put(ThreadContext.PREFIX + + "." + "transport_client", true).build(); final List resourcesToClose = new ArrayList<>(); final ThreadPool threadPool = new ThreadPool(settings); resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS)); diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 033b72d04d985..39c6d5292119b 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -1428,6 +1428,7 @@ public final void messageReceived(BytesReference reference, TcpChannel channel) streamIn = new NamedWriteableAwareStreamInput(streamIn, namedWriteableRegistry); streamIn.setVersion(version); threadPool.getThreadContext().readHeaders(streamIn); + threadPool.getThreadContext().putTransient("_remote_address", remoteAddress); if (TransportStatus.isRequest(status)) { handleRequest(channel, profileName, streamIn, requestId, messageLengthBytes, version, remoteAddress, status); } else { diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java index c97418bae373a..1830698d90c6f 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESTestCase; @@ -63,6 +64,17 @@ public void testPluginNamedWriteablesRegistered() { } } + public void testDefaultHeaderContainsPlugins() { + Settings baseSettings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) + .build(); + try (TransportClient client = new MockTransportClient(baseSettings, Arrays.asList(MockPlugin.class))) { + ThreadContext threadContext = client.threadPool().getThreadContext(); + assertEquals("true", threadContext.getHeader("transport_client")); + assertEquals("true", threadContext.getHeader("test")); + } + } + public static class MockPlugin extends Plugin { @Override @@ -70,6 +82,11 @@ public List getNamedWriteables() { return Arrays.asList(new Entry[]{ new Entry(MockNamedWriteable.class, MockNamedWriteable.NAME, MockNamedWriteable::new)}); } + @Override + public Settings additionalSettings() { + return Settings.builder().put(ThreadContext.PREFIX + "." + "test", true).build(); + } + public class MockNamedWriteable implements NamedWriteable { static final String NAME = "mockNamedWritable"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 4853588bd3ead..c719cdcbd022e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.license.DeleteLicenseAction; @@ -193,6 +194,7 @@ static Settings additionalSettings(final Settings settings, final boolean enable final Settings.Builder builder = Settings.builder(); builder.put(SecuritySettings.addTransportSettings(settings)); builder.put(SecuritySettings.addUserSettings(settings)); + builder.put(ThreadContext.PREFIX + "." + "has_xpack", true); return builder.build(); } else { return Settings.EMPTY; From 90911afebad9f31df32e5c24cf7d9d913c92f109 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 23 May 2018 11:11:46 +0200 Subject: [PATCH 2/3] fix test --- .../elasticsearch/client/AbstractClientHeadersTestCase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java b/server/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java index 8c1b22f7fb171..f899b5b67986a 100644 --- a/server/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java +++ b/server/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java @@ -139,14 +139,14 @@ public void testOverrideHeader() throws Exception { protected static void assertHeaders(Map headers, Map expected) { assertNotNull(headers); - assertEquals(expected.size(), headers.size()); + assertEquals(expected.size()+1, headers.size()); + assertEquals("true", headers.get("transport_client")); for (Map.Entry expectedEntry : expected.entrySet()) { assertEquals(headers.get(expectedEntry.getKey()), expectedEntry.getValue()); } } protected static void assertHeaders(ThreadPool pool) { - Map headers = new HashMap<>(); Settings asSettings = HEADER_SETTINGS.getAsSettings(ThreadContext.PREFIX); assertHeaders(pool.getThreadContext().getHeaders(), asSettings.keySet().stream().collect(Collectors.toMap(Function.identity(), k -> asSettings.get(k)))); From 69e7dd09eded66690cb752bda9f40e22956248d6 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 23 May 2018 11:47:05 +0200 Subject: [PATCH 3/3] simplify fix --- .../elasticsearch/client/AbstractClientHeadersTestCase.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java b/server/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java index f899b5b67986a..db9f9d83c816a 100644 --- a/server/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java +++ b/server/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java @@ -139,8 +139,9 @@ public void testOverrideHeader() throws Exception { protected static void assertHeaders(Map headers, Map expected) { assertNotNull(headers); - assertEquals(expected.size()+1, headers.size()); - assertEquals("true", headers.get("transport_client")); + headers = new HashMap<>(headers); + headers.remove("transport_client"); // default header on TPC + assertEquals(expected.size(), headers.size()); for (Map.Entry expectedEntry : expected.entrySet()) { assertEquals(headers.get(expectedEntry.getKey()), expectedEntry.getValue()); }