Skip to content

Commit d391446

Browse files
authored
Resume partial download from S3 on connection drop (#46589)
Today, if the connection to S3 times out or drops after a download of an object has started, the SDK does not attempt to recover or resume the download, causing the restore of the whole shard to fail and be retried. This commit allows Elasticsearch to detect such a mid-stream failure and to resume the download from the point at which it failed.
1 parent a220f54 commit d391446

File tree

5 files changed

+264
-36
lines changed

5 files changed

+264
-36
lines changed

plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import com.amazonaws.AmazonClientException;
2323
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
24-
import com.amazonaws.services.s3.model.AmazonS3Exception;
2524
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
2625
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
2726
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
@@ -31,7 +30,6 @@
3130
import com.amazonaws.services.s3.model.ObjectMetadata;
3231
import com.amazonaws.services.s3.model.PartETag;
3332
import com.amazonaws.services.s3.model.PutObjectRequest;
34-
import com.amazonaws.services.s3.model.S3Object;
3533
import com.amazonaws.services.s3.model.UploadPartRequest;
3634
import com.amazonaws.services.s3.model.UploadPartResult;
3735
import org.apache.lucene.util.SetOnce;
@@ -48,7 +46,6 @@
4846

4947
import java.io.IOException;
5048
import java.io.InputStream;
51-
import java.nio.file.NoSuchFileException;
5249
import java.util.ArrayList;
5350
import java.util.HashSet;
5451
import java.util.List;
@@ -81,18 +78,7 @@ class S3BlobContainer extends AbstractBlobContainer {
8178

8279
@Override
8380
public InputStream readBlob(String blobName) throws IOException {
84-
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
85-
final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(blobStore.bucket(),
86-
buildKey(blobName)));
87-
return s3Object.getObjectContent();
88-
} catch (final AmazonClientException e) {
89-
if (e instanceof AmazonS3Exception) {
90-
if (404 == ((AmazonS3Exception) e).getStatusCode()) {
91-
throw new NoSuchFileException("Blob object [" + blobName + "] not found: " + e.getMessage());
92-
}
93-
}
94-
throw e;
95-
}
81+
return new S3RetryingInputStream(blobStore, buildKey(blobName));
9682
}
9783

9884
/**

plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ public AmazonS3Reference clientReference() {
6868
return service.client(repositoryMetaData);
6969
}
7070

71+
int getMaxRetries() {
72+
return service.settings(repositoryMetaData).maxRetries;
73+
}
74+
7175
public String bucket() {
7276
return bucket;
7377
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.repositories.s3;
20+
21+
import com.amazonaws.AmazonClientException;
22+
import com.amazonaws.services.s3.model.AmazonS3Exception;
23+
import com.amazonaws.services.s3.model.GetObjectRequest;
24+
import com.amazonaws.services.s3.model.S3Object;
25+
import org.apache.logging.log4j.LogManager;
26+
import org.apache.logging.log4j.Logger;
27+
import org.apache.logging.log4j.message.ParameterizedMessage;
28+
import org.elasticsearch.core.internal.io.IOUtils;
29+
import org.elasticsearch.Version;
30+
31+
import java.io.IOException;
32+
import java.io.InputStream;
33+
import java.nio.file.NoSuchFileException;
34+
import java.util.ArrayList;
35+
import java.util.List;
36+
37+
/**
38+
* Wrapper around an S3 object that will retry the {@link GetObjectRequest} if the download fails part-way through, resuming from where
39+
* the failure occurred. This should be handled by the SDK but it isn't today. This should be revisited in the future (e.g. before removing
40+
* the {@link Version#V_7_0_0} version constant) and removed when the SDK handles retries itself.
41+
*
42+
* See https://github.com/aws/aws-sdk-java/issues/856 for the related SDK issue
43+
*/
44+
class S3RetryingInputStream extends InputStream {
45+
46+
private static final Logger logger = LogManager.getLogger(S3RetryingInputStream.class);
47+
48+
static final int MAX_SUPPRESSED_EXCEPTIONS = 10;
49+
50+
private final S3BlobStore blobStore;
51+
private final String blobKey;
52+
private final int maxAttempts;
53+
54+
private InputStream currentStream;
55+
private int attempt = 1;
56+
private List<IOException> failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
57+
private long currentOffset;
58+
private boolean closed;
59+
60+
S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException {
61+
this.blobStore = blobStore;
62+
this.blobKey = blobKey;
63+
this.maxAttempts = blobStore.getMaxRetries() + 1;
64+
currentStream = openStream();
65+
}
66+
67+
private InputStream openStream() throws IOException {
68+
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
69+
final GetObjectRequest getObjectRequest = new GetObjectRequest(blobStore.bucket(), blobKey);
70+
if (currentOffset > 0) {
71+
getObjectRequest.setRange(currentOffset);
72+
}
73+
final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
74+
return s3Object.getObjectContent();
75+
} catch (final AmazonClientException e) {
76+
if (e instanceof AmazonS3Exception) {
77+
if (404 == ((AmazonS3Exception) e).getStatusCode()) {
78+
throw addSuppressedExceptions(new NoSuchFileException("Blob object [" + blobKey + "] not found: " + e.getMessage()));
79+
}
80+
}
81+
throw addSuppressedExceptions(e);
82+
}
83+
}
84+
85+
@Override
86+
public int read() throws IOException {
87+
ensureOpen();
88+
while (true) {
89+
try {
90+
final int result = currentStream.read();
91+
currentOffset += 1;
92+
return result;
93+
} catch (IOException e) {
94+
reopenStreamOrFail(e);
95+
}
96+
}
97+
}
98+
99+
@Override
100+
public int read(byte[] b, int off, int len) throws IOException {
101+
ensureOpen();
102+
while (true) {
103+
try {
104+
final int bytesRead = currentStream.read(b, off, len);
105+
if (bytesRead == -1) {
106+
return -1;
107+
}
108+
currentOffset += bytesRead;
109+
return bytesRead;
110+
} catch (IOException e) {
111+
reopenStreamOrFail(e);
112+
}
113+
}
114+
}
115+
116+
private void ensureOpen() {
117+
if (closed) {
118+
assert false : "using S3RetryingInputStream after close";
119+
throw new IllegalStateException("using S3RetryingInputStream after close");
120+
}
121+
}
122+
123+
private void reopenStreamOrFail(IOException e) throws IOException {
124+
if (attempt >= maxAttempts) {
125+
throw addSuppressedExceptions(e);
126+
}
127+
logger.debug(new ParameterizedMessage("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], retrying",
128+
blobStore.bucket(), blobKey, currentOffset, attempt, maxAttempts), e);
129+
attempt += 1;
130+
if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
131+
failures.add(e);
132+
}
133+
IOUtils.closeWhileHandlingException(currentStream);
134+
currentStream = openStream();
135+
}
136+
137+
@Override
138+
public void close() throws IOException {
139+
currentStream.close();
140+
closed = true;
141+
}
142+
143+
@Override
144+
public long skip(long n) {
145+
throw new UnsupportedOperationException("S3RetryingInputStream does not support seeking");
146+
}
147+
148+
@Override
149+
public void reset() {
150+
throw new UnsupportedOperationException("S3RetryingInputStream does not support seeking");
151+
}
152+
153+
private <T extends Exception> T addSuppressedExceptions(T e) {
154+
for (IOException failure : failures) {
155+
e.addSuppressed(failure);
156+
}
157+
return e;
158+
}
159+
}

plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public AmazonS3Reference client(RepositoryMetaData repositoryMetaData) {
106106
* @param repositoryMetaData Repository Metadata
107107
* @return S3ClientSettings
108108
*/
109-
private S3ClientSettings settings(RepositoryMetaData repositoryMetaData) {
109+
S3ClientSettings settings(RepositoryMetaData repositoryMetaData) {
110110
final String clientName = S3Repository.CLIENT_NAME.get(repositoryMetaData.settings());
111111
final S3ClientSettings staticSettings = staticClientSettings.get(clientName);
112112
if (staticSettings != null) {

0 commit comments

Comments
 (0)