diff --git a/app/build.gradle b/app/build.gradle index 83102a3..8791b2a 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -1,39 +1,101 @@ buildscript { - ext.gradleVersion = '7.5.1' - ext.protobufPlugInVersion = '0.8.12' - ext.protobufVersion = '3.21.7' - ext.jerseyVersion = '3.1.0' - ext.junitVersion = '5.9.0' + ext.gradleVersion = '7.5.1' + ext.protobufPlugInVersion = '0.8.12' + ext.protobufVersion = '3.21.7' + ext.jerseyVersion = '3.1.0' + ext.junitVersion = '5.9.0' + ext.postgresVersion = '42.5.1' + ext.jooqVersion = '3.17.7' + ext.guiceVersion = '5.1.0' } plugins { - id 'java' - id 'com.google.protobuf' version "${protobufPlugInVersion}" - id 'war' - id 'idea' + id 'java' + id 'com.google.protobuf' version "${protobufPlugInVersion}" + id 'war' + id 'idea' + id 'nu.studer.jooq' version '8.0' } repositories { - mavenCentral() + mavenCentral() } idea { - module { - generatedSourceDirs.add(file("build/generated/proto/main")) - } + module { + generatedSourceDirs.add(file("build/generated/proto/main")) + } } group 'org.vss' version '1.0' - dependencies { - implementation "com.google.protobuf:protobuf-java:$protobufVersion" + implementation "com.google.protobuf:protobuf-java:$protobufVersion" + + //jOOQ & Postgres impl deps + implementation "org.jooq:jooq:$jooqVersion" + implementation "org.jooq:jooq-meta:$jooqVersion" + implementation "org.jooq:jooq-codegen:$jooqVersion" + runtimeOnly "org.postgresql:postgresql:$postgresVersion" + jooqGenerator "org.postgresql:postgresql:$postgresVersion" - testImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion" - testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion" + implementation "com.google.inject:guice:$guiceVersion" + + testImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion" + testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion" + testImplementation "org.hamcrest:hamcrest-library:2.2" + testImplementation "org.testcontainers:junit-jupiter:1.17.6" + testImplementation "org.testcontainers:postgresql:1.17.6" } test { - useJUnitPlatform() -} \ No newline at end of file + useJUnitPlatform() +} + +protobuf { + protoc { + artifact = "com.google.protobuf:protoc:$protobufVersion" + } +} + +jooq { + configurations { + main { + generateSchemaSourceOnCompilation = true + + generationTool { + jdbc { + driver = 'org.postgresql.Driver' + url = 'jdbc:postgresql://localhost:5432/postgres' + user = 'postgres' + password = '' + properties { + property { + key = 'ssl' + value = 'false' + } + } + } + generator { + name = 'org.jooq.codegen.DefaultGenerator' + database { + name = 'org.jooq.meta.postgres.PostgresDatabase' + inputSchema = 'public' + } + generate { + deprecated = false + records = true + immutablePojos = true + fluentSetters = true + } + target { + packageName = 'org.vss.postgres' + directory = 'build/generated-src/jooq/main' + } + strategy.name = 'org.jooq.codegen.DefaultGeneratorStrategy' + } + } + } + } +} diff --git a/app/src/main/java/org/vss/KVStore.java b/app/src/main/java/org/vss/KVStore.java new file mode 100644 index 0000000..772dc20 --- /dev/null +++ b/app/src/main/java/org/vss/KVStore.java @@ -0,0 +1,13 @@ +package org.vss; + +public interface KVStore { + + String GLOBAL_VERSION_KEY = "vss_global_version"; + + GetObjectResponse get(GetObjectRequest request); + + PutObjectResponse put(PutObjectRequest request); + + ListKeyVersionsResponse listKeyVersions(ListKeyVersionsRequest request); +} + diff --git a/app/src/main/java/org/vss/exception/ConflictException.java b/app/src/main/java/org/vss/exception/ConflictException.java new file mode 100644 index 0000000..afec1f5 --- /dev/null +++ b/app/src/main/java/org/vss/exception/ConflictException.java @@ -0,0 +1,8 @@ +package org.vss.exception; + +public class ConflictException extends RuntimeException { + public ConflictException(String message) { + super(message); + } +} + diff --git a/app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java b/app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java new file mode 100644 index 0000000..d69c6f7 --- /dev/null +++ b/app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java @@ -0,0 +1,133 @@ +package org.vss.impl.postgres; + +import com.google.inject.Inject; +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import javax.inject.Singleton; +import org.jooq.DSLContext; +import org.jooq.Insert; +import org.jooq.Query; +import org.jooq.Update; +import org.vss.GetObjectRequest; +import org.vss.GetObjectResponse; +import org.vss.KVStore; +import org.vss.KeyValue; +import org.vss.ListKeyVersionsRequest; +import org.vss.ListKeyVersionsResponse; +import org.vss.PutObjectRequest; +import org.vss.PutObjectResponse; +import org.vss.exception.ConflictException; +import org.vss.postgres.tables.records.VssDbRecord; + +import static org.jooq.impl.DSL.val; +import static org.vss.postgres.tables.VssDb.VSS_DB; + +@Singleton +public class PostgresBackendImpl implements KVStore { + + private final DSLContext context; + + @Inject + public PostgresBackendImpl(DSLContext context) { + this.context = context; + } + + @Override + public GetObjectResponse get(GetObjectRequest request) { + + VssDbRecord vssDbRecord = context.selectFrom(VSS_DB) + .where(VSS_DB.STORE_ID.eq(request.getStoreId()) + .and(VSS_DB.KEY.eq(request.getKey()))) + .fetchOne(); + + final KeyValue keyValue; + + if (vssDbRecord != null) { + keyValue = KeyValue.newBuilder() + .setKey(vssDbRecord.getKey()) + .setValue(ByteString.copyFrom(vssDbRecord.getValue())) + .setVersion(vssDbRecord.getVersion()) + .build(); + } else { + keyValue = KeyValue.newBuilder() + .setKey(request.getKey()).build(); + } + + return GetObjectResponse.newBuilder() + .setValue(keyValue) + .build(); + } + + @Override + public PutObjectResponse put(PutObjectRequest request) { + + String storeId = request.getStoreId(); + + List vssRecords = new ArrayList<>(request.getTransactionItemsList().stream() + .map(kv -> buildVssRecord(storeId, kv)).toList()); + + if (request.hasGlobalVersion()) { + VssDbRecord globalVersionRecord = buildVssRecord(storeId, + KeyValue.newBuilder() + .setKey(GLOBAL_VERSION_KEY) + .setVersion(request.getGlobalVersion()) + .build()); + + vssRecords.add(globalVersionRecord); + } + + context.transaction((ctx) -> { + DSLContext dsl = ctx.dsl(); + List batchQueries = vssRecords.stream() + .map(vssRecord -> buildPutObjectQuery(dsl, vssRecord)).toList(); + + int[] batchResult = dsl.batch(batchQueries).execute(); + + for (int numOfRowsUpdated : batchResult) { + if (numOfRowsUpdated == 0) { + throw new ConflictException( + "Transaction could not be completed due to a possible conflict"); + } + } + }); + + return PutObjectResponse.newBuilder().build(); + } + + private Query buildPutObjectQuery(DSLContext dsl, VssDbRecord vssRecord) { + return vssRecord.getVersion() == 0 ? buildInsertRecordQuery(dsl, vssRecord) + : buildUpdateRecordQuery(dsl, vssRecord); + } + + private Insert buildInsertRecordQuery(DSLContext dsl, VssDbRecord vssRecord) { + return dsl.insertInto(VSS_DB) + .values(vssRecord.getStoreId(), vssRecord.getKey(), + vssRecord.getValue(), 1) + .onDuplicateKeyIgnore(); + } + + private Update buildUpdateRecordQuery(DSLContext dsl, VssDbRecord vssRecord) { + return dsl.update(VSS_DB) + .set(Map.of(VSS_DB.VALUE, vssRecord.getValue(), + VSS_DB.VERSION, vssRecord.getVersion() + 1)) + .where(VSS_DB.STORE_ID.eq(vssRecord.getStoreId()) + .and(VSS_DB.KEY.eq(vssRecord.getKey())) + .and(VSS_DB.VERSION.eq(vssRecord.getVersion()))); + } + + private VssDbRecord buildVssRecord(String storeId, KeyValue kv) { + return new VssDbRecord() + .setStoreId(storeId) + .setKey(kv.getKey()) + .setValue(kv.getValue().toByteArray()) + .setVersion(kv.getVersion()); + } + + @Override + public ListKeyVersionsResponse listKeyVersions(ListKeyVersionsRequest request) { + throw new UnsupportedOperationException("Operation not implemented"); + } +} + diff --git a/app/src/main/java/org/vss/impl/postgres/sql/v0_create_vss_db.sql b/app/src/main/java/org/vss/impl/postgres/sql/v0_create_vss_db.sql new file mode 100644 index 0000000..4e8ae49 --- /dev/null +++ b/app/src/main/java/org/vss/impl/postgres/sql/v0_create_vss_db.sql @@ -0,0 +1,8 @@ +CREATE TABLE vss_db ( + store_id character varying(120) NOT NULL, + key character varying(120) NOT NULL, + value bytea NULL, + version bigint NOT NULL, + PRIMARY KEY (store_id, key) +); + diff --git a/app/src/main/proto/vss.proto b/app/src/main/proto/vss.proto index 7d673b7..23e1648 100644 --- a/app/src/main/proto/vss.proto +++ b/app/src/main/proto/vss.proto @@ -9,7 +9,7 @@ message GetObjectRequest { // All APIs operate within a single store_id. // It is up to clients to use single or multiple stores for their use-case. // This can be used for client-isolation/ rate-limiting / throttling on the server-side. - // Authorization and billing can also be performed at the storeId level. + // Authorization and billing can also be performed at the store_id level. string store_id = 1; // Key for which the value is to be fetched. @@ -98,6 +98,79 @@ message PutObjectRequest { message PutObjectResponse { } +message ListKeyVersionsRequest { + + // store_id is a keyspace identifier. + // Ref: https://en.wikipedia.org/wiki/Keyspace_(distributed_data_store) + // All APIs operate within a single store_id. + // It is up to clients to use single or multiple stores for their use-case. + // This can be used for client-isolation/ rate-limiting / throttling on the server-side. + // Authorization and billing can also be performed at the store_id level. + string store_id = 1; + + // A key_prefix is a string of characters at the beginning of the key. Prefixes can be used as + // a way to organize key-values in a similar way to directories. + // + // If key_prefix is specified, the response results will be limited to those keys that begin with + // the specified prefix. + // + // If no key_prefix is specified or it is empty (""), all the keys are eligible to be returned in + // the response. + optional string key_prefix = 2; + + // page_size is used by clients to specify the maximum number of results that can be returned by + // the server. + // The server may further constrain the maximum number of results returned in a single page. + // If the page_size is 0 or not set, the server will decide the number of results to be returned. + optional int32 page_size = 3; + + // page_token is a pagination token. + // + // To query for the first page of ListKeyVersions, page_token must not be specified. + // + // For subsequent pages, use the value that was returned as `next_page_token` in the previous + // page's ListKeyVersionsResponse. + optional string page_token = 4; +} + +message ListKeyVersionsResponse { + + // Fetched keys and versions. + // Even though this API reuses KeyValue struct, the value sub-field will not be set by the server. + repeated KeyValue key_versions = 1; + + // next_page_token is a pagination token, used to retrieve the next page of results. + // Use this value to query for next_page of paginated ListKeyVersions operation, by specifying + // this value as the `page_token` in the next request. + // + // If next_page_token is empty (""), then the "last page" of results has been processed and + // there is no more data to be retrieved. + // + // If next_page_token is not empty, it does not necessarily mean that there is more data in the + // result set. The only way to know when you have reached the end of the result set is when + // next_page_token is empty. + // + // Caution: Clients must not assume a specific number of key_versions to be present in a page for + // paginated response. + optional string next_page_token = 2; + + // global_version is a sequence-number/version of the whole store. + // + // global_version is only returned in response for the first page of the ListKeyVersionsResponse + // and is guaranteed to be read before reading any key-versions. + // + // In case of refreshing complete key-version view on the client-side, correct usage for + // the returned global_version is as following: + // 1. Read global_version from the first page of paginated response, store it as local_variable. + // 2. Update all the key_versions on client-side from all the pages of paginated response. + // 3. Update global_version on client_side from the local_variable stored in step-1. + // This ensures that on client-side, we can guarantee that all current key_versions are at least + // from the corresponding global_version. This guarantee is helpful for ensuring the versioning + // correctness if using the global_version in PutObject API and can help avoid the race conditions + // related to it. + optional int64 global_version = 3; +} + // When HttpStatusCode is not ok (200), the response `content` contains a serialized ErrorResponse // with the relevant ErrorCode and message message ErrorResponse { diff --git a/app/src/test/java/org/vss/AbstractKVStoreIntegrationTest.java b/app/src/test/java/org/vss/AbstractKVStoreIntegrationTest.java new file mode 100644 index 0000000..6b15031 --- /dev/null +++ b/app/src/test/java/org/vss/AbstractKVStoreIntegrationTest.java @@ -0,0 +1,166 @@ +package org.vss; + +import com.google.protobuf.ByteString; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Objects; +import org.junit.jupiter.api.Test; +import org.vss.exception.ConflictException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public abstract class AbstractKVStoreIntegrationTest { + + private final String STORE_ID = "storeId"; + + protected KVStore kvStore; + + @Test + void putShouldSucceedWhenSingleObjectPutOperation() { + assertDoesNotThrow(() -> putObjects(0L, List.of(kv("k1", "k1v1", 0)))); + assertDoesNotThrow(() -> putObjects(1L, List.of(kv("k1", "k1v2", 1)))); + + KeyValue response = getObject("k1"); + assertThat(response.getKey(), is("k1")); + assertThat(response.getVersion(), is(2L)); + assertThat(response.getValue().toStringUtf8(), is("k1v2")); + } + + @Test + void putShouldSucceedWhenMultiObjectPutOperation() { + final List keyValues = List.of(kv("k1", "k1v1", 0), + kv("k2", "k2v1", 0)); + + assertDoesNotThrow(() -> putObjects(0L, keyValues)); + + List second_request = List.of(kv("k1", "k1v2", 1), + kv("k2", "k2v2", 1)); + putObjects(1L, second_request); + + KeyValue response = getObject("k1"); + assertThat(response.getKey(), is("k1")); + assertThat(response.getVersion(), is(2L)); + assertThat(response.getValue().toStringUtf8(), is("k1v2")); + + response = getObject("k2"); + assertThat(response.getKey(), is("k2")); + assertThat(response.getVersion(), is(2L)); + assertThat(response.getValue().toStringUtf8(), is("k2v2")); + } + + @Test + void putShouldFailWhenKeyVersionMismatched() { + putObjects(0L, List.of(kv("k1", "k1v1", 0))); + + // global_version correctly changed but key-version conflict. + assertThrows(ConflictException.class, () -> putObjects(1L, List.of(kv("k1", "k1v2", 0)))); + + //Verify that values didn't change + KeyValue response = getObject("k1"); + assertThat(response.getKey(), is("k1")); + assertThat(response.getVersion(), is(1L)); + assertThat(response.getValue().toStringUtf8(), is("k1v1")); + } + + @Test + void putShouldFailWhenGlobalVersionMismatched() { + putObjects(0L, List.of(kv("k1", "k1v1", 0))); + + // key-version correctly changed but global_version conflict. + assertThrows(ConflictException.class, () -> putObjects(0L, List.of(kv("k1", "k1v2", 1)))); + + //Verify that values didn't change + KeyValue response = getObject("k1"); + assertThat(response.getKey(), is("k1")); + assertThat(response.getVersion(), is(1L)); + assertThat(response.getValue().toStringUtf8(), is("k1v1")); + } + + @Test + void putShouldSucceedWhenNoGlobalVersionIsGiven() { + assertDoesNotThrow(() -> putObjects(null, List.of(kv("k1", "k1v1", 0)))); + assertDoesNotThrow(() -> putObjects(null, List.of(kv("k1", "k1v2", 1)))); + + KeyValue response = getObject("k1"); + assertThat(response.getKey(), is("k1")); + assertThat(response.getVersion(), is(2L)); + assertThat(response.getValue().toStringUtf8(), is("k1v2")); + } + + @Test + void getShouldReturnEmptyResponseWhenKeyDoesNotExist() { + KeyValue response = getObject("non_existent_key"); + + assertThat(response.getKey(), is("non_existent_key")); + assertTrue(response.getValue().isEmpty()); + } + + @Test + void getShouldReturnCorrectValueWhenKeyExists() { + + putObjects(0L, List.of(kv("k1", "k1v1", 0))); + + KeyValue response = getObject("k1"); + assertThat(response.getKey(), is("k1")); + assertThat(response.getVersion(), is(1L)); + assertThat(response.getValue().toStringUtf8(), is("k1v1")); + + List keyValues = List.of(kv("k1", "k1v2", 1), + kv("k2", "k2v1", 0)); + putObjects(1L, keyValues); + + response = getObject("k1"); + assertThat(response.getKey(), is("k1")); + assertThat(response.getVersion(), is(2L)); + assertThat(response.getValue().toStringUtf8(), is("k1v2")); + + response = getObject("k2"); + assertThat(response.getKey(), is("k2")); + assertThat(response.getVersion(), is(1L)); + assertThat(response.getValue().toStringUtf8(), is("k2v1")); + + keyValues = List.of(kv("k2", "k2v2", 1), + kv("k3", "k3v1", 0)); + putObjects(2L, keyValues); + + response = getObject("k2"); + assertThat(response.getKey(), is("k2")); + assertThat(response.getVersion(), is(2L)); + assertThat(response.getValue().toStringUtf8(), is("k2v2")); + + response = getObject("k3"); + assertThat(response.getKey(), is("k3")); + assertThat(response.getVersion(), is(1L)); + assertThat(response.getValue().toStringUtf8(), is("k3v1")); + } + + private KeyValue getObject(String key) { + GetObjectRequest getRequest = GetObjectRequest.newBuilder() + .setStoreId(STORE_ID) + .setKey(key) + .build(); + return this.kvStore.get(getRequest).getValue(); + } + + private void putObjects(Long globalVersion, List keyValues) { + PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.newBuilder() + .setStoreId(STORE_ID) + .addAllTransactionItems(keyValues); + + if (Objects.nonNull(globalVersion)) { + putObjectRequestBuilder.setGlobalVersion(globalVersion); + } + + this.kvStore.put(putObjectRequestBuilder.build()); + } + + private KeyValue kv(String key, String value, int version) { + return KeyValue.newBuilder().setKey(key).setVersion(version).setValue( + ByteString.copyFrom(value.getBytes( + StandardCharsets.UTF_8))).build(); + } +} diff --git a/app/src/test/java/org/vss/impl/postgres/PostgresBackendImplIntegrationTest.java b/app/src/test/java/org/vss/impl/postgres/PostgresBackendImplIntegrationTest.java new file mode 100644 index 0000000..d4c9eff --- /dev/null +++ b/app/src/test/java/org/vss/impl/postgres/PostgresBackendImplIntegrationTest.java @@ -0,0 +1,48 @@ +package org.vss.impl.postgres; + +import java.sql.Connection; +import java.sql.DriverManager; +import org.jooq.DSLContext; +import org.jooq.SQLDialect; +import org.jooq.impl.DSL; +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.vss.AbstractKVStoreIntegrationTest; + +@Testcontainers +public class PostgresBackendImplIntegrationTest extends AbstractKVStoreIntegrationTest { + + private final String POSTGRES_TEST_CONTAINER_DOCKER_IMAGE = "postgres:15"; + + @Container + private final PostgreSQLContainer postgreSQLContainer = + new PostgreSQLContainer(POSTGRES_TEST_CONTAINER_DOCKER_IMAGE) + .withDatabaseName("postgres") + .withUsername("postgres") + .withPassword("postgres"); + + @BeforeEach + public void initEach() throws Exception { + + // This is required to get postgres driver in classpath before we attempt to fetch a connection + Class.forName("org.postgresql.Driver"); + Connection conn = DriverManager.getConnection(postgreSQLContainer.getJdbcUrl(), + postgreSQLContainer.getUsername(), postgreSQLContainer.getPassword()); + DSLContext dslContext = DSL.using(conn, SQLDialect.POSTGRES); + + this.kvStore = new PostgresBackendImpl(dslContext); + + createTable(dslContext); + } + + private void createTable(DSLContext dslContext) { + dslContext.execute("CREATE TABLE vss_db (" + + "store_id character varying(120) NOT NULL CHECK (store_id <> '')," + + "key character varying(120) NOT NULL," + + "value bytea NULL," + + "version bigint NOT NULL," + + "PRIMARY KEY (store_id, key));"); + } +}