Skip to content

Commit ce329c0

Browse files
bjaroszefhussonnois
authored andcommitted
perf(plugin): optimize aws s3 file listing
Lower number of requests sent to S3 while listing objects. For buckets with large number of objects, listing can take some time, because there is a request for metadata sent to s3 for each object in the bucket. This is redundant, because all data needed for objects listing are available in S3ObjectSummary. This reduce listing time significantly. Resolves: #490
1 parent d8e8df8 commit ce329c0

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/AmazonS3FileSystemListing.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,6 @@ public Collection<FileObjectMeta> listObjects() {
9595

9696
objectMetaList.addAll(objectListing.getObjectSummaries()
9797
.stream()
98-
.map(s3ObjectSummary ->
99-
new S3BucketKey(
100-
s3ObjectSummary.getBucketName(),
101-
s3ObjectSummary.getKey()
102-
))
10398
.map(s3Storage::getObjectMetadata)
10499
.filter(Objects::nonNull)
105100
.collect(Collectors.toList()));

connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/AmazonS3Storage.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
2828
import com.amazonaws.services.s3.model.GetObjectRequest;
2929
import com.amazonaws.services.s3.model.ObjectMetadata;
30+
import com.amazonaws.services.s3.model.S3ObjectSummary;
3031
import com.amazonaws.services.s3.model.StorageClass;
3132
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
3233
import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
@@ -186,6 +187,13 @@ public FileObjectMeta getObjectMetadata(final S3BucketKey s3Object) {
186187
);
187188
}
188189

190+
public FileObjectMeta getObjectMetadata(final S3ObjectSummary s3ObjectSummary) {
191+
return createFileObjectMeta(
192+
new S3BucketKey(s3ObjectSummary.getBucketName(), s3ObjectSummary.getKey()),
193+
s3ObjectSummary
194+
);
195+
}
196+
189197
private ObjectMetadata loadObjectMetadata(final S3BucketKey s3Object) {
190198
var request = new GetObjectMetadataRequest(s3Object.bucketName(), s3Object.key());
191199
try {
@@ -264,11 +272,11 @@ private static FileObjectMeta createFileObjectMeta(final S3BucketKey s3Object,
264272
userDefinedMetadata.put("s3.object.summary.etag", objectMetadata.getETag());
265273
userDefinedMetadata.put("s3.object.summary.storageClass", objectMetadata.getStorageClass());
266274

267-
final String contentMD5 = objectMetadata.getContentMD5();
275+
final String contentMD5 = objectMetadata.getETag();
268276

269277
FileObjectMeta.ContentDigest digest = null;
270278
if (contentMD5 != null) {
271-
digest = new FileObjectMeta.ContentDigest(contentMD5, "MD5");
279+
digest = new FileObjectMeta.ContentDigest(contentMD5, "ETAG");
272280
}
273281

274282
return new GenericFileObjectMeta.Builder()
@@ -280,4 +288,22 @@ private static FileObjectMeta createFileObjectMeta(final S3BucketKey s3Object,
280288
.withUserDefinedMetadata(userDefinedMetadata)
281289
.build();
282290
}
291+
292+
private static FileObjectMeta createFileObjectMeta(final S3BucketKey s3Object,
293+
final S3ObjectSummary s3ObjectSummary) {
294+
final String contentMD5 = s3ObjectSummary.getETag();
295+
296+
FileObjectMeta.ContentDigest digest = null;
297+
if (contentMD5 != null) {
298+
digest = new FileObjectMeta.ContentDigest(contentMD5, "ETAG");
299+
}
300+
301+
return new GenericFileObjectMeta.Builder()
302+
.withUri(s3Object.toURI())
303+
.withName(s3ObjectSummary.getKey())
304+
.withContentLength(s3ObjectSummary.getSize())
305+
.withLastModified(s3ObjectSummary.getLastModified())
306+
.withContentDigest(digest)
307+
.build();
308+
}
283309
}

0 commit comments

Comments
 (0)