
Commit 97a49e0

Spark: Add CreateTable and LoadTable implementation for SparkCatalog (#1303)
1 parent f76f969 commit 97a49e0

File tree

14 files changed (+1175, -37 lines changed)

plugins/pluginlibs.versions.toml

Lines changed: 3 additions & 0 deletions
@@ -20,3 +20,6 @@
 [versions]
 iceberg = "1.8.1"
 spark35 = "3.5.5"
+scala212 = "2.12.19"
+scala213 = "2.13.15"
+

plugins/spark/v3.5/build.gradle.kts

Lines changed: 65 additions & 8 deletions
@@ -41,18 +41,34 @@ val scalaVersion = getAndUseScalaVersionForProject()
 val icebergVersion = pluginlibs.versions.iceberg.get()
 val spark35Version = pluginlibs.versions.spark35.get()

+val scalaLibraryVersion =
+  if (scalaVersion == "2.12") {
+    pluginlibs.versions.scala212.get()
+  } else {
+    pluginlibs.versions.scala213.get()
+  }
+
 dependencies {
   implementation(project(":polaris-api-iceberg-service")) {
-    // exclude the iceberg and jackson dependencies, use the
-    // dependencies packed in the iceberg-spark dependency
+    // exclude the iceberg dependencies, use the ones pulled
+    // by iceberg-core
     exclude("org.apache.iceberg", "*")
-    exclude("com.fasterxml.jackson.core", "*")
   }
+  implementation(project(":polaris-api-catalog-service"))
+  implementation(project(":polaris-core")) { exclude("org.apache.iceberg", "*") }
+
+  implementation("org.apache.iceberg:iceberg-core:${icebergVersion}")

   implementation(
     "org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}"
-  )
+  ) {
+    // exclude the iceberg rest dependencies, use the ones pulled
+    // with iceberg-core dependency
+    exclude("org.apache.iceberg", "iceberg-core")
+  }

+  compileOnly("org.scala-lang:scala-library:${scalaLibraryVersion}")
+  compileOnly("org.scala-lang:scala-reflect:${scalaLibraryVersion}")
   compileOnly("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
     // exclude log4j dependencies
     exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
@@ -78,24 +94,65 @@ dependencies {
   }
 }

+// TODO: replace the check using gradlew checkstyle plugin
+tasks.register("checkNoDisallowedImports") {
+  doLast {
+    // List of disallowed imports. Right now, we disallow usage of shaded or
+    // relocated libraries in the iceberg spark runtime jar.
+    val disallowedImports =
+      listOf("import org.apache.iceberg.shaded.", "org.apache.iceberg.relocated.")
+
+    // Directories to scan for Java files
+    val sourceDirs = listOf(file("src/main/java"), file("src/test/java"))
+
+    val violations = mutableListOf<String>()
+    // Scan Java files in each directory
+    sourceDirs.forEach { sourceDir ->
+      fileTree(sourceDir)
+        .matching {
+          include("**/*.java") // Only include Java files
+        }
+        .forEach { file ->
+          val content = file.readText()
+          disallowedImports.forEach { importStatement ->
+            if (content.contains(importStatement)) {
+              violations.add(
+                "Disallowed import found in ${file.relativeTo(projectDir)}: $importStatement"
+              )
+            }
+          }
+        }
+    }
+
+    if (violations.isNotEmpty()) {
+      throw GradleException("Disallowed imports found! $violations")
+    }
+  }
+}
+
+tasks.named("check") { dependsOn("checkNoDisallowedImports") }
+
 tasks.register<ShadowJar>("createPolarisSparkJar") {
   archiveClassifier = null
   archiveBaseName =
     "polaris-iceberg-${icebergVersion}-spark-runtime-${sparkMajorVersion}_${scalaVersion}"
   isZip64 = true

-  dependencies { exclude("META-INF/**") }
+  mergeServiceFiles()

   // pack both the source code and dependencies
   from(sourceSets.main.get().output)
   configurations = listOf(project.configurations.runtimeClasspath.get())

-  mergeServiceFiles()
-
   // Optimization: Minimize the JAR (remove unused classes from dependencies)
   // The iceberg-spark-runtime plugin is always packaged along with our polaris-spark plugin,
   // therefore excluded from the optimization.
-  minimize { exclude(dependency("org.apache.iceberg:iceberg-spark-runtime-*.*")) }
+  minimize {
+    exclude(dependency("org.apache.iceberg:iceberg-spark-runtime-*.*"))
+    exclude(dependency("org.apache.iceberg:iceberg-core*.*"))
+  }
+
+  relocate("com.fasterxml", "org.apache.polaris.shaded.com.fasterxml.jackson")
 }

 tasks.withType(Jar::class).named("sourcesJar") { dependsOn("createPolarisSparkJar") }
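
Editor's note: the checkNoDisallowedImports task registered above fails the build whenever a source file references Iceberg's bundled copies of third-party libraries. A minimal Java sketch of what the task allows versus rejects (this file is hypothetical and not part of the commit):

// Hypothetical source file under plugins/spark/v3.5/src/main/java -- not part of the commit.
// The task would flag an import that reaches into Iceberg's relocated Guava:
//
//   import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
//
// The allowed form depends on the real Guava artifact instead:
import com.google.common.base.Preconditions;

public class ImportRuleExample {
  public static void main(String[] args) {
    Preconditions.checkArgument(args.length == 0, "no arguments expected");
  }
}

Note that the task matches raw substrings via content.contains, so even a commented-out reference like the one above would be flagged if it appeared in a scanned source directory.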
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisCatalog.java

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.polaris.service.types.GenericTable;
+
+public interface PolarisCatalog {
+  List<TableIdentifier> listGenericTables(Namespace ns);
+
+  GenericTable loadGenericTable(TableIdentifier identifier);
+
+  boolean dropGenericTable(TableIdentifier identifier);
+
+  GenericTable createGenericTable(
+      TableIdentifier identifier, String format, String doc, Map<String, String> props);
+}
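
Editor's note: the PolarisCatalog interface is deliberately Spark-agnostic. As a rough illustration of the contract, here is a hedged sketch of a caller; the class, database, and table names are hypothetical and not part of the commit:

// Hypothetical caller -- assumes some PolarisCatalog implementation is at hand,
// e.g. the PolarisRESTCatalog added later in this commit.
import java.util.Map;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.polaris.service.types.GenericTable;
import org.apache.polaris.spark.PolarisCatalog;

public class GenericTableExample {
  static GenericTable createDeltaTable(PolarisCatalog catalog) {
    // Hypothetical table coordinates; "delta" is the table format,
    // the doc string is free-form, and props carry table options.
    TableIdentifier id = TableIdentifier.of("db", "events");
    return catalog.createGenericTable(id, "delta", "example table", Map.of());
  }
}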
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java

Lines changed: 184 additions & 0 deletions
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.rest.Endpoint;
+import org.apache.iceberg.rest.ErrorHandlers;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.util.EnvironmentUtil;
+import org.apache.polaris.core.rest.PolarisEndpoints;
+import org.apache.polaris.core.rest.PolarisResourcePaths;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.polaris.spark.rest.CreateGenericTableRESTRequest;
+import org.apache.polaris.spark.rest.LoadGenericTableRESTResponse;
+
+/**
+ * PolarisRESTCatalog talks to the Polaris REST APIs and implements the PolarisCatalog
+ * interface, which covers the generic-table APIs at this moment. This class doesn't
+ * interact with any Spark objects.
+ */
+public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
+  private final Function<Map<String, String>, RESTClient> clientBuilder;
+
+  private RESTClient restClient = null;
+  private CloseableGroup closeables = null;
+  private Set<Endpoint> endpoints;
+  private OAuth2Util.AuthSession catalogAuth = null;
+  private PolarisResourcePaths pathGenerator = null;
+
+  // the default endpoints to use if the server doesn't specify the 'endpoints' configuration.
+  private static final Set<Endpoint> DEFAULT_ENDPOINTS = PolarisEndpoints.GENERIC_TABLE_ENDPOINTS;
+
+  public PolarisRESTCatalog() {
+    this(config -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build());
+  }
+
+  public PolarisRESTCatalog(Function<Map<String, String>, RESTClient> clientBuilder) {
+    this.clientBuilder = clientBuilder;
+  }
+
+  public void initialize(Map<String, String> unresolved, OAuth2Util.AuthSession catalogAuth) {
+    Preconditions.checkArgument(unresolved != null, "Invalid configuration: null");
+
+    // Resolve any configuration that is supplied by environment variables.
+    // For example: given an entry ("key", "env:envVar") in the unresolved map,
+    // where envVar is set to envValue in the system environment, resolution
+    // yields the entry ("key", "envValue").
+    Map<String, String> props = EnvironmentUtil.resolveAll(unresolved);
+
+    // TODO: switch to use authManager once iceberg dependency is updated to 1.9.0
+    this.catalogAuth = catalogAuth;
+
+    ConfigResponse config;
+    try (RESTClient initClient = clientBuilder.apply(props).withAuthSession(catalogAuth)) {
+      config = fetchConfig(initClient, catalogAuth.headers(), props);
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close HTTP client", e);
+    }
+
+    // merge the server configuration returned by getConfig with the client properties
+    Map<String, String> mergedProps = config.merge(props);
+    if (config.endpoints().isEmpty()) {
+      this.endpoints = DEFAULT_ENDPOINTS;
+    } else {
+      this.endpoints = ImmutableSet.copyOf(config.endpoints());
+    }
+
+    this.pathGenerator = PolarisResourcePaths.forCatalogProperties(mergedProps);
+    this.restClient = clientBuilder.apply(mergedProps).withAuthSession(catalogAuth);
+
+    this.closeables = new CloseableGroup();
+    this.closeables.addCloseable(this.restClient);
+    this.closeables.setSuppressCloseFailure(true);
+  }
+
+  protected static ConfigResponse fetchConfig(
+      RESTClient client, Map<String, String> headers, Map<String, String> properties) {
+    // send the client's warehouse location to the service to keep in sync;
+    // this is needed for cases where the warehouse is configured on the client side
+    // and used by the Polaris server as the catalog name.
+    ImmutableMap.Builder<String, String> queryParams = ImmutableMap.builder();
+    if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
+      queryParams.put(
+          CatalogProperties.WAREHOUSE_LOCATION,
+          properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+    }
+
+    ConfigResponse configResponse =
+        client.get(
+            ResourcePaths.config(),
+            queryParams.build(),
+            ConfigResponse.class,
+            headers,
+            ErrorHandlers.defaultErrorHandler());
+    configResponse.validate();
+    return configResponse;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closeables != null) {
+      closeables.close();
+    }
+  }
+
+  @Override
+  public List<TableIdentifier> listGenericTables(Namespace ns) {
+    throw new UnsupportedOperationException("listTables not supported");
+  }
+
+  @Override
+  public boolean dropGenericTable(TableIdentifier identifier) {
+    throw new UnsupportedOperationException("dropTable not supported");
+  }
+
+  @Override
+  public GenericTable createGenericTable(
+      TableIdentifier identifier, String format, String doc, Map<String, String> props) {
+    Endpoint.check(endpoints, PolarisEndpoints.V1_CREATE_GENERIC_TABLE);
+    CreateGenericTableRESTRequest request =
+        new CreateGenericTableRESTRequest(identifier.name(), format, doc, props);
+
+    LoadGenericTableRESTResponse response =
+        restClient
+            .withAuthSession(this.catalogAuth)
+            .post(
+                pathGenerator.genericTables(identifier.namespace()),
+                request,
+                LoadGenericTableRESTResponse.class,
+                Map.of(),
+                ErrorHandlers.tableErrorHandler());
+
+    return response.getTable();
+  }
+
+  @Override
+  public GenericTable loadGenericTable(TableIdentifier identifier) {
+    Endpoint.check(endpoints, PolarisEndpoints.V1_LOAD_GENERIC_TABLE);
+    LoadGenericTableRESTResponse response =
+        restClient
+            .withAuthSession(this.catalogAuth)
+            .get(
+                pathGenerator.genericTable(identifier),
+                null,
+                LoadGenericTableRESTResponse.class,
+                Map.of(),
+                ErrorHandlers.tableErrorHandler());
+
+    return response.getTable();
+  }
+}
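
Editor's note: putting the pieces together, the sketch below shows roughly how PolarisRESTCatalog could be driven directly, outside Spark. The endpoint URI, warehouse value, and the use of OAuth2Util.AuthSession.empty() (an unauthenticated session) are illustrative assumptions, as is getName() on the generated GenericTable model; a real deployment wires in credentials.

// Hypothetical wiring -- values below are assumptions, not part of the commit.
import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.polaris.service.types.GenericTable;
import org.apache.polaris.spark.PolarisRESTCatalog;

public class RestCatalogExample {
  public static void main(String[] args) throws Exception {
    // Assumed local Polaris endpoint and catalog name.
    Map<String, String> props =
        Map.of(
            CatalogProperties.URI, "http://localhost:8181/api/catalog",
            CatalogProperties.WAREHOUSE_LOCATION, "my_catalog");

    // PolarisRESTCatalog implements Closeable, so try-with-resources applies.
    try (PolarisRESTCatalog catalog = new PolarisRESTCatalog()) {
      catalog.initialize(props, OAuth2Util.AuthSession.empty());

      TableIdentifier id = TableIdentifier.of("db", "events");
      GenericTable created = catalog.createGenericTable(id, "delta", "example", Map.of());
      GenericTable loaded = catalog.loadGenericTable(id);
      System.out.println(loaded.getName()); // assumes the generated model exposes getName()
    }
  }
}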
