Skip to content
Merged
16 changes: 5 additions & 11 deletions polaris-synchronizer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ with 10 concurrent catalog setup threads:
java -jar cli/build/libs/polaris-synchronizer-cli.jar create-omnipotent-principal \
--polaris-api-connection-properties base-url=http://localhost:8181 \
--polaris-api-connection-properties oauth2-server-uri=http://localhost:8181/api/catalog/v1/oauth/tokens \
--polaris-api-connection-properties client-id=root \
--polaris-api-connection-properties client-secret=<client_secret> \
--polaris-api-connection-properties credential=<client_id>:<client_secret> \
--polaris-api-connection-properties scope=PRINCIPAL_ROLE:ALL \
--replace \ # replace it if it already exists
--concurrency 10 # 10 concurrent catalog setup threads
Expand Down Expand Up @@ -105,8 +104,7 @@ java -jar cli/build/libs/polaris-synchronizer-cli.jar \
create-omnipotent-principal \
--polaris-api-connection-properties base-url=http://localhost:8181 \
--polaris-api-connection-properties oauth2-server-uri=http://localhost:8181/api/catalog/v1/oauth/tokens \
--polaris-api-connection-properties client-id=root \
--polaris-api-connection-properties client-secret=<client_secret> \
--polaris-api-connection-properties credential=<client_id>:<client_secret> \
--polaris-api-connection-properties scope=PRINCIPAL_ROLE:ALL \
--replace \ # replace if it already exists
--concurrency 10 \ # 10 concurrent catalog setup threads
Expand Down Expand Up @@ -143,22 +141,18 @@ diff between the source and target Polaris instances. This can be achieved using
> Polaris instance. The new credentials will be logged to stdout, ONLY for each newly created or overwritten principal.
> Please note that this output should be securely managed, client credentials should only ever be stored in a secure vault.

**Example** Running the synchronization between source Polaris instance using an access token, and a target Polaris instance
**Example** Running the synchronization between source Polaris instance using a bearer token, and a target Polaris instance
using client credentials.
```
java -jar cli/build/libs/polaris-synchronizer-cli.jar sync-polaris \
--source-properties base-url=http://localhost:8181 \
--source-properties client-id=root \
--source-properties client-secret=<client_secret> \
--source-properties oauth2-server-uri=http://localhost:8181/api/catalog/v1/oauth/tokens \
--source-properties scope=PRINCIPAL_ROLE:ALL \
--source-properties token=<bearer_token> \
--source-properties omnipotent-principal-name=omnipotent-principal-XXXXX \
--source-properties omnipotent-principal-client-id=589550e8b23d271e \
--source-properties omnipotent-principal-client-secret=<omni_client_secret> \
--source-properties omnipotent-principal-oauth2-server-uri=http://localhost:8181/api/catalog/v1/oauth/tokens \
--target-properties base-url=http://localhost:5858 \
--target-properties client-id=root \
--target-properties client-secret=<client_secret> \
--target-properties credential=<client_id>:<client_secret> \
--target-properties oauth2-server-uri=http://localhost:5858/api/catalog/v1/oauth/tokens \
--target-properties scope=PRINCIPAL_ROLE:ALL \
--target-properties omnipotent-principal-name=omnipotent-principal-YYYYY \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,41 +630,30 @@ public void syncCatalogs() {
}

for (Catalog catalog : catalogSyncPlan.entitiesToSyncChildren()) {
IcebergCatalogService sourceIcebergCatalogService;

try {
sourceIcebergCatalogService = source.initializeIcebergCatalogService(catalog.getName());
try (IcebergCatalogService sourceIcebergCatalogService = source.initializeIcebergCatalogService(catalog.getName())) {
clientLogger.info(
"Initialized Iceberg REST catalog for Polaris catalog {} on source.",
catalog.getName());
} catch (Exception e) {
if (haltOnFailure) throw e;
clientLogger.error(
"Failed to initialize Iceberg REST catalog for Polaris catalog {} on source.",
catalog.getName(),
e);
continue;
}
"Initialized Iceberg REST catalog for Polaris catalog {} on source.",
catalog.getName());

IcebergCatalogService targetIcebergCatalogService;
try (IcebergCatalogService targetIcebergCatalogService = target.initializeIcebergCatalogService(catalog.getName())) {
clientLogger.info(
"Initialized Iceberg REST catalog for Polaris catalog {} on target.",
catalog.getName());

syncNamespaces(
catalog.getName(), Namespace.empty(), sourceIcebergCatalogService, targetIcebergCatalogService);
}

try {
targetIcebergCatalogService = target.initializeIcebergCatalogService(catalog.getName());
clientLogger.info(
"Initialized Iceberg REST catalog for Polaris catalog {} on target.",
catalog.getName());
} catch (Exception e) {
if (haltOnFailure) throw e;
clientLogger.error(
"Failed to initialize Iceberg REST catalog for Polaris catalog {} on target.",
catalog.getName(),
e);
"Failed to synchronize Iceberg REST catalog for Polaris catalog {}.",
catalog.getName(),
e);
if (haltOnFailure) throw new RuntimeException(e);
continue;
}

syncNamespaces(
catalog.getName(), Namespace.empty(), sourceIcebergCatalogService, targetIcebergCatalogService);

// NOTE: Grants are synced on a per catalog role basis, so we need to ensure that catalog roles
// are only synced AFTER Iceberg catalog entities, because they may depend on the Iceberg catalog
// entities already existing
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package org.apache.polaris.tools.sync.polaris.auth;

import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTClient;
import org.apache.iceberg.rest.auth.AuthConfig;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.util.ThreadPools;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;

/**
* Wraps {@link OAuth2Util.AuthSession} to provide supported authentication flows.
*/
public class AuthenticationSessionWrapper implements Closeable {

private final RESTClient restClient;

private final OAuth2Util.AuthSession authSession;

private final ScheduledExecutorService executor;

public AuthenticationSessionWrapper(Map<String, String> properties) {
this.restClient = HTTPClient.builder(Map.of())
.uri(properties.get(OAuth2Properties.OAUTH2_SERVER_URI))
.build();
this.authSession = this.newAuthSession(this.restClient, properties);
this.executor = ThreadPools.newScheduledPool(UUID.randomUUID() + "-token-refresh", 1);
}

/**
* Initializes a new authentication session. Supports client_credentials and bearer token flow.
* @param properties properties to initialize the session with
* @return an authentication session, with token refresh if applicable
*/
private OAuth2Util.AuthSession newAuthSession(RESTClient restClient, Map<String, String> properties) {
OAuth2Util.AuthSession parent = new OAuth2Util.AuthSession(
Map.of(),
AuthConfig.builder()
.scope(properties.get(OAuth2Properties.SCOPE))
.oauth2ServerUri(properties.get(OAuth2Properties.OAUTH2_SERVER_URI))
.optionalOAuthParams(OAuth2Util.buildOptionalParam(properties))
.build()
);

// This is for client_credentials flow
if (properties.containsKey(OAuth2Properties.CREDENTIAL)) {
return OAuth2Util.AuthSession.fromCredential(
restClient,
// threads created here will be daemon threads, so termination of main program
// will terminate the token refresh thread automatically
this.executor,
properties.get(OAuth2Properties.CREDENTIAL),
parent
);
}

// This is for regular bearer token flow
if (properties.containsKey(OAuth2Properties.TOKEN)) {
return OAuth2Util.AuthSession.fromAccessToken(
restClient,
// threads created here will be daemon threads, so termination of main program
// will terminate the token refresh thread automatically
this.executor,
properties.get(OAuth2Properties.TOKEN),
null, /* defaultExpiresAtMillis */
parent
);
}

throw new IllegalArgumentException("Unable to construct authenticated session with the provided properties.");
}

/**
* Get refreshed authentication headers for session.
*/
public Map<String, String> getSessionHeaders() {
return this.authSession.headers();
}

@Override
public void close() throws IOException {
try (restClient; executor) {}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* Generic interface to provide and store ETags for tables within catalogs. This allows the storage
* of the ETag to be completely independent from the tool.
*/
public interface ETagManager {
public interface ETagManager extends AutoCloseable {

/**
* Used to initialize the instance for use. Should be called prior to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ public String getETag(String catalogName, TableIdentifier tableIdentifier) {

@Override
public void storeETag(String catalogName, TableIdentifier tableIdentifier, String etag) {}

@Override
public void close() {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.iceberg.rest.ResourcePaths;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.LoadTableResponseParser;
import org.apache.polaris.tools.sync.polaris.http.OAuth2Util;
import org.apache.polaris.tools.sync.polaris.auth.AuthenticationSessionWrapper;

/**
* Overrides loadTable default implementation to issue a custom loadTable request to the Polaris
Expand All @@ -58,14 +58,14 @@ public class PolarisCatalog extends RESTCatalog

private Map<String, String> properties = null;

private String accessToken = null;

private HttpClient httpClient = null;

private ObjectMapper objectMapper = null;

private ResourcePaths resourcePaths = null;

private AuthenticationSessionWrapper authenticationSession = null;

public PolarisCatalog() {
super();
}
Expand All @@ -82,22 +82,8 @@ public void initialize(String name, Map<String, String> props) {

super.initialize(name, props);

if (accessToken == null || httpClient == null || this.objectMapper == null) {
String oauth2ServerUri = props.get("uri") + "/v1/oauth/tokens";
String credential = props.get("credential");

String clientId = credential.split(":")[0];
String clientSecret = credential.split(":")[1];

String scope = props.get("scope");

// TODO: Add token refresh
try {
this.accessToken = OAuth2Util.fetchToken(oauth2ServerUri, clientId, clientSecret, scope);
} catch (IOException e) {
throw new RuntimeException(e);
}

if (authenticationSession == null || httpClient == null || this.objectMapper == null) {
this.authenticationSession = new AuthenticationSessionWrapper(this.properties);
this.httpClient = HttpClient.newBuilder().build();
this.objectMapper = new ObjectMapper();
}
Expand Down Expand Up @@ -127,9 +113,10 @@ public Table loadTable(TableIdentifier ident, String etag) {
HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder()
.uri(URI.create(tablePath))
.header(HttpHeaders.AUTHORIZATION, "Bearer " + accessToken)
.GET();

this.authenticationSession.getSessionHeaders().forEach(requestBuilder::header);

// specify last known etag in if-none-match header
if (etag != null) {
requestBuilder.header(HttpHeaders.IF_NONE_MATCH, etag);
Expand Down Expand Up @@ -171,4 +158,21 @@ public Table loadTable(TableIdentifier ident, String etag) {

return new BaseTable(ops, CatalogUtil.fullTableName(catalogName, ident));
}

@Override
public void close() throws IOException {
final AuthenticationSessionWrapper session = this.authenticationSession;
final HttpClient httpClient = this.httpClient;

try (session; httpClient) {
super.close();
} finally {
this.authenticationSession = null;
this.httpClient = null;
this.objectMapper = null;
this.resourcePaths = null;
}

super.close();
}
}

This file was deleted.

Loading