Skip to content

Commit d115df2

Browse files
committed
Enable CloudEvents to be sent with headers
Enable CloudEvents to be sent with headers instead of requiring structured format serialization. This provides flexibility when integrating with systems that don't support CloudEvents structured formats. Introduce `CloudEventMessageConverter` to handle CloudEvent to Message conversion, utilizing the CloudEvents SDK's `MessageWriter` abstraction. Add `noFormat` configuration option to `ToCloudEventsTransformer`. When enabled and no `EventFormat` is available for the content type, CloudEvent attributes are written to message headers with configurable prefix (defaults to "ce-"). Add `cloudEventPrefix` property to customize the header prefix when `noFormat` is set to true, supporting different integration scenarios. Add test coverage for binary content mode including extension handling, custom prefixes, and validation that original headers are preserved alongside CloudEvent headers.
1 parent 075b280 commit d115df2

File tree

6 files changed

+421
-31
lines changed

6 files changed

+421
-31
lines changed
Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,3 @@
11

2-
/*
3-
* Copyright 2025-present the original author or authors.
4-
*
5-
* Licensed under the Apache License, Version 2.0 (the "License");
6-
* you may not use this file except in compliance with the License.
7-
* You may obtain a copy of the License at
8-
*
9-
* https://www.apache.org/licenses/LICENSE-2.0
10-
*
11-
* Unless required by applicable law or agreed to in writing, software
12-
* distributed under the License is distributed on an "AS IS" BASIS,
13-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14-
* See the License for the specific language governing permissions and
15-
* limitations under the License.
16-
*
17-
*/
18-
192
@org.jspecify.annotations.NullMarked
203
package org.springframework.integration.cloudevents;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.cloudevents.transformer;
18+
19+
import java.util.Objects;
20+
21+
import io.cloudevents.CloudEvent;
22+
import io.cloudevents.core.CloudEventUtils;
23+
import org.jspecify.annotations.Nullable;
24+
25+
import org.springframework.integration.transformer.MessageTransformationException;
26+
import org.springframework.messaging.Message;
27+
import org.springframework.messaging.MessageHeaders;
28+
import org.springframework.messaging.converter.MessageConverter;
29+
30+
/**
31+
* Convert Spring Integration {@link Message}s to CloudEvents.
32+
*
33+
* @author Glenn Renfro
34+
*
35+
* @since 7.0
36+
*/
37+
class CloudEventMessageConverter implements MessageConverter {
38+
39+
private final String cloudEventPrefix;
40+
41+
private final String specVersionKey;
42+
43+
private final String dataContentTypeKey;
44+
45+
/**
46+
* Construct a CloudEventMessageConverter with the specified configuration.
47+
* @param cloudEventPrefix the prefix for CloudEvent headers in binary content mode
48+
* @param specVersionKey the header name for the specification version
49+
* @param dataContentTypeKey the header name for the data content type
50+
*/
51+
CloudEventMessageConverter(String cloudEventPrefix, String specVersionKey, String dataContentTypeKey) {
52+
this.cloudEventPrefix = cloudEventPrefix;
53+
this.specVersionKey = specVersionKey;
54+
this.dataContentTypeKey = dataContentTypeKey;
55+
}
56+
57+
/**
58+
* This converter only supports CloudEvent to Message conversion.
59+
* @throws UnsupportedOperationException always, as this operation is not supported
60+
*/
61+
@Override
62+
public @Nullable Object fromMessage(Message<?> message, Class<?> targetClass) {
63+
throw new UnsupportedOperationException("CloudEventMessageConverter does not support fromMessage method");
64+
}
65+
66+
@Override
67+
public Message<?> toMessage(Object payload, @Nullable MessageHeaders headers) {
68+
if (payload instanceof CloudEvent event) {
69+
return CloudEventUtils.toReader(event).read(new MessageBuilderMessageWriter(this.cloudEventPrefix,
70+
this.specVersionKey, this.dataContentTypeKey, Objects.requireNonNull(headers)));
71+
}
72+
throw new MessageTransformationException("Unsupported payload type. Should be CloudEvent but was: " +
73+
payload.getClass());
74+
}
75+
76+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.cloudevents.transformer;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
22+
import io.cloudevents.CloudEventData;
23+
import io.cloudevents.SpecVersion;
24+
import io.cloudevents.core.format.EventFormat;
25+
import io.cloudevents.core.message.MessageWriter;
26+
import io.cloudevents.rw.CloudEventContextWriter;
27+
import io.cloudevents.rw.CloudEventRWException;
28+
import io.cloudevents.rw.CloudEventWriter;
29+
30+
import org.springframework.integration.support.MessageBuilder;
31+
import org.springframework.messaging.Message;
32+
33+
/**
34+
* Adapt CloudEvents to Spring Integration {@link Message}s using the CloudEvents SDK
35+
* {@link MessageWriter} abstraction.
36+
* Write CloudEvent attributes as message headers with a configurable prefix for
37+
* binary content mode serialization. Used internally by {@link CloudEventMessageConverter}
38+
* to convert CloudEvent objects into Spring Integration messages.
39+
*
40+
* @author Glenn Renfro
41+
*
42+
* @since 7.0
43+
*
44+
* @see CloudEventMessageConverter
45+
*/
46+
class MessageBuilderMessageWriter
47+
implements CloudEventWriter<Message<byte[]>>, MessageWriter<MessageBuilderMessageWriter, Message<byte[]>> {
48+
49+
private final String cloudEventPrefix;
50+
51+
private final String specVersionKey;
52+
53+
private final String dataContentTypeKey;
54+
55+
private final Map<String, Object> headers = new HashMap<>();
56+
57+
/**
58+
* Construct a MessageBuilderMessageWriter with the specified configuration.
59+
* @param cloudEventPrefix the prefix to prepend to CloudEvent attribute names in message headers
60+
* @param specVersionKey the header name for the CloudEvent specification version
61+
* @param dataContentTypeKey the header name for the data content type
62+
* @param headers the base message headers to include in the output message
63+
*/
64+
MessageBuilderMessageWriter(String cloudEventPrefix, String specVersionKey, String dataContentTypeKey, Map<String, Object> headers) {
65+
this.headers.putAll(headers);
66+
this.cloudEventPrefix = cloudEventPrefix;
67+
this.specVersionKey = specVersionKey;
68+
this.dataContentTypeKey = dataContentTypeKey;
69+
}
70+
71+
/**
72+
* Set the event in structured content mode.
73+
* Create a message with the serialized CloudEvent as the payload and set the
74+
* data content type header to the format's serialized content type.
75+
* @param format the event format used to serialize the CloudEvent
76+
* @param value the serialized CloudEvent bytes
77+
* @return the Spring Integration message containing the serialized CloudEvent
78+
* @throws CloudEventRWException if an error occurs during message creation
79+
*/
80+
@Override
81+
public Message<byte[]> setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
82+
this.headers.put(this.dataContentTypeKey, format.serializedContentType());
83+
return MessageBuilder.withPayload(value).copyHeaders(this.headers).build();
84+
}
85+
86+
/**
87+
* Complete the message creation with CloudEvent data.
88+
* Create a message with the CloudEvent data as the payload. CloudEvent attributes
89+
* are already set as headers via {@link #withContextAttribute(String, String)}.
90+
* @param value the CloudEvent data to use as the message payload
91+
* @return the Spring Integration message with CloudEvent data and attributes
92+
* @throws CloudEventRWException if an error occurs during message creation
93+
*/
94+
@Override
95+
public Message<byte[]> end(CloudEventData value) throws CloudEventRWException {
96+
return MessageBuilder.withPayload(value.toBytes()).copyHeaders(this.headers).build();
97+
}
98+
99+
/**
100+
* Complete the message creation without CloudEvent data.
101+
* Create a message with an empty payload when the CloudEvent contains no data.
102+
* CloudEvent attributes are set as headers via {@link #withContextAttribute(String, String)}.
103+
* @return the Spring Integration message with an empty payload and CloudEvent attributes as headers
104+
*/
105+
@Override
106+
public Message<byte[]> end() {
107+
return MessageBuilder.withPayload(new byte[0]).copyHeaders(this.headers).build();
108+
}
109+
110+
/**
111+
* Add a CloudEvent context attribute to the message headers.
112+
* Map the CloudEvent attribute to a message header by prepending the configured prefix
113+
* to the attribute name (e.g., "id" becomes "ce-id" with default prefix).
114+
* @param name the CloudEvent attribute name
115+
* @param value the CloudEvent attribute value
116+
* @return this writer for method chaining
117+
* @throws CloudEventRWException if an error occurs while setting the attribute
118+
*/
119+
@Override
120+
public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
121+
this.headers.put(this.cloudEventPrefix + name, value);
122+
return this;
123+
}
124+
125+
/**
126+
* Initialize the writer with the CloudEvent specification version.
127+
* Set the specification version as a message header using the configured version key.
128+
* @param version the CloudEvent specification version
129+
* @return this writer for method chaining
130+
*/
131+
@Override
132+
public MessageBuilderMessageWriter create(SpecVersion version) {
133+
this.headers.put(this.specVersionKey, version.toString());
134+
return this;
135+
}
136+
137+
}

spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/ToCloudEventsTransformer.java

Lines changed: 73 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.net.URI;
2121
import java.time.OffsetDateTime;
22-
import java.util.Collections;
2322
import java.util.HashMap;
2423
import java.util.Map;
2524
import java.util.Objects;
@@ -58,6 +57,16 @@
5857
*/
5958
public class ToCloudEventsTransformer extends AbstractTransformer {
6059

60+
private static final String DEFAULT_PREFIX = "ce-";
61+
62+
private static final String DEFAULT_SPECVERSION_KEY = "specversion";
63+
64+
private static final String DEFAULT_DATACONTENTTYPE_KEY = "datacontenttype";
65+
66+
private String cloudEventPrefix = DEFAULT_PREFIX;
67+
68+
private boolean noFormat = false;
69+
6170
private Expression eventIdExpression = new FunctionExpression<Message<?>>(
6271
msg -> Objects.requireNonNull(msg.getHeaders().getId()).toString());
6372

@@ -71,7 +80,10 @@ public class ToCloudEventsTransformer extends AbstractTransformer {
7180

7281
private @Nullable Expression subjectExpression;
7382

74-
private final String [] extensionPatterns;
83+
private final String[] extensionPatterns;
84+
85+
@SuppressWarnings("NullAway.Init")
86+
private CloudEventMessageConverter cloudEventMessageConverter;
7587

7688
@SuppressWarnings("NullAway.Init")
7789
private EvaluationContext evaluationContext;
@@ -149,6 +161,7 @@ protected void onInit() {
149161
appName = appName == null ? "unknown" : appName;
150162
this.sourceExpression = new ValueExpression<>(URI.create("/spring/" + appName + "." + getBeanName()));
151163
}
164+
this.cloudEventMessageConverter = new CloudEventMessageConverter(this.cloudEventPrefix, this.getSpecVersionKey(), this.getDataContentTypeKey());
152165
}
153166

154167
/**
@@ -171,11 +184,6 @@ protected Object doTransform(Message<?> message) {
171184
throw new MessageTransformationException(message, "Missing 'Content-Type' header");
172185
}
173186

174-
EventFormat eventFormat = this.eventFormatProvider.resolveFormat(contentType);
175-
if (eventFormat == null) {
176-
throw new MessageTransformationException("No EventFormat found for '" + contentType + "'");
177-
}
178-
179187
ToCloudEventTransformerExtensions extensions =
180188
new ToCloudEventTransformerExtensions(message.getHeaders(),
181189
this.extensionPatterns);
@@ -194,21 +202,74 @@ protected Object doTransform(Message<?> message) {
194202
cloudEventBuilder.withDataSchema(this.dataSchemaExpression.getValue(this.evaluationContext, message, URI.class));
195203
}
196204

197-
CloudEvent cloudEvent = cloudEventBuilder.withData((byte[])message.getPayload())
205+
CloudEvent cloudEvent = cloudEventBuilder.withData((byte[]) message.getPayload())
198206
.withExtension(extensions)
199207
.build();
200208

201-
return MessageBuilder.withPayload(eventFormat.serialize(cloudEvent))
202-
.copyHeaders(message.getHeaders())
203-
.setHeader(MessageHeaders.CONTENT_TYPE, "application/cloudevents")
204-
.build();
209+
EventFormat eventFormat = this.eventFormatProvider.resolveFormat(contentType);
210+
if (eventFormat == null && !this.noFormat) {
211+
throw new MessageTransformationException("No EventFormat found for '" + contentType + "'");
212+
}
213+
214+
if (eventFormat != null) {
215+
return MessageBuilder.withPayload(eventFormat.serialize(cloudEvent))
216+
.copyHeaders(message.getHeaders())
217+
.setHeader(MessageHeaders.CONTENT_TYPE, "application/cloudevents")
218+
.build();
219+
}
220+
Map<String, Object> headers = new HashMap<>(message.getHeaders());
221+
headers.putAll(extensions.cloudEventExtensions);
222+
223+
return this.cloudEventMessageConverter.toMessage(cloudEvent, new MessageHeaders(headers));
205224
}
206225

207226
@Override
208227
public String getComponentType() {
209228
return "ce:to-cloudevents-transformer";
210229
}
211230

231+
/**
232+
* Returns CloudEvent information to the header if no {@link EventFormat} is found for content type.
233+
* @return true if CloudEvent information should be added to header if no {@link EventFormat} is found.
234+
*/
235+
public boolean isNoFormat() {
236+
return this.noFormat;
237+
}
238+
239+
/**
240+
* Set CloudEvent information to the header if no {@link EventFormat} is found for content type.
241+
* When true and no {@link EventFormat} is found for the content type, CloudEvents are sent with headers instead of
242+
* structured format.
243+
* @param noFormat true to disable format serialization
244+
*/
245+
public void setNoFormat(boolean noFormat) {
246+
this.noFormat = noFormat;
247+
}
248+
249+
/**
250+
* Return the prefix used for CloudEvent headers in binary content mode.
251+
* @return the CloudEvent header prefix
252+
*/
253+
public String getCloudEventPrefix() {
254+
return this.cloudEventPrefix;
255+
}
256+
257+
/**
258+
* Set the prefix for CloudEvent headers in binary content mode.
259+
* @param cloudEventPrefix the prefix to use for CloudEvent headers
260+
*/
261+
public void setCloudEventPrefix(String cloudEventPrefix) {
262+
this.cloudEventPrefix = cloudEventPrefix;
263+
}
264+
265+
private String getSpecVersionKey() {
266+
return this.cloudEventPrefix + DEFAULT_SPECVERSION_KEY;
267+
}
268+
269+
private String getDataContentTypeKey() {
270+
return this.cloudEventPrefix + DEFAULT_DATACONTENTTYPE_KEY;
271+
}
272+
212273
private static class ToCloudEventTransformerExtensions implements CloudEventExtension {
213274

214275
/**

0 commit comments

Comments
 (0)