Skip to content

Commit 791656a

Browse files
emerkle826olim7t
authored andcommitted
JAVA-1932: Send DRIVER_NAME and DRIVER_VERSION in Startup message
1 parent 575992a commit 791656a

File tree

7 files changed

+215
-38
lines changed

7 files changed

+215
-38
lines changed

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### 4.0.0-beta2 (in progress)
66

7+
- [new feature] JAVA-1932: Send Driver Name and Version in Startup message
78
- [new feature] JAVA-1917: Add ability to set node on statement
89
- [improvement] JAVA-1916: Base TimestampCodec.parse on java.util.Date.
910
- [improvement] JAVA-1940: Clean up test resources when CCM integration tests finish

core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ String describe() {
142142
Message getRequest() {
143143
switch (step) {
144144
case STARTUP:
145-
return new Startup(context.getCompressor().algorithm());
145+
return new Startup(context.getStartupOptions());
146146
case GET_CLUSTER_NAME:
147147
return CLUSTER_NAME_QUERY;
148148
case SET_KEYSPACE:

core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,8 @@ public class DefaultDriverContext implements InternalDriverContext {
178178
new LazyReference<>("metricsFactory", this::buildMetricsFactory, cycleDetector);
179179
private final LazyReference<RequestThrottler> requestThrottlerRef =
180180
new LazyReference<>("requestThrottler", this::buildRequestThrottler, cycleDetector);
181+
private final LazyReference<Map<String, String>> startupOptionsRef =
182+
new LazyReference<>("startupOptions", this::buildStartupOptions, cycleDetector);
181183
private final LazyReference<NodeStateListener> nodeStateListenerRef;
182184
private final LazyReference<SchemaChangeListener> schemaChangeListenerRef;
183185
private final LazyReference<RequestTracker> requestTrackerRef;
@@ -230,6 +232,15 @@ public DefaultDriverContext(
230232
this.classLoader = classLoader;
231233
}
232234

235+
/**
236+
* Builds a map of options to send in a Startup message.
237+
*
238+
* @see #getStartupOptions()
239+
*/
240+
protected Map<String, String> buildStartupOptions() {
241+
return new StartupOptionsBuilder(this).build();
242+
}
243+
233244
protected Map<String, LoadBalancingPolicy> buildLoadBalancingPolicies() {
234245
return Reflection.buildFromConfigProfiles(
235246
this,
@@ -731,4 +742,10 @@ public CodecRegistry getCodecRegistry() {
731742
public ProtocolVersion getProtocolVersion() {
732743
return getChannelFactory().getProtocolVersion();
733744
}
745+
746+
@NonNull
747+
@Override
748+
public Map<String, String> getStartupOptions() {
749+
return startupOptionsRef.get();
750+
}
734751
}

core/src/main/java/com/datastax/oss/driver/internal/core/context/InternalDriverContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import edu.umd.cs.findbugs.annotations.NonNull;
4242
import edu.umd.cs.findbugs.annotations.Nullable;
4343
import io.netty.buffer.ByteBuf;
44+
import java.util.Map;
4445
import java.util.Optional;
4546
import java.util.function.Predicate;
4647

@@ -128,4 +129,12 @@ public interface InternalDriverContext extends DriverContext {
128129
*/
129130
@Nullable
130131
ClassLoader getClassLoader();
132+
133+
/**
134+
* Retrieves the map of options to send in a Startup message. The returned map will be used to
135+
* construct a {@link com.datastax.oss.protocol.internal.request.Startup} instance when
136+
* initializing the native protocol handshake.
137+
*/
138+
@NonNull
139+
Map<String, String> getStartupOptions();
131140
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.internal.core.context;
17+
18+
import com.datastax.oss.driver.api.core.session.Session;
19+
import com.datastax.oss.protocol.internal.request.Startup;
20+
import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap;
21+
import java.util.Map;
22+
import net.jcip.annotations.Immutable;
23+
24+
@Immutable
25+
public class StartupOptionsBuilder {
26+
27+
public static final String DRIVER_NAME_KEY = "DRIVER_NAME";
28+
public static final String DRIVER_VERSION_KEY = "DRIVER_VERSION";
29+
30+
protected final InternalDriverContext context;
31+
32+
public StartupOptionsBuilder(InternalDriverContext context) {
33+
this.context = context;
34+
}
35+
36+
/**
37+
* Builds a map of options to send in a Startup message.
38+
*
39+
* <p>The default set of options are built here and include {@link
40+
* com.datastax.oss.protocol.internal.request.Startup#COMPRESSION_KEY} (if the context passed in
41+
* has a compressor/algorithm set), and the driver's {@link #DRIVER_NAME_KEY} and {@link
42+
* #DRIVER_VERSION_KEY}. The {@link com.datastax.oss.protocol.internal.request.Startup}
43+
* constructor will add {@link
44+
* com.datastax.oss.protocol.internal.request.Startup#CQL_VERSION_KEY}.
45+
*
46+
* @return Map of Startup Options.
47+
*/
48+
public Map<String, String> build() {
49+
NullAllowingImmutableMap.Builder<String, String> builder = NullAllowingImmutableMap.builder(3);
50+
// add compression (if configured) and driver name and version
51+
String compressionAlgorithm = context.getCompressor().algorithm();
52+
if (compressionAlgorithm != null && !compressionAlgorithm.trim().isEmpty()) {
53+
builder.put(Startup.COMPRESSION_KEY, compressionAlgorithm.trim());
54+
}
55+
return builder
56+
.put(DRIVER_NAME_KEY, getDriverName())
57+
.put(DRIVER_VERSION_KEY, getDriverVersion())
58+
.build();
59+
}
60+
61+
/**
62+
* Returns this driver's name.
63+
*
64+
* <p>By default, this method will pull from the bundled Driver.properties file. Subclasses should
65+
* override this method if they need to report a different Driver name on Startup.
66+
*/
67+
protected String getDriverName() {
68+
return Session.OSS_DRIVER_COORDINATES.getName();
69+
}
70+
71+
/**
72+
* Returns this driver's version.
73+
*
74+
* <p>By default, this method will pull from the bundled Driver.properties file. Subclasses should
75+
* override this method if they need to report a different Driver version on Startup.
76+
*/
77+
protected String getDriverVersion() {
78+
return Session.OSS_DRIVER_COORDINATES.getVersion().toString();
79+
}
80+
}

core/src/test/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandlerTest.java

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import com.datastax.oss.driver.internal.core.TestResponses;
3131
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
3232
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
33-
import com.datastax.oss.protocol.internal.Compressor;
3433
import com.datastax.oss.protocol.internal.Frame;
3534
import com.datastax.oss.protocol.internal.ProtocolConstants;
3635
import com.datastax.oss.protocol.internal.request.AuthResponse;
@@ -44,7 +43,6 @@
4443
import com.datastax.oss.protocol.internal.response.Ready;
4544
import com.datastax.oss.protocol.internal.response.result.SetKeyspace;
4645
import com.datastax.oss.protocol.internal.util.Bytes;
47-
import io.netty.buffer.ByteBuf;
4846
import io.netty.channel.ChannelFuture;
4947
import java.net.InetSocketAddress;
5048
import java.time.Duration;
@@ -64,7 +62,6 @@ public class ProtocolInitHandlerTest extends ChannelHandlerTestBase {
6462
@Mock private InternalDriverContext internalDriverContext;
6563
@Mock private DriverConfig driverConfig;
6664
@Mock private DriverExecutionProfile defaultProfile;
67-
@Mock private Compressor<ByteBuf> compressor;
6865

6966
private ProtocolVersionRegistry protocolVersionRegistry =
7067
new CassandraProtocolVersionRegistry("test");
@@ -83,8 +80,6 @@ public void setup() {
8380
.thenReturn(Duration.ofSeconds(30));
8481
Mockito.when(internalDriverContext.getProtocolVersionRegistry())
8582
.thenReturn(protocolVersionRegistry);
86-
Mockito.when(internalDriverContext.getCompressor()).thenReturn(compressor);
87-
Mockito.when(compressor.algorithm()).thenReturn(null);
8883

8984
channel
9085
.pipeline()
@@ -120,8 +115,6 @@ public void should_initialize() {
120115
// It should send a STARTUP message
121116
Frame requestFrame = readOutboundFrame();
122117
assertThat(requestFrame.message).isInstanceOf(Startup.class);
123-
Startup startup = (Startup) requestFrame.message;
124-
assertThat(startup.options).doesNotContainKey("COMPRESSION");
125118
assertThat(connectFuture).isNotDone();
126119

127120
// Simulate a READY response
@@ -136,34 +129,6 @@ public void should_initialize() {
136129
assertThat(connectFuture).isSuccess();
137130
}
138131

139-
@Test
140-
public void should_initialize_with_compression() {
141-
Mockito.when(compressor.algorithm()).thenReturn("lz4");
142-
channel
143-
.pipeline()
144-
.addLast(
145-
"init",
146-
new ProtocolInitHandler(
147-
internalDriverContext,
148-
DefaultProtocolVersion.V4,
149-
null,
150-
DriverChannelOptions.DEFAULT,
151-
heartbeatHandler));
152-
153-
ChannelFuture connectFuture = channel.connect(new InetSocketAddress("localhost", 9042));
154-
155-
Frame requestFrame = readOutboundFrame();
156-
assertThat(requestFrame.message).isInstanceOf(Startup.class);
157-
Startup startup = (Startup) requestFrame.message;
158-
159-
// STARTUP message should request compression
160-
assertThat(startup.options).containsEntry("COMPRESSION", "lz4");
161-
162-
writeInboundFrame(buildInboundFrame(requestFrame, new Ready()));
163-
writeInboundFrame(readOutboundFrame(), TestResponses.clusterNameResponse("someClusterName"));
164-
assertThat(connectFuture).isSuccess();
165-
}
166-
167132
@Test
168133
public void should_add_heartbeat_handler_to_pipeline_on_success() {
169134
ProtocolInitHandler protocolInitHandler =
@@ -184,8 +149,6 @@ public void should_add_heartbeat_handler_to_pipeline_on_success() {
184149
// It should send a STARTUP message
185150
Frame requestFrame = readOutboundFrame();
186151
assertThat(requestFrame.message).isInstanceOf(Startup.class);
187-
Startup startup = (Startup) requestFrame.message;
188-
assertThat(startup.options).doesNotContainKey("COMPRESSION");
189152
assertThat(connectFuture).isNotDone();
190153

191154
// Simulate a READY response
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.internal.core.context;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import com.datastax.oss.driver.api.core.Version;
21+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
22+
import com.datastax.oss.driver.api.core.config.DriverConfig;
23+
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
24+
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
25+
import com.datastax.oss.driver.api.core.metadata.Node;
26+
import com.datastax.oss.driver.api.core.metadata.NodeStateListener;
27+
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener;
28+
import com.datastax.oss.driver.api.core.session.Session;
29+
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
30+
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
31+
import com.datastax.oss.driver.shaded.guava.common.collect.Lists;
32+
import com.datastax.oss.driver.shaded.guava.common.collect.Maps;
33+
import com.datastax.oss.protocol.internal.request.Startup;
34+
import java.util.List;
35+
import java.util.Map;
36+
import java.util.function.Predicate;
37+
import org.junit.Before;
38+
import org.junit.Test;
39+
import org.mockito.Mock;
40+
import org.mockito.Mockito;
41+
import org.mockito.MockitoAnnotations;
42+
43+
public class StartupOptionsBuilderTest {
44+
45+
private DefaultDriverContext defaultDriverContext;
46+
47+
// Mocks for instantiating the default driver context
48+
@Mock private DriverConfigLoader configLoader;
49+
private List<TypeCodec<?>> typeCodecs = Lists.newArrayList();
50+
@Mock private NodeStateListener nodeStateListener;
51+
@Mock private SchemaChangeListener schemaChangeListener;
52+
@Mock private RequestTracker requestTracker;
53+
private Map<String, Predicate<Node>> nodeFilters = Maps.newHashMap();
54+
@Mock private ClassLoader classLoader;
55+
@Mock private DriverConfig driverConfig;
56+
@Mock private DriverExecutionProfile defaultProfile;
57+
58+
@Before
59+
public void before() {
60+
MockitoAnnotations.initMocks(this);
61+
Mockito.when(configLoader.getInitialConfig()).thenReturn(driverConfig);
62+
Mockito.when(driverConfig.getDefaultProfile()).thenReturn(defaultProfile);
63+
}
64+
65+
private void buildDriverContext() {
66+
defaultDriverContext =
67+
new DefaultDriverContext(
68+
configLoader,
69+
typeCodecs,
70+
nodeStateListener,
71+
schemaChangeListener,
72+
requestTracker,
73+
nodeFilters,
74+
classLoader);
75+
}
76+
77+
private void assertDefaultStartupOptions(Startup startup) {
78+
assertThat(startup.options).containsEntry(Startup.CQL_VERSION_KEY, "3.0.0");
79+
assertThat(startup.options)
80+
.containsEntry(
81+
StartupOptionsBuilder.DRIVER_NAME_KEY, Session.OSS_DRIVER_COORDINATES.getName());
82+
assertThat(startup.options).containsKey(StartupOptionsBuilder.DRIVER_VERSION_KEY);
83+
Version version = Version.parse(startup.options.get(StartupOptionsBuilder.DRIVER_VERSION_KEY));
84+
assertThat(version).isEqualByComparingTo(Session.OSS_DRIVER_COORDINATES.getVersion());
85+
}
86+
87+
@Test
88+
public void should_build_minimal_startup_options() {
89+
buildDriverContext();
90+
Startup startup = new Startup(defaultDriverContext.getStartupOptions());
91+
assertThat(startup.options).doesNotContainKey(Startup.COMPRESSION_KEY);
92+
assertDefaultStartupOptions(startup);
93+
}
94+
95+
@Test
96+
public void should_build_startup_options_with_compression() {
97+
Mockito.when(defaultProfile.isDefined(DefaultDriverOption.PROTOCOL_COMPRESSION))
98+
.thenReturn(Boolean.TRUE);
99+
Mockito.when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION))
100+
.thenReturn("lz4");
101+
buildDriverContext();
102+
Startup startup = new Startup(defaultDriverContext.getStartupOptions());
103+
// assert the compression option is present
104+
assertThat(startup.options).containsEntry(Startup.COMPRESSION_KEY, "lz4");
105+
assertDefaultStartupOptions(startup);
106+
}
107+
}

0 commit comments

Comments
 (0)