diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index eef7f259391b..d496eecd4d70 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -341,7 +341,7 @@ def __hash__(self):
 
 pyspark_sql = Module(
     name="pyspark-sql",
-    dependencies=[pyspark_core, hive],
+    dependencies=[pyspark_core, hive, avro],
     source_file_regexes=[
         "python/pyspark/sql"
     ],
@@ -360,6 +360,7 @@ def __hash__(self):
         "pyspark.sql.streaming",
         "pyspark.sql.udf",
         "pyspark.sql.window",
+        "pyspark.sql.avro.functions",
         # unittests
         "pyspark.sql.tests.test_appsubmit",
         "pyspark.sql.tests.test_arrow",
diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md
index afb91ae86885..40d53fbe9871 100644
--- a/docs/sql-data-sources-avro.md
+++ b/docs/sql-data-sources-avro.md
@@ -137,6 +137,37 @@ StreamingQuery query = output
   .option("topic", "topic2")
   .start();
 
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+from pyspark.sql.avro.functions import from_avro, to_avro
+
+# `from_avro` requires Avro schema in JSON string format.
+jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()
+
+df = spark\
+  .readStream\
+  .format("kafka")\
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
+  .option("subscribe", "topic1")\
+  .load()
+
+# 1. Decode the Avro data into a struct;
+# 2. Filter by column `favorite_color`;
+# 3. Encode the column `name` in Avro format.
+output = df\
+  .select(from_avro("value", jsonFormatSchema).alias("user"))\
+  .where('user.favorite_color == "red"')\
+  .select(to_avro("user.name").alias("value"))
+
+query = output\
+  .writeStream\
+  .format("kafka")\
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
+  .option("topic", "topic2")\
+  .start()
+
 {% endhighlight %}
 </div>
 </div>
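For readers who want to try the API without a streaming job, the same two functions work on ordinary batch DataFrames. Below is a minimal sketch under the same assumptions as the example above (an existing `spark` session, the `user.avsc` schema file, a reachable Kafka cluster, and the Avro package deployed as described in the deployment section); topic names are illustrative:

```python
from pyspark.sql.avro.functions import from_avro, to_avro

# `from_avro` requires the Avro schema as a JSON string.
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()

# Read a bounded snapshot of the topic with the batch Kafka source.
df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic1") \
    .load()

# Decode, filter, and re-encode exactly as in the streaming example.
output = df \
    .select(from_avro("value", jsonFormatSchema).alias("user")) \
    .where('user.favorite_color == "red"') \
    .select(to_avro("user.name").alias("value"))

# Write the re-encoded records back to Kafka as a one-off batch.
output.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("topic", "topic2") \
    .save()
```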
diff --git a/python/docs/pyspark.sql.rst b/python/docs/pyspark.sql.rst index 5c3b7e274857..5da7b44a952a 100644 --- a/python/docs/pyspark.sql.rst +++ b/python/docs/pyspark.sql.rst @@ -23,6 +23,12 @@ pyspark.sql.functions module :members: :undoc-members: +pyspark.sql.avro.functions module +--------------------------------- +.. automodule:: pyspark.sql.avro.functions + :members: + :undoc-members: + pyspark.sql.streaming module ---------------------------- .. automodule:: pyspark.sql.streaming diff --git a/python/pyspark/sql/avro/__init__.py b/python/pyspark/sql/avro/__init__.py new file mode 100644 index 000000000000..8f01fbbf6482 --- /dev/null +++ b/python/pyspark/sql/avro/__init__.py @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +__all__ = ['functions'] diff --git a/python/pyspark/sql/avro/functions.py b/python/pyspark/sql/avro/functions.py new file mode 100644 index 000000000000..81686df86679 --- /dev/null +++ b/python/pyspark/sql/avro/functions.py @@ -0,0 +1,134 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +A collections of builtin avro functions +""" + + +from pyspark import since, SparkContext +from pyspark.rdd import ignore_unicode_prefix +from pyspark.sql.column import Column, _to_java_column +from pyspark.util import _print_missing_jar + + +@ignore_unicode_prefix +@since(3.0) +def from_avro(data, jsonFormatSchema, options={}): + """ + Converts a binary column of avro format into its corresponding catalyst value. The specified + schema must match the read data, otherwise the behavior is undefined: it may fail or return + arbitrary result. + + Note: Avro is built-in but external data source module since Spark 2.4. Please deploy the + application as per the deployment section of "Apache Avro Data Source Guide". + + :param data: the binary column. + :param jsonFormatSchema: the avro schema in JSON string format. + :param options: options to control how the Avro record is parsed. 
+ + >>> from pyspark.sql import Row + >>> from pyspark.sql.avro.functions import from_avro, to_avro + >>> data = [(1, Row(name='Alice', age=2))] + >>> df = spark.createDataFrame(data, ("key", "value")) + >>> avroDf = df.select(to_avro(df.value).alias("avro")) + >>> avroDf.collect() + [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))] + >>> jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields": + ... [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord", + ... "fields":[{"name":"age","type":["long","null"]}, + ... {"name":"name","type":["string","null"]}]},"null"]}]}''' + >>> avroDf.select(from_avro(avroDf.avro, jsonFormatSchema).alias("value")).collect() + [Row(value=Row(avro=Row(age=2, name=u'Alice')))] + """ + + sc = SparkContext._active_spark_context + try: + jc = sc._jvm.org.apache.spark.sql.avro.functions.from_avro( + _to_java_column(data), jsonFormatSchema, options) + except TypeError as e: + if str(e) == "'JavaPackage' object is not callable": + _print_missing_jar("Avro", "avro", "avro", sc.version) + raise + return Column(jc) + + +@ignore_unicode_prefix +@since(3.0) +def to_avro(data): + """ + Converts a column into binary of avro format. + + Note: Avro is built-in but external data source module since Spark 2.4. Please deploy the + application as per the deployment section of "Apache Avro Data Source Guide". + + :param data: the data column. + + >>> from pyspark.sql import Row + >>> from pyspark.sql.avro.functions import to_avro + >>> data = [(1, Row(name='Alice', age=2))] + >>> df = spark.createDataFrame(data, ("key", "value")) + >>> df.select(to_avro(df.value).alias("avro")).collect() + [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))] + """ + + sc = SparkContext._active_spark_context + try: + jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(data)) + except TypeError as e: + if str(e) == "'JavaPackage' object is not callable": + _print_missing_jar("Avro", "avro", "avro", sc.version) + raise + return Column(jc) + + +def _test(): + import os + import sys + from pyspark.testing.utils import search_jar + avro_jar = search_jar("external/avro", "spark-avro") + if avro_jar is None: + print( + "Skipping all Avro Python tests as the optional Avro project was " + "not compiled into a JAR. 
To run these tests, " + "you need to build Spark with 'build/sbt -Pavro package' or " + "'build/mvn -Pavro package' before running this test.") + sys.exit(0) + else: + existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell") + jars_args = "--jars %s" % avro_jar + os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args]) + + import doctest + from pyspark.sql import Row, SparkSession + import pyspark.sql.avro.functions + globs = pyspark.sql.avro.functions.__dict__.copy() + spark = SparkSession.builder\ + .master("local[4]")\ + .appName("sql.avro.functions tests")\ + .getOrCreate() + globs['spark'] = spark + (failure_count, test_count) = doctest.testmod( + pyspark.sql.avro.functions, globs=globs, + optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE) + spark.stop() + if failure_count: + sys.exit(-1) + + +if __name__ == "__main__": + _test() diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py index b3348828fdf6..4ed9f2a40c3a 100644 --- a/python/pyspark/streaming/kinesis.py +++ b/python/pyspark/streaming/kinesis.py @@ -18,6 +18,8 @@ from pyspark.serializers import NoOpSerializer from pyspark.storagelevel import StorageLevel from pyspark.streaming import DStream +from pyspark.util import _print_missing_jar + __all__ = ['KinesisUtils', 'InitialPositionInStream', 'utf8_decoder'] @@ -82,7 +84,11 @@ def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName, helper = ssc._jvm.org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper() except TypeError as e: if str(e) == "'JavaPackage' object is not callable": - KinesisUtils._printErrorMsg(ssc.sparkContext) + _print_missing_jar( + "Streaming's Kinesis", + "streaming-kinesis-asl", + "streaming-kinesis-asl-assembly", + ssc.sparkContext.version) raise jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl, regionName, initialPositionInStream, jduration, jlevel, @@ -91,28 +97,6 @@ def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName, stream = DStream(jstream, ssc, NoOpSerializer()) return stream.map(lambda v: decoder(v)) - @staticmethod - def _printErrorMsg(sc): - print(""" -________________________________________________________________________________________________ - - Spark Streaming's Kinesis libraries not found in class path. Try one of the following. - - 1. Include the Kinesis library and its dependencies with in the - spark-submit command as - - $ bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl:%s ... - - 2. Download the JAR of the artifact from Maven Central http://search.maven.org/, - Group Id = org.apache.spark, Artifact Id = spark-streaming-kinesis-asl-assembly, Version = %s. - Then, include the jar in the spark-submit command as - - $ bin/spark-submit --jars ... - -________________________________________________________________________________________________ - -""" % (sc.version, sc.version)) - class InitialPositionInStream(object): LATEST, TRIM_HORIZON = (0, 1) diff --git a/python/pyspark/testing/streamingutils.py b/python/pyspark/testing/streamingutils.py index 85a2fa14b936..3bed50721a98 100644 --- a/python/pyspark/testing/streamingutils.py +++ b/python/pyspark/testing/streamingutils.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -import glob import os import tempfile import time @@ -22,32 +21,7 @@ from pyspark import SparkConf, SparkContext, RDD from pyspark.streaming import StreamingContext - - -def search_kinesis_asl_assembly_jar(): - kinesis_asl_assembly_dir = os.path.join( - os.environ["SPARK_HOME"], "external/kinesis-asl-assembly") - - # We should ignore the following jars - ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar") - - # Search jar in the project dir using the jar name_prefix for both sbt build and maven - # build because the artifact jars are in different directories. - name_prefix = "spark-streaming-kinesis-asl-assembly" - sbt_build = glob.glob(os.path.join( - kinesis_asl_assembly_dir, "target/scala-*/%s-*.jar" % name_prefix)) - maven_build = glob.glob(os.path.join( - kinesis_asl_assembly_dir, "target/%s_*.jar" % name_prefix)) - jar_paths = sbt_build + maven_build - jars = [jar for jar in jar_paths if not jar.endswith(ignored_jar_suffixes)] - - if not jars: - return None - elif len(jars) > 1: - raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs: %s; please " - "remove all but one") % (", ".join(jars))) - else: - return jars[0] +from pyspark.testing.utils import search_jar # Must be same as the variable and condition defined in KinesisTestUtils.scala and modules.py @@ -59,7 +33,8 @@ def search_kinesis_asl_assembly_jar(): "Skipping all Kinesis Python tests as environmental variable 'ENABLE_KINESIS_TESTS' " "was not set.") else: - kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar() + kinesis_asl_assembly_jar = search_jar("external/kinesis-asl-assembly", + "spark-streaming-kinesis-asl-assembly") if kinesis_asl_assembly_jar is None: kinesis_requirement_message = ( "Skipping all Kinesis Python tests as the optional Kinesis project was " diff --git a/python/pyspark/testing/utils.py b/python/pyspark/testing/utils.py index 7df0acae026f..c6a528194f00 100644 --- a/python/pyspark/testing/utils.py +++ b/python/pyspark/testing/utils.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import glob import os import struct import sys @@ -100,3 +101,27 @@ def write(self, b): def close(self): pass + + +def search_jar(project_relative_path, jar_name_prefix): + project_full_path = os.path.join( + os.environ["SPARK_HOME"], project_relative_path) + + # We should ignore the following jars + ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar") + + # Search jar in the project dir using the jar name_prefix for both sbt build and maven + # build because the artifact jars are in different directories. 
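+    # For example (illustrative layouts): an sbt build typically leaves the artifact under
+    # target/scala-<scala-version>/, while a maven build writes it directly under target/.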
+ sbt_build = glob.glob(os.path.join( + project_full_path, "target/scala-*/%s*.jar" % jar_name_prefix)) + maven_build = glob.glob(os.path.join( + project_full_path, "target/%s*.jar" % jar_name_prefix)) + jar_paths = sbt_build + maven_build + jars = [jar for jar in jar_paths if not jar.endswith(ignored_jar_suffixes)] + + if not jars: + return None + elif len(jars) > 1: + raise Exception("Found multiple JARs: %s; please remove all but one" % (", ".join(jars))) + else: + return jars[0] diff --git a/python/pyspark/util.py b/python/pyspark/util.py index f906f4959543..d0ecd43ead5a 100644 --- a/python/pyspark/util.py +++ b/python/pyspark/util.py @@ -106,6 +106,33 @@ def wrapper(*args, **kwargs): return wrapper +def _print_missing_jar(lib_name, pkg_name, jar_name, spark_version): + print(""" +________________________________________________________________________________________________ + + Spark %(lib_name)s libraries not found in class path. Try one of the following. + + 1. Include the %(lib_name)s library and its dependencies with in the + spark-submit command as + + $ bin/spark-submit --packages org.apache.spark:spark-%(pkg_name)s:%(spark_version)s ... + + 2. Download the JAR of the artifact from Maven Central http://search.maven.org/, + Group Id = org.apache.spark, Artifact Id = spark-%(jar_name)s, Version = %(spark_version)s. + Then, include the jar in the spark-submit command as + + $ bin/spark-submit --jars ... + +________________________________________________________________________________________________ + +""" % { + "lib_name": lib_name, + "pkg_name": pkg_name, + "jar_name": jar_name, + "spark_version": spark_version + }) + + if __name__ == "__main__": import doctest (failure_count, test_count) = doctest.testmod()
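As an illustration of the new `search_jar` helper (not part of the patch), the sketch below fabricates an sbt-style build layout with `tempfile` and shows that the lookup returns the artifact while the `sources` jar is filtered out; the jar name, Scala version, and paths are made up for the example:

```python
import os
import tempfile

from pyspark.testing.utils import search_jar

# Mimic an sbt build layout under a throwaway SPARK_HOME (names are illustrative).
spark_home = tempfile.mkdtemp()
target_dir = os.path.join(spark_home, "external/avro/target/scala-2.12")
os.makedirs(target_dir)
open(os.path.join(target_dir, "spark-avro_2.12-3.0.0-SNAPSHOT.jar"), "w").close()
# Jars ending in the javadoc/sources/test-sources/tests suffixes are ignored.
open(os.path.join(target_dir, "spark-avro_2.12-3.0.0-SNAPSHOT-sources.jar"), "w").close()

os.environ["SPARK_HOME"] = spark_home
print(search_jar("external/avro", "spark-avro"))
# .../external/avro/target/scala-2.12/spark-avro_2.12-3.0.0-SNAPSHOT.jar
```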