diff --git a/changelog/README.md b/changelog/README.md index 9c8ebb09c7f..1c09a4c0ca7 100644 --- a/changelog/README.md +++ b/changelog/README.md @@ -4,6 +4,7 @@ ### 4.0.0-beta2 (in progress) +- [new feature] JAVA-1932: Send Driver Name and Version in Startup message - [new feature] JAVA-1917: Add ability to set node on statement - [improvement] JAVA-1916: Base TimestampCodec.parse on java.util.Date. - [improvement] JAVA-1940: Clean up test resources when CCM integration tests finish diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java index eda34429267..240e474709c 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java @@ -142,7 +142,7 @@ String describe() { Message getRequest() { switch (step) { case STARTUP: - return new Startup(context.getCompressor().algorithm()); + return new Startup(context.getStartupOptions()); case GET_CLUSTER_NAME: return CLUSTER_NAME_QUERY; case SET_KEYSPACE: diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java b/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java index 89424a6f568..86f4fecd199 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java @@ -178,6 +178,8 @@ public class DefaultDriverContext implements InternalDriverContext { new LazyReference<>("metricsFactory", this::buildMetricsFactory, cycleDetector); private final LazyReference requestThrottlerRef = new LazyReference<>("requestThrottler", this::buildRequestThrottler, cycleDetector); + private final LazyReference> startupOptionsRef = + new LazyReference<>("startupOptions", this::buildStartupOptions, cycleDetector); private final LazyReference nodeStateListenerRef; private final LazyReference schemaChangeListenerRef; private final LazyReference requestTrackerRef; @@ -230,6 +232,15 @@ public DefaultDriverContext( this.classLoader = classLoader; } + /** + * Builds a map of options to send in a Startup message. + * + * @see #getStartupOptions() + */ + protected Map buildStartupOptions() { + return new StartupOptionsBuilder(this).build(); + } + protected Map buildLoadBalancingPolicies() { return Reflection.buildFromConfigProfiles( this, @@ -731,4 +742,10 @@ public CodecRegistry getCodecRegistry() { public ProtocolVersion getProtocolVersion() { return getChannelFactory().getProtocolVersion(); } + + @NonNull + @Override + public Map getStartupOptions() { + return startupOptionsRef.get(); + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/context/InternalDriverContext.java b/core/src/main/java/com/datastax/oss/driver/internal/core/context/InternalDriverContext.java index feae0df50b5..ee926a253fa 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/context/InternalDriverContext.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/context/InternalDriverContext.java @@ -41,6 +41,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import io.netty.buffer.ByteBuf; +import java.util.Map; import java.util.Optional; import java.util.function.Predicate; @@ -128,4 +129,12 @@ public interface InternalDriverContext extends DriverContext { */ @Nullable ClassLoader getClassLoader(); + + /** + * Retrieves the map of options to send in a Startup message. The returned map will be used to + * construct a {@link com.datastax.oss.protocol.internal.request.Startup} instance when + * initializing the native protocol handshake. + */ + @NonNull + Map getStartupOptions(); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java b/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java new file mode 100644 index 00000000000..49718b7df97 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java @@ -0,0 +1,80 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.context; + +import com.datastax.oss.driver.api.core.session.Session; +import com.datastax.oss.protocol.internal.request.Startup; +import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap; +import java.util.Map; +import net.jcip.annotations.Immutable; + +@Immutable +public class StartupOptionsBuilder { + + public static final String DRIVER_NAME_KEY = "DRIVER_NAME"; + public static final String DRIVER_VERSION_KEY = "DRIVER_VERSION"; + + protected final InternalDriverContext context; + + public StartupOptionsBuilder(InternalDriverContext context) { + this.context = context; + } + + /** + * Builds a map of options to send in a Startup message. + * + *

The default set of options are built here and include {@link + * com.datastax.oss.protocol.internal.request.Startup#COMPRESSION_KEY} (if the context passed in + * has a compressor/algorithm set), and the driver's {@link #DRIVER_NAME_KEY} and {@link + * #DRIVER_VERSION_KEY}. The {@link com.datastax.oss.protocol.internal.request.Startup} + * constructor will add {@link + * com.datastax.oss.protocol.internal.request.Startup#CQL_VERSION_KEY}. + * + * @return Map of Startup Options. + */ + public Map build() { + NullAllowingImmutableMap.Builder builder = NullAllowingImmutableMap.builder(3); + // add compression (if configured) and driver name and version + String compressionAlgorithm = context.getCompressor().algorithm(); + if (compressionAlgorithm != null && !compressionAlgorithm.trim().isEmpty()) { + builder.put(Startup.COMPRESSION_KEY, compressionAlgorithm.trim()); + } + return builder + .put(DRIVER_NAME_KEY, getDriverName()) + .put(DRIVER_VERSION_KEY, getDriverVersion()) + .build(); + } + + /** + * Returns this driver's name. + * + *

By default, this method will pull from the bundled Driver.properties file. Subclasses should + * override this method if they need to report a different Driver name on Startup. + */ + protected String getDriverName() { + return Session.OSS_DRIVER_COORDINATES.getName(); + } + + /** + * Returns this driver's version. + * + *

By default, this method will pull from the bundled Driver.properties file. Subclasses should + * override this method if they need to report a different Driver version on Startup. + */ + protected String getDriverVersion() { + return Session.OSS_DRIVER_COORDINATES.getVersion().toString(); + } +} diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandlerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandlerTest.java index 7c334180177..d1618dcb4be 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandlerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandlerTest.java @@ -30,7 +30,6 @@ import com.datastax.oss.driver.internal.core.TestResponses; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; -import com.datastax.oss.protocol.internal.Compressor; import com.datastax.oss.protocol.internal.Frame; import com.datastax.oss.protocol.internal.ProtocolConstants; import com.datastax.oss.protocol.internal.request.AuthResponse; @@ -44,7 +43,6 @@ import com.datastax.oss.protocol.internal.response.Ready; import com.datastax.oss.protocol.internal.response.result.SetKeyspace; import com.datastax.oss.protocol.internal.util.Bytes; -import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import java.net.InetSocketAddress; import java.time.Duration; @@ -64,7 +62,6 @@ public class ProtocolInitHandlerTest extends ChannelHandlerTestBase { @Mock private InternalDriverContext internalDriverContext; @Mock private DriverConfig driverConfig; @Mock private DriverExecutionProfile defaultProfile; - @Mock private Compressor compressor; private ProtocolVersionRegistry protocolVersionRegistry = new CassandraProtocolVersionRegistry("test"); @@ -83,8 +80,6 @@ public void setup() { .thenReturn(Duration.ofSeconds(30)); Mockito.when(internalDriverContext.getProtocolVersionRegistry()) .thenReturn(protocolVersionRegistry); - Mockito.when(internalDriverContext.getCompressor()).thenReturn(compressor); - Mockito.when(compressor.algorithm()).thenReturn(null); channel .pipeline() @@ -120,8 +115,6 @@ public void should_initialize() { // It should send a STARTUP message Frame requestFrame = readOutboundFrame(); assertThat(requestFrame.message).isInstanceOf(Startup.class); - Startup startup = (Startup) requestFrame.message; - assertThat(startup.options).doesNotContainKey("COMPRESSION"); assertThat(connectFuture).isNotDone(); // Simulate a READY response @@ -136,34 +129,6 @@ public void should_initialize() { assertThat(connectFuture).isSuccess(); } - @Test - public void should_initialize_with_compression() { - Mockito.when(compressor.algorithm()).thenReturn("lz4"); - channel - .pipeline() - .addLast( - "init", - new ProtocolInitHandler( - internalDriverContext, - DefaultProtocolVersion.V4, - null, - DriverChannelOptions.DEFAULT, - heartbeatHandler)); - - ChannelFuture connectFuture = channel.connect(new InetSocketAddress("localhost", 9042)); - - Frame requestFrame = readOutboundFrame(); - assertThat(requestFrame.message).isInstanceOf(Startup.class); - Startup startup = (Startup) requestFrame.message; - - // STARTUP message should request compression - assertThat(startup.options).containsEntry("COMPRESSION", "lz4"); - - writeInboundFrame(buildInboundFrame(requestFrame, new Ready())); - writeInboundFrame(readOutboundFrame(), TestResponses.clusterNameResponse("someClusterName")); - assertThat(connectFuture).isSuccess(); - } - @Test public void should_add_heartbeat_handler_to_pipeline_on_success() { ProtocolInitHandler protocolInitHandler = @@ -184,8 +149,6 @@ public void should_add_heartbeat_handler_to_pipeline_on_success() { // It should send a STARTUP message Frame requestFrame = readOutboundFrame(); assertThat(requestFrame.message).isInstanceOf(Startup.class); - Startup startup = (Startup) requestFrame.message; - assertThat(startup.options).doesNotContainKey("COMPRESSION"); assertThat(connectFuture).isNotDone(); // Simulate a READY response diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilderTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilderTest.java new file mode 100644 index 00000000000..3868212179a --- /dev/null +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilderTest.java @@ -0,0 +1,107 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.context; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.datastax.oss.driver.api.core.Version; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverConfig; +import com.datastax.oss.driver.api.core.config.DriverConfigLoader; +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.NodeStateListener; +import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener; +import com.datastax.oss.driver.api.core.session.Session; +import com.datastax.oss.driver.api.core.tracker.RequestTracker; +import com.datastax.oss.driver.api.core.type.codec.TypeCodec; +import com.datastax.oss.driver.shaded.guava.common.collect.Lists; +import com.datastax.oss.driver.shaded.guava.common.collect.Maps; +import com.datastax.oss.protocol.internal.request.Startup; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +public class StartupOptionsBuilderTest { + + private DefaultDriverContext defaultDriverContext; + + // Mocks for instantiating the default driver context + @Mock private DriverConfigLoader configLoader; + private List> typeCodecs = Lists.newArrayList(); + @Mock private NodeStateListener nodeStateListener; + @Mock private SchemaChangeListener schemaChangeListener; + @Mock private RequestTracker requestTracker; + private Map> nodeFilters = Maps.newHashMap(); + @Mock private ClassLoader classLoader; + @Mock private DriverConfig driverConfig; + @Mock private DriverExecutionProfile defaultProfile; + + @Before + public void before() { + MockitoAnnotations.initMocks(this); + Mockito.when(configLoader.getInitialConfig()).thenReturn(driverConfig); + Mockito.when(driverConfig.getDefaultProfile()).thenReturn(defaultProfile); + } + + private void buildDriverContext() { + defaultDriverContext = + new DefaultDriverContext( + configLoader, + typeCodecs, + nodeStateListener, + schemaChangeListener, + requestTracker, + nodeFilters, + classLoader); + } + + private void assertDefaultStartupOptions(Startup startup) { + assertThat(startup.options).containsEntry(Startup.CQL_VERSION_KEY, "3.0.0"); + assertThat(startup.options) + .containsEntry( + StartupOptionsBuilder.DRIVER_NAME_KEY, Session.OSS_DRIVER_COORDINATES.getName()); + assertThat(startup.options).containsKey(StartupOptionsBuilder.DRIVER_VERSION_KEY); + Version version = Version.parse(startup.options.get(StartupOptionsBuilder.DRIVER_VERSION_KEY)); + assertThat(version).isEqualByComparingTo(Session.OSS_DRIVER_COORDINATES.getVersion()); + } + + @Test + public void should_build_minimal_startup_options() { + buildDriverContext(); + Startup startup = new Startup(defaultDriverContext.getStartupOptions()); + assertThat(startup.options).doesNotContainKey(Startup.COMPRESSION_KEY); + assertDefaultStartupOptions(startup); + } + + @Test + public void should_build_startup_options_with_compression() { + Mockito.when(defaultProfile.isDefined(DefaultDriverOption.PROTOCOL_COMPRESSION)) + .thenReturn(Boolean.TRUE); + Mockito.when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION)) + .thenReturn("lz4"); + buildDriverContext(); + Startup startup = new Startup(defaultDriverContext.getStartupOptions()); + // assert the compression option is present + assertThat(startup.options).containsEntry(Startup.COMPRESSION_KEY, "lz4"); + assertDefaultStartupOptions(startup); + } +}