From c6e9e0dc84d0025ddb19e7e665b80c7930e53359 Mon Sep 17 00:00:00 2001 From: kolea2 Date: Mon, 29 Apr 2024 11:12:05 -0400 Subject: [PATCH] test(refactor): move aggregation tests into ITDatastoreTest --- .../it/ITDatastoreAggregationsTest.java | 361 ------------- .../cloud/datastore/it/ITDatastoreTest.java | 511 ++++++++++++++---- 2 files changed, 411 insertions(+), 461 deletions(-) delete mode 100644 google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITDatastoreAggregationsTest.java diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITDatastoreAggregationsTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITDatastoreAggregationsTest.java deleted file mode 100644 index fd430095f..000000000 --- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITDatastoreAggregationsTest.java +++ /dev/null @@ -1,361 +0,0 @@ -/* - * Copyright 2023 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.datastore.it; - -import static com.google.cloud.datastore.aggregation.Aggregation.avg; -import static com.google.cloud.datastore.aggregation.Aggregation.count; -import static com.google.cloud.datastore.aggregation.Aggregation.sum; -import static com.google.common.collect.Iterables.getOnlyElement; -import static com.google.common.truth.Truth.assertThat; - -import com.google.cloud.datastore.AggregationQuery; -import com.google.cloud.datastore.AggregationResult; -import com.google.cloud.datastore.Datastore; -import com.google.cloud.datastore.Datastore.TransactionCallable; -import com.google.cloud.datastore.DatastoreOptions; -import com.google.cloud.datastore.Entity; -import com.google.cloud.datastore.EntityQuery; -import com.google.cloud.datastore.GqlQuery; -import com.google.cloud.datastore.Key; -import com.google.cloud.datastore.Query; -import com.google.cloud.datastore.QueryResults; -import com.google.cloud.datastore.Transaction; -import com.google.cloud.datastore.testing.RemoteDatastoreHelper; -import com.google.common.collect.ImmutableList; -import com.google.datastore.v1.TransactionOptions; -import com.google.datastore.v1.TransactionOptions.ReadOnly; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import org.junit.After; -import org.junit.Test; - -// TODO(jainsahab) Move all the aggregation related tests from ITDatastoreTest to this file -public class ITDatastoreAggregationsTest { - - private static final RemoteDatastoreHelper HELPER = RemoteDatastoreHelper.create(); - private static final DatastoreOptions OPTIONS = HELPER.getOptions(); - private static final Datastore DATASTORE = OPTIONS.getService(); - - private static final String KIND = "Marks"; - - @After - public void tearDown() { - EntityQuery allEntitiesQuery = Query.newEntityQueryBuilder().build(); - QueryResults allEntities = DATASTORE.run(allEntitiesQuery); - Key[] keysToDelete = - ImmutableList.copyOf(allEntities).stream().map(Entity::getKey).toArray(Key[]::new); - DATASTORE.delete(keysToDelete); - } - - Key key1 = DATASTORE.newKeyFactory().setKind(KIND).newKey(1); - Key key2 = DATASTORE.newKeyFactory().setKind(KIND).newKey(2); - Key key3 = DATASTORE.newKeyFactory().setKind(KIND).newKey(3); - - Entity entity1 = - Entity.newBuilder(key1).set("name", "Jon Stark").set("marks", 89).set("cgpa", 7.34).build(); - Entity entity2 = - Entity.newBuilder(key2).set("name", "Arya Stark").set("marks", 95).set("cgpa", 9.27).build(); - Entity entity3 = - Entity.newBuilder(key3).set("name", "Night king").set("marks", 55).set("cgpa", 5.16).build(); - - @Test - public void testSumAggregation() { - DATASTORE.put(entity1, entity2); - - EntityQuery baseQuery = Query.newEntityQueryBuilder().setKind(KIND).build(); - AggregationQuery aggregationQuery = - Query.newAggregationQueryBuilder() - .over(baseQuery) - .addAggregations(sum("marks").as("total_marks")) - .setNamespace(OPTIONS.getNamespace()) - .build(); - - // sum of 2 entities - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getLong("total_marks")) - .isEqualTo(184L); - - // sum of 3 entities - DATASTORE.put(entity3); - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getLong("total_marks")) - .isEqualTo(239L); - } - - @Test - public void testSumAggregationWithAutoGeneratedAlias() { - DATASTORE.put(entity1, entity2); - - EntityQuery baseQuery = Query.newEntityQueryBuilder().setKind(KIND).build(); - AggregationQuery aggregationQuery = - Query.newAggregationQueryBuilder() - .over(baseQuery) - .addAggregations(sum("marks")) - .setNamespace(OPTIONS.getNamespace()) - .build(); - - // sum of 2 entities - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getLong("property_1")) - .isEqualTo(184L); - - // sum of 3 entities - DATASTORE.put(entity3); - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getLong("property_1")) - .isEqualTo(239L); - } - - @Test - public void testSumAggregationInGqlQuery() { - DATASTORE.put(entity1, entity2); - - GqlQuery gqlQuery = - GqlQuery.newGqlQueryBuilder( - "AGGREGATE SUM(marks) AS total_marks OVER (SELECT * FROM Marks)") - .build(); - - AggregationQuery aggregationQuery = - Query.newAggregationQueryBuilder() - .over(gqlQuery) - .setNamespace(OPTIONS.getNamespace()) - .build(); - - // sum of 2 entities - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getLong("total_marks")) - .isEqualTo(184L); - - // sum of 3 entities - DATASTORE.put(entity3); - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getLong("total_marks")) - .isEqualTo(239L); - } - - @Test - public void testSumAggregationWithResultOfDoubleType() { - DATASTORE.put(entity1, entity2); - - EntityQuery baseQuery = Query.newEntityQueryBuilder().setKind(KIND).build(); - AggregationQuery aggregationQuery = - Query.newAggregationQueryBuilder() - .over(baseQuery) - .addAggregations(sum("cgpa").as("total_cgpa")) - .setNamespace(OPTIONS.getNamespace()) - .build(); - - // sum of 2 entities - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getDouble("total_cgpa")) - .isEqualTo(16.61); - - // sum of 3 entities - DATASTORE.put(entity3); - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getDouble("total_cgpa")) - .isEqualTo(21.77); - } - - @Test - public void testAvgAggregation() { - DATASTORE.put(entity1, entity2); - - EntityQuery baseQuery = Query.newEntityQueryBuilder().setKind(KIND).build(); - AggregationQuery aggregationQuery = - Query.newAggregationQueryBuilder() - .over(baseQuery) - .addAggregations(avg("marks").as("avg_marks")) - .setNamespace(OPTIONS.getNamespace()) - .build(); - - // avg of 2 entities - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getDouble("avg_marks")) - .isEqualTo(92D); - - // avg of 3 entities - DATASTORE.put(entity3); - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getDouble("avg_marks")) - .isEqualTo(79.66666666666667); - } - - @Test - public void testAvgAggregationWithAutoGeneratedAlias() { - DATASTORE.put(entity1, entity2); - - EntityQuery baseQuery = Query.newEntityQueryBuilder().setKind(KIND).build(); - AggregationQuery aggregationQuery = - Query.newAggregationQueryBuilder() - .over(baseQuery) - .addAggregations(avg("marks")) - .setNamespace(OPTIONS.getNamespace()) - .build(); - - // avg of 2 entities - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getDouble("property_1")) - .isEqualTo(92D); - - // avg of 3 entities - DATASTORE.put(entity3); - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getDouble("property_1")) - .isEqualTo(79.66666666666667); - } - - @Test - public void testAvgAggregationInGqlQuery() { - DATASTORE.put(entity1, entity2); - - GqlQuery gqlQuery = - Query.newGqlQueryBuilder("AGGREGATE AVG(marks) AS avg_marks OVER (SELECT * FROM Marks)") - .build(); - - AggregationQuery aggregationQuery = - Query.newAggregationQueryBuilder() - .over(gqlQuery) - .setNamespace(OPTIONS.getNamespace()) - .build(); - - // avg of 2 entities - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getDouble("avg_marks")) - .isEqualTo(92D); - - // avg of 3 entities - DATASTORE.put(entity3); - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getDouble("avg_marks")) - .isEqualTo(79.66666666666667); - } - - @Test - public void testSumAndAvgAggregationTogether() { - DATASTORE.put(entity1, entity2); - - EntityQuery baseQuery = Query.newEntityQueryBuilder().setKind(KIND).build(); - AggregationQuery aggregationQuery = - Query.newAggregationQueryBuilder() - .over(baseQuery) - .addAggregations(sum("marks").as("total_marks")) - .addAggregations(avg("marks").as("avg_marks")) - .setNamespace(OPTIONS.getNamespace()) - .build(); - - // sum of 2 entities - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getLong("total_marks")) - .isEqualTo(184L); - // avg of 2 entities - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getDouble("avg_marks")) - .isEqualTo(92D); - } - - @Test - public void testTransactionShouldReturnAConsistentSnapshot() { - DATASTORE.put(entity1, entity2); - - EntityQuery baseQuery = Query.newEntityQueryBuilder().setKind(KIND).build(); - AggregationQuery aggregationQuery = - Query.newAggregationQueryBuilder() - .over(baseQuery) - .addAggregation(count().as("count")) - .addAggregations(sum("marks").as("total_marks")) - .addAggregations(avg("marks").as("avg_marks")) - .setNamespace(OPTIONS.getNamespace()) - .build(); - - // original entity count is 2 - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getLong("count")) - .isEqualTo(2L); - - // FIRST TRANSACTION - DATASTORE.runInTransaction( - (TransactionCallable) - inFirstTransaction -> { - // creating a new entity - inFirstTransaction.put(entity3); - - // aggregation result consistently being produced for original 2 entities - AggregationResult aggregationResult = - getOnlyElement(inFirstTransaction.runAggregation(aggregationQuery)); - assertThat(aggregationResult.getLong("count")).isEqualTo(2L); - assertThat(aggregationResult.getLong("total_marks")).isEqualTo(184L); - assertThat(aggregationResult.getDouble("avg_marks")).isEqualTo(92D); - return null; - }); - - // after first transaction is committed, we have 3 entities now. - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getLong("count")) - .isEqualTo(3L); - - // SECOND TRANSACTION - DATASTORE.runInTransaction( - (TransactionCallable) - inSecondTransaction -> { - // deleting ENTITY3 - inSecondTransaction.delete(entity3.getKey()); - - // aggregation result still coming for 3 entities - AggregationResult aggregationResult = - getOnlyElement(inSecondTransaction.runAggregation(aggregationQuery)); - assertThat(aggregationResult.getLong("count")).isEqualTo(3L); - assertThat(aggregationResult.getLong("total_marks")).isEqualTo(239L); - assertThat(aggregationResult.getDouble("avg_marks")).isEqualTo(79.66666666666667); - return null; - }); - - // after second transaction is committed, we are back to 2 entities now. - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getLong("count")) - .isEqualTo(2L); - } - - @Test - public void testReadOnlyTransactionShouldNotLockTheDocuments() - throws ExecutionException, InterruptedException { - ExecutorService executor = Executors.newSingleThreadExecutor(); - DATASTORE.put(entity1, entity2); - - EntityQuery baseQuery = Query.newEntityQueryBuilder().setKind(KIND).build(); - AggregationQuery aggregationQuery = - Query.newAggregationQueryBuilder() - .over(baseQuery) - .addAggregation(count().as("count")) - .addAggregations(sum("marks").as("total_marks")) - .addAggregations(avg("marks").as("avg_marks")) - .setNamespace(OPTIONS.getNamespace()) - .build(); - - TransactionOptions transactionOptions = - TransactionOptions.newBuilder().setReadOnly(ReadOnly.newBuilder().build()).build(); - Transaction readOnlyTransaction = DATASTORE.newTransaction(transactionOptions); - - // Executing query in transaction, results for original 2 entities - AggregationResult aggregationResult = - getOnlyElement(readOnlyTransaction.runAggregation(aggregationQuery)); - assertThat(aggregationResult.getLong("count")).isEqualTo(2L); - assertThat(aggregationResult.getLong("total_marks")).isEqualTo(184L); - assertThat(aggregationResult.getDouble("avg_marks")).isEqualTo(92D); - - // Concurrent write task. - Future addNewEntityTaskOutsideTransaction = - executor.submit( - () -> { - DATASTORE.put(entity3); - return null; - }); - - // should not throw exception and complete successfully as the ongoing transaction is read-only. - addNewEntityTaskOutsideTransaction.get(); - - // cleanup - readOnlyTransaction.commit(); - executor.shutdownNow(); - - assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).getLong("count")) - .isEqualTo(3L); - } -} diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITDatastoreTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITDatastoreTest.java index fded9bc3d..09dcd2116 100644 --- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITDatastoreTest.java +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITDatastoreTest.java @@ -16,7 +16,9 @@ package com.google.cloud.datastore.it; +import static com.google.cloud.datastore.aggregation.Aggregation.avg; import static com.google.cloud.datastore.aggregation.Aggregation.count; +import static com.google.cloud.datastore.aggregation.Aggregation.sum; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; @@ -31,6 +33,7 @@ import com.google.cloud.Timestamp; import com.google.cloud.Tuple; import com.google.cloud.datastore.AggregationQuery; +import com.google.cloud.datastore.AggregationResult; import com.google.cloud.datastore.AggregationResults; import com.google.cloud.datastore.Batch; import com.google.cloud.datastore.BooleanValue; @@ -86,6 +89,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -140,6 +144,7 @@ public class ITDatastoreTest { private static Key KEY3; private static Key KEY4; private static Key KEY5; + private static final String MARKS_KIND = "Marks"; private static FullEntity PARTIAL_ENTITY1; private static FullEntity PARTIAL_ENTITY2; private static FullEntity PARTIAL_ENTITY3; @@ -147,6 +152,10 @@ public class ITDatastoreTest { private static Entity ENTITY2; private static Entity ENTITY3; + private static Entity AGGREGATION_ENTITY_1; + private static Entity AGGREGATION_ENTITY_2; + private static Entity AGGREGATION_ENTITY_3; + @Rule public Timeout globalTimeout = Timeout.seconds(100); @Rule public MultipleAttemptsRule multipleAttemptsRule = new MultipleAttemptsRule(3); @@ -233,6 +242,29 @@ public ITDatastoreTest( .set("partial1", PARTIAL_ENTITY2) .set("partial2", ENTITY2) .build(); + + Key aggregationKey1 = datastore.newKeyFactory().setKind(MARKS_KIND).newKey(1); + Key aggregationKey2 = datastore.newKeyFactory().setKind(MARKS_KIND).newKey(2); + Key aggregationKey3 = datastore.newKeyFactory().setKind(MARKS_KIND).newKey(3); + + AGGREGATION_ENTITY_1 = + Entity.newBuilder(aggregationKey1) + .set("name", "Person1") + .set("marks", 89) + .set("cgpa", 7.34) + .build(); + AGGREGATION_ENTITY_2 = + Entity.newBuilder(aggregationKey2) + .set("name", "Person2") + .set("marks", 95) + .set("cgpa", 9.27) + .build(); + AGGREGATION_ENTITY_3 = + Entity.newBuilder(aggregationKey3) + .set("name", "Person3") + .set("marks", 55) + .set("cgpa", 5.16) + .build(); } @Before @@ -974,6 +1006,385 @@ public void testRunAggregationQueryWithLimit() { .build())); } + @Test + public void testSumAggregation() { + datastore.put(AGGREGATION_ENTITY_1, AGGREGATION_ENTITY_2); + + EntityQuery baseQuery = Query.newEntityQueryBuilder().setKind(MARKS_KIND).build(); + AggregationQuery aggregationQuery = + Query.newAggregationQueryBuilder() + .over(baseQuery) + .addAggregations(sum("marks").as("total_marks")) + .setNamespace(NAMESPACE) + .build(); + + // sum of 2 entities + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getLong("total_marks")) + .isEqualTo(184L); + + // sum of 3 entities + datastore.put(AGGREGATION_ENTITY_3); + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getLong("total_marks")) + .isEqualTo(239L); + } + + @Test + public void testSumAggregationWithAutoGeneratedAlias() { + datastore.put(AGGREGATION_ENTITY_1, AGGREGATION_ENTITY_2); + + EntityQuery baseQuery = Query.newEntityQueryBuilder().setKind(MARKS_KIND).build(); + AggregationQuery aggregationQuery = + Query.newAggregationQueryBuilder() + .over(baseQuery) + .addAggregations(sum("marks")) + .setNamespace(NAMESPACE) + .build(); + + // sum of 2 entities + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getLong("property_1")) + .isEqualTo(184L); + + // sum of 3 entities + datastore.put(AGGREGATION_ENTITY_3); + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getLong("property_1")) + .isEqualTo(239L); + } + + @Test + public void testSumAggregationInGqlQuery() { + datastore.put(AGGREGATION_ENTITY_1, AGGREGATION_ENTITY_2); + + GqlQuery gqlQuery = + GqlQuery.newGqlQueryBuilder( + "AGGREGATE SUM(marks) AS total_marks OVER (SELECT * FROM Marks)") + .build(); + + AggregationQuery aggregationQuery = + Query.newAggregationQueryBuilder().over(gqlQuery).setNamespace(NAMESPACE).build(); + + // sum of 2 entities + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getLong("total_marks")) + .isEqualTo(184L); + + // sum of 3 entities + datastore.put(AGGREGATION_ENTITY_3); + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getLong("total_marks")) + .isEqualTo(239L); + } + + @Test + public void testSumAggregationWithResultOfDoubleType() { + datastore.put(AGGREGATION_ENTITY_1, AGGREGATION_ENTITY_2); + + EntityQuery baseQuery = Query.newEntityQueryBuilder().setKind(MARKS_KIND).build(); + AggregationQuery aggregationQuery = + Query.newAggregationQueryBuilder() + .over(baseQuery) + .addAggregations(sum("cgpa").as("total_cgpa")) + .setNamespace(NAMESPACE) + .build(); + + // sum of 2 entities + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getDouble("total_cgpa")) + .isEqualTo(16.61); + + // sum of 3 entities + datastore.put(AGGREGATION_ENTITY_3); + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getDouble("total_cgpa")) + .isEqualTo(21.77); + } + + @Test + public void testAvgAggregation() { + datastore.put(AGGREGATION_ENTITY_1, AGGREGATION_ENTITY_2); + + EntityQuery baseQuery = Query.newEntityQueryBuilder().setKind(MARKS_KIND).build(); + AggregationQuery aggregationQuery = + Query.newAggregationQueryBuilder() + .over(baseQuery) + .addAggregations(avg("marks").as("avg_marks")) + .setNamespace(NAMESPACE) + .build(); + + // avg of 2 entities + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getDouble("avg_marks")) + .isEqualTo(92D); + + // avg of 3 entities + datastore.put(AGGREGATION_ENTITY_3); + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getDouble("avg_marks")) + .isEqualTo(79.66666666666667); + } + + @Test + public void testAvgAggregationWithAutoGeneratedAlias() { + datastore.put(AGGREGATION_ENTITY_1, AGGREGATION_ENTITY_2); + + EntityQuery baseQuery = Query.newEntityQueryBuilder().setKind(MARKS_KIND).build(); + AggregationQuery aggregationQuery = + Query.newAggregationQueryBuilder() + .over(baseQuery) + .addAggregations(avg("marks")) + .setNamespace(NAMESPACE) + .build(); + + // avg of 2 entities + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getDouble("property_1")) + .isEqualTo(92D); + + // avg of 3 entities + datastore.put(AGGREGATION_ENTITY_3); + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getDouble("property_1")) + .isEqualTo(79.66666666666667); + } + + @Test + public void testAvgAggregationInGqlQuery() { + datastore.put(AGGREGATION_ENTITY_1, AGGREGATION_ENTITY_2); + + GqlQuery gqlQuery = + Query.newGqlQueryBuilder("AGGREGATE AVG(marks) AS avg_marks OVER (SELECT * FROM Marks)") + .build(); + + AggregationQuery aggregationQuery = + Query.newAggregationQueryBuilder().over(gqlQuery).setNamespace(NAMESPACE).build(); + + // avg of 2 entities + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getDouble("avg_marks")) + .isEqualTo(92D); + + // avg of 3 entities + datastore.put(AGGREGATION_ENTITY_3); + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getDouble("avg_marks")) + .isEqualTo(79.66666666666667); + } + + @Test + public void testSumAndAvgAggregationTogether() { + datastore.put(AGGREGATION_ENTITY_1, AGGREGATION_ENTITY_2); + + EntityQuery baseQuery = Query.newEntityQueryBuilder().setKind(MARKS_KIND).build(); + AggregationQuery aggregationQuery = + Query.newAggregationQueryBuilder() + .over(baseQuery) + .addAggregations(sum("marks").as("total_marks")) + .addAggregations(avg("marks").as("avg_marks")) + .setNamespace(NAMESPACE) + .build(); + + // sum of 2 entities + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getLong("total_marks")) + .isEqualTo(184L); + // avg of 2 entities + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getDouble("avg_marks")) + .isEqualTo(92D); + } + + @Test + public void testTransactionShouldReturnAConsistentSnapshot() { + datastore.put(AGGREGATION_ENTITY_1, AGGREGATION_ENTITY_2); + + EntityQuery baseQuery = Query.newEntityQueryBuilder().setKind(MARKS_KIND).build(); + AggregationQuery aggregationQuery = + Query.newAggregationQueryBuilder() + .over(baseQuery) + .addAggregation(count().as("count")) + .addAggregations(sum("marks").as("total_marks")) + .addAggregations(avg("marks").as("avg_marks")) + .setNamespace(NAMESPACE) + .build(); + + // original entity count is 2 + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getLong("count")) + .isEqualTo(2L); + + // FIRST TRANSACTION + datastore.runInTransaction( + (TransactionCallable) + inFirstTransaction -> { + // creating a new entity + inFirstTransaction.put(AGGREGATION_ENTITY_3); + + // aggregation result consistently being produced for original 2 entities + AggregationResult aggregationResult = + getOnlyElement(inFirstTransaction.runAggregation(aggregationQuery)); + assertThat(aggregationResult.getLong("count")).isEqualTo(2L); + assertThat(aggregationResult.getLong("total_marks")).isEqualTo(184L); + assertThat(aggregationResult.getDouble("avg_marks")).isEqualTo(92D); + return null; + }); + + // after first transaction is committed, we have 3 entities now. + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getLong("count")) + .isEqualTo(3L); + + // SECOND TRANSACTION + datastore.runInTransaction( + (TransactionCallable) + inSecondTransaction -> { + // deleting ENTITY3 + inSecondTransaction.delete(AGGREGATION_ENTITY_3.getKey()); + + // aggregation result still coming for 3 entities + AggregationResult aggregationResult = + getOnlyElement(inSecondTransaction.runAggregation(aggregationQuery)); + assertThat(aggregationResult.getLong("count")).isEqualTo(3L); + assertThat(aggregationResult.getLong("total_marks")).isEqualTo(239L); + assertThat(aggregationResult.getDouble("avg_marks")).isEqualTo(79.66666666666667); + return null; + }); + + // after second transaction is committed, we are back to 2 entities now. + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getLong("count")) + .isEqualTo(2L); + } + + @Test + public void testReadOnlyTransactionShouldNotLockTheDocuments() + throws ExecutionException, InterruptedException { + ExecutorService executor = Executors.newSingleThreadExecutor(); + datastore.put(AGGREGATION_ENTITY_1, AGGREGATION_ENTITY_2); + + EntityQuery baseQuery = Query.newEntityQueryBuilder().setKind(MARKS_KIND).build(); + AggregationQuery aggregationQuery = + Query.newAggregationQueryBuilder() + .over(baseQuery) + .addAggregation(count().as("count")) + .addAggregations(sum("marks").as("total_marks")) + .addAggregations(avg("marks").as("avg_marks")) + .setNamespace(NAMESPACE) + .build(); + + TransactionOptions transactionOptions = + TransactionOptions.newBuilder().setReadOnly(ReadOnly.newBuilder().build()).build(); + Transaction readOnlyTransaction = datastore.newTransaction(transactionOptions); + + // Executing query in transaction, results for original 2 entities + AggregationResult aggregationResult = + getOnlyElement(readOnlyTransaction.runAggregation(aggregationQuery)); + assertThat(aggregationResult.getLong("count")).isEqualTo(2L); + assertThat(aggregationResult.getLong("total_marks")).isEqualTo(184L); + assertThat(aggregationResult.getDouble("avg_marks")).isEqualTo(92D); + + // Concurrent write task. + Future addNewEntityTaskOutsideTransaction = + executor.submit( + () -> { + datastore.put(AGGREGATION_ENTITY_3); + return null; + }); + + // should not throw exception and complete successfully as the ongoing transaction is read-only. + addNewEntityTaskOutsideTransaction.get(); + + // cleanup + readOnlyTransaction.commit(); + executor.shutdownNow(); + + assertThat(getOnlyElement(datastore.runAggregation(aggregationQuery)).getLong("count")) + .isEqualTo(3L); + } + + private void testCountAggregationWith(Consumer configurer) { + AggregationQuery.Builder builder = Query.newAggregationQueryBuilder().setNamespace(NAMESPACE); + configurer.accept(builder); + AggregationQuery aggregationQuery = builder.build(); + String alias = "total_count"; + + Long countBeforeAdd = getOnlyElement(datastore.runAggregation(aggregationQuery)).getLong(alias); + long expectedCount = countBeforeAdd + 1; + + Entity newEntity = + Entity.newBuilder(ENTITY1) + .setKey(Key.newBuilder(KEY3, KIND1, 1).build()) + .set("null", NULL_VALUE) + .set("partial1", PARTIAL_ENTITY2) + .set("partial2", ENTITY2) + .build(); + datastore.put(newEntity); + + Long countAfterAdd = getOnlyElement(datastore.runAggregation(aggregationQuery)).getLong(alias); + assertThat(countAfterAdd).isEqualTo(expectedCount); + + datastore.delete(newEntity.getKey()); + } + + private void testCountAggregationWithLimit( + Consumer withoutLimitConfigurer, + BiConsumer withLimitConfigurer) { + String alias = "total_count"; + + AggregationQuery.Builder withoutLimitBuilder = + Query.newAggregationQueryBuilder().setNamespace(NAMESPACE); + withoutLimitConfigurer.accept(withoutLimitBuilder); + + Long currentCount = + getOnlyElement(datastore.runAggregation(withoutLimitBuilder.build())).getLong(alias); + long limit = currentCount - 1; + + AggregationQuery.Builder withLimitBuilder = + Query.newAggregationQueryBuilder().setNamespace(NAMESPACE); + withLimitConfigurer.accept(withLimitBuilder, limit); + + Long countWithLimit = + getOnlyElement(datastore.runAggregation(withLimitBuilder.build())).getLong(alias); + assertThat(countWithLimit).isEqualTo(limit); + } + + private void testCountAggregationReadTimeWith(Consumer configurer) + throws InterruptedException { + Entity entity1 = + Entity.newBuilder( + Key.newBuilder(PROJECT_ID, "new_kind", "name-01") + .setDatabaseId(options.getDatabaseId()) + .setNamespace(NAMESPACE) + .build()) + .set("name", "name01") + .build(); + Entity entity2 = + Entity.newBuilder( + Key.newBuilder(PROJECT_ID, "new_kind", "name-02") + .setDatabaseId(options.getDatabaseId()) + .setNamespace(NAMESPACE) + .build()) + .set("name", "name02") + .build(); + Entity entity3 = + Entity.newBuilder( + Key.newBuilder(PROJECT_ID, "new_kind", "name-03") + .setDatabaseId(options.getDatabaseId()) + .setNamespace(NAMESPACE) + .build()) + .set("name", "name03") + .build(); + + datastore.put(entity1, entity2); + Thread.sleep(1000); + Timestamp now = Timestamp.now(); + Thread.sleep(1000); + datastore.put(entity3); + + try { + AggregationQuery.Builder builder = Query.newAggregationQueryBuilder().setNamespace(NAMESPACE); + configurer.accept(builder); + AggregationQuery countAggregationQuery = builder.build(); + + Long latestCount = + getOnlyElement(datastore.runAggregation(countAggregationQuery)).getLong("total_count"); + assertThat(latestCount).isEqualTo(3L); + + ExplainOptions explainOptions = ExplainOptions.newBuilder().setAnalyze(true).build(); + AggregationResults results = + datastore.runAggregation(countAggregationQuery, explainOptions, ReadOption.readTime(now)); + Long oldCount = getOnlyElement(results).getLong("total_count"); + assertThat(oldCount).isEqualTo(2L); + assertPlanSummary(results.getExplainMetrics().get().getPlanSummary()); + assertExecutionStats(results.getExplainMetrics().get().getExecutionStats().get(), 1, 1, "2"); + } finally { + datastore.delete(entity1.getKey(), entity2.getKey(), entity3.getKey()); + } + } + /** * if an entity is modified or deleted within a transaction, a query or lookup returns the * original version of the entity as of the beginning of the transaction, or nothing if the entity @@ -1737,106 +2148,6 @@ public void testQueryWithReadTime() throws InterruptedException { } } - private void testCountAggregationWith(Consumer configurer) { - AggregationQuery.Builder builder = Query.newAggregationQueryBuilder().setNamespace(NAMESPACE); - configurer.accept(builder); - AggregationQuery aggregationQuery = builder.build(); - String alias = "total_count"; - - Long countBeforeAdd = getOnlyElement(datastore.runAggregation(aggregationQuery)).getLong(alias); - long expectedCount = countBeforeAdd + 1; - - Entity newEntity = - Entity.newBuilder(ENTITY1) - .setKey(Key.newBuilder(KEY3, KIND1, 1).build()) - .set("null", NULL_VALUE) - .set("partial1", PARTIAL_ENTITY2) - .set("partial2", ENTITY2) - .build(); - datastore.put(newEntity); - - Long countAfterAdd = getOnlyElement(datastore.runAggregation(aggregationQuery)).getLong(alias); - assertThat(countAfterAdd).isEqualTo(expectedCount); - - datastore.delete(newEntity.getKey()); - } - - private void testCountAggregationWithLimit( - Consumer withoutLimitConfigurer, - BiConsumer withLimitConfigurer) { - String alias = "total_count"; - - AggregationQuery.Builder withoutLimitBuilder = - Query.newAggregationQueryBuilder().setNamespace(NAMESPACE); - withoutLimitConfigurer.accept(withoutLimitBuilder); - - Long currentCount = - getOnlyElement(datastore.runAggregation(withoutLimitBuilder.build())).getLong(alias); - long limit = currentCount - 1; - - AggregationQuery.Builder withLimitBuilder = - Query.newAggregationQueryBuilder().setNamespace(NAMESPACE); - withLimitConfigurer.accept(withLimitBuilder, limit); - - Long countWithLimit = - getOnlyElement(datastore.runAggregation(withLimitBuilder.build())).getLong(alias); - assertThat(countWithLimit).isEqualTo(limit); - } - - private void testCountAggregationReadTimeWith(Consumer configurer) - throws InterruptedException { - Entity entity1 = - Entity.newBuilder( - Key.newBuilder(PROJECT_ID, "new_kind", "name-01") - .setDatabaseId(options.getDatabaseId()) - .setNamespace(NAMESPACE) - .build()) - .set("name", "Tyrion Lannister") - .build(); - Entity entity2 = - Entity.newBuilder( - Key.newBuilder(PROJECT_ID, "new_kind", "name-02") - .setDatabaseId(options.getDatabaseId()) - .setNamespace(NAMESPACE) - .build()) - .set("name", "Jaime Lannister") - .build(); - Entity entity3 = - Entity.newBuilder( - Key.newBuilder(PROJECT_ID, "new_kind", "name-03") - .setDatabaseId(options.getDatabaseId()) - .setNamespace(NAMESPACE) - .build()) - .set("name", "Cersei Lannister") - .build(); - - datastore.put(entity1, entity2); - Thread.sleep(1000); - Timestamp now = Timestamp.now(); - Thread.sleep(1000); - datastore.put(entity3); - - try { - AggregationQuery.Builder builder = Query.newAggregationQueryBuilder().setNamespace(NAMESPACE); - configurer.accept(builder); - AggregationQuery countAggregationQuery = builder.build(); - - Long latestCount = - getOnlyElement(datastore.runAggregation(countAggregationQuery)).getLong("total_count"); - assertThat(latestCount).isEqualTo(3L); - - ExplainOptions explainOptions = ExplainOptions.newBuilder().setAnalyze(true).build(); - AggregationResults results = - datastore.runAggregation(countAggregationQuery, explainOptions, ReadOption.readTime(now)); - Long oldCount = getOnlyElement(results).getLong("total_count"); - assertThat(oldCount).isEqualTo(2L); - assertPlanSummary(results.getExplainMetrics().get().getPlanSummary()); - assertExecutionStats(results.getExplainMetrics().get().getExecutionStats().get(), 1, 1, "2"); - } finally { - datastore.delete(entity1.getKey(), entity2.getKey(), entity3.getKey()); - } - } - private void assertDatastoreException( DatastoreException expected, String reason, int statusCode) { assertEquals(reason, expected.getReason());