Skip to content

Commit a7d36d5

Browse files
committed
JAVA-1932: Send DRIVER_NAME and DRIVER_VERSION in Startup
1 parent 1a646b7 commit a7d36d5

File tree

9 files changed

+239
-43
lines changed

9 files changed

+239
-43
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/session/SessionBuilder.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.ArrayList;
4242
import java.util.Collection;
4343
import java.util.Collections;
44+
import java.util.HashMap;
4445
import java.util.HashSet;
4546
import java.util.List;
4647
import java.util.Map;
@@ -71,6 +72,7 @@ public abstract class SessionBuilder<SelfT extends SessionBuilder, SessionT> {
7172
private ImmutableMap.Builder<String, Predicate<Node>> nodeFilters = ImmutableMap.builder();
7273
protected CqlIdentifier keyspace;
7374
private ClassLoader classLoader = null;
75+
protected Map<String, String> startupOptions = new HashMap<>();
7476

7577
/**
7678
* Sets the configuration loader to use.
@@ -340,10 +342,17 @@ protected DriverContext buildContext(
340342
schemaChangeListener,
341343
requestTracker,
342344
nodeFilters,
343-
classLoader);
345+
classLoader,
346+
startupOptions);
344347
}
345348

346349
private static <T> T buildIfNull(T value, Supplier<T> builder) {
347350
return (value == null) ? builder.get() : value;
348351
}
352+
353+
@NonNull
354+
public SelfT withStartupOptions(@Nullable Map<String, String> options) {
355+
this.startupOptions = options;
356+
return self;
357+
}
349358
}

core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ String describe() {
142142
Message getRequest() {
143143
switch (step) {
144144
case STARTUP:
145-
return new Startup(context.getCompressor().algorithm());
145+
return new Startup(context.getStartupOptions());
146146
case GET_CLUSTER_NAME:
147147
return CLUSTER_NAME_QUERY;
148148
case SET_KEYSPACE:

core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import com.datastax.oss.driver.internal.core.util.concurrent.LazyReference;
7474
import com.datastax.oss.protocol.internal.Compressor;
7575
import com.datastax.oss.protocol.internal.FrameCodec;
76+
import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap;
7677
import edu.umd.cs.findbugs.annotations.NonNull;
7778
import edu.umd.cs.findbugs.annotations.Nullable;
7879
import io.netty.buffer.ByteBuf;
@@ -192,6 +193,7 @@ public class DefaultDriverContext implements InternalDriverContext {
192193
private final RequestTracker requestTrackerFromBuilder;
193194
private final Map<String, Predicate<Node>> nodeFiltersFromBuilder;
194195
private final ClassLoader classLoader;
196+
private final Map<String, String> startupOptions;
195197

196198
public DefaultDriverContext(
197199
DriverConfigLoader configLoader,
@@ -200,7 +202,8 @@ public DefaultDriverContext(
200202
SchemaChangeListener schemaChangeListener,
201203
RequestTracker requestTracker,
202204
Map<String, Predicate<Node>> nodeFilters,
203-
ClassLoader classLoader) {
205+
ClassLoader classLoader,
206+
Map<String, String> extraStartupOptions) {
204207
this.config = configLoader.getInitialConfig();
205208
this.configLoader = configLoader;
206209
DriverExecutionProfile defaultProfile = config.getDefaultProfile();
@@ -228,6 +231,20 @@ public DefaultDriverContext(
228231
"requestTracker", () -> buildRequestTracker(requestTrackerFromBuilder), cycleDetector);
229232
this.nodeFiltersFromBuilder = nodeFilters;
230233
this.classLoader = classLoader;
234+
this.startupOptions = buildStartupOptions(extraStartupOptions);
235+
}
236+
237+
/**
238+
* Builds a map of options to use when sending a Startup message. The options argument passed in
239+
* will append to, or overwrite, the Internal default options sent by the driver.
240+
*/
241+
protected Map<String, String> buildStartupOptions(Map<String, String> options) {
242+
NullAllowingImmutableMap.Builder<String, String> builder = NullAllowingImmutableMap.builder();
243+
builder.putAll(new InternalStartupOptions(this).getOptions());
244+
if (options != null) {
245+
builder.putAll(options);
246+
}
247+
return builder.build();
231248
}
232249

233250
protected Map<String, LoadBalancingPolicy> buildLoadBalancingPolicies() {
@@ -731,4 +748,10 @@ public CodecRegistry getCodecRegistry() {
731748
public ProtocolVersion getProtocolVersion() {
732749
return getChannelFactory().getProtocolVersion();
733750
}
751+
752+
@NonNull
753+
@Override
754+
public Map<String, String> getStartupOptions() {
755+
return startupOptions;
756+
}
734757
}

core/src/main/java/com/datastax/oss/driver/internal/core/context/InternalDriverContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import edu.umd.cs.findbugs.annotations.NonNull;
4242
import edu.umd.cs.findbugs.annotations.Nullable;
4343
import io.netty.buffer.ByteBuf;
44+
import java.util.Map;
4445
import java.util.Optional;
4546
import java.util.function.Predicate;
4647

@@ -128,4 +129,12 @@ public interface InternalDriverContext extends DriverContext {
128129
*/
129130
@Nullable
130131
ClassLoader getClassLoader();
132+
133+
/**
134+
* Retrieves the map of options to send in a Startup message. The returned map should be used to
135+
* construct a {@link com.datastax.oss.protocol.internal.request.Startup} instance when
136+
* initializing the native protocol handshake.
137+
*/
138+
@NonNull
139+
Map<String, String> getStartupOptions();
131140
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.internal.core.context;
17+
18+
import com.datastax.oss.driver.api.core.MavenCoordinates;
19+
import com.datastax.oss.driver.internal.core.DefaultMavenCoordinates;
20+
import com.datastax.oss.protocol.internal.request.Startup;
21+
import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap;
22+
import java.util.Map;
23+
import net.jcip.annotations.Immutable;
24+
25+
@Immutable
26+
public class InternalStartupOptions {
27+
28+
public static final String DRIVER_NAME_KEY = "DRIVER_NAME";
29+
public static final String DRIVER_VERSION_KEY = "DRIVER_VERSION";
30+
31+
private static final MavenCoordinates MAVEN_COORDINATES =
32+
DefaultMavenCoordinates.buildFromResource(
33+
InternalStartupOptions.class.getResource("/com/datastax/oss/driver/Driver.properties"));
34+
35+
private final Map<String, String> options;
36+
37+
public InternalStartupOptions(InternalDriverContext context) {
38+
this.options = buildOptions(context);
39+
}
40+
41+
/**
42+
* Builds a map of options to send in a Startup message. The default set of options are built here
43+
* and include {@link com.datastax.oss.protocol.internal.request.Startup#COMPRESSION_KEY} (if the
44+
* context passed in has a compressor/algorithm set), and the driver's {@link #DRIVER_NAME_KEY}
45+
* and {@link #DRIVER_VERSION_KEY}. The {@link com.datastax.oss.protocol.internal.request.Startup}
46+
* constructor will add {@link com.datastax.oss.protocol.internal.request.Startup#COMPRESSION_KEY}
47+
* to the options if it is not present when an instance of Startup is constructed.
48+
*
49+
* <p>NOTE: the InternalDriverContext implementation may choose to override any values setup by
50+
* default here.
51+
*
52+
* @param context InternalDriverContext implementation with an optional compression algorithm.
53+
* @return Map of Startup Options.
54+
*/
55+
private Map<String, String> buildOptions(InternalDriverContext context) {
56+
NullAllowingImmutableMap.Builder<String, String> builder = NullAllowingImmutableMap.builder(3);
57+
String compressionAlgorithm = context.getCompressor().algorithm();
58+
if (compressionAlgorithm != null && !compressionAlgorithm.trim().isEmpty()) {
59+
builder.put(Startup.COMPRESSION_KEY, compressionAlgorithm.trim());
60+
}
61+
return builder
62+
.put(DRIVER_NAME_KEY, MAVEN_COORDINATES.getName())
63+
.put(DRIVER_VERSION_KEY, MAVEN_COORDINATES.getVersion().toString())
64+
.build();
65+
}
66+
67+
public Map<String, String> getOptions() {
68+
return options;
69+
}
70+
}

core/src/test/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandlerTest.java

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import com.datastax.oss.driver.internal.core.TestResponses;
3131
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
3232
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
33-
import com.datastax.oss.protocol.internal.Compressor;
3433
import com.datastax.oss.protocol.internal.Frame;
3534
import com.datastax.oss.protocol.internal.ProtocolConstants;
3635
import com.datastax.oss.protocol.internal.request.AuthResponse;
@@ -44,7 +43,6 @@
4443
import com.datastax.oss.protocol.internal.response.Ready;
4544
import com.datastax.oss.protocol.internal.response.result.SetKeyspace;
4645
import com.datastax.oss.protocol.internal.util.Bytes;
47-
import io.netty.buffer.ByteBuf;
4846
import io.netty.channel.ChannelFuture;
4947
import java.net.InetSocketAddress;
5048
import java.time.Duration;
@@ -64,7 +62,6 @@ public class ProtocolInitHandlerTest extends ChannelHandlerTestBase {
6462
@Mock private InternalDriverContext internalDriverContext;
6563
@Mock private DriverConfig driverConfig;
6664
@Mock private DriverExecutionProfile defaultProfile;
67-
@Mock private Compressor<ByteBuf> compressor;
6865

6966
private ProtocolVersionRegistry protocolVersionRegistry =
7067
new CassandraProtocolVersionRegistry("test");
@@ -83,8 +80,6 @@ public void setup() {
8380
.thenReturn(Duration.ofSeconds(30));
8481
Mockito.when(internalDriverContext.getProtocolVersionRegistry())
8582
.thenReturn(protocolVersionRegistry);
86-
Mockito.when(internalDriverContext.getCompressor()).thenReturn(compressor);
87-
Mockito.when(compressor.algorithm()).thenReturn(null);
8883

8984
channel
9085
.pipeline()
@@ -120,8 +115,6 @@ public void should_initialize() {
120115
// It should send a STARTUP message
121116
Frame requestFrame = readOutboundFrame();
122117
assertThat(requestFrame.message).isInstanceOf(Startup.class);
123-
Startup startup = (Startup) requestFrame.message;
124-
assertThat(startup.options).doesNotContainKey("COMPRESSION");
125118
assertThat(connectFuture).isNotDone();
126119

127120
// Simulate a READY response
@@ -136,34 +129,6 @@ public void should_initialize() {
136129
assertThat(connectFuture).isSuccess();
137130
}
138131

139-
@Test
140-
public void should_initialize_with_compression() {
141-
Mockito.when(compressor.algorithm()).thenReturn("lz4");
142-
channel
143-
.pipeline()
144-
.addLast(
145-
"init",
146-
new ProtocolInitHandler(
147-
internalDriverContext,
148-
DefaultProtocolVersion.V4,
149-
null,
150-
DriverChannelOptions.DEFAULT,
151-
heartbeatHandler));
152-
153-
ChannelFuture connectFuture = channel.connect(new InetSocketAddress("localhost", 9042));
154-
155-
Frame requestFrame = readOutboundFrame();
156-
assertThat(requestFrame.message).isInstanceOf(Startup.class);
157-
Startup startup = (Startup) requestFrame.message;
158-
159-
// STARTUP message should request compression
160-
assertThat(startup.options).containsEntry("COMPRESSION", "lz4");
161-
162-
writeInboundFrame(buildInboundFrame(requestFrame, new Ready()));
163-
writeInboundFrame(readOutboundFrame(), TestResponses.clusterNameResponse("someClusterName"));
164-
assertThat(connectFuture).isSuccess();
165-
}
166-
167132
@Test
168133
public void should_add_heartbeat_handler_to_pipeline_on_success() {
169134
ProtocolInitHandler protocolInitHandler =
@@ -184,8 +149,6 @@ public void should_add_heartbeat_handler_to_pipeline_on_success() {
184149
// It should send a STARTUP message
185150
Frame requestFrame = readOutboundFrame();
186151
assertThat(requestFrame.message).isInstanceOf(Startup.class);
187-
Startup startup = (Startup) requestFrame.message;
188-
assertThat(startup.options).doesNotContainKey("COMPRESSION");
189152
assertThat(connectFuture).isNotDone();
190153

191154
// Simulate a READY response
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.internal.core.context;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import com.datastax.oss.driver.api.core.MavenCoordinates;
21+
import com.datastax.oss.driver.api.core.Version;
22+
import com.datastax.oss.driver.api.core.config.DriverConfig;
23+
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
24+
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
25+
import com.datastax.oss.driver.api.core.metadata.Node;
26+
import com.datastax.oss.driver.api.core.metadata.NodeStateListener;
27+
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener;
28+
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
29+
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
30+
import com.datastax.oss.driver.internal.core.DefaultMavenCoordinates;
31+
import com.datastax.oss.driver.shaded.guava.common.collect.Lists;
32+
import com.datastax.oss.driver.shaded.guava.common.collect.Maps;
33+
import com.datastax.oss.protocol.internal.request.Startup;
34+
import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.function.Predicate;
38+
import org.junit.Before;
39+
import org.junit.Test;
40+
import org.mockito.Mock;
41+
import org.mockito.Mockito;
42+
import org.mockito.MockitoAnnotations;
43+
44+
public class DefaultDriverContextTest {
45+
46+
private final MavenCoordinates driverProperties =
47+
DefaultMavenCoordinates.buildFromResource(
48+
DefaultDriverContextTest.class.getResource("/com/datastax/oss/driver/Driver.properties"));
49+
50+
private DefaultDriverContext defaultDriverContext;
51+
52+
// Mocks for instantiating the default driver context
53+
@Mock private DriverConfigLoader configLoader;
54+
List<TypeCodec<?>> typeCodecs = Lists.newArrayList();
55+
@Mock private NodeStateListener nodeStateListener;
56+
@Mock private SchemaChangeListener schemaChangeListener;
57+
@Mock private RequestTracker requestTracker;
58+
Map<String, Predicate<Node>> nodeFilters = Maps.newHashMap();
59+
@Mock private ClassLoader classLoader;
60+
@Mock private DriverConfig driverConfig;
61+
@Mock private DriverExecutionProfile defaultProfile;
62+
63+
@Before
64+
public void before() {
65+
MockitoAnnotations.initMocks(this);
66+
Mockito.when(configLoader.getInitialConfig()).thenReturn(driverConfig);
67+
Mockito.when(driverConfig.getDefaultProfile()).thenReturn(defaultProfile);
68+
defaultDriverContext =
69+
new DefaultDriverContext(
70+
configLoader,
71+
typeCodecs,
72+
nodeStateListener,
73+
schemaChangeListener,
74+
requestTracker,
75+
nodeFilters,
76+
classLoader,
77+
null);
78+
}
79+
80+
private void assertDefaultStartupOptions(Startup startup) {
81+
assertThat(startup.getOptions()).containsEntry(Startup.CQL_VERSION_KEY, "3.0.0");
82+
assertThat(startup.getOptions())
83+
.containsEntry(InternalStartupOptions.DRIVER_NAME_KEY, driverProperties.getName());
84+
assertThat(startup.getOptions()).containsKey(InternalStartupOptions.DRIVER_VERSION_KEY);
85+
Version version =
86+
Version.parse(startup.getOptions().get(InternalStartupOptions.DRIVER_VERSION_KEY));
87+
// ensure it's a 4.x version, even if a pre-release
88+
assertThat(version).isEqualByComparingTo(driverProperties.getVersion());
89+
}
90+
91+
@Test
92+
public void test_default_startup_options() {
93+
Startup startup = new Startup(defaultDriverContext.buildStartupOptions(null));
94+
assertThat(startup.getOptions()).doesNotContainKey(Startup.COMPRESSION_KEY);
95+
assertDefaultStartupOptions(startup);
96+
}
97+
98+
@Test
99+
public void test_custom_startup_options() {
100+
Map<String, String> customOptions =
101+
NullAllowingImmutableMap.of("Custom_Key1", "Custom_Value1", "Custom_Key2", "Custom_Value2");
102+
Startup startup = new Startup(defaultDriverContext.buildStartupOptions(customOptions));
103+
// assert the custom options are present
104+
assertThat(startup.getOptions()).containsEntry("Custom_Key1", "Custom_Value1");
105+
assertThat(startup.getOptions()).containsEntry("Custom_Key2", "Custom_Value2");
106+
assertThat(startup.getOptions()).doesNotContainKey(Startup.COMPRESSION_KEY);
107+
assertDefaultStartupOptions(startup);
108+
}
109+
110+
@Test
111+
public void test_compression_startup_options() {
112+
Map<String, String> compressionOptions =
113+
NullAllowingImmutableMap.of(Startup.COMPRESSION_KEY, "lz4");
114+
Startup startup = new Startup(defaultDriverContext.buildStartupOptions(compressionOptions));
115+
// assert the custom options are present
116+
assertThat(startup.getOptions()).containsEntry(Startup.COMPRESSION_KEY, "lz4");
117+
assertDefaultStartupOptions(startup);
118+
}
119+
}

0 commit comments

Comments
 (0)