diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java index c7da9e27e18..b8655d91656 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java @@ -203,13 +203,14 @@ public int listen(Query query) { hardAssert(!queryViewsByQuery.containsKey(query), "We already listen to query: %s", query); TargetData targetData = localStore.allocateTarget(query.toTarget()); - remoteStore.listen(targetData); ViewSnapshot viewSnapshot = initializeViewAndComputeSnapshot( query, targetData.getTargetId(), targetData.getResumeToken()); syncEngineListener.onViewSnapshots(Collections.singletonList(viewSnapshot)); + remoteStore.listen(targetData); + return targetData.getTargetId(); } diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalSerializer.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalSerializer.java index fb4e476b411..98fb7230821 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalSerializer.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalSerializer.java @@ -260,7 +260,8 @@ TargetData decodeTargetData(com.google.firebase.firestore.proto.Target targetPro QueryPurpose.LISTEN, version, lastLimboFreeSnapshotVersion, - resumeToken); + resumeToken, + null); } public com.google.firestore.bundle.BundledQuery encodeBundledQuery(BundledQuery bundledQuery) { diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/TargetData.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/TargetData.java index 18375731fe0..ff2e3274cee 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/TargetData.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/TargetData.java @@ -16,10 +16,12 @@ import static com.google.firebase.firestore.util.Preconditions.checkNotNull; +import androidx.annotation.Nullable; import com.google.firebase.firestore.core.Target; import com.google.firebase.firestore.model.SnapshotVersion; import com.google.firebase.firestore.remote.WatchStream; import com.google.protobuf.ByteString; +import java.util.Objects; /** An immutable set of metadata that the store will need to keep track of for each target. */ public final class TargetData { @@ -30,6 +32,7 @@ public final class TargetData { private final SnapshotVersion snapshotVersion; private final SnapshotVersion lastLimboFreeSnapshotVersion; private final ByteString resumeToken; + private final @Nullable Integer expectedCount; /** * Creates a new TargetData with the given values. @@ -45,6 +48,9 @@ public final class TargetData { * @param resumeToken An opaque, server-assigned token that allows watching a target to be resumed * after disconnecting without retransmitting all the data that matches the target. The resume * token essentially identifies a point in time from which the server should resume sending + * @param expectedCount The number of documents that last matched the query at the resume token or + * read time. Documents are counted only when making a listen request with resume token or + * read time, otherwise, keep it null. */ TargetData( Target target, @@ -53,7 +59,8 @@ public final class TargetData { QueryPurpose purpose, SnapshotVersion snapshotVersion, SnapshotVersion lastLimboFreeSnapshotVersion, - ByteString resumeToken) { + ByteString resumeToken, + @Nullable Integer expectedCount) { this.target = checkNotNull(target); this.targetId = targetId; this.sequenceNumber = sequenceNumber; @@ -61,6 +68,7 @@ public final class TargetData { this.purpose = purpose; this.snapshotVersion = checkNotNull(snapshotVersion); this.resumeToken = checkNotNull(resumeToken); + this.expectedCount = expectedCount; } /** Convenience constructor for use when creating a TargetData for the first time. */ @@ -72,7 +80,8 @@ public TargetData(Target target, int targetId, long sequenceNumber, QueryPurpose purpose, SnapshotVersion.NONE, SnapshotVersion.NONE, - WatchStream.EMPTY_RESUME_TOKEN); + WatchStream.EMPTY_RESUME_TOKEN, + null); } /** Creates a new target data instance with an updated sequence number. */ @@ -84,7 +93,8 @@ public TargetData withSequenceNumber(long sequenceNumber) { purpose, snapshotVersion, lastLimboFreeSnapshotVersion, - resumeToken); + resumeToken, + /* expectedCount= */ null); } /** Creates a new target data instance with an updated resume token and snapshot version. */ @@ -96,7 +106,21 @@ public TargetData withResumeToken(ByteString resumeToken, SnapshotVersion snapsh purpose, snapshotVersion, lastLimboFreeSnapshotVersion, - resumeToken); + resumeToken, + expectedCount); + } + + /** Creates a new target data instance with an updated expected count. */ + public TargetData withExpectedCount(@Nullable Integer expectedCount) { + return new TargetData( + target, + targetId, + sequenceNumber, + purpose, + snapshotVersion, + lastLimboFreeSnapshotVersion, + resumeToken, + expectedCount); } /** Creates a new target data instance with an updated last limbo free snapshot version number. */ @@ -108,7 +132,8 @@ public TargetData withLastLimboFreeSnapshotVersion(SnapshotVersion lastLimboFree purpose, snapshotVersion, lastLimboFreeSnapshotVersion, - resumeToken); + resumeToken, + expectedCount); } public Target getTarget() { @@ -135,6 +160,11 @@ public ByteString getResumeToken() { return resumeToken; } + @Nullable + public Integer getExpectedCount() { + return expectedCount; + } + /** * Returns the last snapshot version for which the associated view contained no limbo documents. */ @@ -158,7 +188,8 @@ public boolean equals(Object o) { && purpose.equals(targetData.purpose) && snapshotVersion.equals(targetData.snapshotVersion) && lastLimboFreeSnapshotVersion.equals(targetData.lastLimboFreeSnapshotVersion) - && resumeToken.equals(targetData.resumeToken); + && resumeToken.equals(targetData.resumeToken) + && Objects.equals(expectedCount, targetData.expectedCount); } @Override @@ -170,6 +201,7 @@ public int hashCode() { result = 31 * result + snapshotVersion.hashCode(); result = 31 * result + lastLimboFreeSnapshotVersion.hashCode(); result = 31 * result + resumeToken.hashCode(); + result = 31 * result + Objects.hashCode(expectedCount); return result; } @@ -190,6 +222,8 @@ public String toString() { + lastLimboFreeSnapshotVersion + ", resumeToken=" + resumeToken + + ", expectedCount=" + + expectedCount + '}'; } } diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteSerializer.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteSerializer.java index 57f51a7b8cc..7f1f7781045 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteSerializer.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteSerializer.java @@ -499,6 +499,13 @@ public Target encodeTarget(TargetData targetData) { builder.setResumeToken(targetData.getResumeToken()); } + // TODO(Mila) Incorporate this into the if statement above. + if (targetData.getExpectedCount() != null + && (!targetData.getResumeToken().isEmpty() + || targetData.getSnapshotVersion().compareTo(SnapshotVersion.NONE) > 0)) { + builder.setExpectedCount(Int32Value.newBuilder().setValue(targetData.getExpectedCount())); + } + return builder.build(); } diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java index 3999f22a939..965ceee35da 100644 --- a/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java +++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java @@ -369,6 +369,12 @@ public void listen(TargetData targetData) { private void sendWatchRequest(TargetData targetData) { watchChangeAggregator.recordPendingTargetRequest(targetData.getTargetId()); + if (!targetData.getResumeToken().isEmpty() + || targetData.getSnapshotVersion().compareTo(SnapshotVersion.NONE) > 0) { + int expectedCount = this.getRemoteKeysForTarget(targetData.getTargetId()).size(); + targetData = targetData.withExpectedCount(expectedCount); + } + watchStream.watchQuery(targetData); } diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/local/LocalSerializerTest.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/local/LocalSerializerTest.java index 124727f5681..20223e78dec 100644 --- a/firebase-firestore/src/test/java/com/google/firebase/firestore/local/LocalSerializerTest.java +++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/local/LocalSerializerTest.java @@ -378,7 +378,8 @@ public void testEncodesTargetData() { QueryPurpose.LISTEN, snapshotVersion, limboFreeVersion, - resumeToken); + resumeToken, + null); // Let the RPC serializer test various permutations of query serialization. com.google.firestore.v1.Target.QueryTarget queryTarget = diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/local/TargetCacheTestCase.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/local/TargetCacheTestCase.java index e098f7666f1..58a3a57ba69 100644 --- a/firebase-firestore/src/test/java/com/google/firebase/firestore/local/TargetCacheTestCase.java +++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/local/TargetCacheTestCase.java @@ -331,7 +331,8 @@ private TargetData newTargetData(Query query, int targetId, long version) { QueryPurpose.LISTEN, version(version), version(version), - resumeToken(version)); + resumeToken(version), + null); } /** Adds the given query data to the targetCache under test, committing immediately. */ diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java index cf136ab45d8..7be9479da3b 100644 --- a/firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java +++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/spec/SpecTestCase.java @@ -1006,6 +1006,9 @@ private void validateExpectedState(@Nullable JSONObject expectedState) throws JS targetData.withResumeToken( ByteString.EMPTY, version(queryDataJson.getInt("readTime"))); } + if (queryDataJson.has("expectedCount")) { + targetData = targetData.withExpectedCount(queryDataJson.getInt("expectedCount")); + } expectedActiveTargets.get(targetId).add(targetData); } @@ -1144,6 +1147,10 @@ private void validateActiveTargets() { expectedTarget.getResumeToken().toStringUtf8(), actualTarget.getResumeToken().toStringUtf8()); + if (expectedTarget.getExpectedCount() != null) { + assertEquals(expectedTarget.getExpectedCount(), actualTarget.getExpectedCount()); + } + actualTargets.remove(expected.getKey()); } diff --git a/firebase-firestore/src/test/resources/json/listen_spec_test.json b/firebase-firestore/src/test/resources/json/listen_spec_test.json index 5755019b048..85170a91d71 100644 --- a/firebase-firestore/src/test/resources/json/listen_spec_test.json +++ b/firebase-firestore/src/test/resources/json/listen_spec_test.json @@ -3337,6 +3337,158 @@ } ] }, + "ExpectedCount in listen request should work after coming back online": { + "describeName": "Listens:", + "itName": "ExpectedCount in listen request should work after coming back online", + "tags": [ + ], + "config": { + "numClients": 1, + "useGarbageCollection": false + }, + "steps": [ + { + "userListen": { + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + }, + "targetId": 2 + }, + "expectedState": { + "activeTargets": { + "2": { + "queries": [ + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "resumeToken": "" + } + } + } + }, + { + "watchAck": [ + 2 + ] + }, + { + "watchEntity": { + "docs": [ + { + "key": "collection/a", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "a" + }, + "version": 1000 + } + ], + "targets": [ + 2 + ] + } + }, + { + "watchCurrent": [ + [ + 2 + ], + "resume-token-1000" + ] + }, + { + "watchSnapshot": { + "targetIds": [ + ], + "version": 1000 + }, + "expectedSnapshotEvents": [ + { + "added": [ + { + "key": "collection/a", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "a" + }, + "version": 1000 + } + ], + "errorCode": 0, + "fromCache": false, + "hasPendingWrites": false, + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + } + ] + }, + { + "enableNetwork": false, + "expectedSnapshotEvents": [ + { + "errorCode": 0, + "fromCache": true, + "hasPendingWrites": false, + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + } + ], + "expectedState": { + "activeLimboDocs": [ + ], + "activeTargets": { + }, + "enqueuedLimboDocs": [ + ] + } + }, + { + "enableNetwork": true, + "expectedState": { + "activeTargets": { + "2": { + "queries": [ + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "resumeToken": "resume-token-1000", + "expectedCount": 1 + } + } + } + } + ] + }, "Ignores update from inactive target": { "describeName": "Listens:", "itName": "Ignores update from inactive target", @@ -12409,6 +12561,525 @@ } ] }, + "Resuming a query should specify expectedCount that does not include pending mutations": { + "describeName": "Listens:", + "itName": "Resuming a query should specify expectedCount that does not include pending mutations", + "tags": [ + ], + "config": { + "numClients": 1, + "useGarbageCollection": false + }, + "steps": [ + { + "userListen": { + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + }, + "targetId": 2 + }, + "expectedState": { + "activeTargets": { + "2": { + "queries": [ + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "resumeToken": "" + } + } + } + }, + { + "watchAck": [ + 2 + ] + }, + { + "watchEntity": { + "docs": [ + { + "key": "collection/a", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "a" + }, + "version": 1000 + } + ], + "targets": [ + 2 + ] + } + }, + { + "watchCurrent": [ + [ + 2 + ], + "resume-token-1000" + ] + }, + { + "watchSnapshot": { + "targetIds": [ + ], + "version": 1000 + }, + "expectedSnapshotEvents": [ + { + "added": [ + { + "key": "collection/a", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "a" + }, + "version": 1000 + } + ], + "errorCode": 0, + "fromCache": false, + "hasPendingWrites": false, + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + } + ] + }, + { + "userUnlisten": [ + 2, + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "expectedState": { + "activeTargets": { + } + } + }, + { + "userSet": [ + "collection/b", + { + "key": "b" + } + ] + }, + { + "userListen": { + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + }, + "targetId": 2 + }, + "expectedSnapshotEvents": [ + { + "added": [ + { + "key": "collection/a", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "a" + }, + "version": 1000 + }, + { + "key": "collection/b", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": true + }, + "value": { + "key": "b" + }, + "version": 0 + } + ], + "errorCode": 0, + "fromCache": true, + "hasPendingWrites": true, + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + } + ], + "expectedState": { + "activeTargets": { + "2": { + "queries": [ + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "resumeToken": "resume-token-1000", + "expectedCount": 1 + } + } + } + } + ] + }, + "Resuming a query should specify expectedCount when adding the target": { + "describeName": "Listens:", + "itName": "Resuming a query should specify expectedCount when adding the target", + "tags": [ + ], + "config": { + "numClients": 1, + "useGarbageCollection": false + }, + "steps": [ + { + "userListen": { + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + }, + "targetId": 2 + }, + "expectedState": { + "activeTargets": { + "2": { + "queries": [ + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "resumeToken": "" + } + } + } + }, + { + "watchAck": [ + 2 + ] + }, + { + "watchEntity": { + "docs": [ + ], + "targets": [ + 2 + ] + } + }, + { + "watchCurrent": [ + [ + 2 + ], + "resume-token-1000" + ] + }, + { + "watchSnapshot": { + "targetIds": [ + ], + "version": 1000 + }, + "expectedSnapshotEvents": [ + { + "errorCode": 0, + "fromCache": false, + "hasPendingWrites": false, + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + } + ] + }, + { + "userUnlisten": [ + 2, + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "expectedState": { + "activeTargets": { + } + } + }, + { + "watchRemove": { + "targetIds": [ + 2 + ] + } + }, + { + "userListen": { + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + }, + "targetId": 2 + }, + "expectedSnapshotEvents": [ + { + "errorCode": 0, + "fromCache": true, + "hasPendingWrites": false, + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + } + ], + "expectedState": { + "activeTargets": { + "2": { + "queries": [ + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "resumeToken": "resume-token-1000", + "expectedCount": 0 + } + } + } + }, + { + "watchAck": [ + 2 + ] + }, + { + "watchEntity": { + "docs": [ + { + "key": "collection/a", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "a" + }, + "version": 1000 + }, + { + "key": "collection/b", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "b" + }, + "version": 1000 + } + ], + "targets": [ + 2 + ] + } + }, + { + "watchCurrent": [ + [ + 2 + ], + "resume-token-2000" + ] + }, + { + "watchSnapshot": { + "targetIds": [ + ], + "version": 2000 + }, + "expectedSnapshotEvents": [ + { + "added": [ + { + "key": "collection/a", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "a" + }, + "version": 1000 + }, + { + "key": "collection/b", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "b" + }, + "version": 1000 + } + ], + "errorCode": 0, + "fromCache": false, + "hasPendingWrites": false, + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + } + ] + }, + { + "userUnlisten": [ + 2, + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "expectedState": { + "activeTargets": { + } + } + }, + { + "userListen": { + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + }, + "targetId": 2 + }, + "expectedSnapshotEvents": [ + { + "added": [ + { + "key": "collection/a", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "a" + }, + "version": 1000 + }, + { + "key": "collection/b", + "options": { + "hasCommittedMutations": false, + "hasLocalMutations": false + }, + "value": { + "key": "b" + }, + "version": 1000 + } + ], + "errorCode": 0, + "fromCache": true, + "hasPendingWrites": false, + "query": { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + } + ], + "expectedState": { + "activeTargets": { + "2": { + "queries": [ + { + "filters": [ + ], + "orderBys": [ + ], + "path": "collection" + } + ], + "resumeToken": "resume-token-2000", + "expectedCount": 2 + } + } + } + } + ] + }, "Secondary client advances query state with global snapshot from primary": { "describeName": "Listens:", "itName": "Secondary client advances query state with global snapshot from primary",