Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,14 @@
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.StorageCredential;
import org.apache.iceberg.io.SupportsRecoveryOperations;
import org.apache.iceberg.io.SupportsStorageCredentials;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
Expand Down Expand Up @@ -80,7 +85,11 @@
* schemes s3a, s3n, https are also treated as s3 file paths. Using this FileIO with other schemes
* will result in {@link org.apache.iceberg.exceptions.ValidationException}.
*/
public class S3FileIO implements CredentialSupplier, DelegateFileIO, SupportsRecoveryOperations {
public class S3FileIO
implements CredentialSupplier,
DelegateFileIO,
SupportsRecoveryOperations,
SupportsStorageCredentials {
private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class);
private static final String DEFAULT_METRICS_IMPL =
"org.apache.iceberg.hadoop.HadoopMetricsContext";
Expand All @@ -96,6 +105,7 @@ public class S3FileIO implements CredentialSupplier, DelegateFileIO, SupportsRec
private MetricsContext metrics = MetricsContext.nullMetrics();
private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
private transient StackTraceElement[] createStack;
private List<StorageCredential> storageCredentials = ImmutableList.of();

/**
* No-arg constructor to load the FileIO dynamically.
Expand Down Expand Up @@ -422,7 +432,13 @@ public String getCredential() {
@Override
public void initialize(Map<String, String> props) {
this.properties = SerializableMap.copyOf(props);
this.s3FileIOProperties = new S3FileIOProperties(properties);
Map<String, String> propertiesWithCredentials =
ImmutableMap.<String, String>builder()
.putAll(properties)
.putAll(storageCredentialConfig())
.buildKeepingLast();

this.s3FileIOProperties = new S3FileIOProperties(propertiesWithCredentials);
this.createStack =
PropertyUtil.propertyAsBoolean(props, "init-creation-stacktrace", true)
? Thread.currentThread().getStackTrace()
Expand Down Expand Up @@ -547,4 +563,28 @@ private boolean recoverObject(ObjectVersion version, String bucket) {

return true;
}

@Override
public void setCredentials(List<StorageCredential> credentials) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I observe, the assignment to this.storageCredentials could be wrapped in ImmutableList.copyOf. That way, credentials() wouldn't have to create a fresh copy on every call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not work with Kryo ser/de: after deserialization, Kryo tries to add the elements back into the list, which fails for an immutable collection.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the getter can be written with an additional memoizing field (e.g. called copy):
the first call to the getter assigns ImmutableList.copyOf(storageCredentials) to the copy field;

subsequent calls return the copy field directly, until the setter is invoked again and invalidates it.

Preconditions.checkArgument(credentials != null, "Invalid storage credentials: null");
// copy credentials into a modifiable collection for Kryo serde
this.storageCredentials = Lists.newArrayList(credentials);
}

/**
 * Returns the storage credentials configured via {@code setCredentials(List)}.
 *
 * <p>An immutable snapshot is returned so callers cannot mutate internal state.
 */
@Override
public List<StorageCredential> credentials() {
  // snapshot the internal (mutable, Kryo-friendly) list into an immutable view
  return ImmutableList.<StorageCredential>builder().addAll(storageCredentials).build();
}

/**
 * Extracts the config map of the single S3 storage credential, if one was configured.
 *
 * <p>Credentials are matched by prefix: any prefix starting with "s3" is treated as an S3
 * credential (this also covers s3a/s3n style prefixes).
 *
 * @return the matching credential's config, or an empty map when none was configured
 * @throws IllegalStateException if more than one S3 credential is present
 */
private Map<String, String> storageCredentialConfig() {
  List<StorageCredential> s3Credentials = Lists.newArrayList();
  for (StorageCredential credential : storageCredentials) {
    if (credential.prefix().startsWith("s3")) {
      s3Credentials.add(credential);
    }
  }

  Preconditions.checkState(
      s3Credentials.size() <= 1, "Invalid S3 Credentials: only one S3 credential should exist");

  return s3Credentials.isEmpty() ? Map.of() : s3Credentials.get(0).config();
}
}
185 changes: 185 additions & 0 deletions aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@
import org.apache.iceberg.io.FileIOParser;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.ImmutableStorageCredential;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.io.StorageCredential;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand All @@ -77,6 +79,8 @@
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializableSupplier;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
Expand Down Expand Up @@ -455,6 +459,29 @@ public void testS3FileIOWithEmptyPropsKryoSerialization() throws IOException {
assertThat(roundTripSerializedFileIO.properties()).isEqualTo(testS3FileIO.properties());
}

@Test
public void fileIOWithStorageCredentialsKryoSerialization() throws IOException {
  StorageCredential credential = StorageCredential.create("prefix", Map.of("key1", "val1"));
  S3FileIO fileIO = new S3FileIO();
  fileIO.setCredentials(ImmutableList.of(credential));
  fileIO.initialize(Map.of());

  // credentials must survive a Kryo serialization round trip
  S3FileIO roundTripIO = TestHelpers.KryoHelpers.roundTripSerialize(fileIO);
  assertThat(roundTripIO.credentials()).isEqualTo(fileIO.credentials());
}

@Test
public void fileIOWithStorageCredentialsJavaSerialization()
    throws IOException, ClassNotFoundException {
  StorageCredential credential = StorageCredential.create("prefix", Map.of("key1", "val1"));
  S3FileIO fileIO = new S3FileIO();
  fileIO.setCredentials(ImmutableList.of(credential));
  fileIO.initialize(Map.of());

  // credentials must survive a Java serialization round trip
  S3FileIO roundTripIO = TestHelpers.roundTripSerialize(fileIO);
  assertThat(roundTripIO.credentials()).isEqualTo(fileIO.credentials());
}

@Test
public void testS3FileIOJavaSerialization() throws IOException, ClassNotFoundException {
FileIO testS3FileIO = new S3FileIO();
Expand Down Expand Up @@ -539,6 +566,164 @@ public void testInputFileWithManifest() throws IOException {
verify(s3mock, never()).headObject(any(HeadObjectRequest.class));
}

@Test
public void resolvingFileIOLoadWithStorageCredentials()
    throws IOException, ClassNotFoundException {
  StorageCredential credential = StorageCredential.create("prefix", Map.of("key1", "val1"));
  List<StorageCredential> storageCredentials = ImmutableList.of(credential);
  ResolvingFileIO resolvingFileIO = new ResolvingFileIO();
  resolvingFileIO.setCredentials(storageCredentials);
  resolvingFileIO.initialize(ImmutableMap.of());

  // credentials must be propagated to the delegate S3FileIO when a path is resolved
  assertS3DelegateHasCredentials(resolvingFileIO, storageCredentials);

  // make sure credentials are still present after Kryo serde
  ResolvingFileIO kryoIO = TestHelpers.KryoHelpers.roundTripSerialize(resolvingFileIO);
  assertThat(kryoIO.credentials()).isEqualTo(storageCredentials);
  assertS3DelegateHasCredentials(kryoIO, storageCredentials);

  // make sure credentials are still present after Java serde
  ResolvingFileIO javaIO = TestHelpers.roundTripSerialize(resolvingFileIO);
  assertThat(javaIO.credentials()).isEqualTo(storageCredentials);
  assertS3DelegateHasCredentials(javaIO, storageCredentials);
}

/**
 * Resolves an s3:// path through the given {@link ResolvingFileIO} (via the package-private
 * {@code io(String)} method) and asserts the delegate is an {@link S3FileIO} carrying the
 * expected storage credentials.
 */
private void assertS3DelegateHasCredentials(
    ResolvingFileIO resolvingFileIO, List<StorageCredential> expectedCredentials) {
  FileIO result =
      DynMethods.builder("io")
          .hiddenImpl(ResolvingFileIO.class, String.class)
          .build(resolvingFileIO)
          .invoke("s3://foo/bar");
  assertThat(result)
      .isInstanceOf(S3FileIO.class)
      .asInstanceOf(InstanceOfAssertFactories.type(S3FileIO.class))
      .extracting(S3FileIO::credentials)
      .isEqualTo(expectedCredentials);
}

@Test
public void noStorageCredentialConfigured() {
  Map<String, String> catalogProperties =
      ImmutableMap.of(
          "s3.access-key-id",
          "keyIdFromProperties",
          "s3.secret-access-key",
          "accessKeyFromProperties",
          "s3.session-token",
          "sessionTokenFromProperties");

  S3FileIO fileIO = new S3FileIO();
  fileIO.setCredentials(ImmutableList.of());
  fileIO.initialize(catalogProperties);

  // with no storage credentials configured, the catalog properties are used as-is
  ObjectAssert<S3FileIOProperties> s3FileIOProperties =
      assertThat(fileIO)
          .extracting("s3FileIOProperties")
          .asInstanceOf(InstanceOfAssertFactories.type(S3FileIOProperties.class));
  s3FileIOProperties.extracting(S3FileIOProperties::accessKeyId).isEqualTo("keyIdFromProperties");
  s3FileIOProperties
      .extracting(S3FileIOProperties::secretAccessKey)
      .isEqualTo("accessKeyFromProperties");
  s3FileIOProperties
      .extracting(S3FileIOProperties::sessionToken)
      .isEqualTo("sessionTokenFromProperties");
}

@Test
public void singleStorageCredentialConfigured() {
  Map<String, String> credentialConfig =
      ImmutableMap.of(
          "s3.access-key-id",
          "keyIdFromCredential",
          "s3.secret-access-key",
          "accessKeyFromCredential",
          "s3.session-token",
          "sessionTokenFromCredential");
  StorageCredential s3Credential =
      ImmutableStorageCredential.builder()
          .prefix("s3://custom-uri")
          .config(credentialConfig)
          .build();

  S3FileIO fileIO = new S3FileIO();
  fileIO.setCredentials(ImmutableList.of(s3Credential));
  fileIO.initialize(
      ImmutableMap.of(
          "s3.access-key-id",
          "keyIdFromProperties",
          "s3.secret-access-key",
          "accessKeyFromProperties",
          "s3.session-token",
          "sessionTokenFromProperties"));

  // a configured storage credential must take precedence over the catalog properties
  ObjectAssert<S3FileIOProperties> s3FileIOProperties =
      assertThat(fileIO)
          .extracting("s3FileIOProperties")
          .asInstanceOf(InstanceOfAssertFactories.type(S3FileIOProperties.class));
  s3FileIOProperties.extracting(S3FileIOProperties::accessKeyId).isEqualTo("keyIdFromCredential");
  s3FileIOProperties
      .extracting(S3FileIOProperties::secretAccessKey)
      .isEqualTo("accessKeyFromCredential");
  s3FileIOProperties
      .extracting(S3FileIOProperties::sessionToken)
      .isEqualTo("sessionTokenFromCredential");
}

@Test
public void multipleStorageCredentialsConfigured() {
  StorageCredential s3Credential1 =
      ImmutableStorageCredential.builder()
          .prefix("s3://custom-uri/1")
          .config(
              ImmutableMap.of(
                  "s3.access-key-id",
                  "keyIdFromCredential1",
                  "s3.secret-access-key",
                  "accessKeyFromCredential1",
                  "s3.session-token",
                  "sessionTokenFromCredential1"))
          .build();

  StorageCredential s3Credential2 =
      ImmutableStorageCredential.builder()
          .prefix("s3://custom-uri/2")
          .config(
              ImmutableMap.of(
                  "s3.access-key-id",
                  "keyIdFromCredential2",
                  "s3.secret-access-key",
                  "accessKeyFromCredential2",
                  "s3.session-token",
                  "sessionTokenFromCredential2"))
          .build();

  Map<String, String> catalogProperties =
      ImmutableMap.of(
          "s3.access-key-id",
          "keyIdFromProperties",
          "s3.secret-access-key",
          "accessKeyFromProperties",
          "s3.session-token",
          "sessionTokenFromProperties");

  S3FileIO fileIO = new S3FileIO();
  fileIO.setCredentials(ImmutableList.of(s3Credential1, s3Credential2));

  // more than one S3 credential is ambiguous and must be rejected during initialization
  assertThatThrownBy(() -> fileIO.initialize(catalogProperties))
      .isInstanceOf(IllegalStateException.class)
      .hasMessage("Invalid S3 Credentials: only one S3 credential should exist");
}

private void createRandomObjects(String prefix, int count) {
S3URI s3URI = new S3URI(prefix);

Expand Down
33 changes: 33 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.StorageCredential;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsStorageCredentials;
import org.apache.iceberg.metrics.LoggingMetricsReporter;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
Expand Down Expand Up @@ -343,6 +346,33 @@ public static Catalog buildIcebergCatalog(String name, Map<String, String> optio
* loaded class cannot be cast to the given interface type
*/
public static FileIO loadFileIO(String impl, Map<String, String> properties, Object hadoopConf) {
// delegate to the 4-arg overload with no storage credentials
return loadFileIO(impl, properties, hadoopConf, ImmutableList.of());
}

/**
* Load a custom {@link FileIO} implementation.
*
* <p>The implementation must have a no-arg constructor. If the class implements Configurable, a
* Hadoop config will be passed using Configurable.setConf. If the class implements {@link
* SupportsStorageCredentials}, the storage credentials will be passed using {@link
* SupportsStorageCredentials#setCredentials(List)}. {@link FileIO#initialize(Map properties)} is
* called to complete the initialization.
*
* @param impl full class name of a custom FileIO implementation
* @param properties used to initialize the FileIO implementation
* @param hadoopConf a hadoop Configuration
* @param storageCredentials the storage credentials to configure if the FileIO implementation
* implements {@link SupportsStorageCredentials}
* @return FileIO class
* @throws IllegalArgumentException if class path not found or right constructor not found or the
* loaded class cannot be cast to the given interface type
*/
@SuppressWarnings("unchecked")
public static FileIO loadFileIO(
String impl,
Map<String, String> properties,
Object hadoopConf,
List<StorageCredential> storageCredentials) {
LOG.info("Loading custom FileIO implementation: {}", impl);
DynConstructors.Ctor<FileIO> ctor;
try {
Expand All @@ -365,6 +395,9 @@ public static FileIO loadFileIO(String impl, Map<String, String> properties, Obj
}

configureHadoopConf(fileIO, hadoopConf);
if (fileIO instanceof SupportsStorageCredentials) {
((SupportsStorageCredentials) fileIO).setCredentials(storageCredentials);
}

fileIO.initialize(properties);
return fileIO;
Expand Down
Loading