From 79e408ff143ae82b71bba010c888d9a46f4a72b6 Mon Sep 17 00:00:00 2001 From: Gursharan Singh <3442979+G8XSU@users.noreply.github.com> Date: Wed, 12 Apr 2023 18:14:07 -0700 Subject: [PATCH 1/2] Add ListKeyVersions Api Implementation --- .../impl/postgres/PostgresBackendImpl.java | 53 ++++++++++++++++++- 1 file changed, 51 insertions(+), 2 deletions(-) diff --git a/app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java b/app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java index 03e9371..73b1d3e 100644 --- a/app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java +++ b/app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java @@ -5,6 +5,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import javax.inject.Singleton; import org.jooq.DSLContext; import org.jooq.Insert; @@ -21,12 +22,12 @@ import org.vss.exception.ConflictException; import org.vss.postgres.tables.records.VssDbRecord; -import static org.jooq.impl.DSL.val; import static org.vss.postgres.tables.VssDb.VSS_DB; @Singleton public class PostgresBackendImpl implements KVStore { + private static final int LIST_KEY_VERSIONS_MAX_PAGE_SIZE = 100; private final DSLContext context; @Inject @@ -127,6 +128,54 @@ private VssDbRecord buildVssRecord(String storeId, KeyValue kv) { @Override public ListKeyVersionsResponse listKeyVersions(ListKeyVersionsRequest request) { - throw new UnsupportedOperationException("Operation not implemented"); + String storeId = request.getStoreId(); + String keyPrefix = request.getKeyPrefix(); + String pageToken = request.getPageToken(); + int pageSize = request.hasPageSize() ? request.getPageSize() : Integer.MAX_VALUE; + + // Only fetch global_version for first page. + // Fetch global_version before fetching any key_versions to ensure that, + // all current key_versions were stored at global_version or later. + Long globalVersion = null; + if (!request.hasPageToken()) { + GetObjectRequest getGlobalVersionRequest = GetObjectRequest.newBuilder() + .setStoreId(storeId) + .setKey(GLOBAL_VERSION_KEY) + .build(); + globalVersion = get(getGlobalVersionRequest).getValue().getVersion(); + } + + List vssDbRecords = context.select(VSS_DB.KEY, VSS_DB.VERSION).from(VSS_DB) + .where(VSS_DB.STORE_ID.eq(storeId) + .and(VSS_DB.KEY.startsWith(keyPrefix))) + .orderBy(VSS_DB.KEY) + .seek(pageToken) + .limit(Math.min(pageSize, LIST_KEY_VERSIONS_MAX_PAGE_SIZE)) + .stream() + .map(record -> record.into(VssDbRecord.class)) + .toList(); + + List keyVersions = vssDbRecords.stream() + .filter(kv -> !GLOBAL_VERSION_KEY.equals(kv.getKey())) + .map(kv -> KeyValue.newBuilder() + .setKey(kv.getKey()) + .setVersion(kv.getVersion()) + .build()) + .toList(); + + String nextPageToken = ""; + if (!keyVersions.isEmpty()) { + nextPageToken = keyVersions.get(keyVersions.size() - 1).getKey(); + } + + ListKeyVersionsResponse.Builder responseBuilder = ListKeyVersionsResponse.newBuilder() + .addAllKeyVersions(keyVersions) + .setNextPageToken(nextPageToken); + + if (Objects.nonNull(globalVersion)) { + responseBuilder.setGlobalVersion(globalVersion); + } + + return responseBuilder.build(); } } From 69fb1197e01c25a266e64cc89267c1eff751624d Mon Sep 17 00:00:00 2001 From: Gursharan Singh <3442979+G8XSU@users.noreply.github.com> Date: Fri, 21 Apr 2023 16:00:12 -0700 Subject: [PATCH 2/2] Add ListKeyVersions Api AbstractKVStore Integration Tests --- .../vss/AbstractKVStoreIntegrationTest.java | 211 +++++++++++++++++- 1 file changed, 208 insertions(+), 3 deletions(-) diff --git a/app/src/test/java/org/vss/AbstractKVStoreIntegrationTest.java b/app/src/test/java/org/vss/AbstractKVStoreIntegrationTest.java index b0e6742..183b5ff 100644 --- a/app/src/test/java/org/vss/AbstractKVStoreIntegrationTest.java +++ b/app/src/test/java/org/vss/AbstractKVStoreIntegrationTest.java @@ -2,14 +2,22 @@ import com.google.protobuf.ByteString; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils; import org.vss.exception.ConflictException; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -28,6 +36,8 @@ void putShouldSucceedWhenSingleObjectPutOperation() { assertThat(response.getKey(), is("k1")); assertThat(response.getVersion(), is(2L)); assertThat(response.getValue().toStringUtf8(), is("k1v2")); + + assertThat(getObject(KVStore.GLOBAL_VERSION_KEY).getVersion(), is(2L)); } @Test @@ -50,6 +60,8 @@ void putShouldSucceedWhenMultiObjectPutOperation() { assertThat(response.getKey(), is("k2")); assertThat(response.getVersion(), is(2L)); assertThat(response.getValue().toStringUtf8(), is("k2v2")); + + assertThat(getObject(KVStore.GLOBAL_VERSION_KEY).getVersion(), is(2L)); } @Test @@ -59,11 +71,13 @@ void putShouldFailWhenKeyVersionMismatched() { // global_version correctly changed but key-version conflict. assertThrows(ConflictException.class, () -> putObjects(1L, List.of(kv("k1", "k1v2", 0)))); - //Verify that values didn't change + // Verify that values didn't change KeyValue response = getObject("k1"); assertThat(response.getKey(), is("k1")); assertThat(response.getVersion(), is(1L)); assertThat(response.getValue().toStringUtf8(), is("k1v1")); + + assertThat(getObject(KVStore.GLOBAL_VERSION_KEY).getVersion(), is(1L)); } @Test @@ -78,7 +92,7 @@ void putMultiObjectShouldFailWhenSingleKeyVersionMismatched() { assertThrows(ConflictException.class, () -> putObjects(null, second_request)); - //Verify that values didn't change + // Verify that values didn't change KeyValue response = getObject("k1"); assertThat(response.getKey(), is("k1")); assertThat(response.getVersion(), is(1L)); @@ -113,6 +127,8 @@ void putShouldSucceedWhenNoGlobalVersionIsGiven() { assertThat(response.getKey(), is("k1")); assertThat(response.getVersion(), is(2L)); assertThat(response.getValue().toStringUtf8(), is("k1v2")); + + assertThat(getObject(KVStore.GLOBAL_VERSION_KEY).getVersion(), is(0L)); } @Test @@ -163,6 +179,177 @@ void getShouldReturnCorrectValueWhenKeyExists() { assertThat(response.getValue().toStringUtf8(), is("k3v1")); } + @Test + void listShouldReturnPaginatedResponse() { + + int totalKvObjects = 1000; + for (int i = 0; i < totalKvObjects; i++) { + putObjects((long) i, List.of(kv("k" + i, "k1v1", 0))); + } + // Overwrite k1 once and k2 twice. + putObjects(1000L, List.of(kv("k1", "k1v2", 1))); + putObjects(1001L, List.of(kv("k2", "k2v2", 1))); + putObjects(1002L, List.of(kv("k2", "k2v3", 2))); + + ListKeyVersionsResponse previousPage = null; + List allKeyVersions = new ArrayList<>(); + + while (previousPage == null || !previousPage.getKeyVersionsList().isEmpty()) { + ListKeyVersionsResponse currentPage; + + if (previousPage == null) { + currentPage = list(null, null, null); + + // Ensure first page contains correct global version + assertThat(currentPage.getGlobalVersion(), is(1003L)); + } else { + String nextPageToken = previousPage.getNextPageToken(); + currentPage = list(nextPageToken, null, null); + + // Ensure pages after first page dont contain global version. + assertThat(currentPage.hasGlobalVersion(), is(false)); + } + + allKeyVersions.addAll(currentPage.getKeyVersionsList()); + previousPage = currentPage; + } + + // Ensure page results don't intersect/duplicate and return complete view. + Set uniqueKeys = allKeyVersions.stream().map(KeyValue::getKey).distinct() + .collect(Collectors.toSet()); + assertThat(uniqueKeys.size(), is(totalKvObjects)); + + // Ensure that we don't return "vss_global_version" as part of keys. + assertFalse(uniqueKeys.contains(KVStore.GLOBAL_VERSION_KEY)); + + // Ensure correct key version for k1 + KeyValue k1_response = + allKeyVersions.stream().filter(kv -> "k1".equals(kv.getKey())).findFirst().get(); + assertThat(k1_response.getKey(), is("k1")); + assertThat(k1_response.getVersion(), is(2L)); + assertThat(k1_response.getValue().toStringUtf8(), is("")); + + // Ensure correct key version for k2 + KeyValue k2_response = + allKeyVersions.stream().filter(kv -> "k2".equals(kv.getKey())).findFirst().get(); + assertThat(k2_response.getKey(), is("k2")); + assertThat(k2_response.getVersion(), is(3L)); + assertThat(k2_response.getValue().toStringUtf8(), is("")); + } + + @Test + void listShouldHonourPageSizeAndKeyPrefixIfProvided() { + int totalKvObjects = 20; + int pageSize = 5; + for (int i = 0; i < totalKvObjects; i++) { + putObjects((long) i, List.of(kv(i + "k", "k1v1", 0))); + } + + ListKeyVersionsResponse previousPage = null; + List allKeyVersions = new ArrayList<>(); + String keyPrefix = "1"; + + while (previousPage == null || !previousPage.getKeyVersionsList().isEmpty()) { + ListKeyVersionsResponse currentPage; + + if (previousPage == null) { + currentPage = list(null, pageSize, keyPrefix); + } else { + String nextPageToken = previousPage.getNextPageToken(); + currentPage = list(nextPageToken, pageSize, keyPrefix); + } + + allKeyVersions.addAll(currentPage.getKeyVersionsList()); + + // Each page.size() is less than or equal to pageSize in request. + assertThat(currentPage.getKeyVersionsList().size(), lessThanOrEqualTo(pageSize)); + previousPage = currentPage; + } + + Set uniqueKeys = + allKeyVersions.stream().map(KeyValue::getKey).collect(Collectors.toSet()); + + // Returns keys only with provided keyPrefix + assertThat(uniqueKeys.size(), is(11)); + assertThat(uniqueKeys, + is(Set.of("1k", "10k", "11k", "12k", "13k", "14k", "15k", "16k", "17k", "18k", "19k"))); + } + + @Test + void listShouldReturnZeroGlobalVersionWhenGlobalVersioningNotEnabled() { + int totalKvObjects = 1000; + for (int i = 0; i < totalKvObjects; i++) { + putObjects(null, List.of(kv("k" + i, "k1v1", 0))); + } + + ListKeyVersionsResponse previousPage = null; + List allKeyVersions = new ArrayList<>(); + + while (previousPage == null || !previousPage.getKeyVersionsList().isEmpty()) { + ListKeyVersionsResponse currentPage; + + if (previousPage == null) { + currentPage = list(null, null, null); + + // Ensure first page returns global version as ZERO + assertThat(currentPage.getGlobalVersion(), is(0L)); + } else { + String nextPageToken = previousPage.getNextPageToken(); + currentPage = list(nextPageToken, null, null); + + // Ensure pages after first page do not contain global version. + assertThat(currentPage.hasGlobalVersion(), is(false)); + } + + allKeyVersions.addAll(currentPage.getKeyVersionsList()); + previousPage = currentPage; + } + // Returns complete view. + Set uniqueKeys = allKeyVersions.stream().map(KeyValue::getKey).distinct() + .collect(Collectors.toSet()); + assertThat(uniqueKeys.size(), is(totalKvObjects)); + + // Ensure that we don't return "vss_global_version" as part of keys. + assertFalse(uniqueKeys.contains(KVStore.GLOBAL_VERSION_KEY)); + } + + @Test + void listShouldLimitMaxPageSize() { + + int totalKvObjects = 10000; + + // Each implementation is free to choose its own max_page_size but there should be a reasonable max + // keeping scalability and performance in mind. + // Revisit this test case if some implementation wants to support higher page size. + int vssArbitraryPageSizeMax = 3000; + + for (int i = 0; i < totalKvObjects; i++) { + putObjects((long) i, List.of(kv("k" + i, "k1v1", 0))); + } + + ListKeyVersionsResponse previousPage = null; + List allKeyVersions = new ArrayList<>(); + + while (previousPage == null || !previousPage.getKeyVersionsList().isEmpty()) { + ListKeyVersionsResponse currentPage; + + if (previousPage == null) { + currentPage = list(null, null, null); + } else { + String nextPageToken = previousPage.getNextPageToken(); + currentPage = list(nextPageToken, null, null); + } + + allKeyVersions.addAll(currentPage.getKeyVersionsList()); + + // Each page.size() is less than MAX_PAGE_SIZE + assertThat(currentPage.getKeyVersionsList().size(), lessThan(vssArbitraryPageSizeMax)); + previousPage = currentPage; + } + + assertThat(allKeyVersions.size(), is(totalKvObjects)); + } + private KeyValue getObject(String key) { GetObjectRequest getRequest = GetObjectRequest.newBuilder() .setStoreId(STORE_ID) @@ -171,7 +358,7 @@ private KeyValue getObject(String key) { return this.kvStore.get(getRequest).getValue(); } - private void putObjects(Long globalVersion, List keyValues) { + private void putObjects(@Nullable Long globalVersion, List keyValues) { PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.newBuilder() .setStoreId(STORE_ID) .addAllTransactionItems(keyValues); @@ -183,6 +370,24 @@ private void putObjects(Long globalVersion, List keyValues) { this.kvStore.put(putObjectRequestBuilder.build()); } + private ListKeyVersionsResponse list(@Nullable String nextPageToken, @Nullable Integer pageSize, + @Nullable String keyPrefix) { + ListKeyVersionsRequest.Builder listRequestBuilder = ListKeyVersionsRequest.newBuilder() + .setStoreId(STORE_ID); + + if (StringUtils.isNotBlank(nextPageToken)) { + listRequestBuilder.setPageToken(nextPageToken); + } + if (pageSize != null) { + listRequestBuilder.setPageSize(pageSize); + } + if (StringUtils.isNotBlank(keyPrefix)) { + listRequestBuilder.setKeyPrefix(keyPrefix); + } + + return this.kvStore.listKeyVersions(listRequestBuilder.build()); + } + private KeyValue kv(String key, String value, int version) { return KeyValue.newBuilder().setKey(key).setVersion(version).setValue( ByteString.copyFrom(value.getBytes(