Skip to content

Commit b2b1453

Browse files
Modularize federation (Option 2) (#2332)
* Modularize federation (Option 2) * Move polaris-extensions-federation-hadoop dependency * Change identifier to lowerCase * Change identifiers to constants
1 parent 22e4c68 commit b2b1453

File tree

11 files changed

+303
-52
lines changed

11 files changed

+303
-52
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
plugins {
21+
id("polaris-client")
22+
alias(libs.plugins.jandex)
23+
}
24+
25+
dependencies {
26+
// Polaris dependencies
27+
implementation(project(":polaris-core"))
28+
29+
implementation(platform(libs.iceberg.bom))
30+
implementation("org.apache.iceberg:iceberg-api")
31+
implementation("org.apache.iceberg:iceberg-core")
32+
implementation("org.apache.iceberg:iceberg-common")
33+
34+
// Hadoop dependencies (for Hadoop catalog support)
35+
implementation(libs.hadoop.common) {
36+
exclude("org.slf4j", "slf4j-reload4j")
37+
exclude("org.slf4j", "slf4j-log4j12")
38+
exclude("ch.qos.reload4j", "reload4j")
39+
exclude("log4j", "log4j")
40+
exclude("org.apache.zookeeper", "zookeeper")
41+
exclude("org.apache.hadoop.thirdparty", "hadoop-shaded-protobuf_3_25")
42+
exclude("com.github.pjfanning", "jersey-json")
43+
exclude("com.sun.jersey", "jersey-core")
44+
exclude("com.sun.jersey", "jersey-server")
45+
exclude("com.sun.jersey", "jersey-servlet")
46+
exclude("io.dropwizard.metrics", "metrics-core")
47+
}
48+
implementation(libs.hadoop.client.api)
49+
implementation(libs.hadoop.client.runtime)
50+
51+
// CDI dependencies for runtime discovery
52+
implementation(libs.jakarta.enterprise.cdi.api)
53+
implementation(libs.smallrye.common.annotation)
54+
55+
// Logging
56+
implementation(libs.slf4j.api)
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.extensions.federation.hadoop;
20+
21+
import io.smallrye.common.annotation.Identifier;
22+
import jakarta.enterprise.context.ApplicationScoped;
23+
import org.apache.hadoop.conf.Configuration;
24+
import org.apache.iceberg.catalog.Catalog;
25+
import org.apache.iceberg.hadoop.HadoopCatalog;
26+
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
27+
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
28+
import org.apache.polaris.core.connection.AuthenticationType;
29+
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
30+
import org.apache.polaris.core.connection.ConnectionType;
31+
import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo;
32+
import org.apache.polaris.core.secrets.UserSecretsManager;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
/** Factory class for creating a Hadoop catalog handle based on connection configuration. */
37+
@ApplicationScoped
38+
@Identifier(ConnectionType.HADOOP_FACTORY_IDENTIFIER)
39+
public class HadoopFederatedCatalogFactory implements ExternalCatalogFactory {
40+
private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFederatedCatalogFactory.class);
41+
42+
@Override
43+
public Catalog createCatalog(
44+
ConnectionConfigInfoDpo connectionConfigInfoDpo, UserSecretsManager userSecretsManager) {
45+
// Currently, Polaris supports Hadoop federation only via IMPLICIT authentication.
46+
// Hence, prior to initializing the configuration, ensure that the catalog uses
47+
// IMPLICIT authentication.
48+
AuthenticationParametersDpo authenticationParametersDpo =
49+
connectionConfigInfoDpo.getAuthenticationParameters();
50+
if (authenticationParametersDpo.getAuthenticationTypeCode()
51+
!= AuthenticationType.IMPLICIT.getCode()) {
52+
throw new IllegalStateException("Hadoop federation only supports IMPLICIT authentication.");
53+
}
54+
Configuration conf = new Configuration();
55+
String warehouse = ((HadoopConnectionConfigInfoDpo) connectionConfigInfoDpo).getWarehouse();
56+
HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, warehouse);
57+
hadoopCatalog.initialize(
58+
warehouse, connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager));
59+
return hadoopCatalog;
60+
}
61+
}

gradle/projects.main.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ polaris-minio-testcontainer=tools/minio-testcontainer
4242
polaris-version=tools/version
4343
polaris-misc-types=tools/misc-types
4444
polaris-persistence-varint=nosql/persistence/varint
45+
polaris-extensions-federation-hadoop=extensions/federation/hadoop
4546

4647
polaris-config-docs-annotations=tools/config-docs/annotations
4748
polaris-config-docs-generator=tools/config-docs/generator
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.core.catalog;
20+
21+
import org.apache.iceberg.catalog.Catalog;
22+
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
23+
import org.apache.polaris.core.secrets.UserSecretsManager;
24+
25+
/**
26+
* Factory interface for creating external catalog handles based on connection configuration.
27+
*
28+
* <p>Implementations should be annotated with CDI annotations and use the @Identifier annotation to
29+
* specify which connection type they support.
30+
*/
31+
public interface ExternalCatalogFactory {
32+
33+
/**
34+
* Creates a catalog handle for the given connection configuration.
35+
*
36+
* @param connectionConfig the connection configuration
37+
* @param userSecretsManager the user secrets manager for handling credentials
38+
* @return the initialized catalog
39+
* @throws IllegalStateException if the connection configuration is invalid
40+
*/
41+
Catalog createCatalog(
42+
ConnectionConfigInfoDpo connectionConfig, UserSecretsManager userSecretsManager);
43+
}

polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ public enum ConnectionType {
3535
HADOOP(2),
3636
;
3737

38+
public static final String ICEBERG_REST_FACTORY_IDENTIFIER = "iceberg_rest";
39+
public static final String HADOOP_FACTORY_IDENTIFIER = "hadoop";
40+
3841
private static final ConnectionType[] REVERSE_MAPPING_ARRAY;
3942

4043
static {
@@ -77,4 +80,22 @@ public enum ConnectionType {
7780
public int getCode() {
7881
return this.code;
7982
}
83+
84+
/**
85+
* Get the factory identifier string used for CDI injection of the appropriate
86+
* ExternalCatalogFactory.
87+
*
88+
* @return the factory identifier string
89+
*/
90+
public String getFactoryIdentifier() {
91+
switch (this) {
92+
case ICEBERG_REST:
93+
return ICEBERG_REST_FACTORY_IDENTIFIER;
94+
case HADOOP:
95+
return HADOOP_FACTORY_IDENTIFIER;
96+
default:
97+
throw new UnsupportedOperationException(
98+
"No factory identifier for connection type: " + this);
99+
}
100+
}
80101
}

runtime/server/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ dependencies {
4848
runtimeOnly("org.postgresql:postgresql")
4949
runtimeOnly(project(":polaris-relational-jdbc"))
5050
runtimeOnly("io.quarkus:quarkus-jdbc-postgresql")
51+
runtimeOnly(project(":polaris-extensions-federation-hadoop"))
5152

5253
// enforce the Quarkus _platform_ here, to get a consistent and validated set of dependencies
5354
implementation(enforcedPlatform(libs.quarkus.bom))

runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import com.google.common.collect.ImmutableMap;
2828
import com.google.common.collect.ImmutableSet;
2929
import jakarta.enterprise.context.RequestScoped;
30+
import jakarta.enterprise.inject.Any;
31+
import jakarta.enterprise.inject.Instance;
3032
import jakarta.inject.Inject;
3133
import jakarta.ws.rs.WebApplicationException;
3234
import jakarta.ws.rs.core.HttpHeaders;
@@ -61,6 +63,7 @@
6163
import org.apache.iceberg.rest.responses.LoadTableResponse;
6264
import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
6365
import org.apache.polaris.core.auth.PolarisAuthorizer;
66+
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
6467
import org.apache.polaris.core.config.RealmConfig;
6568
import org.apache.polaris.core.context.CallContext;
6669
import org.apache.polaris.core.context.RealmContext;
@@ -146,6 +149,7 @@ public class IcebergCatalogAdapter
146149
private final CatalogPrefixParser prefixParser;
147150
private final ReservedProperties reservedProperties;
148151
private final CatalogHandlerUtils catalogHandlerUtils;
152+
private final Instance<ExternalCatalogFactory> externalCatalogFactories;
149153

150154
@Inject
151155
public IcebergCatalogAdapter(
@@ -159,7 +163,8 @@ public IcebergCatalogAdapter(
159163
PolarisAuthorizer polarisAuthorizer,
160164
CatalogPrefixParser prefixParser,
161165
ReservedProperties reservedProperties,
162-
CatalogHandlerUtils catalogHandlerUtils) {
166+
CatalogHandlerUtils catalogHandlerUtils,
167+
@Any Instance<ExternalCatalogFactory> externalCatalogFactories) {
163168
this.realmContext = realmContext;
164169
this.callContext = callContext;
165170
this.realmConfig = callContext.getRealmConfig();
@@ -172,6 +177,7 @@ public IcebergCatalogAdapter(
172177
this.prefixParser = prefixParser;
173178
this.reservedProperties = reservedProperties;
174179
this.catalogHandlerUtils = catalogHandlerUtils;
180+
this.externalCatalogFactories = externalCatalogFactories;
175181
}
176182

177183
/**
@@ -208,7 +214,8 @@ IcebergCatalogHandler newHandlerWrapper(SecurityContext securityContext, String
208214
catalogName,
209215
polarisAuthorizer,
210216
reservedProperties,
211-
catalogHandlerUtils);
217+
catalogHandlerUtils,
218+
externalCatalogFactories);
212219
}
213220

214221
@Override

runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java

Lines changed: 20 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import com.google.common.base.Preconditions;
2222
import com.google.common.collect.Maps;
23+
import io.smallrye.common.annotation.Identifier;
2324
import jakarta.annotation.Nonnull;
25+
import jakarta.enterprise.inject.Instance;
2426
import jakarta.ws.rs.core.SecurityContext;
2527
import java.io.Closeable;
2628
import java.time.OffsetDateTime;
@@ -33,7 +35,6 @@
3335
import java.util.Optional;
3436
import java.util.Set;
3537
import java.util.stream.Collectors;
36-
import org.apache.hadoop.conf.Configuration;
3738
import org.apache.iceberg.BaseMetadataTable;
3839
import org.apache.iceberg.BaseTable;
3940
import org.apache.iceberg.MetadataUpdate;
@@ -46,7 +47,6 @@
4647
import org.apache.iceberg.UpdateRequirement;
4748
import org.apache.iceberg.catalog.Catalog;
4849
import org.apache.iceberg.catalog.Namespace;
49-
import org.apache.iceberg.catalog.SessionCatalog;
5050
import org.apache.iceberg.catalog.SupportsNamespaces;
5151
import org.apache.iceberg.catalog.TableIdentifier;
5252
import org.apache.iceberg.catalog.ViewCatalog;
@@ -55,9 +55,6 @@
5555
import org.apache.iceberg.exceptions.CommitFailedException;
5656
import org.apache.iceberg.exceptions.ForbiddenException;
5757
import org.apache.iceberg.exceptions.NoSuchTableException;
58-
import org.apache.iceberg.hadoop.HadoopCatalog;
59-
import org.apache.iceberg.rest.HTTPClient;
60-
import org.apache.iceberg.rest.RESTCatalog;
6158
import org.apache.iceberg.rest.credentials.ImmutableCredential;
6259
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
6360
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
@@ -76,13 +73,10 @@
7673
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
7774
import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
7875
import org.apache.polaris.core.auth.PolarisAuthorizer;
76+
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
7977
import org.apache.polaris.core.config.FeatureConfiguration;
80-
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
81-
import org.apache.polaris.core.connection.AuthenticationType;
8278
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
8379
import org.apache.polaris.core.connection.ConnectionType;
84-
import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo;
85-
import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo;
8680
import org.apache.polaris.core.context.CallContext;
8781
import org.apache.polaris.core.entity.CatalogEntity;
8882
import org.apache.polaris.core.entity.PolarisEntitySubType;
@@ -132,6 +126,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
132126
private final ReservedProperties reservedProperties;
133127
private final CatalogHandlerUtils catalogHandlerUtils;
134128

129+
private final Instance<ExternalCatalogFactory> externalCatalogFactories;
130+
135131
// Catalog instance will be initialized after authorizing resolver successfully resolves
136132
// the catalog entity.
137133
protected Catalog baseCatalog = null;
@@ -151,13 +147,15 @@ public IcebergCatalogHandler(
151147
String catalogName,
152148
PolarisAuthorizer authorizer,
153149
ReservedProperties reservedProperties,
154-
CatalogHandlerUtils catalogHandlerUtils) {
150+
CatalogHandlerUtils catalogHandlerUtils,
151+
Instance<ExternalCatalogFactory> externalCatalogFactories) {
155152
super(callContext, resolutionManifestFactory, securityContext, catalogName, authorizer);
156153
this.metaStoreManager = metaStoreManager;
157154
this.userSecretsManager = userSecretsManager;
158155
this.catalogFactory = catalogFactory;
159156
this.reservedProperties = reservedProperties;
160157
this.catalogHandlerUtils = catalogHandlerUtils;
158+
this.externalCatalogFactories = externalCatalogFactories;
161159
}
162160

163161
/**
@@ -220,42 +218,18 @@ protected void initializeCatalog() {
220218
ConnectionType connectionType =
221219
ConnectionType.fromCode(connectionConfigInfoDpo.getConnectionTypeCode());
222220

223-
switch (connectionType) {
224-
case ICEBERG_REST:
225-
SessionCatalog.SessionContext context = SessionCatalog.SessionContext.createEmpty();
226-
federatedCatalog =
227-
new RESTCatalog(
228-
context,
229-
(config) ->
230-
HTTPClient.builder(config)
231-
.uri(config.get(org.apache.iceberg.CatalogProperties.URI))
232-
.build());
233-
federatedCatalog.initialize(
234-
((IcebergRestConnectionConfigInfoDpo) connectionConfigInfoDpo).getRemoteCatalogName(),
235-
connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager()));
236-
break;
237-
case HADOOP:
238-
// Currently, Polaris supports Hadoop federation only via IMPLICIT authentication.
239-
// Hence, prior to initializing the configuration, ensure that the catalog uses
240-
// IMPLICIT authentication.
241-
AuthenticationParametersDpo authenticationParametersDpo =
242-
connectionConfigInfoDpo.getAuthenticationParameters();
243-
if (authenticationParametersDpo.getAuthenticationTypeCode()
244-
!= AuthenticationType.IMPLICIT.getCode()) {
245-
throw new IllegalStateException(
246-
"Hadoop federation only supports IMPLICIT authentication.");
247-
}
248-
Configuration conf = new Configuration();
249-
String warehouse =
250-
((HadoopConnectionConfigInfoDpo) connectionConfigInfoDpo).getWarehouse();
251-
federatedCatalog = new HadoopCatalog(conf, warehouse);
252-
federatedCatalog.initialize(
253-
warehouse,
254-
connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager()));
255-
break;
256-
default:
257-
throw new UnsupportedOperationException(
258-
"Connection type not supported: " + connectionType);
221+
// Use the unified factory pattern for all external catalog types
222+
Instance<ExternalCatalogFactory> externalCatalogFactory =
223+
externalCatalogFactories.select(
224+
Identifier.Literal.of(connectionType.getFactoryIdentifier()));
225+
if (externalCatalogFactory.isResolvable()) {
226+
federatedCatalog =
227+
externalCatalogFactory
228+
.get()
229+
.createCatalog(connectionConfigInfoDpo, getUserSecretsManager());
230+
} else {
231+
throw new UnsupportedOperationException(
232+
"External catalog factory for type '" + connectionType + "' is unavailable.");
259233
}
260234
this.baseCatalog = federatedCatalog;
261235
} else {

0 commit comments

Comments
 (0)