Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions plugins/pluginlibs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@
[versions]
iceberg = "1.8.1"
spark35 = "3.5.5"
scala212 = "2.12.19"
scala213 = "2.13.15"

73 changes: 65 additions & 8 deletions plugins/spark/v3.5/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,34 @@ val scalaVersion = getAndUseScalaVersionForProject()
val icebergVersion = pluginlibs.versions.iceberg.get()
val spark35Version = pluginlibs.versions.spark35.get()

// Map the Scala binary version (e.g. "2.12") to the concrete scala-library version
// declared in pluginlibs.versions.toml.
val scalaLibraryVersion =
  when (scalaVersion) {
    "2.12" -> pluginlibs.versions.scala212.get()
    else -> pluginlibs.versions.scala213.get()
  }

dependencies {
implementation(project(":polaris-api-iceberg-service")) {
// exclude the iceberg and jackson dependencies, use the
// dependencies packed in the iceberg-spark dependency
// exclude the iceberg dependencies, use the ones pulled
// by iceberg-core
exclude("org.apache.iceberg", "*")
exclude("com.fasterxml.jackson.core", "*")
}
implementation(project(":polaris-api-catalog-service"))
implementation(project(":polaris-core")) { exclude("org.apache.iceberg", "*") }

implementation("org.apache.iceberg:iceberg-core:${icebergVersion}")

implementation(
"org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}"
)
) {
// exclude the iceberg rest dependencies, use the ones pulled
// with iceberg-core dependency
exclude("org.apache.iceberg", "iceberg-core")
}

compileOnly("org.scala-lang:scala-library:${scalaLibraryVersion}")
compileOnly("org.scala-lang:scala-reflect:${scalaLibraryVersion}")
compileOnly("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
// exclude log4j dependencies
exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
Expand All @@ -78,24 +94,65 @@ dependencies {
}
}

// TODO: replace the check using gradlew checkstyle plugin
// TODO: replace the check using gradlew checkstyle plugin
tasks.register("checkNoDisallowedImports") {
  doLast {
    // List of disallowed import statements. Right now, we disallow usage of shaded or
    // relocated libraries in the iceberg spark runtime jar.
    // NOTE: both entries carry the "import " prefix so the check only flags actual import
    // statements. Previously the relocated entry lacked the prefix, so it matched any
    // occurrence of the package name (comments, string literals, fully-qualified
    // references), which was inconsistent with the shaded entry.
    val disallowedImports =
      listOf("import org.apache.iceberg.shaded.", "import org.apache.iceberg.relocated.")

    // Directories to scan for Java source files.
    val sourceDirs = listOf(file("src/main/java"), file("src/test/java"))

    val violations = mutableListOf<String>()
    // Scan Java files in each directory, recording one violation per (file, import) pair.
    sourceDirs.forEach { sourceDir ->
      fileTree(sourceDir)
        .matching {
          include("**/*.java") // Only include Java files
        }
        .forEach { file ->
          val content = file.readText()
          disallowedImports.forEach { importStatement ->
            if (content.contains(importStatement)) {
              violations.add(
                "Disallowed import found in ${file.relativeTo(projectDir)}: $importStatement"
              )
            }
          }
        }
    }

    // Fail the task (and therefore `check`) if anything was found.
    if (violations.isNotEmpty()) {
      throw GradleException("Disallowed imports found! $violations")
    }
  }
}

tasks.named("check") { dependsOn("checkNoDisallowedImports") }

tasks.register<ShadowJar>("createPolarisSparkJar") {
archiveClassifier = null
archiveBaseName =
"polaris-iceberg-${icebergVersion}-spark-runtime-${sparkMajorVersion}_${scalaVersion}"
isZip64 = true

dependencies { exclude("META-INF/**") }
mergeServiceFiles()

// pack both the source code and dependencies
from(sourceSets.main.get().output)
configurations = listOf(project.configurations.runtimeClasspath.get())

mergeServiceFiles()

// Optimization: Minimize the JAR (remove unused classes from dependencies)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this PR, but why are we excluding the entire META_INF/** path? This is probably not OK from a legal standpoint. Also, there is no point in calling mergeServiceFiles() if in the end we exclude the entire META-INF/service/ folder.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some dependency libraries seem to include signature files in the manifest; if I do not exclude them when packing the jar, I run into issues like the following when loading with Spark:

java.lang.SecurityException: Invalid signature file digest for Manifest main attributes

So far I haven't found a good way to deal with this yet — wondering if you have any suggestions?

And yes, if I exclude all of them, mergeServiceFiles is pretty much a no-op.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it seems I just need to exclude the signature files META-INF/*.SF, META-INF/*.DSA and META-INF/*.RSA

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed all excludes and repacked the package; for some reason, I don't run into any problems anymore. With all excludes removed, we should be fine for now. We will come back to this if we run into problems in the future.

// The iceberg-spark-runtime plugin is always packaged along with our polaris-spark plugin,
// therefore excluded from the optimization.
minimize { exclude(dependency("org.apache.iceberg:iceberg-spark-runtime-*.*")) }
minimize {
exclude(dependency("org.apache.iceberg:iceberg-spark-runtime-*.*"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's surprising, do we want to include the entire Spark runtime jar in this plugin jar?

Why not use both jars side by side, e.g.

spark-sql \
    --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.apache.polaris:polaris-spark-3.5_2.12

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a couple of reasons for now:

  1. the minimize optimization doesn't work for reflections, and iceberg runtime uses a lot of reflections today.
  2. the client side is compiled against a particular version today; mixing in an Iceberg library of a different version may have some unknown behavior. For the first version, I plan to ship it with a particular version, and then we can work on better separation of the libraries in a future release.

exclude(dependency("org.apache.iceberg:iceberg-core*.*"))
}

relocate("com.fasterxml", "org.apache.polaris.shaded.com.fasterxml.jackson")
}

tasks.withType(Jar::class).named("sourcesJar") { dependsOn("createPolarisSparkJar") }
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.spark;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.polaris.service.types.GenericTable;

/**
 * Catalog operations for Polaris generic (non-Iceberg) tables, addressed by Iceberg
 * {@link Namespace} and {@link TableIdentifier}.
 */
public interface PolarisCatalog {
  /** Returns the identifiers of the generic tables in the given namespace. */
  List<TableIdentifier> listGenericTables(Namespace ns);

  /** Loads the generic table with the given identifier. */
  GenericTable loadGenericTable(TableIdentifier identifier);

  /**
   * Drops the generic table with the given identifier.
   *
   * @return presumably whether the table existed and was dropped — confirm against the
   *     service contract
   */
  boolean dropGenericTable(TableIdentifier identifier);

  /**
   * Creates a generic table.
   *
   * @param identifier namespace-qualified table identifier
   * @param format table format name (semantics defined by the Polaris service)
   * @param doc table description/documentation text — confirm exact semantics with the service
   * @param props table properties
   * @return the created table
   */
  GenericTable createGenericTable(
      TableIdentifier identifier, String format, String doc, Map<String, String> props);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.spark;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.rest.Endpoint;
import org.apache.iceberg.rest.ErrorHandlers;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTClient;
import org.apache.iceberg.rest.ResourcePaths;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.iceberg.util.EnvironmentUtil;
import org.apache.polaris.core.rest.PolarisEndpoints;
import org.apache.polaris.core.rest.PolarisResourcePaths;
import org.apache.polaris.service.types.GenericTable;
import org.apache.polaris.spark.rest.CreateGenericTableRESTRequest;
import org.apache.polaris.spark.rest.LoadGenericTableRESTResponse;

/**
* [[PolarisRESTCatalog]] talks to Polaris REST APIs, and implements the PolarisCatalog interfaces,
* which are generic table related APIs at this moment. This class doesn't interact with any Spark
* objects.
*/
public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
  // Factory producing a RESTClient from catalog properties; the default constructor supplies
  // an HTTPClient pointed at CatalogProperties.URI, and callers may inject their own.
  private final Function<Map<String, String>, RESTClient> clientBuilder;

  // Long-lived client used for all requests after initialize(); registered with `closeables`.
  private RESTClient restClient = null;
  // Owns closeable resources (currently just restClient) so close() releases them together.
  private CloseableGroup closeables = null;
  // Endpoints advertised by the server's ConfigResponse, or DEFAULT_ENDPOINTS when the
  // server lists none.
  private Set<Endpoint> endpoints;
  // Auth session attached to every request (see the AuthManager TODO in initialize()).
  private OAuth2Util.AuthSession catalogAuth = null;
  // Builds the Polaris-specific REST paths for generic-table requests.
  private PolarisResourcePaths pathGenerator = null;

  // the default endpoints to config if server doesn't specify the 'endpoints' configuration.
  private static final Set<Endpoint> DEFAULT_ENDPOINTS = PolarisEndpoints.GENERIC_TABLE_ENDPOINTS;

  /** Creates a catalog using the default HTTP client built against {@code CatalogProperties.URI}. */
  public PolarisRESTCatalog() {
    this(config -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build());
  }

  /** Creates a catalog with a caller-provided REST client factory. */
  public PolarisRESTCatalog(Function<Map<String, String>, RESTClient> clientBuilder) {
    this.clientBuilder = clientBuilder;
  }

  /**
   * Initializes this catalog: resolves environment-variable references in the supplied
   * properties, fetches the server configuration with a short-lived client, then builds the
   * long-lived REST client from the merged (client + server) properties.
   *
   * @param unresolved raw catalog properties; values of the form {@code env:VAR} are resolved
   *     from the process environment
   * @param catalogAuth auth session used for the config request and all subsequent requests
   * @throws IllegalArgumentException if {@code unresolved} is null
   */
  public void initialize(Map<String, String> unresolved, OAuth2Util.AuthSession catalogAuth) {
    Preconditions.checkArgument(unresolved != null, "Invalid configuration: null");

    // Resolve any configuration that is supplied by environment variables.
    // For example: if we have an entity ("key", "env:envVar") in the unresolved,
    // and envVar is configured to envValue in system env. After resolve, we got
    // entity ("key", "envValue").
    Map<String, String> props = EnvironmentUtil.resolveAll(unresolved);

    // TODO: switch to use authManager once iceberg dependency is updated to 1.9.0
    this.catalogAuth = catalogAuth;

    // Use a temporary client only for the config request; try-with-resources closes it
    // immediately afterwards.
    ConfigResponse config;
    try (RESTClient initClient = clientBuilder.apply(props).withAuthSession(catalogAuth)) {
      config = fetchConfig(initClient, catalogAuth.headers(), props);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to close HTTP client", e);
    }

    // call getConfig to get the server configurations
    Map<String, String> mergedProps = config.merge(props);
    if (config.endpoints().isEmpty()) {
      this.endpoints = DEFAULT_ENDPOINTS;
    } else {
      this.endpoints = ImmutableSet.copyOf(config.endpoints());
    }

    this.pathGenerator = PolarisResourcePaths.forCatalogProperties(mergedProps);
    this.restClient = clientBuilder.apply(mergedProps).withAuthSession(catalogAuth);

    // Register the long-lived client so close() tears it down; suppress close failures so
    // one failing resource does not mask the rest.
    this.closeables = new CloseableGroup();
    this.closeables.addCloseable(this.restClient);
    this.closeables.setSuppressCloseFailure(true);
  }

  /**
   * Fetches and validates the server configuration.
   *
   * @param client client used to issue the config request
   * @param headers auth headers to send with the request
   * @param properties client-side properties; the warehouse location, when present, is sent
   *     as a query parameter
   * @return the validated {@link ConfigResponse}
   */
  protected static ConfigResponse fetchConfig(
      RESTClient client, Map<String, String> headers, Map<String, String> properties) {
    // send the client's warehouse location to the service to keep in sync
    // this is needed for cases where the warehouse is configured at client side,
    // and used by Polaris server as catalog name.
    ImmutableMap.Builder<String, String> queryParams = ImmutableMap.builder();
    if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
      queryParams.put(
          CatalogProperties.WAREHOUSE_LOCATION,
          properties.get(CatalogProperties.WAREHOUSE_LOCATION));
    }

    ConfigResponse configResponse =
        client.get(
            ResourcePaths.config(),
            queryParams.build(),
            ConfigResponse.class,
            headers,
            ErrorHandlers.defaultErrorHandler());
    configResponse.validate();
    return configResponse;
  }

  /** Closes the underlying REST client; safe to call even if initialize() never ran. */
  @Override
  public void close() throws IOException {
    if (closeables != null) {
      closeables.close();
    }
  }

  /** Not implemented yet; always throws {@link UnsupportedOperationException}. */
  @Override
  public List<TableIdentifier> listGenericTables(Namespace ns) {
    throw new UnsupportedOperationException("listTables not supported");
  }

  /** Not implemented yet; always throws {@link UnsupportedOperationException}. */
  @Override
  public boolean dropGenericTable(TableIdentifier identifier) {
    throw new UnsupportedOperationException("dropTable not supported");
  }

  /**
   * Creates a generic table by POSTing to the generic-tables route of the identifier's
   * namespace and returns the table from the response.
   */
  @Override
  public GenericTable createGenericTable(
      TableIdentifier identifier, String format, String doc, Map<String, String> props) {
    // Verify the server advertised this endpoint before issuing the request.
    Endpoint.check(endpoints, PolarisEndpoints.V1_CREATE_GENERIC_TABLE);
    CreateGenericTableRESTRequest request =
        new CreateGenericTableRESTRequest(identifier.name(), format, doc, props);

    LoadGenericTableRESTResponse response =
        restClient
            .withAuthSession(this.catalogAuth)
            .post(
                pathGenerator.genericTables(identifier.namespace()),
                request,
                LoadGenericTableRESTResponse.class,
                Map.of(),
                ErrorHandlers.tableErrorHandler());

    return response.getTable();
  }

  /** Loads a generic table by GETting the table's route and returns the response payload. */
  @Override
  public GenericTable loadGenericTable(TableIdentifier identifier) {
    // Verify the server advertised this endpoint before issuing the request.
    Endpoint.check(endpoints, PolarisEndpoints.V1_LOAD_GENERIC_TABLE);
    LoadGenericTableRESTResponse response =
        restClient
            .withAuthSession(this.catalogAuth)
            .get(
                pathGenerator.genericTable(identifier),
                null,
                LoadGenericTableRESTResponse.class,
                Map.of(),
                ErrorHandlers.tableErrorHandler());

    return response.getTable();
  }
}
Loading