@@ -50,7 +50,7 @@
* @see PolarisClient#catalogApi(ClientCredentials)
*/
public class CatalogApi extends RestApi {
-  CatalogApi(Client client, PolarisApiEndpoints endpoints, String authToken, URI uri) {
+  public CatalogApi(Client client, PolarisApiEndpoints endpoints, String authToken, URI uri) {
super(client, endpoints, authToken, uri);
}

@@ -53,7 +53,7 @@
* @see PolarisClient#managementApi(ClientCredentials)
*/
public class ManagementApi extends RestApi {
-  ManagementApi(Client client, PolarisApiEndpoints endpoints, String authToken, URI uri) {
+  public ManagementApi(Client client, PolarisApiEndpoints endpoints, String authToken, URI uri) {
super(client, endpoints, authToken, uri);
}

14 changes: 6 additions & 8 deletions plugins/spark/v3.5/integration/build.gradle.kts
@@ -45,9 +45,13 @@ dependencies {

implementation(project(":polaris-runtime-service"))

testImplementation(project(":polaris-api-management-model"))
testImplementation(
"org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}"
)
testImplementation(project(":polaris-spark-${sparkMajorVersion}_${scalaVersion}"))

testImplementation(project(":polaris-api-management-model"))
Review comment (Contributor): +1 this provides more flexibility on the client side by minimizing dependencies, as it only depends on the Polaris REST spec now.


testImplementation("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
// exclude log4j dependencies. Explicit dependencies for the log4j libraries are
// enforced below to ensure the version compatibility
@@ -64,13 +68,7 @@ dependencies {
testImplementation("io.delta:delta-spark_${scalaVersion}:3.3.1")

testImplementation(platform(libs.jackson.bom))
testImplementation("com.fasterxml.jackson.core:jackson-annotations")
testImplementation("com.fasterxml.jackson.core:jackson-core")
testImplementation("com.fasterxml.jackson.core:jackson-databind")

testImplementation(
"org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}"
)
testImplementation("com.fasterxml.jackson.jakarta.rs:jackson-jakarta-rs-json-provider")
Review comment (Member): Why have two Jackson dependencies in tests - the relocated one and this?

Reply from @gh-yzou (Contributor, Author), Jun 23, 2025: That one is used by a Polaris client test utility that talks to the Polaris management API for test setup, such as createCatalog. This test utility has nothing to do with Iceberg and is not supposed to rely on the Iceberg Spark client library.
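For context, a minimal sketch (not part of the diff) of how such a test utility wires the Jackson JAX-RS provider into a plain jakarta.ws.rs client; it mirrors the PolarisManagementClient added later in this PR, and the class and method names here are made up for illustration:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.jakarta.rs.json.JacksonJsonProvider;
    import jakarta.ws.rs.client.Client;
    import jakarta.ws.rs.client.ClientBuilder;

    final class ManagementClientWiringSketch {
      // A JAX-RS client with its own (non-relocated) Jackson provider; it is only used to call
      // the Polaris Management API during test setup and never goes through the Iceberg Spark runtime.
      static Client newManagementClient() {
        return ClientBuilder.newBuilder()
            .register(new JacksonJsonProvider(new ObjectMapper()))
            .build();
      }
    }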


testImplementation(testFixtures(project(":polaris-runtime-service")))

@@ -0,0 +1,102 @@ (new file: PolarisManagementClient.java)
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.spark.quarkus.it;

import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.polaris.service.it.ext.PolarisServerManagerLoader.polarisServerManager;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jakarta.rs.json.JacksonJsonProvider;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import java.util.Map;
import java.util.Random;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTClient;
import org.apache.iceberg.rest.auth.AuthSession;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.apache.polaris.service.it.env.ClientCredentials;
import org.apache.polaris.service.it.env.ManagementApi;
import org.apache.polaris.service.it.env.PolarisApiEndpoints;

/**
 * A REST client for the Polaris Management service endpoints and the auth-token endpoint, used
 * in Spark client tests to run commands that Spark SQL cannot issue directly (e.g.,
 * createCatalog).
*/
public final class PolarisManagementClient implements AutoCloseable {
private final PolarisApiEndpoints endpoints;
private final Client client;
// Use an alphanumeric ID for widest compatibility in HTTP and SQL.
// Use MAX_RADIX for shorter output.
private final String clientId =
Long.toString(Math.abs(new Random().nextLong()), Character.MAX_RADIX);
// an Iceberg REST client used to fetch OAuth tokens
private final RESTClient restClient;

private PolarisManagementClient(PolarisApiEndpoints endpoints) {
this.endpoints = endpoints;

this.client =
ClientBuilder.newBuilder()
.readTimeout(5, MINUTES)
.connectTimeout(1, MINUTES)
.register(new JacksonJsonProvider(new ObjectMapper()))
.build();

this.restClient = HTTPClient.builder(Map.of()).uri(endpoints.catalogApiEndpoint()).build();
}

public static PolarisManagementClient managementClient(PolarisApiEndpoints endpoints) {
return new PolarisManagementClient(endpoints);
}

/** This method should be used by test code to make top-level entity names. */
public String newEntityName(String hint) {
return polarisServerManager().transformEntityName(hint + "_" + clientId);
}

public ManagementApi managementApi(String authToken) {
return new ManagementApi(client, endpoints, authToken, endpoints.managementApiEndpoint());
}

public ManagementApi managementApi(ClientCredentials credentials) {
return managementApi(obtainToken(credentials));
}

/** Requests an access token from the Polaris server for the given {@link ClientCredentials}. */
public String obtainToken(ClientCredentials credentials) {
OAuthTokenResponse response =
OAuth2Util.fetchToken(
restClient.withAuthSession(AuthSession.EMPTY),
Map.of(),
String.format("%s:%s", credentials.clientId(), credentials.clientSecret()),
"PRINCIPAL_ROLE:ALL",
endpoints.catalogApiEndpoint() + "/v1/oauth/tokens",
Map.of("grant_type", "client_credentials"));
return response.token();
}

@Override
public void close() throws Exception {
client.close();
restClient.close();
}
}
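A hedged usage sketch (not part of the diff) of how a test might use this new client for setup; the helper method is hypothetical, and it assumes the same imports and injected PolarisApiEndpoints/ClientCredentials/Catalog values as SparkIntegrationBase below, where the equivalent logic actually lives:

    // Hypothetical helper for illustration only; in the PR this logic is in SparkIntegrationBase#before().
    static void setUpCatalog(PolarisApiEndpoints endpoints, ClientCredentials credentials, Catalog catalog)
        throws Exception {
      try (PolarisManagementClient client = PolarisManagementClient.managementClient(endpoints)) {
        String sparkToken = client.obtainToken(credentials); // OAuth2 client_credentials token, held for the Spark catalog config (elided here)
        ManagementApi managementApi = client.managementApi(credentials);
        managementApi.createCatalog(catalog);                // test setup that Spark SQL cannot issue directly
      }
    }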
@@ -27,6 +27,9 @@ public class SparkCatalogIcebergIT extends SparkCatalogBaseIT {
@Override
protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) {
return builder
.config(
"spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config(
String.format("spark.sql.catalog.%s", catalogName),
"org.apache.iceberg.spark.SparkCatalog")
@@ -18,23 +18,133 @@
*/
package org.apache.polaris.spark.quarkus.it;

import com.adobe.testing.s3mock.testcontainers.S3MockContainer;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.FormatMethod;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.DirectoryFileFilter;
import org.apache.commons.io.filefilter.FalseFileFilter;
import org.apache.polaris.service.it.ext.PolarisSparkIntegrationTestBase;
import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
import org.apache.polaris.core.admin.model.Catalog;
import org.apache.polaris.core.admin.model.CatalogProperties;
import org.apache.polaris.core.admin.model.PolarisCatalog;
import org.apache.polaris.core.admin.model.StorageConfigInfo;
import org.apache.polaris.service.it.env.ClientCredentials;
import org.apache.polaris.service.it.env.IntegrationTestsHelper;
import org.apache.polaris.service.it.env.ManagementApi;
import org.apache.polaris.service.it.env.PolarisApiEndpoints;
import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.LoggerFactory;

public abstract class SparkIntegrationBase extends PolarisSparkIntegrationTestBase {
@ExtendWith(PolarisIntegrationTestExtension.class)
public abstract class SparkIntegrationBase {
protected static final S3MockContainer s3Container =
new S3MockContainer("3.11.0").withInitialBuckets("my-bucket,my-old-bucket");
protected static SparkSession spark;
protected PolarisApiEndpoints endpoints;
protected PolarisManagementClient client;
protected ManagementApi managementApi;
protected String catalogName;
protected String sparkToken;

protected URI warehouseDir;

@BeforeAll
public static void setup() throws IOException {
s3Container.start();
}

@AfterAll
public static void cleanup() {
s3Container.stop();
}

@BeforeEach
public void before(
PolarisApiEndpoints apiEndpoints, ClientCredentials credentials, @TempDir Path tempDir) {
endpoints = apiEndpoints;
client = PolarisManagementClient.managementClient(endpoints);
sparkToken = client.obtainToken(credentials);
managementApi = client.managementApi(credentials);

warehouseDir = IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve("spark-warehouse");

catalogName = client.newEntityName("spark_catalog");

AwsStorageConfigInfo awsConfigModel =
AwsStorageConfigInfo.builder()
.setRoleArn("arn:aws:iam::123456789012:role/my-role")
.setExternalId("externalId")
.setUserArn("userArn")
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
.setAllowedLocations(List.of("s3://my-old-bucket/path/to/data"))
.build();
CatalogProperties props = new CatalogProperties("s3://my-bucket/path/to/data");
props.putAll(
Map.of(
"table-default.s3.endpoint",
s3Container.getHttpEndpoint(),
"table-default.s3.path-style-access",
"true",
"table-default.s3.access-key-id",
"foo",
"table-default.s3.secret-access-key",
"bar",
"s3.endpoint",
s3Container.getHttpEndpoint(),
"s3.path-style-access",
"true",
"s3.access-key-id",
"foo",
"s3.secret-access-key",
"bar",
"polaris.config.drop-with-purge.enabled",
"true"));
Catalog catalog =
PolarisCatalog.builder()
.setType(Catalog.TypeEnum.INTERNAL)
.setName(catalogName)
.setProperties(props)
.setStorageConfigInfo(awsConfigModel)
.build();

managementApi.createCatalog(catalog);

SparkSession.Builder sessionBuilder =
SparkSession.builder()
.master("local[1]")
.config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config(
"spark.hadoop.fs.s3.aws.credentials.provider",
"org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider")
.config("spark.hadoop.fs.s3.access.key", "foo")
.config("spark.hadoop.fs.s3.secret.key", "bar")
.config("spark.ui.showConsoleProgress", false)
.config("spark.ui.enabled", "false");
spark = withCatalog(sessionBuilder, catalogName).getOrCreate();

onSpark("USE " + catalogName);
}

@Override
protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) {
return builder
.config(
@@ -61,6 +171,38 @@ protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String
.config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2");
}

@AfterEach
public void after() throws Exception {
cleanupCatalog(catalogName);
try {
SparkSession.clearDefaultSession();
SparkSession.clearActiveSession();
spark.close();
} catch (Exception e) {
LoggerFactory.getLogger(getClass()).error("Unable to close spark session", e);
}

client.close();
}

protected void cleanupCatalog(String catalogName) {
onSpark("USE " + catalogName);
List<Row> namespaces = onSpark("SHOW NAMESPACES").collectAsList();
for (Row namespace : namespaces) {
List<Row> tables = onSpark("SHOW TABLES IN " + namespace.getString(0)).collectAsList();
for (Row table : tables) {
onSpark("DROP TABLE " + namespace.getString(0) + "." + table.getString(1));
}
List<Row> views = onSpark("SHOW VIEWS IN " + namespace.getString(0)).collectAsList();
for (Row view : views) {
onSpark("DROP VIEW " + namespace.getString(0) + "." + view.getString(1));
}
onSpark("DROP NAMESPACE " + namespace.getString(0));
}

managementApi.deleteCatalog(catalogName);
}

@FormatMethod
protected List<Object[]> sql(String query, Object... args) {
List<Row> rows = spark.sql(String.format(query, args)).collectAsList();
@@ -110,4 +252,8 @@ protected List<String> listDirs(String path) {
protected String generateName(String prefix) {
return prefix + "_" + UUID.randomUUID().toString().replaceAll("-", "");
}

protected static Dataset<Row> onSpark(@Language("SQL") String sql) {
return spark.sql(sql);
}
}