Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,11 @@ public synchronized int read() throws IOException {
// reclaim them (see MonoSendMany). Additionally, that very same operator requests
// 128 elements (that's hardcoded) once it's subscribed (later on, it requests
// by 64 elements), that's why we provide 64kb buffers.
return Flux.range(0, (int) Math.ceil((double) length / (double) chunkSize))

// length is at most 100MB so it's safe to cast back to an integer in this case
final int parts = (int) length / chunkSize;
final long remaining = length % chunkSize;
return Flux.range(0, remaining == 0 ? parts : parts + 1)
.map(i -> i * chunkSize)
.concatMap(pos -> Mono.fromCallable(() -> {
long count = pos + chunkSize > length ? length - pos : chunkSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,11 @@ public void testWriteBlobWithRetries() throws Exception {
public void testWriteLargeBlob() throws Exception {
final int maxRetries = randomIntBetween(2, 5);

final byte[] data = randomBytes((int) ByteSizeUnit.MB.toBytes(10));
int nbBlocks = (int) Math.ceil((double) data.length / (double) ByteSizeUnit.MB.toBytes(1));
final byte[] data = randomBytes(ByteSizeUnit.MB.toIntBytes(10) + randomIntBetween(0, ByteSizeUnit.MB.toIntBytes(1)));
int nbBlocks = data.length / ByteSizeUnit.MB.toIntBytes(1);
if (data.length % ByteSizeUnit.MB.toIntBytes(1) != 0) {
nbBlocks += 1;
}

final int nbErrors = 2; // we want all requests to fail at least once
final AtomicInteger countDownUploads = new AtomicInteger(nbErrors * nbBlocks);
Expand Down Expand Up @@ -378,6 +381,9 @@ public void testWriteLargeBlob() throws Exception {
if (randomBoolean()) {
Streams.readFully(exchange.getRequestBody());
AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
} else {
long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length"));
readFromInputStream(exchange.getRequestBody(), randomLongBetween(0, contentLength));
}
exchange.close();
});
Expand Down Expand Up @@ -621,4 +627,16 @@ private String getEndpointForServer(HttpServer server, String accountName) {
InetSocketAddress address = server.getAddress();
return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort() + "/" + accountName;
}

private void readFromInputStream(InputStream inputStream, long bytesToRead) {
try {
long totalBytesRead = 0;
while (inputStream.read() != -1 && totalBytesRead < bytesToRead) {
totalBytesRead += 1;
}
assertThat(totalBytesRead, equalTo(bytesToRead));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}