Skip to content

Commit f513b67

Browse files
authored
Fix KplMessageHandler.setGlueSchema()
The `KplMessageHandler.setGlueSchema(Schema glueSchema)` calls `setPartitionKeyExpression()`. Probably just copy/paste artefact Fix `KplMessageHandler.setGlueSchema(Schema glueSchema)` to call a proper `setGlueSchemaExpression()` Signed-off-by: Aleksey Krichevskiy <[email protected]>
1 parent d873484 commit f513b67

File tree

2 files changed

+12
-1
lines changed

2 files changed

+12
-1
lines changed

src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ public void setHeaderMapper(HeaderMapper<Void> headerMapper) {
229229
* @see UserRecord#setSchema(Schema)
230230
*/
231231
public void setGlueSchema(Schema glueSchema) {
232-
setPartitionKeyExpression(new ValueExpression<>(glueSchema));
232+
setGlueSchemaExpression(new ValueExpression<>(glueSchema));
233233
}
234234

235235
/**

src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.amazonaws.services.kinesis.producer.KinesisProducer;
2020
import com.amazonaws.services.kinesis.producer.UserRecord;
21+
import com.amazonaws.services.schemaregistry.common.Schema;
2122
import org.junit.jupiter.api.AfterEach;
2223
import org.junit.jupiter.api.Test;
2324
import org.mockito.ArgumentCaptor;
@@ -58,6 +59,9 @@
5859
@DirtiesContext
5960
public class KplMessageHandlerTests {
6061

62+
@Autowired
63+
protected Schema schema;
64+
6165
@Autowired
6266
protected KinesisProducer kinesisProducer;
6367

@@ -89,6 +93,7 @@ void kplMessageHandlerWithRawPayloadBackpressureDisabledSuccess() {
8993
assertThat(userRecord.getStreamName()).isEqualTo("someStream");
9094
assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey");
9195
assertThat(userRecord.getExplicitHashKey()).isNull();
96+
assertThat(userRecord.getSchema()).isSameAs(this.schema);
9297
}
9398

9499
@Test
@@ -116,6 +121,7 @@ void kplMessageHandlerWithRawPayloadBackpressureEnabledCapacityAvailable() {
116121
assertThat(userRecord.getStreamName()).isEqualTo("someStream");
117122
assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey");
118123
assertThat(userRecord.getExplicitHashKey()).isNull();
124+
assertThat(userRecord.getSchema()).isSameAs(this.schema);
119125
}
120126

121127
@Test
@@ -174,9 +180,14 @@ public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer) {
174180
KplMessageHandler kplMessageHandler = new KplMessageHandler(kinesisProducer);
175181
kplMessageHandler.setAsync(true);
176182
kplMessageHandler.setStream("someStream");
183+
kplMessageHandler.setGlueSchema(schema());
177184
return kplMessageHandler;
178185
}
179186

187+
@Bean
188+
public Schema schema() {
189+
return new Schema("syntax=\"proto2\";", "PROTOBUF", "testschema");
190+
}
180191
}
181192

182193
}

0 commit comments

Comments
 (0)