22
33import com .google .protobuf .ByteString ;
44import java .nio .charset .StandardCharsets ;
5+ import java .util .ArrayList ;
56import java .util .List ;
67import java .util .Objects ;
8+ import java .util .Set ;
9+ import java .util .stream .Collectors ;
10+ import javax .annotation .Nullable ;
711import org .junit .jupiter .api .Test ;
12+ import org .testcontainers .shaded .org .apache .commons .lang3 .StringUtils ;
813import org .vss .exception .ConflictException ;
914
1015import static org .hamcrest .MatcherAssert .assertThat ;
1116import static org .hamcrest .Matchers .is ;
17+ import static org .hamcrest .Matchers .lessThan ;
18+ import static org .hamcrest .Matchers .lessThanOrEqualTo ;
1219import static org .junit .jupiter .api .Assertions .assertDoesNotThrow ;
20+ import static org .junit .jupiter .api .Assertions .assertFalse ;
1321import static org .junit .jupiter .api .Assertions .assertThrows ;
1422import static org .junit .jupiter .api .Assertions .assertTrue ;
1523
@@ -28,6 +36,8 @@ void putShouldSucceedWhenSingleObjectPutOperation() {
2836 assertThat (response .getKey (), is ("k1" ));
2937 assertThat (response .getVersion (), is (2L ));
3038 assertThat (response .getValue ().toStringUtf8 (), is ("k1v2" ));
39+
40+ assertThat (getObject (KVStore .GLOBAL_VERSION_KEY ).getVersion (), is (2L ));
3141 }
3242
3343 @ Test
@@ -50,6 +60,8 @@ void putShouldSucceedWhenMultiObjectPutOperation() {
5060 assertThat (response .getKey (), is ("k2" ));
5161 assertThat (response .getVersion (), is (2L ));
5262 assertThat (response .getValue ().toStringUtf8 (), is ("k2v2" ));
63+
64+ assertThat (getObject (KVStore .GLOBAL_VERSION_KEY ).getVersion (), is (2L ));
5365 }
5466
5567 @ Test
@@ -59,11 +71,13 @@ void putShouldFailWhenKeyVersionMismatched() {
5971 // global_version correctly changed but key-version conflict.
6072 assertThrows (ConflictException .class , () -> putObjects (1L , List .of (kv ("k1" , "k1v2" , 0 ))));
6173
62- //Verify that values didn't change
74+ // Verify that values didn't change
6375 KeyValue response = getObject ("k1" );
6476 assertThat (response .getKey (), is ("k1" ));
6577 assertThat (response .getVersion (), is (1L ));
6678 assertThat (response .getValue ().toStringUtf8 (), is ("k1v1" ));
79+
80+ assertThat (getObject (KVStore .GLOBAL_VERSION_KEY ).getVersion (), is (1L ));
6781 }
6882
6983 @ Test
@@ -78,7 +92,7 @@ void putMultiObjectShouldFailWhenSingleKeyVersionMismatched() {
7892
7993 assertThrows (ConflictException .class , () -> putObjects (null , second_request ));
8094
81- //Verify that values didn't change
95+ // Verify that values didn't change
8296 KeyValue response = getObject ("k1" );
8397 assertThat (response .getKey (), is ("k1" ));
8498 assertThat (response .getVersion (), is (1L ));
@@ -113,6 +127,8 @@ void putShouldSucceedWhenNoGlobalVersionIsGiven() {
113127 assertThat (response .getKey (), is ("k1" ));
114128 assertThat (response .getVersion (), is (2L ));
115129 assertThat (response .getValue ().toStringUtf8 (), is ("k1v2" ));
130+
131+ assertThat (getObject (KVStore .GLOBAL_VERSION_KEY ).getVersion (), is (0L ));
116132 }
117133
118134 @ Test
@@ -163,6 +179,177 @@ void getShouldReturnCorrectValueWhenKeyExists() {
163179 assertThat (response .getValue ().toStringUtf8 (), is ("k3v1" ));
164180 }
165181
182+ @ Test
183+ void listShouldReturnPaginatedResponse () {
184+
185+ int totalKvObjects = 1000 ;
186+ for (int i = 0 ; i < totalKvObjects ; i ++) {
187+ putObjects ((long ) i , List .of (kv ("k" + i , "k1v1" , 0 )));
188+ }
189+ // Overwrite k1 once and k2 twice.
190+ putObjects (1000L , List .of (kv ("k1" , "k1v2" , 1 )));
191+ putObjects (1001L , List .of (kv ("k2" , "k2v2" , 1 )));
192+ putObjects (1002L , List .of (kv ("k2" , "k2v3" , 2 )));
193+
194+ ListKeyVersionsResponse previousPage = null ;
195+ List <KeyValue > allKeyVersions = new ArrayList <>();
196+
197+ while (previousPage == null || !previousPage .getKeyVersionsList ().isEmpty ()) {
198+ ListKeyVersionsResponse currentPage ;
199+
200+ if (previousPage == null ) {
201+ currentPage = list (null , null , null );
202+
203+ // Ensure first page contains correct global version
204+ assertThat (currentPage .getGlobalVersion (), is (1003L ));
205+ } else {
206+ String nextPageToken = previousPage .getNextPageToken ();
207+ currentPage = list (nextPageToken , null , null );
208+
209+ // Ensure pages after first page dont contain global version.
210+ assertThat (currentPage .hasGlobalVersion (), is (false ));
211+ }
212+
213+ allKeyVersions .addAll (currentPage .getKeyVersionsList ());
214+ previousPage = currentPage ;
215+ }
216+
217+ // Ensure page results don't intersect/duplicate and return complete view.
218+ Set <String > uniqueKeys = allKeyVersions .stream ().map (KeyValue ::getKey ).distinct ()
219+ .collect (Collectors .toSet ());
220+ assertThat (uniqueKeys .size (), is (totalKvObjects ));
221+
222+ // Ensure that we don't return "vss_global_version" as part of keys.
223+ assertFalse (uniqueKeys .contains (KVStore .GLOBAL_VERSION_KEY ));
224+
225+ // Ensure correct key version for k1
226+ KeyValue k1_response =
227+ allKeyVersions .stream ().filter (kv -> "k1" .equals (kv .getKey ())).findFirst ().get ();
228+ assertThat (k1_response .getKey (), is ("k1" ));
229+ assertThat (k1_response .getVersion (), is (2L ));
230+ assertThat (k1_response .getValue ().toStringUtf8 (), is ("" ));
231+
232+ // Ensure correct key version for k2
233+ KeyValue k2_response =
234+ allKeyVersions .stream ().filter (kv -> "k2" .equals (kv .getKey ())).findFirst ().get ();
235+ assertThat (k2_response .getKey (), is ("k2" ));
236+ assertThat (k2_response .getVersion (), is (3L ));
237+ assertThat (k2_response .getValue ().toStringUtf8 (), is ("" ));
238+ }
239+
240+ @ Test
241+ void listShouldHonourPageSizeAndKeyPrefixIfProvided () {
242+ int totalKvObjects = 20 ;
243+ int pageSize = 5 ;
244+ for (int i = 0 ; i < totalKvObjects ; i ++) {
245+ putObjects ((long ) i , List .of (kv (i + "k" , "k1v1" , 0 )));
246+ }
247+
248+ ListKeyVersionsResponse previousPage = null ;
249+ List <KeyValue > allKeyVersions = new ArrayList <>();
250+ String keyPrefix = "1" ;
251+
252+ while (previousPage == null || !previousPage .getKeyVersionsList ().isEmpty ()) {
253+ ListKeyVersionsResponse currentPage ;
254+
255+ if (previousPage == null ) {
256+ currentPage = list (null , pageSize , keyPrefix );
257+ } else {
258+ String nextPageToken = previousPage .getNextPageToken ();
259+ currentPage = list (nextPageToken , pageSize , keyPrefix );
260+ }
261+
262+ allKeyVersions .addAll (currentPage .getKeyVersionsList ());
263+
264+ // Each page.size() is less than or equal to pageSize in request.
265+ assertThat (currentPage .getKeyVersionsList ().size (), lessThanOrEqualTo (pageSize ));
266+ previousPage = currentPage ;
267+ }
268+
269+ Set <String > uniqueKeys =
270+ allKeyVersions .stream ().map (KeyValue ::getKey ).collect (Collectors .toSet ());
271+
272+ // Returns keys only with provided keyPrefix
273+ assertThat (uniqueKeys .size (), is (11 ));
274+ assertThat (uniqueKeys ,
275+ is (Set .of ("1k" , "10k" , "11k" , "12k" , "13k" , "14k" , "15k" , "16k" , "17k" , "18k" , "19k" )));
276+ }
277+
278+ @ Test
279+ void listShouldReturnZeroGlobalVersionWhenGlobalVersioningNotEnabled () {
280+ int totalKvObjects = 1000 ;
281+ for (int i = 0 ; i < totalKvObjects ; i ++) {
282+ putObjects (null , List .of (kv ("k" + i , "k1v1" , 0 )));
283+ }
284+
285+ ListKeyVersionsResponse previousPage = null ;
286+ List <KeyValue > allKeyVersions = new ArrayList <>();
287+
288+ while (previousPage == null || !previousPage .getKeyVersionsList ().isEmpty ()) {
289+ ListKeyVersionsResponse currentPage ;
290+
291+ if (previousPage == null ) {
292+ currentPage = list (null , null , null );
293+
294+ // Ensure first page returns global version as ZERO
295+ assertThat (currentPage .getGlobalVersion (), is (0L ));
296+ } else {
297+ String nextPageToken = previousPage .getNextPageToken ();
298+ currentPage = list (nextPageToken , null , null );
299+
300+ // Ensure pages after first page do not contain global version.
301+ assertThat (currentPage .hasGlobalVersion (), is (false ));
302+ }
303+
304+ allKeyVersions .addAll (currentPage .getKeyVersionsList ());
305+ previousPage = currentPage ;
306+ }
307+ // Returns complete view.
308+ Set <String > uniqueKeys = allKeyVersions .stream ().map (KeyValue ::getKey ).distinct ()
309+ .collect (Collectors .toSet ());
310+ assertThat (uniqueKeys .size (), is (totalKvObjects ));
311+
312+ // Ensure that we don't return "vss_global_version" as part of keys.
313+ assertFalse (uniqueKeys .contains (KVStore .GLOBAL_VERSION_KEY ));
314+ }
315+
316+ @ Test
317+ void listShouldLimitMaxPageSize () {
318+
319+ int totalKvObjects = 10000 ;
320+
321+ // Each implementation is free to choose its own max_page_size but there should be a reasonable max
322+ // keeping scalability and performance in mind.
323+ // Revisit this test case if some implementation wants to support higher page size.
324+ int vssArbitraryPageSizeMax = 3000 ;
325+
326+ for (int i = 0 ; i < totalKvObjects ; i ++) {
327+ putObjects ((long ) i , List .of (kv ("k" + i , "k1v1" , 0 )));
328+ }
329+
330+ ListKeyVersionsResponse previousPage = null ;
331+ List <KeyValue > allKeyVersions = new ArrayList <>();
332+
333+ while (previousPage == null || !previousPage .getKeyVersionsList ().isEmpty ()) {
334+ ListKeyVersionsResponse currentPage ;
335+
336+ if (previousPage == null ) {
337+ currentPage = list (null , null , null );
338+ } else {
339+ String nextPageToken = previousPage .getNextPageToken ();
340+ currentPage = list (nextPageToken , null , null );
341+ }
342+
343+ allKeyVersions .addAll (currentPage .getKeyVersionsList ());
344+
345+ // Each page.size() is less than MAX_PAGE_SIZE
346+ assertThat (currentPage .getKeyVersionsList ().size (), lessThan (vssArbitraryPageSizeMax ));
347+ previousPage = currentPage ;
348+ }
349+
350+ assertThat (allKeyVersions .size (), is (totalKvObjects ));
351+ }
352+
166353 private KeyValue getObject (String key ) {
167354 GetObjectRequest getRequest = GetObjectRequest .newBuilder ()
168355 .setStoreId (STORE_ID )
@@ -171,7 +358,7 @@ private KeyValue getObject(String key) {
171358 return this .kvStore .get (getRequest ).getValue ();
172359 }
173360
174- private void putObjects (Long globalVersion , List <KeyValue > keyValues ) {
361+ private void putObjects (@ Nullable Long globalVersion , List <KeyValue > keyValues ) {
175362 PutObjectRequest .Builder putObjectRequestBuilder = PutObjectRequest .newBuilder ()
176363 .setStoreId (STORE_ID )
177364 .addAllTransactionItems (keyValues );
@@ -183,6 +370,24 @@ private void putObjects(Long globalVersion, List<KeyValue> keyValues) {
183370 this .kvStore .put (putObjectRequestBuilder .build ());
184371 }
185372
373+ private ListKeyVersionsResponse list (@ Nullable String nextPageToken , @ Nullable Integer pageSize ,
374+ @ Nullable String keyPrefix ) {
375+ ListKeyVersionsRequest .Builder listRequestBuilder = ListKeyVersionsRequest .newBuilder ()
376+ .setStoreId (STORE_ID );
377+
378+ if (StringUtils .isNotBlank (nextPageToken )) {
379+ listRequestBuilder .setPageToken (nextPageToken );
380+ }
381+ if (pageSize != null ) {
382+ listRequestBuilder .setPageSize (pageSize );
383+ }
384+ if (StringUtils .isNotBlank (keyPrefix )) {
385+ listRequestBuilder .setKeyPrefix (keyPrefix );
386+ }
387+
388+ return this .kvStore .listKeyVersions (listRequestBuilder .build ());
389+ }
390+
186391 private KeyValue kv (String key , String value , int version ) {
187392 return KeyValue .newBuilder ().setKey (key ).setVersion (version ).setValue (
188393 ByteString .copyFrom (value .getBytes (
0 commit comments