Skip to content

Commit 61758e0

Browse files
authored
feat: support readTime in Datastore query splitter. (#763)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/java-datastore/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes #<issue_number_goes_here> ☕️ If you write sample code, please follow the [samples format]( https://github.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
1 parent 7358aa3 commit 61758e0

File tree

8 files changed

+568
-122
lines changed

8 files changed

+568
-122
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!-- see http://mojo.codehaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
3+
<differences>
4+
<difference>
5+
<className>com/google/datastore/v1/client/QuerySplitter</className>
6+
<method>java.util.List getSplits(com.google.datastore.v1.Query, com.google.datastore.v1.PartitionId, int, com.google.datastore.v1.client.Datastore, com.google.protobuf.Timestamp)</method>
7+
<differenceType>7012</differenceType>
8+
</difference>
9+
</differences>

datastore-v1-proto-client/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@
8383
<artifactId>protobuf-java</artifactId>
8484
</dependency>
8585

86+
<dependency>
87+
<groupId>com.google.api</groupId>
88+
<artifactId>api-common</artifactId>
89+
</dependency>
90+
8691
<!-- Test dependencies. -->
8792
<dependency>
8893
<groupId>junit</groupId>

datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/QuerySplitter.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
*/
1616
package com.google.datastore.v1.client;
1717

18+
import com.google.api.core.BetaApi;
1819
import com.google.datastore.v1.PartitionId;
1920
import com.google.datastore.v1.Query;
21+
import com.google.protobuf.Timestamp;
2022
import java.util.List;
2123

2224
/** Provides the ability to split a query into multiple shards. */
@@ -39,4 +41,16 @@ public interface QuerySplitter {
3941
*/
4042
List<Query> getSplits(Query query, PartitionId partition, int numSplits, Datastore datastore)
4143
throws DatastoreException;
44+
45+
/**
46+
* Same as {@link #getSplits(Query, PartitionId, int, Datastore)} but the splits are based on
47+
* {@code readTime}, and the returned sharded {@link Query}s should also be executed with {@code
48+
* readTime}. Reading from a timestamp is currently a private preview feature in Datastore.
49+
*/
50+
@BetaApi
51+
default List<Query> getSplits(
52+
Query query, PartitionId partition, int numSplits, Datastore datastore, Timestamp readTime)
53+
throws DatastoreException {
54+
throw new UnsupportedOperationException("Not implemented.");
55+
}
4256
}

datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/QuerySplitterImpl.java

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static com.google.datastore.v1.client.DatastoreHelper.makeAndFilter;
1919

20+
import com.google.api.core.BetaApi;
2021
import com.google.datastore.v1.EntityResult;
2122
import com.google.datastore.v1.Filter;
2223
import com.google.datastore.v1.Key;
@@ -29,11 +30,14 @@
2930
import com.google.datastore.v1.Query;
3031
import com.google.datastore.v1.QueryResultBatch;
3132
import com.google.datastore.v1.QueryResultBatch.MoreResultsType;
33+
import com.google.datastore.v1.ReadOptions;
3234
import com.google.datastore.v1.RunQueryRequest;
35+
import com.google.protobuf.Timestamp;
3336
import java.util.ArrayList;
3437
import java.util.Collections;
3538
import java.util.EnumSet;
3639
import java.util.List;
40+
import javax.annotation.Nullable;
3741

3842
/**
3943
* Provides the ability to split a query into multiple shards using Cloud Datastore.
@@ -63,7 +67,24 @@ private QuerySplitterImpl() {
6367
public List<Query> getSplits(
6468
Query query, PartitionId partition, int numSplits, Datastore datastore)
6569
throws DatastoreException, IllegalArgumentException {
70+
return getSplitsInternal(query, partition, numSplits, datastore, null);
71+
}
6672

73+
@BetaApi
74+
@Override
75+
public List<Query> getSplits(
76+
Query query, PartitionId partition, int numSplits, Datastore datastore, Timestamp readTime)
77+
throws DatastoreException, IllegalArgumentException {
78+
return getSplitsInternal(query, partition, numSplits, datastore, readTime);
79+
}
80+
81+
private List<Query> getSplitsInternal(
82+
Query query,
83+
PartitionId partition,
84+
int numSplits,
85+
Datastore datastore,
86+
@Nullable Timestamp readTime)
87+
throws DatastoreException, IllegalArgumentException {
6788
List<Query> splits = new ArrayList<Query>(numSplits);
6889
if (numSplits == 1) {
6990
splits.add(query);
@@ -72,7 +93,7 @@ public List<Query> getSplits(
7293
validateQuery(query);
7394
validateSplitSize(numSplits);
7495

75-
List<Key> scatterKeys = getScatterKeys(numSplits, query, partition, datastore);
96+
List<Key> scatterKeys = getScatterKeys(numSplits, query, partition, datastore, readTime);
7697
Key lastKey = null;
7798
for (Key nextKey : getSplitKey(scatterKeys, numSplits)) {
7899
splits.add(createSplit(lastKey, nextKey, query));
@@ -182,23 +203,28 @@ private Query createSplit(Key lastKey, Key nextKey, Query query) {
182203
* @param query the user query.
183204
* @param partition the partition to run the query in.
184205
* @param datastore the datastore containing the data.
206+
* @param readTime read time at which to get the split keys from the datastore.
185207
* @throws DatastoreException if there was an error when executing the datastore query.
186208
*/
187209
private List<Key> getScatterKeys(
188-
int numSplits, Query query, PartitionId partition, Datastore datastore)
210+
int numSplits,
211+
Query query,
212+
PartitionId partition,
213+
Datastore datastore,
214+
@Nullable Timestamp readTime)
189215
throws DatastoreException {
190216
Query.Builder scatterPointQuery = createScatterQuery(query, numSplits);
191217

192218
List<Key> keySplits = new ArrayList<Key>();
193219

194220
QueryResultBatch batch;
195221
do {
196-
RunQueryRequest scatterRequest =
197-
RunQueryRequest.newBuilder()
198-
.setPartitionId(partition)
199-
.setQuery(scatterPointQuery)
200-
.build();
201-
batch = datastore.runQuery(scatterRequest).getBatch();
222+
RunQueryRequest.Builder scatterRequest =
223+
RunQueryRequest.newBuilder().setPartitionId(partition).setQuery(scatterPointQuery);
224+
if (readTime != null) {
225+
scatterRequest.setReadOptions(ReadOptions.newBuilder().setReadTime(readTime).build());
226+
}
227+
batch = datastore.runQuery(scatterRequest.build()).getBatch();
202228
for (EntityResult result : batch.getEntityResultsList()) {
203229
keySplits.add(result.getEntity().getKey());
204230
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.datastore.v1.client.testing;
17+
18+
import com.google.api.client.auth.oauth2.Credential;
19+
import com.google.api.client.http.HttpRequest;
20+
import java.io.IOException;
21+
22+
/** Fake credential used for testing purpose. */
23+
public class MockCredential extends Credential {
24+
public MockCredential() {
25+
super(
26+
new AccessMethod() {
27+
@Override
28+
public void intercept(HttpRequest request, String accessToken) throws IOException {}
29+
30+
@Override
31+
public String getAccessTokenFromRequest(HttpRequest request) {
32+
return "MockAccessToken";
33+
}
34+
});
35+
}
36+
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.datastore.v1.client.testing;
17+
18+
import static com.google.common.base.Preconditions.checkState;
19+
20+
import com.google.api.client.auth.oauth2.Credential;
21+
import com.google.api.client.http.GenericUrl;
22+
import com.google.api.client.http.HttpRequestFactory;
23+
import com.google.api.client.http.HttpStatusCodes;
24+
import com.google.api.client.http.HttpTransport;
25+
import com.google.api.client.http.LowLevelHttpRequest;
26+
import com.google.api.client.http.LowLevelHttpResponse;
27+
import com.google.api.client.testing.http.MockHttpTransport;
28+
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
29+
import com.google.api.client.testing.http.MockLowLevelHttpResponse;
30+
import com.google.api.client.testing.util.TestableByteArrayInputStream;
31+
import com.google.common.collect.Iterables;
32+
import com.google.datastore.v1.client.DatastoreFactory;
33+
import com.google.datastore.v1.client.DatastoreOptions;
34+
import com.google.protobuf.Message;
35+
import com.google.rpc.Code;
36+
import com.google.rpc.Status;
37+
import java.io.ByteArrayOutputStream;
38+
import java.io.IOException;
39+
import java.util.List;
40+
41+
/** Fake Datastore factory used for testing purposes when a true Datastore service is not needed. */
42+
public class MockDatastoreFactory extends DatastoreFactory {
43+
private int nextStatus;
44+
private Message nextResponse;
45+
private Status nextError;
46+
private IOException nextException;
47+
48+
private String lastPath;
49+
private String lastMimeType;
50+
private byte[] lastBody;
51+
private List<String> lastCookies;
52+
private String lastApiFormatHeaderValue;
53+
54+
public void setNextResponse(Message response) {
55+
nextStatus = HttpStatusCodes.STATUS_CODE_OK;
56+
nextResponse = response;
57+
nextError = null;
58+
nextException = null;
59+
}
60+
61+
public void setNextError(int status, Code code, String message) {
62+
nextStatus = status;
63+
nextResponse = null;
64+
nextError = makeErrorContent(message, code);
65+
nextException = null;
66+
}
67+
68+
public void setNextException(IOException exception) {
69+
nextStatus = 0;
70+
nextResponse = null;
71+
nextError = null;
72+
nextException = exception;
73+
}
74+
75+
@Override
76+
public HttpRequestFactory makeClient(DatastoreOptions options) {
77+
HttpTransport transport =
78+
new MockHttpTransport() {
79+
@Override
80+
public LowLevelHttpRequest buildRequest(String method, String url) {
81+
return new MockLowLevelHttpRequest(url) {
82+
@Override
83+
public LowLevelHttpResponse execute() throws IOException {
84+
lastPath = new GenericUrl(getUrl()).getRawPath();
85+
lastMimeType = getContentType();
86+
lastCookies = getHeaderValues("Cookie");
87+
lastApiFormatHeaderValue =
88+
Iterables.getOnlyElement(getHeaderValues("X-Goog-Api-Format-Version"));
89+
ByteArrayOutputStream out = new ByteArrayOutputStream();
90+
getStreamingContent().writeTo(out);
91+
lastBody = out.toByteArray();
92+
if (nextException != null) {
93+
throw nextException;
94+
}
95+
MockLowLevelHttpResponse response =
96+
new MockLowLevelHttpResponse()
97+
.setStatusCode(nextStatus)
98+
.setContentType("application/x-protobuf");
99+
if (nextError != null) {
100+
checkState(nextResponse == null);
101+
response.setContent(new TestableByteArrayInputStream(nextError.toByteArray()));
102+
} else {
103+
response.setContent(new TestableByteArrayInputStream(nextResponse.toByteArray()));
104+
}
105+
return response;
106+
}
107+
};
108+
}
109+
};
110+
Credential credential = options.getCredential();
111+
return transport.createRequestFactory(credential);
112+
}
113+
114+
public String getLastPath() {
115+
return lastPath;
116+
}
117+
118+
public String getLastMimeType() {
119+
return lastMimeType;
120+
}
121+
122+
public String getLastApiFormatHeaderValue() {
123+
return lastApiFormatHeaderValue;
124+
}
125+
126+
public byte[] getLastBody() {
127+
return lastBody;
128+
}
129+
130+
public List<String> getLastCookies() {
131+
return lastCookies;
132+
}
133+
134+
private static Status makeErrorContent(String message, Code code) {
135+
return Status.newBuilder().setCode(code.getNumber()).setMessage(message).build();
136+
}
137+
}

0 commit comments

Comments
 (0)