diff --git a/plugins/spark/README.md b/plugins/spark/README.md
index 1bdfe3dd70..adaccec70c 100644
--- a/plugins/spark/README.md
+++ b/plugins/spark/README.md
@@ -21,12 +21,16 @@
The Polaris Spark plugin provides a SparkCatalog class, which communicates with the Polaris
REST endpoints, and provides implementations for Apache Spark's
-[TableCatalog](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java),
-[ViewCatalog](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java) classes.
-[SupportsNamespaces](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java),
+- [TableCatalog](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java)
+- [ViewCatalog](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java)
+- [SupportsNamespaces](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java)
-Right now, the plugin only provides support for Spark 3.5, Scala version 2.12 and 2.13,
-and depends on iceberg-spark-runtime 1.9.1.
+Right now, the plugin only supports Spark 3.5 with Scala versions 2.12 and 2.13, and depends on iceberg-spark-runtime 1.9.1.
+
+The Polaris Spark client supports catalog management for both Iceberg and Delta tables. It routes all Iceberg table
+requests to the Iceberg REST endpoints and routes all Delta table requests to the Generic Table REST endpoints.
+
+The Spark Client requires at least Delta Lake 3.2.1 to work with Delta tables, which in turn requires at least Apache Spark 3.5.3.
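+
+The routing described above is driven by the table format. As an illustration, a minimal Spark SQL sketch (assuming a Spark session configured with a Polaris catalog named `polaris` and a namespace `ns`; these names are hypothetical):
+
+```sql
+-- Routed to the Iceberg REST endpoints
+CREATE TABLE polaris.ns.events (id BIGINT, ts TIMESTAMP) USING ICEBERG;
+
+-- Routed to the Generic Table REST endpoints
+CREATE TABLE polaris.ns.clicks (id BIGINT) USING DELTA LOCATION 's3://bucket/path/clicks';
+```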
# Start Spark with local Polaris service using the Polaris Spark plugin
The following command starts a Polaris server for local testing; it runs on localhost:8181 with default
@@ -112,15 +116,16 @@ bin/spark-shell \
--conf spark.sql.sources.useV1SourceList=''
```
-# Limitations
-The Polaris Spark client supports catalog management for both Iceberg and Delta tables, it routes all Iceberg table
-requests to the Iceberg REST endpoints, and routes all Delta table requests to the Generic Table REST endpoints.
+# Current Limitations
+The following describes the current limitations of the Polaris Spark client:
-The Spark Client requires at least delta 3.2.1 to work with Delta tables, which requires at least Apache Spark 3.5.3.
-Following describes the current functionality limitations of the Polaris Spark client:
-1) Create table as select (CTAS) is not supported for Delta tables. As a result, the `saveAsTable` method of `Dataframe`
- is also not supported, since it relies on the CTAS support.
-2) Create a Delta table without explicit location is not supported.
-3) Rename a Delta table is not supported.
-4) ALTER TABLE ... SET LOCATION is not supported for DELTA table.
-5) For other non-Iceberg tables like csv, it is not supported today.
+## General Limitations
+1. The Polaris Spark client only supports Iceberg and Delta tables. It does not support other table formats like CSV, JSON, etc.
+2. Generic tables (non-Iceberg tables) do not currently support credential vending.
+
+## Delta Table Limitations
+1. Create table as select (CTAS) is not supported for Delta tables. As a result, the `saveAsTable` method of `DataFrame`
+ is also not supported, since it relies on CTAS support.
+2. Creating a Delta table without an explicit location is not supported.
+3. Renaming a Delta table is not supported.
+4. ALTER TABLE ... SET LOCATION is not supported for Delta tables.
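+
+Given limitations 1 and 2 above, a common workaround is to create the Delta table explicitly (with a location) and then populate it with a separate insert; a hypothetical sketch, assuming a catalog named `polaris`:
+
+```sql
+-- CTAS is not supported, so create the table explicitly with a location...
+CREATE TABLE polaris.ns.delta_copy (id BIGINT, name STRING)
+USING DELTA LOCATION 's3://bucket/path/delta_copy';
+
+-- ...then populate it with a separate INSERT
+INSERT INTO polaris.ns.delta_copy SELECT id, name FROM polaris.ns.source_table;
+```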
diff --git a/plugins/spark/v3.5/getting-started/README.md b/plugins/spark/v3.5/getting-started/README.md
index 582bd177a8..dc52331653 100644
--- a/plugins/spark/v3.5/getting-started/README.md
+++ b/plugins/spark/v3.5/getting-started/README.md
@@ -20,7 +20,7 @@
# Getting Started with Apache Spark and Apache Polaris With Delta and Iceberg
This getting started guide provides a `docker-compose` file to set up [Apache Spark](https://spark.apache.org/) with Apache Polaris using
-the new Polaris Spark Client.
+the new Polaris Spark Client.
The Polaris Spark Client enables management of both Delta and Iceberg tables using Apache Polaris.
@@ -48,7 +48,7 @@ To start the `docker-compose` file, run this command from the repo's root direct
docker-compose -f plugins/spark/v3.5/getting-started/docker-compose.yml up
```
-This will spin up 2 container services
+This will spin up 2 container services:
* The `polaris` service for running Apache Polaris using an in-memory metastore
* The `jupyter` service for running Jupyter notebook with PySpark
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java
index 2974384247..09316ba124 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java
@@ -28,6 +28,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Helper class for integrating Delta table functionality with Polaris Spark Catalog.
+ *
+ *
+ * <p>This class is responsible for dynamically loading and configuring a Delta Catalog
+ * implementation to work with Polaris. It sets up the Delta Catalog as a delegating catalog
+ * extension with Polaris Spark Catalog as the delegate, enabling Delta table operations through
+ * Polaris.
+ *
+ *
+ * <p>The class uses reflection to configure the Delta Catalog to behave identically to Unity
+ * Catalog, as the current Delta Catalog implementation is hardcoded for Unity Catalog. This is a
+ * temporary workaround until Delta extends support for other catalog implementations (see
+ * https://github.com/delta-io/delta/issues/4306).
+ */
public class DeltaHelper {
private static final Logger LOG = LoggerFactory.getLogger(DeltaHelper.class);
diff --git a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/PolarisRESTCatalogTest.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/PolarisRESTCatalogTest.java
new file mode 100644
index 0000000000..ee27dc84ec
--- /dev/null
+++ b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/PolarisRESTCatalogTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.polaris.core.rest.PolarisEndpoints;
+import org.apache.polaris.spark.rest.GenericTable;
+import org.apache.polaris.spark.rest.ListGenericTablesRESTResponse;
+import org.apache.polaris.spark.rest.LoadGenericTableRESTResponse;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+public class PolarisRESTCatalogTest {
+
+ private RESTClient mockClient;
+ private OAuth2Util.AuthSession mockAuthSession;
+ private PolarisRESTCatalog catalog;
+
+ @BeforeEach
+ public void setup() {
+ mockClient = mock(RESTClient.class);
+ mockAuthSession = mock(OAuth2Util.AuthSession.class);
+ when(mockAuthSession.headers()).thenReturn(ImmutableMap.of("Authorization", "Bearer token"));
+ when(mockClient.withAuthSession(any())).thenReturn(mockClient);
+
+ catalog = new PolarisRESTCatalog(config -> mockClient);
+ }
+
+ @Test
+ public void testInitializeWithDefaultEndpoints() {
+ ConfigResponse configResponse =
+ ConfigResponse.builder()
+ .withDefaults(ImmutableMap.of())
+ .withOverrides(ImmutableMap.of())
+ .build();
+
+ when(mockClient.get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any()))
+ .thenReturn(configResponse);
+
+ Map<String, String> properties =
+ ImmutableMap.of(CatalogProperties.URI, "http://localhost:8181");
+
+ catalog.initialize(properties, mockAuthSession);
+
+ verify(mockClient).get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any());
+ }
+
+ @Test
+ public void testInitializeWithCustomEndpoints() {
+ ConfigResponse configResponse =
+ ConfigResponse.builder()
+ .withDefaults(ImmutableMap.of())
+ .withOverrides(ImmutableMap.of())
+ .withEndpoints(
+ ImmutableList.of(
+ PolarisEndpoints.V1_LIST_GENERIC_TABLES,
+ PolarisEndpoints.V1_CREATE_GENERIC_TABLE))
+ .build();
+
+ when(mockClient.get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any()))
+ .thenReturn(configResponse);
+
+ Map<String, String> properties =
+ ImmutableMap.of(CatalogProperties.URI, "http://localhost:8181");
+
+ catalog.initialize(properties, mockAuthSession);
+
+ verify(mockClient).get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any());
+ }
+
+ @Test
+ public void testInitializeWithPageSize() {
+ ConfigResponse configResponse =
+ ConfigResponse.builder()
+ .withDefaults(ImmutableMap.of())
+ .withOverrides(ImmutableMap.of())
+ .build();
+
+ when(mockClient.get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any()))
+ .thenReturn(configResponse);
+
+ Map<String, String> properties =
+ ImmutableMap.of(
+ CatalogProperties.URI,
+ "http://localhost:8181",
+ PolarisRESTCatalog.REST_PAGE_SIZE,
+ "10");
+
+ catalog.initialize(properties, mockAuthSession);
+
+ verify(mockClient).get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any());
+ }
+
+ @Test
+ public void testInitializeWithInvalidPageSize() {
+ ConfigResponse configResponse =
+ ConfigResponse.builder()
+ .withDefaults(ImmutableMap.of())
+ .withOverrides(ImmutableMap.of())
+ .build();
+
+ when(mockClient.get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any()))
+ .thenReturn(configResponse);
+
+ Map<String, String> properties =
+ ImmutableMap.of(
+ CatalogProperties.URI,
+ "http://localhost:8181",
+ PolarisRESTCatalog.REST_PAGE_SIZE,
+ "-1");
+
+ assertThatThrownBy(() -> catalog.initialize(properties, mockAuthSession))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("must be a positive integer");
+ }
+
+ @Test
+ public void testInitializeWithNullConfig() {
+ assertThatThrownBy(() -> catalog.initialize(null, mockAuthSession))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid configuration: null");
+ }
+
+ @Test
+ public void testListGenericTables() {
+ initializeCatalog();
+
+ Namespace namespace = Namespace.of("test_ns");
+ TableIdentifier table1 = TableIdentifier.of(namespace, "table1");
+ TableIdentifier table2 = TableIdentifier.of(namespace, "table2");
+
+ ListGenericTablesRESTResponse response =
+ new ListGenericTablesRESTResponse(null, ImmutableSet.of(table1, table2));
+
+ when(mockClient.get(any(), anyMap(), eq(ListGenericTablesRESTResponse.class), anyMap(), any()))
+ .thenReturn(response);
+
+ List<TableIdentifier> tables = catalog.listGenericTables(namespace);
+
+ assertThat(tables).hasSize(2);
+ assertThat(tables).contains(table1, table2);
+ }
+
+ @Test
+ public void testListGenericTablesWithPagination() {
+ ConfigResponse configResponse =
+ ConfigResponse.builder()
+ .withDefaults(ImmutableMap.of())
+ .withOverrides(ImmutableMap.of())
+ .build();
+
+ when(mockClient.get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any()))
+ .thenReturn(configResponse);
+
+ Map<String, String> properties =
+ ImmutableMap.of(
+ CatalogProperties.URI, "http://localhost:8181", PolarisRESTCatalog.REST_PAGE_SIZE, "2");
+
+ catalog.initialize(properties, mockAuthSession);
+
+ Namespace namespace = Namespace.of("test_ns");
+ TableIdentifier table1 = TableIdentifier.of(namespace, "table1");
+ TableIdentifier table2 = TableIdentifier.of(namespace, "table2");
+ TableIdentifier table3 = TableIdentifier.of(namespace, "table3");
+
+ ListGenericTablesRESTResponse response1 =
+ new ListGenericTablesRESTResponse("page2", ImmutableSet.of(table1, table2));
+ ListGenericTablesRESTResponse response2 =
+ new ListGenericTablesRESTResponse(null, ImmutableSet.of(table3));
+
+ when(mockClient.get(any(), anyMap(), eq(ListGenericTablesRESTResponse.class), anyMap(), any()))
+ .thenReturn(response1, response2);
+
+ List<TableIdentifier> tables = catalog.listGenericTables(namespace);
+
+ assertThat(tables).hasSize(3);
+ assertThat(tables).contains(table1, table2, table3);
+ }
+
+ @Test
+ public void testCreateGenericTable() {
+ initializeCatalog();
+
+ TableIdentifier identifier = TableIdentifier.of("test_ns", "test_table");
+ GenericTable table =
+ GenericTable.builder()
+ .setName("test_table")
+ .setFormat("delta")
+ .setBaseLocation("s3://bucket/path")
+ .setDoc("Test table")
+ .setProperties(ImmutableMap.of("key", "value"))
+ .build();
+
+ LoadGenericTableRESTResponse response = new LoadGenericTableRESTResponse(table);
+
+ when(mockClient.post(any(), any(), eq(LoadGenericTableRESTResponse.class), anyMap(), any()))
+ .thenReturn(response);
+
+ GenericTable result =
+ catalog.createGenericTable(
+ identifier, "delta", "s3://bucket/path", "Test table", ImmutableMap.of("key", "value"));
+
+ assertThat(result.getName()).isEqualTo("test_table");
+ assertThat(result.getFormat()).isEqualTo("delta");
+ assertThat(result.getBaseLocation()).isEqualTo("s3://bucket/path");
+ }
+
+ @Test
+ public void testLoadGenericTable() {
+ initializeCatalog();
+
+ TableIdentifier identifier = TableIdentifier.of("test_ns", "test_table");
+ GenericTable table = GenericTable.builder().setName("test_table").setFormat("delta").build();
+
+ LoadGenericTableRESTResponse response = new LoadGenericTableRESTResponse(table);
+
+ when(mockClient.get(any(), any(), eq(LoadGenericTableRESTResponse.class), anyMap(), any()))
+ .thenReturn(response);
+
+ GenericTable result = catalog.loadGenericTable(identifier);
+
+ assertThat(result.getName()).isEqualTo("test_table");
+ assertThat(result.getFormat()).isEqualTo("delta");
+ }
+
+ @Test
+ public void testDropGenericTableSuccess() {
+ initializeCatalog();
+
+ TableIdentifier identifier = TableIdentifier.of("test_ns", "test_table");
+
+ when(mockClient.delete(any(), any(), anyMap(), any())).thenReturn(null);
+
+ boolean result = catalog.dropGenericTable(identifier);
+
+ assertThat(result).isTrue();
+ }
+
+ @Test
+ public void testDropGenericTableNotFound() {
+ initializeCatalog();
+
+ TableIdentifier identifier = TableIdentifier.of("test_ns", "test_table");
+
+ when(mockClient.delete(any(), any(), anyMap(), any()))
+ .thenThrow(new NoSuchTableException("Table not found"));
+
+ boolean result = catalog.dropGenericTable(identifier);
+
+ assertThat(result).isFalse();
+ }
+
+ @Test
+ public void testFetchConfigWithWarehouseLocation() {
+ RESTClient client = mock(RESTClient.class);
+ Map<String, String> headers = ImmutableMap.of("Authorization", "Bearer token");
+ Map<String, String> properties =
+ ImmutableMap.of(
+ CatalogProperties.URI,
+ "http://localhost:8181",
+ CatalogProperties.WAREHOUSE_LOCATION,
+ "s3://warehouse");
+
+ ConfigResponse expectedResponse =
+ ConfigResponse.builder()
+ .withDefaults(ImmutableMap.of())
+ .withOverrides(ImmutableMap.of())
+ .build();
+
+ when(client.get(any(), anyMap(), eq(ConfigResponse.class), anyMap(), any()))
+ .thenReturn(expectedResponse);
+
+ ConfigResponse response = PolarisRESTCatalog.fetchConfig(client, headers, properties);
+
+ assertThat(response).isNotNull();
+
+ @SuppressWarnings("unchecked")
+ ArgumentCaptor