diff --git a/plugins/spark/README.md b/plugins/spark/README.md
index 6386a914c0..0340ea9b7c 100644
--- a/plugins/spark/README.md
+++ b/plugins/spark/README.md
@@ -30,11 +30,66 @@ and depends on iceberg-spark-runtime 1.8.1.
 # Build Plugin Jar
 A task createPolarisSparkJar is added to build a jar for the Polaris Spark plugin.
-"polaris-iceberg-<iceberg_version>-spark-runtime-<spark_version>_<scala_version>.jar"
+The result jar is located at plugins/spark/v3.5/spark/build/<scala_version>/libs after the build.

-Building the Polaris project produces client jars for both Scala 2.12 and 2.13, and CI runs the Spark
-client tests for both Scala versions as well.
+# Start Spark with Local Polaris Service using built Jar
+Once the jar is built, we can manually test it with Spark and a local Polaris service.

-The Jar can also be built alone with a specific version using target `:polaris-spark-3.5_<scala_version>`. For example:
-- `./gradlew :polaris-spark-3.5_2.12:createPolarisSparkJar` - Build a jar for the Polaris Spark plugin with scala version 2.12.
-The result jar is located at plugins/spark/build/<scala_version>/libs after the build.
+The following command starts a Polaris server for local testing. It runs on localhost:8181 with the default
+realm `POLARIS` and root credentials `root:secret`:
+```shell
+./gradlew run
+```
+
+Once the local server is running, the following command can be used to start the spark-shell with the built Spark client
+jar and to use the local Polaris server as the catalog.
+
+```shell
+bin/spark-shell \
+--jars <path-to-spark-client-jar> \
+--packages org.apache.hadoop:hadoop-aws:3.4.0,io.delta:delta-spark_2.12:3.3.1 \
+--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension \
+--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
+--conf spark.sql.catalog.<catalog-name>.warehouse=<catalog-name> \
+--conf spark.sql.catalog.<catalog-name>.header.X-Iceberg-Access-Delegation=true \
+--conf spark.sql.catalog.<catalog-name>=org.apache.polaris.spark.SparkCatalog \
+--conf spark.sql.catalog.<catalog-name>.uri=http://localhost:8181/api/catalog \
+--conf spark.sql.catalog.<catalog-name>.credential="root:secret" \
+--conf spark.sql.catalog.<catalog-name>.scope='PRINCIPAL_ROLE:ALL' \
+--conf spark.sql.catalog.<catalog-name>.token-refresh-enabled=true \
+--conf spark.sql.catalog.<catalog-name>.type=rest \
+--conf spark.sql.sources.useV1SourceList=''
+```
+
+Assume the path to the built Spark client jar is
+`/polaris/plugins/spark/v3.5/spark/build/2.12/libs/polaris-iceberg-1.8.1-spark-runtime-3.5_2.12-0.10.0-beta-incubating-SNAPSHOT.jar`
+and the name of the catalog is `polaris`.
The CLI command will look like the following:
+
+```shell
+bin/spark-shell \
+--jars /polaris/plugins/spark/v3.5/spark/build/2.12/libs/polaris-iceberg-1.8.1-spark-runtime-3.5_2.12-0.10.0-beta-incubating-SNAPSHOT.jar \
+--packages org.apache.hadoop:hadoop-aws:3.4.0,io.delta:delta-spark_2.12:3.3.1 \
+--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension \
+--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
+--conf spark.sql.catalog.polaris.warehouse=<catalog-name> \
+--conf spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation=true \
+--conf spark.sql.catalog.polaris=org.apache.polaris.spark.SparkCatalog \
+--conf spark.sql.catalog.polaris.uri=http://localhost:8181/api/catalog \
+--conf spark.sql.catalog.polaris.credential="root:secret" \
+--conf spark.sql.catalog.polaris.scope='PRINCIPAL_ROLE:ALL' \
+--conf spark.sql.catalog.polaris.token-refresh-enabled=true \
+--conf spark.sql.catalog.polaris.type=rest \
+--conf spark.sql.sources.useV1SourceList=''
+```
+
+# Limitations
+The Polaris Spark client supports catalog management for both Iceberg and Delta tables: it routes all Iceberg table
+requests to the Iceberg REST endpoints and all Delta table requests to the Generic Table REST endpoints.
+
+The following are the current limitations of the Polaris Spark client:
+1) Create table as select (CTAS) is not supported for Delta tables. As a result, the `saveAsTable` method of `DataFrame`
+   is also not supported, since it relies on CTAS support.
+2) Creating a Delta table without an explicit location is not supported.
+3) Renaming a Delta table is not supported.
+4) ALTER TABLE ... SET LOCATION/SET FILEFORMAT/ADD PARTITION is not supported for Delta tables.
+5) For other non-Iceberg tables, such as CSV, no specific guarantees are provided today.
diff --git a/plugins/spark/v3.5/getting-started/README.md b/plugins/spark/v3.5/getting-started/README.md
new file mode 100644
index 0000000000..edad58ba3a
--- /dev/null
+++ b/plugins/spark/v3.5/getting-started/README.md
@@ -0,0 +1,78 @@
+
+
+# Getting Started with Apache Spark and Apache Polaris with Delta and Iceberg
+
+This getting started guide provides a `docker-compose` file to set up [Apache Spark](https://spark.apache.org/) with Apache Polaris using
+the new Polaris Spark Client.
+
+The Polaris Spark Client enables management of both Delta and Iceberg tables using Apache Polaris.
+
+A Jupyter notebook is started to run PySpark, and the Polaris Python client is also installed so that Polaris APIs
+can be called directly from Python.
+
+## Build the Spark Client Jar and Polaris image
+If the Spark client jar is not present locally under plugins/spark/v3.5/spark/build/<scala_version>/libs, build the jar
+using
+- `./gradlew assemble` -- build the Polaris project and skip the tests (a quick check for the resulting jar is shown below).
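+
+After the build, you can sanity-check that the client jar exists (a minimal sketch, assuming a Scala 2.12 build run
+from the repository root; the exact jar name depends on the Iceberg, Spark, and Polaris versions):
+
+```shell
+# List the built Polaris Spark client jars for Scala 2.12
+ls plugins/spark/v3.5/spark/build/2.12/libs/polaris-iceberg-*-spark-runtime-*.jar
+```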
+
+If a Polaris image is not already present locally, build one with the following command:
+
+```shell
+./gradlew \
+  :polaris-quarkus-server:assemble \
+  :polaris-quarkus-server:quarkusAppPartsBuild --rerun \
+  -Dquarkus.container-image.build=true
+```
+
+## Run the `docker-compose` file
+
+To start the `docker-compose` file, run this command from the repo's root directory:
+```shell
+docker-compose -f plugins/spark/v3.5/getting-started/docker-compose.yml up
+```
+
+This will spin up two container services:
+* The `polaris` service for running Apache Polaris using an in-memory metastore
+* The `jupyter` service for running Jupyter notebook with PySpark
+
+NOTE: Starting the containers for the first time may take a couple of minutes, because Spark 3.5.5 needs to be downloaded.
+When working with Delta, the Polaris Spark Client requires delta-io >= 3.2.1, which in turn requires at least Spark 3.5.3,
+but the current Jupyter Spark image only ships Spark 3.5.0; the notebook Dockerfile therefore installs Spark 3.5.5.
+
+### Run with AWS access setup
+If you want to interact with an S3 bucket, make sure the following environment variables are set correctly in
+your local environment before running the `docker-compose` file.
+```
+AWS_ACCESS_KEY_ID=<your-aws-access-key-id>
+AWS_SECRET_ACCESS_KEY=<your-aws-secret-access-key>
+```
+
+## Access the Jupyter notebook interface
+In the Jupyter notebook container log, look for the URL to access the Jupyter notebook. The URL should be in the
+format `http://127.0.0.1:8888/lab?token=<token>`.
+
+Open the Jupyter notebook in a browser.
+Navigate to [`notebooks/SparkPolaris.ipynb`](http://127.0.0.1:8888/lab/tree/notebooks/SparkPolaris.ipynb)
+
+If the above URL doesn't work, try replacing `127.0.0.1` with `localhost`, for example:
+`http://localhost:8888/lab?token=<token>`.
+
+## Run the Jupyter notebook
+You can now run all cells in the notebook or write your own code!
diff --git a/plugins/spark/v3.5/getting-started/docker-compose.yml b/plugins/spark/v3.5/getting-started/docker-compose.yml
new file mode 100644
index 0000000000..6dbcc65c6c
--- /dev/null
+++ b/plugins/spark/v3.5/getting-started/docker-compose.yml
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# + +services: + polaris: + image: apache/polaris:latest + ports: + - "8181:8181" + - "8182" + environment: + AWS_REGION: us-west-2 + AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID + AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY + POLARIS_BOOTSTRAP_CREDENTIALS: default-realm,root,s3cr3t + polaris.realm-context.realms: default-realm + quarkus.otel.sdk.disabled: "true" + healthcheck: + test: ["CMD", "curl", "http://localhost:8182/healthcheck"] + interval: 10s + timeout: 10s + retries: 5 + jupyter: + build: + context: ../../../../ # this is needed to get the ./client + dockerfile: ./plugins/spark/v3.5/getting-started/notebooks/Dockerfile + network: host + ports: + - "8888:8888" + depends_on: + polaris: + condition: service_healthy + environment: + AWS_REGION: us-west-2 + AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID + AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY + POLARIS_HOST: polaris + volumes: + - ./notebooks:/home/jovyan/notebooks diff --git a/plugins/spark/v3.5/getting-started/notebooks/Dockerfile b/plugins/spark/v3.5/getting-started/notebooks/Dockerfile new file mode 100644 index 0000000000..2af9412c60 --- /dev/null +++ b/plugins/spark/v3.5/getting-started/notebooks/Dockerfile @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +FROM jupyter/all-spark-notebook:spark-3.5.0 + +ENV LANGUAGE='en_US:en' + +USER root + +# Generic table support requires delta 3.2.1 +# Install Spark 3.5.5 +RUN wget -q https://archive.apache.org/dist/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz \ + && tar -xzf spark-3.5.5-bin-hadoop3.tgz \ + && mv spark-3.5.5-bin-hadoop3 /opt/spark \ + && rm spark-3.5.5-bin-hadoop3.tgz + +# Set environment variables +ENV SPARK_HOME=/opt/spark +ENV PATH=$SPARK_HOME/bin:$PATH + +USER jovyan + +COPY --chown=jovyan client /home/jovyan/client +COPY --chown=jovyan regtests/requirements.txt /tmp +COPY --chown=jovyan plugins/spark/v3.5/spark/build/2.12/libs /home/jovyan/polaris_libs +RUN pip install -r /tmp/requirements.txt +RUN cd client/python && poetry lock && \ + python3 -m poetry install && \ + pip install -e . 
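+
+# NOTE: the editable install above ("pip install -e .") makes the Polaris Python client
+# importable from the notebook kernel, which the example notebook uses for its
+# polaris.management and polaris.catalog API calls.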
+
+WORKDIR /home/jovyan/
diff --git a/plugins/spark/v3.5/getting-started/notebooks/SparkPolaris.ipynb b/plugins/spark/v3.5/getting-started/notebooks/SparkPolaris.ipynb
new file mode 100644
index 0000000000..ad32424b8b
--- /dev/null
+++ b/plugins/spark/v3.5/getting-started/notebooks/SparkPolaris.ipynb
@@ -0,0 +1,851 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "f4ab2c33-c072-49e9-93de-da24759113f7",
+   "metadata": {},
+   "source": [
+    "# Bootstrap the client with ROOT credentials\n",
+    "Using the Python client generated from our OpenAPI spec, we generate a token from our root user's credentials"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "id": "f982815a-2b48-46ab-96a6-20dad7ec1420",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from polaris.catalog.api.iceberg_catalog_api import IcebergCatalogAPI\n",
+    "from polaris.catalog.api.iceberg_o_auth2_api import IcebergOAuth2API\n",
+    "from polaris.catalog.api_client import ApiClient as CatalogApiClient\n",
+    "from polaris.catalog.api_client import Configuration as CatalogApiClientConfiguration\n",
+    "\n",
+    "polaris_credential = 'root:s3cr3t' # pragma: allowlist secret\n",
+    "\n",
+    "client_id, client_secret = polaris_credential.split(\":\")\n",
+    "client = CatalogApiClient(CatalogApiClientConfiguration(username=client_id,\n",
+    "                                                        password=client_secret,\n",
+    "                                                        host='http://polaris:8181/api/catalog'))\n",
+    "\n",
+    "oauth_api = IcebergOAuth2API(client)\n",
+    "token = oauth_api.get_token(scope='PRINCIPAL_ROLE:ALL',\n",
+    "                            client_id=client_id,\n",
+    "                            client_secret=client_secret,\n",
+    "                            grant_type='client_credentials',\n",
+    "                            _headers={'realm': 'default-realm'})\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "4c21f4a1-4129-4dd8-9a6c-fa6eeabfa56e",
+   "metadata": {},
+   "source": [
+    "# Create our first catalog\n",
+    "\n",
+    "* Creates a catalog named `polaris_demo` that writes to a specified location on the local filesystem."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "id": "0f7a311a-9a55-4ff7-a40e-db3c74c53b9b",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "PolarisCatalog(type='INTERNAL', name='polaris_demo', properties=CatalogProperties(default_base_location='file:///tmp/polaris/', additional_properties={}), create_timestamp=1745882018864, last_update_timestamp=1745882018864, entity_version=1, storage_config_info=FileStorageConfigInfo(storage_type='FILE', allowed_locations=['file:///tmp', 'file:///tmp/polaris/']))"
+      ]
+     },
+     "execution_count": 2,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "from polaris.management import *\n",
+    "\n",
+    "client = ApiClient(Configuration(access_token=token.access_token,\n",
+    "                                 host='http://polaris:8181/api/management/v1'))\n",
+    "root_client = PolarisDefaultApi(client)\n",
+    "\n",
+    "storage_conf = FileStorageConfigInfo(storage_type=\"FILE\", allowed_locations=[\"file:///tmp\"])\n",
+    "catalog_name = 'polaris_demo'\n",
+    "catalog = Catalog(name=catalog_name, type='INTERNAL', properties={\"default-base-location\": \"file:///tmp/polaris/\"},\n",
+    "                  storage_config_info=storage_conf)\n",
+    "catalog.storage_config_info = storage_conf\n",
+    "root_client.create_catalog(create_catalog_request=CreateCatalogRequest(catalog=catalog))\n",
+    "resp = root_client.get_catalog(catalog_name=catalog.name)\n",
+    "resp"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "6521039f-c25d-4baa-96ae-a4408c0fced0",
+   "metadata": {},
+   "source": [
+    "# Utility Functions"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "id": "e3e42c12-4e01-4577-bdf5-90c2704a5de8",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Create a principal with the given name, rotating its credentials if it already exists\n",
+    "def create_principal(api, principal_name):\n",
+    "  principal = Principal(name=principal_name, type=\"SERVICE\")\n",
+    "  try:\n",
+    "    principal_result = api.create_principal(CreatePrincipalRequest(principal=principal))\n",
+    "    return principal_result\n",
+    "  except ApiException as e:\n",
+    "    if e.status == 409:\n",
+    "      return api.rotate_credentials(principal_name=principal_name)\n",
+    "    else:\n",
+    "      raise e\n",
+    "\n",
+    "# Create a catalog role with the given name, fetching it if it already exists\n",
+    "def create_catalog_role(api, catalog, role_name):\n",
+    "  catalog_role = CatalogRole(name=role_name)\n",
+    "  try:\n",
+    "    api.create_catalog_role(catalog_name=catalog.name, create_catalog_role_request=CreateCatalogRoleRequest(catalog_role=catalog_role))\n",
+    "    return api.get_catalog_role(catalog_name=catalog.name, catalog_role_name=role_name)\n",
+    "  except ApiException:\n",
+    "    return api.get_catalog_role(catalog_name=catalog.name, catalog_role_name=role_name)\n",
+    "\n",
+    "# Create a principal role with the given name, fetching it if it already exists\n",
+    "def create_principal_role(api, role_name):\n",
+    "  principal_role = PrincipalRole(name=role_name)\n",
+    "  try:\n",
+    "    api.create_principal_role(CreatePrincipalRoleRequest(principal_role=principal_role))\n",
+    "    return api.get_principal_role(principal_role_name=role_name)\n",
+    "  except ApiException:\n",
+    "    return api.get_principal_role(principal_role_name=role_name)\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "15c250ca-7161-418e-bc52-8bbd88a3e57c",
+   "metadata": {},
+   "source": [
+    "# Create a new Principal, Principal Role, and Catalog Role\n",
+    "The new Principal belongs to the `engineer` principal role, which has `CATALOG_MANAGE_CONTENT` privileges on the `polaris_demo` catalog.
\n",
+    "\n",
+    "\n",
+    "`CATALOG_MANAGE_CONTENT` has create/list/read/write privileges on all entities within the catalog. The same privilege could be granted to a namespace, in which case the engineers could create/list/read/write any entity under that namespace."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "id": "c5ceb5ca-f977-46c7-b2a6-07dda59e8a8b",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Create the engineer_principal\n",
+    "engineer_principal = create_principal(root_client, \"principle\")\n",
+    "\n",
+    "# Create the principal role\n",
+    "engineer_role = create_principal_role(root_client, \"engineer\")\n",
+    "\n",
+    "# Create the catalog role\n",
+    "manager_catalog_role = create_catalog_role(root_client, catalog, \"manage_catalog\")\n",
+    "\n",
+    "# Grant the catalog role to the principal role\n",
+    "# All principals in the principal role have the catalog role's privileges\n",
+    "root_client.assign_catalog_role_to_principal_role(principal_role_name=engineer_role.name,\n",
+    "                                                  catalog_name=catalog.name,\n",
+    "                                                  grant_catalog_role_request=GrantCatalogRoleRequest(catalog_role=manager_catalog_role))\n",
+    "\n",
+    "# Assign privileges to the catalog role\n",
+    "# Here, we grant CATALOG_MANAGE_CONTENT\n",
+    "root_client.add_grant_to_catalog_role(catalog.name, manager_catalog_role.name,\n",
+    "                                      AddGrantRequest(grant=CatalogGrant(catalog_name=catalog.name,\n",
+    "                                                                         type='catalog',\n",
+    "                                                                         privilege=CatalogPrivilege.CATALOG_MANAGE_CONTENT)))\n",
+    "\n",
+    "# Assign the principal role to the principal\n",
+    "root_client.assign_principal_role(engineer_principal.principal.name, grant_principal_role_request=GrantPrincipalRoleRequest(principal_role=engineer_role))"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "8a04cf15-a327-4ab9-a083-6da4e7dd1623",
+   "metadata": {},
+   "source": [
+    "# Create a reader Principal, Principal Role, and Catalog Role\n",
+    "This new principal belongs to the `product_manager` principal role, which is explicitly granted read and list permissions on the catalog.\n",
+    "\n",
+    "Permissions cascade, so permissions granted at the catalog level are inherited by namespaces and tables within the catalog."
+ ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "b51a6433-99c9-46c5-a855-928e30bad6e5", + "metadata": {}, + "outputs": [], + "source": [ + "# Create a reader principal\n", + "reader_principal = create_principal(root_client, \"mlee\")\n", + "\n", + "# Create the principal role\n", + "pm_role = create_principal_role(root_client, \"product_manager\")\n", + "\n", + "# Create the catalog role\n", + "read_only_role = create_catalog_role(root_client, catalog, \"read_only\")\n", + "\n", + "# Grant the catalog role to the principal role\n", + "root_client.assign_catalog_role_to_principal_role(principal_role_name=pm_role.name,\n", + " catalog_name=catalog.name,\n", + " grant_catalog_role_request=GrantCatalogRoleRequest(catalog_role=read_only_role))\n", + "\n", + "# Assign privileges to the catalog role\n", + "# Here, the catalog role is granted READ and LIST privileges at the catalog level\n", + "# Privileges cascade down\n", + "root_client.add_grant_to_catalog_role(catalog.name, read_only_role.name,\n", + " AddGrantRequest(grant=CatalogGrant(catalog_name=catalog.name,\n", + " type='catalog',\n", + " privilege=CatalogPrivilege.TABLE_LIST)))\n", + "root_client.add_grant_to_catalog_role(catalog.name, read_only_role.name,\n", + " AddGrantRequest(grant=CatalogGrant(catalog_name=catalog.name,\n", + " type='catalog',\n", + " privilege=CatalogPrivilege.TABLE_READ_PROPERTIES)))\n", + "root_client.add_grant_to_catalog_role(catalog.name, read_only_role.name,\n", + " AddGrantRequest(grant=CatalogGrant(catalog_name=catalog.name,\n", + " type='catalog',\n", + " privilege=CatalogPrivilege.TABLE_READ_DATA)))\n", + "root_client.add_grant_to_catalog_role(catalog.name, read_only_role.name,\n", + " AddGrantRequest(grant=CatalogGrant(catalog_name=catalog.name,\n", + " type='catalog',\n", + " privilege=CatalogPrivilege.VIEW_LIST)))\n", + "root_client.add_grant_to_catalog_role(catalog.name, read_only_role.name,\n", + " AddGrantRequest(grant=CatalogGrant(catalog_name=catalog.name,\n", + " type='catalog',\n", + " privilege=CatalogPrivilege.VIEW_READ_PROPERTIES)))\n", + "root_client.add_grant_to_catalog_role(catalog.name, read_only_role.name,\n", + " AddGrantRequest(grant=CatalogGrant(catalog_name=catalog.name,\n", + " type='catalog',\n", + " privilege=CatalogPrivilege.NAMESPACE_READ_PROPERTIES)))\n", + "root_client.add_grant_to_catalog_role(catalog.name, read_only_role.name,\n", + " AddGrantRequest(grant=CatalogGrant(catalog_name=catalog.name,\n", + " type='catalog',\n", + " privilege=CatalogPrivilege.NAMESPACE_LIST)))\n", + "\n", + "# Assign the principal role to the principal\n", + "root_client.assign_principal_role(reader_principal.principal.name, grant_principal_role_request=GrantPrincipalRoleRequest(principal_role=pm_role))" + ] + }, + { + "cell_type": "markdown", + "id": "14c1e2b3-a0d4-49b5-8e1e-ddb43f98b115", + "metadata": {}, + "source": [ + "# Create a Spark session with the engineer credentials\n", + "\n", + "* Catalog URI points to our Polaris installation\n", + "* Credential set using the client_id and client_secret generated for the principal\n", + "* Scope set to `PRINCIPAL_ROLE:ALL`\n", + "* `X-Iceberg-Access-Delegation` is set to vended-credentials" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "fd13f24b-9d59-470d-9be1-660c22dde680", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from pyspark.sql import SparkSession\n", + "\n", + "spark = (SparkSession.builder\n", + " .config(\"spark.jars\", 
\"../polaris_libs/polaris-iceberg-1.8.1-spark-runtime-3.5_2.12-0.11.0-beta-incubating-SNAPSHOT.jar\")\n", + " .config(\"spark.jars.packages\", \"org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-spark_2.12:3.2.1\")\n", + " .config(\"spark.sql.catalog.spark_catalog\", \"org.apache.spark.sql.delta.catalog.DeltaCatalog\")\n", + " .config('spark.sql.iceberg.vectorization.enabled', 'false')\n", + "\n", + " .config(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension\")\n", + " # Configure the 'polaris' catalog as an Iceberg rest catalog\n", + " .config(\"spark.sql.catalog.polaris\", \"org.apache.polaris.spark.SparkCatalog\")\n", + " # Specify the rest catalog endpoint \n", + " .config(\"spark.sql.catalog.polaris.uri\", \"http://polaris:8181/api/catalog\")\n", + " # Enable token refresh\n", + " .config(\"spark.sql.catalog.polaris.token-refresh-enabled\", \"true\")\n", + " # specify the client_id:client_secret pair\n", + " .config(\"spark.sql.catalog.polaris.credential\", f\"{engineer_principal.credentials.client_id}:{engineer_principal.credentials.client_secret}\")\n", + "\n", + " # Set the warehouse to the name of the catalog we created\n", + " .config(\"spark.sql.catalog.polaris.warehouse\", catalog_name)\n", + "\n", + " # Scope set to PRINCIPAL_ROLE:ALL\n", + " .config(\"spark.sql.catalog.polaris.scope\", 'PRINCIPAL_ROLE:ALL')\n", + "\n", + " # Enable access credential delegation\n", + " .config(\"spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation\", 'vended-credentials')\n", + "\n", + " # AWS configuration\n", + " .config(\"spark.hadoop.fs.s3.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\")\n", + "\n", + " .config(\"spark.sql.catalog.polaris.io-impl\", \"org.apache.iceberg.io.ResolvingFileIO\")\n", + " .config(\"spark.sql.catalog.polaris.s3.region\", \"us-west-2\")\n", + " .config(\"spark.history.fs.logDirectory\", \"/home/iceberg/spark-events\")).getOrCreate()\n" + ] + }, + { + "cell_type": "markdown", + "id": "f1cfef99-4a52-433b-ac1d-c92db5f396a3", + "metadata": {}, + "source": [ + "# USE polaris\n", + "Tell Spark to use the Polaris catalog" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "72e9e5fb-b22e-4d38-bb1e-4ca78c0d0f3e", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---------+\n", + "|namespace|\n", + "+---------+\n", + "+---------+\n", + "\n" + ] + } + ], + "source": [ + "spark.sql(\"USE polaris\")\n", + "spark.sql(\"SHOW NAMESPACES\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "6b6c5a4a-d469-4364-9249-1a4aeb4d560c", + "metadata": {}, + "source": [ + "# Create Nested Namespaces" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "54159ab2-5964-49a0-8202-a4b64ee4f9e7", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---------------+\n", + "| namespace|\n", + "+---------------+\n", + "|DELTA_NS.PUBLIC|\n", + "+---------------+\n", + "\n" + ] + } + ], + "source": [ + "spark.sql(\"CREATE NAMESPACE IF NOT EXISTS DELTA_NS\")\n", + "spark.sql(\"CREATE NAMESPACE IF NOT EXISTS DELTA_NS.PUBLIC\")\n", + "spark.sql(\"SHOW NAMESPACES IN DELTA_NS\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "51a5311e-4a40-4bdc-aaee-b5845e06d020", + "metadata": {}, + "source": [ + "# Create a Delta Table" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "4abc8426-7f2a-4f3f-9e26-1f1824f870c6", + 
"metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "DataFrame[]" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark.sql(\"USE NAMESPACE DELTA_NS.PUBLIC\")\n", + "spark.sql(\"\"\"CREATE TABLE IF NOT EXISTS PEOPLE (\n", + " id int, name string)\n", + "USING delta LOCATION 'file:///tmp/delta_tables/people';\n", + "\"\"\")\n", + "# You can also use cloud storage like s3. For eample: s3:///\n", + "# Make the corresponding credentials are set up correctly" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "13356e64-23ca-4804-a1b9-e9f57f4d14ca", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---------------+---------+-----------+\n", + "| namespace|tableName|isTemporary|\n", + "+---------------+---------+-----------+\n", + "|DELTA_NS.PUBLIC| PEOPLE| false|\n", + "+---------------+---------+-----------+\n", + "\n" + ] + } + ], + "source": [ + "spark.sql(\"SHOW TABLES\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "91fa7c6c-34e0-4bb9-babc-3f3db4778101", + "metadata": {}, + "source": [ + "# It's Empty" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "ff5a466d-6a67-4f42-a6a6-ac54ec258e54", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+----+\n", + "| id|name|\n", + "+---+----+\n", + "+---+----+\n", + "\n" + ] + } + ], + "source": [ + "spark.sql(\"SELECT * FROM PEOPLE\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "f7b297de-ed11-41df-8ed9-e9396c6c4465", + "metadata": {}, + "source": [ + "# Insert some records\n", + "Querying again shows some records" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "d7ab2991-6de9-4105-9f95-4c9f1c18f426", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+------+\n", + "| id| name|\n", + "+---+------+\n", + "| 3|jonath|\n", + "| 1| anna|\n", + "| 2| bob|\n", + "+---+------+\n", + "\n" + ] + } + ], + "source": [ + "spark.sql(\"INSERT INTO PEOPLE VALUES (1, 'anna'), (2, 'bob'), (3, 'jonath')\")\n", + "spark.sql(\"SELECT * FROM PEOPLE\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "b3d49be8-cf02-4b81-82ce-31e1ee46630a", + "metadata": {}, + "source": [ + "# Create Iceberg Table" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "8eb5fd0c-20d4-42ce-a823-b6ae43f58313", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "DataFrame[]" + ] + }, + "execution_count": 14, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark.sql(\"USE NAMESPACE DELTA_NS.PUBLIC\")\n", + "spark.sql(\"\"\"CREATE TABLE IF NOT EXISTS COUNTRY (code string, name string) USING iceberg\"\"\")" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "54904fd6-96b4-4198-b5b7-2b6a9e6eea1f", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---------------+---------+-----------+\n", + "| namespace|tableName|isTemporary|\n", + "+---------------+---------+-----------+\n", + "|DELTA_NS.PUBLIC| COUNTRY| false|\n", + "|DELTA_NS.PUBLIC| PEOPLE| false|\n", + "+---------------+---------+-----------+\n", + "\n" + ] + } + ], + "source": [ + "spark.sql(\"SHOW TABLES\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "ae216740-a5cb-4f28-b219-1114d7189546", + "metadata": {}, + "source": [ + "# Insert 
values for the iceberg table" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "b1e639be-5a3a-41c6-a782-dd939bc2eea4", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+----+-------------+\n", + "|code| name|\n", + "+----+-------------+\n", + "| US|United States|\n", + "| CA| Canada|\n", + "| FR| France|\n", + "| IN| India|\n", + "+----+-------------+\n", + "\n" + ] + } + ], + "source": [ + "spark.sql(\"INSERT INTO COUNTRY VALUES ('US', 'United States'), ('CA', 'Canada'), ('FR', 'France'), ('IN', 'India')\")\n", + "spark.sql(\"SELECT * FROM COUNTRY\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "2a76a0ba-800b-436c-b617-3725286af58c", + "metadata": {}, + "source": [ + "# Initiate a new Spark session\n", + "Change the credentials to the PM's read-only credentials" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "6f3aac79-bf45-4603-bd64-30eeab4bdfa7", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "DataFrame[]" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# The new spark session inherits everything from the previous session except for the overridden credentials\n", + "new_spark = spark.newSession()\n", + "new_spark.conf.set(\"spark.sql.catalog.polaris.credential\", f\"{reader_principal.credentials.client_id}:{reader_principal.credentials.client_secret}\")\n", + "new_spark.sql(\"USE polaris\")" + ] + }, + { + "cell_type": "markdown", + "id": "b6ba9acb-2e9d-4ffa-a685-8a85c75f3046", + "metadata": {}, + "source": [ + "# Show Namespace contents\n", + "We can still `USE NAMESPACE` and `SHOW TABLES`, which require `READ_NAMESPACE_PROPERTIES` and `LIST_TABLES` privileges respectively" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "d517424d-8893-4375-ac3b-c532c8682b6a", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---------------+---------+-----------+\n", + "| namespace|tableName|isTemporary|\n", + "+---------------+---------+-----------+\n", + "|DELTA_NS.PUBLIC| COUNTRY| false|\n", + "|DELTA_NS.PUBLIC| PEOPLE| false|\n", + "+---------------+---------+-----------+\n", + "\n" + ] + } + ], + "source": [ + "new_spark.sql(\"USE NAMESPACE DELTA_NS.PUBLIC\")\n", + "new_spark.sql(\"SHOW TABLES\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "ecfba50e-ec5d-41dd-8715-78ea1c1f42e2", + "metadata": {}, + "source": [ + "# Table reads work" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "7fce4b1f-4d71-4d03-8b60-3e9ca6ca6ddf", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+----+-------------+\n", + "|code| name|\n", + "+----+-------------+\n", + "| US|United States|\n", + "| CA| Canada|\n", + "| FR| France|\n", + "| IN| India|\n", + "+----+-------------+\n", + "\n" + ] + } + ], + "source": [ + "new_spark.sql(\"SELECT * FROM COUNTRY\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "1f38c9c2-1b9a-4be1-b0ab-42e64c398b78", + "metadata": {}, + "source": [ + "# Drop table fails\n", + "NOTE: there is currently no write privilege support for non-iceberg tables" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "92c57967-bd4e-4ffb-a01d-0b22c334432c", + "metadata": {}, + "outputs": [], + "source": [ + "new_spark.sql(\"DROP TABLE COUNTRY\")" + ] + }, + { + "cell_type": "markdown", + "id": "572bf0ac-10d4-4b46-a2dd-b921b2ea99d4", + "metadata": {}, + 
"source": [ + "# Drop table using original spark session" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "ced0c7cc-1ba6-4711-8645-4ce210491e28", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "DataFrame[]" + ] + }, + "execution_count": 21, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark.sql(\"DROP TABLE COUNTRY\")" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "a32de6ed-96c3-493c-9430-fba91b527929", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---------------+---------+-----------+\n", + "| namespace|tableName|isTemporary|\n", + "+---------------+---------+-----------+\n", + "|DELTA_NS.PUBLIC| PEOPLE| false|\n", + "+---------------+---------+-----------+\n", + "\n" + ] + } + ], + "source": [ + "spark.sql(\"SHOW TABLES\").show()" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "id": "4e3f75a9-c95c-4a74-b12f-9630013e2291", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---------------+---------+-----------+\n", + "| namespace|tableName|isTemporary|\n", + "+---------------+---------+-----------+\n", + "|DELTA_NS.PUBLIC| PEOPLE| false|\n", + "+---------------+---------+-----------+\n", + "\n" + ] + } + ], + "source": [ + "new_spark.sql(\"SHOW TABLES\").show()" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "id": "5b78a8e3-a9e7-4c7f-a9af-7a91d4644e1c", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "DataFrame[]" + ] + }, + "execution_count": 24, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark.sql(\"DROP TABLE PEOPLE\")" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "id": "c42563bb-399e-4308-aa23-764192c5005a", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---------+---------+-----------+\n", + "|namespace|tableName|isTemporary|\n", + "+---------+---------+-----------+\n", + "+---------+---------+-----------+\n", + "\n" + ] + } + ], + "source": [ + "spark.sql(\"SHOW TABLES\").show()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.6" + }, + "toc-autonumbering": false, + "toc-showmarkdowntxt": false, + "toc-showtags": false + }, + "nbformat": 4, + "nbformat_minor": 5 +}