3 changes: 2 additions & 1 deletion dev/sparktestsupport/modules.py
@@ -341,7 +341,7 @@ def __hash__(self):

pyspark_sql = Module(
name="pyspark-sql",
dependencies=[pyspark_core, hive],
dependencies=[pyspark_core, hive, avro],
source_file_regexes=[
"python/pyspark/sql"
],
@@ -360,6 +360,7 @@ def __hash__(self):
"pyspark.sql.streaming",
"pyspark.sql.udf",
"pyspark.sql.window",
"pyspark.sql.avro.functions",
# unittests
"pyspark.sql.tests.test_appsubmit",
"pyspark.sql.tests.test_arrow",
31 changes: 31 additions & 0 deletions docs/sql-data-sources-avro.md
@@ -137,6 +137,37 @@ StreamingQuery query = output
.option("topic", "topic2")
.start();

{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
from pyspark.sql.avro.functions import from_avro, to_avro

# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()

df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("subscribe", "topic1")\
.load()

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))

query = output\
.writeStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("topic", "topic2")\
.start()

{% endhighlight %}
</div>
</div>
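For reference, the same decode/filter/encode pipeline also works outside streaming; below is a minimal batch sketch (not part of the guide's diff above), reusing the placeholder brokers, topics, and schema file from the example.

```python
# Sketch only: the same from_avro/to_avro pipeline run as a batch Kafka query.
# Broker addresses, topic names, and the schema file path are placeholders.
from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()

df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic1") \
    .load()

output = df \
    .select(from_avro("value", jsonFormatSchema).alias("user")) \
    .where('user.favorite_color == "red"') \
    .select(to_avro("user.name").alias("value"))

output.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("topic", "topic2") \
    .save()
```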
6 changes: 6 additions & 0 deletions python/docs/pyspark.sql.rst
@@ -23,6 +23,12 @@ pyspark.sql.functions module
:members:
:undoc-members:

pyspark.sql.avro.functions module
---------------------------------
.. automodule:: pyspark.sql.avro.functions
:members:
:undoc-members:

pyspark.sql.streaming module
----------------------------
.. automodule:: pyspark.sql.streaming
18 changes: 18 additions & 0 deletions python/pyspark/sql/avro/__init__.py
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

__all__ = ['functions']
134 changes: 134 additions & 0 deletions python/pyspark/sql/avro/functions.py
@@ -0,0 +1,134 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A collection of builtin Avro functions
"""


from pyspark import since, SparkContext
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql.column import Column, _to_java_column
from pyspark.util import _print_missing_jar


@ignore_unicode_prefix
@since(3.0)
def from_avro(data, jsonFormatSchema, options={}):
"""
Converts a binary column of Avro format into its corresponding Catalyst value. The specified
schema must match the read data, otherwise the behavior is undefined: it may fail or return an
arbitrary result.

Note: Avro is a built-in but external data source module since Spark 2.4. Please deploy the
application as per the deployment section of "Apache Avro Data Source Guide".

Review comment (Contributor): Could we improve the wording here? "Built-in but external" might be a bit confusing. What do you think of something like "a supported but optional data source that requires special deployment"?

Reply (Contributor, author): This part is carried over from the original feature introduced in 2.4, and users are already used to the wording. If you still think it is worth changing, I suggest updating the original feature as well.

:param data: the binary column.
:param jsonFormatSchema: the avro schema in JSON string format.
:param options: options to control how the Avro record is parsed.

>>> from pyspark.sql import Row
>>> from pyspark.sql.avro.functions import from_avro, to_avro
>>> data = [(1, Row(name='Alice', age=2))]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> avroDf = df.select(to_avro(df.value).alias("avro"))
>>> avroDf.collect()
[Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
>>> jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields":
... [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord",
... "fields":[{"name":"age","type":["long","null"]},
... {"name":"name","type":["string","null"]}]},"null"]}]}'''
>>> avroDf.select(from_avro(avroDf.avro, jsonFormatSchema).alias("value")).collect()
[Row(value=Row(avro=Row(age=2, name=u'Alice')))]
"""

sc = SparkContext._active_spark_context
try:
jc = sc._jvm.org.apache.spark.sql.avro.functions.from_avro(
_to_java_column(data), jsonFormatSchema, options)
except TypeError as e:
if str(e) == "'JavaPackage' object is not callable":
_print_missing_jar("Avro", "avro", "avro", sc.version)
raise
return Column(jc)


@ignore_unicode_prefix
@since(3.0)
def to_avro(data):
"""
Converts a column into a binary column in Avro format.

Note: Avro is a built-in but external data source module since Spark 2.4. Please deploy the
application as per the deployment section of "Apache Avro Data Source Guide".

:param data: the data column.

>>> from pyspark.sql import Row
>>> from pyspark.sql.avro.functions import to_avro
>>> data = [(1, Row(name='Alice', age=2))]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_avro(df.value).alias("avro")).collect()
[Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
"""

sc = SparkContext._active_spark_context
try:
jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(data))
except TypeError as e:
if str(e) == "'JavaPackage' object is not callable":
_print_missing_jar("Avro", "avro", "avro", sc.version)
raise
return Column(jc)


def _test():
import os
import sys
from pyspark.testing.utils import search_jar
avro_jar = search_jar("external/avro", "spark-avro")
if avro_jar is None:
print(
"Skipping all Avro Python tests as the optional Avro project was "
"not compiled into a JAR. To run these tests, "
"you need to build Spark with 'build/sbt -Pavro package' or "
"'build/mvn -Pavro package' before running this test.")
sys.exit(0)
else:
existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
jars_args = "--jars %s" % avro_jar
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])

import doctest
from pyspark.sql import Row, SparkSession
import pyspark.sql.avro.functions
globs = pyspark.sql.avro.functions.__dict__.copy()
spark = SparkSession.builder\
.master("local[4]")\
.appName("sql.avro.functions tests")\
.getOrCreate()
globs['spark'] = spark
(failure_count, test_count) = doctest.testmod(
pyspark.sql.avro.functions, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
spark.stop()
if failure_count:
sys.exit(-1)


if __name__ == "__main__":
_test()
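The `_test()` harness above injects a locally built spark-avro jar through `PYSPARK_SUBMIT_ARGS`. The same idea applies when starting a local session outside the test harness; a minimal sketch, assuming the `spark-avro_2.12` coordinate and a 3.0.0 version (both placeholders to match your build):

```python
# Sketch only: supply the optional Avro module when launching a local PySpark session.
# The Maven coordinate and version are assumptions; match them to your Spark/Scala build.
# PYSPARK_SUBMIT_ARGS must be set before the JVM is started.
import os

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-avro_2.12:3.0.0 pyspark-shell"
)

from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro

spark = SparkSession.builder \
    .master("local[2]") \
    .appName("avro-functions-demo") \
    .getOrCreate()
# from_avro/to_avro can now reach the JVM-side implementation; without the jar on the
# classpath, calling them hits the guidance printed by _print_missing_jar.
```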
30 changes: 7 additions & 23 deletions python/pyspark/streaming/kinesis.py
@@ -18,6 +18,8 @@
from pyspark.serializers import NoOpSerializer
from pyspark.storagelevel import StorageLevel
from pyspark.streaming import DStream
from pyspark.util import _print_missing_jar


__all__ = ['KinesisUtils', 'InitialPositionInStream', 'utf8_decoder']

@@ -82,7 +84,11 @@ def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,
helper = ssc._jvm.org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper()
except TypeError as e:
if str(e) == "'JavaPackage' object is not callable":
KinesisUtils._printErrorMsg(ssc.sparkContext)
_print_missing_jar(
"Streaming's Kinesis",
"streaming-kinesis-asl",
"streaming-kinesis-asl-assembly",
ssc.sparkContext.version)
raise
jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
regionName, initialPositionInStream, jduration, jlevel,
@@ -91,28 +97,6 @@ def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,
stream = DStream(jstream, ssc, NoOpSerializer())
return stream.map(lambda v: decoder(v))

@staticmethod
def _printErrorMsg(sc):
print("""
________________________________________________________________________________________________

Spark Streaming's Kinesis libraries not found in class path. Try one of the following.

1. Include the Kinesis library and its dependencies with in the
spark-submit command as

$ bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl:%s ...

2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
Group Id = org.apache.spark, Artifact Id = spark-streaming-kinesis-asl-assembly, Version = %s.
Then, include the jar in the spark-submit command as

$ bin/spark-submit --jars <spark-streaming-kinesis-asl-assembly.jar> ...

________________________________________________________________________________________________

""" % (sc.version, sc.version))


class InitialPositionInStream(object):
LATEST, TRIM_HORIZON = (0, 1)
31 changes: 3 additions & 28 deletions python/pyspark/testing/streamingutils.py
@@ -14,40 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import glob
import os
import tempfile
import time
import unittest

from pyspark import SparkConf, SparkContext, RDD
from pyspark.streaming import StreamingContext


def search_kinesis_asl_assembly_jar():
kinesis_asl_assembly_dir = os.path.join(
os.environ["SPARK_HOME"], "external/kinesis-asl-assembly")

# We should ignore the following jars
ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")

# Search jar in the project dir using the jar name_prefix for both sbt build and maven
# build because the artifact jars are in different directories.
name_prefix = "spark-streaming-kinesis-asl-assembly"
sbt_build = glob.glob(os.path.join(
kinesis_asl_assembly_dir, "target/scala-*/%s-*.jar" % name_prefix))
maven_build = glob.glob(os.path.join(
kinesis_asl_assembly_dir, "target/%s_*.jar" % name_prefix))
jar_paths = sbt_build + maven_build
jars = [jar for jar in jar_paths if not jar.endswith(ignored_jar_suffixes)]

if not jars:
return None
elif len(jars) > 1:
raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs: %s; please "
"remove all but one") % (", ".join(jars)))
else:
return jars[0]
from pyspark.testing.utils import search_jar


# Must be same as the variable and condition defined in KinesisTestUtils.scala and modules.py
@@ -59,7 +33,8 @@ def search_kinesis_asl_assembly_jar():
"Skipping all Kinesis Python tests as environmental variable 'ENABLE_KINESIS_TESTS' "
"was not set.")
else:
kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
kinesis_asl_assembly_jar = search_jar("external/kinesis-asl-assembly",
"spark-streaming-kinesis-asl-assembly")
if kinesis_asl_assembly_jar is None:
kinesis_requirement_message = (
"Skipping all Kinesis Python tests as the optional Kinesis project was "
25 changes: 25 additions & 0 deletions python/pyspark/testing/utils.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import glob
import os
import struct
import sys
@@ -100,3 +101,27 @@ def write(self, b):

def close(self):
pass


def search_jar(project_relative_path, jar_name_prefix):
project_full_path = os.path.join(
os.environ["SPARK_HOME"], project_relative_path)

# We should ignore the following jars
ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")

# Search jar in the project dir using the jar name_prefix for both sbt build and maven
# build because the artifact jars are in different directories.
sbt_build = glob.glob(os.path.join(
project_full_path, "target/scala-*/%s*.jar" % jar_name_prefix))
Review comment (Member): Hi, all. This causes Python UT failures, which blocks other PRs. Please see #24268.
maven_build = glob.glob(os.path.join(
project_full_path, "target/%s*.jar" % jar_name_prefix))
jar_paths = sbt_build + maven_build
jars = [jar for jar in jar_paths if not jar.endswith(ignored_jar_suffixes)]

if not jars:
return None
elif len(jars) > 1:
raise Exception("Found multiple JARs: %s; please remove all but one" % (", ".join(jars)))
else:
return jars[0]
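A usage sketch of the new helper, mirroring the two call sites in this PR; it reads `SPARK_HOME` from the environment, so it only makes sense inside a Spark source checkout where the optional module may have been built:

```python
# Sketch only: locate an optionally built module jar the same way the Avro and
# Kinesis test code does. Requires SPARK_HOME to point at a Spark checkout.
from pyspark.testing.utils import search_jar

avro_jar = search_jar("external/avro", "spark-avro")
if avro_jar is None:
    print("spark-avro jar not built; optional Avro tests would be skipped")
else:
    print("found %s" % avro_jar)
```

Centralizing the glob logic here lets the Avro and Kinesis test paths share it instead of each keeping its own copy.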
27 changes: 27 additions & 0 deletions python/pyspark/util.py
@@ -106,6 +106,33 @@ def wrapper(*args, **kwargs):
return wrapper


def _print_missing_jar(lib_name, pkg_name, jar_name, spark_version):
print("""
________________________________________________________________________________________________

Spark %(lib_name)s libraries not found in class path. Try one of the following.

  1. Include the %(lib_name)s library and its dependencies within the
spark-submit command as

$ bin/spark-submit --packages org.apache.spark:spark-%(pkg_name)s:%(spark_version)s ...

2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
Group Id = org.apache.spark, Artifact Id = spark-%(jar_name)s, Version = %(spark_version)s.
Then, include the jar in the spark-submit command as

$ bin/spark-submit --jars <spark-%(jar_name)s.jar> ...

________________________________________________________________________________________________

""" % {
"lib_name": lib_name,
"pkg_name": pkg_name,
"jar_name": jar_name,
"spark_version": spark_version
})


if __name__ == "__main__":
import doctest
(failure_count, test_count) = doctest.testmod()
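For illustration, calling the new helper directly just prints the guidance banner; a minimal sketch with an arbitrary version string:

```python
# Sketch only: the helper prints spark-submit guidance built from its arguments.
# The version string is an arbitrary placeholder.
from pyspark.util import _print_missing_jar

_print_missing_jar("Avro", "avro", "avro", "3.0.0")
# Suggests either `--packages org.apache.spark:spark-avro:3.0.0` or downloading
# spark-avro from Maven Central and passing it via `--jars`.
```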