Skip to content

Commit 726f82b

Browse files
committed
feat(filters): add support for dot-notation to RenameFilter
1 parent 434e31a commit 726f82b

File tree

3 files changed

+77
-14
lines changed
  • connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data
  • connect-file-pulse-filters/src

3 files changed

+77
-14
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/TypedStruct.java

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -78,17 +78,6 @@ private TypedStruct(final StructSchema schema) {
7878
this.values = new LinkedList<>();
7979
}
8080

81-
public TypedStruct rename(final String field, final String newField) {
82-
schema.rename(field, newField);
83-
return this;
84-
}
85-
86-
public TypedStruct remove(final String field) {
87-
TypedField removed = schema.remove(field);
88-
if (removed != null) values.remove(removed.index());
89-
return this;
90-
}
91-
9281
/**
9382
* {@inheritDoc}
9483
*/
@@ -319,7 +308,57 @@ public <K, V> Map<K, V> getMap(final String field) throws DataException {
319308
return getCheckedType(field, Type.MAP);
320309
}
321310

311+
312+
/**
313+
* Renames the field present to the given path.
314+
*
315+
* @param path the path of the field.
316+
* @param newField the new field name.
317+
* @return return this.
318+
*/
319+
public TypedStruct rename(final String path, final String newField) {
320+
if (has(path)) {
321+
schema.rename(path, newField);
322+
return this;
323+
}
324+
325+
if (isDotPropertyAccessPath(path)) {
326+
String[] split = path.split("\\.", 2);
327+
if (has(split[0])) {
328+
TypedValue child = get(split[0]);
329+
if (child.schema().type() == Type.STRUCT) {
330+
return child.getStruct().rename(split[1], newField);
331+
}
332+
}
333+
}
334+
return this;
335+
}
336+
337+
public TypedStruct remove(final String field) {
338+
TypedField removed = schema.remove(field);
339+
if (removed != null) values.remove(removed.index());
340+
return this;
341+
}
342+
343+
/**
344+
* Checks if a field exist for the given path.
345+
*
346+
* @param path the path to check.
347+
* @return {@code true} if the path exists.
348+
*/
349+
public boolean exists(final String path) {
350+
Objects.requireNonNull(path, "path cannot be null");
351+
return find(path) != null;
352+
}
353+
354+
/**
355+
* Finds the value for the given path.
356+
*
357+
* @param path the path.
358+
* @return the value or {@code null} if no value exists.
359+
*/
322360
public TypedValue find(final String path) {
361+
Objects.requireNonNull(path, "path cannot be null");
323362
if (has(path)) return get(path);
324363

325364
if (isDotPropertyAccessPath(path)) {
@@ -334,6 +373,13 @@ public TypedValue find(final String path) {
334373
return null;
335374
}
336375

376+
/**
377+
* Inserts the given object value to the given path.
378+
*
379+
* @param path the object path.
380+
* @param value the object value.
381+
* @return return this?
382+
*/
337383
public TypedStruct insert(final String path, final Object value) {
338384
if (path == null || path.isEmpty()) {
339385
throw new IllegalArgumentException("Cannot insert value given null or empty path");

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/RenameFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public RecordsIterable<TypedStruct> apply(final FilterContext context,
5454
final TypedStruct record,
5555
final boolean hasNext) throws FilterException {
5656

57-
if (record.has(configs.field())) {
57+
if (record.exists(configs.field())) {
5858
record.rename(configs.field(), configs.target());
5959
return new RecordsIterable<>(record);
6060

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/RenameFilterTest.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,36 @@ public void setUp() {
4141
}
4242

4343
@Test
44-
public void shouldRenameGivenExistingField() {
44+
public void should_rename_given_existing_fields() {
4545

4646
configs.put(RenameFilterConfig.RENAME_FIELD_CONFIG, "foo");
4747
configs.put(RenameFilterConfig.RENAME_TARGET_CONFIG, "bar");
4848
filter.configure(configs);
4949

5050
TypedStruct record = TypedStruct.create().put("foo", "dummy-value");
51-
List<TypedStruct> results = this.filter.apply(null, record, false).collect();
51+
List<TypedStruct> results = filter.apply(null, record, false).collect();
5252

5353
Assert.assertNotNull(results);
5454
Assert.assertEquals(1, results.size());
5555

5656
TypedStruct result = results.get(0);
5757
Assert.assertEquals("dummy-value", result.getString("bar"));
5858
}
59+
60+
@Test
61+
public void should_rename_given_existing_path() {
62+
63+
configs.put(RenameFilterConfig.RENAME_FIELD_CONFIG, "foo.bar");
64+
configs.put(RenameFilterConfig.RENAME_TARGET_CONFIG, "foo");
65+
filter.configure(configs);
66+
67+
TypedStruct record = TypedStruct.create().insert("foo.bar", "dummy-value");
68+
List<TypedStruct> results = filter.apply(null, record, false).collect();
69+
70+
Assert.assertNotNull(results);
71+
Assert.assertEquals(1, results.size());
72+
73+
TypedStruct result = results.get(0);
74+
Assert.assertEquals("dummy-value", result.find("foo.foo").getString());
75+
}
5976
}

0 commit comments

Comments
 (0)