Skip to content

Commit 199db65

Browse files
committed
fix(plugins): fix XMLFileInputReader on the root document when force.array.on.fields is used (#75)
Resolves: #75
1 parent 1c75a3e commit 199db65

File tree

2 files changed

+27
-9
lines changed

2 files changed

+27
-9
lines changed

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/reader/XMLFileInputReader.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,16 +215,19 @@ private static class Node2StructConverter {
215215
*/
216216
private static TypedStruct convertNodeObjectTree(final Node node, final FieldPaths forceArrayFields) {
217217
Objects.requireNonNull(node, "node cannot be null");
218-
final FieldPaths currentForceArrayFields = forceArrayFields.next(determineNodeName(node));
218+
String nodeName = determineNodeName(node);
219+
final FieldPaths currentForceArrayFields = nodeName.equals("#document")
220+
? forceArrayFields :
221+
forceArrayFields.next(nodeName);
219222
TypedStruct container = TypedStruct.create();
220223
addAllNodeAttributes(container, node.getAttributes());
221224
for (Node child = node.getFirstChild(); child != null; child = child.getNextSibling()) {
222-
final String nodeName = isTextNode(child) ? determineNodeName(node) : determineNodeName(child);
225+
final String childNodeName = isTextNode(child) ? nodeName : determineNodeName(child);
223226
Optional<?> optional = readNodeObject(child, currentForceArrayFields);
224227
if (optional.isPresent()) {
225228
Object nodeValue = optional.get();
226-
final boolean isArray = currentForceArrayFields.anyMatches(nodeName);
227-
container = enrichStructWithObject(container, nodeName, nodeValue, isArray);
229+
final boolean isArray = currentForceArrayFields.anyMatches(childNodeName);
230+
container = enrichStructWithObject(container, childNodeName, nodeValue, isArray);
228231

229232
}
230233
}

connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/reader/XMLFileInputReaderTest.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,14 +183,30 @@ public void should_read_record_given_valid_force_array_fields() {
183183
iterator.forEachRemaining(r -> records.addAll(r.collect()));
184184

185185
Assert.assertEquals(1, records.size());
186-
TypedStruct broker = records.get(0).value();
187-
Assert.assertEquals(Type.ARRAY, broker.get("topicPartition").type());
188-
Assert.assertEquals(1, broker.get("topicPartition").getArray().size());
189-
TypedValue segment = ((TypedStruct) broker.get("topicPartition").getArray().iterator().next()).find("numSegments");
186+
187+
TypedStruct record = records.get(0).value();
188+
Assert.assertEquals(Type.ARRAY, record.get("topicPartition").type());
189+
Assert.assertEquals(1, record.get("topicPartition").getArray().size());
190+
TypedValue segment = ((TypedStruct) record.get("topicPartition").getArray().iterator().next()).find("numSegments");
190191
Assert.assertEquals(Type.ARRAY, segment.type());
191192
Assert.assertEquals(1, segment.getArray().size());
192193
}
193194

195+
@Test
196+
public void should_read_record_given_valid_force_array_fields_and_default_xpath() {
197+
reader.configure(new HashMap<String, String>(){{
198+
put(FORCE_ARRAY_ON_FIELDS_CONFIG, "cluster.broker.topicPartition");
199+
}});
200+
201+
FileInputIterator<FileRecord<TypedStruct>> iterator = reader.newIterator(context);
202+
List<FileRecord<TypedStruct>> records = new ArrayList<>();
203+
iterator.forEachRemaining(r -> records.addAll(r.collect()));
204+
205+
TypedStruct record = records.get(0).value();
206+
TypedStruct element = (TypedStruct)record.find("cluster.broker").getArray().iterator().next();
207+
Assert.assertEquals(Type.ARRAY, element.find("topicPartition").type());
208+
}
209+
194210
@Test
195211
public void should_read_record_given_single_text_node_with_attrs() throws IOException {
196212
try(XMLFileInputReader reader = createNewXMLFileInputReader(TEXT_NODE_TEST_XML_DOCUMENT)) {
@@ -205,7 +221,6 @@ public void should_read_record_given_single_text_node_with_attrs() throws IOExce
205221
}
206222
}
207223

208-
209224
private XMLFileInputReader createNewXMLFileInputReader(final String xmlDocument) throws IOException {
210225
File file = testFolder.newFile();
211226
try (BufferedWriter bw = Files.newBufferedWriter(file.toPath(), Charset.defaultCharset())) {

0 commit comments

Comments
 (0)