diff --git a/app/build.gradle b/app/build.gradle index 83102a3..8791b2a 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -1,39 +1,101 @@ buildscript { - ext.gradleVersion = '7.5.1' - ext.protobufPlugInVersion = '0.8.12' - ext.protobufVersion = '3.21.7' - ext.jerseyVersion = '3.1.0' - ext.junitVersion = '5.9.0' + ext.gradleVersion = '7.5.1' + ext.protobufPlugInVersion = '0.8.12' + ext.protobufVersion = '3.21.7' + ext.jerseyVersion = '3.1.0' + ext.junitVersion = '5.9.0' + ext.postgresVersion = '42.5.1' + ext.jooqVersion = '3.17.7' + ext.guiceVersion = '5.1.0' } plugins { - id 'java' - id 'com.google.protobuf' version "${protobufPlugInVersion}" - id 'war' - id 'idea' + id 'java' + id 'com.google.protobuf' version "${protobufPlugInVersion}" + id 'war' + id 'idea' + id 'nu.studer.jooq' version '8.0' } repositories { - mavenCentral() + mavenCentral() } idea { - module { - generatedSourceDirs.add(file("build/generated/proto/main")) - } + module { + generatedSourceDirs.add(file("build/generated/proto/main")) + } } group 'org.vss' version '1.0' - dependencies { - implementation "com.google.protobuf:protobuf-java:$protobufVersion" + implementation "com.google.protobuf:protobuf-java:$protobufVersion" + + //jOOQ & Postgres impl deps + implementation "org.jooq:jooq:$jooqVersion" + implementation "org.jooq:jooq-meta:$jooqVersion" + implementation "org.jooq:jooq-codegen:$jooqVersion" + runtimeOnly "org.postgresql:postgresql:$postgresVersion" + jooqGenerator "org.postgresql:postgresql:$postgresVersion" - testImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion" - testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion" + implementation "com.google.inject:guice:$guiceVersion" + + testImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion" + testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion" + testImplementation "org.hamcrest:hamcrest-library:2.2" + testImplementation "org.testcontainers:junit-jupiter:1.17.6" + testImplementation "org.testcontainers:postgresql:1.17.6" } test { - useJUnitPlatform() -} \ No newline at end of file + useJUnitPlatform() +} + +protobuf { + protoc { + artifact = "com.google.protobuf:protoc:$protobufVersion" + } +} + +jooq { + configurations { + main { + generateSchemaSourceOnCompilation = true + + generationTool { + jdbc { + driver = 'org.postgresql.Driver' + url = 'jdbc:postgresql://localhost:5432/postgres' + user = 'postgres' + password = '' + properties { + property { + key = 'ssl' + value = 'false' + } + } + } + generator { + name = 'org.jooq.codegen.DefaultGenerator' + database { + name = 'org.jooq.meta.postgres.PostgresDatabase' + inputSchema = 'public' + } + generate { + deprecated = false + records = true + immutablePojos = true + fluentSetters = true + } + target { + packageName = 'org.vss.postgres' + directory = 'build/generated-src/jooq/main' + } + strategy.name = 'org.jooq.codegen.DefaultGeneratorStrategy' + } + } + } + } +} diff --git a/app/src/main/java/org/vss/KVStore.java b/app/src/main/java/org/vss/KVStore.java new file mode 100644 index 0000000..dec1da6 --- /dev/null +++ b/app/src/main/java/org/vss/KVStore.java @@ -0,0 +1,12 @@ +package org.vss; + +public interface KVStore { + + String GLOBAL_VERSION_KEY = "vss_global_version"; + + GetObjectResponse get(GetObjectRequest request); + + PutObjectResponse put(PutObjectRequest request); + + ListKeyVersionsResponse listKeyVersions(ListKeyVersionsRequest request); +} diff --git a/app/src/main/java/org/vss/exception/ConflictException.java b/app/src/main/java/org/vss/exception/ConflictException.java new file mode 100644 index 0000000..f4edd2e --- /dev/null +++ b/app/src/main/java/org/vss/exception/ConflictException.java @@ -0,0 +1,7 @@ +package org.vss.exception; + +public class ConflictException extends RuntimeException { + public ConflictException(String message) { + super(message); + } +} diff --git a/app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java b/app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java new file mode 100644 index 0000000..03e9371 --- /dev/null +++ b/app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java @@ -0,0 +1,132 @@ +package org.vss.impl.postgres; + +import com.google.inject.Inject; +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import javax.inject.Singleton; +import org.jooq.DSLContext; +import org.jooq.Insert; +import org.jooq.Query; +import org.jooq.Update; +import org.vss.GetObjectRequest; +import org.vss.GetObjectResponse; +import org.vss.KVStore; +import org.vss.KeyValue; +import org.vss.ListKeyVersionsRequest; +import org.vss.ListKeyVersionsResponse; +import org.vss.PutObjectRequest; +import org.vss.PutObjectResponse; +import org.vss.exception.ConflictException; +import org.vss.postgres.tables.records.VssDbRecord; + +import static org.jooq.impl.DSL.val; +import static org.vss.postgres.tables.VssDb.VSS_DB; + +@Singleton +public class PostgresBackendImpl implements KVStore { + + private final DSLContext context; + + @Inject + public PostgresBackendImpl(DSLContext context) { + this.context = context; + } + + @Override + public GetObjectResponse get(GetObjectRequest request) { + + VssDbRecord vssDbRecord = context.selectFrom(VSS_DB) + .where(VSS_DB.STORE_ID.eq(request.getStoreId()) + .and(VSS_DB.KEY.eq(request.getKey()))) + .fetchOne(); + + final KeyValue keyValue; + + if (vssDbRecord != null) { + keyValue = KeyValue.newBuilder() + .setKey(vssDbRecord.getKey()) + .setValue(ByteString.copyFrom(vssDbRecord.getValue())) + .setVersion(vssDbRecord.getVersion()) + .build(); + } else { + keyValue = KeyValue.newBuilder() + .setKey(request.getKey()).build(); + } + + return GetObjectResponse.newBuilder() + .setValue(keyValue) + .build(); + } + + @Override + public PutObjectResponse put(PutObjectRequest request) { + + String storeId = request.getStoreId(); + + List vssRecords = new ArrayList<>(request.getTransactionItemsList().stream() + .map(kv -> buildVssRecord(storeId, kv)).toList()); + + if (request.hasGlobalVersion()) { + VssDbRecord globalVersionRecord = buildVssRecord(storeId, + KeyValue.newBuilder() + .setKey(GLOBAL_VERSION_KEY) + .setVersion(request.getGlobalVersion()) + .build()); + + vssRecords.add(globalVersionRecord); + } + + context.transaction((ctx) -> { + DSLContext dsl = ctx.dsl(); + List batchQueries = vssRecords.stream() + .map(vssRecord -> buildPutObjectQuery(dsl, vssRecord)).toList(); + + int[] batchResult = dsl.batch(batchQueries).execute(); + + for (int numOfRowsUpdated : batchResult) { + if (numOfRowsUpdated == 0) { + throw new ConflictException( + "Transaction could not be completed due to a possible conflict"); + } + } + }); + + return PutObjectResponse.newBuilder().build(); + } + + private Query buildPutObjectQuery(DSLContext dsl, VssDbRecord vssRecord) { + return vssRecord.getVersion() == 0 ? buildInsertRecordQuery(dsl, vssRecord) + : buildUpdateRecordQuery(dsl, vssRecord); + } + + private Insert buildInsertRecordQuery(DSLContext dsl, VssDbRecord vssRecord) { + return dsl.insertInto(VSS_DB) + .values(vssRecord.getStoreId(), vssRecord.getKey(), + vssRecord.getValue(), 1) + .onDuplicateKeyIgnore(); + } + + private Update buildUpdateRecordQuery(DSLContext dsl, VssDbRecord vssRecord) { + return dsl.update(VSS_DB) + .set(Map.of(VSS_DB.VALUE, vssRecord.getValue(), + VSS_DB.VERSION, vssRecord.getVersion() + 1)) + .where(VSS_DB.STORE_ID.eq(vssRecord.getStoreId()) + .and(VSS_DB.KEY.eq(vssRecord.getKey())) + .and(VSS_DB.VERSION.eq(vssRecord.getVersion()))); + } + + private VssDbRecord buildVssRecord(String storeId, KeyValue kv) { + return new VssDbRecord() + .setStoreId(storeId) + .setKey(kv.getKey()) + .setValue(kv.getValue().toByteArray()) + .setVersion(kv.getVersion()); + } + + @Override + public ListKeyVersionsResponse listKeyVersions(ListKeyVersionsRequest request) { + throw new UnsupportedOperationException("Operation not implemented"); + } +} diff --git a/app/src/main/java/org/vss/impl/postgres/sql/v0_create_vss_db.sql b/app/src/main/java/org/vss/impl/postgres/sql/v0_create_vss_db.sql new file mode 100644 index 0000000..2e27028 --- /dev/null +++ b/app/src/main/java/org/vss/impl/postgres/sql/v0_create_vss_db.sql @@ -0,0 +1,7 @@ +CREATE TABLE vss_db ( + store_id character varying(120) NOT NULL CHECK (store_id <> ''), + key character varying(120) NOT NULL, + value bytea NULL, + version bigint NOT NULL, + PRIMARY KEY (store_id, key) +); diff --git a/app/src/test/java/org/vss/AbstractKVStoreIntegrationTest.java b/app/src/test/java/org/vss/AbstractKVStoreIntegrationTest.java new file mode 100644 index 0000000..b0e6742 --- /dev/null +++ b/app/src/test/java/org/vss/AbstractKVStoreIntegrationTest.java @@ -0,0 +1,191 @@ +package org.vss; + +import com.google.protobuf.ByteString; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Objects; +import org.junit.jupiter.api.Test; +import org.vss.exception.ConflictException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public abstract class AbstractKVStoreIntegrationTest { + + private final String STORE_ID = "storeId"; + + protected KVStore kvStore; + + @Test + void putShouldSucceedWhenSingleObjectPutOperation() { + assertDoesNotThrow(() -> putObjects(0L, List.of(kv("k1", "k1v1", 0)))); + assertDoesNotThrow(() -> putObjects(1L, List.of(kv("k1", "k1v2", 1)))); + + KeyValue response = getObject("k1"); + assertThat(response.getKey(), is("k1")); + assertThat(response.getVersion(), is(2L)); + assertThat(response.getValue().toStringUtf8(), is("k1v2")); + } + + @Test + void putShouldSucceedWhenMultiObjectPutOperation() { + final List keyValues = List.of(kv("k1", "k1v1", 0), + kv("k2", "k2v1", 0)); + + assertDoesNotThrow(() -> putObjects(0L, keyValues)); + + List second_request = List.of(kv("k1", "k1v2", 1), + kv("k2", "k2v2", 1)); + putObjects(1L, second_request); + + KeyValue response = getObject("k1"); + assertThat(response.getKey(), is("k1")); + assertThat(response.getVersion(), is(2L)); + assertThat(response.getValue().toStringUtf8(), is("k1v2")); + + response = getObject("k2"); + assertThat(response.getKey(), is("k2")); + assertThat(response.getVersion(), is(2L)); + assertThat(response.getValue().toStringUtf8(), is("k2v2")); + } + + @Test + void putShouldFailWhenKeyVersionMismatched() { + putObjects(0L, List.of(kv("k1", "k1v1", 0))); + + // global_version correctly changed but key-version conflict. + assertThrows(ConflictException.class, () -> putObjects(1L, List.of(kv("k1", "k1v2", 0)))); + + //Verify that values didn't change + KeyValue response = getObject("k1"); + assertThat(response.getKey(), is("k1")); + assertThat(response.getVersion(), is(1L)); + assertThat(response.getValue().toStringUtf8(), is("k1v1")); + } + + @Test + void putMultiObjectShouldFailWhenSingleKeyVersionMismatched() { + final List keyValues = List.of(kv("k1", "k1v1", 0), + kv("k2", "k2v1", 0)); + + assertDoesNotThrow(() -> putObjects(null, keyValues)); + + List second_request = List.of(kv("k1", "k1v2", 0), + kv("k2", "k2v2", 1)); + + assertThrows(ConflictException.class, () -> putObjects(null, second_request)); + + //Verify that values didn't change + KeyValue response = getObject("k1"); + assertThat(response.getKey(), is("k1")); + assertThat(response.getVersion(), is(1L)); + assertThat(response.getValue().toStringUtf8(), is("k1v1")); + + response = getObject("k2"); + assertThat(response.getKey(), is("k2")); + assertThat(response.getVersion(), is(1L)); + assertThat(response.getValue().toStringUtf8(), is("k2v1")); + } + + @Test + void putShouldFailWhenGlobalVersionMismatched() { + putObjects(0L, List.of(kv("k1", "k1v1", 0))); + + // key-version correctly changed but global_version conflict. + assertThrows(ConflictException.class, () -> putObjects(0L, List.of(kv("k1", "k1v2", 1)))); + + //Verify that values didn't change + KeyValue response = getObject("k1"); + assertThat(response.getKey(), is("k1")); + assertThat(response.getVersion(), is(1L)); + assertThat(response.getValue().toStringUtf8(), is("k1v1")); + } + + @Test + void putShouldSucceedWhenNoGlobalVersionIsGiven() { + assertDoesNotThrow(() -> putObjects(null, List.of(kv("k1", "k1v1", 0)))); + assertDoesNotThrow(() -> putObjects(null, List.of(kv("k1", "k1v2", 1)))); + + KeyValue response = getObject("k1"); + assertThat(response.getKey(), is("k1")); + assertThat(response.getVersion(), is(2L)); + assertThat(response.getValue().toStringUtf8(), is("k1v2")); + } + + @Test + void getShouldReturnEmptyResponseWhenKeyDoesNotExist() { + KeyValue response = getObject("non_existent_key"); + + assertThat(response.getKey(), is("non_existent_key")); + assertTrue(response.getValue().isEmpty()); + assertThat(response.getVersion(), is(0L)); + } + + @Test + void getShouldReturnCorrectValueWhenKeyExists() { + + putObjects(0L, List.of(kv("k1", "k1v1", 0))); + + KeyValue response = getObject("k1"); + assertThat(response.getKey(), is("k1")); + assertThat(response.getVersion(), is(1L)); + assertThat(response.getValue().toStringUtf8(), is("k1v1")); + + List keyValues = List.of(kv("k1", "k1v2", 1), + kv("k2", "k2v1", 0)); + putObjects(1L, keyValues); + + response = getObject("k1"); + assertThat(response.getKey(), is("k1")); + assertThat(response.getVersion(), is(2L)); + assertThat(response.getValue().toStringUtf8(), is("k1v2")); + + response = getObject("k2"); + assertThat(response.getKey(), is("k2")); + assertThat(response.getVersion(), is(1L)); + assertThat(response.getValue().toStringUtf8(), is("k2v1")); + + keyValues = List.of(kv("k2", "k2v2", 1), + kv("k3", "k3v1", 0)); + putObjects(2L, keyValues); + + response = getObject("k2"); + assertThat(response.getKey(), is("k2")); + assertThat(response.getVersion(), is(2L)); + assertThat(response.getValue().toStringUtf8(), is("k2v2")); + + response = getObject("k3"); + assertThat(response.getKey(), is("k3")); + assertThat(response.getVersion(), is(1L)); + assertThat(response.getValue().toStringUtf8(), is("k3v1")); + } + + private KeyValue getObject(String key) { + GetObjectRequest getRequest = GetObjectRequest.newBuilder() + .setStoreId(STORE_ID) + .setKey(key) + .build(); + return this.kvStore.get(getRequest).getValue(); + } + + private void putObjects(Long globalVersion, List keyValues) { + PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.newBuilder() + .setStoreId(STORE_ID) + .addAllTransactionItems(keyValues); + + if (Objects.nonNull(globalVersion)) { + putObjectRequestBuilder.setGlobalVersion(globalVersion); + } + + this.kvStore.put(putObjectRequestBuilder.build()); + } + + private KeyValue kv(String key, String value, int version) { + return KeyValue.newBuilder().setKey(key).setVersion(version).setValue( + ByteString.copyFrom(value.getBytes( + StandardCharsets.UTF_8))).build(); + } +} diff --git a/app/src/test/java/org/vss/impl/postgres/PostgresBackendImplIntegrationTest.java b/app/src/test/java/org/vss/impl/postgres/PostgresBackendImplIntegrationTest.java new file mode 100644 index 0000000..76bf78c --- /dev/null +++ b/app/src/test/java/org/vss/impl/postgres/PostgresBackendImplIntegrationTest.java @@ -0,0 +1,57 @@ +package org.vss.impl.postgres; + +import java.sql.Connection; +import java.sql.DriverManager; +import org.jooq.DSLContext; +import org.jooq.SQLDialect; +import org.jooq.impl.DSL; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.vss.AbstractKVStoreIntegrationTest; + +@Testcontainers +public class PostgresBackendImplIntegrationTest extends AbstractKVStoreIntegrationTest { + + private final String POSTGRES_TEST_CONTAINER_DOCKER_IMAGE = "postgres:15"; + + @Container + private final PostgreSQLContainer postgreSQLContainer = + new PostgreSQLContainer(POSTGRES_TEST_CONTAINER_DOCKER_IMAGE) + .withDatabaseName("postgres") + .withUsername("postgres") + .withPassword("postgres"); + + private Connection connection; + + @BeforeEach + void initEach() throws Exception { + + // This is required to get postgres driver in classpath before we attempt to fetch a connection + Class.forName("org.postgresql.Driver"); + this.connection = DriverManager.getConnection(postgreSQLContainer.getJdbcUrl(), + postgreSQLContainer.getUsername(), postgreSQLContainer.getPassword()); + DSLContext dslContext = DSL.using(connection, SQLDialect.POSTGRES); + + this.kvStore = new PostgresBackendImpl(dslContext); + + createTable(dslContext); + } + + @AfterEach + void destroy() throws Exception { + this.connection.close(); + } + + private void createTable(DSLContext dslContext) { + dslContext.execute("CREATE TABLE vss_db (" + + "store_id character varying(120) NOT NULL CHECK (store_id <> '')," + + "key character varying(120) NOT NULL," + + "value bytea NULL," + + "version bigint NOT NULL," + + "PRIMARY KEY (store_id, key)" + + ");"); + } +}