Add token refresh and token exchange OAuth2 flows to polaris-synchronizer tool. #8
| OAuth2Properties.ACCESS_TOKEN_TYPE, | ||
| OAuth2Properties.JWT_TOKEN_TYPE, | ||
| OAuth2Properties.SAML2_TOKEN_TYPE, | ||
| OAuth2Properties.SAML1_TOKEN_TYPE); |
This line is not working as intended. You need to use a LinkedHashSet if you want a Set that preserves insertion order. In this case, a regular List would also work just fine.
Here is an example that demonstrates that Set.of(...) does not preserve insertion order.
jshell> Set<Integer> s = Set.of(1, 5, 2, 4, 3)
s ==> [5, 4, 3, 2, 1]
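For comparison, here is a minimal sketch of the two order-preserving alternatives mentioned above. The class name and the placeholder strings are purely illustrative; they stand in for the OAuth2Properties token-type constants and are not taken from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class InsertionOrderDemo {
    // Hypothetical placeholder values standing in for the token-type constants.
    static final List<String> AS_LIST = List.of("access", "jwt", "saml2", "saml1");

    // A LinkedHashSet iterates in insertion order, which Set.of(...) does not guarantee.
    static Set<String> asLinkedSet() {
        return new LinkedHashSet<>(AS_LIST);
    }

    public static void main(String[] args) {
        System.out.println(AS_LIST);
        System.out.println(new ArrayList<>(asLinkedSet()));
    }
}
```

A List is enough when the elements are already distinct; a LinkedHashSet additionally drops duplicates while keeping the first-seen order.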
Changed it to a List! Good catch
| if (properties.containsKey(OAuth2Properties.CREDENTIAL)) { | ||
| return OAuth2Util.AuthSession.fromCredential( | ||
| restClient, | ||
| ThreadPools.newScheduledPool(UUID.randomUUID() + "-token-refresh", 1), |
nit: It could be useful to add a comment here (and below) for code maintainability. It looks like the thread pool will never be shut down, and therefore that the application can never terminate. But the Iceberg documentation states that threads created by ThreadPools.newScheduledPool(...) will be daemon threads.
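To illustrate why daemon threads do not block termination, here is a small sketch using only java.util.concurrent. It does not use Iceberg's ThreadPools; the factory method and pool name below are assumptions made for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class DaemonPoolDemo {
    // Hypothetical helper: a single-threaded scheduled pool whose worker is a daemon thread.
    static ScheduledExecutorService newDaemonScheduledPool(String name) {
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true); // daemon threads do not keep the JVM alive
            return thread;
        };
        return Executors.newScheduledThreadPool(1, factory);
    }

    // Runs one task on the pool and reports whether the worker thread is a daemon.
    static boolean workerIsDaemon() {
        ScheduledExecutorService pool = newDaemonScheduledPool("token-refresh-demo");
        Callable<Boolean> check = () -> Thread.currentThread().isDaemon();
        try {
            return pool.schedule(check, 10, TimeUnit.MILLISECONDS).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow(); // still shut down explicitly when the owner is closed
        }
    }

    public static void main(String[] args) {
        System.out.println("worker is daemon: " + workerIsDaemon());
    }
}
```

Even with daemon threads, shutting the pool down explicitly keeps the lifecycle obvious to future maintainers.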
I've added a comment clarifying this.
|
Hi @mansehajsingh I have a few questions about the features being introduced here:
In Polaris, the token exchange grant type is meant primarily for token refreshes. What is the use case here? Where is the subject token expected to come from?
What is the use case for the actor token, and where is it supposed to come from? Asking because the actor token is not honored by OSS Polaris; only vendor-specific products make use of actor tokens, e.g. Tabular. Which makes me realize that we might be "sneaking in" some vendor-specific features here. I don't mind doing so, but I think they should be more clearly flagged as Snowflake-specific features.
That's interesting, thanks for the link 👍 I didn't know that Open Catalog already had support for external authentication, since OSS Polaris doesn't – but hopefully not for a long time: apache/polaris#1397. We should probably make sure that whichever support is added here for external auth also works with OSS Polaris with an external IDP like Keycloak or Auth0. |
|
Hi @adutra!
To be honest, writing this part of the implementation I was trying to keep a level of parity with the way authentication was implemented in
You're right- I had mislabelled this, I've updated this in the PR description and the comments. I had been confused looking at the implementation of Iceberg's
Agreed. Are we okay to merge this PR once it has gone through review, and open an issue to make sure that this external OAuth support is compatible once external OAuth is finalized in Polaris? |
|
cc: @collado-mike |
I cannot see a use case where token exchange is going to be necessary in the context of a catalog synchronization. I would suggest to refrain from including support for that initially, and wait until someone actually comes up with a valid use case.
You are not the only one 😄 Here is some context: The only situation where a token exchange happens in Iceberg and it's not a token refresh scenario, is when the server "vends" an OAuth token to the client as part of a In that scenario, immediately after the vended token is received, a token exchange happens and the vended token becomes the subject token and the client's current OAuth2 token becomes the actor token. I do not know of any catalog server that uses this feature. Polaris OSS and Nessie do not support it. I suspect Tabular was making use of it. And I would argue that vending OAuth tokens is not a good practice anyways. All of this to say: I would suggest again to hold off implementing support for this in the context of this catalog synchronization tool.
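For readers unfamiliar with the flow described above, here is a rough sketch of the form parameters such a token exchange request would carry. The parameter names and the grant-type URN come from RFC 8693; the class, method names, and token values are hypothetical, and this is not how Iceberg's OAuth2Util builds the request.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class TokenExchangeFormDemo {
    static final String GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange";
    static final String ACCESS_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token";

    // Builds RFC 8693 form parameters; the actor token is optional and may be null.
    static Map<String, String> formParams(String subjectToken, String actorToken) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("grant_type", GRANT_TYPE);
        params.put("subject_token", subjectToken);
        params.put("subject_token_type", ACCESS_TOKEN_TYPE);
        if (actorToken != null) {
            params.put("actor_token", actorToken);
            params.put("actor_token_type", ACCESS_TOKEN_TYPE);
        }
        return params;
    }

    // Encodes the parameters as an application/x-www-form-urlencoded body.
    static String encode(Map<String, String> params) {
        return params.entrySet().stream()
            .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8)
                + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
            .collect(Collectors.joining("&"));
    }

    public static void main(String[] args) {
        // Vended token as subject, the client's current token as actor (placeholder values).
        System.out.println(encode(formParams("vended-token", "current-token")));
    }
}
```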
Yes, that's fine 👍 |
|
@adutra I've gone ahead and removed token exchange flow- thanks for the context! |
| .credential(properties.get(OAuth2Properties.CREDENTIAL)) | ||
| .scope(properties.get(OAuth2Properties.SCOPE)) | ||
| .oauth2ServerUri(properties.get(OAuth2Properties.OAUTH2_SERVER_URI)) | ||
| .token(properties.get(OAuth2Properties.TOKEN)) | ||
| .tokenType(OAuth2Properties.ACCESS_TOKEN_TYPE) | ||
| .optionalOAuthParams(OAuth2Util.buildOptionalParam(properties)) | ||
| .build() |
I would suggest to create the parent session without token and credential since you are going to create a child session with these right after.
| .credential(properties.get(OAuth2Properties.CREDENTIAL)) | |
| .scope(properties.get(OAuth2Properties.SCOPE)) | |
| .oauth2ServerUri(properties.get(OAuth2Properties.OAUTH2_SERVER_URI)) | |
| .token(properties.get(OAuth2Properties.TOKEN)) | |
| .tokenType(OAuth2Properties.ACCESS_TOKEN_TYPE) | |
| .optionalOAuthParams(OAuth2Util.buildOptionalParam(properties)) | |
| .build() | |
| .scope(properties.get(OAuth2Properties.SCOPE)) | |
| .oauth2ServerUri(properties.get(OAuth2Properties.OAUTH2_SERVER_URI)) | |
| .optionalOAuthParams(OAuth2Util.buildOptionalParam(properties)) | |
| .build() |
I have taken them out 👍
| */ | ||
| private OAuth2Util.AuthSession newAuthSession(Map<String, String> properties) { | ||
|
|
||
| RESTClient restClient = HTTPClient.builder(Map.of()) |
This client holds resources and must be closed when the application closes. Since we already have a problem with the thread pools created below, as @pingtimeout pointed out, I think that it would be worth spending some time to make this class implement AutoCloseable, then properly implement the close() method and close all the resources.
But then you'd need to make sure that the AuthenticationSessionWrapper.close() method is called. I see that this class is used in two places: PolarisCatalog and PolarisApiService. For PolarisCatalog it's easy because it already has a close() method. However PolarisApiService doesn't, so you might need to investigate how to properly close its AuthenticationSessionWrapper.
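As a rough sketch of the shape this could take: a wrapper that owns two resources and releases both in close(). The Resource and SessionWrapper classes below are hypothetical stand-ins, not the PR's actual AuthenticationSessionWrapper.

```java
import java.util.ArrayList;
import java.util.List;

public class ClosingWrapperDemo {
    static final List<String> EVENTS = new ArrayList<>();

    // Hypothetical stand-in for a resource such as a REST client or an executor.
    static final class Resource implements AutoCloseable {
        private final String name;
        Resource(String name) { this.name = name; }
        @Override public void close() { EVENTS.add("closed " + name); }
    }

    // Hypothetical wrapper that owns two resources and releases both in close().
    static final class SessionWrapper implements AutoCloseable {
        private final Resource client = new Resource("client");
        private final Resource executor = new Resource("executor");

        @Override
        public void close() {
            // Resources are closed in reverse order of declaration, and `client`
            // is still closed if closing `executor` throws.
            try (client; executor) {
                EVENTS.add("closing wrapper");
            }
        }
    }

    // Uses the wrapper in try-with-resources and returns the recorded events.
    static List<String> run() {
        EVENTS.clear();
        try (SessionWrapper wrapper = new SessionWrapper()) {
            EVENTS.add("using wrapper");
        }
        return List.copyOf(EVENTS);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Whoever creates the wrapper is then responsible for closing it, which is why the owner also needs a close() path.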
I've gone ahead and made some of the core classes implement Closeable. I've tried to add explicit and appropriate calls to close whenever possible, and made it so that the CLI also ensures that in the event of unexpected failure, there are runtime hooks to ensure the resources are closed appropriately. I've tried to scope the explicit closing to the sections that call the methods to create the closeable resources. Now, PolarisSynchronizer will close any IcebergCatalogService it creates, in which case PolarisIcebergCatalogService will close the underlying PolarisCatalog, which closes its own AuthenticationSessionWrapper. PolarisApiService is closed explicitly by the CLI on program termination as well, closing its AuthenticationSessionWrapper.
1b7b5bf to
a53e33b
Compare
a53e33b to
69a8b56
Compare
|
The changes here seem good to me though I'd defer to those who are already commenting and have greater context to the auth flows. Thanks for these changes as it helps this tool be more powerful! |
| private final OAuth2Util.AuthSession authSession; | ||
|
|
||
| public AuthenticationSessionWrapper(Map<String, String> properties) { | ||
| this.restClient = HTTPClient.builder(Map.of()) | ||
| .uri(properties.get(OAuth2Properties.OAUTH2_SERVER_URI)) | ||
| .build(); | ||
| this.authSession = this.newAuthSession(this.restClient, properties); | ||
| } |
While at it let's tackle the executor issue as well:
| private final OAuth2Util.AuthSession authSession; | |
| public AuthenticationSessionWrapper(Map<String, String> properties) { | |
| this.restClient = HTTPClient.builder(Map.of()) | |
| .uri(properties.get(OAuth2Properties.OAUTH2_SERVER_URI)) | |
| .build(); | |
| this.authSession = this.newAuthSession(this.restClient, properties); | |
| } | |
| private final OAuth2Util.AuthSession authSession; | |
| private final ScheduledExecutorService executor; | |
| public AuthenticationSessionWrapper(Map<String, String> properties) { | |
| this.restClient = HTTPClient.builder(Map.of()) | |
| .uri(properties.get(OAuth2Properties.OAUTH2_SERVER_URI)) | |
| .build(); | |
| this.authSession = this.newAuthSession(this.restClient, properties); | |
| executor = ThreadPools.newScheduledPool(UUID.randomUUID() + "-token-refresh", 1); | |
| } |
| if (this.restClient != null) { | ||
| this.restClient.close(); | ||
| } |
| if (this.restClient != null) { | |
| this.restClient.close(); | |
| } | |
| try (restClient; executor){} |
| if (httpClient != null) { | ||
| httpClient.close(); | ||
| } | ||
|
|
||
| if (this.authenticationSession != null) { | ||
| this.authenticationSession.close(); | ||
| } | ||
|
|
||
| super.close(); |
| if (httpClient != null) { | |
| httpClient.close(); | |
| } | |
| if (this.authenticationSession != null) { | |
| this.authenticationSession.close(); | |
| } | |
| super.close(); | |
| AuthenticationSessionWrapper session = authenticationSession; | |
| HttpClient httpClient = this.httpClient; | |
| try (session; httpClient) { | |
| super.close(); | |
| } finally { | |
| this.authenticationSession = null; | |
| this.httpClient = null; | |
| this.objectMapper = null; | |
| this.resourcePaths = null; | |
| } |
| if (this.catalog != null) { | ||
| this.catalog.close(); | ||
| } |
| if (this.catalog != null) { | |
| this.catalog.close(); | |
| } | |
| this.catalog.close(); |
| "\n\t- oauth2-server-uri: the uri of the OAuth2 server to authenticate to. (eg. http://localhost:8181/api/catalog/v1/oauth/tokens)" + | ||
| "\n\t- credential: the client credentials to use to authenticate against the Polaris instance (eg. <client_id>:client_secret>)" + | ||
| "\n\t- scope: the scope to authenticate with for the service_admin (eg. PRINCIPAL_ROLE:ALL)" + | ||
| "\n\t- <token_type>=<token>: for token exchange authentication, the token type (eg. urn:ietf:params:oauth:token-type:access_token) with a provided token"; |
Does this still hold true?
| * the program reaches the call to {@link Closeable#close()}. | ||
| * @param closeable the resource to close | ||
| */ | ||
| public static void closeResourceOnTermination(final Closeable closeable) { |
I don't think this is needed, it's possible to close all resources using try-with-resources blocks.
| polarisApiConnectionProperties.putIfAbsent(PolarisApiService.ICEBERG_WRITE_ACCESS_PROPERTY, | ||
| String.valueOf(withWriteAccess)); | ||
|
|
||
| PolarisService polaris = PolarisServiceFactory.createPolarisService( |
Use try-with-resources instead, it's much more reliable:
try (PolarisService polaris = PolarisServiceFactory.createPolarisService(
PolarisServiceFactory.ServiceType.API, polarisApiConnectionProperties)) {
...
}
| PolarisServiceFactory.createPolarisService(PolarisServiceFactory.ServiceType.API, sourceProperties); | ||
| PolarisService target = | ||
| PolarisServiceFactory.createPolarisService(PolarisServiceFactory.ServiceType.API, targetProperties); | ||
|
|
Use try-with-resources again:
try (
PolarisService source =
PolarisServiceFactory.createPolarisService(PolarisServiceFactory.ServiceType.API, sourceProperties);
PolarisService target = PolarisServiceFactory.createPolarisService(
PolarisServiceFactory.ServiceType.API, targetProperties)) {
| } | ||
| } | ||
| })); | ||
| if (etagService instanceof Closeable closeableETagService) { |
Suggestion: make ETagManager extend AutoCloseable:
public interface ETagManager extends AutoCloseable {
...
@Override
default void close() throws Exception {
}
}
Then here:
try (
PolarisService source =
PolarisServiceFactory.createPolarisService(PolarisServiceFactory.ServiceType.API, sourceProperties);
PolarisService target = PolarisServiceFactory.createPolarisService(
PolarisServiceFactory.ServiceType.API, targetProperties);
ETagManager etagService = ETagManagerFactory.createETagManager(etagManagerType,
etagManagerProperties)) {
PolarisSynchronizer synchronizer =
new PolarisSynchronizer(
consoleLog,
haltOnFailure,
accessControlAwarePlanner,
source,
target,
etagService);
synchronizer.syncPrincipalRoles();
if (shouldSyncPrincipals) {
consoleLog.warn(
"Principal migration will reset credentials on the target Polaris instance. " +
"Principal migration will log the new target Principal credentials to stdout.");
synchronizer.syncPrincipals();
}
synchronizer.syncCatalogs();
}
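A self-contained sketch of the same idea, assuming a hypothetical TagStore interface in place of ETagManager: the default no-op close() lets implementations without resources stay unchanged, while implementations that hold resources override it.

```java
import java.util.ArrayList;
import java.util.List;

public class DefaultCloseDemo {
    static final List<String> EVENTS = new ArrayList<>();

    // Hypothetical stand-in for ETagManager, with a no-op default close().
    interface TagStore extends AutoCloseable {
        String name();

        @Override
        default void close() throws Exception {
            // nothing to release by default
        }
    }

    // Holds no resources, so it inherits the no-op close().
    static final class InMemoryStore implements TagStore {
        @Override public String name() { return "in-memory"; }
    }

    // Pretends to hold a resource and overrides close() to release it.
    static final class FileStore implements TagStore {
        @Override public String name() { return "file"; }
        @Override public void close() { EVENTS.add("closed file"); }
    }

    // Both implementations work in the same try-with-resources statement.
    static List<String> run() {
        EVENTS.clear();
        try (TagStore memory = new InMemoryStore(); TagStore file = new FileStore()) {
            EVENTS.add("using " + memory.name() + " and " + file.name());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return List.copyOf(EVENTS);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

This removes the need for an instanceof Closeable check at the call site.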
adutra
left a comment
@mansehajsingh this is almost ready to go, please fix the issue with catalogs improperly closed in PolarisSynchronizer.
| syncNamespaces( | ||
| catalog.getName(), Namespace.empty(), sourceIcebergCatalogService, targetIcebergCatalogService); | ||
|
|
||
| try { |
This looks suspicious; the catalogs won't be properly closed if an error is thrown above.
Suggestion:
try(IcebergCatalogService sourceIcebergCatalogService = source.initializeIcebergCatalogService(catalog.getName())) {
clientLogger.info(
"Initialized Iceberg REST catalog for Polaris catalog {} on source.",
catalog.getName());
try(IcebergCatalogService targetIcebergCatalogService = target.initializeIcebergCatalogService(catalog.getName())) {
clientLogger.info(
"Initialized Iceberg REST catalog for Polaris catalog {} on target.",
catalog.getName());
syncNamespaces(
catalog.getName(), Namespace.empty(), sourceIcebergCatalogService, targetIcebergCatalogService);
}
} catch (Exception e) {
if (haltOnFailure) throw new RuntimeException(e);
clientLogger.error(
"Failed to synchronize Iceberg REST catalog for Polaris catalog {}.",
catalog.getName(),
e);
continue;
}
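To make the difference concrete, here is a small sketch showing that try-with-resources closes both catalogs before the catch block runs, even when the sync step throws. CatalogService and syncAll are hypothetical stand-ins for IcebergCatalogService and the loop in PolarisSynchronizer.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseOnFailureDemo {
    static final List<String> EVENTS = new ArrayList<>();

    // Hypothetical stand-in for IcebergCatalogService.
    static final class CatalogService implements AutoCloseable {
        private final String name;
        CatalogService(String name) {
            this.name = name;
            EVENTS.add("opened " + name);
        }
        @Override public void close() { EVENTS.add("closed " + name); }
    }

    // Syncs each catalog; names starting with "bad" simulate a failing sync.
    static List<String> syncAll(List<String> catalogs, boolean haltOnFailure) {
        EVENTS.clear();
        for (String catalog : catalogs) {
            try (CatalogService source = new CatalogService("source:" + catalog);
                 CatalogService target = new CatalogService("target:" + catalog)) {
                if (catalog.startsWith("bad")) {
                    throw new IllegalStateException("sync failed");
                }
                EVENTS.add("synced " + catalog);
            } catch (Exception e) {
                // Both services are already closed by the time we get here.
                if (haltOnFailure) throw new RuntimeException(e);
                EVENTS.add("failed " + catalog);
            }
        }
        return List.copyOf(EVENTS);
    }

    public static void main(String[] args) {
        syncAll(List.of("bad", "good"), false).forEach(System.out::println);
    }
}
```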
@adutra Thanks for taking a look! The catalogs should be closed properly now! |
adutra
left a comment
LGTM thanks @mansehajsingh !
eric-maynard
left a comment
LGTM, thanks for the review @pingtimeout & @dimas-b!
This PR uses the Apache Iceberg OAuth2 utilities to enable a wider array of authentication flows to the tool. Many of the options have been standardized to the same options that Iceberg OAuth2 properties use. Here are a few examples:
credential which is formatted <client_id>:<client_secret> as opposed to separate properties. These will now be refreshed periodically.
bearer-token the tool will now use token. This initializes a session that does not refresh:
3. Polaris supports exchanging an access token for another access token. For this flow, you can now provide a <subject_token_type>=<subject_token> property pair to use for token exchange. Natively within Polaris we only support urn:ietf:params:oauth:token-type:access_token as the subject token type, but all the token types are supported in this PR in case external OAuth is used. As well, in Polaris you need to send the token in the Authorization header as well to call the token exchange endpoint, so you need to specify the token property as well to provide a bearer token to the token exchange request. The bearer token type will default to token type urn:ietf:params:oauth:token-type:access_token.
client_secret field, like so (notice the empty client id):
oauth2-server-uri of the external oauth server, and we can specify the optional OAuth parameters like audience through the CLI as well: