From 18521885434769a40f824075d03993ce5b99e4fa Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 12 Dec 2023 09:48:00 +0100 Subject: [PATCH 1/8] Add Hive for CI --- dev/Dockerfile | 10 +- dev/docker-compose-integration.yml | 14 + dev/hive/Dockerfile | 19 ++ dev/hive/core-site.xml | 34 ++ dev/provision.py | 495 ++++++++++++++--------------- dev/spark-defaults.conf | 20 +- 6 files changed, 327 insertions(+), 265 deletions(-) create mode 100644 dev/hive/Dockerfile create mode 100644 dev/hive/core-site.xml diff --git a/dev/Dockerfile b/dev/Dockerfile index 1f001f5c12..e578252d0b 100644 --- a/dev/Dockerfile +++ b/dev/Dockerfile @@ -38,9 +38,8 @@ WORKDIR ${SPARK_HOME} ENV SPARK_VERSION=3.4.2 ENV ICEBERG_SPARK_RUNTIME_VERSION=3.4_2.12 -ENV ICEBERG_VERSION=1.4.0 -ENV AWS_SDK_VERSION=2.20.18 -ENV PYICEBERG_VERSION=0.4.0 +ENV ICEBERG_VERSION=1.4.2 +ENV PYICEBERG_VERSION=0.5.1 RUN curl --retry 3 -s -C - https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \ && tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \ @@ -51,8 +50,7 @@ RUN curl -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runt && mv iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar /opt/spark/jars # Download AWS bundle -RUN curl -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -Lo iceberg-aws-bundle-${ICEBERG_VERSION}.jar \ - && mv iceberg-aws-bundle-${ICEBERG_VERSION}.jar /opt/spark/jars +RUN curl -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar COPY spark-defaults.conf /opt/spark/conf ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}" @@ -62,7 +60,7 @@ RUN chmod u+x /opt/spark/sbin/* && \ RUN pip3 install -q ipython -RUN pip3 install "pyiceberg[s3fs]==${PYICEBERG_VERSION}" +RUN pip3 install "pyiceberg[s3fs,pyarrow,hive]==${PYICEBERG_VERSION}" COPY entrypoint.sh . COPY provision.py . 
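The image above now installs pyiceberg with the s3fs, pyarrow, and hive extras so that clients inside the compose network can reach both catalogs. As a minimal sketch (illustrative only, not part of the patch), assuming the hive and minio service names and the admin/password credentials that the docker-compose file below defines, such a client could open the Hive catalog like this:

    # Sketch: open the Hive metastore catalog from inside the compose network.
    # The host names ("hive", "minio") and the admin/password credentials are
    # the ones defined in docker-compose-integration.yml; adjust if yours differ.
    from pyiceberg.catalog import load_catalog

    catalog = load_catalog(
        "hive",
        **{
            "type": "hive",
            "uri": "http://hive:9083",           # Hive metastore endpoint, as configured below
            "s3.endpoint": "http://minio:9000",  # MinIO standing in for S3
            "s3.access-key-id": "admin",
            "s3.secret-access-key": "password",
        },
    )

    # Quick smoke test: the provisioning script creates the "default" namespace.
    print(catalog.list_namespaces())

The same pattern, pointed at http://rest:8181 with type "rest", exercises the REST catalog instead.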
diff --git a/dev/docker-compose-integration.yml b/dev/docker-compose-integration.yml index 658bd698c9..fccdcdc757 100644 --- a/dev/docker-compose-integration.yml +++ b/dev/docker-compose-integration.yml @@ -25,6 +25,7 @@ services: iceberg_net: depends_on: - rest + - hive - minio volumes: - ./warehouse:/home/iceberg/warehouse @@ -37,6 +38,7 @@ services: - 8080:8080 links: - rest:rest + - hive:hive - minio:minio rest: image: tabulario/iceberg-rest @@ -85,5 +87,17 @@ services: /usr/bin/mc policy set public minio/warehouse; tail -f /dev/null " + hive: + build: hive/ + container_name: hive + hostname: hive + networks: + iceberg_net: + ports: + - 9083:9083 + environment: + SERVICE_NAME: "metastore" + SERVICE_OPTS: "-Dmetastore.warehouse.dir=s3a://warehouse/hive/" + networks: iceberg_net: diff --git a/dev/hive/Dockerfile b/dev/hive/Dockerfile new file mode 100644 index 0000000000..a26af3e633 --- /dev/null +++ b/dev/hive/Dockerfile @@ -0,0 +1,19 @@ +FROM openjdk:8-jre-slim AS build + +RUN apt-get update -qq && apt-get -qq -y install curl + +ENV AWSSDK_VERSION=2.20.18 +ENV HADOOP_VERSION=3.1.0 + +RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar -Lo /tmp/aws-java-sdk-bundle-1.11.271.jar +RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -Lo /tmp/hadoop-aws-${HADOOP_VERSION}.jar + + +FROM apache/hive:3.1.3 + +ENV AWSSDK_VERSION=2.20.18 +ENV HADOOP_VERSION=3.1.0 + +COPY --from=build /tmp/hadoop-aws-${HADOOP_VERSION}.jar /opt/hive/lib/hadoop-aws-${HADOOP_VERSION}.jar +COPY --from=build /tmp/aws-java-sdk-bundle-1.11.271.jar /opt/hive/lib/aws-java-sdk-bundle-1.11.271.jar +COPY core-site.xml /opt/hadoop/etc/hadoop/core-site.xml \ No newline at end of file diff --git a/dev/hive/core-site.xml b/dev/hive/core-site.xml new file mode 100644 index 0000000000..85c9e97d30 --- /dev/null +++ b/dev/hive/core-site.xml @@ -0,0 +1,34 @@ + + + fs.defaultFS + s3a://warehouse/hive + + + fs.s3a.impl + org.apache.hadoop.fs.s3a.S3AFileSystem + + + fs.s3a.fast.upload + true + + + fs.s3a.endpoint + http://minio:9000 + + + fs.s3a.access.key + admin + + + fs.s3a.secret.key + password + + + fs.s3a.connection.ssl.enabled + false + + + fs.s3a.path.style.access + true + + diff --git a/dev/provision.py b/dev/provision.py index ca6e5aa6aa..d23ce8a4fd 100644 --- a/dev/provision.py +++ b/dev/provision.py @@ -24,19 +24,8 @@ spark = SparkSession.builder.getOrCreate() -spark.sql( - """ - CREATE DATABASE IF NOT EXISTS default; -""" -) - -schema = Schema( - NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False), - NestedField(field_id=2, name="fixed_col", field_type=FixedType(25), required=False), -) - -catalog = load_catalog( - "local", +catalogs = {'rest': load_catalog( + "rest", **{ "type": "rest", "uri": "http://rest:8181", @@ -44,280 +33,282 @@ "s3.access-key-id": "admin", "s3.secret-access-key": "password", }, -) - -catalog.create_table(identifier="default.test_uuid_and_fixed_unpartitioned", schema=schema) +), 'hive': load_catalog( + "hive", + **{ + "type": "hive", + "uri": "http://hive:9083", + "s3.endpoint": "http://minio:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + }, +)} -spark.sql( - """ - INSERT INTO default.test_uuid_and_fixed_unpartitioned VALUES - ('102cb62f-e6f8-4eb0-9973-d9b012ff0967', CAST('1234567890123456789012345' AS BINARY)), - ('ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226', CAST('1231231231231231231231231' AS BINARY)), - 
('639cccce-c9d2-494a-a78c-278ab234f024', CAST('12345678901234567ass12345' AS BINARY)), - ('c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b', CAST('asdasasdads12312312312111' AS BINARY)), - ('923dae77-83d6-47cd-b4b0-d383e64ee57e', CAST('qweeqwwqq1231231231231111' AS BINARY)); +for catalog_name, catalog in catalogs.items(): + spark.sql( + f""" + CREATE DATABASE IF NOT EXISTS {catalog_name}.default; """ -) + ) -spark.sql( - """ - CREATE OR REPLACE TABLE default.test_null_nan - USING iceberg - AS SELECT - 1 AS idx, - float('NaN') AS col_numeric -UNION ALL SELECT - 2 AS idx, - null AS col_numeric -UNION ALL SELECT - 3 AS idx, - 1 AS col_numeric -""" -) - -spark.sql( - """ - CREATE OR REPLACE TABLE default.test_null_nan_rewritten - USING iceberg - AS SELECT * FROM default.test_null_nan -""" -) + schema = Schema( + NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False), + NestedField(field_id=2, name="fixed_col", field_type=FixedType(25), required=False), + ) -spark.sql( - """ -CREATE OR REPLACE TABLE default.test_limit as - SELECT * LATERAL VIEW explode(ARRAY(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) AS idx; -""" -) + catalog.create_table(identifier="default.test_uuid_and_fixed_unpartitioned", schema=schema) -spark.sql( - """ -CREATE OR REPLACE TABLE default.test_positional_mor_deletes ( - dt date, - number integer, - letter string -) -USING iceberg -TBLPROPERTIES ( - 'write.delete.mode'='merge-on-read', - 'write.update.mode'='merge-on-read', - 'write.merge.mode'='merge-on-read', - 'format-version'='2' -); -""" -) - -# Partitioning is not really needed, but there is a bug: -# https://github.com/apache/iceberg/pull/7685 -spark.sql( - """ - ALTER TABLE default.test_positional_mor_deletes ADD PARTITION FIELD years(dt) AS dt_years -""" -) + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_uuid_and_fixed_unpartitioned VALUES + ('102cb62f-e6f8-4eb0-9973-d9b012ff0967', CAST('1234567890123456789012345' AS BINARY)), + ('ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226', CAST('1231231231231231231231231' AS BINARY)), + ('639cccce-c9d2-494a-a78c-278ab234f024', CAST('12345678901234567ass12345' AS BINARY)), + ('c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b', CAST('asdasasdads12312312312111' AS BINARY)), + ('923dae77-83d6-47cd-b4b0-d383e64ee57e', CAST('qweeqwwqq1231231231231111' AS BINARY)); + """ + ) -spark.sql( - """ -INSERT INTO default.test_positional_mor_deletes -VALUES - (CAST('2023-03-01' AS date), 1, 'a'), - (CAST('2023-03-02' AS date), 2, 'b'), - (CAST('2023-03-03' AS date), 3, 'c'), - (CAST('2023-03-04' AS date), 4, 'd'), - (CAST('2023-03-05' AS date), 5, 'e'), - (CAST('2023-03-06' AS date), 6, 'f'), - (CAST('2023-03-07' AS date), 7, 'g'), - (CAST('2023-03-08' AS date), 8, 'h'), - (CAST('2023-03-09' AS date), 9, 'i'), - (CAST('2023-03-10' AS date), 10, 'j'), - (CAST('2023-03-11' AS date), 11, 'k'), - (CAST('2023-03-12' AS date), 12, 'l'); -""" -) - -spark.sql( - """ -ALTER TABLE default.test_positional_mor_deletes CREATE TAG tag_12 + spark.sql( + f""" + CREATE OR REPLACE TABLE {catalog_name}.default.test_null_nan + USING iceberg + AS SELECT + 1 AS idx, + float('NaN') AS col_numeric + UNION ALL SELECT + 2 AS idx, + null AS col_numeric + UNION ALL SELECT + 3 AS idx, + 1 AS col_numeric """ -) + ) -spark.sql( - """ -ALTER TABLE default.test_positional_mor_deletes CREATE BRANCH without_5 + spark.sql( + f""" + CREATE OR REPLACE TABLE {catalog_name}.default.test_null_nan_rewritten + USING iceberg + AS SELECT * FROM default.test_null_nan """ -) + ) -spark.sql( + spark.sql( + f""" + CREATE OR REPLACE TABLE 
{catalog_name}.default.test_limit as + SELECT * LATERAL VIEW explode(ARRAY(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) AS idx; """ -DELETE FROM default.test_positional_mor_deletes.branch_without_5 WHERE number = 5 + ) + + spark.sql( + f""" + CREATE OR REPLACE TABLE {catalog_name}.default.test_positional_mor_deletes ( + dt date, + number integer, + letter string + ) + USING iceberg + TBLPROPERTIES ( + 'write.delete.mode'='merge-on-read', + 'write.update.mode'='merge-on-read', + 'write.merge.mode'='merge-on-read', + 'format-version'='2' + ); """ -) + ) + # Partitioning is not really needed, but there is a bug: + # https://github.com/apache/iceberg/pull/7685 + spark.sql( + f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes ADD PARTITION FIELD years(dt) AS dt_years" + ) -spark.sql( + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_positional_mor_deletes + VALUES + (CAST('2023-03-01' AS date), 1, 'a'), + (CAST('2023-03-02' AS date), 2, 'b'), + (CAST('2023-03-03' AS date), 3, 'c'), + (CAST('2023-03-04' AS date), 4, 'd'), + (CAST('2023-03-05' AS date), 5, 'e'), + (CAST('2023-03-06' AS date), 6, 'f'), + (CAST('2023-03-07' AS date), 7, 'g'), + (CAST('2023-03-08' AS date), 8, 'h'), + (CAST('2023-03-09' AS date), 9, 'i'), + (CAST('2023-03-10' AS date), 10, 'j'), + (CAST('2023-03-11' AS date), 11, 'k'), + (CAST('2023-03-12' AS date), 12, 'l'); """ -DELETE FROM default.test_positional_mor_deletes WHERE number = 9 -""" -) + ) -spark.sql( - """ - CREATE OR REPLACE TABLE default.test_positional_mor_double_deletes ( - dt date, - number integer, - letter string - ) - USING iceberg - TBLPROPERTIES ( - 'write.delete.mode'='merge-on-read', - 'write.update.mode'='merge-on-read', - 'write.merge.mode'='merge-on-read', - 'format-version'='2' - ); -""" -) - -# Partitioning is not really needed, but there is a bug: -# https://github.com/apache/iceberg/pull/7685 -spark.sql( - """ - ALTER TABLE default.test_positional_mor_double_deletes ADD PARTITION FIELD years(dt) AS dt_years -""" -) + spark.sql( + f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes CREATE TAG tag_12" + ) -spark.sql( - """ -INSERT INTO default.test_positional_mor_double_deletes -VALUES - (CAST('2023-03-01' AS date), 1, 'a'), - (CAST('2023-03-02' AS date), 2, 'b'), - (CAST('2023-03-03' AS date), 3, 'c'), - (CAST('2023-03-04' AS date), 4, 'd'), - (CAST('2023-03-05' AS date), 5, 'e'), - (CAST('2023-03-06' AS date), 6, 'f'), - (CAST('2023-03-07' AS date), 7, 'g'), - (CAST('2023-03-08' AS date), 8, 'h'), - (CAST('2023-03-09' AS date), 9, 'i'), - (CAST('2023-03-10' AS date), 10, 'j'), - (CAST('2023-03-11' AS date), 11, 'k'), - (CAST('2023-03-12' AS date), 12, 'l'); -""" -) - -spark.sql( - """ - DELETE FROM default.test_positional_mor_double_deletes WHERE number = 9 -""" -) + spark.sql( + f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes CREATE BRANCH without_5" + ) + + spark.sql( + f"DELETE FROM {catalog_name}.default.test_positional_mor_deletes.branch_without_5 WHERE number = 5" + ) + + spark.sql( + f"DELETE FROM {catalog_name}.default.test_positional_mor_deletes WHERE number = 9" + ) -spark.sql( - """ - DELETE FROM default.test_positional_mor_double_deletes WHERE letter == 'f' -""" -) - -all_types_dataframe = ( - spark.range(0, 5, 1, 5) - .withColumnRenamed("id", "longCol") - .withColumn("intCol", expr("CAST(longCol AS INT)")) - .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) - .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) - .withColumn("dateCol", date_add(current_date(), 1)) - 
.withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) - .withColumn("stringCol", expr("CAST(dateCol AS STRING)")) - .withColumn("booleanCol", expr("longCol > 5")) - .withColumn("binaryCol", expr("CAST(longCol AS BINARY)")) - .withColumn("byteCol", expr("CAST(longCol AS BYTE)")) - .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))")) - .withColumn("shortCol", expr("CAST(longCol AS SHORT)")) - .withColumn("mapCol", expr("MAP(longCol, decimalCol)")) - .withColumn("arrayCol", expr("ARRAY(longCol)")) - .withColumn("structCol", expr("STRUCT(mapCol, arrayCol)")) -) - -all_types_dataframe.writeTo("default.test_all_types").tableProperty("format-version", "2").partitionedBy( - "intCol" -).createOrReplace() - -for table_name, partition in [ - ("test_partitioned_by_identity", "ts"), - ("test_partitioned_by_years", "years(dt)"), - ("test_partitioned_by_months", "months(dt)"), - ("test_partitioned_by_days", "days(ts)"), - ("test_partitioned_by_hours", "hours(ts)"), - ("test_partitioned_by_truncate", "truncate(1, letter)"), - ("test_partitioned_by_bucket", "bucket(16, number)"), -]: spark.sql( f""" - CREATE OR REPLACE TABLE default.{table_name} ( + CREATE OR REPLACE TABLE {catalog_name}.default.test_positional_mor_double_deletes ( dt date, - ts timestamp, number integer, letter string ) - USING iceberg; + USING iceberg + TBLPROPERTIES ( + 'write.delete.mode'='merge-on-read', + 'write.update.mode'='merge-on-read', + 'write.merge.mode'='merge-on-read', + 'format-version'='2' + ); """ ) - spark.sql(f"ALTER TABLE default.{table_name} ADD PARTITION FIELD {partition}") + # Partitioning is not really needed, but there is a bug: + # https://github.com/apache/iceberg/pull/7685 + spark.sql( + f"ALTER TABLE {catalog_name}.default.test_positional_mor_double_deletes ADD PARTITION FIELD years(dt) AS dt_years" + ) spark.sql( f""" - INSERT INTO default.{table_name} + INSERT INTO {catalog_name}.default.test_positional_mor_double_deletes VALUES - (CAST('2022-03-01' AS date), CAST('2022-03-01 01:22:00' AS timestamp), 1, 'a'), - (CAST('2022-03-02' AS date), CAST('2022-03-02 02:22:00' AS timestamp), 2, 'b'), - (CAST('2022-03-03' AS date), CAST('2022-03-03 03:22:00' AS timestamp), 3, 'c'), - (CAST('2022-03-04' AS date), CAST('2022-03-04 04:22:00' AS timestamp), 4, 'd'), - (CAST('2023-03-05' AS date), CAST('2023-03-05 05:22:00' AS timestamp), 5, 'e'), - (CAST('2023-03-06' AS date), CAST('2023-03-06 06:22:00' AS timestamp), 6, 'f'), - (CAST('2023-03-07' AS date), CAST('2023-03-07 07:22:00' AS timestamp), 7, 'g'), - (CAST('2023-03-08' AS date), CAST('2023-03-08 08:22:00' AS timestamp), 8, 'h'), - (CAST('2023-03-09' AS date), CAST('2023-03-09 09:22:00' AS timestamp), 9, 'i'), - (CAST('2023-03-10' AS date), CAST('2023-03-10 10:22:00' AS timestamp), 10, 'j'), - (CAST('2023-03-11' AS date), CAST('2023-03-11 11:22:00' AS timestamp), 11, 'k'), - (CAST('2023-03-12' AS date), CAST('2023-03-12 12:22:00' AS timestamp), 12, 'l'); + (CAST('2023-03-01' AS date), 1, 'a'), + (CAST('2023-03-02' AS date), 2, 'b'), + (CAST('2023-03-03' AS date), 3, 'c'), + (CAST('2023-03-04' AS date), 4, 'd'), + (CAST('2023-03-05' AS date), 5, 'e'), + (CAST('2023-03-06' AS date), 6, 'f'), + (CAST('2023-03-07' AS date), 7, 'g'), + (CAST('2023-03-08' AS date), 8, 'h'), + (CAST('2023-03-09' AS date), 9, 'i'), + (CAST('2023-03-10' AS date), 10, 'j'), + (CAST('2023-03-11' AS date), 11, 'k'), + (CAST('2023-03-12' AS date), 12, 'l'); """ ) -# There is an issue with CREATE OR REPLACE -# https://github.com/apache/iceberg/issues/8756 -spark.sql( + 
spark.sql( + f"DELETE FROM {catalog_name}.default.test_positional_mor_double_deletes WHERE number = 9" + ) + + spark.sql( + f"DELETE FROM {catalog_name}.default.test_positional_mor_double_deletes WHERE letter == 'f'" + ) + + all_types_dataframe = ( + spark.range(0, 5, 1, 5) + .withColumnRenamed("id", "longCol") + .withColumn("intCol", expr("CAST(longCol AS INT)")) + .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("dateCol", date_add(current_date(), 1)) + .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(dateCol AS STRING)")) + .withColumn("booleanCol", expr("longCol > 5")) + .withColumn("binaryCol", expr("CAST(longCol AS BINARY)")) + .withColumn("byteCol", expr("CAST(longCol AS BYTE)")) + .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))")) + .withColumn("shortCol", expr("CAST(longCol AS SHORT)")) + .withColumn("mapCol", expr("MAP(longCol, decimalCol)")) + .withColumn("arrayCol", expr("ARRAY(longCol)")) + .withColumn("structCol", expr("STRUCT(mapCol, arrayCol)")) + ) + + all_types_dataframe.writeTo(f"{catalog_name}.default.test_all_types").tableProperty("format-version", "2").partitionedBy( + "intCol" + ).createOrReplace() + + for table_name, partition in [ + ("test_partitioned_by_identity", "ts"), + ("test_partitioned_by_years", "years(dt)"), + ("test_partitioned_by_months", "months(dt)"), + ("test_partitioned_by_days", "days(ts)"), + ("test_partitioned_by_hours", "hours(ts)"), + ("test_partitioned_by_truncate", "truncate(1, letter)"), + ("test_partitioned_by_bucket", "bucket(16, number)"), + ]: + spark.sql( + f""" + CREATE OR REPLACE TABLE {catalog_name}.default.{table_name} ( + dt date, + ts timestamp, + number integer, + letter string + ) + USING iceberg; + """ + ) + + spark.sql(f"ALTER TABLE {catalog_name}.default.{table_name} ADD PARTITION FIELD {partition}") + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.{table_name} + VALUES + (CAST('2022-03-01' AS date), CAST('2022-03-01 01:22:00' AS timestamp), 1, 'a'), + (CAST('2022-03-02' AS date), CAST('2022-03-02 02:22:00' AS timestamp), 2, 'b'), + (CAST('2022-03-03' AS date), CAST('2022-03-03 03:22:00' AS timestamp), 3, 'c'), + (CAST('2022-03-04' AS date), CAST('2022-03-04 04:22:00' AS timestamp), 4, 'd'), + (CAST('2023-03-05' AS date), CAST('2023-03-05 05:22:00' AS timestamp), 5, 'e'), + (CAST('2023-03-06' AS date), CAST('2023-03-06 06:22:00' AS timestamp), 6, 'f'), + (CAST('2023-03-07' AS date), CAST('2023-03-07 07:22:00' AS timestamp), 7, 'g'), + (CAST('2023-03-08' AS date), CAST('2023-03-08 08:22:00' AS timestamp), 8, 'h'), + (CAST('2023-03-09' AS date), CAST('2023-03-09 09:22:00' AS timestamp), 9, 'i'), + (CAST('2023-03-10' AS date), CAST('2023-03-10 10:22:00' AS timestamp), 10, 'j'), + (CAST('2023-03-11' AS date), CAST('2023-03-11 11:22:00' AS timestamp), 11, 'k'), + (CAST('2023-03-12' AS date), CAST('2023-03-12 12:22:00' AS timestamp), 12, 'l'); + """ + ) + + # There is an issue with CREATE OR REPLACE + # https://github.com/apache/iceberg/issues/8756 + spark.sql( + f"DROP TABLE IF EXISTS {catalog_name}.default.test_table_version" + ) + + spark.sql( + f""" + CREATE TABLE {catalog_name}.default.test_table_version ( + dt date, + number integer, + letter string + ) + USING iceberg + TBLPROPERTIES ( + 'format-version'='1' + ); """ -DROP TABLE IF EXISTS default.test_table_version -""" -) + ) -spark.sql( + spark.sql( + f""" + CREATE TABLE 
{catalog_name}.default.test_table_sanitized_character ( + `letter/abc` string + ) + USING iceberg + TBLPROPERTIES ( + 'format-version'='1' + ); """ -CREATE TABLE default.test_table_version ( - dt date, - number integer, - letter string -) -USING iceberg -TBLPROPERTIES ( - 'format-version'='1' -); -""" -) - -spark.sql( + ) + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_table_sanitized_character + VALUES + ('123') """ -CREATE TABLE default.test_table_sanitized_character ( - `letter/abc` string -) -USING iceberg -TBLPROPERTIES ( - 'format-version'='1' -); -""" -) - -spark.sql( - f""" -INSERT INTO default.test_table_sanitized_character -VALUES - ('123') -""" -) + ) diff --git a/dev/spark-defaults.conf b/dev/spark-defaults.conf index 56c345432a..2316336fea 100644 --- a/dev/spark-defaults.conf +++ b/dev/spark-defaults.conf @@ -16,13 +16,19 @@ # spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions -spark.sql.catalog.demo org.apache.iceberg.spark.SparkCatalog -spark.sql.catalog.demo.type rest -spark.sql.catalog.demo.uri http://rest:8181 -spark.sql.catalog.demo.io-impl org.apache.iceberg.aws.s3.S3FileIO -spark.sql.catalog.demo.warehouse s3://warehouse/wh/ -spark.sql.catalog.demo.s3.endpoint http://minio:9000 -spark.sql.defaultCatalog demo +spark.sql.catalog.rest org.apache.iceberg.spark.SparkCatalog +spark.sql.catalog.rest.type rest +spark.sql.catalog.rest.uri http://rest:8181 +spark.sql.catalog.rest.io-impl org.apache.iceberg.aws.s3.S3FileIO +spark.sql.catalog.rest.warehouse s3://warehouse/rest/ +spark.sql.catalog.rest.s3.endpoint http://minio:9000 +spark.sql.catalog.hive org.apache.iceberg.spark.SparkCatalog +spark.sql.catalog.hive.type hive +spark.sql.catalog.hive.uri http://hive:9083 +spark.sql.catalog.hive.io-impl org.apache.iceberg.aws.s3.S3FileIO +spark.sql.catalog.hive.warehouse s3://warehouse/hive/ +spark.sql.catalog.hive.s3.endpoint http://minio:9000 +spark.sql.defaultCatalog rest spark.eventLog.enabled true spark.eventLog.dir /home/iceberg/spark-events spark.history.fs.logDirectory /home/iceberg/spark-events From 6d3ba16604f022b733462a143014a0a82547e26f Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 12 Dec 2023 10:09:14 +0100 Subject: [PATCH 2/8] Add Hive integration tests --- tests/integration/__init__.py | 16 + tests/integration/test_hive.py | 408 ++++++++++++++++++ .../test_rest.py} | 0 .../test_rest_manifest.py} | 0 .../test_rest_schema.py} | 0 5 files changed, 424 insertions(+) create mode 100644 tests/integration/__init__.py create mode 100644 tests/integration/test_hive.py rename tests/{test_integration.py => integration/test_rest.py} (100%) rename tests/{test_integration_manifest.py => integration/test_rest_manifest.py} (100%) rename tests/{test_integration_schema.py => integration/test_rest_schema.py} (100%) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000000..13a83393a9 --- /dev/null +++ b/tests/integration/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/integration/test_hive.py b/tests/integration/test_hive.py new file mode 100644 index 0000000000..7292e5575c --- /dev/null +++ b/tests/integration/test_hive.py @@ -0,0 +1,408 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint:disable=redefined-outer-name + +import math +import uuid +from urllib.parse import urlparse + +import pyarrow.parquet as pq +import pytest +from pyarrow.fs import S3FileSystem + +from pyiceberg.catalog import Catalog, load_catalog +from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.expressions import ( + And, + EqualTo, + GreaterThanOrEqual, + IsNaN, + LessThan, + NotEqualTo, + NotNaN, +) +from pyiceberg.io.pyarrow import pyarrow_to_schema +from pyiceberg.schema import Schema +from pyiceberg.table import Table +from pyiceberg.types import ( + BooleanType, + IntegerType, + NestedField, + StringType, + TimestampType, +) + +DEFAULT_PROPERTIES = {} + + +@pytest.fixture() +def catalog() -> Catalog: + return load_catalog( + "local", + **{ + "type": "hive", + "uri": "http://localhost:9083", + "s3.endpoint": "http://localhost:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + }, + ) + + +@pytest.fixture() +def table_test_null_nan(catalog: Catalog) -> Table: + return catalog.load_table("default.test_null_nan") + + +@pytest.fixture() +def table_test_null_nan_rewritten(catalog: Catalog) -> Table: + return catalog.load_table("default.test_null_nan_rewritten") + + +@pytest.fixture() +def table_test_limit(catalog: Catalog) -> Table: + return catalog.load_table("default.test_limit") + + +@pytest.fixture() +def table_test_all_types(catalog: Catalog) -> Table: + return catalog.load_table("default.test_all_types") + + +@pytest.fixture() +def table_test_table_version(catalog: Catalog) -> Table: + return catalog.load_table("default.test_table_version") + + +@pytest.fixture() +def table_test_table_sanitized_character(catalog: Catalog) -> Table: + return catalog.load_table("default.test_table_sanitized_character") + + +TABLE_NAME = ("default", "t1") + + +@pytest.fixture() +def table(catalog: Catalog) -> Table: + try: + catalog.drop_table(TABLE_NAME) + except NoSuchTableError: + pass # Just to make sure that the table doesn't exist + + schema = Schema( + NestedField(field_id=1, name="str", field_type=StringType(), required=False), + NestedField(field_id=2, name="int", 
field_type=IntegerType(), required=True), + NestedField(field_id=3, name="bool", field_type=BooleanType(), required=False), + NestedField(field_id=4, name="datetime", field_type=TimestampType(), required=False), + schema_id=1, + ) + + return catalog.create_table(identifier=TABLE_NAME, schema=schema) + + +@pytest.mark.integration +@pytest.mark.skip(reason="Not yet implemented: https://github.com/apache/iceberg-python/issues/205") +def test_table_properties(table: Table) -> None: + assert table.properties == DEFAULT_PROPERTIES + + with table.transaction() as transaction: + transaction.set_properties(abc="🤪") + + assert table.properties == dict(abc="🤪", **DEFAULT_PROPERTIES) + + with table.transaction() as transaction: + transaction.remove_properties("abc") + + assert table.properties == DEFAULT_PROPERTIES + + table = table.transaction().set_properties(abc="def").commit_transaction() + + assert table.properties == dict(abc="def", **DEFAULT_PROPERTIES) + + table = table.transaction().remove_properties("abc").commit_transaction() + + assert table.properties == DEFAULT_PROPERTIES + + +@pytest.fixture() +def test_positional_mor_deletes(catalog: Catalog) -> Table: + """Table that has positional deletes""" + return catalog.load_table("default.test_positional_mor_deletes") + + +@pytest.fixture() +def test_positional_mor_double_deletes(catalog: Catalog) -> Table: + """Table that has multiple positional deletes""" + return catalog.load_table("default.test_positional_mor_double_deletes") + + +@pytest.mark.integration +def test_pyarrow_nan(table_test_null_nan: Table) -> None: + arrow_table = table_test_null_nan.scan(row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric")).to_arrow() + assert len(arrow_table) == 1 + assert arrow_table["idx"][0].as_py() == 1 + assert math.isnan(arrow_table["col_numeric"][0].as_py()) + + +@pytest.mark.integration +def test_pyarrow_nan_rewritten(table_test_null_nan_rewritten: Table) -> None: + arrow_table = table_test_null_nan_rewritten.scan( + row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric") + ).to_arrow() + assert len(arrow_table) == 1 + assert arrow_table["idx"][0].as_py() == 1 + assert math.isnan(arrow_table["col_numeric"][0].as_py()) + + +@pytest.mark.integration +@pytest.mark.skip(reason="Fixing issues with NaN's: https://github.com/apache/arrow/issues/34162") +def test_pyarrow_not_nan_count(table_test_null_nan: Table) -> None: + not_nan = table_test_null_nan.scan(row_filter=NotNaN("col_numeric"), selected_fields=("idx",)).to_arrow() + assert len(not_nan) == 2 + + +@pytest.mark.integration +def test_duckdb_nan(table_test_null_nan_rewritten: Table) -> None: + con = table_test_null_nan_rewritten.scan().to_duckdb("table_test_null_nan") + result = con.query("SELECT idx, col_numeric FROM table_test_null_nan WHERE isnan(col_numeric)").fetchone() + assert result[0] == 1 + assert math.isnan(result[1]) + + +@pytest.mark.integration +def test_pyarrow_limit(table_test_limit: Table) -> None: + limited_result = table_test_limit.scan(selected_fields=("idx",), limit=1).to_arrow() + assert len(limited_result) == 1 + + empty_result = table_test_limit.scan(selected_fields=("idx",), limit=0).to_arrow() + assert len(empty_result) == 0 + + full_result = table_test_limit.scan(selected_fields=("idx",), limit=999).to_arrow() + assert len(full_result) == 10 + + +@pytest.mark.filterwarnings("ignore") +@pytest.mark.integration +def test_ray_nan(table_test_null_nan_rewritten: Table) -> None: + ray_dataset = table_test_null_nan_rewritten.scan().to_ray() + 
assert ray_dataset.count() == 3 + assert math.isnan(ray_dataset.take()[0]["col_numeric"]) + + +@pytest.mark.integration +def test_ray_nan_rewritten(table_test_null_nan_rewritten: Table) -> None: + ray_dataset = table_test_null_nan_rewritten.scan( + row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric") + ).to_ray() + assert ray_dataset.count() == 1 + assert ray_dataset.take()[0]["idx"] == 1 + assert math.isnan(ray_dataset.take()[0]["col_numeric"]) + + +@pytest.mark.integration +@pytest.mark.skip(reason="Fixing issues with NaN's: https://github.com/apache/arrow/issues/34162") +def test_ray_not_nan_count(table_test_null_nan_rewritten: Table) -> None: + ray_dataset = table_test_null_nan_rewritten.scan(row_filter=NotNaN("col_numeric"), selected_fields=("idx",)).to_ray() + print(ray_dataset.take()) + assert ray_dataset.count() == 2 + + +@pytest.mark.integration +def test_ray_all_types(table_test_all_types: Table) -> None: + ray_dataset = table_test_all_types.scan().to_ray() + pandas_dataframe = table_test_all_types.scan().to_pandas() + assert ray_dataset.count() == pandas_dataframe.shape[0] + assert pandas_dataframe.equals(ray_dataset.to_pandas()) + + +@pytest.mark.integration +def test_pyarrow_to_iceberg_all_types(table_test_all_types: Table) -> None: + fs = S3FileSystem(endpoint_override="http://localhost:9000", access_key="admin", secret_key="password") + data_file_paths = [task.file.file_path for task in table_test_all_types.scan().plan_files()] + for data_file_path in data_file_paths: + uri = urlparse(data_file_path) + with fs.open_input_file(f"{uri.netloc}{uri.path}") as fout: + parquet_schema = pq.read_schema(fout) + stored_iceberg_schema = Schema.model_validate_json(parquet_schema.metadata.get(b"iceberg.schema")) + converted_iceberg_schema = pyarrow_to_schema(parquet_schema) + assert converted_iceberg_schema == stored_iceberg_schema + + +@pytest.mark.integration +def test_pyarrow_deletes(test_positional_mor_deletes: Table) -> None: + # number, letter + # (1, 'a'), + # (2, 'b'), + # (3, 'c'), + # (4, 'd'), + # (5, 'e'), + # (6, 'f'), + # (7, 'g'), + # (8, 'h'), + # (9, 'i'), <- deleted + # (10, 'j'), + # (11, 'k'), + # (12, 'l') + arrow_table = test_positional_mor_deletes.scan().to_arrow() + assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12] + + # Checking the filter + arrow_table = test_positional_mor_deletes.scan( + row_filter=And(GreaterThanOrEqual("letter", "e"), LessThan("letter", "k")) + ).to_arrow() + assert arrow_table["number"].to_pylist() == [5, 6, 7, 8, 10] + + # Testing the combination of a filter and a limit + arrow_table = test_positional_mor_deletes.scan( + row_filter=And(GreaterThanOrEqual("letter", "e"), LessThan("letter", "k")), limit=1 + ).to_arrow() + assert arrow_table["number"].to_pylist() == [5] + + # Testing the slicing of indices + arrow_table = test_positional_mor_deletes.scan(limit=3).to_arrow() + assert arrow_table["number"].to_pylist() == [1, 2, 3] + + +@pytest.mark.integration +def test_pyarrow_deletes_double(test_positional_mor_double_deletes: Table) -> None: + # number, letter + # (1, 'a'), + # (2, 'b'), + # (3, 'c'), + # (4, 'd'), + # (5, 'e'), + # (6, 'f'), <- second delete + # (7, 'g'), + # (8, 'h'), + # (9, 'i'), <- first delete + # (10, 'j'), + # (11, 'k'), + # (12, 'l') + arrow_table = test_positional_mor_double_deletes.scan().to_arrow() + assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 7, 8, 10, 11, 12] + + # Checking the filter + arrow_table = test_positional_mor_double_deletes.scan( + 
row_filter=And(GreaterThanOrEqual("letter", "e"), LessThan("letter", "k")) + ).to_arrow() + assert arrow_table["number"].to_pylist() == [5, 7, 8, 10] + + # Testing the combination of a filter and a limit + arrow_table = test_positional_mor_double_deletes.scan( + row_filter=And(GreaterThanOrEqual("letter", "e"), LessThan("letter", "k")), limit=1 + ).to_arrow() + assert arrow_table["number"].to_pylist() == [5] + + # Testing the slicing of indices + arrow_table = test_positional_mor_double_deletes.scan(limit=8).to_arrow() + assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 7, 8, 10] + + +@pytest.mark.integration +def test_partitioned_tables(catalog: Catalog) -> None: + for table_name, predicate in [ + ("test_partitioned_by_identity", "ts >= '2023-03-05T00:00:00+00:00'"), + ("test_partitioned_by_years", "dt >= '2023-03-05'"), + ("test_partitioned_by_months", "dt >= '2023-03-05'"), + ("test_partitioned_by_days", "ts >= '2023-03-05T00:00:00+00:00'"), + ("test_partitioned_by_hours", "ts >= '2023-03-05T00:00:00+00:00'"), + ("test_partitioned_by_truncate", "letter >= 'e'"), + ("test_partitioned_by_bucket", "number >= '5'"), + ]: + table = catalog.load_table(f"default.{table_name}") + arrow_table = table.scan(selected_fields=("number",), row_filter=predicate).to_arrow() + assert set(arrow_table["number"].to_pylist()) == {5, 6, 7, 8, 9, 10, 11, 12}, f"Table {table_name}, predicate {predicate}" + + +@pytest.mark.integration +def test_unpartitioned_uuid_table(catalog: Catalog) -> None: + unpartitioned_uuid = catalog.load_table("default.test_uuid_and_fixed_unpartitioned") + arrow_table_eq = unpartitioned_uuid.scan(row_filter="uuid_col == '102cb62f-e6f8-4eb0-9973-d9b012ff0967'").to_arrow() + assert arrow_table_eq["uuid_col"].to_pylist() == [uuid.UUID("102cb62f-e6f8-4eb0-9973-d9b012ff0967").bytes] + + arrow_table_neq = unpartitioned_uuid.scan( + row_filter="uuid_col != '102cb62f-e6f8-4eb0-9973-d9b012ff0967' and uuid_col != '639cccce-c9d2-494a-a78c-278ab234f024'" + ).to_arrow() + assert arrow_table_neq["uuid_col"].to_pylist() == [ + uuid.UUID("ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226").bytes, + uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b").bytes, + uuid.UUID("923dae77-83d6-47cd-b4b0-d383e64ee57e").bytes, + ] + + +@pytest.mark.integration +def test_unpartitioned_fixed_table(catalog: Catalog) -> None: + fixed_table = catalog.load_table("default.test_uuid_and_fixed_unpartitioned") + arrow_table_eq = fixed_table.scan(row_filter=EqualTo("fixed_col", b"1234567890123456789012345")).to_arrow() + assert arrow_table_eq["fixed_col"].to_pylist() == [b"1234567890123456789012345"] + + arrow_table_neq = fixed_table.scan( + row_filter=And( + NotEqualTo("fixed_col", b"1234567890123456789012345"), NotEqualTo("uuid_col", "c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b") + ) + ).to_arrow() + assert arrow_table_neq["fixed_col"].to_pylist() == [ + b"1231231231231231231231231", + b"12345678901234567ass12345", + b"qweeqwwqq1231231231231111", + ] + + +@pytest.mark.integration +def test_scan_tag(test_positional_mor_deletes: Table) -> None: + arrow_table = test_positional_mor_deletes.scan().use_ref("tag_12").to_arrow() + assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] + + +@pytest.mark.integration +def test_scan_branch(test_positional_mor_deletes: Table) -> None: + arrow_table = test_positional_mor_deletes.scan().use_ref("without_5").to_arrow() + assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12] + + +@pytest.mark.integration +@pytest.mark.skip(reason="Not yet 
implemented: https://github.com/apache/iceberg-python/issues/205") +def test_upgrade_table_version(table_test_table_version: Table) -> None: + assert table_test_table_version.format_version == 1 + + with table_test_table_version.transaction() as transaction: + transaction.upgrade_table_version(format_version=1) + + assert table_test_table_version.format_version == 1 + + with table_test_table_version.transaction() as transaction: + transaction.upgrade_table_version(format_version=2) + + assert table_test_table_version.format_version == 2 + + with pytest.raises(ValueError) as e: # type: ignore + with table_test_table_version.transaction() as transaction: + transaction.upgrade_table_version(format_version=1) + assert "Cannot downgrade v2 table to v1" in str(e.value) + + with pytest.raises(ValueError) as e: + with table_test_table_version.transaction() as transaction: + transaction.upgrade_table_version(format_version=3) + assert "Unsupported table format version: 3" in str(e.value) + + +@pytest.mark.integration +def test_reproduce_issue(table_test_table_sanitized_character: Table) -> None: + arrow_table = table_test_table_sanitized_character.scan().to_arrow() + assert len(arrow_table.schema.names), 1 + assert len(table_test_table_sanitized_character.schema().fields), 1 + assert arrow_table.schema.names[0] == table_test_table_sanitized_character.schema().fields[0].name diff --git a/tests/test_integration.py b/tests/integration/test_rest.py similarity index 100% rename from tests/test_integration.py rename to tests/integration/test_rest.py diff --git a/tests/test_integration_manifest.py b/tests/integration/test_rest_manifest.py similarity index 100% rename from tests/test_integration_manifest.py rename to tests/integration/test_rest_manifest.py diff --git a/tests/test_integration_schema.py b/tests/integration/test_rest_schema.py similarity index 100% rename from tests/test_integration_schema.py rename to tests/integration/test_rest_schema.py From 906323bc26cd5afc032937e9019bff6c4bd5a08c Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 12 Dec 2023 10:12:32 +0100 Subject: [PATCH 3/8] Add missing licenses --- dev/hive/Dockerfile | 15 +++++++++++++++ dev/hive/core-site.xml | 19 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/dev/hive/Dockerfile b/dev/hive/Dockerfile index a26af3e633..ff8c9fae63 100644 --- a/dev/hive/Dockerfile +++ b/dev/hive/Dockerfile @@ -1,3 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ FROM openjdk:8-jre-slim AS build RUN apt-get update -qq && apt-get -qq -y install curl diff --git a/dev/hive/core-site.xml b/dev/hive/core-site.xml index 85c9e97d30..b77332b83b 100644 --- a/dev/hive/core-site.xml +++ b/dev/hive/core-site.xml @@ -1,3 +1,22 @@ + + + + fs.defaultFS From 9ba99b17b47cc2199a3a0ca5807a276a44131ceb Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 12 Dec 2023 10:15:45 +0100 Subject: [PATCH 4/8] Fix --- dev/hive/Dockerfile | 2 +- dev/provision.py | 77 ++++++++++++++-------------------- tests/integration/test_hive.py | 3 +- 3 files changed, 34 insertions(+), 48 deletions(-) diff --git a/dev/hive/Dockerfile b/dev/hive/Dockerfile index ff8c9fae63..ee633934c3 100644 --- a/dev/hive/Dockerfile +++ b/dev/hive/Dockerfile @@ -31,4 +31,4 @@ ENV HADOOP_VERSION=3.1.0 COPY --from=build /tmp/hadoop-aws-${HADOOP_VERSION}.jar /opt/hive/lib/hadoop-aws-${HADOOP_VERSION}.jar COPY --from=build /tmp/aws-java-sdk-bundle-1.11.271.jar /opt/hive/lib/aws-java-sdk-bundle-1.11.271.jar -COPY core-site.xml /opt/hadoop/etc/hadoop/core-site.xml \ No newline at end of file +COPY core-site.xml /opt/hadoop/etc/hadoop/core-site.xml diff --git a/dev/provision.py b/dev/provision.py index d23ce8a4fd..b1ccb28192 100644 --- a/dev/provision.py +++ b/dev/provision.py @@ -24,25 +24,28 @@ spark = SparkSession.builder.getOrCreate() -catalogs = {'rest': load_catalog( - "rest", - **{ - "type": "rest", - "uri": "http://rest:8181", - "s3.endpoint": "http://minio:9000", - "s3.access-key-id": "admin", - "s3.secret-access-key": "password", - }, -), 'hive': load_catalog( - "hive", - **{ - "type": "hive", - "uri": "http://hive:9083", - "s3.endpoint": "http://minio:9000", - "s3.access-key-id": "admin", - "s3.secret-access-key": "password", - }, -)} +catalogs = { + 'rest': load_catalog( + "rest", + **{ + "type": "rest", + "uri": "http://rest:8181", + "s3.endpoint": "http://minio:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + }, + ), + 'hive': load_catalog( + "hive", + **{ + "type": "hive", + "uri": "http://hive:9083", + "s3.endpoint": "http://minio:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + }, + ), +} for catalog_name, catalog in catalogs.items(): spark.sql( @@ -119,9 +122,7 @@ # Partitioning is not really needed, but there is a bug: # https://github.com/apache/iceberg/pull/7685 - spark.sql( - f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes ADD PARTITION FIELD years(dt) AS dt_years" - ) + spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes ADD PARTITION FIELD years(dt) AS dt_years") spark.sql( f""" @@ -142,21 +143,13 @@ """ ) - spark.sql( - f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes CREATE TAG tag_12" - ) + spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes CREATE TAG tag_12") - spark.sql( - f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes CREATE BRANCH without_5" - ) + spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes CREATE BRANCH without_5") - spark.sql( - f"DELETE FROM {catalog_name}.default.test_positional_mor_deletes.branch_without_5 WHERE number = 5" - ) + spark.sql(f"DELETE FROM {catalog_name}.default.test_positional_mor_deletes.branch_without_5 WHERE number = 5") - spark.sql( - f"DELETE FROM {catalog_name}.default.test_positional_mor_deletes WHERE number = 9" - ) + spark.sql(f"DELETE FROM {catalog_name}.default.test_positional_mor_deletes WHERE number = 9") spark.sql( f""" @@ -177,9 +170,7 @@ # Partitioning is 
not really needed, but there is a bug: # https://github.com/apache/iceberg/pull/7685 - spark.sql( - f"ALTER TABLE {catalog_name}.default.test_positional_mor_double_deletes ADD PARTITION FIELD years(dt) AS dt_years" - ) + spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_double_deletes ADD PARTITION FIELD years(dt) AS dt_years") spark.sql( f""" @@ -200,13 +191,9 @@ """ ) - spark.sql( - f"DELETE FROM {catalog_name}.default.test_positional_mor_double_deletes WHERE number = 9" - ) + spark.sql(f"DELETE FROM {catalog_name}.default.test_positional_mor_double_deletes WHERE number = 9") - spark.sql( - f"DELETE FROM {catalog_name}.default.test_positional_mor_double_deletes WHERE letter == 'f'" - ) + spark.sql(f"DELETE FROM {catalog_name}.default.test_positional_mor_double_deletes WHERE letter == 'f'") all_types_dataframe = ( spark.range(0, 5, 1, 5) @@ -275,9 +262,7 @@ # There is an issue with CREATE OR REPLACE # https://github.com/apache/iceberg/issues/8756 - spark.sql( - f"DROP TABLE IF EXISTS {catalog_name}.default.test_table_version" - ) + spark.sql(f"DROP TABLE IF EXISTS {catalog_name}.default.test_table_version") spark.sql( f""" diff --git a/tests/integration/test_hive.py b/tests/integration/test_hive.py index 7292e5575c..bb95fafc8c 100644 --- a/tests/integration/test_hive.py +++ b/tests/integration/test_hive.py @@ -18,6 +18,7 @@ import math import uuid +from typing import Dict from urllib.parse import urlparse import pyarrow.parquet as pq @@ -46,7 +47,7 @@ TimestampType, ) -DEFAULT_PROPERTIES = {} +DEFAULT_PROPERTIES: Dict[str, str] = {} @pytest.fixture() From 2e77cb76cfffc2da8e29edbc812a0b88ea34d34e Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Thu, 14 Dec 2023 20:22:45 +0100 Subject: [PATCH 5/8] Remove Arrow --- dev/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/Dockerfile b/dev/Dockerfile index e578252d0b..44783d0809 100644 --- a/dev/Dockerfile +++ b/dev/Dockerfile @@ -60,7 +60,7 @@ RUN chmod u+x /opt/spark/sbin/* && \ RUN pip3 install -q ipython -RUN pip3 install "pyiceberg[s3fs,pyarrow,hive]==${PYICEBERG_VERSION}" +RUN pip3 install "pyiceberg[s3fs,hive]==${PYICEBERG_VERSION}" COPY entrypoint.sh . COPY provision.py . 
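The provisioning changes in the surrounding patches all converge on one pattern: build a dict of catalogs, loop over it, and prefix every Spark SQL identifier with the catalog name so the identical DDL/DML runs against both the REST and the Hive catalog configured in spark-defaults.conf. A minimal sketch of that pattern follows; demo_table is a hypothetical name used purely for illustration:

    # Sketch of the provisioning pattern: one loop, every table identifier
    # prefixed with the Spark catalog name ("rest" or "hive", as configured in
    # spark-defaults.conf). "demo_table" is a hypothetical example name.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    for catalog_name in ("rest", "hive"):
        spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog_name}.default")
        spark.sql(
            f"""
            CREATE TABLE IF NOT EXISTS {catalog_name}.default.demo_table (
                number integer,
                letter string
            )
            USING iceberg
            """
        )
        spark.sql(f"INSERT INTO {catalog_name}.default.demo_table VALUES (1, 'a')")

Dropping the prefix leaves the statement pointed at whichever catalog spark.sql.defaultCatalog names, which appears to be what the follow-up commits below correct for test_table_add_column.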
From 5ce1c8a31e2020f044a2d7e2279304a6cc98eaa7 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 17 Jan 2024 14:52:04 +0100 Subject: [PATCH 6/8] Add catalog --- dev/provision.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/dev/provision.py b/dev/provision.py index 42b14c98ed..3656361fec 100644 --- a/dev/provision.py +++ b/dev/provision.py @@ -307,20 +307,16 @@ ) spark.sql( - """ - CREATE TABLE default.test_table_add_column ( + f""" + CREATE TABLE {catalog_name}.test_table_add_column ( a string ) USING iceberg """ ) - spark.sql("INSERT INTO default.test_table_add_column VALUES ('1')") + spark.sql(f"INSERT INTO {catalog_name}.test_table_add_column VALUES ('1')") - spark.sql( - """ - ALTER TABLE default.test_table_add_column ADD COLUMN b string - """ - ) + spark.sql(f"ALTER TABLE {catalog_name}.test_table_add_column ADD COLUMN b string") - spark.sql("INSERT INTO default.test_table_add_column VALUES ('2', '2')") + spark.sql(f"INSERT INTO {catalog_name}.test_table_add_column VALUES ('2', '2')") From fc25fff929d5e889d363dbc2780e586759353c2d Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 17 Jan 2024 16:20:36 +0100 Subject: [PATCH 7/8] Update test suite --- dev/provision.py | 12 +- .../{test_rest.py => test_catalogs.py} | 148 ++++--- tests/integration/test_hive.py | 409 ------------------ 3 files changed, 87 insertions(+), 482 deletions(-) rename tests/integration/{test_rest.py => test_catalogs.py} (70%) delete mode 100644 tests/integration/test_hive.py diff --git a/dev/provision.py b/dev/provision.py index 3656361fec..e5048d2fa5 100644 --- a/dev/provision.py +++ b/dev/provision.py @@ -299,8 +299,8 @@ ) spark.sql( - """ - INSERT INTO default.test_table_sanitized_character + f""" + INSERT INTO {catalog_name}.default.test_table_sanitized_character VALUES ('123') """ @@ -308,15 +308,15 @@ spark.sql( f""" - CREATE TABLE {catalog_name}.test_table_add_column ( + CREATE TABLE {catalog_name}.default.test_table_add_column ( a string ) USING iceberg """ ) - spark.sql(f"INSERT INTO {catalog_name}.test_table_add_column VALUES ('1')") + spark.sql(f"INSERT INTO {catalog_name}.default.test_table_add_column VALUES ('1')") - spark.sql(f"ALTER TABLE {catalog_name}.test_table_add_column ADD COLUMN b string") + spark.sql(f"ALTER TABLE {catalog_name}.default.test_table_add_column ADD COLUMN b string") - spark.sql(f"INSERT INTO {catalog_name}.test_table_add_column VALUES ('2', '2')") + spark.sql(f"INSERT INTO {catalog_name}.default.test_table_add_column VALUES ('2', '2')") diff --git a/tests/integration/test_rest.py b/tests/integration/test_catalogs.py similarity index 70% rename from tests/integration/test_rest.py rename to tests/integration/test_catalogs.py index 2a173be3b3..485820df03 100644 --- a/tests/integration/test_rest.py +++ b/tests/integration/test_catalogs.py @@ -25,6 +25,7 @@ from pyarrow.fs import S3FileSystem from pyiceberg.catalog import Catalog, load_catalog +from pyiceberg.catalog.hive import HiveCatalog from pyiceberg.exceptions import NoSuchTableError from pyiceberg.expressions import ( And, @@ -50,7 +51,7 @@ @pytest.fixture() -def catalog() -> Catalog: +def catalog_rest() -> Catalog: return load_catalog( "local", **{ @@ -64,40 +65,23 @@ def catalog() -> Catalog: @pytest.fixture() -def table_test_null_nan(catalog: Catalog) -> Table: - return catalog.load_table("default.test_null_nan") - - -@pytest.fixture() -def table_test_null_nan_rewritten(catalog: Catalog) -> Table: - return catalog.load_table("default.test_null_nan_rewritten") - - 
-@pytest.fixture() -def table_test_limit(catalog: Catalog) -> Table: - return catalog.load_table("default.test_limit") - - -@pytest.fixture() -def table_test_all_types(catalog: Catalog) -> Table: - return catalog.load_table("default.test_all_types") - - -@pytest.fixture() -def table_test_table_version(catalog: Catalog) -> Table: - return catalog.load_table("default.test_table_version") - - -@pytest.fixture() -def table_test_table_sanitized_character(catalog: Catalog) -> Table: - return catalog.load_table("default.test_table_sanitized_character") +def catalog_hive() -> Catalog: + return load_catalog( + "local", + **{ + "type": "hive", + "uri": "http://localhost:9083", + "s3.endpoint": "http://localhost:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + }, + ) TABLE_NAME = ("default", "t1") -@pytest.fixture() -def table(catalog: Catalog) -> Table: +def create_table(catalog: Catalog) -> Table: try: catalog.drop_table(TABLE_NAME) except NoSuchTableError: @@ -115,7 +99,12 @@ def table(catalog: Catalog) -> Table: @pytest.mark.integration -def test_table_properties(table: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_table_properties(catalog: Catalog) -> None: + if isinstance(catalog, HiveCatalog): + pytest.skip("Not yet implemented: https://github.com/apache/iceberg-python/issues/275") + table = create_table(catalog) + assert table.properties == DEFAULT_PROPERTIES with table.transaction() as transaction: @@ -137,26 +126,10 @@ def test_table_properties(table: Table) -> None: assert table.properties == DEFAULT_PROPERTIES -@pytest.fixture() -def test_positional_mor_deletes(catalog: Catalog) -> Table: - """Table that has positional deletes""" - return catalog.load_table("default.test_positional_mor_deletes") - - -@pytest.fixture() -def test_table_add_column(catalog: Catalog) -> Table: - """Table that has a new column""" - return catalog.load_table("default.test_table_add_column") - - -@pytest.fixture() -def test_positional_mor_double_deletes(catalog: Catalog) -> Table: - """Table that has multiple positional deletes""" - return catalog.load_table("default.test_positional_mor_double_deletes") - - @pytest.mark.integration -def test_pyarrow_nan(table_test_null_nan: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_pyarrow_nan(catalog: Catalog) -> None: + table_test_null_nan = catalog.load_table("default.test_null_nan") arrow_table = table_test_null_nan.scan(row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric")).to_arrow() assert len(arrow_table) == 1 assert arrow_table["idx"][0].as_py() == 1 @@ -164,7 +137,9 @@ def test_pyarrow_nan(table_test_null_nan: Table) -> None: @pytest.mark.integration -def test_pyarrow_nan_rewritten(table_test_null_nan_rewritten: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_pyarrow_nan_rewritten(catalog: Catalog) -> None: + table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten") arrow_table = table_test_null_nan_rewritten.scan( row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric") ).to_arrow() @@ -174,14 +149,19 @@ def test_pyarrow_nan_rewritten(table_test_null_nan_rewritten: Table) -> None: @pytest.mark.integration +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), 
pytest.lazy_fixture('catalog_rest')]) + @pytest.mark.skip(reason="Fixing issues with NaN's: https://github.com/apache/arrow/issues/34162") -def test_pyarrow_not_nan_count(table_test_null_nan: Table) -> None: +def test_pyarrow_not_nan_count(catalog: Catalog) -> None: + table_test_null_nan = catalog.load_table("default.test_null_nan") not_nan = table_test_null_nan.scan(row_filter=NotNaN("col_numeric"), selected_fields=("idx",)).to_arrow() assert len(not_nan) == 2 @pytest.mark.integration -def test_duckdb_nan(table_test_null_nan_rewritten: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_duckdb_nan(catalog: Catalog) -> None: + table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten") con = table_test_null_nan_rewritten.scan().to_duckdb("table_test_null_nan") result = con.query("SELECT idx, col_numeric FROM table_test_null_nan WHERE isnan(col_numeric)").fetchone() assert result[0] == 1 @@ -189,7 +169,9 @@ def test_duckdb_nan(table_test_null_nan_rewritten: Table) -> None: @pytest.mark.integration -def test_pyarrow_limit(table_test_limit: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_pyarrow_limit(catalog: Catalog) -> None: + table_test_limit = catalog.load_table("default.test_limit") limited_result = table_test_limit.scan(selected_fields=("idx",), limit=1).to_arrow() assert len(limited_result) == 1 @@ -200,16 +182,20 @@ def test_pyarrow_limit(table_test_limit: Table) -> None: assert len(full_result) == 10 -@pytest.mark.filterwarnings("ignore") @pytest.mark.integration -def test_ray_nan(table_test_null_nan_rewritten: Table) -> None: +@pytest.mark.filterwarnings("ignore") +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_ray_nan(catalog: Catalog) -> None: + table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten") ray_dataset = table_test_null_nan_rewritten.scan().to_ray() assert ray_dataset.count() == 3 assert math.isnan(ray_dataset.take()[0]["col_numeric"]) @pytest.mark.integration -def test_ray_nan_rewritten(table_test_null_nan_rewritten: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_ray_nan_rewritten(catalog: Catalog) -> None: + table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten") ray_dataset = table_test_null_nan_rewritten.scan( row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric") ).to_ray() @@ -219,15 +205,20 @@ def test_ray_nan_rewritten(table_test_null_nan_rewritten: Table) -> None: @pytest.mark.integration +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) + @pytest.mark.skip(reason="Fixing issues with NaN's: https://github.com/apache/arrow/issues/34162") -def test_ray_not_nan_count(table_test_null_nan_rewritten: Table) -> None: +def test_ray_not_nan_count(catalog: Catalog) -> None: + table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten") ray_dataset = table_test_null_nan_rewritten.scan(row_filter=NotNaN("col_numeric"), selected_fields=("idx",)).to_ray() print(ray_dataset.take()) assert ray_dataset.count() == 2 @pytest.mark.integration -def test_ray_all_types(table_test_all_types: Table) -> None: +@pytest.mark.parametrize('catalog', 
[pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_ray_all_types(catalog: Catalog) -> None: + table_test_all_types = catalog.load_table("default.test_all_types") ray_dataset = table_test_all_types.scan().to_ray() pandas_dataframe = table_test_all_types.scan().to_pandas() assert ray_dataset.count() == pandas_dataframe.shape[0] @@ -235,7 +226,9 @@ def test_ray_all_types(table_test_all_types: Table) -> None: @pytest.mark.integration -def test_pyarrow_to_iceberg_all_types(table_test_all_types: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_pyarrow_to_iceberg_all_types(catalog: Catalog) -> None: + table_test_all_types = catalog.load_table("default.test_all_types") fs = S3FileSystem(endpoint_override="http://localhost:9000", access_key="admin", secret_key="password") data_file_paths = [task.file.file_path for task in table_test_all_types.scan().plan_files()] for data_file_path in data_file_paths: @@ -248,7 +241,8 @@ def test_pyarrow_to_iceberg_all_types(table_test_all_types: Table) -> None: @pytest.mark.integration -def test_pyarrow_deletes(test_positional_mor_deletes: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_pyarrow_deletes(catalog: Catalog) -> None: # number, letter # (1, 'a'), # (2, 'b'), @@ -262,6 +256,7 @@ def test_pyarrow_deletes(test_positional_mor_deletes: Table) -> None: # (10, 'j'), # (11, 'k'), # (12, 'l') + test_positional_mor_deletes = catalog.load_table("default.test_positional_mor_deletes") arrow_table = test_positional_mor_deletes.scan().to_arrow() assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12] @@ -283,7 +278,8 @@ def test_pyarrow_deletes(test_positional_mor_deletes: Table) -> None: @pytest.mark.integration -def test_pyarrow_deletes_double(test_positional_mor_double_deletes: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_pyarrow_deletes_double(catalog: Catalog) -> None: # number, letter # (1, 'a'), # (2, 'b'), @@ -297,6 +293,7 @@ def test_pyarrow_deletes_double(test_positional_mor_double_deletes: Table) -> No # (10, 'j'), # (11, 'k'), # (12, 'l') + test_positional_mor_double_deletes = catalog.load_table("default.test_positional_mor_double_deletes") arrow_table = test_positional_mor_double_deletes.scan().to_arrow() assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 7, 8, 10, 11, 12] @@ -318,6 +315,7 @@ def test_pyarrow_deletes_double(test_positional_mor_double_deletes: Table) -> No @pytest.mark.integration +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) def test_partitioned_tables(catalog: Catalog) -> None: for table_name, predicate in [ ("test_partitioned_by_identity", "ts >= '2023-03-05T00:00:00+00:00'"), @@ -334,6 +332,7 @@ def test_partitioned_tables(catalog: Catalog) -> None: @pytest.mark.integration +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) def test_unpartitioned_uuid_table(catalog: Catalog) -> None: unpartitioned_uuid = catalog.load_table("default.test_uuid_and_fixed_unpartitioned") arrow_table_eq = unpartitioned_uuid.scan(row_filter="uuid_col == '102cb62f-e6f8-4eb0-9973-d9b012ff0967'").to_arrow() @@ -350,6 +349,7 @@ def test_unpartitioned_uuid_table(catalog: Catalog) -> None: 
@pytest.mark.integration +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) def test_unpartitioned_fixed_table(catalog: Catalog) -> None: fixed_table = catalog.load_table("default.test_uuid_and_fixed_unpartitioned") arrow_table_eq = fixed_table.scan(row_filter=EqualTo("fixed_col", b"1234567890123456789012345")).to_arrow() @@ -368,19 +368,25 @@ def test_unpartitioned_fixed_table(catalog: Catalog) -> None: @pytest.mark.integration -def test_scan_tag(test_positional_mor_deletes: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_scan_tag(catalog: Catalog) -> None: + test_positional_mor_deletes = catalog.load_table("default.test_positional_mor_deletes") arrow_table = test_positional_mor_deletes.scan().use_ref("tag_12").to_arrow() assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] @pytest.mark.integration -def test_scan_branch(test_positional_mor_deletes: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_scan_branch(catalog: Catalog) -> None: + test_positional_mor_deletes = catalog.load_table("default.test_positional_mor_deletes") arrow_table = test_positional_mor_deletes.scan().use_ref("without_5").to_arrow() assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12] @pytest.mark.integration -def test_filter_on_new_column(test_table_add_column: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_filter_on_new_column(catalog: Catalog) -> None: + test_table_add_column = catalog.load_table("default.test_table_add_column") arrow_table = test_table_add_column.scan(row_filter="b == '2'").to_arrow() assert arrow_table["b"].to_pylist() == ['2'] @@ -392,7 +398,12 @@ def test_filter_on_new_column(test_table_add_column: Table) -> None: @pytest.mark.integration -def test_upgrade_table_version(table_test_table_version: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_upgrade_table_version(catalog: Catalog) -> None: + if isinstance(catalog, HiveCatalog): + pytest.skip("Not yet implemented: https://github.com/apache/iceberg-python/issues/274") + table_test_table_version = catalog.load_table("default.test_table_version") + assert table_test_table_version.format_version == 1 with table_test_table_version.transaction() as transaction: @@ -416,8 +427,11 @@ def test_upgrade_table_version(table_test_table_version: Table) -> None: assert "Unsupported table format version: 3" in str(e.value) + @pytest.mark.integration -def test_reproduce_issue(table_test_table_sanitized_character: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_sanitize_character(catalog: Catalog) -> None: + table_test_table_sanitized_character = catalog.load_table("default.test_table_sanitized_character") arrow_table = table_test_table_sanitized_character.scan().to_arrow() assert len(arrow_table.schema.names), 1 assert len(table_test_table_sanitized_character.schema().fields), 1 diff --git a/tests/integration/test_hive.py b/tests/integration/test_hive.py deleted file mode 100644 index bb95fafc8c..0000000000 --- a/tests/integration/test_hive.py +++ /dev/null @@ -1,409 +0,0 @@ -# Licensed to the 
Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# pylint:disable=redefined-outer-name - -import math -import uuid -from typing import Dict -from urllib.parse import urlparse - -import pyarrow.parquet as pq -import pytest -from pyarrow.fs import S3FileSystem - -from pyiceberg.catalog import Catalog, load_catalog -from pyiceberg.exceptions import NoSuchTableError -from pyiceberg.expressions import ( - And, - EqualTo, - GreaterThanOrEqual, - IsNaN, - LessThan, - NotEqualTo, - NotNaN, -) -from pyiceberg.io.pyarrow import pyarrow_to_schema -from pyiceberg.schema import Schema -from pyiceberg.table import Table -from pyiceberg.types import ( - BooleanType, - IntegerType, - NestedField, - StringType, - TimestampType, -) - -DEFAULT_PROPERTIES: Dict[str, str] = {} - - -@pytest.fixture() -def catalog() -> Catalog: - return load_catalog( - "local", - **{ - "type": "hive", - "uri": "http://localhost:9083", - "s3.endpoint": "http://localhost:9000", - "s3.access-key-id": "admin", - "s3.secret-access-key": "password", - }, - ) - - -@pytest.fixture() -def table_test_null_nan(catalog: Catalog) -> Table: - return catalog.load_table("default.test_null_nan") - - -@pytest.fixture() -def table_test_null_nan_rewritten(catalog: Catalog) -> Table: - return catalog.load_table("default.test_null_nan_rewritten") - - -@pytest.fixture() -def table_test_limit(catalog: Catalog) -> Table: - return catalog.load_table("default.test_limit") - - -@pytest.fixture() -def table_test_all_types(catalog: Catalog) -> Table: - return catalog.load_table("default.test_all_types") - - -@pytest.fixture() -def table_test_table_version(catalog: Catalog) -> Table: - return catalog.load_table("default.test_table_version") - - -@pytest.fixture() -def table_test_table_sanitized_character(catalog: Catalog) -> Table: - return catalog.load_table("default.test_table_sanitized_character") - - -TABLE_NAME = ("default", "t1") - - -@pytest.fixture() -def table(catalog: Catalog) -> Table: - try: - catalog.drop_table(TABLE_NAME) - except NoSuchTableError: - pass # Just to make sure that the table doesn't exist - - schema = Schema( - NestedField(field_id=1, name="str", field_type=StringType(), required=False), - NestedField(field_id=2, name="int", field_type=IntegerType(), required=True), - NestedField(field_id=3, name="bool", field_type=BooleanType(), required=False), - NestedField(field_id=4, name="datetime", field_type=TimestampType(), required=False), - schema_id=1, - ) - - return catalog.create_table(identifier=TABLE_NAME, schema=schema) - - -@pytest.mark.integration -@pytest.mark.skip(reason="Not yet implemented: https://github.com/apache/iceberg-python/issues/205") -def test_table_properties(table: Table) -> None: - assert table.properties == DEFAULT_PROPERTIES - - with table.transaction() as transaction: - 
transaction.set_properties(abc="🤪") - - assert table.properties == dict(abc="🤪", **DEFAULT_PROPERTIES) - - with table.transaction() as transaction: - transaction.remove_properties("abc") - - assert table.properties == DEFAULT_PROPERTIES - - table = table.transaction().set_properties(abc="def").commit_transaction() - - assert table.properties == dict(abc="def", **DEFAULT_PROPERTIES) - - table = table.transaction().remove_properties("abc").commit_transaction() - - assert table.properties == DEFAULT_PROPERTIES - - -@pytest.fixture() -def test_positional_mor_deletes(catalog: Catalog) -> Table: - """Table that has positional deletes""" - return catalog.load_table("default.test_positional_mor_deletes") - - -@pytest.fixture() -def test_positional_mor_double_deletes(catalog: Catalog) -> Table: - """Table that has multiple positional deletes""" - return catalog.load_table("default.test_positional_mor_double_deletes") - - -@pytest.mark.integration -def test_pyarrow_nan(table_test_null_nan: Table) -> None: - arrow_table = table_test_null_nan.scan(row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric")).to_arrow() - assert len(arrow_table) == 1 - assert arrow_table["idx"][0].as_py() == 1 - assert math.isnan(arrow_table["col_numeric"][0].as_py()) - - -@pytest.mark.integration -def test_pyarrow_nan_rewritten(table_test_null_nan_rewritten: Table) -> None: - arrow_table = table_test_null_nan_rewritten.scan( - row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric") - ).to_arrow() - assert len(arrow_table) == 1 - assert arrow_table["idx"][0].as_py() == 1 - assert math.isnan(arrow_table["col_numeric"][0].as_py()) - - -@pytest.mark.integration -@pytest.mark.skip(reason="Fixing issues with NaN's: https://github.com/apache/arrow/issues/34162") -def test_pyarrow_not_nan_count(table_test_null_nan: Table) -> None: - not_nan = table_test_null_nan.scan(row_filter=NotNaN("col_numeric"), selected_fields=("idx",)).to_arrow() - assert len(not_nan) == 2 - - -@pytest.mark.integration -def test_duckdb_nan(table_test_null_nan_rewritten: Table) -> None: - con = table_test_null_nan_rewritten.scan().to_duckdb("table_test_null_nan") - result = con.query("SELECT idx, col_numeric FROM table_test_null_nan WHERE isnan(col_numeric)").fetchone() - assert result[0] == 1 - assert math.isnan(result[1]) - - -@pytest.mark.integration -def test_pyarrow_limit(table_test_limit: Table) -> None: - limited_result = table_test_limit.scan(selected_fields=("idx",), limit=1).to_arrow() - assert len(limited_result) == 1 - - empty_result = table_test_limit.scan(selected_fields=("idx",), limit=0).to_arrow() - assert len(empty_result) == 0 - - full_result = table_test_limit.scan(selected_fields=("idx",), limit=999).to_arrow() - assert len(full_result) == 10 - - -@pytest.mark.filterwarnings("ignore") -@pytest.mark.integration -def test_ray_nan(table_test_null_nan_rewritten: Table) -> None: - ray_dataset = table_test_null_nan_rewritten.scan().to_ray() - assert ray_dataset.count() == 3 - assert math.isnan(ray_dataset.take()[0]["col_numeric"]) - - -@pytest.mark.integration -def test_ray_nan_rewritten(table_test_null_nan_rewritten: Table) -> None: - ray_dataset = table_test_null_nan_rewritten.scan( - row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric") - ).to_ray() - assert ray_dataset.count() == 1 - assert ray_dataset.take()[0]["idx"] == 1 - assert math.isnan(ray_dataset.take()[0]["col_numeric"]) - - -@pytest.mark.integration -@pytest.mark.skip(reason="Fixing issues with NaN's: 
https://github.com/apache/arrow/issues/34162") -def test_ray_not_nan_count(table_test_null_nan_rewritten: Table) -> None: - ray_dataset = table_test_null_nan_rewritten.scan(row_filter=NotNaN("col_numeric"), selected_fields=("idx",)).to_ray() - print(ray_dataset.take()) - assert ray_dataset.count() == 2 - - -@pytest.mark.integration -def test_ray_all_types(table_test_all_types: Table) -> None: - ray_dataset = table_test_all_types.scan().to_ray() - pandas_dataframe = table_test_all_types.scan().to_pandas() - assert ray_dataset.count() == pandas_dataframe.shape[0] - assert pandas_dataframe.equals(ray_dataset.to_pandas()) - - -@pytest.mark.integration -def test_pyarrow_to_iceberg_all_types(table_test_all_types: Table) -> None: - fs = S3FileSystem(endpoint_override="http://localhost:9000", access_key="admin", secret_key="password") - data_file_paths = [task.file.file_path for task in table_test_all_types.scan().plan_files()] - for data_file_path in data_file_paths: - uri = urlparse(data_file_path) - with fs.open_input_file(f"{uri.netloc}{uri.path}") as fout: - parquet_schema = pq.read_schema(fout) - stored_iceberg_schema = Schema.model_validate_json(parquet_schema.metadata.get(b"iceberg.schema")) - converted_iceberg_schema = pyarrow_to_schema(parquet_schema) - assert converted_iceberg_schema == stored_iceberg_schema - - -@pytest.mark.integration -def test_pyarrow_deletes(test_positional_mor_deletes: Table) -> None: - # number, letter - # (1, 'a'), - # (2, 'b'), - # (3, 'c'), - # (4, 'd'), - # (5, 'e'), - # (6, 'f'), - # (7, 'g'), - # (8, 'h'), - # (9, 'i'), <- deleted - # (10, 'j'), - # (11, 'k'), - # (12, 'l') - arrow_table = test_positional_mor_deletes.scan().to_arrow() - assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12] - - # Checking the filter - arrow_table = test_positional_mor_deletes.scan( - row_filter=And(GreaterThanOrEqual("letter", "e"), LessThan("letter", "k")) - ).to_arrow() - assert arrow_table["number"].to_pylist() == [5, 6, 7, 8, 10] - - # Testing the combination of a filter and a limit - arrow_table = test_positional_mor_deletes.scan( - row_filter=And(GreaterThanOrEqual("letter", "e"), LessThan("letter", "k")), limit=1 - ).to_arrow() - assert arrow_table["number"].to_pylist() == [5] - - # Testing the slicing of indices - arrow_table = test_positional_mor_deletes.scan(limit=3).to_arrow() - assert arrow_table["number"].to_pylist() == [1, 2, 3] - - -@pytest.mark.integration -def test_pyarrow_deletes_double(test_positional_mor_double_deletes: Table) -> None: - # number, letter - # (1, 'a'), - # (2, 'b'), - # (3, 'c'), - # (4, 'd'), - # (5, 'e'), - # (6, 'f'), <- second delete - # (7, 'g'), - # (8, 'h'), - # (9, 'i'), <- first delete - # (10, 'j'), - # (11, 'k'), - # (12, 'l') - arrow_table = test_positional_mor_double_deletes.scan().to_arrow() - assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 7, 8, 10, 11, 12] - - # Checking the filter - arrow_table = test_positional_mor_double_deletes.scan( - row_filter=And(GreaterThanOrEqual("letter", "e"), LessThan("letter", "k")) - ).to_arrow() - assert arrow_table["number"].to_pylist() == [5, 7, 8, 10] - - # Testing the combination of a filter and a limit - arrow_table = test_positional_mor_double_deletes.scan( - row_filter=And(GreaterThanOrEqual("letter", "e"), LessThan("letter", "k")), limit=1 - ).to_arrow() - assert arrow_table["number"].to_pylist() == [5] - - # Testing the slicing of indices - arrow_table = test_positional_mor_double_deletes.scan(limit=8).to_arrow() - assert 
arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 7, 8, 10] - - -@pytest.mark.integration -def test_partitioned_tables(catalog: Catalog) -> None: - for table_name, predicate in [ - ("test_partitioned_by_identity", "ts >= '2023-03-05T00:00:00+00:00'"), - ("test_partitioned_by_years", "dt >= '2023-03-05'"), - ("test_partitioned_by_months", "dt >= '2023-03-05'"), - ("test_partitioned_by_days", "ts >= '2023-03-05T00:00:00+00:00'"), - ("test_partitioned_by_hours", "ts >= '2023-03-05T00:00:00+00:00'"), - ("test_partitioned_by_truncate", "letter >= 'e'"), - ("test_partitioned_by_bucket", "number >= '5'"), - ]: - table = catalog.load_table(f"default.{table_name}") - arrow_table = table.scan(selected_fields=("number",), row_filter=predicate).to_arrow() - assert set(arrow_table["number"].to_pylist()) == {5, 6, 7, 8, 9, 10, 11, 12}, f"Table {table_name}, predicate {predicate}" - - -@pytest.mark.integration -def test_unpartitioned_uuid_table(catalog: Catalog) -> None: - unpartitioned_uuid = catalog.load_table("default.test_uuid_and_fixed_unpartitioned") - arrow_table_eq = unpartitioned_uuid.scan(row_filter="uuid_col == '102cb62f-e6f8-4eb0-9973-d9b012ff0967'").to_arrow() - assert arrow_table_eq["uuid_col"].to_pylist() == [uuid.UUID("102cb62f-e6f8-4eb0-9973-d9b012ff0967").bytes] - - arrow_table_neq = unpartitioned_uuid.scan( - row_filter="uuid_col != '102cb62f-e6f8-4eb0-9973-d9b012ff0967' and uuid_col != '639cccce-c9d2-494a-a78c-278ab234f024'" - ).to_arrow() - assert arrow_table_neq["uuid_col"].to_pylist() == [ - uuid.UUID("ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226").bytes, - uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b").bytes, - uuid.UUID("923dae77-83d6-47cd-b4b0-d383e64ee57e").bytes, - ] - - -@pytest.mark.integration -def test_unpartitioned_fixed_table(catalog: Catalog) -> None: - fixed_table = catalog.load_table("default.test_uuid_and_fixed_unpartitioned") - arrow_table_eq = fixed_table.scan(row_filter=EqualTo("fixed_col", b"1234567890123456789012345")).to_arrow() - assert arrow_table_eq["fixed_col"].to_pylist() == [b"1234567890123456789012345"] - - arrow_table_neq = fixed_table.scan( - row_filter=And( - NotEqualTo("fixed_col", b"1234567890123456789012345"), NotEqualTo("uuid_col", "c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b") - ) - ).to_arrow() - assert arrow_table_neq["fixed_col"].to_pylist() == [ - b"1231231231231231231231231", - b"12345678901234567ass12345", - b"qweeqwwqq1231231231231111", - ] - - -@pytest.mark.integration -def test_scan_tag(test_positional_mor_deletes: Table) -> None: - arrow_table = test_positional_mor_deletes.scan().use_ref("tag_12").to_arrow() - assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] - - -@pytest.mark.integration -def test_scan_branch(test_positional_mor_deletes: Table) -> None: - arrow_table = test_positional_mor_deletes.scan().use_ref("without_5").to_arrow() - assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12] - - -@pytest.mark.integration -@pytest.mark.skip(reason="Not yet implemented: https://github.com/apache/iceberg-python/issues/205") -def test_upgrade_table_version(table_test_table_version: Table) -> None: - assert table_test_table_version.format_version == 1 - - with table_test_table_version.transaction() as transaction: - transaction.upgrade_table_version(format_version=1) - - assert table_test_table_version.format_version == 1 - - with table_test_table_version.transaction() as transaction: - transaction.upgrade_table_version(format_version=2) - - assert table_test_table_version.format_version == 
2 - - with pytest.raises(ValueError) as e: # type: ignore - with table_test_table_version.transaction() as transaction: - transaction.upgrade_table_version(format_version=1) - assert "Cannot downgrade v2 table to v1" in str(e.value) - - with pytest.raises(ValueError) as e: - with table_test_table_version.transaction() as transaction: - transaction.upgrade_table_version(format_version=3) - assert "Unsupported table format version: 3" in str(e.value) - - -@pytest.mark.integration -def test_reproduce_issue(table_test_table_sanitized_character: Table) -> None: - arrow_table = table_test_table_sanitized_character.scan().to_arrow() - assert len(arrow_table.schema.names), 1 - assert len(table_test_table_sanitized_character.schema().fields), 1 - assert arrow_table.schema.names[0] == table_test_table_sanitized_character.schema().fields[0].name From 64f4732392f1a5e5f1790736f44eae20a4687e77 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 17 Jan 2024 16:22:40 +0100 Subject: [PATCH 8/8] Whitespace --- tests/integration/test_catalogs.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/integration/test_catalogs.py b/tests/integration/test_catalogs.py index 485820df03..3fbdb69ebe 100644 --- a/tests/integration/test_catalogs.py +++ b/tests/integration/test_catalogs.py @@ -150,7 +150,6 @@ def test_pyarrow_nan_rewritten(catalog: Catalog) -> None: @pytest.mark.integration @pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) - @pytest.mark.skip(reason="Fixing issues with NaN's: https://github.com/apache/arrow/issues/34162") def test_pyarrow_not_nan_count(catalog: Catalog) -> None: table_test_null_nan = catalog.load_table("default.test_null_nan") @@ -206,7 +205,6 @@ def test_ray_nan_rewritten(catalog: Catalog) -> None: @pytest.mark.integration @pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) - @pytest.mark.skip(reason="Fixing issues with NaN's: https://github.com/apache/arrow/issues/34162") def test_ray_not_nan_count(catalog: Catalog) -> None: table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten") @@ -427,7 +425,6 @@ def test_upgrade_table_version(catalog: Catalog) -> None: assert "Unsupported table format version: 3" in str(e.value) - @pytest.mark.integration @pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) def test_sanitize_character(catalog: Catalog) -> None:
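
For readers skimming the series: the net effect of these patches is that the separate Hive-only suite (the deleted tests/integration/test_hive.py) is folded into tests/integration/test_catalogs.py, which runs each test against both catalogs via pytest-lazy-fixture parametrization. Below is a minimal, self-contained sketch of that pattern, not taken from the patch itself. It assumes the pytest-lazy-fixture plugin is installed and the docker-compose integration stack is reachable from the host; the Hive (localhost:9083) and MinIO (localhost:9000) endpoints mirror the fixtures in the diff, while the REST catalog port 8181 and the test name are assumptions for illustration.

# Illustrative sketch only -- not part of the patch series.
import pytest

from pyiceberg.catalog import Catalog, load_catalog


@pytest.fixture()
def catalog_rest() -> Catalog:
    # REST catalog from the integration stack; localhost:8181 is assumed here.
    return load_catalog(
        "rest",
        **{
            "type": "rest",
            "uri": "http://localhost:8181",
            "s3.endpoint": "http://localhost:9000",
            "s3.access-key-id": "admin",
            "s3.secret-access-key": "password",
        },
    )


@pytest.fixture()
def catalog_hive() -> Catalog:
    # Hive metastore exposed on 9083 by the new hive service.
    return load_catalog(
        "hive",
        **{
            "type": "hive",
            "uri": "http://localhost:9083",
            "s3.endpoint": "http://localhost:9000",
            "s3.access-key-id": "admin",
            "s3.secret-access-key": "password",
        },
    )


@pytest.mark.integration
@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
def test_scan_runs_on_both_catalogs(catalog: Catalog) -> None:
    # Hypothetical example test: load a table provisioned by dev/provision.py
    # and check that a scan returns rows through either catalog. Tests that
    # touch features the Hive catalog does not support yet (table properties,
    # format-version upgrades) instead guard with
    # isinstance(catalog, HiveCatalog) and pytest.skip, as in the diff above.
    table = catalog.load_table("default.test_null_nan")
    arrow_table = table.scan(selected_fields=("idx",)).to_arrow()
    assert len(arrow_table) > 0

The design choice this illustrates: parametrizing on the catalog fixture keeps a single copy of every integration test instead of the near-duplicate file that test_hive.py had become, and pytest-lazy-fixture is what defers resolution of the fixture names inside the parametrize list until each test actually runs.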