Skip to content

Commit 2898ee8

Browse files
authored
feat: allow limiting ReadChannel (#1180)
The ReadChannel returned from storage.reader and blob.reader now allow limiting the max number of bytes the channel will make available for read. Use ReadChannel#limit(long) to set the limit. This can be used in conjunction with seek to allow for range gets of objects independent of any buffer or chunk sizes.
1 parent 3f64f11 commit 2898ee8

File tree

3 files changed

+153
-4
lines changed

3 files changed

+153
-4
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static com.google.cloud.RetryHelper.runWithRetries;
2020

21+
import com.google.api.client.util.Preconditions;
2122
import com.google.api.gax.retrying.ResultRetryAlgorithm;
2223
import com.google.api.services.storage.model.StorageObject;
2324
import com.google.cloud.ReadChannel;
@@ -52,6 +53,7 @@ class BlobReadChannel implements ReadChannel {
5253
private final StorageObject storageObject;
5354
private int bufferPos;
5455
private byte[] buffer;
56+
private long limit;
5557

5658
BlobReadChannel(
5759
StorageOptions serviceOptions, BlobId blob, Map<StorageRpc.Option, ?> requestOptions) {
@@ -62,6 +64,7 @@ class BlobReadChannel implements ReadChannel {
6264
isOpen = true;
6365
storageRpc = serviceOptions.getStorageRpcV1();
6466
storageObject = blob.toPb();
67+
this.limit = Long.MAX_VALUE;
6568
}
6669

6770
@Override
@@ -71,7 +74,8 @@ public RestorableState<ReadChannel> capture() {
7174
.setPosition(position)
7275
.setIsOpen(isOpen)
7376
.setEndOfStream(endOfStream)
74-
.setChunkSize(chunkSize);
77+
.setChunkSize(chunkSize)
78+
.setLimit(limit);
7579
if (buffer != null) {
7680
builder.setPosition(position + bufferPos);
7781
builder.setEndOfStream(false);
@@ -119,7 +123,8 @@ public int read(ByteBuffer byteBuffer) throws IOException {
119123
if (endOfStream) {
120124
return -1;
121125
}
122-
final int toRead = Math.max(byteBuffer.remaining(), chunkSize);
126+
final int toRead =
127+
Math.toIntExact(Math.min(limit - position, Math.max(byteBuffer.remaining(), chunkSize)));
123128
try {
124129
ResultRetryAlgorithm<?> algorithm =
125130
retryAlgorithmManager.getForObjectsGet(storageObject, requestOptions);
@@ -158,6 +163,18 @@ public int read(ByteBuffer byteBuffer) throws IOException {
158163
return toWrite;
159164
}
160165

166+
@Override
167+
public ReadChannel limit(long limit) {
168+
Preconditions.checkArgument(limit >= 0, "Limit must be >= 0");
169+
this.limit = limit;
170+
return this;
171+
}
172+
173+
@Override
174+
public long limit() {
175+
return limit;
176+
}
177+
161178
static class StateImpl implements RestorableState<ReadChannel>, Serializable {
162179

163180
private static final long serialVersionUID = 3889420316004453706L;
@@ -170,6 +187,7 @@ static class StateImpl implements RestorableState<ReadChannel>, Serializable {
170187
private final boolean isOpen;
171188
private final boolean endOfStream;
172189
private final int chunkSize;
190+
private final long limit;
173191

174192
StateImpl(Builder builder) {
175193
this.serviceOptions = builder.serviceOptions;
@@ -180,6 +198,7 @@ static class StateImpl implements RestorableState<ReadChannel>, Serializable {
180198
this.isOpen = builder.isOpen;
181199
this.endOfStream = builder.endOfStream;
182200
this.chunkSize = builder.chunkSize;
201+
this.limit = builder.limit;
183202
}
184203

185204
static class Builder {
@@ -191,6 +210,7 @@ static class Builder {
191210
private boolean isOpen;
192211
private boolean endOfStream;
193212
private int chunkSize;
213+
private long limit;
194214

195215
private Builder(StorageOptions options, BlobId blob, Map<StorageRpc.Option, ?> reqOptions) {
196216
this.serviceOptions = options;
@@ -223,6 +243,11 @@ Builder setChunkSize(int chunkSize) {
223243
return this;
224244
}
225245

246+
Builder setLimit(long limit) {
247+
this.limit = limit;
248+
return this;
249+
}
250+
226251
RestorableState<ReadChannel> build() {
227252
return new StateImpl(this);
228253
}
@@ -241,13 +266,22 @@ public ReadChannel restore() {
241266
channel.isOpen = isOpen;
242267
channel.endOfStream = endOfStream;
243268
channel.chunkSize = chunkSize;
269+
channel.limit = limit;
244270
return channel;
245271
}
246272

247273
@Override
248274
public int hashCode() {
249275
return Objects.hash(
250-
serviceOptions, blob, requestOptions, lastEtag, position, isOpen, endOfStream, chunkSize);
276+
serviceOptions,
277+
blob,
278+
requestOptions,
279+
lastEtag,
280+
position,
281+
isOpen,
282+
endOfStream,
283+
chunkSize,
284+
limit);
251285
}
252286

253287
@Override
@@ -266,7 +300,8 @@ public boolean equals(Object obj) {
266300
&& this.position == other.position
267301
&& this.isOpen == other.isOpen
268302
&& this.endOfStream == other.endOfStream
269-
&& this.chunkSize == other.chunkSize;
303+
&& this.chunkSize == other.chunkSize
304+
&& this.limit == other.limit;
270305
}
271306

272307
@Override
@@ -276,6 +311,7 @@ public String toString() {
276311
.add("position", position)
277312
.add("isOpen", isOpen)
278313
.add("endOfStream", endOfStream)
314+
.add("limit", limit)
279315
.toString();
280316
}
281317
}

google-cloud-storage/src/test/java/com/google/cloud/storage/BlobReadChannelTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,13 +223,19 @@ public void testSaveAndRestore() throws IOException {
223223
public void testStateEquals() {
224224
replay(storageRpcMock);
225225
reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS);
226+
int limit = 342;
227+
reader.limit(limit);
226228
@SuppressWarnings("resource") // avoid closing when you don't want partial writes to GCS
227229
ReadChannel secondReader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS);
230+
secondReader.limit(limit);
228231
RestorableState<ReadChannel> state = reader.capture();
229232
RestorableState<ReadChannel> secondState = secondReader.capture();
230233
assertEquals(state, secondState);
231234
assertEquals(state.hashCode(), secondState.hashCode());
232235
assertEquals(state.toString(), secondState.toString());
236+
237+
ReadChannel restore = secondState.restore();
238+
assertEquals(limit, restore.limit());
233239
}
234240

235241
private static byte[] randomByteArray(int size) {
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage.it;
18+
19+
import static com.google.common.truth.Truth.assertThat;
20+
21+
import com.google.cloud.NoCredentials;
22+
import com.google.cloud.ReadChannel;
23+
import com.google.cloud.WriteChannel;
24+
import com.google.cloud.storage.BlobInfo;
25+
import com.google.cloud.storage.Bucket;
26+
import com.google.cloud.storage.BucketInfo;
27+
import com.google.cloud.storage.DataGeneration;
28+
import com.google.cloud.storage.Storage;
29+
import com.google.cloud.storage.StorageOptions;
30+
import com.google.cloud.storage.conformance.retry.TestBench;
31+
import java.io.IOException;
32+
import java.nio.ByteBuffer;
33+
import java.util.Random;
34+
import org.junit.ClassRule;
35+
import org.junit.Rule;
36+
import org.junit.Test;
37+
import org.junit.rules.TestName;
38+
39+
public final class ITBlobReadChannelTest {
40+
41+
private static final int _16MiB = 16 * 1024 * 1024;
42+
private static final int _256KiB = 256 * 1024;
43+
44+
@ClassRule
45+
public static final TestBench testBench =
46+
TestBench.newBuilder().setContainerName("blob-read-channel-test").build();
47+
48+
@Rule public final TestName testName = new TestName();
49+
50+
@Rule public final DataGeneration dataGeneration = new DataGeneration(new Random(872364872));
51+
52+
@Test
53+
public void testLimit_smallerThanOneChunk() throws IOException {
54+
int srcContentSize = _256KiB;
55+
int rangeBegin = 57;
56+
int rangeEnd = 2384;
57+
int chunkSize = _16MiB;
58+
doLimitTest(srcContentSize, rangeBegin, rangeEnd, chunkSize);
59+
}
60+
61+
@Test
62+
public void testLimit_largerThanOneChunk() throws IOException {
63+
int srcContentSize = _16MiB + (_256KiB * 3);
64+
int rangeBegin = 384;
65+
int rangeEnd = rangeBegin + _16MiB;
66+
int chunkSize = _16MiB;
67+
68+
doLimitTest(srcContentSize, rangeBegin, rangeEnd, chunkSize);
69+
}
70+
71+
private void doLimitTest(int srcContentSize, int rangeBegin, int rangeEnd, int chunkSize)
72+
throws IOException {
73+
Storage s =
74+
StorageOptions.newBuilder()
75+
.setProjectId("blob-read-channel-test")
76+
.setHost(testBench.getBaseUri())
77+
.setCredentials(NoCredentials.getInstance())
78+
.build()
79+
.getService();
80+
81+
String testNameMethodName = testName.getMethodName();
82+
String bucketName = String.format("bucket-%s", testNameMethodName.toLowerCase());
83+
String blobName = String.format("%s/src", testNameMethodName);
84+
85+
Bucket bucket = s.create(BucketInfo.of(bucketName));
86+
BlobInfo src = BlobInfo.newBuilder(bucket, blobName).build();
87+
ByteBuffer content = dataGeneration.randByteBuffer(srcContentSize);
88+
ByteBuffer expectedSubContent = content.duplicate();
89+
expectedSubContent.position(rangeBegin);
90+
expectedSubContent.limit(rangeEnd);
91+
try (WriteChannel writer = s.writer(src)) {
92+
writer.write(content);
93+
}
94+
95+
ByteBuffer actual = ByteBuffer.allocate(rangeEnd - rangeBegin);
96+
97+
try (ReadChannel reader = s.reader(src.getBlobId())) {
98+
reader.setChunkSize(chunkSize);
99+
reader.seek(rangeBegin);
100+
reader.limit(rangeEnd);
101+
reader.read(actual);
102+
actual.flip();
103+
}
104+
105+
assertThat(actual).isEqualTo(expectedSubContent);
106+
}
107+
}

0 commit comments

Comments
 (0)