Skip to content

Commit fc28d87

Browse files
committed
fix(filters): fix AppendFilter overwrite propery not working with ScEL (#62)
Resolves: #62
1 parent 588e606 commit fc28d87

File tree

3 files changed

+36
-6
lines changed

3 files changed

+36
-6
lines changed

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/AppendFilterConfig.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.config;
2020

21+
import io.streamthoughts.kafka.connect.filepulse.expression.Expression;
22+
import io.streamthoughts.kafka.connect.filepulse.expression.parser.ExpressionParsers;
2123
import org.apache.kafka.common.config.ConfigDef;
2224

2325
import java.util.Map;
@@ -42,8 +44,8 @@ public AppendFilterConfig(final Map<?, ?> originals) {
4244
super(configDef(), originals);
4345
}
4446

45-
public String field() {
46-
return getString(APPEND_FIELD_CONFIG);
47+
public Expression field() {
48+
return ExpressionParsers.parseExpression(getString(APPEND_FIELD_CONFIG));
4749
}
4850

4951
public boolean overwrite() {

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AppendFilter.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.streamthoughts.kafka.connect.filepulse.config.AppendFilterConfig;
2222
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
2323
import io.streamthoughts.kafka.connect.filepulse.expression.Expression;
24+
import io.streamthoughts.kafka.connect.filepulse.expression.PropertyExpression;
2425
import io.streamthoughts.kafka.connect.filepulse.expression.StandardEvaluationContext;
2526
import io.streamthoughts.kafka.connect.filepulse.expression.parser.ExpressionParsers;
2627
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
@@ -38,6 +39,8 @@ public class AppendFilter extends AbstractMergeRecordFilter<AppendFilter> {
3839
private List<Expression> values;
3940
private Expression fieldExpression;
4041

42+
protected String target;
43+
4144
/**
4245
* {@inheritDoc}
4346
*/
@@ -49,7 +52,7 @@ public void configure(final Map<String, ?> props) {
4952
// currently, multiple expressions is not supported
5053
values = Collections.singletonList(ExpressionParsers.parseExpression(config.value()));
5154

52-
fieldExpression = ExpressionParsers.parseExpression(config.field());
55+
fieldExpression = config.field();
5356
}
5457

5558
/**
@@ -97,15 +100,18 @@ protected RecordsIterable<TypedStruct> apply(final FilterContext context,
97100
}
98101

99102
private Expression mayEvaluateWriteExpression(final StandardEvaluationContext evaluationContext) {
103+
Expression expression = fieldExpression;
100104
if (!fieldExpression.canWrite()) {
101105
final String evaluated = fieldExpression.readValue(evaluationContext, String.class);
102106
if (evaluated == null) {
103107
throw new FilterException("Invalid value for property 'field'. Evaluation of expression '"
104108
+ fieldExpression.originalExpression() + " 'returns 'null'.");
105109
}
106-
return ExpressionParsers.parseExpression(evaluated);
110+
expression = ExpressionParsers.parseExpression(evaluated);
107111
}
108-
return fieldExpression;
112+
113+
target = ((PropertyExpression)expression).getAttribute();
114+
return expression;
109115
}
110116

111117
/**
@@ -114,7 +120,7 @@ private Expression mayEvaluateWriteExpression(final StandardEvaluationContext ev
114120
@Override
115121
protected Set<String> overwrite() {
116122
if (config.overwrite()) {
117-
return Collections.singleton(config.field());
123+
return Collections.singleton(target);
118124
}
119125
return Collections.emptySet();
120126
}

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/AppendFilterTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,26 @@ public void shouldSupportPropertyExpressionWithScopeForFieldConfig() {
9696
filter.apply(context, STRUCT);
9797
Assert.assertEquals("my-topic-foo", context.topic());
9898
}
99+
100+
@Test
101+
public void shouldOverwriteExistingValueGivenOverwriteTrue() {
102+
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "$value.field");
103+
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, "bar");
104+
configs.put(AppendFilterConfig.APPEND_OVERWRITE_CONFIG, "true");
105+
filter.configure(configs);
106+
final TypedStruct input = TypedStruct.create().put("field", "foo");
107+
RecordsIterable<TypedStruct> results = filter.apply(context, input, false);
108+
Assert.assertEquals("bar", results.last().getString("field"));
109+
}
110+
111+
@Test
112+
public void shouldMergeExistingValueGivenOverwriteFalse() {
113+
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "$value.field");
114+
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, "bar");
115+
configs.put(AppendFilterConfig.APPEND_OVERWRITE_CONFIG, "false");
116+
filter.configure(configs);
117+
final TypedStruct input = TypedStruct.create().put("field", "foo");
118+
RecordsIterable<TypedStruct> results = filter.apply(context, input, false);
119+
Assert.assertEquals("[foo, bar]", results.last().getArray("field").toString());
120+
}
99121
}

0 commit comments

Comments
 (0)