From 0161e503dc79092efc64a7960a02fdbd21edaa41 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Fri, 15 Feb 2019 10:42:06 +0100
Subject: [PATCH 01/11] [SPARK-26856][PYSPARK] Python support for from_avro
 and to_avro APIs

---
 docs/sql-data-sources-avro.md   | 31 ++++++++++++++++++
 python/pyspark/sql/functions.py | 56 +++++++++++++++++++++++++++++++++
 2 files changed, 87 insertions(+)

diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md
index afb91ae86885..7e84a8c50a31 100644
--- a/docs/sql-data-sources-avro.md
+++ b/docs/sql-data-sources-avro.md
@@ -137,6 +137,37 @@ StreamingQuery query = output
   .option("topic", "topic2")
   .start();
 
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+from pyspark.sql.functions import from_avro, to_avro
+
+# `from_avro` requires Avro schema in JSON string format.
+jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()
+
+df = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("subscribe", "topic1")
+  .load()
+
+# 1. Decode the Avro data into a struct;
+# 2. Filter by column `favorite_color`;
+# 3. Encode the column `name` in Avro format.
+output = df
+  .select(from_avro("value", jsonFormatSchema).alias("user"))
+  .where("user.favorite_color == \"red\"")
+  .select(to_avro("user.name").alias("value"))
+
+query = output
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic2")
+  .start()
+
 {% endhighlight %}
 </div>
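The streaming example above exercises both new functions; the same pair also works on plain batch DataFrames. A minimal batch sketch, mirroring the doctests added to functions.py below (assumes an active SparkSession bound to `spark` and the spark-avro jar on the classpath):

{% highlight python %}
from pyspark.sql import Row
from pyspark.sql.functions import from_avro, to_avro  # module path as of this patch

# Encode a struct column to Avro binary, then decode it back with an explicit schema.
df = spark.createDataFrame([(1, Row(name='Alice', age=2))], ("key", "value"))
avroDf = df.select(to_avro(df.value).alias("avro"))

jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields":
    [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord",
    "fields":[{"name":"age","type":["long","null"]},
    {"name":"name","type":["string","null"]}]},"null"]}]}'''
avroDf.select(from_avro(avroDf.avro, jsonFormatSchema).alias("value")).show()
{% endhighlight %}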
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 3c33e2bed92d..b042cf9d7e3d 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2402,6 +2402,62 @@ def to_csv(col, options={}):
     return Column(jc)
 
 
+@since(3.0)
+def from_avro(col, jsonFormatSchema, options={}):
+    """
+    Converts a binary column of avro format into its corresponding catalyst value. The specified
+    schema must match the read data, otherwise the behavior is undefined: it may fail or return
+    arbitrary results.
+
+    Avro is a built-in but external data source module since Spark 2.4. Please deploy the
+    application as per the deployment section of "Apache Avro Data Source Guide".
+
+    :param data: the binary column.
+    :param jsonFormatSchema: the avro schema in JSON string format.
+    :param options: options to control how the Avro record is parsed.
+
+    >>> from pyspark.sql import Row
+    >>> from pyspark.sql.functions import from_avro, to_avro
+    >>> data = [(1, Row(name='Alice', age=2))]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> avroDf = df.select(to_avro(df.value).alias("avro"))
+    >>> avroDf.collect()
+    [Row(avro=bytearray(b'\x00\x00\x04\x00\nAlice'))]
+    >>> jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields":
+    ... [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord","fields":
+    ... [{"name":"age","type":["long","null"]},{"name":"name","type":["string","null"]}]},"null"]}]}'''
+    >>> avroDf.select(from_avro(avroDf.avro, jsonFormatSchema).alias("value")).collect()
+    [Row(value=Row(avro=Row(age=2, name=u'Alice')))]
+    """
+
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.org.apache.spark.sql.avro.functions.from_avro(_to_java_column(col), jsonFormatSchema, options)
+    return Column(jc)
+
+
+@since(3.0)
+def to_avro(col):
+    """
+    Converts a column into binary of avro format.
+
+    Avro is a built-in but external data source module since Spark 2.4. Please deploy the
+    application as per the deployment section of "Apache Avro Data Source Guide".
+
+    :param data: the data column.
+
+    >>> from pyspark.sql import Row
+    >>> from pyspark.sql.functions import to_avro
+    >>> data = [(1, Row(name='Alice', age=2))]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_avro(df.value).alias("avro")).collect()
+    [Row(avro=bytearray(b'\x00\x00\x04\x00\nAlice'))]
+    """
+
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(col))
+    return Column(jc)
+
+
 @since(1.5)
 def size(col):
     """
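`from_avro` takes the schema as a JSON string, so long schemas can also be assembled with `json.dumps` instead of a hand-wrapped literal; a sketch using the same record layout as the doctest above (the line-wrapping that the next two style-fix patches adjust then never arises):

{% highlight python %}
import json

# Same reader schema as in the doctest, built from a plain dict.
jsonFormatSchema = json.dumps({
    "type": "record", "name": "topLevelRecord", "fields": [
        {"name": "avro", "type": [
            {"type": "record", "name": "value", "namespace": "topLevelRecord",
             "fields": [{"name": "age", "type": ["long", "null"]},
                        {"name": "name", "type": ["string", "null"]}]},
            "null"]}]})
{% endhighlight %}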

From 1733afe7dfe91bb48c884f0fb223293a098df0f3 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Fri, 15 Feb 2019 13:24:08 +0100
Subject: [PATCH 02/11] Style fix

---
 python/pyspark/sql/functions.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index b042cf9d7e3d..dca0c72ec87b 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2424,14 +2424,16 @@ def from_avro(col, jsonFormatSchema, options={}):
     >>> avroDf.collect()
     [Row(avro=bytearray(b'\x00\x00\x04\x00\nAlice'))]
     >>> jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields":
-    ... [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord","fields":
-    ... [{"name":"age","type":["long","null"]},{"name":"name","type":["string","null"]}]},"null"]}]}'''
+    ... [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord",
+    ... "fields":[{"name":"age","type":["long","null"]},
+    ... {"name":"name","type":["string","null"]}]},"null"]}]}'''
     >>> avroDf.select(from_avro(avroDf.avro, jsonFormatSchema).alias("value")).collect()
     [Row(value=Row(avro=Row(age=2, name=u'Alice')))]
     """
 
     sc = SparkContext._active_spark_context
-    jc = sc._jvm.org.apache.spark.sql.avro.functions.from_avro(_to_java_column(col), jsonFormatSchema, options)
+    jc = sc._jvm.org.apache.spark.sql.avro.functions.from_avro(_to_java_column(col),
+                                                               jsonFormatSchema, options)
     return Column(jc)
 

From ea67dca9b2a945cb865a7e7946a1226eeda5a657 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Fri, 15 Feb 2019 13:51:43 +0100
Subject: [PATCH 03/11] Style fix

---
 python/pyspark/sql/functions.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index dca0c72ec87b..99e1432c4415 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2422,7 +2422,7 @@ def from_avro(col, jsonFormatSchema, options={}):
     >>> df = spark.createDataFrame(data, ("key", "value"))
     >>> avroDf = df.select(to_avro(df.value).alias("avro"))
     >>> avroDf.collect()
-    [Row(avro=bytearray(b'\x00\x00\x04\x00\nAlice'))]
+    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
     >>> jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields":
     ... [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord",
     ... "fields":[{"name":"age","type":["long","null"]},
@@ -2452,7 +2452,7 @@ def to_avro(col):
     >>> data = [(1, Row(name='Alice', age=2))]
     >>> df = spark.createDataFrame(data, ("key", "value"))
     >>> df.select(to_avro(df.value).alias("avro")).collect()
-    [Row(avro=bytearray(b'\x00\x00\x04\x00\nAlice'))]
+    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
     """
 
     sc = SparkContext._active_spark_context

From 331cfcdc0c7f40ba1a0f6351f5309f2835782789 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Mon, 18 Feb 2019 22:04:51 +0100
Subject: [PATCH 04/11] * Moved to package pyspark.sql.avro.functions
 * Added avro artifact
 * Some refactoring
 * Formatting fixes

---
 dev/sparktestsupport/modules.py          |   4 +
 docs/sql-data-sources-avro.md            |  28 ++---
 python/pyspark/sql/avro/__init__.py      |  18 +++
 python/pyspark/sql/avro/functions.py     | 134 +++++++++++++++++++++++
 python/pyspark/sql/functions.py          |  58 ----------
 python/pyspark/streaming/kinesis.py      |  30 ++---
 python/pyspark/testing/streamingutils.py |  31 +-----
 python/pyspark/testing/utils.py          |  25 +++++
 python/pyspark/util.py                   |  27 +++++
 9 files changed, 232 insertions(+), 123 deletions(-)
 create mode 100644 python/pyspark/sql/avro/__init__.py
 create mode 100644 python/pyspark/sql/avro/functions.py

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index eef7f259391b..1c9ea3a08eb4 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -178,6 +178,10 @@ def __hash__(self):
     ],
     sbt_test_goals=[
         "avro/test",
+    ],
+    python_test_goals=[
+        # doctests
+        "pyspark.sql.avro.functions"
     ]
 )
 
diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md
index 7e84a8c50a31..40d53fbe9871 100644
--- a/docs/sql-data-sources-avro.md
+++ b/docs/sql-data-sources-avro.md
@@ -141,31 +141,31 @@ StreamingQuery query = output
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-from pyspark.sql.functions import from_avro, to_avro
+from pyspark.sql.avro.functions import from_avro, to_avro
 
 # `from_avro` requires Avro schema in JSON string format.
 jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()
 
-df = spark
-  .readStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("subscribe", "topic1")
+df = spark\
+  .readStream\
+  .format("kafka")\
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
+  .option("subscribe", "topic1")\
   .load()
 
 # 1. Decode the Avro data into a struct;
 # 2. Filter by column `favorite_color`;
 # 3. Encode the column `name` in Avro format.
-output = df
-  .select(from_avro("value", jsonFormatSchema).alias("user"))
-  .where("user.favorite_color == \"red\"")
+output = df\
+  .select(from_avro("value", jsonFormatSchema).alias("user"))\
+  .where('user.favorite_color == "red"')\
   .select(to_avro("user.name").alias("value"))
 
-query = output
-  .writeStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("topic", "topic2")
+query = output\
+  .writeStream\
+  .format("kafka")\
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
+  .option("topic", "topic2")\
   .start()
 
 {% endhighlight %}
diff --git a/python/pyspark/sql/avro/__init__.py b/python/pyspark/sql/avro/__init__.py
new file mode 100644
index 000000000000..8f01fbbf6482
--- /dev/null
+++ b/python/pyspark/sql/avro/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = ['functions']
diff --git a/python/pyspark/sql/avro/functions.py b/python/pyspark/sql/avro/functions.py
new file mode 100644
index 000000000000..ab0abe2c2cbc
--- /dev/null
+++ b/python/pyspark/sql/avro/functions.py
@@ -0,0 +1,134 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A collection of built-in avro functions
+"""
+
+
+from pyspark import since, SparkContext
+from pyspark.sql.column import Column, _to_java_column
+from pyspark.util import _print_missing_jar
+
+
+@since(3.0)
+def from_avro(col, jsonFormatSchema, options={}):
+    """
+    Converts a binary column of avro format into its corresponding catalyst value. The specified
+    schema must match the read data, otherwise the behavior is undefined: it may fail or return
+    arbitrary results.
+
+    Avro is a built-in but external data source module since Spark 2.4. Please deploy the
+    application as per the deployment section of "Apache Avro Data Source Guide".
+
+    :param data: the binary column.
+    :param jsonFormatSchema: the avro schema in JSON string format.
+    :param options: options to control how the Avro record is parsed.
+
+    >>> from pyspark.sql import Row
+    >>> from pyspark.sql.avro.functions import from_avro, to_avro
+    >>> data = [(1, Row(name='Alice', age=2))]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> avroDf = df.select(to_avro(df.value).alias("avro"))
+    >>> avroDf.collect()
+    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
+    >>> jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields":
+    ... [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord",
+    ... "fields":[{"name":"age","type":["long","null"]},
+    ... {"name":"name","type":["string","null"]}]},"null"]}]}'''
+    >>> avroDf.select(from_avro(avroDf.avro, jsonFormatSchema).alias("value")).collect()
+    [Row(value=Row(avro=Row(age=2, name=u'Alice')))]
+    """
+
+    sc = SparkContext._active_spark_context
+    try:
+        jc = sc._jvm.org.apache.spark.sql.avro.functions.from_avro(
+            _to_java_column(col), jsonFormatSchema, options)
+    except TypeError as e:
+        if str(e) == "'JavaPackage' object is not callable":
+            _print_missing_jar("Avro", "avro", "avro", ssc.sparkContext.version)
+        raise
+    return Column(jc)
+
+
+@since(3.0)
+def to_avro(col):
+    """
+    Converts a column into binary of avro format.
+
+    Avro is a built-in but external data source module since Spark 2.4. Please deploy the
+    application as per the deployment section of "Apache Avro Data Source Guide".
+
+    :param data: the data column.
+
+    >>> from pyspark.sql import Row
+    >>> from pyspark.sql.avro.functions import to_avro
+    >>> data = [(1, Row(name='Alice', age=2))]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_avro(df.value).alias("avro")).collect()
+    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
+    """
+
+    sc = SparkContext._active_spark_context
+    try:
+        jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(col))
+    except TypeError as e:
+        if str(e) == "'JavaPackage' object is not callable":
+            _print_missing_jar("Avro", "avro", "avro", ssc.sparkContext.version)
+        raise
+    return Column(jc)
+
+
+def _test():
+    import os
+    import sys
+    from pyspark.testing.utils import search_jar
+    avro_jar = search_jar("external/avro", "spark-avro")
+    if avro_jar is None:
+        print(
+            "Skipping all Avro Python tests as the optional Avro project was "
+            "not compiled into a JAR. To run these tests, "
+            "you need to build Spark with 'build/sbt -Pavro package' or "
+            "'build/mvn -Pavro package' before running this test.")
+        sys.exit(0)
+    else:
+        existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
+        jars_args = "--jars %s" % avro_jar
+        os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])
+
+    import doctest
+    from pyspark.sql import Row, SparkSession
+    import pyspark.sql.avro.functions
+    globs = pyspark.sql.avro.functions.__dict__.copy()
+    spark = SparkSession.builder\
+        .master("local[4]")\
+        .appName("sql.avro.functions tests")\
+        .getOrCreate()
+    sc = spark.sparkContext
+    globs['sc'] = sc
+    globs['spark'] = spark
+    globs['df'] = spark.createDataFrame([Row(name='Alice', age=2), Row(name='Bob', age=5)])
+    (failure_count, test_count) = doctest.testmod(
+        pyspark.sql.avro.functions, globs=globs,
+        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
+    spark.stop()
+    if failure_count:
+        sys.exit(-1)
+
+
+if __name__ == "__main__":
+    _test()
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 99e1432c4415..3c33e2bed92d 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2402,64 +2402,6 @@ def to_csv(col, options={}):
     return Column(jc)
 
 
-@since(3.0)
-def from_avro(col, jsonFormatSchema, options={}):
-    """
-    Converts a binary column of avro format into its corresponding catalyst value. The specified
-    schema must match the read data, otherwise the behavior is undefined: it may fail or return
-    arbitrary results.
-
-    Avro is a built-in but external data source module since Spark 2.4. Please deploy the
-    application as per the deployment section of "Apache Avro Data Source Guide".
-
-    :param data: the binary column.
-    :param jsonFormatSchema: the avro schema in JSON string format.
-    :param options: options to control how the Avro record is parsed.
-
-    >>> from pyspark.sql import Row
-    >>> from pyspark.sql.functions import from_avro, to_avro
-    >>> data = [(1, Row(name='Alice', age=2))]
-    >>> df = spark.createDataFrame(data, ("key", "value"))
-    >>> avroDf = df.select(to_avro(df.value).alias("avro"))
-    >>> avroDf.collect()
-    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
-    >>> jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields":
-    ... [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord",
-    ... "fields":[{"name":"age","type":["long","null"]},
-    ... {"name":"name","type":["string","null"]}]},"null"]}]}'''
-    >>> avroDf.select(from_avro(avroDf.avro, jsonFormatSchema).alias("value")).collect()
-    [Row(value=Row(avro=Row(age=2, name=u'Alice')))]
-    """
-
-    sc = SparkContext._active_spark_context
-    jc = sc._jvm.org.apache.spark.sql.avro.functions.from_avro(_to_java_column(col),
-                                                               jsonFormatSchema, options)
-    return Column(jc)
-
-
-@since(3.0)
-def to_avro(col):
-    """
-    Converts a column into binary of avro format.
-
-    Avro is a built-in but external data source module since Spark 2.4. Please deploy the
-    application as per the deployment section of "Apache Avro Data Source Guide".
-
-    :param data: the data column.
-
-    >>> from pyspark.sql import Row
-    >>> from pyspark.sql.functions import to_avro
-    >>> data = [(1, Row(name='Alice', age=2))]
-    >>> df = spark.createDataFrame(data, ("key", "value"))
-    >>> df.select(to_avro(df.value).alias("avro")).collect()
-    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
-    """
-
-    sc = SparkContext._active_spark_context
-    jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(col))
-    return Column(jc)
-
-
 @since(1.5)
 def size(col):
     """
diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py
index b3348828fdf6..4ed9f2a40c3a 100644
--- a/python/pyspark/streaming/kinesis.py
+++ b/python/pyspark/streaming/kinesis.py
@@ -18,6 +18,8 @@
 from pyspark.serializers import NoOpSerializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.streaming import DStream
+from pyspark.util import _print_missing_jar
+
 
 __all__ = ['KinesisUtils', 'InitialPositionInStream', 'utf8_decoder']
 
@@ -82,7 +84,11 @@ def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,
             helper = ssc._jvm.org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper()
         except TypeError as e:
             if str(e) == "'JavaPackage' object is not callable":
-                KinesisUtils._printErrorMsg(ssc.sparkContext)
+                _print_missing_jar(
+                    "Streaming's Kinesis",
+                    "streaming-kinesis-asl",
+                    "streaming-kinesis-asl-assembly",
+                    ssc.sparkContext.version)
             raise
         jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
                                       regionName, initialPositionInStream, jduration, jlevel,
@@ -91,28 +97,6 @@ def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,
         stream = DStream(jstream, ssc, NoOpSerializer())
         return stream.map(lambda v: decoder(v))
 
-    @staticmethod
-    def _printErrorMsg(sc):
-        print("""
-________________________________________________________________________________________________
-
-  Spark Streaming's Kinesis libraries not found in class path. Try one of the following.
-
-  1. Include the Kinesis library and its dependencies with in the
-     spark-submit command as
-
-     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl:%s ...
-
-  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
-     Group Id = org.apache.spark, Artifact Id = spark-streaming-kinesis-asl-assembly, Version = %s.
-     Then, include the jar in the spark-submit command as
-
-     $ bin/spark-submit --jars <spark-streaming-kinesis-asl-assembly.jar> ...
-
-________________________________________________________________________________________________
-
-""" % (sc.version, sc.version))
-
 
 class InitialPositionInStream(object):
     LATEST, TRIM_HORIZON = (0, 1)
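The Kinesis path above now delegates to the shared `_print_missing_jar` helper that this patch adds to pyspark/util.py below. A quick sketch of calling it with the Avro arguments used in pyspark/sql/avro/functions.py (the version string is illustrative):

{% highlight python %}
from pyspark.util import _print_missing_jar

# Prints guidance of the form:
#   $ bin/spark-submit --packages org.apache.spark:spark-avro:3.0.0 ...
#   Group Id = org.apache.spark, Artifact Id = spark-avro, Version = 3.0.0
_print_missing_jar("Avro", "avro", "avro", "3.0.0")
{% endhighlight %}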
diff --git a/python/pyspark/testing/streamingutils.py b/python/pyspark/testing/streamingutils.py
index 85a2fa14b936..0b3a68d18fd6 100644
--- a/python/pyspark/testing/streamingutils.py
+++ b/python/pyspark/testing/streamingutils.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import glob
 import os
 import tempfile
 import time
@@ -22,32 +21,7 @@
 
 from pyspark import SparkConf, SparkContext, RDD
 from pyspark.streaming import StreamingContext
-
-
-def search_kinesis_asl_assembly_jar():
-    kinesis_asl_assembly_dir = os.path.join(
-        os.environ["SPARK_HOME"], "external/kinesis-asl-assembly")
-
-    # We should ignore the following jars
-    ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")
-
-    # Search jar in the project dir using the jar name_prefix for both sbt build and maven
-    # build because the artifact jars are in different directories.
-    name_prefix = "spark-streaming-kinesis-asl-assembly"
-    sbt_build = glob.glob(os.path.join(
-        kinesis_asl_assembly_dir, "target/scala-*/%s-*.jar" % name_prefix))
-    maven_build = glob.glob(os.path.join(
-        kinesis_asl_assembly_dir, "target/%s_*.jar" % name_prefix))
-    jar_paths = sbt_build + maven_build
-    jars = [jar for jar in jar_paths if not jar.endswith(ignored_jar_suffixes)]
-
-    if not jars:
-        return None
-    elif len(jars) > 1:
-        raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs: %s; please "
-                         "remove all but one") % (", ".join(jars)))
-    else:
-        return jars[0]
+from pyspark.testing.utils import search_jar
 
 
 # Must be same as the variable and condition defined in KinesisTestUtils.scala and modules.py
@@ -59,7 +33,8 @@
         "Skipping all Kinesis Python tests as environmental variable 'ENABLE_KINESIS_TESTS' "
         "was not set.")
 else:
-    kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
+    kinesis_asl_assembly_jar = search_jar("external/kinesis-asl-assembly",
+                                           "spark-streaming-kinesis-asl-assembly")
     if kinesis_asl_assembly_jar is None:
         kinesis_requirement_message = (
             "Skipping all Kinesis Python tests as the optional Kinesis project was "
diff --git a/python/pyspark/testing/utils.py b/python/pyspark/testing/utils.py
index 7df0acae026f..c6a528194f00 100644
--- a/python/pyspark/testing/utils.py
+++ b/python/pyspark/testing/utils.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import glob
 import os
 import struct
 import sys
@@ -100,3 +101,27 @@ def write(self, b):
 
     def close(self):
         pass
+
+
+def search_jar(project_relative_path, jar_name_prefix):
+    project_full_path = os.path.join(
+        os.environ["SPARK_HOME"], project_relative_path)
+
+    # We should ignore the following jars
+    ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")
+
+    # Search jar in the project dir using the jar name_prefix for both sbt build and maven
+    # build because the artifact jars are in different directories.
+    sbt_build = glob.glob(os.path.join(
+        project_full_path, "target/scala-*/%s*.jar" % jar_name_prefix))
+    maven_build = glob.glob(os.path.join(
+        project_full_path, "target/%s*.jar" % jar_name_prefix))
+    jar_paths = sbt_build + maven_build
+    jars = [jar for jar in jar_paths if not jar.endswith(ignored_jar_suffixes)]
+
+    if not jars:
+        return None
+    elif len(jars) > 1:
+        raise Exception("Found multiple JARs: %s; please remove all but one" % (", ".join(jars)))
+    else:
+        return jars[0]
diff --git a/python/pyspark/util.py b/python/pyspark/util.py
index f906f4959543..d0ecd43ead5a 100644
--- a/python/pyspark/util.py
+++ b/python/pyspark/util.py
@@ -106,6 +106,33 @@ def wrapper(*args, **kwargs):
     return wrapper
 
 
+def _print_missing_jar(lib_name, pkg_name, jar_name, spark_version):
+    print("""
+________________________________________________________________________________________________
+
+  Spark %(lib_name)s libraries not found in class path. Try one of the following.
+
+  1. Include the %(lib_name)s library and its dependencies within the
+     spark-submit command as
+
+     $ bin/spark-submit --packages org.apache.spark:spark-%(pkg_name)s:%(spark_version)s ...
+
+  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+     Group Id = org.apache.spark, Artifact Id = spark-%(jar_name)s, Version = %(spark_version)s.
+     Then, include the jar in the spark-submit command as
+
+     $ bin/spark-submit --jars <spark-%(jar_name)s.jar> ...
+
+________________________________________________________________________________________________
+
+""" % {
+        "lib_name": lib_name,
+        "pkg_name": pkg_name,
+        "jar_name": jar_name,
+        "spark_version": spark_version
+    })
+
+
 if __name__ == "__main__":
     import doctest
     (failure_count, test_count) = doctest.testmod()

From 7a8cc33d9b2246a7cbc2274ccd5f27897baef8e2 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Tue, 19 Feb 2019 18:28:44 +0100
Subject: [PATCH 05/11] Style fix

---
 python/pyspark/testing/streamingutils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/testing/streamingutils.py b/python/pyspark/testing/streamingutils.py
index 0b3a68d18fd6..3bed50721a98 100644
--- a/python/pyspark/testing/streamingutils.py
+++ b/python/pyspark/testing/streamingutils.py
@@ -34,7 +34,7 @@
         "was not set.")
 else:
     kinesis_asl_assembly_jar = search_jar("external/kinesis-asl-assembly",
-                                           "spark-streaming-kinesis-asl-assembly")
+                                          "spark-streaming-kinesis-asl-assembly")
     if kinesis_asl_assembly_jar is None:
         kinesis_requirement_message = (
             "Skipping all Kinesis Python tests as the optional Kinesis project was "

From d5249374de3c13e1ad14ad8e34cf576a847d18bc Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Tue, 19 Feb 2019 18:40:17 +0100
Subject: [PATCH 06/11] Compile fix

---
 python/pyspark/sql/avro/functions.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/avro/functions.py b/python/pyspark/sql/avro/functions.py
index ab0abe2c2cbc..1555a830bdfd 100644
--- a/python/pyspark/sql/avro/functions.py
+++ b/python/pyspark/sql/avro/functions.py
@@ -60,7 +60,7 @@ def from_avro(col, jsonFormatSchema, options={}):
             _to_java_column(col), jsonFormatSchema, options)
     except TypeError as e:
         if str(e) == "'JavaPackage' object is not callable":
-            _print_missing_jar("Avro", "avro", "avro", ssc.sparkContext.version)
+            _print_missing_jar("Avro", "avro", "avro", sc.version)
         raise
     return Column(jc)
 
@@ -88,7 +88,7 @@ def to_avro(col):
         jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(col))
     except TypeError as e:
         if str(e) == "'JavaPackage' object is not callable":
-            _print_missing_jar("Avro", "avro", "avro", ssc.sparkContext.version)
+            _print_missing_jar("Avro", "avro", "avro", sc.version)
         raise
     return Column(jc)

From dcd7b07402073c49cacceb727aedb7360f5d8f84 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Thu, 21 Feb 2019 13:01:15 +0100
Subject: [PATCH 07/11] Param rename to make it compliant with the old API

---
 python/pyspark/sql/avro/functions.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/avro/functions.py b/python/pyspark/sql/avro/functions.py
index 1555a830bdfd..7bf20c88c829 100644
--- a/python/pyspark/sql/avro/functions.py
+++ b/python/pyspark/sql/avro/functions.py
@@ -26,7 +26,7 @@
 
 @since(3.0)
-def from_avro(col, jsonFormatSchema, options={}):
+def from_avro(data, jsonFormatSchema, options={}):
     """
     Converts a binary column of avro format into its corresponding catalyst value. The specified
     schema must match the read data, otherwise the behavior is undefined: it may fail or return
@@ -57,7 +57,7 @@ def from_avro(col, jsonFormatSchema, options={}):
     sc = SparkContext._active_spark_context
     try:
         jc = sc._jvm.org.apache.spark.sql.avro.functions.from_avro(
-            _to_java_column(col), jsonFormatSchema, options)
+            _to_java_column(data), jsonFormatSchema, options)
     except TypeError as e:
         if str(e) == "'JavaPackage' object is not callable":
             _print_missing_jar("Avro", "avro", "avro", sc.version)
@@ -66,7 +66,7 @@ def from_avro(col, jsonFormatSchema, options={}):
 
 @since(3.0)
-def to_avro(col):
+def to_avro(data):
     """
     Converts a column into binary of avro format.
 
@@ -85,7 +85,7 @@ def to_avro(col):
     sc = SparkContext._active_spark_context
     try:
-        jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(col))
+        jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(data))
     except TypeError as e:
         if str(e) == "'JavaPackage' object is not callable":
             _print_missing_jar("Avro", "avro", "avro", sc.version)

From 89ff143fb570c99d724989ed464cd020cd236313 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Fri, 22 Feb 2019 12:29:43 +0100
Subject: [PATCH 08/11] Review fixes:
 * ignore_unicode_prefix added
 * pyspark.sql.rst extended with avro
 * Note added to doc

---
 python/docs/pyspark.sql.rst          |  6 ++++++
 python/pyspark/sql/avro/functions.py | 11 +++++++----
 2 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/python/docs/pyspark.sql.rst b/python/docs/pyspark.sql.rst
index 5c3b7e274857..5da7b44a952a 100644
--- a/python/docs/pyspark.sql.rst
+++ b/python/docs/pyspark.sql.rst
@@ -23,6 +23,12 @@ pyspark.sql.functions module
     :members:
     :undoc-members:
 
+pyspark.sql.avro.functions module
+---------------------------------
+.. automodule:: pyspark.sql.avro.functions
+    :members:
+    :undoc-members:
+
 pyspark.sql.streaming module
 ----------------------------
 .. automodule:: pyspark.sql.streaming
diff --git a/python/pyspark/sql/avro/functions.py b/python/pyspark/sql/avro/functions.py
index 7bf20c88c829..18cb1e3506f9 100644
--- a/python/pyspark/sql/avro/functions.py
+++ b/python/pyspark/sql/avro/functions.py
@@ -21,10 +21,12 @@
 
 
 from pyspark import since, SparkContext
+from pyspark.rdd import ignore_unicode_prefix
 from pyspark.sql.column import Column, _to_java_column
 from pyspark.util import _print_missing_jar
 
 
+@ignore_unicode_prefix
 @since(3.0)
 def from_avro(data, jsonFormatSchema, options={}):
     """
@@ -32,8 +34,8 @@ def from_avro(data, jsonFormatSchema, options={}):
     schema must match the read data, otherwise the behavior is undefined: it may fail or return
     arbitrary results.
 
-    Avro is a built-in but external data source module since Spark 2.4. Please deploy the
-    application as per the deployment section of "Apache Avro Data Source Guide".
+    Note: Avro is a built-in but external data source module since Spark 2.4. Please deploy
+    the application as per the deployment section of "Apache Avro Data Source Guide".
 
     :param data: the binary column.
     :param jsonFormatSchema: the avro schema in JSON string format.
@@ -65,13 +67,14 @@ def from_avro(data, jsonFormatSchema, options={}):
     return Column(jc)
 
 
+@ignore_unicode_prefix
 @since(3.0)
 def to_avro(data):
     """
     Converts a column into binary of avro format.
 
-    Avro is a built-in but external data source module since Spark 2.4. Please deploy the
-    application as per the deployment section of "Apache Avro Data Source Guide".
+    Note: Avro is a built-in but external data source module since Spark 2.4. Please deploy
+    the application as per the deployment section of "Apache Avro Data Source Guide".
 
     :param data: the data column.

From 65f523bf1c4b068e0c35e56ff7948d0db1106d92 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Thu, 7 Mar 2019 17:01:03 +0100
Subject: [PATCH 09/11] Add dependencies

---
 dev/sparktestsupport/modules.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 1c9ea3a08eb4..538405e1c890 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -345,7 +345,7 @@ def __hash__(self):
 pyspark_sql = Module(
     name="pyspark-sql",
-    dependencies=[pyspark_core, hive],
+    dependencies=[pyspark_core, hive, avro],
     source_file_regexes=[
         "python/pyspark/sql"
     ],
@@ -364,6 +364,7 @@ def __hash__(self):
         "pyspark.sql.streaming",
         "pyspark.sql.udf",
         "pyspark.sql.window",
+        "pyspark.sql.avro.functions",
         # unittests
         "pyspark.sql.tests.test_appsubmit",
         "pyspark.sql.tests.test_arrow",

From 74aaa6f77542d7a53a15426a3c06a42806ab5d23 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Fri, 8 Mar 2019 10:21:18 +0100
Subject: [PATCH 10/11] Review fix

---
 dev/sparktestsupport/modules.py      | 4 ----
 python/pyspark/sql/avro/functions.py | 2 --
 2 files changed, 6 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 538405e1c890..d496eecd4d70 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -178,10 +178,6 @@ def __hash__(self):
     ],
     sbt_test_goals=[
         "avro/test",
-    ],
-    python_test_goals=[
-        # doctests
-        "pyspark.sql.avro.functions"
     ]
 )
 
diff --git a/python/pyspark/sql/avro/functions.py b/python/pyspark/sql/avro/functions.py
index 18cb1e3506f9..f02952513d94 100644
--- a/python/pyspark/sql/avro/functions.py
+++ b/python/pyspark/sql/avro/functions.py
@@ -122,9 +122,7 @@ def _test():
         .appName("sql.avro.functions tests")\
         .getOrCreate()
     sc = spark.sparkContext
-    globs['sc'] = sc
     globs['spark'] = spark
-    globs['df'] = spark.createDataFrame([Row(name='Alice', age=2), Row(name='Bob', age=5)])
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.avro.functions, globs=globs,

From f3f0348ef4cce81fee62c7a390c465dd66a54ed2 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Sun, 10 Mar 2019 21:19:22 +0100
Subject: [PATCH 11/11] Remove unused variable

---
 python/pyspark/sql/avro/functions.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/python/pyspark/sql/avro/functions.py b/python/pyspark/sql/avro/functions.py
index f02952513d94..81686df86679 100644
--- a/python/pyspark/sql/avro/functions.py
+++ b/python/pyspark/sql/avro/functions.py
@@ -121,7 +121,6 @@ def _test():
         .master("local[4]")\
         .appName("sql.avro.functions tests")\
         .getOrCreate()
-    sc = spark.sparkContext
     globs['spark'] = spark
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.avro.functions, globs=globs,
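Outside the doctest harness, the jar wiring that `_test()` performs can be reproduced by setting PYSPARK_SUBMIT_ARGS before the first SparkContext starts; a sketch with an illustrative jar path:

{% highlight python %}
import os

# Mirror _test(): make the Avro jar visible before the JVM is launched.
# The path is illustrative; search_jar() resolves the real one under $SPARK_HOME.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--jars /path/to/spark-avro_2.12-3.0.0-SNAPSHOT.jar pyspark-shell")

from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import to_avro

spark = SparkSession.builder.master("local[2]").appName("avro-smoke").getOrCreate()
df = spark.createDataFrame([("Alice", 2)], ("name", "age"))
df.select(to_avro("name").alias("avro")).show()
spark.stop()
{% endhighlight %}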