Skip to content

Commit ed345f7

Browse files
committed
feat(filters): add support for dot-notation to SplitFilter
1 parent 16dc51d commit ed345f7

File tree

2 files changed

+23
-6
lines changed
  • connect-file-pulse-filters/src

2 files changed

+23
-6
lines changed

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/SplitFilter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ public RecordsIterable<TypedStruct> apply(final FilterContext context,
5959

6060
for (final String key : configs.split()) {
6161

62-
if (!record.has(key)) {
62+
if (!record.exists(key)) {
6363
throw new FilterException("Invalid field name '" + key + "'");
6464
}
6565

66-
TypedValue value = record.get(key);
66+
TypedValue value = record.find(key);
6767
if (value.type() != Type.STRING) {
6868
throw new FilterException("Cannot split field '" + key + "' of type '" + value.type() + "'");
6969
}

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/SplitFilterTest.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,21 +41,38 @@ public void setUp() {
4141
}
4242

4343
@Test
44-
public void shouldSplitGivenExistingField() {
44+
public void should_split_given_existing_field() {
4545

4646
configs.put(SplitFilterConfig.MUTATE_SPLIT_CONFIG, "foo");
4747
filter.configure(configs);
4848

4949
TypedStruct record = TypedStruct.create().put("foo", "val0,val1,val2");
50-
List<TypedStruct> results = this.filter.apply(null, record, false).collect();
50+
List<TypedStruct> results = filter.apply(null, record, false).collect();
51+
52+
assertOutput(results, "foo");
53+
}
54+
55+
@Test
56+
public void should_split_given_existing_path() {
57+
58+
configs.put(SplitFilterConfig.MUTATE_SPLIT_CONFIG, "foo.bar");
59+
filter.configure(configs);
60+
61+
TypedStruct record = TypedStruct.create().insert("foo.bar", "val0,val1,val2");
62+
List<TypedStruct> results = filter.apply(null, record, false).collect();
5163

5264
Assert.assertNotNull(results);
5365
Assert.assertEquals(1, results.size());
66+
assertOutput(results, "foo.bar");
67+
}
5468

69+
private void assertOutput(final List<TypedStruct> results, final String field) {
70+
Assert.assertNotNull(results);
71+
Assert.assertEquals(1, results.size());
5572
TypedStruct result = results.get(0);
56-
List<String> array = result.getArray("foo");
73+
List<String> array = result.getArray(field);
5774
Assert.assertEquals(3, array.size());
58-
for (int i = 0 ; i < 3; i++) {
75+
for (int i = 0; i < 3; i++) {
5976
Assert.assertEquals("val" + i, array.get(i));
6077
}
6178
}

0 commit comments

Comments
 (0)