diff --git a/dev/docker-compose-integration.yml b/dev/docker-compose-integration.yml index fccdcdc757..05f487aa43 100644 --- a/dev/docker-compose-integration.yml +++ b/dev/docker-compose-integration.yml @@ -41,19 +41,14 @@ services: - hive:hive - minio:minio rest: - image: tabulario/iceberg-rest + build: https://github.com/apache/polaris.git container_name: pyiceberg-rest networks: iceberg_net: ports: - 8181:8181 - environment: - - AWS_ACCESS_KEY_ID=admin - - AWS_SECRET_ACCESS_KEY=password - - AWS_REGION=us-east-1 - - CATALOG_WAREHOUSE=s3://warehouse/ - - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO - - CATALOG_S3_ENDPOINT=http://minio:9000 + volumes: + - ./warehouse:/warehouse minio: image: minio/minio container_name: pyiceberg-minio diff --git a/dev/provision.py b/dev/provision.py index 53360748b6..575ecfcbfa 100644 --- a/dev/provision.py +++ b/dev/provision.py @@ -22,17 +22,165 @@ from pyiceberg.schema import Schema from pyiceberg.types import FixedType, NestedField, UUIDType -spark = SparkSession.builder.getOrCreate() +import json + +from requests import HTTPError, Session + +PRINCIPAL_TOKEN="principal:root;realm:default-realm" +POLARIS_URL="http://rest:8181" +PRINCIPAL_NAME="iceberg" +CATALOG_NAME="polaris" +CATALOG_ROLE="admin_role" +PRINCIPAL_ROLE = "admin_principal_role" + +def create_principal(session: Session) -> str: + response = session.get(url=f"{POLARIS_URL}/api/management/v1/principals/{PRINCIPAL_NAME}") + try: + # rotate creds + response.raise_for_status() + response = session.delete( + url=f"{POLARIS_URL}/api/management/v1/principals/{PRINCIPAL_NAME}", + ) + finally: + # create principal + data = {"principal": {"name": PRINCIPAL_NAME}, "credentialRotationRequired": 'false'} + response = session.post( + url=f"{POLARIS_URL}/api/management/v1/principals", data=json.dumps(data), + ) + credentials = response.json()["credentials"] + + principal_credential = f"{credentials['clientId']}:{credentials['clientSecret']}" + return principal_credential + 
+def create_catalog(session: Session) -> None: + response = session.get( + url=f"{POLARIS_URL}/api/management/v1/catalogs/{CATALOG_NAME}", + ) + try: + response.raise_for_status() + except HTTPError: + # Create Catalog + data = { + "catalog": { + "name": CATALOG_NAME, + "type": "INTERNAL", + "readOnly": False, + "properties": { + "default-base-location": "file:///warehouse" + }, + "storageConfigInfo": { + "storageType": "FILE", + "allowedLocations": [ + "file:///warehouse" + ] + } + } + } + response = session.post( + url=f"{POLARIS_URL}/api/management/v1/catalogs", data=json.dumps(data), + ) + response.raise_for_status() + +def create_catalog_role(session: Session) -> None: + try: + response = session.get( + url=f"{POLARIS_URL}/api/management/v1/catalogs/{CATALOG_NAME}/catalog-roles/{CATALOG_ROLE}" + ) + response.raise_for_status() + except HTTPError: + # Create Catalog Role + data = { + "catalogRole": { + "name": CATALOG_ROLE, + } + } + response = session.post( + url=f"{POLARIS_URL}/api/management/v1/catalogs/{CATALOG_NAME}/catalog-roles", data=json.dumps(data), + ) + response.raise_for_status() + +def grant_catalog_privileges(session: Session) -> None: + # Grant Catalog privileges to the catalog role + data = { + "grant": { + "type": "catalog", + "privilege": "CATALOG_MANAGE_CONTENT" + } + } + response = session.put( + url=f"{POLARIS_URL}/api/management/v1/catalogs/{CATALOG_NAME}/catalog-roles/{CATALOG_ROLE}/grants", data=json.dumps(data), + ) + response.raise_for_status() + +def create_principal_role(session: Session) -> None: + try: + response = session.get( + url=f"{POLARIS_URL}/api/management/v1/principal-roles/{PRINCIPAL_ROLE}", + ) + response.raise_for_status() + except HTTPError: + # Create a principal role + data = { + "principalRole": { + "name": PRINCIPAL_ROLE, + } + } + response = session.post( + url=f"{POLARIS_URL}/api/management/v1/principal-roles", data=json.dumps(data), + ) + response.raise_for_status() + + # Assign the catalog role to the 
principal role + data = { + "catalogRole": { + "name": CATALOG_ROLE, + } + } + response = session.put( + url=f"{POLARIS_URL}/api/management/v1/principal-roles/{PRINCIPAL_ROLE}/catalog-roles/{CATALOG_NAME}", data=json.dumps(data), + ) + response.raise_for_status() + + # Assign the principal role to the root principal + data = { + "principalRole": { + "name": PRINCIPAL_ROLE, + } + } + response = session.put( + url=f"{POLARIS_URL}/api/management/v1/principals/{PRINCIPAL_NAME}/principal-roles", data=json.dumps(data), + ) + response.raise_for_status() + +session = Session() +session.headers["Content-type"] = "application/json" +session.headers["Accept"] = "application/json" +session.headers["Authorization"] = f"Bearer {PRINCIPAL_TOKEN}" + +principal_credential = create_principal(session) +create_catalog(session) +create_catalog_role(session) +grant_catalog_privileges(session) +create_principal_role(session) + +spark = SparkSession.builder.config( + "spark.sql.catalog.rest.credential", principal_credential + ).getOrCreate() + +print(spark.sparkContext.getConf().getAll()) catalogs = { 'rest': load_catalog( "rest", **{ "type": "rest", - "uri": "http://rest:8181", + "credential": principal_credential, + "uri": "http://rest:8181/api/catalog", "s3.endpoint": "http://minio:9000", "s3.access-key-id": "admin", "s3.secret-access-key": "password", + "warehouse": "polaris", + "scope": "PRINCIPAL_ROLE:ALL" }, ), 'hive': load_catalog( diff --git a/dev/spark-defaults.conf b/dev/spark-defaults.conf index 2316336fea..1d221b38d2 100644 --- a/dev/spark-defaults.conf +++ b/dev/spark-defaults.conf @@ -18,9 +18,11 @@ spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.rest org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.rest.type rest -spark.sql.catalog.rest.uri http://rest:8181 +spark.sql.catalog.rest.uri http://rest:8181/api/catalog +spark.sql.catalog.rest.oauth2-server-uri http://rest:8181/api/catalog/v1/oauth/tokens 
spark.sql.catalog.rest.io-impl org.apache.iceberg.aws.s3.S3FileIO -spark.sql.catalog.rest.warehouse s3://warehouse/rest/ +spark.sql.catalog.rest.warehouse polaris +spark.sql.catalog.rest.scope PRINCIPAL_ROLE:ALL spark.sql.catalog.rest.s3.endpoint http://minio:9000 spark.sql.catalog.hive org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.hive.type hive diff --git a/test.ipynb b/test.ipynb new file mode 100644 index 0000000000..372820e087 --- /dev/null +++ b/test.ipynb @@ -0,0 +1,300 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "from requests import HTTPError, Session\n", + "\n", + "PRINCIPAL_TOKEN=\"principal:root;realm:default-realm\"\n", + "POLARIS_URL=\"http://localhost:8181\"\n", + "PRINCIPAL_NAME=\"iceberg\"\n", + "CATALOG_NAME=\"polaris\"\n", + "CATALOG_ROLE=\"admin_role\"\n", + "PRINCIPAL_ROLE = \"admin_principal_role\"" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "metadata": {}, + "outputs": [], + "source": [ + "def create_principal(session: Session) -> str:\n", + " response = session.get(url=f\"{POLARIS_URL}/api/management/v1/principals/{PRINCIPAL_NAME}\")\n", + " try:\n", + " # rotate creds\n", + " response.raise_for_status()\n", + " response = session.delete(\n", + " url=f\"{POLARIS_URL}/api/management/v1/principals/{PRINCIPAL_NAME}\",\n", + " )\n", + " finally:\n", + " # create principal\n", + " data = {\"principal\": {\"name\": PRINCIPAL_NAME}, \"credentialRotationRequired\": 'false'}\n", + " response = session.post(\n", + " url=f\"{POLARIS_URL}/api/management/v1/principals\", data=json.dumps(data),\n", + " )\n", + " credentials = response.json()[\"credentials\"]\n", + "\n", + " principal_credential = f\"{credentials['clientId']}:{credentials['clientSecret']}\"\n", + " return principal_credential\n", + "\n", + "def create_catalog(session: Session) -> str:\n", + " response = session.get(\n", + " 
url=f\"{POLARIS_URL}/api/management/v1/catalogs/{CATALOG_NAME}\",\n", + " )\n", + " try:\n", + " response.raise_for_status()\n", + " except HTTPError:\n", + " # Create Catalog\n", + " data = {\n", + " \"catalog\": {\n", + " \"name\": CATALOG_NAME,\n", + " \"type\": \"INTERNAL\",\n", + " \"readOnly\": False,\n", + " \"properties\": {\n", + " \"default-base-location\": \"file:///warehouse\"\n", + " },\n", + " \"storageConfigInfo\": {\n", + " \"storageType\": \"FILE\",\n", + " \"allowedLocations\": [\n", + " \"file:///warehouse\"\n", + " ]\n", + " }\n", + " }\n", + " }\n", + " response = session.post(\n", + " url=f\"{POLARIS_URL}/api/management/v1/catalogs\", data=json.dumps(data),\n", + " )\n", + " response.raise_for_status()\n", + "\n", + "def create_catalog_role(session: Session) -> None:\n", + " try:\n", + " response = session.get(\n", + " url=f\"{POLARIS_URL}/api/management/v1/catalogs/{CATALOG_NAME}/catalog-roles/{CATALOG_ROLE}\"\n", + " )\n", + " response.raise_for_status()\n", + " except HTTPError:\n", + " # Create Catalog Role\n", + " data = {\n", + " \"catalogRole\": {\n", + " \"name\": CATALOG_ROLE,\n", + " }\n", + " }\n", + " response = session.post(\n", + " url=f\"{POLARIS_URL}/api/management/v1/catalogs/{CATALOG_NAME}/catalog-roles\", data=json.dumps(data),\n", + " )\n", + " response.raise_for_status()\n", + "\n", + "def grant_catalog_privileges(session: Session) -> None:\n", + " # Grant Catalog privileges to the catalog role\n", + " data = {\n", + " \"grant\": {\n", + " \"type\": \"catalog\",\n", + " \"privilege\": \"CATALOG_MANAGE_CONTENT\"\n", + " }\n", + " }\n", + " response = session.put(\n", + " url=f\"{POLARIS_URL}/api/management/v1/catalogs/{CATALOG_NAME}/catalog-roles/{CATALOG_ROLE}/grants\", data=json.dumps(data),\n", + " )\n", + " response.raise_for_status()\n", + "\n", + "def create_principal_role(session: Session) -> None:\n", + " try:\n", + " response = session.get(\n", + " 
url=f\"{POLARIS_URL}/api/management/v1/principal-roles/{PRINCIPAL_ROLE}\",\n", + " )\n", + " response.raise_for_status()\n", + " except HTTPError:\n", + " # Create a principal role\n", + " data = {\n", + " \"principalRole\": {\n", + " \"name\": PRINCIPAL_ROLE,\n", + " }\n", + " }\n", + " response = session.post(\n", + " url=f\"{POLARIS_URL}/api/management/v1/principal-roles\", data=json.dumps(data),\n", + " )\n", + " response.raise_for_status()\n", + " \n", + " # Assign the catalog role to the principal role\n", + " data = {\n", + " \"catalogRole\": {\n", + " \"name\": CATALOG_ROLE,\n", + " }\n", + " }\n", + " response = session.put(\n", + " url=f\"{POLARIS_URL}/api/management/v1/principal-roles/{PRINCIPAL_ROLE}/catalog-roles/{CATALOG_NAME}\", data=json.dumps(data),\n", + " )\n", + " response.raise_for_status()\n", + "\n", + " # Assign the principal role to the root principal\n", + " data = {\n", + " \"principalRole\": {\n", + " \"name\": PRINCIPAL_ROLE,\n", + " }\n", + " }\n", + " response = session.put(\n", + " url=f\"{POLARIS_URL}/api/management/v1/principals/{PRINCIPAL_NAME}/principal-roles\", data=json.dumps(data),\n", + " )\n", + " response.raise_for_status()" + ] + }, + { + "cell_type": "code", + "execution_count": 48, + "metadata": {}, + "outputs": [], + "source": [ + "session = Session()\n", + "session.headers[\"Content-type\"] = \"application/json\"\n", + "session.headers[\"Accept\"] = \"application/json\"\n", + "session.headers[\"Authorization\"] = f\"Bearer {PRINCIPAL_TOKEN}\"\n", + "\n", + "principal_credential = create_principal(session)\n", + "create_catalog(session)\n", + "create_catalog_role(session)\n", + "grant_catalog_privileges(session)\n", + "create_principal_role(session)" + ] + }, + { + "cell_type": "code", + "execution_count": 49, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/workspaces/iceberg-python/pyiceberg/utils/deprecated.py:51: DeprecationWarning: Deprecated in 0.8.0, will be 
removed in 1.0.0. Iceberg REST client is missing the OAuth2 server URI configuration and defaults to http://localhost:8181/api/catalogoauth/tokens. This automatic fallback will be removed in a future Iceberg release.It is recommended to configure the OAuth2 endpoint using the 'oauth2-server-uri'property to be prepared. This warning will disappear if the OAuth2endpoint is explicitly configured. See https://github.com/apache/iceberg/issues/10537\n", + " _deprecation_warning(message)\n" + ] + } + ], + "source": [ + "from pyiceberg.catalog import load_catalog\n", + "\n", + "\n", + "catalog = load_catalog(\n", + " \"local\",\n", + " **{\n", + " \"type\": \"rest\",\n", + " \"credential\": principal_credential,\n", + " \"uri\": \"http://localhost:8181/api/catalog\",\n", + " \"s3.endpoint\": \"http://localhost:9000\",\n", + " \"s3.access-key-id\": \"admin\",\n", + " \"s3.secret-access-key\": \"password\",\n", + " \"warehouse\": \"polaris\",\n", + " \"scope\": \"PRINCIPAL_ROLE:ALL\"\n", + " },\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": 50, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[]" + ] + }, + "execution_count": 50, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "catalog.list_namespaces()" + ] + }, + { + "cell_type": "code", + "execution_count": 51, + "metadata": {}, + "outputs": [], + "source": [ + "catalog.create_namespace(\"test\")" + ] + }, + { + "cell_type": "code", + "execution_count": 52, + "metadata": {}, + "outputs": [ + { + "ename": "ServiceUnavailableError", + "evalue": "RuntimeIOException: Failed to create file: file:/warehouse/test/test_uuid_and_fixed_unpartitioned/metadata/00000-57d09f7c-98e6-4fd8-aab3-c352c0cdd251.metadata.json", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mHTTPError\u001b[0m Traceback (most recent call last)", + "File 
\u001b[0;32m/workspaces/iceberg-python/pyiceberg/catalog/rest.py:597\u001b[0m, in \u001b[0;36mRestCatalog._create_table\u001b[0;34m(self, identifier, schema, location, partition_spec, sort_order, properties, stage_create)\u001b[0m\n\u001b[1;32m 596\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 597\u001b[0m \u001b[43mresponse\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mraise_for_status\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 598\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m HTTPError \u001b[38;5;28;01mas\u001b[39;00m exc:\n", + "File \u001b[0;32m~/.cache/pypoetry/virtualenvs/pyiceberg-FsHa-ZgB-py3.12/lib/python3.12/site-packages/requests/models.py:1024\u001b[0m, in \u001b[0;36mResponse.raise_for_status\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 1023\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m http_error_msg:\n\u001b[0;32m-> 1024\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m HTTPError(http_error_msg, response\u001b[38;5;241m=\u001b[39m\u001b[38;5;28mself\u001b[39m)\n", + "\u001b[0;31mHTTPError\u001b[0m: 503 Server Error: Service Unavailable for url: http://localhost:8181/api/catalog/v1/polaris/namespaces/test/tables", + "\nThe above exception was the direct cause of the following exception:\n", + "\u001b[0;31mServiceUnavailableError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[52], line 10\u001b[0m\n\u001b[1;32m 3\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mpyiceberg\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mtypes\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m FixedType, NestedField, UUIDType\n\u001b[1;32m 5\u001b[0m schema \u001b[38;5;241m=\u001b[39m Schema(\n\u001b[1;32m 6\u001b[0m NestedField(field_id\u001b[38;5;241m=\u001b[39m\u001b[38;5;241m1\u001b[39m, name\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124muuid_col\u001b[39m\u001b[38;5;124m\"\u001b[39m, field_type\u001b[38;5;241m=\u001b[39mUUIDType(), 
required\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mFalse\u001b[39;00m),\n\u001b[1;32m 7\u001b[0m NestedField(field_id\u001b[38;5;241m=\u001b[39m\u001b[38;5;241m2\u001b[39m, name\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mfixed_col\u001b[39m\u001b[38;5;124m\"\u001b[39m, field_type\u001b[38;5;241m=\u001b[39mFixedType(\u001b[38;5;241m25\u001b[39m), required\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mFalse\u001b[39;00m),\n\u001b[1;32m 8\u001b[0m )\n\u001b[0;32m---> 10\u001b[0m table \u001b[38;5;241m=\u001b[39m \u001b[43mcatalog\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcreate_table\u001b[49m\u001b[43m(\u001b[49m\u001b[43midentifier\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mtest.test_uuid_and_fixed_unpartitioned\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mschema\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mschema\u001b[49m\u001b[43m)\u001b[49m\n", + "File \u001b[0;32m~/.cache/pypoetry/virtualenvs/pyiceberg-FsHa-ZgB-py3.12/lib/python3.12/site-packages/tenacity/__init__.py:336\u001b[0m, in \u001b[0;36mBaseRetrying.wraps..wrapped_f\u001b[0;34m(*args, **kw)\u001b[0m\n\u001b[1;32m 334\u001b[0m copy \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mcopy()\n\u001b[1;32m 335\u001b[0m wrapped_f\u001b[38;5;241m.\u001b[39mstatistics \u001b[38;5;241m=\u001b[39m copy\u001b[38;5;241m.\u001b[39mstatistics \u001b[38;5;66;03m# type: ignore[attr-defined]\u001b[39;00m\n\u001b[0;32m--> 336\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43mcopy\u001b[49m\u001b[43m(\u001b[49m\u001b[43mf\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkw\u001b[49m\u001b[43m)\u001b[49m\n", + "File 
\u001b[0;32m~/.cache/pypoetry/virtualenvs/pyiceberg-FsHa-ZgB-py3.12/lib/python3.12/site-packages/tenacity/__init__.py:475\u001b[0m, in \u001b[0;36mRetrying.__call__\u001b[0;34m(self, fn, *args, **kwargs)\u001b[0m\n\u001b[1;32m 473\u001b[0m retry_state \u001b[38;5;241m=\u001b[39m RetryCallState(retry_object\u001b[38;5;241m=\u001b[39m\u001b[38;5;28mself\u001b[39m, fn\u001b[38;5;241m=\u001b[39mfn, args\u001b[38;5;241m=\u001b[39margs, kwargs\u001b[38;5;241m=\u001b[39mkwargs)\n\u001b[1;32m 474\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m \u001b[38;5;28;01mTrue\u001b[39;00m:\n\u001b[0;32m--> 475\u001b[0m do \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43miter\u001b[49m\u001b[43m(\u001b[49m\u001b[43mretry_state\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mretry_state\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 476\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(do, DoAttempt):\n\u001b[1;32m 477\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n", + "File \u001b[0;32m~/.cache/pypoetry/virtualenvs/pyiceberg-FsHa-ZgB-py3.12/lib/python3.12/site-packages/tenacity/__init__.py:376\u001b[0m, in \u001b[0;36mBaseRetrying.iter\u001b[0;34m(self, retry_state)\u001b[0m\n\u001b[1;32m 374\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m\n\u001b[1;32m 375\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m action \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39miter_state\u001b[38;5;241m.\u001b[39mactions:\n\u001b[0;32m--> 376\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[43maction\u001b[49m\u001b[43m(\u001b[49m\u001b[43mretry_state\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 377\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n", + "File \u001b[0;32m~/.cache/pypoetry/virtualenvs/pyiceberg-FsHa-ZgB-py3.12/lib/python3.12/site-packages/tenacity/__init__.py:398\u001b[0m, in 
\u001b[0;36mBaseRetrying._post_retry_check_actions..\u001b[0;34m(rs)\u001b[0m\n\u001b[1;32m 396\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21m_post_retry_check_actions\u001b[39m(\u001b[38;5;28mself\u001b[39m, retry_state: \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mRetryCallState\u001b[39m\u001b[38;5;124m\"\u001b[39m) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[1;32m 397\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m (\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39miter_state\u001b[38;5;241m.\u001b[39mis_explicit_retry \u001b[38;5;129;01mor\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39miter_state\u001b[38;5;241m.\u001b[39mretry_run_result):\n\u001b[0;32m--> 398\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_add_action_func(\u001b[38;5;28;01mlambda\u001b[39;00m rs: \u001b[43mrs\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43moutcome\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mresult\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m)\n\u001b[1;32m 399\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m\n\u001b[1;32m 401\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mafter \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n", + "File \u001b[0;32m/usr/local/python/3.12.1/lib/python3.12/concurrent/futures/_base.py:449\u001b[0m, in \u001b[0;36mFuture.result\u001b[0;34m(self, timeout)\u001b[0m\n\u001b[1;32m 447\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m CancelledError()\n\u001b[1;32m 448\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_state \u001b[38;5;241m==\u001b[39m FINISHED:\n\u001b[0;32m--> 449\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m 
\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m__get_result\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 451\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_condition\u001b[38;5;241m.\u001b[39mwait(timeout)\n\u001b[1;32m 453\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_state \u001b[38;5;129;01min\u001b[39;00m [CANCELLED, CANCELLED_AND_NOTIFIED]:\n", + "File \u001b[0;32m/usr/local/python/3.12.1/lib/python3.12/concurrent/futures/_base.py:401\u001b[0m, in \u001b[0;36mFuture.__get_result\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 399\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_exception:\n\u001b[1;32m 400\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 401\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_exception\n\u001b[1;32m 402\u001b[0m \u001b[38;5;28;01mfinally\u001b[39;00m:\n\u001b[1;32m 403\u001b[0m \u001b[38;5;66;03m# Break a reference cycle with the exception in self._exception\u001b[39;00m\n\u001b[1;32m 404\u001b[0m \u001b[38;5;28mself\u001b[39m \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m\n", + "File \u001b[0;32m~/.cache/pypoetry/virtualenvs/pyiceberg-FsHa-ZgB-py3.12/lib/python3.12/site-packages/tenacity/__init__.py:478\u001b[0m, in \u001b[0;36mRetrying.__call__\u001b[0;34m(self, fn, *args, **kwargs)\u001b[0m\n\u001b[1;32m 476\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(do, DoAttempt):\n\u001b[1;32m 477\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 478\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[43mfn\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m 
\u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 479\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mBaseException\u001b[39;00m: \u001b[38;5;66;03m# noqa: B902\u001b[39;00m\n\u001b[1;32m 480\u001b[0m retry_state\u001b[38;5;241m.\u001b[39mset_exception(sys\u001b[38;5;241m.\u001b[39mexc_info()) \u001b[38;5;66;03m# type: ignore[arg-type]\u001b[39;00m\n", + "File \u001b[0;32m/workspaces/iceberg-python/pyiceberg/catalog/rest.py:613\u001b[0m, in \u001b[0;36mRestCatalog.create_table\u001b[0;34m(self, identifier, schema, location, partition_spec, sort_order, properties)\u001b[0m\n\u001b[1;32m 603\u001b[0m \u001b[38;5;129m@retry\u001b[39m(\u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39m_RETRY_ARGS)\n\u001b[1;32m 604\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mcreate_table\u001b[39m(\n\u001b[1;32m 605\u001b[0m \u001b[38;5;28mself\u001b[39m,\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 611\u001b[0m properties: Properties \u001b[38;5;241m=\u001b[39m EMPTY_DICT,\n\u001b[1;32m 612\u001b[0m ) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m Table:\n\u001b[0;32m--> 613\u001b[0m table_response \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_create_table\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 614\u001b[0m \u001b[43m \u001b[49m\u001b[43midentifier\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43midentifier\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 615\u001b[0m \u001b[43m \u001b[49m\u001b[43mschema\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mschema\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 616\u001b[0m \u001b[43m \u001b[49m\u001b[43mlocation\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mlocation\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 617\u001b[0m \u001b[43m 
\u001b[49m\u001b[43mpartition_spec\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mpartition_spec\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 618\u001b[0m \u001b[43m \u001b[49m\u001b[43msort_order\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43msort_order\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 619\u001b[0m \u001b[43m \u001b[49m\u001b[43mproperties\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mproperties\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 620\u001b[0m \u001b[43m \u001b[49m\u001b[43mstage_create\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;28;43;01mFalse\u001b[39;49;00m\u001b[43m,\u001b[49m\n\u001b[1;32m 621\u001b[0m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 622\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_response_to_table(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39midentifier_to_tuple(identifier), table_response)\n", + "File \u001b[0;32m/workspaces/iceberg-python/pyiceberg/catalog/rest.py:599\u001b[0m, in \u001b[0;36mRestCatalog._create_table\u001b[0;34m(self, identifier, schema, location, partition_spec, sort_order, properties, stage_create)\u001b[0m\n\u001b[1;32m 597\u001b[0m response\u001b[38;5;241m.\u001b[39mraise_for_status()\n\u001b[1;32m 598\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m HTTPError \u001b[38;5;28;01mas\u001b[39;00m exc:\n\u001b[0;32m--> 599\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_handle_non_200_response\u001b[49m\u001b[43m(\u001b[49m\u001b[43mexc\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m{\u001b[49m\u001b[38;5;241;43m409\u001b[39;49m\u001b[43m:\u001b[49m\u001b[43m \u001b[49m\u001b[43mTableAlreadyExistsError\u001b[49m\u001b[43m}\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 601\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m TableResponse(\u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mresponse\u001b[38;5;241m.\u001b[39mjson())\n", + "File 
\u001b[0;32m/workspaces/iceberg-python/pyiceberg/catalog/rest.py:471\u001b[0m, in \u001b[0;36mRestCatalog._handle_non_200_response\u001b[0;34m(self, exc, error_handler)\u001b[0m\n\u001b[1;32m 466\u001b[0m errs \u001b[38;5;241m=\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m, \u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;241m.\u001b[39mjoin(err[\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mmsg\u001b[39m\u001b[38;5;124m\"\u001b[39m] \u001b[38;5;28;01mfor\u001b[39;00m err \u001b[38;5;129;01min\u001b[39;00m e\u001b[38;5;241m.\u001b[39merrors())\n\u001b[1;32m 467\u001b[0m response \u001b[38;5;241m=\u001b[39m (\n\u001b[1;32m 468\u001b[0m \u001b[38;5;124mf\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mRESTError \u001b[39m\u001b[38;5;132;01m{\u001b[39;00mexc\u001b[38;5;241m.\u001b[39mresponse\u001b[38;5;241m.\u001b[39mstatus_code\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m: Received unexpected JSON Payload: \u001b[39m\u001b[38;5;132;01m{\u001b[39;00mexc\u001b[38;5;241m.\u001b[39mresponse\u001b[38;5;241m.\u001b[39mtext\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m, errors: \u001b[39m\u001b[38;5;132;01m{\u001b[39;00merrs\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 469\u001b[0m )\n\u001b[0;32m--> 471\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exception(response) \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mexc\u001b[39;00m\n", + "\u001b[0;31mServiceUnavailableError\u001b[0m: RuntimeIOException: Failed to create file: file:/warehouse/test/test_uuid_and_fixed_unpartitioned/metadata/00000-57d09f7c-98e6-4fd8-aab3-c352c0cdd251.metadata.json" + ] + } + ], + "source": [ + "from pyiceberg.catalog import load_catalog\n", + "from pyiceberg.schema import Schema\n", + "from pyiceberg.types import FixedType, NestedField, UUIDType\n", + "\n", + "schema = Schema(\n", + " NestedField(field_id=1, name=\"uuid_col\", field_type=UUIDType(), required=False),\n", + " NestedField(field_id=2, name=\"fixed_col\", 
field_type=FixedType(25), required=False),\n", + " )\n", + "\n", + "table = catalog.create_table(identifier=\"test.test_uuid_and_fixed_unpartitioned\", schema=schema)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "pyiceberg-FsHa-ZgB-py3.12", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.1" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/tests/conftest.py b/tests/conftest.py index b05947ebe6..56c779452e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2185,20 +2185,6 @@ def bound_reference_uuid() -> BoundReference[str]: return BoundReference(field=NestedField(1, "field", UUIDType(), required=False), accessor=Accessor(position=0, inner=None)) -@pytest.fixture(scope="session") -def session_catalog() -> Catalog: - return load_catalog( - "local", - **{ - "type": "rest", - "uri": "http://localhost:8181", - "s3.endpoint": "http://localhost:9000", - "s3.access-key-id": "admin", - "s3.secret-access-key": "password", - }, - ) - - @pytest.fixture(scope="session") def session_catalog_hive() -> Catalog: return load_catalog( diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000000..13a83393a9 --- /dev/null +++ b/tests/integration/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 0000000000..74a8b046bc --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,179 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
import json

import pytest
from requests import HTTPError, Session

from pyiceberg.catalog import Catalog, load_catalog

PRINCIPAL_TOKEN = "principal:root;realm:default-realm"
POLARIS_URL = "http://localhost:8181"
PRINCIPAL_NAME = "iceberg"
CATALOG_NAME = "polaris"
CATALOG_ROLE = "admin_role"
PRINCIPAL_ROLE = "admin_principal_role"


def create_principal(session: Session) -> str:
    """(Re)create the test principal and return its credential.

    If a principal named ``PRINCIPAL_NAME`` already exists it is deleted first,
    so that the subsequent create call hands back a fresh client id/secret pair.

    Args:
        session: Session carrying the JSON and authorization headers used for
            the management API.

    Returns:
        The credential formatted as ``"<clientId>:<clientSecret>"``.

    Raises:
        HTTPError: If deleting the existing principal or creating the new one
            is rejected by the server.
    """
    principal_url = f"{POLARIS_URL}/api/management/v1/principals/{PRINCIPAL_NAME}"

    # Rotate creds: drop the existing principal (if any) so the create call
    # below returns new credentials. A failed lookup simply means there is
    # nothing to delete; it must not abort the bootstrap.
    if session.get(url=principal_url).ok:
        session.delete(url=principal_url).raise_for_status()

    # Create principal
    data = {"principal": {"name": PRINCIPAL_NAME}, "credentialRotationRequired": False}
    response = session.post(
        url=f"{POLARIS_URL}/api/management/v1/principals",
        data=json.dumps(data),
    )
    # Surface a rejected request as an HTTPError instead of a KeyError below.
    response.raise_for_status()
    credentials = response.json()["credentials"]

    return f"{credentials['clientId']}:{credentials['clientSecret']}"


def create_catalog(session: Session) -> None:
    """Create the ``CATALOG_NAME`` catalog unless the lookup for it succeeds.

    Args:
        session: Session carrying the JSON and authorization headers used for
            the management API.

    Raises:
        HTTPError: If the create request is rejected by the server.
    """
    response = session.get(
        url=f"{POLARIS_URL}/api/management/v1/catalogs/{CATALOG_NAME}",
    )
    try:
        response.raise_for_status()
    except HTTPError:
        # Create Catalog
        data = {
            "catalog": {
                "name": CATALOG_NAME,
                "type": "INTERNAL",
                "readOnly": False,
                "properties": {"default-base-location": "file:///warehouse"},
                "storageConfigInfo": {"storageType": "FILE", "allowedLocations": ["file:///warehouse"]},
            }
        }
        response = session.post(
            url=f"{POLARIS_URL}/api/management/v1/catalogs",
            data=json.dumps(data),
        )
        response.raise_for_status()


def create_catalog_role(session: Session) -> None:
    """Create the ``CATALOG_ROLE`` role on the catalog unless the lookup for it succeeds.

    Args:
        session: Session carrying the JSON and authorization headers used for
            the management API.

    Raises:
        HTTPError: If the create request is rejected by the server.
    """
    try:
        response = session.get(url=f"{POLARIS_URL}/api/management/v1/catalogs/{CATALOG_NAME}/catalog-roles/{CATALOG_ROLE}")
        response.raise_for_status()
    except HTTPError:
        # Create Catalog Role
        data = {
            "catalogRole": {
                "name": CATALOG_ROLE,
            }
        }
        response = session.post(
            url=f"{POLARIS_URL}/api/management/v1/catalogs/{CATALOG_NAME}/catalog-roles",
            data=json.dumps(data),
        )
        response.raise_for_status()


def grant_catalog_privileges(session: Session) -> None:
    """Grant ``CATALOG_MANAGE_CONTENT`` on the catalog to ``CATALOG_ROLE``.

    Args:
        session: Session carrying the JSON and authorization headers used for
            the management API.

    Raises:
        HTTPError: If the grant request is rejected by the server.
    """
    # Grant Catalog privileges to the catalog role
    data = {"grant": {"type": "catalog", "privilege": "CATALOG_MANAGE_CONTENT"}}
    response = session.put(
        url=f"{POLARIS_URL}/api/management/v1/catalogs/{CATALOG_NAME}/catalog-roles/{CATALOG_ROLE}/grants",
        data=json.dumps(data),
    )
    response.raise_for_status()


def create_principal_role(session: Session) -> None:
    """Ensure ``PRINCIPAL_ROLE`` exists and wire it to the catalog role and the principal.

    The two assignments are issued on every call, not only when the role was
    just created, because ``create_principal`` recreates the principal.

    Args:
        session: Session carrying the JSON and authorization headers used for
            the management API.

    Raises:
        HTTPError: If creating the role or either assignment is rejected by
            the server.
    """
    try:
        response = session.get(
            url=f"{POLARIS_URL}/api/management/v1/principal-roles/{PRINCIPAL_ROLE}",
        )
        response.raise_for_status()
    except HTTPError:
        # Create a principal role
        data = {
            "principalRole": {
                "name": PRINCIPAL_ROLE,
            }
        }
        response = session.post(
            url=f"{POLARIS_URL}/api/management/v1/principal-roles",
            data=json.dumps(data),
        )
        response.raise_for_status()

    # Assign the catalog role to the principal role
    data = {
        "catalogRole": {
            "name": CATALOG_ROLE,
        }
    }
    response = session.put(
        url=f"{POLARIS_URL}/api/management/v1/principal-roles/{PRINCIPAL_ROLE}/catalog-roles/{CATALOG_NAME}",
        data=json.dumps(data),
    )
    response.raise_for_status()

    # Assign the principal role to the PRINCIPAL_NAME principal
    data = {
        "principalRole": {
            "name": PRINCIPAL_ROLE,
        }
    }
    response = session.put(
        url=f"{POLARIS_URL}/api/management/v1/principals/{PRINCIPAL_NAME}/principal-roles",
        data=json.dumps(data),
    )
    response.raise_for_status()


@pytest.fixture(scope="session")
def principal_credential() -> str:
    """Bootstrap the principal, catalog, roles and grants; return the principal's credential.

    Returns:
        The credential produced by ``create_principal``, formatted as
        ``"<clientId>:<clientSecret>"``.
    """
    session = Session()
    session.headers["Content-type"] = "application/json"
    session.headers["Accept"] = "application/json"
    session.headers["Authorization"] = f"Bearer {PRINCIPAL_TOKEN}"

    principal_credential = create_principal(session)
    create_catalog(session)
    create_catalog_role(session)
    grant_catalog_privileges(session)
    create_principal_role(session)
    return principal_credential


@pytest.fixture(scope="session")
def session_catalog(principal_credential: str) -> Catalog:
    """Load the REST catalog, authenticating with the bootstrapped principal.

    Args:
        principal_credential: Value of the ``principal_credential`` fixture.
            It must be requested as a parameter; otherwise the name resolves
            to the module-level fixture function rather than the credential
            string.

    Returns:
        The catalog returned by ``load_catalog`` for the ``polaris`` warehouse.
    """
    return load_catalog(
        "local",
        **{
            "type": "rest",
            "credential": principal_credential,
            "uri": "http://localhost:8181/api/catalog",
            "s3.endpoint": "http://localhost:9000",
            "s3.access-key-id": "admin",
            "s3.secret-access-key": "password",
            "warehouse": "polaris",
            "scope": "PRINCIPAL_ROLE:ALL",
        },
    )