From a432da5f505de07d055cb8b31a421921dd922787 Mon Sep 17 00:00:00 2001 From: yybmion Date: Sat, 18 Oct 2025 21:36:06 +0900 Subject: [PATCH] Add Jackson JSON serialization for JdbcChannelMessageStore - Add JacksonChannelMessageStorePreparedStatementSetter for serialization - Add JacksonMessageRowMapper for deserialization with trusted package validation - Support PostgreSQL (JSONB), MySQL (JSON), and H2 (CLOB) databases - Add comprehensive test coverage and documentation Fixes: gh-9312 Signed-off-by: Yoobin Yoon --- build.gradle | 1 + .../jdbc/store/JdbcChannelMessageStore.java | 16 +- ...elMessageStorePreparedStatementSetter.java | 126 ++++++++ .../store/channel/JsonMessageRowMapper.java | 120 ++++++++ .../AbstractJsonChannelMessageStoreTests.java | 272 ++++++++++++++++++ .../H2JsonChannelMessageStoreTests.java | 60 ++++ .../MySqlJsonChannelMessageStoreTests.java | 74 +++++ .../PostgresJsonChannelMessageStoreTests.java | 74 +++++ .../jdbc/store/channel/TestMailMessage.java | 30 ++ .../src/test/resources/schema-h2-json.sql | 17 ++ .../src/test/resources/schema-mysql-json.sql | 14 + .../test/resources/schema-postgres-json.sql | 17 ++ src/reference/antora/modules/ROOT/nav.adoc | 1 + .../ROOT/pages/jdbc/message-store-json.adoc | 268 +++++++++++++++++ .../antora/modules/ROOT/pages/whats-new.adoc | 5 + 15 files changed, 1088 insertions(+), 7 deletions(-) create mode 100644 spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/JsonChannelMessageStorePreparedStatementSetter.java create mode 100644 spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/JsonMessageRowMapper.java create mode 100644 spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/AbstractJsonChannelMessageStoreTests.java create mode 100644 spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/H2JsonChannelMessageStoreTests.java create mode 100644 spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlJsonChannelMessageStoreTests.java create mode 100644 spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJsonChannelMessageStoreTests.java create mode 100644 spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/TestMailMessage.java create mode 100644 spring-integration-jdbc/src/test/resources/schema-h2-json.sql create mode 100644 spring-integration-jdbc/src/test/resources/schema-mysql-json.sql create mode 100644 spring-integration-jdbc/src/test/resources/schema-postgres-json.sql create mode 100644 src/reference/antora/modules/ROOT/pages/jdbc/message-store-json.adoc diff --git a/build.gradle b/build.gradle index 9ef25ce3cae..5a7ff96af82 100644 --- a/build.gradle +++ b/build.gradle @@ -679,6 +679,7 @@ project('spring-integration-jdbc') { dependencies { api 'org.springframework:spring-jdbc' optionalApi "org.postgresql:postgresql:$postgresVersion" + optionalApi 'tools.jackson.core:jackson-databind' testImplementation "com.h2database:h2:$h2Version" testImplementation "org.hsqldb:hsqldb:$hsqldbVersion" diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java index a6b1eeebbaa..26c52f1059c 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java @@ -52,6 +52,7 @@ import org.springframework.integration.support.converter.AllowListDeserializingConverter; import org.springframework.integration.util.UUIDConverter; import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.jmx.export.annotation.ManagedAttribute; @@ -89,6 +90,7 @@ * @author Trung Pham * @author Johannes Edmeier * @author Ngoc Nhan + * @author Yoobin Yoon * * @since 2.2 */ @@ -148,7 +150,7 @@ private enum Query { private SerializingConverter serializer; @SuppressWarnings("NullAway.Init") - private MessageRowMapper messageRowMapper; + private RowMapper> messageRowMapper; @SuppressWarnings("NullAway.Init") private ChannelMessageStorePreparedStatementSetter preparedStatementSetter; @@ -232,13 +234,13 @@ public void setJdbcTemplate(JdbcTemplate jdbcTemplate) { } /** - * Allow for passing in a custom {@link MessageRowMapper}. The {@link MessageRowMapper} - * is used to convert the selected database row representing the persisted - * message into the actual {@link Message} object. + * Allow for passing in a custom {@link RowMapper} for {@link Message}. + * The {@link RowMapper} is used to convert the selected database row + * representing the persisted message into the actual {@link Message} object. * @param messageRowMapper Must not be null */ - public void setMessageRowMapper(MessageRowMapper messageRowMapper) { - Assert.notNull(messageRowMapper, "The provided MessageRowMapper must not be null."); + public void setMessageRowMapper(RowMapper> messageRowMapper) { + Assert.notNull(messageRowMapper, "The provided RowMapper must not be null."); this.messageRowMapper = messageRowMapper; } @@ -388,7 +390,7 @@ protected MessageGroupFactory getMessageGroupFactory() { * Check mandatory properties ({@link DataSource} and * {@link #setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider)}). If no {@link MessageRowMapper} * and {@link ChannelMessageStorePreparedStatementSetter} was explicitly set using - * {@link #setMessageRowMapper(MessageRowMapper)} and + * {@link #setMessageRowMapper(RowMapper)} and * {@link #setPreparedStatementSetter(ChannelMessageStorePreparedStatementSetter)} respectively, the default * {@link MessageRowMapper} and {@link ChannelMessageStorePreparedStatementSetter} will be instantiated using the * specified {@link #deserializer}. diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/JsonChannelMessageStorePreparedStatementSetter.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/JsonChannelMessageStorePreparedStatementSetter.java new file mode 100644 index 00000000000..24e57a5b948 --- /dev/null +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/JsonChannelMessageStorePreparedStatementSetter.java @@ -0,0 +1,126 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.store.channel; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; + +import org.springframework.integration.support.json.JacksonJsonObjectMapper; +import org.springframework.integration.support.json.JacksonMessagingUtils; +import org.springframework.integration.support.json.JsonObjectMapper; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +/** + * A {@link ChannelMessageStorePreparedStatementSetter} implementation that uses JSON + * serialization for {@link Message} objects instead of Java serialization. + *

+ * By default, this implementation stores the entire message (including headers and payload) as JSON, + * with type information embedded using Jackson's {@code @class} property for proper deserialization. + *

+ * IMPORTANT: JSON serialization exposes message content in text format in the database. + * Ensure proper database access controls and encryption for sensitive data. + * Consider the security implications before using this in production with sensitive information. + *

+ * Database Requirements: + * This implementation requires modifying the MESSAGE_CONTENT column to a text-based type: + *

    + *
  • PostgreSQL: Change from {@code BYTEA} to {@code JSONB}
  • + *
  • MySQL: Change from {@code BLOB} to {@code JSON}
  • + *
  • H2: Change from {@code LONGVARBINARY} to {@code CLOB}
  • + *
+ * See the reference documentation for schema migration instructions. + *

+ * Usage Example: + *

{@code
+ * @Bean
+ * JdbcChannelMessageStore messageStore(DataSource dataSource) {
+ *     JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
+ *     store.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
+ *
+ *     // Enable JSON serialization (requires schema modification)
+ *     store.setPreparedStatementSetter(
+ *         new JsonChannelMessageStorePreparedStatementSetter());
+ *     store.setMessageRowMapper(
+ *         new JsonMessageRowMapper("com.example"));
+ *
+ *     return store;
+ * }
+ * }
+ * + * @author Yoobin Yoon + * + * @since 7.0 + */ +public class JsonChannelMessageStorePreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter { + + private final JsonObjectMapper jsonObjectMapper; + + /** + * Create a new {@link JsonChannelMessageStorePreparedStatementSetter} with the + * default {@link JsonObjectMapper} configured for Spring Integration message serialization. + *

+ * This constructor is suitable when serializing standard Spring Integration + * and Java classes. Custom payload types will require their package to be added to the + * corresponding {@link JsonMessageRowMapper}. + */ + public JsonChannelMessageStorePreparedStatementSetter() { + this(new JacksonJsonObjectMapper(JacksonMessagingUtils.messagingAwareMapper())); + } + + /** + * Create a new {@link JsonChannelMessageStorePreparedStatementSetter} with a + * custom {@link JsonObjectMapper}. + *

+ * This constructor allows full control over the JSON serialization configuration. + *

+ * Note: The same JsonObjectMapper configuration should be used in the corresponding + * {@link JsonMessageRowMapper} for consistent serialization and deserialization. + * @param jsonObjectMapper the {@link JsonObjectMapper} to use for JSON serialization + */ + public JsonChannelMessageStorePreparedStatementSetter(JsonObjectMapper jsonObjectMapper) { + super(); + Assert.notNull(jsonObjectMapper, "'jsonObjectMapper' must not be null"); + this.jsonObjectMapper = jsonObjectMapper; + } + + @Override + public void setValues(PreparedStatement preparedStatement, Message requestMessage, + Object groupId, String region, boolean priorityEnabled) throws SQLException { + + super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled); + + try { + String json = this.jsonObjectMapper.toJson(requestMessage); + + String dbProduct = preparedStatement.getConnection().getMetaData().getDatabaseProductName(); + + if ("PostgreSQL".equalsIgnoreCase(dbProduct)) { + preparedStatement.setObject(6, json, Types.OTHER); // NOSONAR magic number + } + else { + preparedStatement.setString(6, json); // NOSONAR magic number + } + } + catch (IOException ex) { + throw new SQLException("Failed to serialize message to JSON: " + requestMessage, ex); + } + } + +} diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/JsonMessageRowMapper.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/JsonMessageRowMapper.java new file mode 100644 index 00000000000..03fa106331e --- /dev/null +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/JsonMessageRowMapper.java @@ -0,0 +1,120 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.store.channel; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.jspecify.annotations.Nullable; + +import org.springframework.integration.support.json.JacksonJsonObjectMapper; +import org.springframework.integration.support.json.JacksonMessagingUtils; +import org.springframework.integration.support.json.JsonObjectMapper; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +/** + * A {@link RowMapper} implementation that deserializes {@link Message} objects from + * JSON format stored in the database. + *

+ * This mapper works in conjunction with {@link JsonChannelMessageStorePreparedStatementSetter} + * to provide JSON serialization for Spring Integration's JDBC Channel Message Store. + *

+ * Unlike the default {@link MessageRowMapper} which uses Java serialization, + * this implementation uses JSON to deserialize message strings from the MESSAGE_CONTENT column. + *

+ * The underlying {@link JsonObjectMapper} validates all deserialized classes against a + * trusted package list to prevent security vulnerabilities. + *

+ * Usage Example: + *

{@code
+ * @Bean
+ * JdbcChannelMessageStore messageStore(DataSource dataSource) {
+ *     JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
+ *     store.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
+ *
+ *     // Enable JSON serialization
+ *     store.setPreparedStatementSetter(
+ *         new JsonChannelMessageStorePreparedStatementSetter());
+ *     store.setMessageRowMapper(
+ *         new JsonMessageRowMapper("com.example"));
+ *
+ *     return store;
+ * }
+ * }
+ * + * @author Yoobin Yoon + * + * @since 7.0 + */ +public class JsonMessageRowMapper implements RowMapper> { + + private final JsonObjectMapper jsonObjectMapper; + + /** + * Create a new {@link JsonMessageRowMapper} with additional trusted packages + * for deserialization. + *

+ * The provided packages are appended to the default trusted packages, + * enabling deserialization of custom payload types while maintaining security. + * If no packages are provided, only the default trusted packages are used. + * @param trustedPackages the additional packages to trust for deserialization. + * Can be {@code null} or empty to use only default packages + */ + public JsonMessageRowMapper(String @Nullable ... trustedPackages) { + this(new JacksonJsonObjectMapper( + JacksonMessagingUtils.messagingAwareMapper(trustedPackages))); + } + + /** + * Create a new {@link JsonMessageRowMapper} with a custom {@link JsonObjectMapper}. + *

+ * This constructor allows full control over the JSON deserialization configuration. + *

+ * Note: The same JsonObjectMapper configuration should be used in the corresponding + * {@link JsonChannelMessageStorePreparedStatementSetter} for consistent + * serialization and deserialization. + * @param jsonObjectMapper the {@link JsonObjectMapper} to use for JSON deserialization + */ + public JsonMessageRowMapper(JsonObjectMapper jsonObjectMapper) { + Assert.notNull(jsonObjectMapper, "'jsonObjectMapper' must not be null"); + this.jsonObjectMapper = jsonObjectMapper; + } + + @Override + @SuppressWarnings("NullAway") + public Message mapRow(ResultSet rs, int rowNum) throws SQLException { + try { + String json = rs.getString("MESSAGE_CONTENT"); + + if (json == null) { + throw new SQLException("MESSAGE_CONTENT column is null at row " + rowNum); + } + + return this.jsonObjectMapper.fromJson(json, Message.class); + } + catch (IOException ex) { + throw new SQLException( + "Failed to deserialize message from JSON at row " + rowNum + ". " + + "Ensure the JSON and the configured JsonObjectMapper use compatible type handling.", + ex); + } + } + +} diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/AbstractJsonChannelMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/AbstractJsonChannelMessageStoreTests.java new file mode 100644 index 00000000000..d054ecd7ff5 --- /dev/null +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/AbstractJsonChannelMessageStoreTests.java @@ -0,0 +1,272 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.store.channel; + +import java.util.UUID; + +import javax.sql.DataSource; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import tools.jackson.databind.JsonNode; +import tools.jackson.databind.ObjectMapper; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.jdbc.store.JdbcChannelMessageStore; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.messaging.Message; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.annotation.Isolation; +import org.springframework.transaction.support.TransactionTemplate; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Yoobin Yoon + * + * @since 7.0 + */ +@SpringJUnitConfig +@DirtiesContext +public abstract class AbstractJsonChannelMessageStoreTests { + + protected static final String TEST_MESSAGE_GROUP = "AbstractJsonChannelMessageStoreTests"; + + protected static final String REGION = "AbstractJsonChannelMessageStoreTests"; + + @Autowired + protected DataSource dataSource; + + protected JdbcChannelMessageStore messageStore; + + @Autowired + protected PlatformTransactionManager transactionManager; + + @Autowired + protected ChannelMessageStoreQueryProvider queryProvider; + + @BeforeEach + public void init() { + this.messageStore = new JdbcChannelMessageStore(dataSource); + this.messageStore.setRegion(REGION); + this.messageStore.setTablePrefix("JSON_"); + this.messageStore.setChannelMessageStoreQueryProvider(queryProvider); + this.messageStore.setPreparedStatementSetter( + new JsonChannelMessageStorePreparedStatementSetter()); + this.messageStore.setMessageRowMapper( + new JsonMessageRowMapper(getTrustedPackages())); + this.messageStore.afterPropertiesSet(); + this.messageStore.removeMessageGroup(TEST_MESSAGE_GROUP); + } + + protected String[] getTrustedPackages() { + return new String[] { + "org.springframework.integration.jdbc.store.channel" + }; + } + + @Test + public void testGetNonExistentMessageFromGroup() { + Message result = this.messageStore.pollMessageFromGroup(TEST_MESSAGE_GROUP); + assertThat(result).isNull(); + } + + @Test + public void testSimpleStringPayload() { + Message message = MessageBuilder.withPayload("Hello JSON World") + .setHeader("customHeader", "customValue") + .build(); + + TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager); + transactionTemplate.setIsolationLevel(Isolation.READ_COMMITTED.value()); + transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); + + transactionTemplate.executeWithoutResult((status) -> + this.messageStore.addMessageToGroup(TEST_MESSAGE_GROUP, message)); + + Message retrievedMessage = this.messageStore.pollMessageFromGroup(TEST_MESSAGE_GROUP); + + assertThat(retrievedMessage).isNotNull(); + assertThat(retrievedMessage.getPayload()).isEqualTo("Hello JSON World"); + assertThat(retrievedMessage.getHeaders().getId()).isEqualTo(message.getHeaders().getId()); + assertThat(retrievedMessage.getHeaders().get("customHeader")).isEqualTo("customValue"); + } + + @Test + public void testCustomPayloadSerialization() { + TestMailMessage payload = new TestMailMessage( + "Order Confirmation", + "Your order has been confirmed.", + "customer@example.com" + ); + + Message message = MessageBuilder.withPayload(payload) + .setHeader("orderId", "ORDER-12345") + .build(); + + TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager); + transactionTemplate.executeWithoutResult((status) -> + this.messageStore.addMessageToGroup(TEST_MESSAGE_GROUP, message)); + + Message retrievedMessage = this.messageStore.pollMessageFromGroup(TEST_MESSAGE_GROUP); + + assertThat(retrievedMessage).isNotNull(); + assertThat(retrievedMessage.getPayload()).isInstanceOf(TestMailMessage.class); + + TestMailMessage retrievedPayload = (TestMailMessage) retrievedMessage.getPayload(); + assertThat(retrievedPayload).isEqualTo(payload); + assertThat(retrievedPayload.subject()).isEqualTo("Order Confirmation"); + assertThat(retrievedPayload.body()).isEqualTo("Your order has been confirmed."); + assertThat(retrievedPayload.to()).isEqualTo("customer@example.com"); + assertThat(retrievedMessage.getHeaders().get("orderId")).isEqualTo("ORDER-12345"); + } + + @Test + public void testMessageHeadersPreserved() { + Message message = MessageBuilder.withPayload("test") + .setHeader("stringHeader", "value") + .setHeader("intHeader", 42) + .setHeader("longHeader", 12345L) + .setHeader("booleanHeader", true) + .build(); + + TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager); + transactionTemplate.executeWithoutResult((status) -> + this.messageStore.addMessageToGroup(TEST_MESSAGE_GROUP, message)); + + Message retrievedMessage = this.messageStore.pollMessageFromGroup(TEST_MESSAGE_GROUP); + + assertThat(retrievedMessage).isNotNull(); + assertThat(retrievedMessage.getHeaders().get("stringHeader")).isEqualTo("value"); + assertThat(retrievedMessage.getHeaders().get("intHeader")).isEqualTo(42); + assertThat(retrievedMessage.getHeaders().get("longHeader")).isEqualTo(12345L); + assertThat(retrievedMessage.getHeaders().get("booleanHeader")).isEqualTo(true); + } + + @Test + public void testJsonStructureInDatabase() { + TestMailMessage payload = new TestMailMessage( + "Test Subject", + "Test Body", + "test@example.com" + ); + Message message = MessageBuilder.withPayload(payload).build(); + + UUID messageId = message.getHeaders().getId(); + + TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager); + transactionTemplate.executeWithoutResult((status) -> + this.messageStore.addMessageToGroup(TEST_MESSAGE_GROUP, message)); + + JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); + String storedJson = jdbcTemplate.queryForObject( + "SELECT MESSAGE_CONTENT FROM JSON_CHANNEL_MESSAGE WHERE MESSAGE_ID = ? AND REGION = ?", + String.class, + messageId.toString(), + REGION + ); + + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNode = mapper.readTree(storedJson); + + assertThat(jsonNode.has("@class")).isTrue(); + assertThat(jsonNode.get("@class").asString()) + .isEqualTo("org.springframework.messaging.support.GenericMessage"); + + JsonNode payloadNode = jsonNode.get("payload"); + assertThat(payloadNode.get("@class").asString()) + .isEqualTo("org.springframework.integration.jdbc.store.channel.TestMailMessage"); + assertThat(payloadNode.get("subject").asString()).isEqualTo("Test Subject"); + assertThat(payloadNode.get("body").asString()).isEqualTo("Test Body"); + assertThat(payloadNode.get("to").asString()).isEqualTo("test@example.com"); + + JsonNode headersNode = jsonNode.get("headers"); + assertThat(headersNode.has("id")).isTrue(); + assertThat(headersNode.has("timestamp")).isTrue(); + } + + @Test + public void testMultipleMessagesInGroup() { + Message message1 = MessageBuilder.withPayload("Message 1").build(); + Message message2 = MessageBuilder.withPayload("Message 2").build(); + Message message3 = MessageBuilder.withPayload("Message 3").build(); + + TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager); + transactionTemplate.executeWithoutResult((status) -> { + this.messageStore.addMessageToGroup(TEST_MESSAGE_GROUP, message1); + this.messageStore.addMessageToGroup(TEST_MESSAGE_GROUP, message2); + this.messageStore.addMessageToGroup(TEST_MESSAGE_GROUP, message3); + }); + + Message retrieved1 = this.messageStore.pollMessageFromGroup(TEST_MESSAGE_GROUP); + Message retrieved2 = this.messageStore.pollMessageFromGroup(TEST_MESSAGE_GROUP); + Message retrieved3 = this.messageStore.pollMessageFromGroup(TEST_MESSAGE_GROUP); + + assertThat(retrieved1.getPayload()).isEqualTo("Message 1"); + assertThat(retrieved2.getPayload()).isEqualTo("Message 2"); + assertThat(retrieved3.getPayload()).isEqualTo("Message 3"); + } + + @Test + public void testAddAndGetWithDifferentRegion() { + Message message = MessageBuilder.withPayload("foo").build(); + + TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager); + transactionTemplate.executeWithoutResult((status) -> + this.messageStore.addMessageToGroup(TEST_MESSAGE_GROUP, message)); + + this.messageStore.setRegion("DIFFERENT_REGION"); + this.messageStore.afterPropertiesSet(); + + Message result = this.messageStore.pollMessageFromGroup(TEST_MESSAGE_GROUP); + assertThat(result).isNull(); + + this.messageStore.setRegion(REGION); + this.messageStore.afterPropertiesSet(); + } + + @Test + public void testJsonContainsAllMessageHeaders() { + Message message = MessageBuilder.withPayload("test") + .setHeader("header1", "value1") + .setHeader("header2", 123) + .build(); + + TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager); + transactionTemplate.executeWithoutResult((status) -> + this.messageStore.addMessageToGroup(TEST_MESSAGE_GROUP, message)); + + JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); + String storedJson = jdbcTemplate.queryForObject( + "SELECT MESSAGE_CONTENT FROM JSON_CHANNEL_MESSAGE WHERE MESSAGE_ID = ?", + String.class, + message.getHeaders().getId().toString() + ); + + assertThat(storedJson).contains("\"header1\""); + assertThat(storedJson).contains("\"value1\""); + assertThat(storedJson).contains("\"header2\""); + assertThat(storedJson).contains("123"); + assertThat(storedJson).contains("\"id\""); + assertThat(storedJson).contains("\"timestamp\""); + } + +} diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/H2JsonChannelMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/H2JsonChannelMessageStoreTests.java new file mode 100644 index 00000000000..de9170b763b --- /dev/null +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/H2JsonChannelMessageStoreTests.java @@ -0,0 +1,60 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.store.channel; + +import javax.sql.DataSource; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * @author Yoobin Yoon + * + * @since 7.0 + */ +@ContextConfiguration +public class H2JsonChannelMessageStoreTests extends AbstractJsonChannelMessageStoreTests { + + @Configuration + static class Config { + + @Bean + DataSource dataSource() { + return new EmbeddedDatabaseBuilder() + .setType(EmbeddedDatabaseType.H2) + .addScript("classpath:schema-h2-json.sql") + .build(); + } + + @Bean + PlatformTransactionManager transactionManager(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource); + } + + @Bean + H2ChannelMessageStoreQueryProvider queryProvider() { + return new H2ChannelMessageStoreQueryProvider(); + } + + } + +} diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlJsonChannelMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlJsonChannelMessageStoreTests.java new file mode 100644 index 00000000000..70e52f62ca4 --- /dev/null +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/MySqlJsonChannelMessageStoreTests.java @@ -0,0 +1,74 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.store.channel; + +import javax.sql.DataSource; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.Resource; +import org.springframework.integration.jdbc.mysql.MySqlContainerTest; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.init.DataSourceInitializer; +import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * @author Yoobin Yoon + * + * @since 7.0 + */ +@ContextConfiguration +public class MySqlJsonChannelMessageStoreTests extends AbstractJsonChannelMessageStoreTests + implements MySqlContainerTest { + + @Configuration + static class Config { + + @Value("schema-mysql-json.sql") + Resource createSchemaScript; + + @Bean + DataSource dataSource() { + return MySqlContainerTest.dataSource(); + } + + @Bean + PlatformTransactionManager transactionManager() { + return new DataSourceTransactionManager(dataSource()); + } + + @Bean + MySqlChannelMessageStoreQueryProvider queryProvider() { + return new MySqlChannelMessageStoreQueryProvider(); + } + + @Bean + DataSourceInitializer dataSourceInitializer() { + DataSourceInitializer dataSourceInitializer = new DataSourceInitializer(); + dataSourceInitializer.setDataSource(dataSource()); + dataSourceInitializer.setDatabasePopulator( + new ResourceDatabasePopulator(this.createSchemaScript)); + return dataSourceInitializer; + } + + } + +} + diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJsonChannelMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJsonChannelMessageStoreTests.java new file mode 100644 index 00000000000..2c61486959e --- /dev/null +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJsonChannelMessageStoreTests.java @@ -0,0 +1,74 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.store.channel; + +import javax.sql.DataSource; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.Resource; +import org.springframework.integration.jdbc.postgres.PostgresContainerTest; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.init.DataSourceInitializer; +import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * @author Yoobin Yoon + * + * @since 7.0 + */ +@ContextConfiguration +public class PostgresJsonChannelMessageStoreTests extends AbstractJsonChannelMessageStoreTests + implements PostgresContainerTest { + + @Configuration + static class Config { + + @Value("schema-postgres-json.sql") + Resource createSchemaScript; + + @Bean + DataSource dataSource() { + return PostgresContainerTest.dataSource(); + } + + @Bean + PlatformTransactionManager transactionManager() { + return new DataSourceTransactionManager(dataSource()); + } + + @Bean + PostgresChannelMessageStoreQueryProvider queryProvider() { + return new PostgresChannelMessageStoreQueryProvider(); + } + + @Bean + DataSourceInitializer dataSourceInitializer() { + DataSourceInitializer dataSourceInitializer = new DataSourceInitializer(); + dataSourceInitializer.setDataSource(dataSource()); + dataSourceInitializer.setDatabasePopulator( + new ResourceDatabasePopulator(this.createSchemaScript)); + return dataSourceInitializer; + } + + } + +} + diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/TestMailMessage.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/TestMailMessage.java new file mode 100644 index 00000000000..3ca0c598b27 --- /dev/null +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/TestMailMessage.java @@ -0,0 +1,30 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.store.channel; + +/** + * Test payload class for Jackson JSON serialization tests. + * + * @author Yoobin Yoon + */ +public record TestMailMessage( + String subject, + String body, + String to +) { + +} diff --git a/spring-integration-jdbc/src/test/resources/schema-h2-json.sql b/spring-integration-jdbc/src/test/resources/schema-h2-json.sql new file mode 100644 index 00000000000..6603dbf1bb1 --- /dev/null +++ b/spring-integration-jdbc/src/test/resources/schema-h2-json.sql @@ -0,0 +1,17 @@ +DROP TABLE IF EXISTS JSON_CHANNEL_MESSAGE; + +DROP SEQUENCE IF EXISTS JSON_MESSAGE_SEQ; +CREATE SEQUENCE JSON_MESSAGE_SEQ START WITH 1 INCREMENT BY 1; + +CREATE TABLE JSON_CHANNEL_MESSAGE ( + MESSAGE_ID CHAR(36) NOT NULL, + GROUP_KEY CHAR(36) NOT NULL, + CREATED_DATE BIGINT NOT NULL, + MESSAGE_PRIORITY BIGINT, + MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT NEXT VALUE FOR JSON_MESSAGE_SEQ, + MESSAGE_CONTENT CLOB, + REGION VARCHAR(100) NOT NULL, + constraint JSON_CHANNEL_MESSAGE_PK primary key (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE) +); + +CREATE INDEX JSON_CHANNEL_MSG_DELETE_IDX ON JSON_CHANNEL_MESSAGE (REGION, GROUP_KEY, MESSAGE_ID); diff --git a/spring-integration-jdbc/src/test/resources/schema-mysql-json.sql b/spring-integration-jdbc/src/test/resources/schema-mysql-json.sql new file mode 100644 index 00000000000..2ec7d45be4f --- /dev/null +++ b/spring-integration-jdbc/src/test/resources/schema-mysql-json.sql @@ -0,0 +1,14 @@ +DROP TABLE IF EXISTS JSON_CHANNEL_MESSAGE; + +CREATE TABLE JSON_CHANNEL_MESSAGE ( + MESSAGE_ID CHAR(36) NOT NULL, + GROUP_KEY CHAR(36) NOT NULL, + CREATED_DATE BIGINT NOT NULL, + MESSAGE_PRIORITY BIGINT, + MESSAGE_SEQUENCE BIGINT NOT NULL AUTO_INCREMENT UNIQUE, + MESSAGE_CONTENT JSON, + REGION VARCHAR(100) NOT NULL, + constraint JSON_CHANNEL_MESSAGE_PK primary key (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE) +) ENGINE=InnoDB; + +CREATE INDEX JSON_CHANNEL_MSG_DELETE_IDX ON JSON_CHANNEL_MESSAGE (REGION, GROUP_KEY, MESSAGE_ID); diff --git a/spring-integration-jdbc/src/test/resources/schema-postgres-json.sql b/spring-integration-jdbc/src/test/resources/schema-postgres-json.sql new file mode 100644 index 00000000000..32a40ccedb9 --- /dev/null +++ b/spring-integration-jdbc/src/test/resources/schema-postgres-json.sql @@ -0,0 +1,17 @@ +DROP TABLE IF EXISTS JSON_CHANNEL_MESSAGE CASCADE; +DROP SEQUENCE IF EXISTS JSON_MESSAGE_SEQ; + +CREATE SEQUENCE JSON_MESSAGE_SEQ START WITH 1 INCREMENT BY 1 NO CYCLE; + +CREATE TABLE JSON_CHANNEL_MESSAGE ( + MESSAGE_ID CHAR(36) NOT NULL, + GROUP_KEY CHAR(36) NOT NULL, + CREATED_DATE BIGINT NOT NULL, + MESSAGE_PRIORITY BIGINT, + MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('JSON_MESSAGE_SEQ'), + MESSAGE_CONTENT JSONB, + REGION VARCHAR(100) NOT NULL, + constraint JSON_CHANNEL_MESSAGE_PK primary key (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE) +); + +CREATE INDEX JSON_CHANNEL_MSG_DELETE_IDX ON JSON_CHANNEL_MESSAGE (REGION, GROUP_KEY, MESSAGE_ID); diff --git a/src/reference/antora/modules/ROOT/nav.adoc b/src/reference/antora/modules/ROOT/nav.adoc index 59f4379d030..292a8fe2b13 100644 --- a/src/reference/antora/modules/ROOT/nav.adoc +++ b/src/reference/antora/modules/ROOT/nav.adoc @@ -184,6 +184,7 @@ ** xref:jdbc/outbound-channel-adapter.adoc[] ** xref:jdbc/outbound-gateway.adoc[] ** xref:jdbc/message-store.adoc[] +** xref:jdbc/message-store-json.adoc[] ** xref:jdbc/stored-procedures.adoc[] ** xref:jdbc/dsl.adoc[] ** xref:jdbc/lock-registry.adoc[] diff --git a/src/reference/antora/modules/ROOT/pages/jdbc/message-store-json.adoc b/src/reference/antora/modules/ROOT/pages/jdbc/message-store-json.adoc new file mode 100644 index 00000000000..e126fc6037e --- /dev/null +++ b/src/reference/antora/modules/ROOT/pages/jdbc/message-store-json.adoc @@ -0,0 +1,268 @@ +[[jdbc-channel-message-store-json]] += JDBC Channel Message Store JSON Serialization + +Version 7.0 introduced JSON serialization support for `JdbcChannelMessageStore`. +By default, Spring Integration uses Java serialization to store messages in the database. +The new JSON serialization option provides an alternative serialization mechanism. + +[IMPORTANT] +==== +**Security Consideration:** JSON serialization stores message content as text in the database, which may expose sensitive data. +Ensure proper database access controls, encryption at rest, and consider your organization's data protection requirements before using JSON serialization in production environments. +==== + +== Configuration + +Two components are available for JSON (de)serialization: + +* `JsonChannelMessageStorePreparedStatementSetter` - Serializes messages to JSON +* `JsonMessageRowMapper` - Deserializes messages from JSON + +[source,java] +---- +@Bean +public JdbcChannelMessageStore messageStore(DataSource dataSource) { + JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource); + store.setChannelMessageStoreQueryProvider( + new PostgresChannelMessageStoreQueryProvider()); + + // Enable JSON serialization + store.setPreparedStatementSetter( + new JsonChannelMessageStorePreparedStatementSetter()); + store.setMessageRowMapper( + new JsonMessageRowMapper("com.example")); + + return store; +} +---- + +The string parameter (`"com.example"`) specifies additional trusted packages for deserialization. +These packages are appended to the default trusted packages (see <> section). +Only classes from trusted packages can be deserialized for security. + +== Database Schema Modification + +[IMPORTANT] +==== +JSON serialization **requires modifying the database schema**. +The default schema with BLOB/BYTEA column types cannot be used for JSON serialization. +==== + +The `MESSAGE_CONTENT` column must be changed to a text-based type that can store JSON. + +[tabs] +====== +PostgreSQL:: ++ +For PostgreSQL, the `JSONB` type can be used. ++ +[source,sql] +---- +-- JSONB (enables JSON queries) +ALTER TABLE INT_CHANNEL_MESSAGE +ALTER COLUMN MESSAGE_CONTENT TYPE JSONB +USING MESSAGE_CONTENT::text::jsonb; +---- + +MySQL:: ++ +For MySQL, the `JSON` type can be used. ++ +[source,sql] +---- +-- JSON type (enables JSON functions) +ALTER TABLE INT_CHANNEL_MESSAGE +MODIFY COLUMN MESSAGE_CONTENT JSON; +---- + +H2:: ++ +For H2 database, the `CLOB` type can be used. ++ +[source,sql] +---- +ALTER TABLE INT_CHANNEL_MESSAGE +ALTER COLUMN MESSAGE_CONTENT CLOB; +---- + +Other Databases:: ++ +For any database that supports large text columns (CLOB, TEXT, etc.), the `MESSAGE_CONTENT` column can be modified to an appropriate text type. +====== + +== Example Schema for JSON Serialization + +The following examples demonstrate how to create a dedicated table for JSON-based message storage. + +[tabs] +====== +PostgreSQL:: ++ +[source,sql] +---- +CREATE TABLE JSON_CHANNEL_MESSAGE ( + MESSAGE_ID CHAR(36) NOT NULL, + GROUP_KEY CHAR(36) NOT NULL, + CREATED_DATE BIGINT NOT NULL, + MESSAGE_PRIORITY BIGINT, + MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('JSON_MESSAGE_SEQ'), + MESSAGE_CONTENT JSONB, -- JSON message content + REGION VARCHAR(100) NOT NULL, + CONSTRAINT JSON_CHANNEL_MESSAGE_PK + PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE) +); +---- + +MySQL:: ++ +[source,sql] +---- +CREATE TABLE JSON_CHANNEL_MESSAGE ( + MESSAGE_ID CHAR(36) NOT NULL, + GROUP_KEY CHAR(36) NOT NULL, + CREATED_DATE BIGINT NOT NULL, + MESSAGE_PRIORITY BIGINT, + MESSAGE_SEQUENCE BIGINT NOT NULL AUTO_INCREMENT UNIQUE, + MESSAGE_CONTENT JSON, -- JSON message content + REGION VARCHAR(100) NOT NULL, + CONSTRAINT JSON_CHANNEL_MESSAGE_PK + PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE) +); +---- + +H2:: ++ +[source,sql] +---- +CREATE TABLE JSON_CHANNEL_MESSAGE ( + MESSAGE_ID CHAR(36) NOT NULL, + GROUP_KEY CHAR(36) NOT NULL, + CREATED_DATE BIGINT NOT NULL, + MESSAGE_PRIORITY BIGINT, + MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT NEXT VALUE FOR JSON_MESSAGE_SEQ, + MESSAGE_CONTENT CLOB, -- JSON message content + REGION VARCHAR(100) NOT NULL, + CONSTRAINT JSON_CHANNEL_MESSAGE_PK + PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE) +); +---- +====== + +== JSON Structure + +When using Jackson-based serialization (the default), messages are stored with the following JSON structure using Jackson's polymorphic type handling: + +[source,json] +---- +{ + "@class": "org.springframework.messaging.support.GenericMessage", + "payload": { + "@class": "com.example.OrderMessage", + "orderId": "ORDER-12345", + "amount": 1299.99 + }, + "headers": { + "@class": "java.util.HashMap", + "priority": ["java.lang.String", "HIGH"], + "id": ["java.util.UUID", "a1b2c3d4-..."], + "timestamp": ["java.lang.Long", 1234567890] + } +} +---- + +The `@class` properties provide type information necessary for proper deserialization of polymorphic types. + +== Querying JSON Content (Optional) + +If native JSON column types (PostgreSQL JSONB or MySQL JSON) are used, message content can be queried directly. + +=== PostgreSQL JSONB Queries + +[source,sql] +---- +-- Find messages by payload field +SELECT * FROM JSON_CHANNEL_MESSAGE +WHERE MESSAGE_CONTENT @> '{"payload": {"orderId": "ORDER-12345"}}'; + +-- Find high-priority messages +SELECT * FROM JSON_CHANNEL_MESSAGE +WHERE MESSAGE_CONTENT -> 'headers' @> '{"priority": ["java.lang.String", "HIGH"]}'; +---- + +=== MySQL JSON Functions + +[source,sql] +---- +-- Find messages by payload field +SELECT * FROM JSON_CHANNEL_MESSAGE +WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.payload.orderId') = 'ORDER-12345'; + +-- Find high-priority messages +SELECT * FROM JSON_CHANNEL_MESSAGE +WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.headers.priority[1]') = 'HIGH'; +---- + +[NOTE] +==== +If `TEXT` or `CLOB` column types are used, these JSON-specific queries are not available. +However, JSON serialization still works for storage and retrieval through Spring Integration. +==== + +[[trusted-packages]] +== Trusted Packages + +The `JacksonMessagingUtils.messagingAwareMapper()` validates all deserialized classes against a trusted package list to prevent security vulnerabilities. + +Default trusted packages include: +- `java.util` +- `java.lang` +- `org.springframework.messaging.support` +- `org.springframework.integration.support` +- `org.springframework.integration.message` +- `org.springframework.integration.store` +- `org.springframework.integration.history` +- `org.springframework.integration.handler` + +Additional packages can be specified in the constructor and are appended to this list: + +[source,java] +---- +// Trust additional packages for custom payload types +new JsonMessageRowMapper("com.example.orders", "com.example.payments") +---- + +== Custom JsonObjectMapper + +For advanced scenarios, a custom `JsonObjectMapper` can be provided: + +[source,java] +---- +import org.springframework.integration.support.json.JacksonJsonObjectMapper; +import org.springframework.integration.support.json.JacksonMessagingUtils; +import tools.jackson.databind.ObjectMapper; +import tools.jackson.databind.SerializationFeature; + +@Bean +public JdbcChannelMessageStore messageStore(DataSource dataSource) { + ObjectMapper customMapper = JacksonMessagingUtils.messagingAwareMapper("com.example"); + customMapper.enable(SerializationFeature.INDENT_OUTPUT); + customMapper.registerModule(new CustomModule()); + + JacksonJsonObjectMapper jsonObjectMapper = new JacksonJsonObjectMapper(customMapper); + + JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource); + store.setPreparedStatementSetter( + new JsonChannelMessageStorePreparedStatementSetter(jsonObjectMapper)); + store.setMessageRowMapper( + new JsonMessageRowMapper(jsonObjectMapper)); + + return store; +} +---- + +[IMPORTANT] +==== +The custom `JsonObjectMapper` should be configured appropriately for Spring Integration message serialization. +It is recommended to start with `JacksonMessagingUtils.messagingAwareMapper()` and customize from there. +The same configuration must be used in both `JsonChannelMessageStorePreparedStatementSetter` and `JsonMessageRowMapper` for consistent serialization and deserialization. +==== diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index ae8c0efde79..3b1539c5305 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -90,6 +90,11 @@ The message stores now use a `MESSAGE_CONTENT` column name for serialized messag All the out-of-the-box SQL schemas have beed changed, too, to rely on the `MESSAGE_CONTENT` name for the respective column in the `INT_MESSAGE` and `INT_CHANNEL_MESSAGE` tables. See xref:jdbc/message-store.adoc[] for more information. +The `JdbcChannelMessageStore` now supports JSON serialization as an alternative to Java serialization. +New components `JsonChannelMessageStorePreparedStatementSetter` and `JsonMessageRowMapper` enable storing messages in JSON format. +This requires modifying the database schema to use text-based column types (such as JSONB, JSON, TEXT, or CLOB) instead of binary types. +See xref:jdbc/message-store-json.adoc[] for more information. + [[x7.0-redis-changes]] === Redis Changes