From db8f5a1cd7f44ebe00361b544433dd6914bf81e5 Mon Sep 17 00:00:00 2001 From: Fabian Steeg Date: Wed, 25 Aug 2021 15:24:52 +0200 Subject: [PATCH 1/3] Add `recordPath` option to JsonDecoder (#382) Instead of using the entire JSON as a single record, provide a JSON path to query the JSON for the records to process, e.g. `$.data` to process every entry in a `data` array as a record. --- metafacture-json/build.gradle | 2 + .../org/metafacture/json/JsonDecoder.java | 58 ++++++++++++++----- .../org/metafacture/json/JsonDecoderTest.java | 16 +++++ 3 files changed, 63 insertions(+), 13 deletions(-) diff --git a/metafacture-json/build.gradle b/metafacture-json/build.gradle index b349373b9..dd63d0eca 100644 --- a/metafacture-json/build.gradle +++ b/metafacture-json/build.gradle @@ -20,6 +20,8 @@ description = 'Modules for processing JSON data in Metafacture' dependencies { api project(':metafacture-framework') implementation 'com.fasterxml.jackson.core:jackson-core:2.8.5' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.8.5' + implementation 'com.jayway.jsonpath:json-path:2.6.0' testImplementation 'junit:junit:4.12' testImplementation 'org.mockito:mockito-core:2.5.5' } diff --git a/metafacture-json/src/main/java/org/metafacture/json/JsonDecoder.java b/metafacture-json/src/main/java/org/metafacture/json/JsonDecoder.java index 80de6981b..1681bc7de 100644 --- a/metafacture-json/src/main/java/org/metafacture/json/JsonDecoder.java +++ b/metafacture-json/src/main/java/org/metafacture/json/JsonDecoder.java @@ -17,12 +17,19 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.jayway.jsonpath.JsonPath; + import org.metafacture.framework.MetafactureException; import org.metafacture.framework.StreamReceiver; import org.metafacture.framework.helpers.DefaultObjectPipe; import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; /** * Decodes a record in JSON format. @@ -38,6 +45,8 @@ public final class JsonDecoder extends DefaultObjectPipe public static final String DEFAULT_RECORD_ID = "%d"; + public static final String DEFAULT_ROOT_PATH = ""; + private final JsonFactory jsonFactory = new JsonFactory(); private JsonParser jsonParser; @@ -46,12 +55,15 @@ public final class JsonDecoder extends DefaultObjectPipe private String recordId; private int recordCount; + private String recordPath; + public JsonDecoder() { super(); setArrayMarker(DEFAULT_ARRAY_MARKER); setArrayName(DEFAULT_ARRAY_NAME); setRecordId(DEFAULT_RECORD_ID); + setRecordPath(DEFAULT_ROOT_PATH); resetRecordCount(); } @@ -96,25 +108,45 @@ public int getRecordCount() { return recordCount; } + public void setRecordPath(final String recordPath) { + this.recordPath = recordPath; + } + + public String getRecordPath() { + return recordPath; + } + public void resetRecordCount() { setRecordCount(0); } @Override - public void process(final String string) { + public void process(final String json) { assert !isClosed(); - - createParser(string); - - try { - decode(); - } - catch (final IOException e) { - throw new MetafactureException(e); - } - finally { - closeParser(); - } + final List records = recordPath.isEmpty() ? Arrays.asList(json) + : matches(JsonPath.read(json, recordPath)); + records.forEach(record -> { + createParser(record); + try { + decode(); + } catch (final IOException e) { + throw new MetafactureException(e); + } finally { + closeParser(); + } + }); + } + + private List matches(Object obj) { + final List records = (obj instanceof List) ? ((List) obj) : Arrays.asList(obj); + return records.stream().map(doc -> { + try { + return new ObjectMapper().writeValueAsString(doc); + } catch (JsonProcessingException e) { + e.printStackTrace(); + return doc.toString(); + } + }).collect(Collectors.toList()); } @Override diff --git a/metafacture-json/src/test/java/org/metafacture/json/JsonDecoderTest.java b/metafacture-json/src/test/java/org/metafacture/json/JsonDecoderTest.java index 86c6ae2e1..67cd3a3a8 100644 --- a/metafacture-json/src/test/java/org/metafacture/json/JsonDecoderTest.java +++ b/metafacture-json/src/test/java/org/metafacture/json/JsonDecoderTest.java @@ -151,6 +151,22 @@ public void testShouldProcessConcatenatedRecords() { ordered.verify(receiver).endRecord(); } + @Test + public void testShouldProcessRecordsInArray() { + jsonDecoder.setRecordPath("$.data"); + jsonDecoder.process( + "{\"data\":[" + "{\"lit\": \"record 1\"}," + + "{\"lit\": \"record 2\"}" + "]}"); + + final InOrder ordered = inOrder(receiver); + ordered.verify(receiver).startRecord("1"); + ordered.verify(receiver).literal("lit", "record 1"); + ordered.verify(receiver).endRecord(); + ordered.verify(receiver).startRecord("2"); + ordered.verify(receiver).literal("lit", "record 2"); + ordered.verify(receiver).endRecord(); + } + @Test public void testShouldProcessMultipleRecords() { jsonDecoder.process("{\"lit\": \"record 1\"}"); From 8761d9834da51c57dc04aed02c629d6270a52f6e Mon Sep 17 00:00:00 2001 From: Pascal Christoph Date: Thu, 26 Aug 2021 17:40:47 +0200 Subject: [PATCH 2/3] Add flux annotations See #382. --- .../main/java/org/metafacture/json/JsonDecoder.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/metafacture-json/src/main/java/org/metafacture/json/JsonDecoder.java b/metafacture-json/src/main/java/org/metafacture/json/JsonDecoder.java index 1681bc7de..ac504b53f 100644 --- a/metafacture-json/src/main/java/org/metafacture/json/JsonDecoder.java +++ b/metafacture-json/src/main/java/org/metafacture/json/JsonDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 hbz + * Copyright 2017, 2021 hbz * * Licensed under the Apache License, Version 2.0 the "License"; * you may not use this file except in compliance with the License. @@ -22,8 +22,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.jayway.jsonpath.JsonPath; +import org.metafacture.framework.FluxCommand; import org.metafacture.framework.MetafactureException; import org.metafacture.framework.StreamReceiver; +import org.metafacture.framework.annotations.Description; +import org.metafacture.framework.annotations.In; +import org.metafacture.framework.annotations.Out; import org.metafacture.framework.helpers.DefaultObjectPipe; import java.io.IOException; @@ -37,6 +41,11 @@ * @author Jens Wille * */ +@Description("Decodes JSON to metadata events. The \'setRecordPath\' option can be used to set a JsonPath " + + "to extract a path as JSON - or to split the data into multiple JSON documents.") +@In(String.class) +@Out(StreamReceiver.class) +@FluxCommand("decode-json") public final class JsonDecoder extends DefaultObjectPipe { public static final String DEFAULT_ARRAY_MARKER = JsonEncoder.ARRAY_MARKER; From 8fe67c132231490d64643f09103fb6c31483a227 Mon Sep 17 00:00:00 2001 From: Fabian Steeg Date: Fri, 27 Aug 2021 14:40:03 +0200 Subject: [PATCH 3/3] Tweak description annotation for Flux usage (#382) --- .../src/main/java/org/metafacture/json/JsonDecoder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metafacture-json/src/main/java/org/metafacture/json/JsonDecoder.java b/metafacture-json/src/main/java/org/metafacture/json/JsonDecoder.java index ac504b53f..f6dbd2238 100644 --- a/metafacture-json/src/main/java/org/metafacture/json/JsonDecoder.java +++ b/metafacture-json/src/main/java/org/metafacture/json/JsonDecoder.java @@ -41,7 +41,7 @@ * @author Jens Wille * */ -@Description("Decodes JSON to metadata events. The \'setRecordPath\' option can be used to set a JsonPath " +@Description("Decodes JSON to metadata events. The \'recordPath\' option can be used to set a JsonPath " + "to extract a path as JSON - or to split the data into multiple JSON documents.") @In(String.class) @Out(StreamReceiver.class)