Skip to content

Commit e1fbfaf

Browse files
jasontedorywelsch
andcommitted
Introduce client feature tracking
This commit introduces the ability for a client to communicate to the server features that it can support and for these features to be used in influencing the decisions that the server makes when communicating with the client. To this end we carry the features from the client to the underlying stream as we carry the version of the client today. This enables us to enhance the logic where we make protocol decisions on the basis of the version on the stream to also make protocol decisions on the basis of the features on the stream. With such functionality, the client can communicate to the server if it is a transport client, or if it has, for example, X-Pack installed. This enables us to support rolling upgrades from the OSS distribution to the default distribution without breaking client connectivity as we can now elect to serialize customs in the cluster state depending on whether or not the client reports to us using the feature capabilities that it can under these customs. This means that we would avoid sending a client pieces of the cluster state that it can not understand. However, we want to take care and always send the full cluster state during node-to-node communication as otherwise we would end up with different understanding of what is in the cluster state across nodes depending on which features they reported to have. This is why when deciding whether or not to write out a custom we always send the custom if the client is not a transport client and otherwise do not send the custom if the client is transport client that does not report to have the feature required by the custom. Co-authored-by: Yannick Welsch <[email protected]>
1 parent 7c74318 commit e1fbfaf

File tree

24 files changed

+879
-68
lines changed

24 files changed

+879
-68
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ else if (readableBytes >= TcpHeader.HEADER_SIZE) {
104104
try (ThreadContext context = new ThreadContext(Settings.EMPTY)) {
105105
context.readHeaders(in);
106106
}
107+
// now we decode the features
108+
in.readStringArray();
107109
// now we can decode the action name
108110
sb.append(", action: ").append(in.readString());
109111
}

server/src/main/java/org/elasticsearch/client/transport/TransportClient.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ public abstract class TransportClient extends AbstractClient {
9898
public static final Setting<Boolean> CLIENT_TRANSPORT_SNIFF =
9999
Setting.boolSetting("client.transport.sniff", false, Setting.Property.NodeScope);
100100

101+
public static final String TRANSPORT_CLIENT_FEATURE = "transport_client";
102+
101103
private static PluginsService newPluginService(final Settings settings, Collection<Class<? extends Plugin>> plugins) {
102104
final Settings.Builder settingsBuilder = Settings.builder()
103105
.put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
@@ -130,8 +132,12 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
130132
providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
131133
}
132134
final PluginsService pluginsService = newPluginService(providedSettings, plugins);
133-
final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).put(ThreadContext.PREFIX
134-
+ "." + "transport_client", true).build();
135+
final Settings settings =
136+
Settings.builder()
137+
.put(defaultSettings)
138+
.put(pluginsService.updatedSettings())
139+
.put(TcpTransport.FEATURE_PREFIX + "." + TRANSPORT_CLIENT_FEATURE, true)
140+
.build();
135141
final List<Closeable> resourcesToClose = new ArrayList<>();
136142
final ThreadPool threadPool = new ThreadPool(settings);
137143
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));

server/src/main/java/org/elasticsearch/cluster/ClusterState.java

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.carrotsearch.hppc.cursors.ObjectCursor;
2424
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
2525

26+
import org.elasticsearch.client.transport.TransportClient;
2627
import org.elasticsearch.cluster.block.ClusterBlock;
2728
import org.elasticsearch.cluster.block.ClusterBlocks;
2829
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -61,6 +62,7 @@
6162
import java.util.HashMap;
6263
import java.util.Locale;
6364
import java.util.Map;
65+
import java.util.Optional;
6466
import java.util.Set;
6567

6668
/**
@@ -90,7 +92,51 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
9092

9193
public static final ClusterState EMPTY_STATE = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();
9294

93-
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
95+
/**
96+
* An interface that implementors use when a class requires a client to maybe have a feature.
97+
*/
98+
public interface FeatureAware {
99+
100+
/**
101+
* An optional feature that is required for the client to have.
102+
*
103+
* @return an empty optional if no feature is required otherwise a string representing the required feature
104+
*/
105+
default Optional<String> getRequiredFeature() {
106+
return Optional.empty();
107+
}
108+
109+
/**
110+
* Tests whether or not the custom should be serialized. The criteria are:
111+
* <ul>
112+
* <li>the output stream must be at least the minimum supported version of the custom</li>
113+
* <li>the output stream must have the feature required by the custom (if any) or not be a transport client</li>
114+
* </ul>
115+
* <p>
116+
* That is, we only serialize customs to clients than can understand the custom based on the version of the client and the features
117+
* that the client has. For transport clients we can be lenient in requiring a feature in which case we do not send the custom but
118+
* for connected nodes we always require that the node has the required feature.
119+
*
120+
* @param out the output stream
121+
* @param custom the custom to serialize
122+
* @param <T> the type of the custom
123+
* @return true if the custom should be serialized and false otherwise
124+
*/
125+
static <T extends NamedDiffable & FeatureAware> boolean shouldSerializeCustom(final StreamOutput out, final T custom) {
126+
if (out.getVersion().before(custom.getMinimalSupportedVersion())) {
127+
return false;
128+
}
129+
if (custom.getRequiredFeature().isPresent()) {
130+
final String requiredFeature = custom.getRequiredFeature().get();
131+
// if it is a transport client we are lenient yet for a connected node it must have the required feature
132+
return out.hasFeature(requiredFeature) || out.hasFeature(TransportClient.TRANSPORT_CLIENT_FEATURE) == false;
133+
}
134+
return true;
135+
}
136+
137+
}
138+
139+
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, FeatureAware {
94140

95141
/**
96142
* Returns <code>true</code> iff this {@link Custom} is private to the cluster and should never be send to a client.
@@ -99,6 +145,7 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
99145
default boolean isPrivate() {
100146
return false;
101147
}
148+
102149
}
103150

104151
private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
@@ -244,6 +291,15 @@ public String toString() {
244291
sb.append("isa_ids ").append(indexMetaData.inSyncAllocationIds(shard)).append("\n");
245292
}
246293
}
294+
if (metaData.customs().isEmpty() == false) {
295+
sb.append("metadata customs:\n");
296+
for (final ObjectObjectCursor<String, MetaData.Custom> cursor : metaData.customs()) {
297+
final String type = cursor.key;
298+
final MetaData.Custom custom = cursor.value;
299+
sb.append(TAB).append(type).append(": ").append(custom);
300+
}
301+
sb.append("\n");
302+
}
247303
sb.append(blocks());
248304
sb.append(nodes());
249305
sb.append(routingTable());
@@ -691,14 +747,14 @@ public void writeTo(StreamOutput out) throws IOException {
691747
blocks.writeTo(out);
692748
// filter out custom states not supported by the other node
693749
int numberOfCustoms = 0;
694-
for (ObjectCursor<Custom> cursor : customs.values()) {
695-
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
750+
for (final ObjectCursor<Custom> cursor : customs.values()) {
751+
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
696752
numberOfCustoms++;
697753
}
698754
}
699755
out.writeVInt(numberOfCustoms);
700-
for (ObjectCursor<Custom> cursor : customs.values()) {
701-
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
756+
for (final ObjectCursor<Custom> cursor : customs.values()) {
757+
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
702758
out.writeNamedWriteable(cursor.value);
703759
}
704760
}

server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
2525
import org.apache.logging.log4j.Logger;
2626
import org.apache.lucene.util.CollectionUtil;
27+
import org.elasticsearch.cluster.ClusterState;
28+
import org.elasticsearch.cluster.ClusterState.FeatureAware;
2729
import org.elasticsearch.cluster.Diff;
2830
import org.elasticsearch.cluster.Diffable;
2931
import org.elasticsearch.cluster.DiffableUtils;
@@ -117,9 +119,10 @@ public enum XContentContext {
117119
*/
118120
public static EnumSet<XContentContext> ALL_CONTEXTS = EnumSet.allOf(XContentContext.class);
119121

120-
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
122+
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, ClusterState.FeatureAware {
121123

122124
EnumSet<XContentContext> context();
125+
123126
}
124127

125128
public static final Setting<Boolean> SETTING_READ_ONLY_SETTING =
@@ -782,14 +785,14 @@ public void writeTo(StreamOutput out) throws IOException {
782785
}
783786
// filter out custom states not supported by the other node
784787
int numberOfCustoms = 0;
785-
for (ObjectCursor<Custom> cursor : customs.values()) {
786-
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
788+
for (final ObjectCursor<Custom> cursor : customs.values()) {
789+
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
787790
numberOfCustoms++;
788791
}
789792
}
790793
out.writeVInt(numberOfCustoms);
791-
for (ObjectCursor<Custom> cursor : customs.values()) {
792-
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
794+
for (final ObjectCursor<Custom> cursor : customs.values()) {
795+
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
793796
out.writeNamedWriteable(cursor.value);
794797
}
795798
}

server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,12 @@
5858
import java.util.EnumMap;
5959
import java.util.EnumSet;
6060
import java.util.HashMap;
61+
import java.util.HashSet;
6162
import java.util.Iterator;
6263
import java.util.LinkedHashMap;
6364
import java.util.List;
6465
import java.util.Map;
66+
import java.util.Set;
6567
import java.util.concurrent.TimeUnit;
6668
import java.util.function.IntFunction;
6769

@@ -98,6 +100,7 @@ public abstract class StreamOutput extends OutputStream {
98100
}
99101

100102
private Version version = Version.CURRENT;
103+
private Set<String> features = Collections.emptySet();
101104

102105
/**
103106
* The version of the node on the other side of this stream.
@@ -113,6 +116,14 @@ public void setVersion(Version version) {
113116
this.version = version;
114117
}
115118

119+
public boolean hasFeature(final String feature) {
120+
return this.features.contains(feature);
121+
}
122+
123+
public void setFeatures(final Set<String> features) {
124+
this.features = Collections.unmodifiableSet(new HashSet<>(features));
125+
}
126+
116127
public long position() throws IOException {
117128
throw new UnsupportedOperationException();
118129
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,7 @@ public void apply(Settings value, Settings current, Settings previous) {
379379
ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING,
380380
EsExecutors.PROCESSORS_SETTING,
381381
ThreadContext.DEFAULT_HEADERS_SETTING,
382+
TcpTransport.DEFAULT_FEATURES_SETTING,
382383
Loggers.LOG_DEFAULT_LEVEL_SETTING,
383384
Loggers.LOG_LEVEL_SETTING,
384385
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,

server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.List;
5050
import java.util.Map;
5151
import java.util.Objects;
52+
import java.util.Optional;
5253
import java.util.Set;
5354
import java.util.function.Predicate;
5455
import java.util.stream.Collectors;
@@ -189,7 +190,7 @@ public long getNumberOfTasksOnNode(String nodeId, String taskName) {
189190

190191
@Override
191192
public Version getMinimalSupportedVersion() {
192-
return Version.V_5_4_0;
193+
return Version.V_6_3_0;
193194
}
194195

195196
@Override

server/src/main/java/org/elasticsearch/plugins/Plugin.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.util.Collections;
5757
import java.util.List;
5858
import java.util.Map;
59+
import java.util.Optional;
5960
import java.util.function.UnaryOperator;
6061

6162
/**
@@ -79,6 +80,17 @@
7980
*/
8081
public abstract class Plugin implements Closeable {
8182

83+
/**
84+
* A feature exposed by the plugin. This should be used if a plugin exposes {@link org.elasticsearch.cluster.ClusterState.Custom} or
85+
* {@link MetaData.Custom}; see also {@link org.elasticsearch.cluster.ClusterState.FeatureAware}.
86+
*
87+
* @return a feature set represented by this plugin, or the empty optional if the plugin does not expose cluster state or metadata
88+
* customs
89+
*/
90+
protected Optional<String> getFeature() {
91+
return Optional.empty();
92+
}
93+
8294
/**
8395
* Node level guice modules.
8496
*/

server/src/main/java/org/elasticsearch/plugins/PluginsService.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,10 @@
4141
import org.elasticsearch.common.settings.Setting;
4242
import org.elasticsearch.common.settings.Setting.Property;
4343
import org.elasticsearch.common.settings.Settings;
44+
import org.elasticsearch.common.util.concurrent.ThreadContext;
4445
import org.elasticsearch.index.IndexModule;
4546
import org.elasticsearch.threadpool.ExecutorBuilder;
47+
import org.elasticsearch.transport.TcpTransport;
4648

4749
import java.io.IOException;
4850
import java.lang.reflect.Constructor;
@@ -57,16 +59,17 @@
5759
import java.util.HashMap;
5860
import java.util.HashSet;
5961
import java.util.Iterator;
60-
import java.util.LinkedHashMap;
6162
import java.util.LinkedHashSet;
6263
import java.util.List;
6364
import java.util.Locale;
6465
import java.util.Map;
6566
import java.util.Objects;
67+
import java.util.Optional;
6668
import java.util.Set;
69+
import java.util.TreeMap;
70+
import java.util.TreeSet;
6771
import java.util.function.Function;
6872
import java.util.stream.Collectors;
69-
import java.util.stream.Stream;
7073

7174
import static org.elasticsearch.common.io.FileSystemUtils.isAccessibleDirectory;
7275

@@ -196,6 +199,7 @@ private static void logPluginInfo(final List<PluginInfo> pluginInfos, final Stri
196199

197200
public Settings updatedSettings() {
198201
Map<String, String> foundSettings = new HashMap<>();
202+
final Map<String, String> features = new TreeMap<>();
199203
final Settings.Builder builder = Settings.builder();
200204
for (Tuple<PluginInfo, Plugin> plugin : plugins) {
201205
Settings settings = plugin.v2().additionalSettings();
@@ -207,6 +211,23 @@ public Settings updatedSettings() {
207211
}
208212
}
209213
builder.put(settings);
214+
final Optional<String> maybeFeature = plugin.v2().getFeature();
215+
if (maybeFeature.isPresent()) {
216+
final String feature = maybeFeature.get();
217+
if (features.containsKey(feature)) {
218+
final String message = String.format(
219+
Locale.ROOT,
220+
"duplicate feature [%s] in plugin [%s], already added in [%s]",
221+
feature,
222+
plugin.v1().getName(),
223+
features.get(feature));
224+
throw new IllegalArgumentException(message);
225+
}
226+
features.put(feature, plugin.v1().getName());
227+
}
228+
}
229+
for (final String feature : features.keySet()) {
230+
builder.put(TcpTransport.FEATURE_PREFIX + "." + feature, true);
210231
}
211232
return builder.put(this.settings).build();
212233
}

0 commit comments

Comments
 (0)