diff --git a/build.gradle b/build.gradle index 19023d602e..6811953483 100644 --- a/build.gradle +++ b/build.gradle @@ -53,6 +53,7 @@ ext { avroVersion = '1.12.1' awaitilityVersion = '4.3.0' camelVersion = '4.15.0' + cloudEventsVersion = '4.0.1' commonsDbcp2Version = '2.13.0' commonsIoVersion = '2.20.0' commonsNetVersion = '3.12.0' @@ -486,6 +487,21 @@ project('spring-integration-cassandra') { } } +project('spring-integration-cloudevents') { + description = 'Spring Integration CloudEvents Support' + + dependencies { + api "io.cloudevents:cloudevents-core:$cloudEventsVersion" + testImplementation "io.cloudevents:cloudevents-json-jackson:$cloudEventsVersion" + + testImplementation("io.cloudevents:cloudevents-avro-compact:$cloudEventsVersion") { + exclude group: 'org.apache.avro', module: 'avro' + } + testImplementation "org.apache.avro:avro:$avroVersion" + testImplementation "io.cloudevents:cloudevents-xml:$cloudEventsVersion" + } +} + project('spring-integration-core') { description = 'Spring Integration Core' diff --git a/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/package-info.java b/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/package-info.java new file mode 100644 index 0000000000..116ccfd7f8 --- /dev/null +++ b/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/package-info.java @@ -0,0 +1,3 @@ + +@org.jspecify.annotations.NullMarked +package org.springframework.integration.cloudevents; diff --git a/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/CloudEventMessageConverter.java b/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/CloudEventMessageConverter.java new file mode 100644 index 0000000000..75ee879953 --- /dev/null +++ b/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/CloudEventMessageConverter.java @@ -0,0 +1,76 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cloudevents.transformer; + +import java.util.Objects; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.CloudEventUtils; +import org.jspecify.annotations.Nullable; + +import org.springframework.integration.transformer.MessageTransformationException; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.MessageConverter; + +/** + * Convert Spring Integration {@link Message}s to CloudEvents. + * + * @author Glenn Renfro + * + * @since 7.0 + */ +class CloudEventMessageConverter implements MessageConverter { + + private final String cloudEventPrefix; + + private final String specVersionKey; + + private final String dataContentTypeKey; + + /** + * Construct a CloudEventMessageConverter with the specified configuration. + * @param cloudEventPrefix the prefix for CloudEvent headers in binary content mode + * @param specVersionKey the header name for the specification version + * @param dataContentTypeKey the header name for the data content type + */ + CloudEventMessageConverter(String cloudEventPrefix, String specVersionKey, String dataContentTypeKey) { + this.cloudEventPrefix = cloudEventPrefix; + this.specVersionKey = specVersionKey; + this.dataContentTypeKey = dataContentTypeKey; + } + + /** + * This converter only supports CloudEvent to Message conversion. + * @throws UnsupportedOperationException always, as this operation is not supported + */ + @Override + public @Nullable Object fromMessage(Message message, Class targetClass) { + throw new UnsupportedOperationException("CloudEventMessageConverter does not support fromMessage method"); + } + + @Override + public Message toMessage(Object payload, @Nullable MessageHeaders headers) { + if (payload instanceof CloudEvent event) { + return CloudEventUtils.toReader(event).read(new MessageBuilderMessageWriter(this.cloudEventPrefix, + this.specVersionKey, this.dataContentTypeKey, Objects.requireNonNull(headers))); + } + throw new MessageTransformationException("Unsupported payload type. Should be CloudEvent but was: " + + payload.getClass()); + } + +} diff --git a/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/MessageBuilderMessageWriter.java b/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/MessageBuilderMessageWriter.java new file mode 100644 index 0000000000..63f8e68f81 --- /dev/null +++ b/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/MessageBuilderMessageWriter.java @@ -0,0 +1,137 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cloudevents.transformer; + +import java.util.HashMap; +import java.util.Map; + +import io.cloudevents.CloudEventData; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.rw.CloudEventContextWriter; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.rw.CloudEventWriter; + +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; + +/** + * Adapt CloudEvents to Spring Integration {@link Message}s using the CloudEvents SDK + * {@link MessageWriter} abstraction. + * Write CloudEvent attributes as message headers with a configurable prefix for + * binary content mode serialization. Used internally by {@link CloudEventMessageConverter} + * to convert CloudEvent objects into Spring Integration messages. + * + * @author Glenn Renfro + * + * @since 7.0 + * + * @see CloudEventMessageConverter + */ +class MessageBuilderMessageWriter + implements CloudEventWriter>, MessageWriter> { + + private final String cloudEventPrefix; + + private final String specVersionKey; + + private final String dataContentTypeKey; + + private final Map headers = new HashMap<>(); + + /** + * Construct a MessageBuilderMessageWriter with the specified configuration. + * @param cloudEventPrefix the prefix to prepend to CloudEvent attribute names in message headers + * @param specVersionKey the header name for the CloudEvent specification version + * @param dataContentTypeKey the header name for the data content type + * @param headers the base message headers to include in the output message + */ + MessageBuilderMessageWriter(String cloudEventPrefix, String specVersionKey, String dataContentTypeKey, Map headers) { + this.headers.putAll(headers); + this.cloudEventPrefix = cloudEventPrefix; + this.specVersionKey = specVersionKey; + this.dataContentTypeKey = dataContentTypeKey; + } + + /** + * Set the event in structured content mode. + * Create a message with the serialized CloudEvent as the payload and set the + * data content type header to the format's serialized content type. + * @param format the event format used to serialize the CloudEvent + * @param value the serialized CloudEvent bytes + * @return the Spring Integration message containing the serialized CloudEvent + * @throws CloudEventRWException if an error occurs during message creation + */ + @Override + public Message setEvent(EventFormat format, byte[] value) throws CloudEventRWException { + this.headers.put(this.dataContentTypeKey, format.serializedContentType()); + return MessageBuilder.withPayload(value).copyHeaders(this.headers).build(); + } + + /** + * Complete the message creation with CloudEvent data. + * Create a message with the CloudEvent data as the payload. CloudEvent attributes + * are already set as headers via {@link #withContextAttribute(String, String)}. + * @param value the CloudEvent data to use as the message payload + * @return the Spring Integration message with CloudEvent data and attributes + * @throws CloudEventRWException if an error occurs during message creation + */ + @Override + public Message end(CloudEventData value) throws CloudEventRWException { + return MessageBuilder.withPayload(value.toBytes()).copyHeaders(this.headers).build(); + } + + /** + * Complete the message creation without CloudEvent data. + * Create a message with an empty payload when the CloudEvent contains no data. + * CloudEvent attributes are set as headers via {@link #withContextAttribute(String, String)}. + * @return the Spring Integration message with an empty payload and CloudEvent attributes as headers + */ + @Override + public Message end() { + return MessageBuilder.withPayload(new byte[0]).copyHeaders(this.headers).build(); + } + + /** + * Add a CloudEvent context attribute to the message headers. + * Map the CloudEvent attribute to a message header by prepending the configured prefix + * to the attribute name (e.g., "id" becomes "ce-id" with default prefix). + * @param name the CloudEvent attribute name + * @param value the CloudEvent attribute value + * @return this writer for method chaining + * @throws CloudEventRWException if an error occurs while setting the attribute + */ + @Override + public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException { + this.headers.put(this.cloudEventPrefix + name, value); + return this; + } + + /** + * Initialize the writer with the CloudEvent specification version. + * Set the specification version as a message header using the configured version key. + * @param version the CloudEvent specification version + * @return this writer for method chaining + */ + @Override + public MessageBuilderMessageWriter create(SpecVersion version) { + this.headers.put(this.specVersionKey, version.toString()); + return this; + } + +} diff --git a/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/ToCloudEventsTransformer.java b/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/ToCloudEventsTransformer.java new file mode 100644 index 0000000000..508138783a --- /dev/null +++ b/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/ToCloudEventsTransformer.java @@ -0,0 +1,320 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.springframework.integration.cloudevents.transformer; + +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventExtension; +import io.cloudevents.CloudEventExtensions; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.provider.EventFormatProvider; +import org.jspecify.annotations.Nullable; + +import org.springframework.context.ApplicationContext; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.integration.expression.ExpressionUtils; +import org.springframework.integration.expression.FunctionExpression; +import org.springframework.integration.expression.ValueExpression; +import org.springframework.integration.support.utils.PatternMatchUtils; +import org.springframework.integration.transformer.AbstractTransformer; +import org.springframework.integration.transformer.MessageTransformationException; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; + +/** + * Converts messages to CloudEvent format. + * Performs attribute and extension mapping based on {@link Expression}s. + * + * @author Glenn Renfro + * + * @since 7.0 + */ +public class ToCloudEventsTransformer extends AbstractTransformer { + + private static final String DEFAULT_PREFIX = "ce-"; + + private static final String DEFAULT_SPECVERSION_KEY = "specversion"; + + private static final String DEFAULT_DATACONTENTTYPE_KEY = "datacontenttype"; + + private String cloudEventPrefix = DEFAULT_PREFIX; + + private boolean noFormat = false; + + private Expression eventIdExpression = new FunctionExpression>( + msg -> Objects.requireNonNull(msg.getHeaders().getId()).toString()); + + @SuppressWarnings("NullAway.Init") + private Expression sourceExpression; + + private Expression typeExpression = new LiteralExpression("spring.message"); + + @SuppressWarnings("NullAway.Init") + private @Nullable Expression dataSchemaExpression; + + private @Nullable Expression subjectExpression; + + private final String[] extensionPatterns; + + @SuppressWarnings("NullAway.Init") + private CloudEventMessageConverter cloudEventMessageConverter; + + @SuppressWarnings("NullAway.Init") + private EvaluationContext evaluationContext; + + private final EventFormatProvider eventFormatProvider = EventFormatProvider.getInstance(); + + /** + * Construct a ToCloudEventsTransformer. + * @param extensionPatterns patterns to evaluate whether message headers should be added as extensions + * to the CloudEvent + */ + public ToCloudEventsTransformer(String ... extensionPatterns) { + this.extensionPatterns = extensionPatterns; + } + + /** + * Construct a ToCloudEventsTransformer with no extensionPatterns. + */ + public ToCloudEventsTransformer() { + this.extensionPatterns = new String[0]; + } + + /** + * Set the {@link Expression} for creating CloudEvent ids. + * Defaults to extracting the id from the {@link MessageHeaders} of the message. + * @param eventIdExpression the expression to create the id for each CloudEvent + */ + public void setEventIdExpression(Expression eventIdExpression) { + this.eventIdExpression = eventIdExpression; + } + + /** + * Set the {@link Expression} for creating CloudEvent source. + * Defaults to {@code "/spring/" + appName + "." + getBeanName())}. + * @param sourceExpression the expression to create the source for each CloudEvent + */ + public void setSourceExpression(Expression sourceExpression) { + this.sourceExpression = sourceExpression; + } + + /** + * Set the {@link Expression} for extracting the type for the CloudEvent. + * Defaults to "spring.message". + * @param typeExpression the expression to create the type for each CloudEvent + */ + public void setTypeExpression(Expression typeExpression) { + this.typeExpression = typeExpression; + } + + /** + * Set the {@link Expression} for creating the dataSchema for the CloudEvent. + * Defaults to null. + * @param dataSchemaExpression the expression to create the dataSchema for each CloudEvent + */ + public void setDataSchemaExpression(@Nullable Expression dataSchemaExpression) { + this.dataSchemaExpression = dataSchemaExpression; + } + + /** + * Set the {@link Expression} for creating the subject for the CloudEvent. + * Defaults to null. + * @param subjectExpression the expression to create the subject for each CloudEvent + */ + public void setSubjectExpression(@Nullable Expression subjectExpression) { + this.subjectExpression = subjectExpression; + } + + @Override + protected void onInit() { + super.onInit(); + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + ApplicationContext applicationContext = getApplicationContext(); + if (this.sourceExpression == null) { // in the case the user sets the value prior to onInit. + String appName = applicationContext.getEnvironment().getProperty("spring.application.name"); + appName = appName == null ? "unknown" : appName; + this.sourceExpression = new ValueExpression<>(URI.create("/spring/" + appName + "." + getBeanName())); + } + this.cloudEventMessageConverter = new CloudEventMessageConverter(this.cloudEventPrefix, this.getSpecVersionKey(), this.getDataContentTypeKey()); + } + + /** + * Transform the input message into a CloudEvent message. + * @param message the input Spring Integration message to transform + * @return CloudEvent message in the specified format + * @throws RuntimeException if serialization fails + */ + @SuppressWarnings("unchecked") + @Override + protected Object doTransform(Message message) { + Assert.isInstanceOf(byte[].class, message.getPayload(), "Message payload must be of type byte[]"); + + String id = this.eventIdExpression.getValue(this.evaluationContext, message, String.class); + URI source = this.sourceExpression.getValue(this.evaluationContext, message, URI.class); + String type = this.typeExpression.getValue(this.evaluationContext, message, String.class); + + String contentType = message.getHeaders().get(MessageHeaders.CONTENT_TYPE, String.class); + if (contentType == null) { + throw new MessageTransformationException(message, "Missing 'Content-Type' header"); + } + + ToCloudEventTransformerExtensions extensions = + new ToCloudEventTransformerExtensions(message.getHeaders(), + this.extensionPatterns); + + CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1() + .withId(id) + .withSource(source) + .withType(type) + .withTime(OffsetDateTime.now()) + .withDataContentType(contentType); + + if (this.subjectExpression != null) { + cloudEventBuilder.withSubject(this.subjectExpression.getValue(this.evaluationContext, message, String.class)); + } + if (this.dataSchemaExpression != null) { + cloudEventBuilder.withDataSchema(this.dataSchemaExpression.getValue(this.evaluationContext, message, URI.class)); + } + + CloudEvent cloudEvent = cloudEventBuilder.withData((byte[]) message.getPayload()) + .withExtension(extensions) + .build(); + + EventFormat eventFormat = this.eventFormatProvider.resolveFormat(contentType); + if (eventFormat == null && !this.noFormat) { + throw new MessageTransformationException("No EventFormat found for '" + contentType + "'"); + } + + if (eventFormat != null) { + return MessageBuilder.withPayload(eventFormat.serialize(cloudEvent)) + .copyHeaders(message.getHeaders()) + .setHeader(MessageHeaders.CONTENT_TYPE, "application/cloudevents") + .build(); + } + Map headers = new HashMap<>(message.getHeaders()); + headers.putAll(extensions.cloudEventExtensions); + + return this.cloudEventMessageConverter.toMessage(cloudEvent, new MessageHeaders(headers)); + } + + @Override + public String getComponentType() { + return "ce:to-cloudevents-transformer"; + } + + /** + * Returns CloudEvent information to the header if no {@link EventFormat} is found for content type. + * @return true if CloudEvent information should be added to header if no {@link EventFormat} is found. + */ + public boolean isNoFormat() { + return this.noFormat; + } + + /** + * Set CloudEvent information to the header if no {@link EventFormat} is found for content type. + * When true and no {@link EventFormat} is found for the content type, CloudEvents are sent with headers instead of + * structured format. + * @param noFormat true to disable format serialization + */ + public void setNoFormat(boolean noFormat) { + this.noFormat = noFormat; + } + + /** + * Return the prefix used for CloudEvent headers in binary content mode. + * @return the CloudEvent header prefix + */ + public String getCloudEventPrefix() { + return this.cloudEventPrefix; + } + + /** + * Set the prefix for CloudEvent headers in binary content mode. + * @param cloudEventPrefix the prefix to use for CloudEvent headers + */ + public void setCloudEventPrefix(String cloudEventPrefix) { + this.cloudEventPrefix = cloudEventPrefix; + } + + private String getSpecVersionKey() { + return this.cloudEventPrefix + DEFAULT_SPECVERSION_KEY; + } + + private String getDataContentTypeKey() { + return this.cloudEventPrefix + DEFAULT_DATACONTENTTYPE_KEY; + } + + private static class ToCloudEventTransformerExtensions implements CloudEventExtension { + + /** + * Stores the CloudEvent extensions extracted from message headers. + */ + private final Map cloudEventExtensions; + + /** + * Construct CloudEvent extensions by processing a message using expressions. + * + * @param headers the headers from the Spring Integration message + * @param extensionPatterns patterns to determine whether message headers are extensions + */ + @SuppressWarnings("unchecked") + ToCloudEventTransformerExtensions(Map headers, String ... extensionPatterns) { + this.cloudEventExtensions = new HashMap<>(); + Boolean result = null; + for (Map.Entry header : headers.entrySet()) { + result = PatternMatchUtils.smartMatch(header.getKey(), extensionPatterns); + if (result != null && result) { + this.cloudEventExtensions.put(header.getKey(), header.getValue()); + } + } + } + + @Override + public void readFrom(CloudEventExtensions extensions) { + extensions.getExtensionNames() + .forEach(key -> { + Object value = extensions.getExtension(key); + if (value != null) { + this.cloudEventExtensions.put(key, value); + } + }); + } + + @Override + public @Nullable Object getValue(String key) throws IllegalArgumentException { + return this.cloudEventExtensions.get(key); + } + + @Override + public Set getKeys() { + return this.cloudEventExtensions.keySet(); + } + } + +} diff --git a/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/package-info.java b/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/package-info.java new file mode 100644 index 0000000000..02b690ceec --- /dev/null +++ b/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/package-info.java @@ -0,0 +1,3 @@ + +@org.jspecify.annotations.NullMarked +package org.springframework.integration.cloudevents.transformer; diff --git a/spring-integration-cloudevents/src/test/java/org/springframework/integration/cloudevents/transformer/CloudEventMessageConverterTests.java b/spring-integration-cloudevents/src/test/java/org/springframework/integration/cloudevents/transformer/CloudEventMessageConverterTests.java new file mode 100644 index 0000000000..c0c3863c7d --- /dev/null +++ b/spring-integration-cloudevents/src/test/java/org/springframework/integration/cloudevents/transformer/CloudEventMessageConverterTests.java @@ -0,0 +1,63 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cloudevents.transformer; + +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.Collections; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.data.BytesCloudEventData; +import io.cloudevents.core.v1.CloudEventV1; +import org.junit.jupiter.api.Test; + +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + +public class CloudEventMessageConverterTests { + + private static final String DEFAULT_PREFIX = "ce-"; + + private static final String DEFAULT_SPECVERSION_KEY = DEFAULT_PREFIX + "specversion"; + + private static final String DEFAULT_DATACONTENTTYPE_KEY = DEFAULT_PREFIX + "datacontenttype"; + + @Test + void binaryModeContainsAllCloudEventAttributes() { + CloudEventMessageConverter cloudEventMessageConverter = new CloudEventMessageConverter(DEFAULT_PREFIX, + DEFAULT_SPECVERSION_KEY, DEFAULT_DATACONTENTTYPE_KEY); + Message message = MessageBuilder.withPayload(new CloudEventV1("1", URI.create("http://localhost:8080/cloudevents"), + "sampleType", "text/plain", + URI.create("http://sample:8080/sample"), "sample subject", OffsetDateTime.now(), + BytesCloudEventData.wrap(new byte[0]), Collections.emptyMap())).build(); + Message convertedMessage = cloudEventMessageConverter.toMessage(message.getPayload(), message.getHeaders()); + assertThat(convertedMessage.getHeaders()).containsKeys(DEFAULT_DATACONTENTTYPE_KEY, DEFAULT_SPECVERSION_KEY, "ce-time", "ce-id"); + } + + @Test + void fromMessageThrowsUnsupportedOperation() { + CloudEventMessageConverter cloudEventMessageConverter = new CloudEventMessageConverter(DEFAULT_PREFIX, + DEFAULT_SPECVERSION_KEY, DEFAULT_DATACONTENTTYPE_KEY); + assertThatExceptionOfType(UnsupportedOperationException.class).isThrownBy(() -> + cloudEventMessageConverter.fromMessage(MessageBuilder.withPayload(new byte[0]).build(), + CloudEvent.class)).withMessage( + "CloudEventMessageConverter does not support fromMessage method"); + } +} diff --git a/spring-integration-cloudevents/src/test/java/org/springframework/integration/cloudevents/transformer/ToCloudEventsTransformerTests.java b/spring-integration-cloudevents/src/test/java/org/springframework/integration/cloudevents/transformer/ToCloudEventsTransformerTests.java new file mode 100644 index 0000000000..0b1ecd816d --- /dev/null +++ b/spring-integration-cloudevents/src/test/java/org/springframework/integration/cloudevents/transformer/ToCloudEventsTransformerTests.java @@ -0,0 +1,339 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cloudevents.transformer; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.avro.compact.AvroCompactFormat; +import io.cloudevents.core.format.EventDeserializationException; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.format.EventSerializationException; +import io.cloudevents.jackson.JsonFormat; +import io.cloudevents.rw.CloudEventDataMapper; +import io.cloudevents.xml.XMLFormat; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.expression.ExpressionParser; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.transformer.MessageTransformationException; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +@DirtiesContext +@SpringJUnitConfig +class ToCloudEventsTransformerTests { + + private static final String TRACE_HEADER = "traceId"; + + private static final String SPAN_HEADER = "spanId"; + + private static final String USER_HEADER = "userId"; + + private static final String[] TEST_PATTERNS = {"trace*", SPAN_HEADER, USER_HEADER}; + + private static final byte[] PAYLOAD = "\"test message\"".getBytes(StandardCharsets.UTF_8); + + @Autowired + private ToCloudEventsTransformer transformerWithNoExtensions; + + @Autowired + private ToCloudEventsTransformer transformerWithExtensions; + + @Autowired + private ToCloudEventsTransformer transformerWithNoExtensionsNoFormat; + + @Autowired + private ToCloudEventsTransformer transformerWithExtensionsNoFormat; + + @Autowired + private ToCloudEventsTransformer transformerWithExtensionsNoFormatWithPrefix; + + @Autowired + private ToCloudEventsTransformer transformerWithInvalidIDExpression; + + private final JsonFormat jsonFormat = new JsonFormat(); + + private final AvroCompactFormat avroFormat = new AvroCompactFormat(); + + private final XMLFormat xmlFormat = new XMLFormat(); + + @Test + @SuppressWarnings("NullAway") + void doJsonTransformWithPayloadBasedOnContentType() { + CloudEvent cloudEvent = getTransformerNoExtensions(PAYLOAD, jsonFormat); + assertThat(cloudEvent.getData().toBytes()).isEqualTo(PAYLOAD); + assertThat(cloudEvent.getSource().toString()).isEqualTo("/spring/unknown.transformerWithNoExtensions"); + assertThat(cloudEvent.getDataSchema()).isNull(); + assertThat(cloudEvent.getDataContentType()).isEqualTo(JsonFormat.CONTENT_TYPE); + } + + @Test + @SuppressWarnings("NullAway") + void doXMLTransformWithPayloadBasedOnContentType() { + String xmlPayload = ("" + + "testmessage"); + CloudEvent cloudEvent = getTransformerNoExtensions(xmlPayload.getBytes(), xmlFormat); + assertThat(cloudEvent.getData().toBytes()).isEqualTo(xmlPayload.getBytes()); + assertThat(cloudEvent.getSource().toString()).isEqualTo("/spring/unknown.transformerWithNoExtensions"); + assertThat(cloudEvent.getDataSchema()).isNull(); + assertThat(cloudEvent.getDataContentType()).isEqualTo(XMLFormat.XML_CONTENT_TYPE); + } + + @Test + @SuppressWarnings("NullAway") + void doAvroTransformWithPayloadBasedOnContentType() { + CloudEvent cloudEvent = getTransformerNoExtensions(PAYLOAD, avroFormat); + assertThat(cloudEvent.getData().toBytes()).isEqualTo(PAYLOAD); + assertThat(cloudEvent.getSource().toString()).isEqualTo("/spring/unknown.transformerWithNoExtensions"); + assertThat(cloudEvent.getDataSchema()).isNull(); + assertThat(cloudEvent.getDataContentType()).isEqualTo(AvroCompactFormat.AVRO_COMPACT_CONTENT_TYPE); + } + + @Test + void unregisteredFormatType() { + EventFormat testFormat = new EventFormat() { + + @Override + public byte[] serialize(CloudEvent event) throws EventSerializationException { + return new byte[0]; + } + + @Override + public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper mapper) throws EventDeserializationException { + return Mockito.mock(CloudEvent.class); + } + + @Override + public String serializedContentType() { + return "application/cloudevents+invalid"; + } + }; + assertThatThrownBy(() -> getTransformerNoExtensions(PAYLOAD, testFormat)) + .hasMessage("No EventFormat found for 'application/cloudevents+invalid'"); + } + + @Test + void convertMessageNoExtensions() { + Message message = MessageBuilder.withPayload(PAYLOAD) + .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") + .setHeader(TRACE_HEADER, "test-value") + .setHeader(SPAN_HEADER, "other-value") + .build(); + Message result = transformMessage(message, this.transformerWithNoExtensionsNoFormat); + assertThat(result.getPayload()).isEqualTo(PAYLOAD); + assertThat(result.getHeaders()).containsKeys(TRACE_HEADER, SPAN_HEADER); + assertThat(result.getHeaders()).doesNotContainKeys("ce-" + TRACE_HEADER, "ce-" + SPAN_HEADER); + } + + @Test + void convertMessageWithExtensions() { + Message message = MessageBuilder.withPayload(PAYLOAD) + .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") + .setHeader(TRACE_HEADER, "test-value") + .setHeader(SPAN_HEADER, "other-value") + .build(); + Message result = transformMessage(message, this.transformerWithExtensionsNoFormat); + assertThat(result.getHeaders()).containsKeys(TRACE_HEADER, SPAN_HEADER); + assertThat(result.getHeaders()).containsKeys("ce-" + TRACE_HEADER, "ce-" + SPAN_HEADER); + } + + @Test + void convertMessageWithExtensionsNewPrefix() { + Message message = MessageBuilder.withPayload(PAYLOAD) + .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") + .setHeader(TRACE_HEADER, "test-value") + .setHeader(SPAN_HEADER, "other-value") + .build(); + Message result = transformMessage(message, this.transformerWithExtensionsNoFormatWithPrefix); + assertThat(result.getHeaders()).containsKeys(TRACE_HEADER, SPAN_HEADER); + assertThat(result.getHeaders()).containsKeys("CLOUDEVENTS-" + TRACE_HEADER, "CLOUDEVENTS-" + SPAN_HEADER, + "CLOUDEVENTS-id", "CLOUDEVENTS-specversion", "CLOUDEVENTS-datacontenttype"); + + } + + @Test + @SuppressWarnings("unchecked") + void doTransformWithObjectPayload() throws Exception { + TestRecord testRecord = new TestRecord("sample data"); + byte[] payload = convertPayloadToBytes(testRecord); + Message message = MessageBuilder.withPayload(payload).setHeader("test_id", "test-id") + .setHeader("contentType", JsonFormat.CONTENT_TYPE) + .build(); + Object result = this.transformerWithNoExtensions.doTransform(message); + + assertThat(result).isNotNull(); + assertThat(result).isInstanceOf(Message.class); + + Message resultMessage = (Message) result; + assertThat(resultMessage.getPayload()).isNotNull(); + assertThat(new String(resultMessage.getPayload())).endsWith(new String(payload) + "}"); + } + + @Test + @SuppressWarnings("NullAway") + void emptyExtensionNames() { + Message message = createBaseMessage(PAYLOAD, "application/cloudevents+json").build(); + + Object result = this.transformerWithNoExtensions.doTransform(message); + assertThat(result).isNotNull(); + Message resultMessage = (Message) result; + assertThat(resultMessage.getPayload()).isNotNull(); + } + + @Test + void noContentType() { + Message message = MessageBuilder.withPayload(PAYLOAD).build(); + assertThatThrownBy(() -> this.transformerWithNoExtensions.transform(message)) + .isInstanceOf(MessageTransformationException.class) + .hasMessageContaining("Missing 'Content-Type' header"); + } + + @Test + @SuppressWarnings({"unchecked", "NullAway"}) + void multipleExtensionMappings() { + String payload = "test message"; + Message message = createBaseMessage(payload.getBytes(), "application/cloudevents+json") + .setHeader("correlation-id", "corr-999") + .setHeader(TRACE_HEADER, "trace-123") + .build(); + + Object result = this.transformerWithExtensions.doTransform(message); + + assertThat(result).isNotNull(); + Message resultMessage = (Message) result; + + assertThat(resultMessage.getHeaders()).containsKeys("correlation-id"); + assertThat(resultMessage.getHeaders().get("correlation-id")).isEqualTo("corr-999"); + assertThat(new String(resultMessage.getPayload())).contains("\"traceId\":\"trace-123\""); + assertThat(new String(resultMessage.getPayload())).doesNotContain("\"spanId\":\"span-456\"", + "\"userId\":\"user-789\""); + } + + @Test + void emptyStringPayloadHandling() { + Message message = createBaseMessage("".getBytes(), "application/cloudevents+json").build(); + Object result = this.transformerWithNoExtensions.doTransform(message); + + assertThat(result).isNotNull(); + assertThat(result).isInstanceOf(Message.class); + } + + @Test + void failWhenNoIdHeaderAndNoDefault() { + Message message = MessageBuilder.withPayload(PAYLOAD) + .setHeader("contentType", JsonFormat.CONTENT_TYPE) + .build(); + + assertThatThrownBy(() -> this.transformerWithInvalidIDExpression.transform(message)).isInstanceOf(MessageTransformationException.class) + .hasMessageContaining("failed to transform message"); + } + + private CloudEvent getTransformerNoExtensions(byte[] payload, EventFormat eventFormat) { + Message message = createBaseMessage(payload, eventFormat.serializedContentType()) + .setHeader("customheader", "test-value") + .setHeader("otherheader", "other-value") + .build(); + Message result = transformMessage(message, this.transformerWithNoExtensions); + return eventFormat.deserialize(result.getPayload()); + } + + @SuppressWarnings("unchecked") + private Message transformMessage(Message message, ToCloudEventsTransformer transformer) { + Object result = transformer.doTransform(message); + + assertThat(result).isNotNull(); + assertThat(result).isInstanceOf(Message.class); + return (Message) result; + } + + private byte[] convertPayloadToBytes(TestRecord testRecord) throws Exception { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(byteArrayOutputStream); + out.writeObject(testRecord); + out.flush(); + return byteArrayOutputStream.toByteArray(); + } + + private static MessageBuilder createBaseMessage(byte[] payload, String contentType) { + return MessageBuilder.withPayload(payload) + .setHeader(MessageHeaders.CONTENT_TYPE, contentType); + } + + @Configuration + @EnableIntegration + public static class ContextConfiguration { + + private static final ExpressionParser parser = new SpelExpressionParser(); + + @Bean + public ToCloudEventsTransformer transformerWithNoExtensions() { + return new ToCloudEventsTransformer(); + } + + @Bean + public ToCloudEventsTransformer transformerWithExtensions() { + return new ToCloudEventsTransformer(TEST_PATTERNS); + } + + @Bean + public ToCloudEventsTransformer transformerWithNoExtensionsNoFormat() { + ToCloudEventsTransformer toCloudEventsTransformer = new ToCloudEventsTransformer(); + toCloudEventsTransformer.setNoFormat(true); + return toCloudEventsTransformer; + } + + @Bean + public ToCloudEventsTransformer transformerWithExtensionsNoFormat() { + ToCloudEventsTransformer toCloudEventsTransformer = new ToCloudEventsTransformer(TEST_PATTERNS); + toCloudEventsTransformer.setNoFormat(true); + return toCloudEventsTransformer; + } + + @Bean + public ToCloudEventsTransformer transformerWithExtensionsNoFormatWithPrefix() { + ToCloudEventsTransformer toCloudEventsTransformer = new ToCloudEventsTransformer(TEST_PATTERNS); + toCloudEventsTransformer.setNoFormat(true); + toCloudEventsTransformer.setCloudEventPrefix("CLOUDEVENTS-"); + return toCloudEventsTransformer; + } + + @Bean + public ToCloudEventsTransformer transformerWithInvalidIDExpression() { + ToCloudEventsTransformer transformer = new ToCloudEventsTransformer(); + transformer.setEventIdExpression(parser.parseExpression("null")); + return transformer; + } + } + + private record TestRecord(String sampleValue) implements Serializable { } +} diff --git a/src/reference/antora/modules/ROOT/nav.adoc b/src/reference/antora/modules/ROOT/nav.adoc index 292a8fe2b1..3f7d1f14d4 100644 --- a/src/reference/antora/modules/ROOT/nav.adoc +++ b/src/reference/antora/modules/ROOT/nav.adoc @@ -124,6 +124,7 @@ ** xref:amqp/amqp-1.0.adoc[] * xref:camel.adoc[] * xref:cassandra.adoc[] +* xref:cloudevents.adoc[] * xref:debezium.adoc[] * xref:event.adoc[] * xref:feed.adoc[] diff --git a/src/reference/antora/modules/ROOT/pages/cloudevents.adoc b/src/reference/antora/modules/ROOT/pages/cloudevents.adoc new file mode 100644 index 0000000000..8ef29d4f59 --- /dev/null +++ b/src/reference/antora/modules/ROOT/pages/cloudevents.adoc @@ -0,0 +1,271 @@ +[[cloudevents]] += CloudEvents Support + +Spring Integration provides transformers (starting with version 7.0) for transforming messages into CloudEvent messages. +It is fully based on the https://github.com/cloudevents/sdk-java[CloudEvents SDK] project. + +This dependency is required for the project: + +[tabs] +====== +Maven:: ++ +[source, xml, subs="normal", role="primary"] +---- + + org.springframework.integration + spring-integration-cloudevents + {project-version} + +---- + +Gradle:: ++ +[source, groovy, subs="normal", role="secondary"] +---- +compile "org.springframework.integration:spring-integration-cloudevents:{project-version}" +---- +====== + +[[cloudevent-transformers]] + +== CloudEvent Transformers + +[[tocloudeventstransformer]] +=== ToCloudEventsTransformer + +The `ToCloudEventsTransformer` converts Spring Integration messages into CloudEvents compliant messages. +This transformer provides support for the CloudEvents specification v1.0 with configurable output format and defining attributes and extensions using `Expression` s. + +[[cloudevent-transformer-overview]] +==== Overview + +The CloudEvents transformer (`ToCloudEventsTransformer`) converts messages to CloudEvents format. +The CloudEvents transformer utilizes `io.cloudevents.core.provider.EventFormatProvider` to find `EventFormat` classes in the classpath and registers these as the available serializers for CloudEvents. +The type of serialization (JSON, XML, AVRO, etc) of the CloudEvents' message is determined by `contentType` of the message. + +NOTE: Messages to be transformed must have a payload of `byte[]`. + + + +[[configure-transformer]] +===== Configuring Transformer + +The `ToCloudEventsTransformer` allows the user to use SpEL `Expression`s to populate the attributes as well as the extensions. + +====== Attribute Expressions + +Users are allowed to set the CloudEvents' attributes of `id`, `source`, `type`, `dataSchema`, `subject` through SpEL `Expression` s. +The example below shows where a `ToCloudEventTransformer` is created with a null `expression` 's variable. +This indicates that this transformer will not place any `extensions` in the CloudEvent. +But the user does want to set the `type` of the CloudEvent to `sampleType`. + +NOTE: The `time` attribute is set to the time that the CloudEvent message was created by the `ToCloudEventsTransformer` transformer. + +[source,java] +---- +ToCloudEventTransformer transformer = new ToCloudEventTransformer(null); +transformer.setTypeExpression(new LiteralExpression("sampleType")); +---- + +====== Extension Expressions + +The expressions constructor parameter is an array of `Expression` s. +If the array is `null`, then no extensions will be added to the CloudEvent. +Each `Expression` in the array must return the type `Map`. +Where the key is a `String` and the value is of type `Object`. +In the example below the extensions are hard coded to return 3 `Map` objects each containing one extension. + +[source,java] +---- +ExpressionParser parser = new SpelExpressionParser(); +Expression[] extensionExpressions = { + parser.parseExpression("{'trace-id' : 'trace-123'}"), + parser.parseExpression("{'span-id' : 'span-456'}"), + parser.parseExpression("{'user-id' : 'user-789'}")}; +return new ToCloudEventTransformer(extensionExpressions); +---- + +[[cloudevent-attribute-defaults]] +====== Default Values +The following table contains the Attribute names and the value returned by the default `Expression`s. + +|=== +| Attribute Name | Default Value + +| `id` +| the id of the message. + +| `source` +| Prefix of "/spring/" followed by the appName a period then the name of the transformer's bean. i.e. `/spring/myapp.toCloudEventsTransformerBean` + +| `type` +| "spring.message" + +| `dataContentType` +| The `contentType` of the message. + +| `dataSchema` +| `null` + +| `subject` +| `null` + +| `time` +| The time the CloudEvent message is created +|=== + +[[cloudevent-transformer-integration]] +==== Integration with Spring Integration Flows + +The CloudEvent transformer integrates with Spring Integration flows: + +====== Basic Flow + +[source,java] +---- +@Bean +public IntegrationFlow cloudEventTransformFlow() { + return IntegrationFlows + .from("inputChannel") + .transform(cloudEventTransformer()) + .channel("outputChannel") + .get(); +} +---- + +[[cloudevent-transformer-transformation-process]] +===== Transformation Process + +The transformer follows the process below: + +1. **CloudEvent Building**: Build CloudEvent attributes +2. **Extension Extraction**: Build the CloudEvent extensions using the array of extensionExpressions passed into the constructor. +3. **Format Conversion**: Apply the specified `EventFormat` based on the message's `contentType to create the CloudEvent. + +[[cloudevent-transformer-examples]] +===== Examples + +[[cloudevent-transformer-example-basic]] +====== Basic Message Transformation + +[source,java] +---- +// Input message with headers +Message inputMessage = MessageBuilder + .withPayload("Hello CloudEvents") + .withHeader("contentType", "application/octet-stream") + .build(); +// Transformer with extension patterns +ToCloudEventTransformer transformer = new ToCloudEventTransformer(); + +// Transform to CloudEvent +Object cloudEventMessage = transformer.transform(inputMessage); +---- + +[[eventformats]] +===== EventFormats + +The `ToCloudEventsTransformer` uses `EventFormat` s to serialize the CloudEvent into the message's payload. +The `EventFormat` s used by the `ToCloudEventsTransformer` are obtained from the classpath of the project. +The `EventFormat` s that are available from the https://github.com/cloudevents/sdk-java[CloudEvents SDK] project are as follows: + +[[jsonformat]] +====== JsonFormat +The following dependency can be used to include this `JsonFormat` in your project. + +[tabs] +====== +Maven:: ++ +[source, xml, subs="normal", role="primary"] +---- + + io.cloudevents + cloudevents-json-jackson + $cloudEventsVersion + +---- + +Gradle:: ++ +[source, groovy, subs="normal", role="secondary"] +---- +compile "io.cloudevents:cloudevents-json-jackson:$cloudEventsVersion" +---- +====== + +[[xmlformat]] +====== XMLFormat +The following dependency can be used to include this `XMLFormat` in your project. + +[tabs] +====== +Maven:: ++ +[source, xml, subs="normal", role="primary"] +---- + + io.cloudevents + cloudevents-xml + $cloudEventsVersion + +---- + +Gradle:: ++ +[source, groovy, subs="normal", role="secondary"] +---- +compile "io.cloudevents:cloudevents-xml:$cloudEventsVersion" +---- +====== + +[[avrocompactformat]] +====== AvroCompactFormat +The following dependency can be used to include this `AvroCompactFormat` in your project. + +[tabs] +====== +Maven:: ++ +[source, xml, subs="normal", role="primary"] +---- + + io.cloudevents + cloudevents-avro-compact + $cloudEventsVersion + +---- + +Gradle:: ++ +[source, groovy, subs="normal", role="secondary"] +---- +compile "io.cloudevents:cloudevents-avro-compact:$cloudEventsVersion" +---- +====== + +[[protobufformat]] +====== ProtobufFormat +The following dependency can be used to include this `ProtobufFormat` in your project. + +[tabs] +====== +Maven:: ++ +[source, xml, subs="normal", role="primary"] +---- + + io.cloudevents + cloudevents-protobuf + $cloudEventsVersion + +---- + +Gradle:: ++ +[source, groovy, subs="normal", role="secondary"] +---- +compile "io.cloudevents:cloudevents-protobuf:$cloudEventsVersion" +---- +====== \ No newline at end of file diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 480eb36965..62c91287db 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -142,3 +142,9 @@ The JCIFS library behind the SMB support module has been upgraded to 3.0.0. It is a major rewrite of the codebase and implements a new package structure. This is a breaking change, and direct references to components of the JCIFS library will need to be updated. See xref:smb.adoc[] for more information. + +[[x7.0-cloudevents]] +=== CloudEvents + +CloudEvents are now supported in the `spring-integration-cloudevents` module. +See xref:cloudevents.adoc[] for more information.