diff --git a/polaris-synchronizer/README.md b/polaris-synchronizer/README.md index fb9e4f6..6ac82df 100644 --- a/polaris-synchronizer/README.md +++ b/polaris-synchronizer/README.md @@ -61,8 +61,7 @@ with 10 concurrent catalog setup threads: java -jar cli/build/libs/polaris-synchronizer-cli.jar create-omnipotent-principal \ --polaris-api-connection-properties base-url=http://localhost:8181 \ --polaris-api-connection-properties oauth2-server-uri=http://localhost:8181/api/catalog/v1/oauth/tokens \ ---polaris-api-connection-properties client-id=root \ ---polaris-api-connection-properties client-secret= \ +--polaris-api-connection-properties credential=: \ --polaris-api-connection-properties scope=PRINCIPAL_ROLE:ALL \ --replace \ # replace it if it already exists --concurrency 10 # 10 concurrent catalog setup threads @@ -105,8 +104,7 @@ java -jar cli/build/libs/polaris-synchronizer-cli.jar \ create-omnipotent-principal \ --polaris-api-connection-properties base-url=http://localhost:8181 \ --polaris-api-connection-properties oauth2-server-uri=http://localhost:8181/api/catalog/v1/oauth/tokens \ ---polaris-api-connection-properties client-id=root \ ---polaris-api-connection-properties client-secret= \ +--polaris-api-connection-properties credential=: \ --polaris-api-connection-properties scope=PRINCIPAL_ROLE:ALL \ --replace \ # replace if it already exists --concurrency 10 \ # 10 concurrent catalog setup threads @@ -143,22 +141,18 @@ diff between the source and target Polaris instances. This can be achieved using > Polaris instance. The new credentials will be logged to stdout, ONLY for each newly created or overwritten principal. > Please note that this output should be securely managed, client credentials should only ever be stored in a secure vault. -**Example** Running the synchronization between source Polaris instance using an access token, and a target Polaris instance +**Example** Running the synchronization between source Polaris instance using a bearer token, and a target Polaris instance using client credentials. ``` java -jar cli/build/libs/polaris-synchronizer-cli.jar sync-polaris \ --source-properties base-url=http://localhost:8181 \ ---source-properties client-id=root \ ---source-properties client-secret= \ ---source-properties oauth2-server-uri=http://localhost:8181/api/catalog/v1/oauth/tokens \ ---source-properties scope=PRINCIPAL_ROLE:ALL \ +--source-properties token= \ --source-properties omnipotent-principal-name=omnipotent-principal-XXXXX \ --source-properties omnipotent-principal-client-id=589550e8b23d271e \ --source-properties omnipotent-principal-client-secret= \ --source-properties omnipotent-principal-oauth2-server-uri=http://localhost:8181/api/catalog/v1/oauth/tokens \ --target-properties base-url=http://localhost:5858 \ ---target-properties client-id=root \ ---target-properties client-secret= \ +--target-properties credential=: \ --target-properties oauth2-server-uri=http://localhost:5858/api/catalog/v1/oauth/tokens \ --target-properties scope=PRINCIPAL_ROLE:ALL \ --target-properties omnipotent-principal-name=omnipotent-principal-YYYYY \ diff --git a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/PolarisSynchronizer.java b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/PolarisSynchronizer.java index c6a2cc4..a521ecf 100644 --- a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/PolarisSynchronizer.java +++ b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/PolarisSynchronizer.java @@ -630,41 +630,30 @@ public void syncCatalogs() { } for (Catalog catalog : catalogSyncPlan.entitiesToSyncChildren()) { - IcebergCatalogService sourceIcebergCatalogService; - try { - sourceIcebergCatalogService = source.initializeIcebergCatalogService(catalog.getName()); + try (IcebergCatalogService sourceIcebergCatalogService = source.initializeIcebergCatalogService(catalog.getName())) { clientLogger.info( - "Initialized Iceberg REST catalog for Polaris catalog {} on source.", - catalog.getName()); - } catch (Exception e) { - if (haltOnFailure) throw e; - clientLogger.error( - "Failed to initialize Iceberg REST catalog for Polaris catalog {} on source.", - catalog.getName(), - e); - continue; - } + "Initialized Iceberg REST catalog for Polaris catalog {} on source.", + catalog.getName()); - IcebergCatalogService targetIcebergCatalogService; + try (IcebergCatalogService targetIcebergCatalogService = target.initializeIcebergCatalogService(catalog.getName())) { + clientLogger.info( + "Initialized Iceberg REST catalog for Polaris catalog {} on target.", + catalog.getName()); + + syncNamespaces( + catalog.getName(), Namespace.empty(), sourceIcebergCatalogService, targetIcebergCatalogService); + } - try { - targetIcebergCatalogService = target.initializeIcebergCatalogService(catalog.getName()); - clientLogger.info( - "Initialized Iceberg REST catalog for Polaris catalog {} on target.", - catalog.getName()); } catch (Exception e) { - if (haltOnFailure) throw e; clientLogger.error( - "Failed to initialize Iceberg REST catalog for Polaris catalog {} on target.", - catalog.getName(), - e); + "Failed to synchronize Iceberg REST catalog for Polaris catalog {}.", + catalog.getName(), + e); + if (haltOnFailure) throw new RuntimeException(e); continue; } - syncNamespaces( - catalog.getName(), Namespace.empty(), sourceIcebergCatalogService, targetIcebergCatalogService); - // NOTE: Grants are synced on a per catalog role basis, so we need to ensure that catalog roles // are only synced AFTER Iceberg catalog entities, because they may depend on the Iceberg catalog // entities already existing diff --git a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/auth/AuthenticationSessionWrapper.java b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/auth/AuthenticationSessionWrapper.java new file mode 100644 index 0000000..5ce0304 --- /dev/null +++ b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/auth/AuthenticationSessionWrapper.java @@ -0,0 +1,90 @@ +package org.apache.polaris.tools.sync.polaris.auth; + +import org.apache.iceberg.rest.HTTPClient; +import org.apache.iceberg.rest.RESTClient; +import org.apache.iceberg.rest.auth.AuthConfig; +import org.apache.iceberg.rest.auth.OAuth2Properties; +import org.apache.iceberg.rest.auth.OAuth2Util; +import org.apache.iceberg.util.ThreadPools; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; + +/** + * Wraps {@link OAuth2Util.AuthSession} to provide supported authentication flows. + */ +public class AuthenticationSessionWrapper implements Closeable { + + private final RESTClient restClient; + + private final OAuth2Util.AuthSession authSession; + + private final ScheduledExecutorService executor; + + public AuthenticationSessionWrapper(Map properties) { + this.restClient = HTTPClient.builder(Map.of()) + .uri(properties.get(OAuth2Properties.OAUTH2_SERVER_URI)) + .build(); + this.authSession = this.newAuthSession(this.restClient, properties); + this.executor = ThreadPools.newScheduledPool(UUID.randomUUID() + "-token-refresh", 1); + } + + /** + * Initializes a new authentication session. Supports client_credentials and bearer token flow. + * @param properties properties to initialize the session with + * @return an authentication session, with token refresh if applicable + */ + private OAuth2Util.AuthSession newAuthSession(RESTClient restClient, Map properties) { + OAuth2Util.AuthSession parent = new OAuth2Util.AuthSession( + Map.of(), + AuthConfig.builder() + .scope(properties.get(OAuth2Properties.SCOPE)) + .oauth2ServerUri(properties.get(OAuth2Properties.OAUTH2_SERVER_URI)) + .optionalOAuthParams(OAuth2Util.buildOptionalParam(properties)) + .build() + ); + + // This is for client_credentials flow + if (properties.containsKey(OAuth2Properties.CREDENTIAL)) { + return OAuth2Util.AuthSession.fromCredential( + restClient, + // threads created here will be daemon threads, so termination of main program + // will terminate the token refresh thread automatically + this.executor, + properties.get(OAuth2Properties.CREDENTIAL), + parent + ); + } + + // This is for regular bearer token flow + if (properties.containsKey(OAuth2Properties.TOKEN)) { + return OAuth2Util.AuthSession.fromAccessToken( + restClient, + // threads created here will be daemon threads, so termination of main program + // will terminate the token refresh thread automatically + this.executor, + properties.get(OAuth2Properties.TOKEN), + null, /* defaultExpiresAtMillis */ + parent + ); + } + + throw new IllegalArgumentException("Unable to construct authenticated session with the provided properties."); + } + + /** + * Get refreshed authentication headers for session. + */ + public Map getSessionHeaders() { + return this.authSession.headers(); + } + + @Override + public void close() throws IOException { + try (restClient; executor) {} + } + +} diff --git a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/ETagManager.java b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/ETagManager.java index 832d75c..bd6fcf5 100644 --- a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/ETagManager.java +++ b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/ETagManager.java @@ -26,7 +26,7 @@ * Generic interface to provide and store ETags for tables within catalogs. This allows the storage * of the ETag to be completely independent from the tool. */ -public interface ETagManager { +public interface ETagManager extends AutoCloseable { /** * Used to initialize the instance for use. Should be called prior to diff --git a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/NoOpETagManager.java b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/NoOpETagManager.java index 1bd1499..a17cbf4 100644 --- a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/NoOpETagManager.java +++ b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/NoOpETagManager.java @@ -35,4 +35,8 @@ public String getETag(String catalogName, TableIdentifier tableIdentifier) { @Override public void storeETag(String catalogName, TableIdentifier tableIdentifier, String etag) {} + + @Override + public void close() {} + } diff --git a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/PolarisCatalog.java b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/PolarisCatalog.java index 5c489ae..1e76e34 100644 --- a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/PolarisCatalog.java +++ b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/PolarisCatalog.java @@ -40,7 +40,7 @@ import org.apache.iceberg.rest.ResourcePaths; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.LoadTableResponseParser; -import org.apache.polaris.tools.sync.polaris.http.OAuth2Util; +import org.apache.polaris.tools.sync.polaris.auth.AuthenticationSessionWrapper; /** * Overrides loadTable default implementation to issue a custom loadTable request to the Polaris @@ -58,14 +58,14 @@ public class PolarisCatalog extends RESTCatalog private Map properties = null; - private String accessToken = null; - private HttpClient httpClient = null; private ObjectMapper objectMapper = null; private ResourcePaths resourcePaths = null; + private AuthenticationSessionWrapper authenticationSession = null; + public PolarisCatalog() { super(); } @@ -82,22 +82,8 @@ public void initialize(String name, Map props) { super.initialize(name, props); - if (accessToken == null || httpClient == null || this.objectMapper == null) { - String oauth2ServerUri = props.get("uri") + "/v1/oauth/tokens"; - String credential = props.get("credential"); - - String clientId = credential.split(":")[0]; - String clientSecret = credential.split(":")[1]; - - String scope = props.get("scope"); - - // TODO: Add token refresh - try { - this.accessToken = OAuth2Util.fetchToken(oauth2ServerUri, clientId, clientSecret, scope); - } catch (IOException e) { - throw new RuntimeException(e); - } - + if (authenticationSession == null || httpClient == null || this.objectMapper == null) { + this.authenticationSession = new AuthenticationSessionWrapper(this.properties); this.httpClient = HttpClient.newBuilder().build(); this.objectMapper = new ObjectMapper(); } @@ -127,9 +113,10 @@ public Table loadTable(TableIdentifier ident, String etag) { HttpRequest.Builder requestBuilder = HttpRequest.newBuilder() .uri(URI.create(tablePath)) - .header(HttpHeaders.AUTHORIZATION, "Bearer " + accessToken) .GET(); + this.authenticationSession.getSessionHeaders().forEach(requestBuilder::header); + // specify last known etag in if-none-match header if (etag != null) { requestBuilder.header(HttpHeaders.IF_NONE_MATCH, etag); @@ -171,4 +158,21 @@ public Table loadTable(TableIdentifier ident, String etag) { return new BaseTable(ops, CatalogUtil.fullTableName(catalogName, ident)); } + + @Override + public void close() throws IOException { + final AuthenticationSessionWrapper session = this.authenticationSession; + final HttpClient httpClient = this.httpClient; + + try (session; httpClient) { + super.close(); + } finally { + this.authenticationSession = null; + this.httpClient = null; + this.objectMapper = null; + this.resourcePaths = null; + } + + super.close(); + } } diff --git a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/http/HttpUtil.java b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/http/HttpUtil.java deleted file mode 100644 index 50c1428..0000000 --- a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/http/HttpUtil.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.tools.sync.polaris.http; - -import java.net.URLEncoder; -import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.stream.Collectors; - -/** Encapsulates handy http utility methods. */ -public class HttpUtil { - - /** Turn a {@link Map} into an xxx-url-form-encoded compatible String form body. */ - public static String constructFormEncodedString(Map parameters) { - return parameters.entrySet().stream() - .map( - entry -> - URLEncoder.encode(entry.getKey(), StandardCharsets.UTF_8) - + "=" - + URLEncoder.encode(entry.getValue(), StandardCharsets.UTF_8)) - .collect(Collectors.joining("&")); - } -} diff --git a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/http/OAuth2Util.java b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/http/OAuth2Util.java deleted file mode 100644 index 98ada78..0000000 --- a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/http/OAuth2Util.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.tools.sync.polaris.http; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.util.Map; -import java.util.NoSuchElementException; -import org.apache.http.HttpHeaders; -import org.apache.http.entity.ContentType; - -/** Utility class to manage OAuth2 flow for a Polaris instance. */ -public class OAuth2Util { - - private static final HttpClient httpClient = HttpClient.newHttpClient(); - - private static final ObjectMapper objectMapper = new ObjectMapper(); - - public static String fetchToken( - String oauth2ServerUri, String clientId, String clientSecret, String scope) - throws IOException { - - Map formBody = - Map.of( - "grant_type", "client_credentials", - "scope", scope, - "client_id", clientId, - "client_secret", clientSecret); - - String formBodyAsString = HttpUtil.constructFormEncodedString(formBody); - - HttpRequest request = - HttpRequest.newBuilder() - .uri(URI.create(oauth2ServerUri)) - .header(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.getMimeType()) - .POST(HttpRequest.BodyPublishers.ofString(formBodyAsString)) - .build(); - - try { - HttpResponse response = - httpClient.send(request, HttpResponse.BodyHandlers.ofString()); - Map responseBody = - objectMapper.readValue(response.body(), new TypeReference<>() {}); - - String accessToken = responseBody.getOrDefault("access_token", null); - - if (accessToken != null) { - return accessToken; - } - - throw new NoSuchElementException( - "No field 'access_token' found in response from oauth2-server-uri."); - } catch (Exception e) { - throw new RuntimeException("Could not fetch access token", e); - } - } -} diff --git a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/IcebergCatalogService.java b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/IcebergCatalogService.java index fe7020b..240e5f8 100644 --- a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/IcebergCatalogService.java +++ b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/IcebergCatalogService.java @@ -23,6 +23,7 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import java.io.Closeable; import java.util.List; import java.util.Map; @@ -30,7 +31,7 @@ * Wrapper around {@link org.apache.iceberg.catalog.Catalog} that exposes functionality * that uses multiple Iceberg operations. For example, cascading drops of namespaces. */ -public interface IcebergCatalogService { +public interface IcebergCatalogService extends AutoCloseable { // NAMESPACES List listNamespaces(Namespace parentNamespace); diff --git a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/PolarisService.java b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/PolarisService.java index f011880..b1cb8e8 100644 --- a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/PolarisService.java +++ b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/PolarisService.java @@ -26,13 +26,14 @@ import org.apache.polaris.core.admin.model.PrincipalRole; import org.apache.polaris.core.admin.model.PrincipalWithCredentials; +import java.io.Closeable; import java.util.List; import java.util.Map; /** * Generic wrapper for a Polaris entity store. */ -public interface PolarisService { +public interface PolarisService extends AutoCloseable { /** * Called to perform initializing tasks for a Polaris entity store. diff --git a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/impl/PolarisApiService.java b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/impl/PolarisApiService.java index f454775..9afc389 100644 --- a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/impl/PolarisApiService.java +++ b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/impl/PolarisApiService.java @@ -19,7 +19,6 @@ package org.apache.polaris.tools.sync.polaris.service.impl; -import org.apache.http.HttpHeaders; import org.apache.iceberg.catalog.Namespace; import org.apache.polaris.core.admin.model.AddGrantRequest; import org.apache.polaris.core.admin.model.Catalog; @@ -39,10 +38,11 @@ import org.apache.polaris.management.ApiClient; import org.apache.polaris.management.client.PolarisManagementDefaultApi; import org.apache.polaris.tools.sync.polaris.access.AccessControlService; -import org.apache.polaris.tools.sync.polaris.http.OAuth2Util; +import org.apache.polaris.tools.sync.polaris.auth.AuthenticationSessionWrapper; import org.apache.polaris.tools.sync.polaris.service.IcebergCatalogService; import org.apache.polaris.tools.sync.polaris.service.PolarisService; +import java.io.IOException; import java.util.List; import java.util.Map; @@ -68,6 +68,8 @@ public class PolarisApiService implements PolarisService { private boolean icebergWriteAccess = false; + private AuthenticationSessionWrapper authenticationSession = null; + public PolarisApiService() {} @Override @@ -75,25 +77,14 @@ public void initialize(Map properties) throws Exception { this.properties = properties; String baseUrl = properties.get("base-url"); - String token = properties.get("bearer-token"); - - if (token == null) { - String oauth2ServerUri = properties.get("oauth2-server-uri"); - String clientId = properties.get("client-id"); - String clientSecret = properties.get("client-secret"); - String scope = properties.get("scope"); - - token = OAuth2Util.fetchToken(oauth2ServerUri, clientId, clientSecret, scope); - } - - String bearerToken = token; // to make it effectively final to use it in a lambda ApiClient client = new ApiClient(); client.updateBaseUri(baseUrl + "/api/management/v1"); - // TODO: Add token refresh - client.setRequestInterceptor(requestBuilder -> - requestBuilder.header(HttpHeaders.AUTHORIZATION, "Bearer " + bearerToken)); + this.authenticationSession = new AuthenticationSessionWrapper(properties); + + client.setRequestInterceptor(requestBuilder + -> authenticationSession.getSessionHeaders().forEach(requestBuilder::header)); this.baseUrl = baseUrl; this.api = new PolarisManagementDefaultApi(client); @@ -284,4 +275,21 @@ public IcebergCatalogService initializeIcebergCatalogService(String catalogName) baseUrl, catalogName, omnipotentPrincipal, properties); } + @Override + public void close() throws IOException { + AuthenticationSessionWrapper session = this.authenticationSession; + + try (session) {} + finally { + this.properties = null; + this.baseUrl = null; + this.api = null; + this.accessControlService = null; + this.omnipotentPrincipal = null; + this.omnipotentPrincipalRole = null; + this.icebergWriteAccess = false; + this.authenticationSession = null; + } + } + } diff --git a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/impl/PolarisIcebergCatalogService.java b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/impl/PolarisIcebergCatalogService.java index 3955c67..cb9b50a 100644 --- a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/impl/PolarisIcebergCatalogService.java +++ b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/impl/PolarisIcebergCatalogService.java @@ -28,6 +28,7 @@ import org.apache.polaris.tools.sync.polaris.catalog.PolarisCatalog; import org.apache.polaris.tools.sync.polaris.service.IcebergCatalogService; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -160,4 +161,9 @@ public void dropTableWithoutPurge(TableIdentifier tableIdentifier) { this.catalog.dropTable(tableIdentifier, false /* purge */); } + @Override + public void close() throws IOException { + this.catalog.close(); + } + } diff --git a/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/CLIUtil.java b/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/CLIUtil.java new file mode 100644 index 0000000..1232fe6 --- /dev/null +++ b/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/CLIUtil.java @@ -0,0 +1,26 @@ +package org.apache.polaris.tools.sync.polaris; + +/** + * CLI specific utilities and constants. + */ +public class CLIUtil { + + public static final String API_SERVICE_PROPERTIES_DESCRIPTION = + "\nProperties:" + + "\n\t- base-url: the base url of the Polaris instance (eg. http://localhost:8181)" + + "\n\t- token: the bearer token to authenticate against the Polaris instance with." + + "\n\t- oauth2-server-uri: the uri of the OAuth2 server to authenticate to. (eg. http://localhost:8181/api/catalog/v1/oauth/tokens)" + + "\n\t- credential: the client credentials to use to authenticate against the Polaris instance (eg. :client_secret>)" + + "\n\t- scope: the scope to authenticate with for the service_admin (eg. PRINCIPAL_ROLE:ALL)"; + + public static final String OMNIPOTENT_PRINCIPAL_PROPERTIES_DESCRIPTION = + "\nOmnipotent Principal Properties:" + + "\n\t- omnipotent-principal-name: the name of the omnipotent principal created using create-omnipotent-principal on the source Polaris" + + "\n\t- omnipotent-principal-client-id: the client id of the omnipotent principal created using create-omnipotent-principal on the source Polaris" + + "\n\t- omnipotent-principal-client-secret: the client secret of the omnipotent principal created using create-omnipotent-principal on the source Polaris" + + "\n\t- omnipotent-principal-oauth2-server-uri: (default: /v1/oauth/tokens endpoint for provided Polaris base-url) " + + "the OAuth2 server to use to authenticate the omnipotent-principal for Iceberg catalog access"; + + private CLIUtil() {} + +} diff --git a/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/CreateOmnipotentPrincipalCommand.java b/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/CreateOmnipotentPrincipalCommand.java index df65f81..39394c7 100644 --- a/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/CreateOmnipotentPrincipalCommand.java +++ b/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/CreateOmnipotentPrincipalCommand.java @@ -56,15 +56,7 @@ public class CreateOmnipotentPrincipalCommand implements Callable { @CommandLine.Option( names = {"--polaris-api-connection-properties"}, required = true, - description = "The connection properties to connect to the Polaris API." + - "\nProperties:" + - "\n\t- base-url: the base url of the Polaris instance (eg. http://localhost:8181)" + - "\n\t- bearer-token: the bearer token to authenticate against the Polaris instance with. Must " + - "be provided if any of oauth2-server-uri, client-id, client-secret, or scope are not provided." + - "\n\t- oauth2-server-uri: the uri of the OAuth2 server to authenticate to. (eg. http://localhost:8181/api/catalog/v1/oauth/tokens)" + - "\n\t- client-id: the client id belonging to a service admin to authenticate with" + - "\n\t- client-secret: the client secret belong to a service admin to authenticate with" + - "\n\t- scope: the scope to authenticate with for the service_admin (eg. PRINCIPAL_ROLE:ALL)" + description = CLIUtil.API_SERVICE_PROPERTIES_DESCRIPTION ) private Map polarisApiConnectionProperties; @@ -96,100 +88,102 @@ public Integer call() throws Exception { polarisApiConnectionProperties.putIfAbsent(PolarisApiService.ICEBERG_WRITE_ACCESS_PROPERTY, String.valueOf(withWriteAccess)); - PolarisService polaris = PolarisServiceFactory.createPolarisService( - PolarisServiceFactory.ServiceType.API, polarisApiConnectionProperties); + try (PolarisService polaris = PolarisServiceFactory.createPolarisService( + PolarisServiceFactory.ServiceType.API, polarisApiConnectionProperties)) { + + AccessControlService accessControlService = new AccessControlService((PolarisApiService) polaris); + + PrincipalWithCredentials principalWithCredentials; + + try { + principalWithCredentials = accessControlService.createOmnipotentPrincipal(replace); + } catch (Exception e) { + consoleLog.error("Failed to create omnipotent principal.", e); + return 1; + } + + consoleLog.info( + "Created omnipotent principal {}.", principalWithCredentials.getPrincipal().getName()); + + PrincipalRole principalRole; + + try { + principalRole = + accessControlService.createAndAssignPrincipalRole(principalWithCredentials, replace); + } catch (Exception e) { + consoleLog.error("Failed to create omnipotent principal role and assign it to principal.", e); + return 1; + } + + consoleLog.info( + "Created omnipotent principal role {} and assigned it to omnipotent principal {}.", + principalWithCredentials.getPrincipal().getName(), + principalRole.getName()); + + List catalogs = polaris.listCatalogs(); + + consoleLog.info("Identified {} catalogs to create catalog roles for.", catalogs.size()); + + final String permissionLevel = withWriteAccess ? "write" : "readonly"; + + AtomicInteger completedCatalogSetups = new AtomicInteger(0); + + Queue failedCatalogs = new ConcurrentLinkedQueue<>(); + + ExecutorService executor = Executors.newFixedThreadPool(concurrency); + + List> futures = new ArrayList<>(); + + for (Catalog catalog : catalogs) { + CompletableFuture future = + CompletableFuture.runAsync( + () -> { + try { + accessControlService.setupOmnipotentRoleForCatalog( + catalog.getName(), principalRole, replace, withWriteAccess); + } catch (Exception e) { + failedCatalogs.add(catalog); + consoleLog.error( + "Failed to setup omnipotent catalog role for catalog {} with {} access. - {}/{}", + catalog.getName(), + permissionLevel, + completedCatalogSetups.incrementAndGet(), + catalogs.size(), + e); + return; + } + + consoleLog.info( + "Finished omnipotent principal setup for catalog {} with {} access. - {}/{}", + catalog.getName(), + permissionLevel, + completedCatalogSetups.incrementAndGet(), + catalogs.size()); + }, + executor); + + futures.add(future); + } + + futures.forEach(CompletableFuture::join); + + consoleLog.info( + "Encountered issues creating catalog roles for the following catalogs: {}", + failedCatalogs.stream().map(Catalog::getName).toList()); + + consoleLog.info( + "\n======================================================\n" + + "Omnipotent Principal Credentials:\n" + + "\tname = {}\n" + + "\tclientId = {}\n" + + "\tclientSecret = {}\n" + + "======================================================", + principalWithCredentials.getPrincipal().getName(), + principalWithCredentials.getCredentials().getClientId(), + principalWithCredentials.getCredentials().getClientSecret()); - AccessControlService accessControlService = new AccessControlService((PolarisApiService) polaris); - - PrincipalWithCredentials principalWithCredentials; - - try { - principalWithCredentials = accessControlService.createOmnipotentPrincipal(replace); - } catch (Exception e) { - consoleLog.error("Failed to create omnipotent principal.", e); - return 1; } - consoleLog.info( - "Created omnipotent principal {}.", principalWithCredentials.getPrincipal().getName()); - - PrincipalRole principalRole; - - try { - principalRole = - accessControlService.createAndAssignPrincipalRole(principalWithCredentials, replace); - } catch (Exception e) { - consoleLog.error("Failed to create omnipotent principal role and assign it to principal.", e); - return 1; - } - - consoleLog.info( - "Created omnipotent principal role {} and assigned it to omnipotent principal {}.", - principalWithCredentials.getPrincipal().getName(), - principalRole.getName()); - - List catalogs = polaris.listCatalogs(); - - consoleLog.info("Identified {} catalogs to create catalog roles for.", catalogs.size()); - - final String permissionLevel = withWriteAccess ? "write" : "readonly"; - - AtomicInteger completedCatalogSetups = new AtomicInteger(0); - - Queue failedCatalogs = new ConcurrentLinkedQueue<>(); - - ExecutorService executor = Executors.newFixedThreadPool(concurrency); - - List> futures = new ArrayList<>(); - - for (Catalog catalog : catalogs) { - CompletableFuture future = - CompletableFuture.runAsync( - () -> { - try { - accessControlService.setupOmnipotentRoleForCatalog( - catalog.getName(), principalRole, replace, withWriteAccess); - } catch (Exception e) { - failedCatalogs.add(catalog); - consoleLog.error( - "Failed to setup omnipotent catalog role for catalog {} with {} access. - {}/{}", - catalog.getName(), - permissionLevel, - completedCatalogSetups.incrementAndGet(), - catalogs.size(), - e); - return; - } - - consoleLog.info( - "Finished omnipotent principal setup for catalog {} with {} access. - {}/{}", - catalog.getName(), - permissionLevel, - completedCatalogSetups.incrementAndGet(), - catalogs.size()); - }, - executor); - - futures.add(future); - } - - futures.forEach(CompletableFuture::join); - - consoleLog.info( - "Encountered issues creating catalog roles for the following catalogs: {}", - failedCatalogs.stream().map(Catalog::getName).toList()); - - consoleLog.info( - "\n======================================================\n" - + "Omnipotent Principal Credentials:\n" - + "\tname = {}\n" - + "\tclientId = {}\n" - + "\tclientSecret = {}\n" - + "======================================================", - principalWithCredentials.getPrincipal().getName(), - principalWithCredentials.getCredentials().getClientId(), - principalWithCredentials.getCredentials().getClientSecret()); - return 0; } } diff --git a/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/SyncPolarisCommand.java b/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/SyncPolarisCommand.java index bfd64e2..b3f4cc9 100644 --- a/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/SyncPolarisCommand.java +++ b/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/SyncPolarisCommand.java @@ -18,8 +18,6 @@ */ package org.apache.polaris.tools.sync.polaris; -import java.io.Closeable; -import java.io.IOException; import java.util.Map; import java.util.concurrent.Callable; import org.apache.polaris.tools.sync.polaris.catalog.ETagManager; @@ -49,42 +47,14 @@ public class SyncPolarisCommand implements Callable { @CommandLine.Option( names = {"--source-properties"}, required = true, - description = "Properties to initialize Polaris entity source." + - "\nProperties:" + - "\n\t- base-url: the base url of the Polaris instance (eg. http://localhost:8181)" + - "\n\t- bearer-token: the bearer token to authenticate against the Polaris instance with. Must " + - "be provided if any of oauth2-server-uri, client-id, client-secret, or scope are not provided." + - "\n\t- oauth2-server-uri: the uri of the OAuth2 server to authenticate to. (eg. http://localhost:8181/api/catalog/v1/oauth/tokens)" + - "\n\t- client-id: the client id belonging to a service admin to authenticate with" + - "\n\t- client-secret: the client secret belong to a service admin to authenticate with" + - "\n\t- scope: the scope to authenticate with for the service_admin (eg. PRINCIPAL_ROLE:ALL)" + - "\nOmnipotent Principal Properties:" + - "\n\t- omnipotent-principal-name: the name of the omnipotent principal created using create-omnipotent-principal on the source Polaris" + - "\n\t- omnipotent-principal-client-id: the client id of the omnipotent principal created using create-omnipotent-principal on the source Polaris" + - "\n\t- omnipotent-principal-client-secret: the client secret of the omnipotent principal created using create-omnipotent-principal on the source Polaris" + - "\n\t- omnipotent-principal-oauth2-server-uri: (default: /v1/oauth/tokens endpoint for provided Polaris base-url) " - + "the OAuth2 server to use to authenticate the omnipotent-principal for Iceberg catalog access" + description = CLIUtil.API_SERVICE_PROPERTIES_DESCRIPTION + CLIUtil.OMNIPOTENT_PRINCIPAL_PROPERTIES_DESCRIPTION ) private Map sourceProperties; @CommandLine.Option( names = {"--target-properties"}, required = true, - description = "Properties to initialize Polaris entity target." + - "\nProperties:" + - "\n\t- base-url: the base url of the Polaris instance (eg. http://localhost:8181)" + - "\n\t- bearer-token: the bearer token to authenticate against the Polaris instance with. Must " + - "be provided if any of oauth2-server-uri, client-id, client-secret, or scope are not provided." + - "\n\t- oauth2-server-uri: the uri of the OAuth2 server to authenticate to. (eg. http://localhost:8181/api/catalog/v1/oauth/tokens)" + - "\n\t- client-id: the client id belonging to a service admin to authenticate with" + - "\n\t- client-secret: the client secret belong to a service admin to authenticate with" + - "\n\t- scope: the scope to authenticate with for the service_admin (eg. PRINCIPAL_ROLE:ALL)" + - "\nOmnipotent Principal Properties:" + - "\n\t- omnipotent-principal-name: the name of the omnipotent principal created using create-omnipotent-principal on the target Polaris" + - "\n\t- omnipotent-principal-client-id: the client id of the omnipotent principal created using create-omnipotent-principal on the target Polaris" + - "\n\t- omnipotent-principal-client-secret: the client secret of the omnipotent principal created using create-omnipotent-principal on the target Polaris" + - "\n\t- omnipotent-principal-oauth2-server-uri: (default: /v1/oauth/tokens endpoint for provided Polaris base-url) " - + "the OAuth2 server to use to retrieve a bearer token for the omnipotent-principal" + description = CLIUtil.API_SERVICE_PROPERTIES_DESCRIPTION + CLIUtil.OMNIPOTENT_PRINCIPAL_PROPERTIES_DESCRIPTION ) private Map targetProperties; @@ -130,41 +100,29 @@ public Integer call() throws Exception { sourceProperties.put(PolarisApiService.ICEBERG_WRITE_ACCESS_PROPERTY, Boolean.toString(false)); targetProperties.put(PolarisApiService.ICEBERG_WRITE_ACCESS_PROPERTY, Boolean.toString(true)); - PolarisService source = - PolarisServiceFactory.createPolarisService(PolarisServiceFactory.ServiceType.API, sourceProperties); - PolarisService target = - PolarisServiceFactory.createPolarisService(PolarisServiceFactory.ServiceType.API, targetProperties); - - ETagManager etagService = ETagManagerFactory.createETagManager(etagManagerType, etagManagerProperties); - - Runtime.getRuntime() - .addShutdownHook( - new Thread( - () -> { - if (etagService instanceof Closeable closableETagService) { - try { - closableETagService.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - })); - - PolarisSynchronizer synchronizer = - new PolarisSynchronizer( - consoleLog, - haltOnFailure, - accessControlAwarePlanner, - source, - target, - etagService); - synchronizer.syncPrincipalRoles(); - if (shouldSyncPrincipals) { - consoleLog.warn("Principal migration will reset credentials on the target Polaris instance. " + - "Principal migration will log the new target Principal credentials to stdout."); - synchronizer.syncPrincipals(); + try ( + PolarisService source = PolarisServiceFactory.createPolarisService( + PolarisServiceFactory.ServiceType.API, sourceProperties); + PolarisService target = PolarisServiceFactory.createPolarisService( + PolarisServiceFactory.ServiceType.API, targetProperties); + ETagManager etagManager = ETagManagerFactory.createETagManager(etagManagerType, etagManagerProperties) + ) { + PolarisSynchronizer synchronizer = + new PolarisSynchronizer( + consoleLog, + haltOnFailure, + accessControlAwarePlanner, + source, + target, + etagManager); + synchronizer.syncPrincipalRoles(); + if (shouldSyncPrincipals) { + consoleLog.warn("Principal migration will reset credentials on the target Polaris instance. " + + "Principal migration will log the new target Principal credentials to stdout."); + synchronizer.syncPrincipals(); + } + synchronizer.syncCatalogs(); } - synchronizer.syncCatalogs(); return 0; }