Skip to content

Commit e2cf27c

Browse files
yybmionartembilan
authored andcommitted
GH-9312: Add JSON support for JdbcChannelMessageStore
Fixes: #9312 - Add `JsonChannelMessageStorePreparedStatementSetter` for serialization - Add `JsonMessageRowMapper` for deserialization with trusted package validation - Support PostgreSQL (JSONB), MySQL (JSON), and H2 (CLOB) databases - Add comprehensive test coverage and documentation - Document the feature Signed-off-by: Yoobin Yoon <[email protected]>
1 parent fff5389 commit e2cf27c

File tree

17 files changed

+1120
-22
lines changed

17 files changed

+1120
-22
lines changed

build.gradle

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -686,16 +686,14 @@ project('spring-integration-jdbc') {
686686
testImplementation "org.apache.derby:derbytools:$derbyVersion"
687687
testImplementation "org.apache.derby:derbyclient:$derbyVersion"
688688
testImplementation "org.postgresql:postgresql:$postgresVersion"
689-
testImplementation "com.mysql:mysql-connector-j:$mysqlVersion"
690-
testImplementation("org.apache.commons:commons-dbcp2:$commonsDbcp2Version") {
691-
exclude group: 'commons-logging'
692-
}
689+
testImplementation "org.apache.commons:commons-dbcp2:$commonsDbcp2Version"
693690
testImplementation 'org.testcontainers:testcontainers-mysql'
694691
testImplementation 'org.testcontainers:testcontainers-postgresql'
695692
testImplementation 'org.testcontainers:testcontainers-oracle-xe'
693+
testImplementation 'tools.jackson.core:jackson-databind'
696694

697-
testRuntimeOnly 'tools.jackson.core:jackson-databind'
698695
testRuntimeOnly "com.oracle.database.jdbc:ojdbc11:$oracleVersion"
696+
testRuntimeOnly "com.mysql:mysql-connector-j:$mysqlVersion"
699697
}
700698
}
701699

spring-integration-core/src/main/java/org/springframework/integration/support/json/JacksonJsonUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ private JacksonJsonUtils() {
8383
* @throws IllegalStateException if an implementation is not available.
8484
* @since 4.3.10
8585
*/
86-
public static ObjectMapper messagingAwareMapper(String... trustedPackages) {
86+
public static ObjectMapper messagingAwareMapper(String @Nullable ... trustedPackages) {
8787
if (JacksonPresent.isJackson2Present()) {
8888
ObjectMapper mapper = new Jackson2JsonObjectMapper().getObjectMapper();
8989

spring-integration-core/src/main/java/org/springframework/integration/support/json/JsonObjectMapperProvider.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.support.json;
1818

19+
import org.jspecify.annotations.Nullable;
20+
1921
/**
2022
* Simple factory to provide {@linkplain JsonObjectMapper}
2123
* instances based on jackson-databind lib in the classpath.
@@ -53,6 +55,25 @@ else if (JacksonPresent.isJackson2Present()) {
5355
}
5456
}
5557

58+
/**
59+
* Return an object mapper (if available) aware of messaging classes to (de)serialize.
60+
* @return the mapper.
61+
* @throws IllegalStateException if an implementation is not available.
62+
* @since 7.0
63+
*/
64+
@SuppressWarnings("removal")
65+
public static JsonObjectMapper<?, ?> newMessagingAwareInstance(String @Nullable ... trustedPackages) {
66+
if (JacksonPresent.isJackson3Present()) {
67+
return new JacksonJsonObjectMapper(JacksonMessagingUtils.messagingAwareMapper(trustedPackages));
68+
}
69+
else if (JacksonPresent.isJackson2Present()) {
70+
return new Jackson2JsonObjectMapper(JacksonJsonUtils.messagingAwareMapper(trustedPackages));
71+
}
72+
else {
73+
throw new IllegalStateException("No jackson-databind.jar is present in the classpath.");
74+
}
75+
}
76+
5677
/**
5778
* Returns true if a supported JSON implementation is on the class path.
5879
* @return true if {@link #newInstance()} will return a mapper.

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.springframework.integration.support.converter.AllowListDeserializingConverter;
5353
import org.springframework.integration.util.UUIDConverter;
5454
import org.springframework.jdbc.core.JdbcTemplate;
55+
import org.springframework.jdbc.core.RowMapper;
5556
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
5657
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
5758
import org.springframework.jmx.export.annotation.ManagedAttribute;
@@ -89,6 +90,7 @@
8990
* @author Trung Pham
9091
* @author Johannes Edmeier
9192
* @author Ngoc Nhan
93+
* @author Yoobin Yoon
9294
*
9395
* @since 2.2
9496
*/
@@ -109,7 +111,7 @@ public class JdbcChannelMessageStore implements PriorityCapableChannelMessageSto
109111
*/
110112
public static final String DEFAULT_TABLE_PREFIX = "INT_";
111113

112-
private enum Query {
114+
protected enum Query {
113115
CREATE_MESSAGE,
114116
COUNT_GROUPS,
115117
GROUP_SIZE,
@@ -148,7 +150,7 @@ private enum Query {
148150
private SerializingConverter serializer;
149151

150152
@SuppressWarnings("NullAway.Init")
151-
private MessageRowMapper messageRowMapper;
153+
private RowMapper<Message<?>> messageRowMapper;
152154

153155
@SuppressWarnings("NullAway.Init")
154156
private ChannelMessageStorePreparedStatementSetter preparedStatementSetter;
@@ -208,7 +210,7 @@ public void setDeserializer(Deserializer<? extends Message<?>> deserializer) {
208210

209211
/**
210212
* Add patterns for packages/classes that are allowed to be deserialized. A class can
211-
* be fully qualified or a wildcard '*' is allowed at the beginning or end of the
213+
* be fully qualified, or a wildcard '*' is allowed at the beginning or end of the
212214
* class name. Examples: {@code com.foo.*}, {@code *.MyClass}.
213215
* @param patterns the patterns.
214216
* @since 5.4
@@ -232,18 +234,18 @@ public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
232234
}
233235

234236
/**
235-
* Allow for passing in a custom {@link MessageRowMapper}. The {@link MessageRowMapper}
236-
* is used to convert the selected database row representing the persisted
237-
* message into the actual {@link Message} object.
237+
* Allow for passing in a custom {@link RowMapper} for {@link Message}.
238+
* The {@link RowMapper} is used to convert the selected database row
239+
* representing the persisted message into the actual {@link Message} object.
238240
* @param messageRowMapper Must not be null
239241
*/
240-
public void setMessageRowMapper(MessageRowMapper messageRowMapper) {
241-
Assert.notNull(messageRowMapper, "The provided MessageRowMapper must not be null.");
242+
public void setMessageRowMapper(RowMapper<Message<?>> messageRowMapper) {
243+
Assert.notNull(messageRowMapper, "The provided RowMapper must not be null.");
242244
this.messageRowMapper = messageRowMapper;
243245
}
244246

245247
/**
246-
* Set a {@link ChannelMessageStorePreparedStatementSetter} to insert message into the database.
248+
* Set a {@link ChannelMessageStorePreparedStatementSetter} to insert a message into the database.
247249
* @param preparedStatementSetter {@link ChannelMessageStorePreparedStatementSetter} to use.
248250
* Must not be null
249251
* @since 5.0
@@ -259,7 +261,7 @@ public void setPreparedStatementSetter(ChannelMessageStorePreparedStatementSette
259261
* The {@link JdbcChannelMessageStore} provides the SQL queries to retrieve messages from
260262
* the database. See the JavaDocs {@link ChannelMessageStoreQueryProvider} (all known
261263
* implementing classes) to see those implementations provided by the framework.
262-
* <p> You can provide your own query implementations, if you need to support additional
264+
* <p> You can provide your own query implementations if you need to support additional
263265
* databases and/or need to fine-tune the queries for your requirements.
264266
* @param channelMessageStoreQueryProvider Must not be null.
265267
*/
@@ -313,7 +315,7 @@ public void setTablePrefix(String tablePrefix) {
313315
* using a task executor.</p>
314316
* <p>The issue is that the {@link #pollMessageFromGroup(Object)} looks for the
315317
* oldest entry for a giving channel (groupKey) and region ({@link #setRegion(String)}).
316-
* If you do that with multiple threads and you are using transactions, other
318+
* If you do that with multiple threads, and you are using transactions, other
317319
* threads may be waiting for that same locked row.</p>
318320
* <p>If using the provided
319321
* {@link org.springframework.integration.jdbc.store.channel.OracleChannelMessageStoreQueryProvider},
@@ -323,7 +325,7 @@ public void setTablePrefix(String tablePrefix) {
323325
* message id in an in-memory collection for the duration of processing. With
324326
* that, any polling threads will explicitly exclude those messages from
325327
* being polled.</p>
326-
* <p>For this to work, you must setup the corresponding
328+
* <p>For this to work, you must set up the corresponding
327329
* {@link org.springframework.integration.transaction.TransactionSynchronizationFactory}:</p>
328330
* <pre class="code">
329331
* {@code
@@ -388,7 +390,7 @@ protected MessageGroupFactory getMessageGroupFactory() {
388390
* Check mandatory properties ({@link DataSource} and
389391
* {@link #setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider)}). If no {@link MessageRowMapper}
390392
* and {@link ChannelMessageStorePreparedStatementSetter} was explicitly set using
391-
* {@link #setMessageRowMapper(MessageRowMapper)} and
393+
* {@link #setMessageRowMapper(RowMapper)} and
392394
* {@link #setPreparedStatementSetter(ChannelMessageStorePreparedStatementSetter)} respectively, the default
393395
* {@link MessageRowMapper} and {@link ChannelMessageStorePreparedStatementSetter} will be instantiated using the
394396
* specified {@link #deserializer}.
@@ -493,7 +495,7 @@ public MessageGroup getMessageGroup(Object groupId) {
493495
}
494496

495497
/**
496-
* Return the number of message groups in the store for configured region.
498+
* Return the number of message groups in the store for a configured region.
497499
* @return The message group count.
498500
*/
499501
@ManagedAttribute
@@ -564,8 +566,8 @@ private boolean isSingleStatementForPoll() {
564566
* This method executes a call to the DB to get the oldest Message in the
565567
* MessageGroup which in the context of the {@link JdbcChannelMessageStore}
566568
* means the channel identifier.
567-
* @param groupIdKey String representation of message group (Channel) ID
568-
* @return a message; could be null if query produced no Messages
569+
* @param groupIdKey String representation of a message group (Channel) ID
570+
* @return a message; could be null if a query produced no Messages
569571
*/
570572
protected @Nullable Message<?> doPollForMessage(String groupIdKey) {
571573
NamedParameterJdbcTemplate namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(this.jdbcTemplate);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.store.channel;
18+
19+
import java.io.IOException;
20+
import java.sql.PreparedStatement;
21+
import java.sql.SQLException;
22+
import java.sql.Types;
23+
24+
import org.springframework.integration.support.json.JsonObjectMapper;
25+
import org.springframework.integration.support.json.JsonObjectMapperProvider;
26+
import org.springframework.messaging.Message;
27+
import org.springframework.util.Assert;
28+
29+
/**
30+
* A {@link ChannelMessageStorePreparedStatementSetter} implementation that uses JSON
31+
* serialization for {@link Message} objects instead of Java serialization.
32+
* <p>
33+
* By default, this implementation stores the entire message (including headers and payload) as JSON,
34+
* with type information embedded using Jackson's {@code @class} property for proper deserialization.
35+
* <p>
36+
* <b>IMPORTANT:</b> JSON serialization exposes message content in text format in the database.
37+
* Ensure proper database access controls and encryption for sensitive data.
38+
* Consider the security implications before using this in production with sensitive information.
39+
* <p>
40+
* <b>Database Requirements:</b>
41+
* This implementation requires modifying the MESSAGE_CONTENT column to a text-based type:
42+
* <ul>
43+
* <li>PostgreSQL: Change from {@code BYTEA} to {@code JSONB}</li>
44+
* <li>MySQL: Change from {@code BLOB} to {@code JSON}</li>
45+
* <li>H2: Change from {@code LONGVARBINARY} to {@code CLOB}</li>
46+
* </ul>
47+
* See the reference documentation for schema migration instructions.
48+
* <p>
49+
* <b>Usage Example:</b>
50+
* <pre>{@code
51+
* &#64;Bean
52+
* JdbcChannelMessageStore messageStore(DataSource dataSource) {
53+
* JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
54+
* store.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
55+
*
56+
* // Enable JSON serialization (requires schema modification)
57+
* store.setPreparedStatementSetter(
58+
* new JsonChannelMessageStorePreparedStatementSetter());
59+
* store.setMessageRowMapper(
60+
* new JsonMessageRowMapper("com.example"));
61+
*
62+
* return store;
63+
* }
64+
* }</pre>
65+
*
66+
* @author Yoobin Yoon
67+
* @author Artem Bilan
68+
*
69+
* @since 7.0
70+
*/
71+
public class JsonChannelMessageStorePreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
72+
73+
private final JsonObjectMapper<?, ?> jsonObjectMapper;
74+
75+
/**
76+
* Create a new {@link JsonChannelMessageStorePreparedStatementSetter} with the
77+
* default {@link JsonObjectMapper} configured for Spring Integration message serialization.
78+
* <p>
79+
* This constructor is suitable when serializing standard Spring Integration
80+
* and Java classes. Custom payload types will require their package to be added to the
81+
* corresponding {@link JsonMessageRowMapper}.
82+
*/
83+
public JsonChannelMessageStorePreparedStatementSetter() {
84+
this(JsonObjectMapperProvider.newMessagingAwareInstance());
85+
}
86+
87+
/**
88+
* Create a new {@link JsonChannelMessageStorePreparedStatementSetter} with a
89+
* custom {@link JsonObjectMapper}.
90+
* <p>
91+
* This constructor allows full control over the JSON serialization configuration.
92+
* <p>
93+
* <b>Note:</b> The same JsonObjectMapper configuration should be used in the corresponding
94+
* {@link JsonMessageRowMapper} for consistent serialization and deserialization.
95+
* @param jsonObjectMapper the {@link JsonObjectMapper} to use for JSON serialization
96+
*/
97+
public JsonChannelMessageStorePreparedStatementSetter(JsonObjectMapper<?, ?> jsonObjectMapper) {
98+
super();
99+
Assert.notNull(jsonObjectMapper, "'jsonObjectMapper' must not be null");
100+
this.jsonObjectMapper = jsonObjectMapper;
101+
}
102+
103+
@Override
104+
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
105+
Object groupId, String region, boolean priorityEnabled) throws SQLException {
106+
107+
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
108+
109+
try {
110+
String json = this.jsonObjectMapper.toJson(requestMessage);
111+
112+
String dbProduct = preparedStatement.getConnection().getMetaData().getDatabaseProductName();
113+
114+
if ("PostgreSQL".equalsIgnoreCase(dbProduct)) {
115+
preparedStatement.setObject(6, json, Types.OTHER);
116+
}
117+
else {
118+
preparedStatement.setString(6, json);
119+
}
120+
}
121+
catch (IOException ex) {
122+
throw new SQLException("Failed to serialize message to JSON: " + requestMessage, ex);
123+
}
124+
}
125+
126+
}

0 commit comments

Comments
 (0)