1414 * limitations under the License.
1515 */
1616
17+
1718package org .springframework .integration .cloudevents .transformer ;
1819
1920import java .net .URI ;
2021import java .time .OffsetDateTime ;
22+ import java .util .Collections ;
2123import java .util .HashMap ;
2224import java .util .Map ;
2325import java .util .Objects ;
2426import java .util .Set ;
25- import java .util .function .Function ;
2627
2728import io .cloudevents .CloudEvent ;
2829import io .cloudevents .CloudEventExtension ;
3839import org .springframework .expression .common .LiteralExpression ;
3940import org .springframework .integration .expression .ExpressionUtils ;
4041import org .springframework .integration .expression .FunctionExpression ;
42+ import org .springframework .integration .expression .ValueExpression ;
43+ import org .springframework .integration .support .utils .PatternMatchUtils ;
4144import org .springframework .integration .transformer .AbstractTransformer ;
4245import org .springframework .integration .transformer .MessageTransformationException ;
4346import org .springframework .messaging .Message ;
4447import org .springframework .messaging .MessageHeaders ;
4548import org .springframework .messaging .support .MessageBuilder ;
46- import org .springframework .util .StringUtils ;
49+ import org .springframework .util .Assert ;
4750
4851/**
49- * A Spring Integration transformer that converts messages to CloudEvent format.
50- * Attribute and extension mapping is performed based on {@link Expression}s.
52+ * Converts messages to CloudEvent format.
53+ * Performs attribute and extension mapping based on {@link Expression}s.
5154 *
5255 * @author Glenn Renfro
5356 *
5457 * @since 7.0
5558 */
56- public class ToCloudEventTransformer extends AbstractTransformer {
59+ public class ToCloudEventsTransformer extends AbstractTransformer {
5760
58- private Expression idExpression = new FunctionExpression <Message <?>>(
61+ private Expression eventIdExpression = new FunctionExpression <Message <?>>(
5962 msg -> Objects .requireNonNull (msg .getHeaders ().getId ()).toString ());
6063
6164 @ SuppressWarnings ("NullAway.Init" )
@@ -64,82 +67,75 @@ public class ToCloudEventTransformer extends AbstractTransformer {
6467 private Expression typeExpression = new LiteralExpression ("spring.message" );
6568
6669 @ SuppressWarnings ("NullAway.Init" )
67- private Expression dataSchemaExpression ;
70+ private @ Nullable Expression dataSchemaExpression ;
6871
69- private Expression subjectExpression = new FunctionExpression <>((Function <Message <?>, @ Nullable String >)
70- message -> null );
72+ private @ Nullable Expression subjectExpression ;
7173
72- private final Expression @ Nullable [] cloudEventExtensionExpressions ;
74+ private final String [] extensionPatterns ;
7375
7476 @ SuppressWarnings ("NullAway.Init" )
7577 private EvaluationContext evaluationContext ;
7678
7779 private final EventFormatProvider eventFormatProvider = EventFormatProvider .getInstance ();
7880
7981 /**
80- * Construct a ToCloudEventTransformer .
81- *
82- * @param cloudEventExtensionExpressions an array of {@link Expression}s for establishing CloudEvent extensions
82+ * Construct a ToCloudEventsTransformer .
83+ * @param extensionPatterns patterns to evaluate whether message headers should be added as extensions
84+ * to the CloudEvent
8385 */
84- public ToCloudEventTransformer ( Expression @ Nullable ... cloudEventExtensionExpressions ) {
85- this .cloudEventExtensionExpressions = cloudEventExtensionExpressions ;
86+ public ToCloudEventsTransformer ( String ... extensionPatterns ) {
87+ this .extensionPatterns = extensionPatterns ;
8688 }
8789
8890 /**
89- * Construct a ToCloudEventTransformer with no {@link Expression}s for extensions.
90- *
91+ * Construct a ToCloudEventsTransformer with no extensionPatterns.
9192 */
92- public ToCloudEventTransformer () {
93- this (( Expression []) null ) ;
93+ public ToCloudEventsTransformer () {
94+ this . extensionPatterns = new String [ 0 ] ;
9495 }
9596
9697 /**
9798 * Set the {@link Expression} for creating CloudEvent ids.
98- * Default expression extracts the id from the {@link MessageHeaders} of the message.
99- *
100- * @param idExpression the expression used to create the id for each CloudEvent
99+ * Defaults to extracting the id from the {@link MessageHeaders} of the message.
100+ * @param eventIdExpression the expression to create the id for each CloudEvent
101101 */
102- public void setIdExpression (Expression idExpression ) {
103- this .idExpression = idExpression ;
102+ public void setEventIdExpression (Expression eventIdExpression ) {
103+ this .eventIdExpression = eventIdExpression ;
104104 }
105105
106106 /**
107107 * Set the {@link Expression} for creating CloudEvent source.
108- * Default expression is {@code "/spring/" + appName + "." + getBeanName())}.
109- *
110- * @param sourceExpression the expression used to create the source for each CloudEvent
108+ * Defaults to {@code "/spring/" + appName + "." + getBeanName())}.
109+ * @param sourceExpression the expression to create the source for each CloudEvent
111110 */
112111 public void setSourceExpression (Expression sourceExpression ) {
113112 this .sourceExpression = sourceExpression ;
114113 }
115114
116115 /**
117116 * Set the {@link Expression} for extracting the type for the CloudEvent.
118- * Default expression sets the default to "spring.message".
119- *
120- * @param typeExpression the expression used to create the type for each CloudEvent
117+ * Defaults to "spring.message".
118+ * @param typeExpression the expression to create the type for each CloudEvent
121119 */
122120 public void setTypeExpression (Expression typeExpression ) {
123121 this .typeExpression = typeExpression ;
124122 }
125123
126124 /**
127125 * Set the {@link Expression} for creating the dataSchema for the CloudEvent.
128- * Default {@link Expression} evaluates to a null.
129- *
130- * @param dataSchemaExpression the expression used to create the dataSchema for each CloudEvent
126+ * Defaults to null.
127+ * @param dataSchemaExpression the expression to create the dataSchema for each CloudEvent
131128 */
132- public void setDataSchemaExpression (Expression dataSchemaExpression ) {
129+ public void setDataSchemaExpression (@ Nullable Expression dataSchemaExpression ) {
133130 this .dataSchemaExpression = dataSchemaExpression ;
134131 }
135132
136133 /**
137134 * Set the {@link Expression} for creating the subject for the CloudEvent.
138- * Default {@link Expression} evaluates to a null.
139- *
140- * @param subjectExpression the expression used to create the subject for each CloudEvent
135+ * Defaults to null.
136+ * @param subjectExpression the expression to create the subject for each CloudEvent
141137 */
142- public void setSubjectExpression (Expression subjectExpression ) {
138+ public void setSubjectExpression (@ Nullable Expression subjectExpression ) {
143139 this .subjectExpression = subjectExpression ;
144140 }
145141
@@ -149,43 +145,26 @@ protected void onInit() {
149145 this .evaluationContext = ExpressionUtils .createStandardEvaluationContext (getBeanFactory ());
150146 ApplicationContext applicationContext = getApplicationContext ();
151147 if (this .sourceExpression == null ) { // in the case the user sets the value prior to onInit.
152- this .sourceExpression = new FunctionExpression <>((Function <Message <?>, URI >) message -> {
153- String appName = applicationContext .getEnvironment ().getProperty ("spring.application.name" );
154- appName = appName == null ? "unknown" : appName ;
155- return URI .create ("/spring/" + appName + "." + getBeanName ());
156- });
157- }
158- if (this .dataSchemaExpression == null ) { // in the case the user sets the value prior to onInit.
159- this .dataSchemaExpression = new FunctionExpression <>((Function <Message <?>, @ Nullable URI >)
160- message -> null );
148+ String appName = applicationContext .getEnvironment ().getProperty ("spring.application.name" );
149+ appName = appName == null ? "unknown" : appName ;
150+ this .sourceExpression = new ValueExpression <>(URI .create ("/spring/" + appName + "." + getBeanName ()));
161151 }
162152 }
163153
164154 /**
165155 * Transform the input message into a CloudEvent message.
166- *
167156 * @param message the input Spring Integration message to transform
168157 * @return CloudEvent message in the specified format
169158 * @throws RuntimeException if serialization fails
170159 */
171160 @ SuppressWarnings ("unchecked" )
172161 @ Override
173162 protected Object doTransform (Message <?> message ) {
163+ Assert .isInstanceOf (byte [].class , message .getPayload (), "Message payload must be of type byte[]" );
174164
175- String id = this .idExpression .getValue (this .evaluationContext , message , String .class );
176- if (!StringUtils .hasText (id )) {
177- throw new MessageTransformationException (message , "No id was found with the specified expression" );
178- }
179-
165+ String id = this .eventIdExpression .getValue (this .evaluationContext , message , String .class );
180166 URI source = this .sourceExpression .getValue (this .evaluationContext , message , URI .class );
181- if (source == null ) {
182- throw new MessageTransformationException (message , "No source was found with the specified expression" );
183- }
184-
185167 String type = this .typeExpression .getValue (this .evaluationContext , message , String .class );
186- if (type == null ) {
187- throw new MessageTransformationException (message , "No type was found with the specified expression" );
188- }
189168
190169 String contentType = message .getHeaders ().get (MessageHeaders .CONTENT_TYPE , String .class );
191170 if (contentType == null ) {
@@ -198,23 +177,30 @@ protected Object doTransform(Message<?> message) {
198177 }
199178
200179 ToCloudEventTransformerExtensions extensions =
201- new ToCloudEventTransformerExtensions (this . evaluationContext , ( Message < byte []>) message ,
202- this .cloudEventExtensionExpressions );
180+ new ToCloudEventTransformerExtensions (message . getHeaders () ,
181+ this .extensionPatterns );
203182
204- CloudEvent cloudEvent = CloudEventBuilder .v1 ()
183+ CloudEventBuilder cloudEventBuilder = CloudEventBuilder .v1 ()
205184 .withId (id )
206185 .withSource (source )
207186 .withType (type )
208187 .withTime (OffsetDateTime .now ())
209- .withDataContentType (contentType )
210- .withDataSchema (this .dataSchemaExpression .getValue (this .evaluationContext , message , URI .class ))
211- .withSubject (this .subjectExpression .getValue (this .evaluationContext , message , String .class ))
212- .withData (getPayload (message ))
188+ .withDataContentType (contentType );
189+
190+ if (this .subjectExpression != null ) {
191+ cloudEventBuilder .withSubject (this .subjectExpression .getValue (this .evaluationContext , message , String .class ));
192+ }
193+ if (this .dataSchemaExpression != null ) {
194+ cloudEventBuilder .withDataSchema (this .dataSchemaExpression .getValue (this .evaluationContext , message , URI .class ));
195+ }
196+
197+ CloudEvent cloudEvent = cloudEventBuilder .withData ((byte [])message .getPayload ())
213198 .withExtension (extensions )
214199 .build ();
215200
216201 return MessageBuilder .withPayload (eventFormat .serialize (cloudEvent ))
217202 .copyHeaders (message .getHeaders ())
203+ .setHeader (MessageHeaders .CONTENT_TYPE , "application/cloudevents" )
218204 .build ();
219205 }
220206
@@ -223,42 +209,27 @@ public String getComponentType() {
223209 return "ce:to-cloudevents-transformer" ;
224210 }
225211
226- private byte [] getPayload (Message <?> message ) {
227- if (message .getPayload () instanceof byte [] messagePayload ) {
228- return messagePayload ;
229- }
230- throw new MessageTransformationException ("Message payload is not a byte array" );
231- }
232-
233212 private static class ToCloudEventTransformerExtensions implements CloudEventExtension {
234213
235214 /**
236- * Map storing the CloudEvent extensions extracted from message headers.
215+ * Stores the CloudEvent extensions extracted from message headers.
237216 */
238217 private final Map <String , Object > cloudEventExtensions ;
239218
240219 /**
241220 * Construct CloudEvent extensions by processing a message using expressions.
242221 *
243- * @param message the Spring Integration message
244- * @param expressions an array of {@link Expression}s where each accepts a message and returns a
245- * {@code Map<String, Object>} of extensions
222+ * @param headers the headers from the Spring Integration message
223+ * @param extensionPatterns patterns to determine whether message headers are extensions
246224 */
247225 @ SuppressWarnings ("unchecked" )
248- ToCloudEventTransformerExtensions (EvaluationContext evaluationContext , Message <byte []> message ,
249- Expression @ Nullable ... expressions ) {
226+ ToCloudEventTransformerExtensions (Map <String , Object > headers , String ... extensionPatterns ) {
250227 this .cloudEventExtensions = new HashMap <>();
251- if (expressions == null ) {
252- return ;
253- }
254- for (Expression expression : expressions ) {
255- Map <String , Object > result = (Map <String , Object >) expression .getValue (evaluationContext , message ,
256- Map .class );
257- if (result == null ) {
258- continue ;
259- }
260- for (String key : result .keySet ()) {
261- this .cloudEventExtensions .put (key , result .get (key ));
228+ Boolean result = null ;
229+ for (Map .Entry <String , Object > header : headers .entrySet ()) {
230+ result = PatternMatchUtils .smartMatch (header .getKey (), extensionPatterns );
231+ if (result != null && result ) {
232+ this .cloudEventExtensions .put (header .getKey (), header .getValue ());
262233 }
263234 }
264235 }
0 commit comments