Skip to content
Merged
22 changes: 22 additions & 0 deletions plugins/pluginlibs.versions.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

[versions]
iceberg = "1.8.1"
spark35 = "3.5.5"
40 changes: 40 additions & 0 deletions plugins/spark/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Polaris Spark Plugin

The Polaris Spark plugin provides a SparkCatalog class, which communicates with the Polaris
REST endpoints, and provides implementations for Apache Spark's
[TableCatalog](https://github.com/apache/spark/blob/v3.5.5/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java),
[SupportsNamespaces](https://github.com/apache/spark/blob/v3.5.5/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java),
[ViewCatalog](https://github.com/apache/spark/blob/v3.5.5/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java) classes.

Right now, the plugin only provides support for Spark 3.5, Scala version 2.12 and 2.13,
and depends on iceberg-spark-runtime 1.8.1.

# Build Plugin Jar
A task `createPolarisSparkJar` is added to build a jar for the Polaris Spark plugin. The jar is named:
"polaris-iceberg-<iceberg_version>-spark-runtime-<spark_major_version>_<scala_version>.jar"

Building the Polaris project produces client jars for both Scala 2.12 and 2.13, and CI runs the Spark
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that is still correct, we are automatically building for all supported spark version and scala versions. All versions are added here https://github.com/apache/polaris/pull/1190/files#diff-5625e3601fa0ad3a6a2824239e5a2fde71c149597d31394f9224a08c24be7b9dR66

client tests for both Scala versions as well.

The jar can also be built alone for a specific Scala version using the target `:polaris-spark-3.5_<scala_version>`. For example:
- `./gradlew :polaris-spark-3.5_2.12:createPolarisSparkJar` — builds a jar for the Polaris Spark plugin with Scala version 2.12.
The resulting jar is located at `plugins/spark/build/<scala_version>/libs` after the build.
22 changes: 22 additions & 0 deletions plugins/spark/spark-scala.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

sparkVersions=3.5

scalaVersions=2.12,2.13
101 changes: 101 additions & 0 deletions plugins/spark/v3.5/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

plugins {
id("polaris-client")
alias(libs.plugins.jandex)
}

fun getAndUseScalaVersionForProject(): String {
  // Project names look like "polaris-spark-3.5_2.12": the Scala version is the
  // token following the underscore in the final dash-separated segment.
  val scalaVersion = project.name.substringAfterLast("-").split("_")[1]

  // Redirect this project's output to build/<scalaVersion> so the 2.12 and
  // 2.13 variants do not clobber each other's artifacts.
  project.layout.buildDirectory.set(layout.buildDirectory.dir(scalaVersion).get())

  return scalaVersion
}

// get version information
// Hard-coded rather than read from spark-scala.properties because this build
// file lives under plugins/spark/v3.5 and is only ever used for Spark 3.5.
val sparkMajorVersion = "3.5"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this hard coded? Don't we have it defined above?

Copy link
Contributor Author

@gh-yzou gh-yzou Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sparkVersion here is used to define the spark version used by this build file, which is used only for spark version 3.5 (since it is under spark v3.5). For example

implementation(
    "org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}"

The definition above defines all sparkVersions and scala versions have to be added as the gradlew projects and build targets.

Another way we can do this is to parse the spark version from the project name like how I parse the scala version in function getAndUseScalaVersionForProject, which seems unnecessary since this build file is only used by spark 3.5.

// Scala version parsed from the project name; the call also redirects the
// build directory to build/<scalaVersion> as a side effect.
val scalaVersion = getAndUseScalaVersionForProject()
// Library versions come from plugins/pluginlibs.versions.toml.
val icebergVersion = pluginlibs.versions.iceberg.get()
val spark35Version = pluginlibs.versions.spark35.get()

dependencies {
  implementation(project(":polaris-api-iceberg-service")) {
    // exclude the iceberg and jackson dependencies, use the
    // dependencies packed in the iceberg-spark dependency
    exclude("org.apache.iceberg", "*")
    exclude("com.fasterxml.jackson.core", "*")
  }

  implementation(
    "org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}"
  )

  // Spark itself is provided by the runtime environment, hence compileOnly.
  compileOnly("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
    // exclude log4j dependencies
    exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
    exclude("org.apache.logging.log4j", "log4j-api")
    exclude("org.apache.logging.log4j", "log4j-1.2-api")
    exclude("org.slf4j", "jul-to-slf4j")
  }

  testImplementation(platform(libs.junit.bom))
  testImplementation("org.junit.jupiter:junit-jupiter")
  testImplementation(libs.assertj.core)
  testImplementation(libs.mockito.core)

  // Use ${sparkMajorVersion} rather than a hard-coded "3.5" so the test
  // runtime cannot drift out of sync with the main dependency above.
  testImplementation(
    "org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}"
  )
  testImplementation("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
    // exclude log4j dependencies
    exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
    exclude("org.apache.logging.log4j", "log4j-api")
    exclude("org.apache.logging.log4j", "log4j-1.2-api")
    exclude("org.slf4j", "jul-to-slf4j")
  }
}

// Builds the distributable shaded ("uber") jar for the Polaris Spark plugin, named
// "polaris-iceberg-<icebergVersion>-spark-runtime-<sparkMajorVersion>_<scalaVersion>.jar".
tasks.register<ShadowJar>("createPolarisSparkJar") {
  // No classifier: the shaded jar is the primary artifact of this task.
  archiveClassifier = null
  archiveBaseName =
    "polaris-iceberg-${icebergVersion}-spark-runtime-${sparkMajorVersion}_${scalaVersion}"
  // Allow >65535 entries / >4GB archives, which a fat jar can exceed.
  isZip64 = true

  // NOTE(review): shadow's dependencies{} block filters dependencies, not file
  // paths — confirm "META-INF/**" actually excludes the intended entries here.
  dependencies { exclude("META-INF/**") }

  // pack both the source code and dependencies
  from(sourceSets.main.get().output)
  configurations = listOf(project.configurations.runtimeClasspath.get())

  // Merge META-INF/services service-loader descriptors instead of overwriting them.
  mergeServiceFiles()

  // Optimization: Minimize the JAR (remove unused classes from dependencies)
  // The iceberg-spark-runtime plugin is always packaged along with our polaris-spark plugin,
  // therefore excluded from the optimization.
  minimize { exclude(dependency("org.apache.iceberg:iceberg-spark-runtime-*.*")) }
}

// Make sure the shaded jar is built before the sources jar is assembled.
tasks.withType(Jar::class).named("sourcesJar") { dependsOn("createPolarisSparkJar") }
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.spark;

import com.google.common.collect.ImmutableSet;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.*;
import org.apache.spark.sql.connector.catalog.*;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
 * Spark catalog plugin for Apache Polaris.
 *
 * <p>Implements Spark's {@code TableCatalog}, {@code SupportsNamespaces} and {@code ViewCatalog}
 * interfaces. At this stage only {@link #initialize(String, CaseInsensitiveStringMap)} and
 * {@link #name()} are functional; every other operation is a stub that throws
 * {@link UnsupportedOperationException} until the Polaris REST catalog is wired in.
 */
public class SparkCatalog implements TableCatalog, SupportsNamespaces, ViewCatalog {
  private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);

  // Catalog name captured in initialize() and echoed back by name().
  private String catalogName;

  // Iceberg's Spark catalog, intended as a delegation target once implemented.
  private org.apache.iceberg.spark.SparkCatalog icebergSparkCatalog;

  // TODO: Add Polaris Specific REST Catalog

  @Override
  public String name() {
    return catalogName;
  }

  @Override
  public void initialize(String name, CaseInsensitiveStringMap options) {
    this.catalogName = name;
  }

  // --------------------------------------------------------------------
  // TableCatalog — not yet implemented
  // --------------------------------------------------------------------

  @Override
  public Table loadTable(Identifier ident) throws NoSuchTableException {
    throw new UnsupportedOperationException("loadTable");
  }

  @Override
  public Table createTable(
      Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties)
      throws TableAlreadyExistsException {
    throw new UnsupportedOperationException("createTable");
  }

  @Override
  public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
    throw new UnsupportedOperationException("alterTable");
  }

  @Override
  public boolean dropTable(Identifier ident) {
    throw new UnsupportedOperationException("dropTable");
  }

  @Override
  public void renameTable(Identifier from, Identifier to)
      throws NoSuchTableException, TableAlreadyExistsException {
    throw new UnsupportedOperationException("renameTable");
  }

  @Override
  public Identifier[] listTables(String[] namespace) {
    throw new UnsupportedOperationException("listTables");
  }

  // --------------------------------------------------------------------
  // SupportsNamespaces — not yet implemented
  // --------------------------------------------------------------------

  @Override
  public String[] defaultNamespace() {
    throw new UnsupportedOperationException("defaultNamespace");
  }

  @Override
  public String[][] listNamespaces() {
    throw new UnsupportedOperationException("listNamespaces");
  }

  @Override
  public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
    throw new UnsupportedOperationException("listNamespaces");
  }

  @Override
  public Map<String, String> loadNamespaceMetadata(String[] namespace)
      throws NoSuchNamespaceException {
    throw new UnsupportedOperationException("loadNamespaceMetadata");
  }

  @Override
  public void createNamespace(String[] namespace, Map<String, String> metadata)
      throws NamespaceAlreadyExistsException {
    throw new UnsupportedOperationException("createNamespace");
  }

  @Override
  public void alterNamespace(String[] namespace, NamespaceChange... changes)
      throws NoSuchNamespaceException {
    throw new UnsupportedOperationException("alterNamespace");
  }

  @Override
  public boolean dropNamespace(String[] namespace, boolean cascade)
      throws NoSuchNamespaceException {
    throw new UnsupportedOperationException("dropNamespace");
  }

  // --------------------------------------------------------------------
  // ViewCatalog — not yet implemented
  // --------------------------------------------------------------------

  @Override
  public Identifier[] listViews(String... namespace) {
    throw new UnsupportedOperationException("listViews");
  }

  @Override
  public View loadView(Identifier ident) throws NoSuchViewException {
    throw new UnsupportedOperationException("loadView");
  }

  @Override
  public View createView(
      Identifier ident,
      String sql,
      String currentCatalog,
      String[] currentNamespace,
      StructType schema,
      String[] queryColumnNames,
      String[] columnAliases,
      String[] columnComments,
      Map<String, String> properties)
      throws ViewAlreadyExistsException, NoSuchNamespaceException {
    throw new UnsupportedOperationException("createView");
  }

  @Override
  public View alterView(Identifier ident, ViewChange... changes)
      throws NoSuchViewException, IllegalArgumentException {
    throw new UnsupportedOperationException("alterView");
  }

  @Override
  public boolean dropView(Identifier ident) {
    throw new UnsupportedOperationException("dropView");
  }

  @Override
  public void renameView(Identifier fromIdentifier, Identifier toIdentifier)
      throws NoSuchViewException, ViewAlreadyExistsException {
    throw new UnsupportedOperationException("renameView");
  }
}
Loading