Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public enum PolarisAuthorizableOperation {
UPDATE_NAMESPACE_PROPERTIES(NAMESPACE_WRITE_PROPERTIES),
LIST_TABLES(TABLE_LIST),
CREATE_TABLE_DIRECT(TABLE_CREATE),
CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION(EnumSet.of(TABLE_CREATE, TABLE_WRITE_DATA)),
CREATE_TABLE_STAGED(TABLE_CREATE),
CREATE_TABLE_STAGED_WITH_WRITE_DELEGATION(EnumSet.of(TABLE_CREATE, TABLE_WRITE_DATA)),
REGISTER_TABLE(TABLE_CREATE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,8 @@ public LoadTableResponse createTableDirect(Namespace namespace, CreateTableReque

public LoadTableResponse createTableDirectWithWriteDelegation(
Namespace namespace, CreateTableRequest request) {
PolarisAuthorizableOperation op = PolarisAuthorizableOperation.CREATE_TABLE_DIRECT;
PolarisAuthorizableOperation op =
PolarisAuthorizableOperation.CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION;
authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
op, TableIdentifier.of(namespace, request.name()));

Expand Down Expand Up @@ -591,20 +592,18 @@ public LoadTableResponse createTableDirectWithWriteDelegation(
LoadTableResponse.Builder responseBuilder =
LoadTableResponse.builder().withTableMetadata(tableMetadata);
if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) {
try {
Set<PolarisStorageActions> actionsRequested =
getValidTableActionsOrThrow(tableIdentifier);

LOG.atDebug()
.addKeyValue("tableIdentifier", tableIdentifier)
.addKeyValue("tableLocation", tableMetadata.location())
.log("Fetching client credentials for table");
responseBuilder.addAllConfig(
credentialDelegation.getCredentialConfig(
tableIdentifier, tableMetadata, actionsRequested));
} catch (ForbiddenException | NoSuchTableException e) {
// No privileges available
}
LOG.atDebug()
.addKeyValue("tableIdentifier", tableIdentifier)
.addKeyValue("tableLocation", tableMetadata.location())
.log("Fetching client credentials for table");
responseBuilder.addAllConfig(
credentialDelegation.getCredentialConfig(
tableIdentifier,
tableMetadata,
Set.of(
PolarisStorageActions.READ,
PolarisStorageActions.WRITE,
PolarisStorageActions.LIST)));
}
return responseBuilder.build();
} else if (table instanceof BaseMetadataTable) {
Expand Down Expand Up @@ -706,18 +705,13 @@ public LoadTableResponse createTableStagedWithWriteDelegation(
LoadTableResponse.builder().withTableMetadata(metadata);

if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) {
try {
Set<PolarisStorageActions> actionsRequested = getValidTableActionsOrThrow(ident);

LOG.atDebug()
.addKeyValue("tableIdentifier", ident)
.addKeyValue("tableLocation", metadata.location())
.log("Fetching client credentials for table");
responseBuilder.addAllConfig(
credentialDelegation.getCredentialConfig(ident, metadata, actionsRequested));
} catch (ForbiddenException | NoSuchTableException e) {
// No privileges available
}
LOG.atDebug()
.addKeyValue("tableIdentifier", ident)
.addKeyValue("tableLocation", metadata.location())
.log("Fetching client credentials for table");
responseBuilder.addAllConfig(
credentialDelegation.getCredentialConfig(
ident, metadata, Set.of(PolarisStorageActions.ALL)));
}
return responseBuilder.build();
});
Expand Down Expand Up @@ -779,39 +773,31 @@ public LoadTableResponse loadTable(TableIdentifier tableIdentifier, String snaps
return doCatalogOperation(() -> CatalogHandlers.loadTable(baseCatalog, tableIdentifier));
}

private Set<PolarisStorageActions> getValidTableActionsOrThrow(TableIdentifier tableIdentifier) {
public LoadTableResponse loadTableWithAccessDelegation(
TableIdentifier tableIdentifier, String xIcebergAccessDelegation, String snapshots) {
// Here we have a single method that falls through multiple candidate
// PolarisAuthorizableOperations because instead of identifying the desired operation up-front
// and
// failing the authz check if grants aren't found, we find the first most-privileged authz match
// and respond according to that.
PolarisAuthorizableOperation read =
PolarisAuthorizableOperation.LOAD_TABLE_WITH_READ_DELEGATION;
PolarisAuthorizableOperation write =
PolarisAuthorizableOperation.LOAD_TABLE_WITH_WRITE_DELEGATION;

Set<PolarisStorageActions> actionsRequested =
new HashSet<>(Set.of(PolarisStorageActions.READ, PolarisStorageActions.LIST));
try {
// TODO: Refactor to have a boolean-return version of the helpers so we can fallthrough
// easily.
authorizeBasicTableLikeOperationOrThrow(write, PolarisEntitySubType.TABLE, tableIdentifier);
actionsRequested.add(PolarisStorageActions.WRITE);
} catch (ForbiddenException | NoSuchTableException e) {
LOG.atDebug()
.addKeyValue("tableIdentifier", tableIdentifier)
.log("Authz failed for LOAD_TABLE_WITH_WRITE_DELEGATION so attempting READ only");
} catch (ForbiddenException e) {
authorizeBasicTableLikeOperationOrThrow(read, PolarisEntitySubType.TABLE, tableIdentifier);
}
return actionsRequested;
}

public LoadTableResponse loadTableWithAccessDelegation(
TableIdentifier tableIdentifier, String xIcebergAccessDelegation, String snapshots) {
// Here we have a single method that falls through multiple candidate
// PolarisAuthorizableOperations because instead of identifying the desired operation up-front
// and
// failing the authz check if grants aren't found, we find the first most-privileged authz match
// and respond according to that.

// TODO: Find a way for the configuration or caller to better express whether to fail or omit
// when data-access is specified but access delegation grants are not found.
Set<PolarisStorageActions> actionsRequested = getValidTableActionsOrThrow(tableIdentifier);

return doCatalogOperation(
() -> {
Table table = baseCatalog.loadTable(tableIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,61 @@ public void testCreateTableDirectInsufficientPermissions() {
});
}

@Test
public void testCreateTableDirectWithWriteDelegationAllSufficientPrivileges() {
Assertions.assertThat(
adminService.grantPrivilegeOnCatalogToRole(
CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.TABLE_DROP))
.isTrue();
Assertions.assertThat(
adminService.grantPrivilegeOnCatalogToRole(
CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.TABLE_WRITE_DATA))
.isTrue();

final TableIdentifier newtable = TableIdentifier.of(NS2, "newtable");
final CreateTableRequest createDirectWithWriteDelegationRequest =
CreateTableRequest.builder().withName("newtable").withSchema(SCHEMA).stageCreate().build();

doTestSufficientPrivilegeSets(
List.of(
Set.of(PolarisPrivilege.TABLE_CREATE, PolarisPrivilege.TABLE_WRITE_DATA),
Set.of(PolarisPrivilege.CATALOG_MANAGE_CONTENT)),
() -> {
newWrapper(Set.of(PRINCIPAL_ROLE1))
.createTableDirectWithWriteDelegation(NS2, createDirectWithWriteDelegationRequest);
},
() -> {
newWrapper(Set.of(PRINCIPAL_ROLE2)).dropTableWithPurge(newtable);
},
PRINCIPAL_NAME);
}

@Test
public void testCreateTableDirectWithWriteDelegationInsufficientPermissions() {
final CreateTableRequest createDirectWithWriteDelegationRequest =
CreateTableRequest.builder()
.withName("directtable")
.withSchema(SCHEMA)
.stageCreate()
.build();

doTestInsufficientPrivileges(
List.of(
PolarisPrivilege.NAMESPACE_FULL_METADATA,
PolarisPrivilege.VIEW_FULL_METADATA,
PolarisPrivilege.TABLE_DROP,
PolarisPrivilege.TABLE_CREATE, // TABLE_CREATE itself is insufficient for delegation
PolarisPrivilege.TABLE_READ_PROPERTIES,
PolarisPrivilege.TABLE_WRITE_PROPERTIES,
PolarisPrivilege.TABLE_READ_DATA,
PolarisPrivilege.TABLE_WRITE_DATA,
PolarisPrivilege.TABLE_LIST),
() -> {
newWrapper(Set.of(PRINCIPAL_ROLE1))
.createTableDirectWithWriteDelegation(NS2, createDirectWithWriteDelegationRequest);
});
}

@Test
public void testCreateTableStagedAllSufficientPrivileges() {
Assertions.assertThat(
Expand Down
65 changes: 44 additions & 21 deletions regtests/t_pyspark/src/test_spark_sql_s3_with_privileges.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from polaris.management import PolarisDefaultApi, Principal, PrincipalRole, CatalogRole, \
CatalogGrant, CatalogPrivilege, ApiException, CreateCatalogRoleRequest, CreatePrincipalRoleRequest, \
CreatePrincipalRequest, AddGrantRequest, GrantCatalogRoleRequest, GrantPrincipalRoleRequest
from polaris.management.exceptions import ForbiddenException


@pytest.fixture
Expand Down Expand Up @@ -742,8 +743,7 @@ def test_spark_credentials_s3_direct_without_write(root_client, snowflake_catalo
def test_spark_credentials_s3_direct_without_read(
snowflake_catalog, snowman_catalog_client, creator_catalog_client, test_bucket):
"""
Create a table using `creator`, which does not have TABLE_READ_DATA and ensure that credentials to read the table
are not vended.
Create a table using `creator`, which does not have TABLE_READ_DATA and expect a `ForbiddenException`
"""
snowman_catalog_client.create_namespace(
prefix=snowflake_catalog.name,
Expand All @@ -752,26 +752,23 @@ def test_spark_credentials_s3_direct_without_read(
)
)

response = creator_catalog_client.create_table(
prefix=snowflake_catalog.name,
namespace="some_schema",
x_iceberg_access_delegation="true",
create_table_request=CreateTableRequest(
name="some_table",
var_schema=ModelSchema(
type = 'struct',
fields = [],
)
)
)

assert not response.config
try:
creator_catalog_client.create_table(
prefix=snowflake_catalog.name,
namespace="some_schema",
x_iceberg_access_delegation="true",
create_table_request=CreateTableRequest(
name="some_table",
var_schema=ModelSchema(
type = 'struct',
fields = [],
)
)
)
pytest.fail("Expected exception when creating a table without TABLE_WRITE")
except Exception as e:
assert 'CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION' in str(e)

snowman_catalog_client.drop_table(
prefix=snowflake_catalog.name,
namespace="some_schema",
table="some_table"
)
snowman_catalog_client.drop_namespace(
prefix=snowflake_catalog.name,
namespace="some_schema"
Expand Down Expand Up @@ -883,6 +880,32 @@ def test_spark_credentials_s3_scoped_to_metadata_data_locations(root_client, sno
spark.sql('DROP NAMESPACE db1.schema')
spark.sql('DROP NAMESPACE db1')

@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true', reason='AWS_TEST_ENABLED is not set or is false')
def test_spark_ctas(snowflake_catalog, polaris_catalog_url, snowman):
"""
Create a table using CTAS and ensure that credentials are vended
:param root_client:
:param snowflake_catalog:
:return:
"""
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
table_name = f'iceberg_test_table_{str(uuid.uuid4())[-10:]}'
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('CREATE NAMESPACE db1')
spark.sql('CREATE NAMESPACE db1.schema')
spark.sql('USE db1.schema')
spark.sql(f'CREATE TABLE {table_name}_t1 (col1 int)')
spark.sql('SHOW TABLES')

# Insert some data
spark.sql(f"INSERT INTO {table_name}_t1 VALUES (10)")

# Run CTAS
spark.sql(f"CREATE TABLE {table_name}_t2 AS SELECT * FROM {table_name}_t1")


def create_catalog_role(api, catalog, role_name):
catalog_role = CatalogRole(name=role_name)
try:
Expand Down