1+ package com .basho .riak .client ;
2+
3+ import com .basho .riak .client .api .RiakClient ;
4+ import com .basho .riak .client .api .cap .Quorum ;
5+ import com .basho .riak .client .api .cap .VClock ;
6+ import com .basho .riak .client .api .commands .indexes .BinIndexQuery ;
7+ import com .basho .riak .client .api .commands .indexes .IntIndexQuery ;
8+ import com .basho .riak .client .api .commands .indexes .SecondaryIndexQuery ;
9+ import com .basho .riak .client .api .commands .kv .FetchValue ;
10+ import com .basho .riak .client .api .commands .kv .StoreValue ;
11+ import com .basho .riak .client .core .query .Location ;
12+ import com .basho .riak .client .core .query .Namespace ;
13+ import com .basho .riak .client .core .query .RiakObject ;
14+ import com .basho .riak .client .core .query .indexes .LongIntIndex ;
15+ import com .basho .riak .client .core .query .indexes .RiakIndex ;
16+ import com .basho .riak .client .core .query .indexes .RiakIndexes ;
17+ import com .basho .riak .client .core .query .indexes .StringBinIndex ;
18+ import com .basho .riak .client .core .util .BinaryValue ;
19+ import com .fasterxml .jackson .core .JsonGenerator ;
20+ import com .fasterxml .jackson .core .JsonParser ;
21+ import com .fasterxml .jackson .core .JsonProcessingException ;
22+ import com .fasterxml .jackson .core .type .TypeReference ;
23+ import com .fasterxml .jackson .databind .*;
24+ import com .fasterxml .jackson .databind .module .SimpleModule ;
25+ import net .javacrumbs .jsonunit .core .internal .JsonUtils ;
26+ import net .javacrumbs .jsonunit .core .internal .NodeFactory ;
27+ import org .apache .commons .lang3 .reflect .FieldUtils ;
28+ import org .slf4j .Logger ;
29+ import org .slf4j .LoggerFactory ;
30+
31+ import java .io .IOException ;
32+ import java .util .*;
33+ import java .util .concurrent .ExecutionException ;
34+ import java .util .stream .Collectors ;
35+
36+ public class RiakTestFunctions
37+ {
38+ public static class RiakObjectData
39+ {
40+ public String key ;
41+ public Object value ;
42+ public Map <String , Object > indices ;
43+ }
44+
45+ protected static Logger logger = LoggerFactory .getLogger (RiakTestFunctions .class );
46+
47+ /**
48+ * Tolerant mapper that doesn't require quotation for field names
49+ * and allows to use single quote for string values
50+ */
51+ protected final static ObjectMapper tolerantMapper = initializeJsonUnitMapper ();
52+
53+ /**
54+ * Making JsonAssert to be more tolerant to JSON format.
55+ * And add some useful serializers
56+ */
57+ private static ObjectMapper initializeJsonUnitMapper ()
58+ {
59+ final Object converter ;
60+ try
61+ {
62+ converter = FieldUtils .readStaticField (JsonUtils .class , "converter" , true );
63+
64+ @ SuppressWarnings ("unchecked" )
65+ final List <NodeFactory > factories = (List <NodeFactory >) FieldUtils .readField (converter , "factories" , true );
66+
67+ ObjectMapper mapper ;
68+ for (NodeFactory nf : factories )
69+ {
70+ if (nf .getClass ().getSimpleName ().equals ("Jackson2NodeFactory" ))
71+ {
72+ mapper = (ObjectMapper ) FieldUtils .readField (nf , "mapper" , true );
73+
74+ mapper .configure (DeserializationFeature .FAIL_ON_NULL_FOR_PRIMITIVES , true )
75+ .configure (DeserializationFeature .FAIL_ON_UNKNOWN_PROPERTIES , true )
76+ .configure (JsonParser .Feature .ALLOW_SINGLE_QUOTES , true )
77+ .configure (JsonParser .Feature .ALLOW_UNQUOTED_FIELD_NAMES , true )
78+ .configure (SerializationFeature .WRITE_DATES_AS_TIMESTAMPS , false )
79+ .registerModule ( new SimpleModule ()
80+ .addSerializer (VClock .class , new VClockSerializer ())
81+ );
82+
83+ return mapper ;
84+ }
85+ }
86+ }
87+ catch (IllegalAccessException e )
88+ {
89+ throw new IllegalStateException ("Can't initialize Jackson2 ObjectMapper because of UE" , e );
90+ }
91+
92+ throw new IllegalStateException ("Can't initialize Jackson2 ObjectMapper, Jackson2NodeFactory is not found" );
93+ }
94+
95+ protected static List <Map .Entry <String , RiakObject >> parseRiakObjectsFromJsonData (String json ) throws IOException
96+ {
97+ assert json != null && !json .isEmpty ();
98+
99+ String actualJson = json ;
100+
101+ // Add a list semantic if needed
102+ if (!json .trim ().startsWith ("[" ))
103+ {
104+ actualJson = "[\n " + json + "\n ]" ;
105+ }
106+
107+ final List <RiakObjectData > data = tolerantMapper .readValue (actualJson , new TypeReference <List <RiakTestFunctions .RiakObjectData >>(){});
108+ final List <Map .Entry <String , RiakObject >> r = new ArrayList <>(data .size ());
109+
110+ for (RiakObjectData rod : data )
111+ {
112+ final RiakObject ro = new RiakObject ();
113+ final Map .Entry <String , RiakObject > e = new AbstractMap .SimpleEntry <>(rod .key , ro );
114+
115+ r .add (e );
116+
117+ // populate value, if any
118+ if ( rod .value != null )
119+ {
120+ if ( rod .value instanceof Map || rod instanceof Collection )
121+ {
122+ final String v = tolerantMapper .writerWithDefaultPrettyPrinter ()
123+ .writeValueAsString (rod .value );
124+
125+ ro .setContentType ("application/json" )
126+ .setValue (BinaryValue .create (v ));
127+ }
128+ else
129+ {
130+ ro .setContentType ("text/plain" )
131+ .setValue (BinaryValue .create (rod .value .toString ()));
132+ }
133+ }
134+
135+ // populate 2i, if any
136+ if (rod .indices == null || rod .indices .isEmpty ())
137+ {
138+ continue ;
139+ }
140+
141+ final RiakIndexes riakIndexes = ro .getIndexes ();
142+ for (Map .Entry <String , Object > ie : rod .indices .entrySet ())
143+ {
144+ assert ie .getValue () != null ;
145+
146+ if (ie .getValue () instanceof Long )
147+ {
148+ riakIndexes .getIndex (LongIntIndex .named (ie .getKey ()))
149+ .add ((Long )ie .getValue ());
150+ }
151+ else if (ie .getValue () instanceof Integer )
152+ {
153+ riakIndexes .getIndex (LongIntIndex .named (ie .getKey ()))
154+ .add (((Integer )ie .getValue ()).longValue ());
155+ }
156+ else if (ie .getValue () instanceof String )
157+ {
158+ riakIndexes .getIndex (StringBinIndex .named (ie .getKey ()))
159+ .add ((String )ie .getValue ());
160+ }
161+ else throw new IllegalStateException ("Unsupported 2i value type '" +
162+ ie .getValue ().getClass ().getName () + "'" );
163+ }
164+ }
165+
166+ return r ;
167+ }
168+
169+ public static void createKVData (RiakClient client , Namespace ns , String jsonData ) throws IOException , ExecutionException , InterruptedException
170+ {
171+ final List <Map .Entry <String , RiakObject >> parsedData = parseRiakObjectsFromJsonData (jsonData );
172+
173+ for (Map .Entry <String , RiakObject > pd : parsedData )
174+ {
175+ final String key = createKValue (client , ns , pd .getKey (), pd .getValue (), true );
176+ }
177+ }
178+
179+ protected static String createKValue (RiakClient client , Location location ,
180+ Object value , Boolean checkCreation ) throws ExecutionException , InterruptedException
181+ {
182+ return createKValue (client , location .getNamespace (), location .getKeyAsString (), value , checkCreation );
183+ }
184+
185+ protected static String createKValue (RiakClient client , Namespace ns , String key ,
186+ Object value , Boolean checkCreation ) throws ExecutionException , InterruptedException
187+ {
188+ final StoreValue .Builder builder = new StoreValue .Builder (value )
189+ .withOption (StoreValue .Option .PW , Quorum .allQuorum ());
190+
191+ // Use provided key, if any
192+ if (key != null && !key .isEmpty ())
193+ {
194+ builder .withLocation (new Location (ns , key ));
195+ }
196+ else
197+ {
198+ builder .withNamespace (ns );
199+ }
200+
201+ final StoreValue cmd = builder
202+ .withOption (StoreValue .Option .W , new Quorum (1 ))
203+ .build ();
204+
205+ final StoreValue .Response r = client .execute (cmd );
206+
207+ final String realKey = r .hasGeneratedKey () ? r .getGeneratedKey ().toStringUtf8 () : key ;
208+
209+ if (checkCreation )
210+ {
211+ // -- check creation to be 100% sure that everything was created properly
212+ final Location location = new Location (ns , BinaryValue .create (realKey ));
213+
214+ FetchValue .Response fetchResponse = null ;
215+
216+ for (int retryCount =6 ; retryCount >=0 ; --retryCount )
217+ {
218+ try
219+ {
220+ fetchResponse = fetchByLocation (client , location );
221+ }
222+ catch (IllegalStateException ex )
223+ {
224+ if (ex .getMessage ().startsWith ("Nothing was found" ) && retryCount > 1 )
225+ {
226+ logger .trace ("Value for '{}' hasn't been created yet, attempt {}" , location , retryCount +1 );
227+ Thread .sleep (200 );
228+ continue ;
229+ }
230+
231+ throw ex ;
232+ }
233+ }
234+
235+
236+ // As soon as value is reachable by a key, it is expected that it also will be reachable by 2i
237+
238+ final RiakObject etalonRObj = value instanceof RiakObject ?
239+ (RiakObject ) value : fetchResponse .getValue (RiakObject .class );
240+
241+ for (RiakIndex <?> ri : etalonRObj .getIndexes ())
242+ {
243+ assert (ri .values ().size () == 1 );
244+
245+ ri .values ().forEach ( v -> {
246+ try {
247+ final List <Location > locations = query2i (client , ns , ri .getName (), v );
248+
249+ throwIllegalStateIf ( !locations .contains (location ),
250+ "Location '%s' is not reachable by 2i '%s'" ,
251+ location , ri .getName ());
252+
253+ } catch (Exception e ) {
254+ throw new RuntimeException (e );
255+ }
256+ });
257+ }
258+ }
259+
260+ return realKey ;
261+ }
262+
263+ protected static void throwIllegalStateIf (Boolean flag , String format , Object ... args ) throws IllegalStateException
264+ {
265+ if (flag )
266+ {
267+ throw new IllegalStateException (String .format (format , args ));
268+ }
269+ }
270+
271+ protected static <T > List <Location > query2i (RiakClient client , Namespace ns ,
272+ String indexName , T value ) throws ExecutionException , InterruptedException
273+ {
274+ SecondaryIndexQuery <?,?, ?> cmd = null ;
275+
276+ if (value instanceof String )
277+ {
278+ cmd = new BinIndexQuery .Builder (ns , indexName , (String )value ).build ();
279+ }
280+ else if (value instanceof Integer )
281+ {
282+ cmd = new IntIndexQuery .Builder (ns , indexName , ((Integer )value ).longValue ()).build ();
283+ }
284+ else if (value instanceof Long )
285+ {
286+ cmd = new IntIndexQuery .Builder (ns , indexName , (Long )value ).build ();
287+ }
288+ else throwIllegalStateIf (true , "Type '%s' is not suitable for 2i" , value .getClass ().getName ());
289+
290+ return client .execute (cmd )
291+ .getEntries ().stream ()
292+ .map (e ->e .getRiakObjectLocation ())
293+ .collect (Collectors .toList ());
294+ }
295+
296+ protected static <V > V fetchByLocationAs (RiakClient client , Location location , Class <V > valueClazz )
297+ throws ExecutionException , InterruptedException
298+ {
299+ final FetchValue .Response r = fetchByLocation (client , location );
300+
301+ throwIllegalStateIf (r .isNotFound (), "Nothing was found for location '%s'" , location );
302+ throwIllegalStateIf (r .getNumberOfValues () > 1 ,
303+ "Fetch by Location '$location' returns more than one result: %d were actually returned" ,
304+ r .getNumberOfValues ());
305+
306+ final V v = r .getValue (valueClazz );
307+ return v ;
308+ }
309+
310+ protected static FetchValue .Response fetchByLocation (RiakClient client , Location location )
311+ throws ExecutionException , InterruptedException
312+ {
313+ final FetchValue cmd = new FetchValue .Builder (location ).build ();
314+ final FetchValue .Response r = client .execute (cmd );
315+
316+ return r ;
317+ }
318+
319+ private static class VClockSerializer extends JsonSerializer <VClock >
320+ {
321+ @ Override
322+ public void serialize (VClock value , JsonGenerator gen , SerializerProvider serializers ) throws IOException , JsonProcessingException {
323+ // Due to lack of support binary values in JsonUnit it is required to perform manual conversion to Base64
324+ //gen.writeBinary(value.getBytes());
325+ gen .writeString (Base64 .getEncoder ().encodeToString (value .getBytes ()));
326+ }
327+ }
328+ }
0 commit comments