Skip to content

Commit 202e96d

Browse files
committed
Add Jackson JSON serialization for JdbcChannelMessageStore
Enable human-readable JSON storage in JdbcChannelMessageStore as an alternative to binary Java serialization. - Add JacksonChannelMessageStorePreparedStatementSetter for serialization - Add JacksonMessageRowMapper for deserialization - Add protected constructor to MessageRowMapper for extensibility - Add JSON schemas for PostgreSQL (JSONB), MySQL (JSON), and H2 (CLOB) - Add comprehensive test coverage with database-specific tests - Add documentation for JSON serialization support Fixes: gh-9312 Signed-off-by: yybmion <[email protected]>
1 parent 72960e1 commit 202e96d

File tree

16 files changed

+1352
-1
lines changed

16 files changed

+1352
-1
lines changed

build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ ext {
7676
jmsApiVersion = '3.1.0'
7777
jpaApiVersion = '3.2.0'
7878
jrubyVersion = '10.0.2.0'
79+
jsonassertVersion = '1.5.1'
7980
jsonpathVersion = '2.9.0'
8081
junit4Version = '4.13.2'
8182
junitJupiterVersion = '6.0.0'
@@ -691,6 +692,7 @@ project('spring-integration-jdbc') {
691692
dependencies {
692693
api 'org.springframework:spring-jdbc'
693694
optionalApi "org.postgresql:postgresql:$postgresVersion"
695+
optionalApi 'tools.jackson.core:jackson-databind'
694696

695697
testImplementation "com.h2database:h2:$h2Version"
696698
testImplementation "org.hsqldb:hsqldb:$hsqldbVersion"
@@ -702,6 +704,7 @@ project('spring-integration-jdbc') {
702704
testImplementation("org.apache.commons:commons-dbcp2:$commonsDbcp2Version") {
703705
exclude group: 'commons-logging'
704706
}
707+
testImplementation "org.skyscreamer:jsonassert:$jsonassertVersion"
705708
testImplementation 'org.testcontainers:mysql'
706709
testImplementation 'org.testcontainers:postgresql'
707710
testImplementation 'org.testcontainers:oracle-xe'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.store.channel;
18+
19+
import java.sql.PreparedStatement;
20+
import java.sql.SQLException;
21+
import java.sql.Types;
22+
23+
import tools.jackson.core.JacksonException;
24+
import tools.jackson.databind.ObjectMapper;
25+
26+
import org.springframework.integration.support.json.JacksonMessagingUtils;
27+
import org.springframework.messaging.Message;
28+
import org.springframework.util.Assert;
29+
30+
/**
31+
* A {@link ChannelMessageStorePreparedStatementSetter} implementation that uses Jackson
32+
* to serialize {@link Message} objects to JSON format instead of Java serialization.
33+
* <p>
34+
* This implementation stores the entire message (including headers and payload) as JSON,
35+
* with type information embedded using Jackson's {@code @class} property. This makes
36+
* the stored data human-readable and useful for debugging and troubleshooting.
37+
* <p>
38+
* The {@link ObjectMapper} is configured using {@link JacksonMessagingUtils#messagingAwareMapper(String...)}
39+
* which includes custom serializers/deserializers for Spring Integration message types
40+
* and embeds class type information for secure deserialization.
41+
* <p>
42+
* <b>Usage Example:</b>
43+
* <pre>{@code
44+
* &#64;Bean
45+
* JdbcChannelMessageStore messageStore(DataSource dataSource) {
46+
* JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
47+
* store.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
48+
*
49+
* // Enable JSON serialization
50+
* store.setPreparedStatementSetter(
51+
* new JacksonChannelMessageStorePreparedStatementSetter("com.example"));
52+
* store.setMessageRowMapper(
53+
* new JacksonMessageRowMapper("com.example"));
54+
*
55+
* return store;
56+
* }
57+
* }</pre>
58+
* <p>
59+
* <b>Database Column Type:</b>
60+
* This implementation requires a text-based column type that supports JSON storage:
61+
* <ul>
62+
* <li>PostgreSQL: {@code JSONB} (recommended) or {@code JSON}</li>
63+
* <li>MySQL: {@code JSON} </li>
64+
* <li>H2: {@code CLOB} </li>
65+
* </ul>
66+
* <p>
67+
* <b>Note:</b> The standard Spring Integration schemas use {@code BLOB}/{@code BYTEA} columns
68+
* which are not suitable for human-readable JSON storage. Use the provided JSON-specific
69+
* schemas from {@code schema-*-json.sql} files.
70+
*
71+
* @author Yoobin Yoon
72+
* @since 7.0
73+
*
74+
* @see JacksonMessageRowMapper
75+
* @see JacksonMessagingUtils#messagingAwareMapper(String...)
76+
*/
77+
public class JacksonChannelMessageStorePreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
78+
79+
private final ObjectMapper objectMapper;
80+
81+
/**
82+
* Create a new {@link JacksonChannelMessageStorePreparedStatementSetter} with the
83+
* default trusted packages from {@link JacksonMessagingUtils#DEFAULT_TRUSTED_PACKAGES}.
84+
* <p>
85+
* This constructor is suitable when you only need to serialize/deserialize
86+
* standard Spring Integration and Java classes.
87+
*/
88+
public JacksonChannelMessageStorePreparedStatementSetter() {
89+
this(new String[0]);
90+
}
91+
92+
/**
93+
* Create a new {@link JacksonChannelMessageStorePreparedStatementSetter} with
94+
* additional trusted packages for deserialization.
95+
* <p>
96+
* The provided packages are appended to the default allow-list from
97+
* {@link JacksonMessagingUtils#DEFAULT_TRUSTED_PACKAGES}, enabling deserialization
98+
* of custom payload types.
99+
* <p>
100+
* <b>Package Matching:</b>
101+
* <ul>
102+
* <li>{@code "com.example"} – trusts {@code com.example} and all subpackages
103+
* (e.g., {@code com.example.MyClass}, {@code com.example.sub.AnotherClass})</li>
104+
* <li>{@code "com.example.MyClass"} – trusts only the specific class</li>
105+
* <li>{@code "*"} – trusts all packages (not recommended for production)</li>
106+
* </ul>
107+
* <p>
108+
* <b>Note:</b> Subpackages are automatically included without wildcards.
109+
*
110+
* @param trustedPackages the additional packages to trust for deserialization
111+
*/
112+
public JacksonChannelMessageStorePreparedStatementSetter(String... trustedPackages) {
113+
super(); // Call protected constructor - no serializer needed
114+
this.objectMapper = JacksonMessagingUtils.messagingAwareMapper(trustedPackages);
115+
}
116+
117+
/**
118+
* Create a new {@link JacksonChannelMessageStorePreparedStatementSetter} with
119+
* a custom {@link ObjectMapper}.
120+
* <p>
121+
* This constructor allows full control over the JSON serialization configuration.
122+
* The provided mapper should be configured appropriately for Message serialization,
123+
* typically using {@link JacksonMessagingUtils#messagingAwareMapper(String...)}.
124+
* @param objectMapper the {@link ObjectMapper} to use for JSON serialization
125+
*/
126+
public JacksonChannelMessageStorePreparedStatementSetter(ObjectMapper objectMapper) {
127+
super(); // Call protected constructor - no serializer needed
128+
Assert.notNull(objectMapper, "'objectMapper' must not be null");
129+
this.objectMapper = objectMapper;
130+
}
131+
132+
/**
133+
* Set the prepared statement values, serializing the message to JSON format.
134+
* <p>
135+
* This implementation:
136+
* <ol>
137+
* <li>Calls {@code super.setValues()} to populate common fields (parameters 1-5)</li>
138+
* <li>Serializes the entire {@link Message} (payload and headers) to JSON</li>
139+
* <li>Sets parameter 6 using a database-specific method:
140+
* <ul>
141+
* <li><b>PostgreSQL:</b> {@link PreparedStatement#setObject(int, Object, int)}
142+
* with {@link Types#OTHER} for automatic JSONB/JSON conversion</li>
143+
* <li><b>Other databases:</b> {@link PreparedStatement#setString(int, String)}
144+
* for VARCHAR, CLOB, or JSON columns</li>
145+
* </ul>
146+
* </li>
147+
* </ol>
148+
* <p>
149+
* The resulting JSON includes {@code @class} properties for type information,
150+
* enabling proper deserialization of polymorphic types.
151+
*
152+
* @param preparedStatement the {@link PreparedStatement} to populate
153+
* @param requestMessage the {@link Message} to store
154+
* @param groupId the group identifier
155+
* @param region the region in the target table
156+
* @param priorityEnabled whether message priority should be stored
157+
* @throws SQLException if statement preparation or JSON serialization fails
158+
*/
159+
@Override
160+
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage, Object groupId,
161+
String region, boolean priorityEnabled) throws SQLException {
162+
163+
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
164+
165+
try {
166+
String json = this.objectMapper.writeValueAsString(requestMessage);
167+
168+
String dbProduct = preparedStatement.getConnection().getMetaData().getDatabaseProductName();
169+
170+
if ("PostgreSQL".equalsIgnoreCase(dbProduct)) {
171+
preparedStatement.setObject(6, json, Types.OTHER); // NOSONAR magic number
172+
}
173+
else {
174+
preparedStatement.setString(6, json); // NOSONAR magic number
175+
}
176+
}
177+
catch (JacksonException ex) {
178+
throw new SQLException("Failed to serialize message to JSON. Message ID: " +
179+
requestMessage.getHeaders().getId(), ex);
180+
}
181+
}
182+
183+
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.store.channel;
18+
19+
import java.sql.ResultSet;
20+
import java.sql.SQLException;
21+
22+
import tools.jackson.core.JacksonException;
23+
import tools.jackson.core.type.TypeReference;
24+
import tools.jackson.databind.ObjectMapper;
25+
26+
import org.springframework.integration.support.json.JacksonMessagingUtils;
27+
import org.springframework.messaging.Message;
28+
import org.springframework.util.Assert;
29+
30+
/**
31+
* A {@link MessageRowMapper} implementation that deserializes {@link Message} objects from
32+
* JSON format stored in the database.
33+
* <p>
34+
* This mapper works in conjunction with {@link JacksonChannelMessageStorePreparedStatementSetter}
35+
* to provide human-readable JSON serialization for Spring Integration's JDBC Channel Message Store.
36+
* <p>
37+
* Unlike the default {@link MessageRowMapper} which uses Java serialization via
38+
* {@link org.springframework.integration.support.converter.AllowListDeserializingConverter},
39+
* this implementation uses Jackson to deserialize JSON strings directly from the MESSAGE_BYTES column.
40+
* <p>
41+
* The {@link ObjectMapper} is configured using {@link JacksonMessagingUtils#messagingAwareMapper(String...)}
42+
* which includes custom deserializers for Spring Integration message types and validates
43+
* class types using an allow-list of trusted packages for security.
44+
* <p>
45+
* <b>Usage Example:</b>
46+
* <pre>{@code
47+
* &#64;Bean
48+
* JdbcChannelMessageStore messageStore(DataSource dataSource) {
49+
* JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
50+
* store.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
51+
*
52+
* // Enable JSON serialization
53+
* store.setPreparedStatementSetter(
54+
* new JacksonChannelMessageStorePreparedStatementSetter("com.example"));
55+
* store.setMessageRowMapper(
56+
* new JacksonMessageRowMapper("com.example"));
57+
*
58+
* return store;
59+
* }
60+
* }</pre>
61+
*
62+
* @author Yoobin Yoon
63+
* @since 7.0
64+
*
65+
* @see JacksonChannelMessageStorePreparedStatementSetter
66+
* @see JacksonMessagingUtils#messagingAwareMapper(String...)
67+
*/
68+
public class JacksonMessageRowMapper extends MessageRowMapper {
69+
70+
private final ObjectMapper objectMapper;
71+
72+
/**
73+
* Create a new {@link JacksonMessageRowMapper} with the default trusted packages
74+
* from {@link JacksonMessagingUtils#DEFAULT_TRUSTED_PACKAGES}.
75+
* <p>
76+
* This constructor is suitable when you only need to deserialize
77+
* standard Spring Integration and Java classes.
78+
*/
79+
public JacksonMessageRowMapper() {
80+
this(new String[0]);
81+
}
82+
83+
/**
84+
* Create a new {@link JacksonMessageRowMapper} with additional trusted packages
85+
* for deserialization.
86+
* <p>
87+
* The provided packages are appended to the default allow-list from
88+
* {@link JacksonMessagingUtils#DEFAULT_TRUSTED_PACKAGES}, enabling deserialization
89+
* of custom payload types.
90+
* <p>
91+
* <b>Package Matching:</b>
92+
* <ul>
93+
* <li>{@code "com.example"} – trusts {@code com.example} and all subpackages
94+
* (e.g., {@code com.example.MyClass}, {@code com.example.sub.AnotherClass})</li>
95+
* <li>{@code "com.example.MyClass"} – trusts only the specific class</li>
96+
* <li>{@code "*"} – trusts all packages (not recommended for production)</li>
97+
* </ul>
98+
* <p>
99+
* <b>Note:</b> Subpackages are automatically included without wildcards.
100+
*
101+
* @param trustedPackages the additional packages to trust for deserialization
102+
*/
103+
public JacksonMessageRowMapper(String... trustedPackages) {
104+
super(); // Call protected constructor - no deserializer needed
105+
this.objectMapper = JacksonMessagingUtils.messagingAwareMapper(trustedPackages);
106+
}
107+
108+
/**
109+
* Create a new {@link JacksonMessageRowMapper} with a custom {@link ObjectMapper}.
110+
* <p>
111+
* This constructor allows full control over the JSON deserialization configuration.
112+
* The provided mapper should be configured appropriately for Message deserialization,
113+
* typically using {@link JacksonMessagingUtils#messagingAwareMapper(String...)}.
114+
* @param objectMapper the {@link ObjectMapper} to use for JSON deserialization
115+
*/
116+
public JacksonMessageRowMapper(ObjectMapper objectMapper) {
117+
super(); // Call protected constructor - no deserializer needed
118+
Assert.notNull(objectMapper, "'objectMapper' must not be null");
119+
this.objectMapper = objectMapper;
120+
}
121+
122+
/**
123+
* Map a row from the result set to a {@link Message} by deserializing JSON.
124+
* <p>
125+
* This implementation:
126+
* <ol>
127+
* <li>Reads the JSON string from the MESSAGE_BYTES column</li>
128+
* <li>Deserializes it to a Message using Jackson</li>
129+
* <li>Validates class types against trusted packages</li>
130+
* </ol>
131+
* The JSON must include {@code @class} properties for proper deserialization
132+
* of polymorphic types and payload classes.
133+
* @param rs the {@link ResultSet} to map
134+
* @param rowNum the current row number
135+
* @return the deserialized {@link Message}
136+
* @throws SQLException if an error occurs reading from the result set or
137+
* if JSON deserialization fails
138+
*/
139+
@Override
140+
public Message<?> mapRow(ResultSet rs, int rowNum) throws SQLException {
141+
try {
142+
String json = rs.getString("MESSAGE_BYTES");
143+
144+
if (json == null) {
145+
throw new SQLException("MESSAGE_BYTES column is null at row " + rowNum);
146+
}
147+
148+
return this.objectMapper.readValue(json, new TypeReference<Message<?>>() {
149+
150+
});
151+
}
152+
catch (JacksonException ex) {
153+
throw new SQLException("Failed to deserialize message from JSON at row " + rowNum +
154+
". Ensure the JSON was created by JacksonChannelMessageStorePreparedStatementSetter "
155+
+
156+
"and contains proper @class type information.", ex);
157+
}
158+
}
159+
160+
}

0 commit comments

Comments
 (0)