Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### 4.0.0-beta2 (in progress)

- [new feature] JAVA-1932: Send Driver Name and Version in Startup message
- [new feature] JAVA-1917: Add ability to set node on statement
- [improvement] JAVA-1916: Base TimestampCodec.parse on java.util.Date.
- [improvement] JAVA-1940: Clean up test resources when CCM integration tests finish
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ String describe() {
Message getRequest() {
switch (step) {
case STARTUP:
return new Startup(context.getCompressor().algorithm());
return new Startup(context.getStartupOptions());
case GET_CLUSTER_NAME:
return CLUSTER_NAME_QUERY;
case SET_KEYSPACE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ public class DefaultDriverContext implements InternalDriverContext {
new LazyReference<>("metricsFactory", this::buildMetricsFactory, cycleDetector);
private final LazyReference<RequestThrottler> requestThrottlerRef =
new LazyReference<>("requestThrottler", this::buildRequestThrottler, cycleDetector);
private final LazyReference<Map<String, String>> startupOptionsRef =
new LazyReference<>("startupOptions", this::buildStartupOptions, cycleDetector);
private final LazyReference<NodeStateListener> nodeStateListenerRef;
private final LazyReference<SchemaChangeListener> schemaChangeListenerRef;
private final LazyReference<RequestTracker> requestTrackerRef;
Expand Down Expand Up @@ -230,6 +232,15 @@ public DefaultDriverContext(
this.classLoader = classLoader;
}

/**
* Builds a map of options to send in a Startup message.
*
* @see #getStartupOptions()
*/
protected Map<String, String> buildStartupOptions() {
return new StartupOptionsBuilder(this).build();
}

protected Map<String, LoadBalancingPolicy> buildLoadBalancingPolicies() {
return Reflection.buildFromConfigProfiles(
this,
Expand Down Expand Up @@ -731,4 +742,10 @@ public CodecRegistry getCodecRegistry() {
public ProtocolVersion getProtocolVersion() {
return getChannelFactory().getProtocolVersion();
}

@NonNull
@Override
public Map<String, String> getStartupOptions() {
return startupOptionsRef.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.netty.buffer.ByteBuf;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

Expand Down Expand Up @@ -128,4 +129,12 @@ public interface InternalDriverContext extends DriverContext {
*/
@Nullable
ClassLoader getClassLoader();

/**
* Retrieves the map of options to send in a Startup message. The returned map will be used to
* construct a {@link com.datastax.oss.protocol.internal.request.Startup} instance when
* initializing the native protocol handshake.
*/
@NonNull
Map<String, String> getStartupOptions();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.internal.core.context;

import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.protocol.internal.request.Startup;
import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap;
import java.util.Map;
import net.jcip.annotations.Immutable;

@Immutable
public class StartupOptionsBuilder {

public static final String DRIVER_NAME_KEY = "DRIVER_NAME";
public static final String DRIVER_VERSION_KEY = "DRIVER_VERSION";

protected final InternalDriverContext context;

public StartupOptionsBuilder(InternalDriverContext context) {
this.context = context;
}

/**
* Builds a map of options to send in a Startup message.
*
* <p>The default set of options are built here and include {@link
* com.datastax.oss.protocol.internal.request.Startup#COMPRESSION_KEY} (if the context passed in
* has a compressor/algorithm set), and the driver's {@link #DRIVER_NAME_KEY} and {@link
* #DRIVER_VERSION_KEY}. The {@link com.datastax.oss.protocol.internal.request.Startup}
* constructor will add {@link
* com.datastax.oss.protocol.internal.request.Startup#CQL_VERSION_KEY}.
*
* @return Map of Startup Options.
*/
public Map<String, String> build() {
NullAllowingImmutableMap.Builder<String, String> builder = NullAllowingImmutableMap.builder(3);
// add compression (if configured) and driver name and version
String compressionAlgorithm = context.getCompressor().algorithm();
if (compressionAlgorithm != null && !compressionAlgorithm.trim().isEmpty()) {
builder.put(Startup.COMPRESSION_KEY, compressionAlgorithm.trim());
}
return builder
.put(DRIVER_NAME_KEY, getDriverName())
.put(DRIVER_VERSION_KEY, getDriverVersion())
.build();
}

/**
* Returns this driver's name.
*
* <p>By default, this method will pull from the bundled Driver.properties file. Subclasses should
* override this method if they need to report a different Driver name on Startup.
*/
protected String getDriverName() {
return Session.OSS_DRIVER_COORDINATES.getName();
}

/**
* Returns this driver's version.
*
* <p>By default, this method will pull from the bundled Driver.properties file. Subclasses should
* override this method if they need to report a different Driver version on Startup.
*/
protected String getDriverVersion() {
return Session.OSS_DRIVER_COORDINATES.getVersion().toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.datastax.oss.driver.internal.core.TestResponses;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import com.datastax.oss.protocol.internal.Compressor;
import com.datastax.oss.protocol.internal.Frame;
import com.datastax.oss.protocol.internal.ProtocolConstants;
import com.datastax.oss.protocol.internal.request.AuthResponse;
Expand All @@ -44,7 +43,6 @@
import com.datastax.oss.protocol.internal.response.Ready;
import com.datastax.oss.protocol.internal.response.result.SetKeyspace;
import com.datastax.oss.protocol.internal.util.Bytes;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import java.net.InetSocketAddress;
import java.time.Duration;
Expand All @@ -64,7 +62,6 @@ public class ProtocolInitHandlerTest extends ChannelHandlerTestBase {
@Mock private InternalDriverContext internalDriverContext;
@Mock private DriverConfig driverConfig;
@Mock private DriverExecutionProfile defaultProfile;
@Mock private Compressor<ByteBuf> compressor;

private ProtocolVersionRegistry protocolVersionRegistry =
new CassandraProtocolVersionRegistry("test");
Expand All @@ -83,8 +80,6 @@ public void setup() {
.thenReturn(Duration.ofSeconds(30));
Mockito.when(internalDriverContext.getProtocolVersionRegistry())
.thenReturn(protocolVersionRegistry);
Mockito.when(internalDriverContext.getCompressor()).thenReturn(compressor);
Mockito.when(compressor.algorithm()).thenReturn(null);

channel
.pipeline()
Expand Down Expand Up @@ -120,8 +115,6 @@ public void should_initialize() {
// It should send a STARTUP message
Frame requestFrame = readOutboundFrame();
assertThat(requestFrame.message).isInstanceOf(Startup.class);
Startup startup = (Startup) requestFrame.message;
assertThat(startup.options).doesNotContainKey("COMPRESSION");
assertThat(connectFuture).isNotDone();

// Simulate a READY response
Expand All @@ -136,34 +129,6 @@ public void should_initialize() {
assertThat(connectFuture).isSuccess();
}

@Test
public void should_initialize_with_compression() {
Mockito.when(compressor.algorithm()).thenReturn("lz4");
channel
.pipeline()
.addLast(
"init",
new ProtocolInitHandler(
internalDriverContext,
DefaultProtocolVersion.V4,
null,
DriverChannelOptions.DEFAULT,
heartbeatHandler));

ChannelFuture connectFuture = channel.connect(new InetSocketAddress("localhost", 9042));

Frame requestFrame = readOutboundFrame();
assertThat(requestFrame.message).isInstanceOf(Startup.class);
Startup startup = (Startup) requestFrame.message;

// STARTUP message should request compression
assertThat(startup.options).containsEntry("COMPRESSION", "lz4");

writeInboundFrame(buildInboundFrame(requestFrame, new Ready()));
writeInboundFrame(readOutboundFrame(), TestResponses.clusterNameResponse("someClusterName"));
assertThat(connectFuture).isSuccess();
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 that makes sense, the only thing this was testing was the options map, and since it's now built outside of the handler, covering it here wouldn't be very useful.

@Test
public void should_add_heartbeat_handler_to_pipeline_on_success() {
ProtocolInitHandler protocolInitHandler =
Expand All @@ -184,8 +149,6 @@ public void should_add_heartbeat_handler_to_pipeline_on_success() {
// It should send a STARTUP message
Frame requestFrame = readOutboundFrame();
assertThat(requestFrame.message).isInstanceOf(Startup.class);
Startup startup = (Startup) requestFrame.message;
assertThat(startup.options).doesNotContainKey("COMPRESSION");
assertThat(connectFuture).isNotDone();

// Simulate a READY response
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.internal.core.context;

import static org.assertj.core.api.Assertions.assertThat;

import com.datastax.oss.driver.api.core.Version;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfig;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeStateListener;
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.shaded.guava.common.collect.Lists;
import com.datastax.oss.driver.shaded.guava.common.collect.Maps;
import com.datastax.oss.protocol.internal.request.Startup;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

public class StartupOptionsBuilderTest {

private DefaultDriverContext defaultDriverContext;

// Mocks for instantiating the default driver context
@Mock private DriverConfigLoader configLoader;
private List<TypeCodec<?>> typeCodecs = Lists.newArrayList();
@Mock private NodeStateListener nodeStateListener;
@Mock private SchemaChangeListener schemaChangeListener;
@Mock private RequestTracker requestTracker;
private Map<String, Predicate<Node>> nodeFilters = Maps.newHashMap();
@Mock private ClassLoader classLoader;
@Mock private DriverConfig driverConfig;
@Mock private DriverExecutionProfile defaultProfile;

@Before
public void before() {
MockitoAnnotations.initMocks(this);
Mockito.when(configLoader.getInitialConfig()).thenReturn(driverConfig);
Mockito.when(driverConfig.getDefaultProfile()).thenReturn(defaultProfile);
}

private void buildDriverContext() {
defaultDriverContext =
new DefaultDriverContext(
configLoader,
typeCodecs,
nodeStateListener,
schemaChangeListener,
requestTracker,
nodeFilters,
classLoader);
}

private void assertDefaultStartupOptions(Startup startup) {
assertThat(startup.options).containsEntry(Startup.CQL_VERSION_KEY, "3.0.0");
assertThat(startup.options)
.containsEntry(
StartupOptionsBuilder.DRIVER_NAME_KEY, Session.OSS_DRIVER_COORDINATES.getName());
assertThat(startup.options).containsKey(StartupOptionsBuilder.DRIVER_VERSION_KEY);
Version version = Version.parse(startup.options.get(StartupOptionsBuilder.DRIVER_VERSION_KEY));
assertThat(version).isEqualByComparingTo(Session.OSS_DRIVER_COORDINATES.getVersion());
}

@Test
public void should_build_minimal_startup_options() {
buildDriverContext();
Startup startup = new Startup(defaultDriverContext.getStartupOptions());
assertThat(startup.options).doesNotContainKey(Startup.COMPRESSION_KEY);
assertDefaultStartupOptions(startup);
}

@Test
public void should_build_startup_options_with_compression() {
Mockito.when(defaultProfile.isDefined(DefaultDriverOption.PROTOCOL_COMPRESSION))
.thenReturn(Boolean.TRUE);
Mockito.when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION))
.thenReturn("lz4");
buildDriverContext();
Startup startup = new Startup(defaultDriverContext.getStartupOptions());
// assert the compression option is present
assertThat(startup.options).containsEntry(Startup.COMPRESSION_KEY, "lz4");
assertDefaultStartupOptions(startup);
}
}