11/*
2- * Copyright 2017 hbz
2+ * Copyright 2017, 2021 hbz
33 *
44 * Licensed under the Apache License, Version 2.0 the "License";
55 * you may not use this file except in compliance with the License.
1717
1818import com .fasterxml .jackson .core .JsonFactory ;
1919import com .fasterxml .jackson .core .JsonParser ;
20+ import com .fasterxml .jackson .core .JsonProcessingException ;
2021import com .fasterxml .jackson .core .JsonToken ;
22+ import com .fasterxml .jackson .databind .ObjectMapper ;
23+ import com .jayway .jsonpath .JsonPath ;
24+
25+ import org .metafacture .framework .FluxCommand ;
2126import org .metafacture .framework .MetafactureException ;
2227import org .metafacture .framework .StreamReceiver ;
28+ import org .metafacture .framework .annotations .Description ;
29+ import org .metafacture .framework .annotations .In ;
30+ import org .metafacture .framework .annotations .Out ;
2331import org .metafacture .framework .helpers .DefaultObjectPipe ;
2432
2533import java .io .IOException ;
34+ import java .util .Arrays ;
35+ import java .util .List ;
36+ import java .util .stream .Collectors ;
2637
2738/**
2839 * Decodes a record in JSON format.
2940 *
3041 * @author Jens Wille
3142 *
3243 */
44+ @ Description ("Decodes JSON to metadata events. The \' recordPath\' option can be used to set a JsonPath "
45+ + "to extract a path as JSON - or to split the data into multiple JSON documents." )
46+ @ In (String .class )
47+ @ Out (StreamReceiver .class )
48+ @ FluxCommand ("decode-json" )
3349public final class JsonDecoder extends DefaultObjectPipe <String , StreamReceiver > {
3450
3551 public static final String DEFAULT_ARRAY_MARKER = JsonEncoder .ARRAY_MARKER ;
@@ -38,6 +54,8 @@ public final class JsonDecoder extends DefaultObjectPipe<String, StreamReceiver>
3854
3955 public static final String DEFAULT_RECORD_ID = "%d" ;
4056
57+ public static final String DEFAULT_ROOT_PATH = "" ;
58+
4159 private final JsonFactory jsonFactory = new JsonFactory ();
4260
4361 private JsonParser jsonParser ;
@@ -46,12 +64,15 @@ public final class JsonDecoder extends DefaultObjectPipe<String, StreamReceiver>
4664 private String recordId ;
4765 private int recordCount ;
4866
67+ private String recordPath ;
68+
4969 public JsonDecoder () {
5070 super ();
5171
5272 setArrayMarker (DEFAULT_ARRAY_MARKER );
5373 setArrayName (DEFAULT_ARRAY_NAME );
5474 setRecordId (DEFAULT_RECORD_ID );
75+ setRecordPath (DEFAULT_ROOT_PATH );
5576
5677 resetRecordCount ();
5778 }
@@ -96,25 +117,45 @@ public int getRecordCount() {
96117 return recordCount ;
97118 }
98119
120+ public void setRecordPath (final String recordPath ) {
121+ this .recordPath = recordPath ;
122+ }
123+
124+ public String getRecordPath () {
125+ return recordPath ;
126+ }
127+
99128 public void resetRecordCount () {
100129 setRecordCount (0 );
101130 }
102131
103132 @ Override
104- public void process (final String string ) {
133+ public void process (final String json ) {
105134 assert !isClosed ();
106-
107- createParser (string );
108-
109- try {
110- decode ();
111- }
112- catch (final IOException e ) {
113- throw new MetafactureException (e );
114- }
115- finally {
116- closeParser ();
117- }
135+ final List <String > records = recordPath .isEmpty () ? Arrays .asList (json )
136+ : matches (JsonPath .read (json , recordPath ));
137+ records .forEach (record -> {
138+ createParser (record );
139+ try {
140+ decode ();
141+ } catch (final IOException e ) {
142+ throw new MetafactureException (e );
143+ } finally {
144+ closeParser ();
145+ }
146+ });
147+ }
148+
149+ private List <String > matches (Object obj ) {
150+ final List <?> records = (obj instanceof List <?>) ? ((List <?>) obj ) : Arrays .asList (obj );
151+ return records .stream ().map (doc -> {
152+ try {
153+ return new ObjectMapper ().writeValueAsString (doc );
154+ } catch (JsonProcessingException e ) {
155+ e .printStackTrace ();
156+ return doc .toString ();
157+ }
158+ }).collect (Collectors .toList ());
118159 }
119160
120161 @ Override
0 commit comments