Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ ext {
micrometerVersion = '1.7.6'
mockitoVersion = '3.12.4'
mongoDriverVersion = '4.3.2'
mysqlVersion = '8.0.26'
mysqlVersion = '8.0.27'
pahoMqttClientVersion = '1.2.5'
postgresVersion = '42.2.23'
r2dbch2Version='0.8.4.RELEASE'
Expand All @@ -105,6 +105,7 @@ ext {
springSecurityVersion = project.hasProperty('springSecurityVersion') ? project.springSecurityVersion : '5.5.3'
springVersion = project.hasProperty('springVersion') ? project.springVersion : '5.3.13'
springWsVersion = '3.1.1'
testcontainersVersion = '1.16.2'
tomcatVersion = '9.0.52'
xmlUnitVersion = '2.8.2'
xstreamVersion = '1.4.17'
Expand Down Expand Up @@ -255,7 +256,7 @@ configure(javaProjects) { subproject ->
testImplementation 'org.jetbrains.kotlin:kotlin-reflect'
testImplementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
testImplementation 'io.projectreactor:reactor-test'
testImplementation 'org.testcontainers:junit-jupiter:1.16.0'
testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"

testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
Expand Down Expand Up @@ -607,6 +608,7 @@ project('spring-integration-jdbc') {
testImplementation "org.postgresql:postgresql:$postgresVersion"
testImplementation "mysql:mysql-connector-java:$mysqlVersion"
testImplementation "org.apache.commons:commons-dbcp2:$commonsDbcp2Version"
testImplementation "org.testcontainers:mysql:$testcontainersVersion"

testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.integration.jdbc.store;

import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
Expand Down Expand Up @@ -47,6 +48,9 @@
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.SingleColumnRowMapper;
import org.springframework.jdbc.support.JdbcAccessor;
import org.springframework.jdbc.support.JdbcUtils;
import org.springframework.jdbc.support.MetaDataAccessException;
import org.springframework.jdbc.support.lob.DefaultLobHandler;
import org.springframework.jdbc.support.lob.LobHandler;
import org.springframework.jmx.export.annotation.ManagedAttribute;
Expand Down Expand Up @@ -87,7 +91,7 @@ private enum Query {
"(GROUP_KEY, REGION, COMPLETE, LAST_RELEASED_SEQUENCE, CREATED_DATE, UPDATED_DATE)"
+ " values (?, ?, 0, 0, ?, ?)"),

UPDATE_MESSAGE_GROUP("UPDATE %PREFIX%MESSAGE_GROUP set UPDATED_DATE=?, CONDITION=? " +
UPDATE_MESSAGE_GROUP("UPDATE %PREFIX%MESSAGE_GROUP set UPDATED_DATE=?, \"CONDITION\"=? " +
"where GROUP_KEY=? and REGION=?"),

REMOVE_MESSAGE_FROM_GROUP("DELETE from %PREFIX%GROUP_TO_MESSAGE where GROUP_KEY=? and MESSAGE_ID=? and " +
Expand Down Expand Up @@ -117,7 +121,7 @@ private enum Query {
"and %PREFIX%GROUP_TO_MESSAGE.GROUP_KEY = ? " +
"and m.REGION = ?)"),

GET_GROUP_INFO("SELECT COMPLETE, LAST_RELEASED_SEQUENCE, CREATED_DATE, UPDATED_DATE, CONDITION" +
GET_GROUP_INFO("SELECT COMPLETE, LAST_RELEASED_SEQUENCE, CREATED_DATE, UPDATED_DATE, \"CONDITION\"" +
" from %PREFIX%MESSAGE_GROUP where GROUP_KEY=? and REGION=?"),

GET_MESSAGE("SELECT MESSAGE_ID, CREATED_DATE, MESSAGE_BYTES from %PREFIX%MESSAGE where MESSAGE_ID=? and " +
Expand Down Expand Up @@ -165,6 +169,8 @@ public String getSql() {

private final JdbcOperations jdbcTemplate;

private final String vendorName;

private final Map<Query, String> queryCache = new HashMap<>();

private String region = "DEFAULT";
Expand Down Expand Up @@ -195,6 +201,14 @@ public JdbcMessageStore(JdbcOperations jdbcOperations) {
this.jdbcTemplate = jdbcOperations;
this.deserializer = new AllowListDeserializingConverter();
this.serializer = new SerializingConverter();
try {
this.vendorName =
JdbcUtils.extractDatabaseMetaData(((JdbcAccessor) jdbcOperations).getDataSource(), // NOSONAR
DatabaseMetaData::getDatabaseProductName);
}
catch (MetaDataAccessException ex) {
throw new IllegalStateException("Cannot extract database vendor name", ex);
}
}

/**
Expand Down Expand Up @@ -549,7 +563,7 @@ public Stream<Message<?>> streamMessagesForGroup(Object groupId) {
public Iterator<MessageGroup> iterator() {

final Iterator<String> iterator = this.jdbcTemplate.query(getQuery(Query.LIST_GROUP_KEYS),
new SingleColumnRowMapper<String>(), this.region)
new SingleColumnRowMapper<String>(), this.region)
.iterator();

return new Iterator<MessageGroup>() {
Expand Down Expand Up @@ -580,14 +594,16 @@ public void remove() {
* @return a transformed query with replacements
*/
protected String getQuery(Query base) {
String query = this.queryCache.get(base);
return this.queryCache.computeIfAbsent(base,
query -> {
String parsedSql = StringUtils.replace(query.getSql(), "%PREFIX%", this.tablePrefix);
if ((Query.GET_GROUP_INFO.equals(base) || Query.UPDATE_MESSAGE_GROUP.equals(base))
&& this.vendorName.equals("MySQL")) {

if (query == null) {
query = StringUtils.replace(base.getSql(), "%PREFIX%", this.tablePrefix);
this.queryCache.put(base, query);
}

return query;
parsedSql = parsedSql.replaceFirst("\"(CONDITION)\"", "`$1`");
}
return parsedSql;
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ CREATE TABLE INT_GROUP_TO_MESSAGE (
CREATE TABLE INT_MESSAGE_GROUP (
GROUP_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CONDITION VARCHAR(255),
"CONDITION" VARCHAR(255),
COMPLETE BIGINT,
LAST_RELEASED_SEQUENCE BIGINT,
CREATED_DATE TIMESTAMP NOT NULL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ CREATE TABLE INT_GROUP_TO_MESSAGE (
CREATE TABLE INT_MESSAGE_GROUP (
GROUP_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CONDITION VARCHAR(255),
"CONDITION" VARCHAR(255),
COMPLETE BIGINT,
LAST_RELEASED_SEQUENCE BIGINT,
CREATED_DATE TIMESTAMP NOT NULL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ CREATE TABLE INT_GROUP_TO_MESSAGE (
CREATE TABLE INT_MESSAGE_GROUP (
GROUP_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CONDITION VARCHAR(255),
`CONDITION` VARCHAR(255),
COMPLETE BIGINT,
LAST_RELEASED_SEQUENCE BIGINT,
CREATED_DATE DATETIME(6) NOT NULL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ CREATE TABLE INT_GROUP_TO_MESSAGE (
CREATE TABLE INT_MESSAGE_GROUP (
GROUP_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CONDITION VARCHAR(255),
"CONDITION" VARCHAR(255),
COMPLETE BIGINT,
LAST_RELEASED_SEQUENCE BIGINT,
CREATED_DATE TIMESTAMP NOT NULL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ CREATE TABLE INT_GROUP_TO_MESSAGE (
CREATE TABLE INT_MESSAGE_GROUP (
GROUP_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CONDITION VARCHAR(255),
"CONDITION" VARCHAR(255),
COMPLETE BIGINT,
LAST_RELEASED_SEQUENCE BIGINT,
CREATED_DATE DATETIME NOT NULL,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.jdbc.mysql;

import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import org.springframework.integration.test.util.TestUtils;

/**
* The base contract for JUnit tests based on the container for MqSQL.
*
* @author Artem Bilan
*
* @since 5.5.7
*/
@Testcontainers(disabledWithoutDocker = true)
public interface MySqlContainerTest {

@Container
MySQLContainer<?> MY_SQL_CONTAINER =
new MySQLContainer<>(TestUtils.dockerRegistryFromEnv() + "mysql:latest")
.withReuse(true);


static String getDriverClassName() {
return MY_SQL_CONTAINER.getDriverClassName();
}

static String getJdbcUrl() {
return MY_SQL_CONTAINER.getJdbcUrl();
}

static String getUsername() {
return MY_SQL_CONTAINER.getUsername();
}

static String getPassword() {
return MY_SQL_CONTAINER.getPassword();
}

}

This file was deleted.

Loading