diff --git a/app/src/main/java/org/vss/KVStore.java b/app/src/main/java/org/vss/KVStore.java index dec1da6..23aae28 100644 --- a/app/src/main/java/org/vss/KVStore.java +++ b/app/src/main/java/org/vss/KVStore.java @@ -8,5 +8,7 @@ public interface KVStore { PutObjectResponse put(PutObjectRequest request); + DeleteObjectResponse delete(DeleteObjectRequest request); + ListKeyVersionsResponse listKeyVersions(ListKeyVersionsRequest request); } diff --git a/app/src/main/java/org/vss/api/DeleteObjectApi.java b/app/src/main/java/org/vss/api/DeleteObjectApi.java new file mode 100644 index 0000000..dd4b874 --- /dev/null +++ b/app/src/main/java/org/vss/api/DeleteObjectApi.java @@ -0,0 +1,34 @@ +package org.vss.api; + +import jakarta.inject.Inject; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import lombok.extern.slf4j.Slf4j; +import org.vss.DeleteObjectRequest; +import org.vss.DeleteObjectResponse; +import org.vss.KVStore; + +@Path(VssApiEndpoint.DELETE_OBJECT) +@Slf4j +public class DeleteObjectApi extends AbstractVssApi { + @Inject + public DeleteObjectApi(KVStore kvstore) { + super(kvstore); + } + + @POST + @Produces(MediaType.APPLICATION_OCTET_STREAM) + public Response execute(byte[] payload) { + try { + DeleteObjectRequest request = DeleteObjectRequest.parseFrom(payload); + DeleteObjectResponse response = kvStore.delete(request); + return toResponse(response); + } catch (Exception e) { + log.error("Exception in DeleteObjectApi: ", e); + return toErrorResponse(e); + } + } +} diff --git a/app/src/main/java/org/vss/api/VssApiEndpoint.java b/app/src/main/java/org/vss/api/VssApiEndpoint.java index e8c6ab4..983bc27 100644 --- a/app/src/main/java/org/vss/api/VssApiEndpoint.java +++ b/app/src/main/java/org/vss/api/VssApiEndpoint.java @@ -3,5 +3,6 @@ public class VssApiEndpoint { public static final String GET_OBJECT = "/getObject"; public static final String PUT_OBJECTS = "/putObjects"; + public static final String DELETE_OBJECT = "/deleteObject"; public static final String LIST_KEY_VERSIONS = "/listKeyVersions"; } diff --git a/app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java b/app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java index 73b1d3e..5fec767 100644 --- a/app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java +++ b/app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java @@ -11,6 +11,8 @@ import org.jooq.Insert; import org.jooq.Query; import org.jooq.Update; +import org.vss.DeleteObjectRequest; +import org.vss.DeleteObjectResponse; import org.vss.GetObjectRequest; import org.vss.GetObjectResponse; import org.vss.KVStore; @@ -66,7 +68,10 @@ public PutObjectResponse put(PutObjectRequest request) { String storeId = request.getStoreId(); - List vssRecords = new ArrayList<>(request.getTransactionItemsList().stream() + List vssPutRecords = new ArrayList<>(request.getTransactionItemsList().stream() + .map(kv -> buildVssRecord(storeId, kv)).toList()); + + List vssDeleteRecords = new ArrayList<>(request.getDeleteItemsList().stream() .map(kv -> buildVssRecord(storeId, kv)).toList()); if (request.hasGlobalVersion()) { @@ -74,15 +79,20 @@ public PutObjectResponse put(PutObjectRequest request) { KeyValue.newBuilder() .setKey(GLOBAL_VERSION_KEY) .setVersion(request.getGlobalVersion()) + .setValue(ByteString.EMPTY) .build()); - vssRecords.add(globalVersionRecord); + vssPutRecords.add(globalVersionRecord); } context.transaction((ctx) -> { DSLContext dsl = ctx.dsl(); - List batchQueries = vssRecords.stream() - .map(vssRecord -> buildPutObjectQuery(dsl, vssRecord)).toList(); + List batchQueries = new ArrayList<>(); + + batchQueries.addAll(vssPutRecords.stream() + .map(vssRecord -> buildPutObjectQuery(dsl, vssRecord)).toList()); + batchQueries.addAll(vssDeleteRecords.stream() + .map(vssRecord -> buildDeleteObjectQuery(dsl, vssRecord)).toList()); int[] batchResult = dsl.batch(batchQueries).execute(); @@ -97,6 +107,12 @@ public PutObjectResponse put(PutObjectRequest request) { return PutObjectResponse.newBuilder().build(); } + private Query buildDeleteObjectQuery(DSLContext dsl, VssDbRecord vssRecord) { + return dsl.deleteFrom(VSS_DB).where(VSS_DB.STORE_ID.eq(vssRecord.getStoreId()) + .and(VSS_DB.KEY.eq(vssRecord.getKey())) + .and(VSS_DB.VERSION.eq(vssRecord.getVersion()))); + } + private Query buildPutObjectQuery(DSLContext dsl, VssDbRecord vssRecord) { return vssRecord.getVersion() == 0 ? buildInsertRecordQuery(dsl, vssRecord) : buildUpdateRecordQuery(dsl, vssRecord); @@ -126,6 +142,20 @@ private VssDbRecord buildVssRecord(String storeId, KeyValue kv) { .setVersion(kv.getVersion()); } + @Override + public DeleteObjectResponse delete(DeleteObjectRequest request) { + String storeId = request.getStoreId(); + VssDbRecord vssDbRecord = buildVssRecord(storeId, request.getKeyValue()); + + context.transaction((ctx) -> { + DSLContext dsl = ctx.dsl(); + Query deleteObjectQuery = buildDeleteObjectQuery(dsl, vssDbRecord); + dsl.execute(deleteObjectQuery); + }); + + return DeleteObjectResponse.newBuilder().build(); + } + @Override public ListKeyVersionsResponse listKeyVersions(ListKeyVersionsRequest request) { String storeId = request.getStoreId(); diff --git a/app/src/main/proto/vss.proto b/app/src/main/proto/vss.proto index c620273..2ce6c90 100644 --- a/app/src/main/proto/vss.proto +++ b/app/src/main/proto/vss.proto @@ -1,7 +1,9 @@ syntax = "proto3"; +package vss; option java_multiple_files = true; -package org.vss; +option java_package = "org.vss"; +// Request payload to be used for `GetObject` API call to server. message GetObjectRequest { // store_id is a keyspace identifier. @@ -24,12 +26,14 @@ message GetObjectRequest { string key = 2; } +// Server response for `GetObject` API. message GetObjectResponse { // Fetched value and version along with the corresponding key in the request. KeyValue value = 2; } +// Request payload to be used for `PutObject` API call to server. message PutObjectRequest { // store_id is a keyspace identifier. @@ -65,9 +69,9 @@ message PutObjectRequest { // Clients can choose to encrypt the keys client-side in order to obfuscate their usage patterns. // If the write is successful, the previous value corresponding to the key will be overwritten. // - // Multiple items in transaction_items of a single PutObjectRequest are written in + // Multiple items in transaction_items and delete_items of a single PutObjectRequest are written in // a database-transaction in an all-or-nothing fashion. - // Items in a single PutObjectRequest must have distinct keys. + // All Items in a single PutObjectRequest must have distinct keys. // // Clients are expected to store a version against every key. // The write will succeed if the current DB version against the key is the same as in the request. @@ -93,11 +97,56 @@ message PutObjectRequest { // All PutObjectRequests are strongly consistent i.e. they provide read-after-write and // read-after-update consistency guarantees. repeated KeyValue transaction_items = 3; + + // Items to be deleted as a result of this PutObjectRequest. + // + // Each item in the `delete_items` field consists of a key and its corresponding version. + // The version is used to perform a version check before deleting the item. + // The delete will only succeed if the current database version against the key is the same as the version + // specified in the request. + // + // Fails with `CONFLICT_EXCEPTION` as the ErrorCode if: + // * The requested item does not exist. + // * The requested item does exist but there is a version-number mismatch with the one in the database. + // + // Multiple items in the `delete_items` field, along with the `transaction_items`, are written in a + // database transaction in an all-or-nothing fashion. + // + // All items within a single `PutObjectRequest` must have distinct keys. + repeated KeyValue delete_items = 4; } +// Server response for `PutObject` API. message PutObjectResponse { } +// Request payload to be used for `DeleteObject` API call to server. +message DeleteObjectRequest { + // store_id is a keyspace identifier. + // Ref: https://en.wikipedia.org/wiki/Keyspace_(distributed_data_store) + // All APIs operate within a single store_id. + // It is up to clients to use single or multiple stores for their use-case. + // This can be used for client-isolation/ rate-limiting / throttling on the server-side. + // Authorization and billing can also be performed at the store_id level. + string store_id = 1; + + // Item to be deleted as a result of this DeleteObjectRequest. + // + // An item consists of a key and its corresponding version. + // The item is only deleted if the current database version against the key is the same as the version + // specified in the request. + // This operation is idempotent, that is, multiple delete calls for the same item will not fail. + // + // If the requested item does not exist, this operation will not fail. + // If you wish to perform stricter checks while deleting an item, consider using PutObject API. + KeyValue key_value = 2; +} + +// Server response for `DeleteObject` API. +message DeleteObjectResponse{ +} + +// Request payload to be used for `ListKeyVersions` API call to server. message ListKeyVersionsRequest { // store_id is a keyspace identifier. @@ -133,6 +182,7 @@ message ListKeyVersionsRequest { optional string page_token = 4; } +// Server response for `ListKeyVersions` API. message ListKeyVersionsResponse { // Fetched keys and versions. @@ -206,6 +256,7 @@ enum ErrorCode { INTERNAL_SERVER_EXCEPTION = 3; } +// Represents KeyValue pair to be stored or retrieved. message KeyValue { // Key against which the value is stored. diff --git a/app/src/test/java/org/vss/AbstractKVStoreIntegrationTest.java b/app/src/test/java/org/vss/AbstractKVStoreIntegrationTest.java index 183b5ff..45868c7 100644 --- a/app/src/test/java/org/vss/AbstractKVStoreIntegrationTest.java +++ b/app/src/test/java/org/vss/AbstractKVStoreIntegrationTest.java @@ -131,6 +131,66 @@ void putShouldSucceedWhenNoGlobalVersionIsGiven() { assertThat(getObject(KVStore.GLOBAL_VERSION_KEY).getVersion(), is(0L)); } + @Test + void putAndDeleteShouldSucceedAsAtomicTransaction() { + assertDoesNotThrow(() -> putObjects(null, List.of(kv("k1", "k1v1", 0)))); + // Put and Delete succeeds + assertDoesNotThrow(() -> putAndDeleteObjects(null, List.of(kv("k2", "k2v1", 0)), List.of(kv("k1", "", 1)))); + + KeyValue response = getObject("k2"); + assertThat(response.getKey(), is("k2")); + assertThat(response.getVersion(), is(1L)); + assertThat(response.getValue().toStringUtf8(), is("k2v1")); + + assertTrue(getObject("k1").getValue().isEmpty()); + + // Delete fails (and hence put as well) due to mismatched version for the deleted item. + assertThrows(ConflictException.class, () -> putAndDeleteObjects(null, List.of(kv("k3", "k3v1", 0)), List.of(kv("k2", "", 3)))); + + assertTrue(getObject("k3").getValue().isEmpty()); + assertFalse(getObject("k2").getValue().isEmpty()); + + // Put fails (and hence delete as well) due to mismatched version for the put item. + assertThrows(ConflictException.class, () -> putAndDeleteObjects(null, List.of(kv("k3", "k3v1", 1)), List.of(kv("k2", "", 1)))); + + assertTrue(getObject("k3").getValue().isEmpty()); + assertFalse(getObject("k2").getValue().isEmpty()); + + // Put and delete both fail due to mismatched global version. + assertThrows(ConflictException.class, () -> putAndDeleteObjects(2L, List.of(kv("k3", "k3v1", 0)), List.of(kv("k2", "", 1)))); + + assertTrue(getObject("k3").getValue().isEmpty()); + assertFalse(getObject("k2").getValue().isEmpty()); + + assertThat(getObject(KVStore.GLOBAL_VERSION_KEY).getVersion(), is(0L)); + } + + @Test + void deleteShouldSucceedWhenItemExists() { + assertDoesNotThrow(() -> putObjects(null, List.of(kv("k1", "k1v1", 0)))); + assertDoesNotThrow(() -> deleteObject(kv("k1", "", 1))); + + KeyValue response = getObject("k1"); + assertThat(response.getKey(), is("k1")); + assertTrue(response.getValue().isEmpty()); + } + + @Test + void deleteShouldSucceedWhenItemDoesNotExist() { + assertDoesNotThrow(() -> deleteObject(kv("non_existent_key", "", 0))); + } + + @Test + void deleteShouldBeIdempotent() { + assertDoesNotThrow(() -> putObjects(null, List.of(kv("k1", "k1v1", 0)))); + assertDoesNotThrow(() -> deleteObject(kv("k1", "", 1))); + assertDoesNotThrow(() -> deleteObject(kv("k1", "", 1))); + + KeyValue response = getObject("k1"); + assertThat(response.getKey(), is("k1")); + assertTrue(response.getValue().isEmpty()); + } + @Test void getShouldReturnEmptyResponseWhenKeyDoesNotExist() { KeyValue response = getObject("non_existent_key"); @@ -370,6 +430,25 @@ private void putObjects(@Nullable Long globalVersion, List keyValues) this.kvStore.put(putObjectRequestBuilder.build()); } + private void putAndDeleteObjects(@Nullable Long globalVersion, List putKeyValues, List deleteKeyValues) { + PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.newBuilder() + .setStoreId(STORE_ID) + .addAllTransactionItems(putKeyValues) + .addAllDeleteItems(deleteKeyValues); + + if (Objects.nonNull(globalVersion)) { + putObjectRequestBuilder.setGlobalVersion(globalVersion); + } + + this.kvStore.put(putObjectRequestBuilder.build()); + } + + private void deleteObject(KeyValue keyValue) { + DeleteObjectRequest request = DeleteObjectRequest.newBuilder() + .setStoreId(STORE_ID).setKeyValue(keyValue).build(); + this.kvStore.delete(request); + } + private ListKeyVersionsResponse list(@Nullable String nextPageToken, @Nullable Integer pageSize, @Nullable String keyPrefix) { ListKeyVersionsRequest.Builder listRequestBuilder = ListKeyVersionsRequest.newBuilder() diff --git a/app/src/test/java/org/vss/api/DeleteObjectApiTest.java b/app/src/test/java/org/vss/api/DeleteObjectApiTest.java new file mode 100644 index 0000000..ec3ba4e --- /dev/null +++ b/app/src/test/java/org/vss/api/DeleteObjectApiTest.java @@ -0,0 +1,88 @@ +package org.vss.api; + +import com.google.protobuf.ByteString; +import jakarta.ws.rs.core.Response; +import java.nio.charset.StandardCharsets; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.vss.DeleteObjectRequest; +import org.vss.DeleteObjectResponse; +import org.vss.ErrorCode; +import org.vss.ErrorResponse; +import org.vss.KVStore; +import org.vss.KeyValue; +import org.vss.exception.ConflictException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class DeleteObjectApiTest { + private DeleteObjectApi deleteObjectApi; + private KVStore mockKVStore; + + private static String TEST_STORE_ID = "storeId"; + private static String TEST_KEY = "key"; + private static KeyValue TEST_KV = KeyValue.newBuilder().setKey(TEST_KEY).setValue( + ByteString.copyFrom("test_value", StandardCharsets.UTF_8)).build(); + + @BeforeEach + void setUp() { + mockKVStore = mock(KVStore.class); + deleteObjectApi = new DeleteObjectApi(mockKVStore); + } + + @Test + void execute_ValidPayload_ReturnsResponse() { + DeleteObjectRequest expectedRequest = + DeleteObjectRequest.newBuilder().setStoreId(TEST_STORE_ID).setKeyValue( + KeyValue.newBuilder().setKey(TEST_KEY).setVersion(0) + ).build(); + byte[] payload = expectedRequest.toByteArray(); + DeleteObjectResponse mockResponse = DeleteObjectResponse.newBuilder().build(); + when(mockKVStore.delete(expectedRequest)).thenReturn(mockResponse); + + Response actualResponse = deleteObjectApi.execute(payload); + + assertThat(actualResponse.getStatus(), is(Response.Status.OK.getStatusCode())); + assertThat(actualResponse.getEntity(), is(mockResponse.toByteArray())); + verify(mockKVStore).delete(expectedRequest); + } + + @ParameterizedTest + @MethodSource("provideErrorTestCases") + void execute_InvalidPayload_ReturnsErrorResponse(Exception exception, + ErrorCode errorCode) { + DeleteObjectRequest expectedRequest = + DeleteObjectRequest.newBuilder().setStoreId(TEST_STORE_ID).setKeyValue( + KeyValue.newBuilder().setKey(TEST_KEY).setVersion(0) + ).build(); + byte[] payload = expectedRequest.toByteArray(); + when(mockKVStore.delete(any())).thenThrow(exception); + + Response response = deleteObjectApi.execute(payload); + + ErrorResponse expectedErrorResponse = ErrorResponse.newBuilder() + .setErrorCode(errorCode) + .setMessage("") + .build(); + assertThat(response.getEntity(), is(expectedErrorResponse.toByteArray())); + assertThat(response.getStatus(), is(expectedErrorResponse.getErrorCode().getNumber())); + verify(mockKVStore).delete(expectedRequest); + } + + private static Stream provideErrorTestCases() { + return Stream.of( + Arguments.of(new ConflictException(""), ErrorCode.CONFLICT_EXCEPTION), + Arguments.of(new IllegalArgumentException(""), ErrorCode.INVALID_REQUEST_EXCEPTION), + Arguments.of(new RuntimeException(""), ErrorCode.INTERNAL_SERVER_EXCEPTION) + ); + } +}