1818 */
1919package org .apache .polaris .service .catalog .iceberg ;
2020
21+ import static org .apache .polaris .service .catalog .AccessDelegationMode .REMOTE_SIGNING ;
2122import static org .apache .polaris .service .catalog .AccessDelegationMode .VENDED_CREDENTIALS ;
2223import static org .apache .polaris .service .catalog .validation .IcebergPropertiesValidation .validateIcebergProperties ;
2324
3233import jakarta .ws .rs .core .HttpHeaders ;
3334import jakarta .ws .rs .core .Response ;
3435import jakarta .ws .rs .core .SecurityContext ;
36+ import jakarta .ws .rs .core .UriInfo ;
3537import java .util .EnumSet ;
3638import java .util .Map ;
3739import java .util .Optional ;
3840import java .util .Set ;
3941import java .util .function .Function ;
4042import org .apache .iceberg .MetadataUpdate ;
43+ import org .apache .iceberg .aws .s3 .signer .S3SignResponse ;
4144import org .apache .iceberg .catalog .Namespace ;
4245import org .apache .iceberg .catalog .TableIdentifier ;
4346import org .apache .iceberg .exceptions .BadRequestException ;
8184import org .apache .polaris .service .context .catalog .CallContextCatalogFactory ;
8285import org .apache .polaris .service .http .IcebergHttpUtil ;
8386import org .apache .polaris .service .http .IfNoneMatch ;
87+ import org .apache .polaris .service .storage .aws .signer .S3RequestSignerFactory ;
8488import org .apache .polaris .service .types .CommitTableRequest ;
8589import org .apache .polaris .service .types .CommitViewRequest ;
8690import org .apache .polaris .service .types .NotificationRequest ;
91+ import org .apache .polaris .service .types .S3SignRequest ;
8792import org .slf4j .Logger ;
8893import org .slf4j .LoggerFactory ;
8994
@@ -144,6 +149,8 @@ public class IcebergCatalogAdapter
144149 private final CatalogPrefixParser prefixParser ;
145150 private final ReservedProperties reservedProperties ;
146151 private final CatalogHandlerUtils catalogHandlerUtils ;
152+ private final S3RequestSignerFactory s3RequestSignerFactory ;
153+ private final UriInfo uriInfo ;
147154
148155 @ Inject
149156 public IcebergCatalogAdapter (
@@ -157,7 +164,9 @@ public IcebergCatalogAdapter(
157164 PolarisAuthorizer polarisAuthorizer ,
158165 CatalogPrefixParser prefixParser ,
159166 ReservedProperties reservedProperties ,
160- CatalogHandlerUtils catalogHandlerUtils ) {
167+ CatalogHandlerUtils catalogHandlerUtils ,
168+ S3RequestSignerFactory s3RequestSignerFactory ,
169+ UriInfo uriInfo ) {
161170 this .realmContext = realmContext ;
162171 this .callContext = callContext ;
163172 this .catalogFactory = catalogFactory ;
@@ -169,6 +178,8 @@ public IcebergCatalogAdapter(
169178 this .prefixParser = prefixParser ;
170179 this .reservedProperties = reservedProperties ;
171180 this .catalogHandlerUtils = catalogHandlerUtils ;
181+ this .s3RequestSignerFactory = s3RequestSignerFactory ;
182+ this .uriInfo = uriInfo ;
172183 }
173184
174185 /**
@@ -205,7 +216,9 @@ IcebergCatalogHandler newHandlerWrapper(SecurityContext securityContext, String
205216 catalogName ,
206217 polarisAuthorizer ,
207218 reservedProperties ,
208- catalogHandlerUtils );
219+ catalogHandlerUtils ,
220+ s3RequestSignerFactory ,
221+ uriInfo );
209222 }
210223
211224 @ Override
@@ -323,11 +336,13 @@ public Response updateProperties(
323336 catalog -> Response .ok (catalog .updateNamespaceProperties (ns , revisedRequest )).build ());
324337 }
325338
326- private EnumSet <AccessDelegationMode > parseAccessDelegationModes (String accessDelegationMode ) {
339+ private static Set <AccessDelegationMode > parseAccessDelegationModes (String accessDelegationMode ) {
327340 EnumSet <AccessDelegationMode > delegationModes =
328341 AccessDelegationMode .fromProtocolValuesList (accessDelegationMode );
329342 Preconditions .checkArgument (
330- delegationModes .isEmpty () || delegationModes .contains (VENDED_CREDENTIALS ),
343+ delegationModes .isEmpty ()
344+ || delegationModes .contains (VENDED_CREDENTIALS )
345+ || delegationModes .contains (REMOTE_SIGNING ),
331346 "Unsupported access delegation mode: %s" ,
332347 accessDelegationMode );
333348 return delegationModes ;
@@ -342,8 +357,7 @@ public Response createTable(
342357 RealmContext realmContext ,
343358 SecurityContext securityContext ) {
344359 validateIcebergProperties (callContext , createTableRequest .properties ());
345- EnumSet <AccessDelegationMode > delegationModes =
346- parseAccessDelegationModes (accessDelegationMode );
360+ Set <AccessDelegationMode > delegationModes = parseAccessDelegationModes (accessDelegationMode );
347361 Namespace ns = decodeNamespace (namespace );
348362 return withCatalog (
349363 securityContext ,
@@ -354,7 +368,8 @@ public Response createTable(
354368 return Response .ok (catalog .createTableStaged (ns , createTableRequest )).build ();
355369 } else {
356370 return Response .ok (
357- catalog .createTableStagedWithWriteDelegation (ns , createTableRequest ))
371+ catalog .createTableStagedWithWriteDelegation (
372+ prefix , ns , createTableRequest , delegationModes ))
358373 .build ();
359374 }
360375 } else if (delegationModes .isEmpty ()) {
@@ -364,7 +379,8 @@ public Response createTable(
364379 .build ();
365380 } else {
366381 LoadTableResponse response =
367- catalog .createTableDirectWithWriteDelegation (ns , createTableRequest );
382+ catalog .createTableDirectWithWriteDelegation (
383+ prefix , ns , createTableRequest , delegationModes );
368384 return tryInsertETagHeader (
369385 Response .ok (response ), response , namespace , createTableRequest .name ())
370386 .build ();
@@ -397,8 +413,7 @@ public Response loadTable(
397413 String snapshots ,
398414 RealmContext realmContext ,
399415 SecurityContext securityContext ) {
400- EnumSet <AccessDelegationMode > delegationModes =
401- parseAccessDelegationModes (accessDelegationMode );
416+ Set <AccessDelegationMode > delegationModes = parseAccessDelegationModes (accessDelegationMode );
402417 Namespace ns = decodeNamespace (namespace );
403418 TableIdentifier tableIdentifier = TableIdentifier .of (ns , RESTUtil .decodeString (table ));
404419
@@ -422,7 +437,8 @@ public Response loadTable(
422437 } else {
423438 response =
424439 catalog
425- .loadTableWithAccessDelegationIfStale (tableIdentifier , ifNoneMatch , snapshots )
440+ .loadTableWithAccessDelegationIfStale (
441+ prefix , tableIdentifier , ifNoneMatch , delegationModes )
426442 .orElseThrow (() -> new WebApplicationException (Response .Status .NOT_MODIFIED ));
427443 }
428444
@@ -588,8 +604,7 @@ public Response loadCredentials(
588604 securityContext ,
589605 prefix ,
590606 catalog -> {
591- LoadTableResponse loadTableResponse =
592- catalog .loadTableWithAccessDelegation (tableIdentifier , "all" );
607+ LoadTableResponse loadTableResponse = catalog .loadTable (tableIdentifier , "all" );
593608 return Response .ok (
594609 ImmutableLoadCredentialsResponse .builder ()
595610 .credentials (loadTableResponse .credentials ())
@@ -797,4 +812,24 @@ public Response getConfig(
797812 .build ())
798813 .build ();
799814 }
815+
816+ @ Override
817+ public Response signS3Request (
818+ String prefix ,
819+ String namespace ,
820+ String table ,
821+ S3SignRequest s3SignRequest ,
822+ RealmContext realmContext ,
823+ SecurityContext securityContext ) {
824+ Namespace ns = decodeNamespace (namespace );
825+ TableIdentifier tableIdentifier = TableIdentifier .of (ns , RESTUtil .decodeString (table ));
826+ return withCatalog (
827+ securityContext ,
828+ prefix ,
829+ catalog -> {
830+ // TODO Cache-Control header
831+ S3SignResponse response = catalog .signS3Request (s3SignRequest , tableIdentifier );
832+ return Response .status (Response .Status .OK ).entity (response ).build ();
833+ });
834+ }
800835}
0 commit comments