From 4062cda3087ae42c6c3cb24508fc1d3a931accdf Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 21 Dec 2015 17:50:29 -0800 Subject: [PATCH 01/16] Preparing Spark release v1.6.0-rc4 --- assembly/pom.xml | 2 +- bagel/pom.xml | 2 +- core/pom.xml | 2 +- docker-integration-tests/pom.xml | 2 +- examples/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt-assembly/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml | 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml | 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tags/pom.xml | 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml | 2 +- 35 files changed, 35 insertions(+), 35 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index 4b60ee00ffbe..fbabaa5424f1 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../pom.xml diff --git a/bagel/pom.xml b/bagel/pom.xml index 672e9469aec9..1b3e417b8cc5 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index 61744bb5c7bf..15b8d755a129 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../pom.xml diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml index 39d3f344615e..d579879b71e1 100644 --- a/docker-integration-tests/pom.xml +++ b/docker-integration-tests/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../pom.xml diff --git a/examples/pom.xml b/examples/pom.xml index f5ab2a7fdc09..37b15bb3c604 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../pom.xml diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index dceedcf23ed5..295455acf2d6 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 75113ff753e7..31b907f189a6 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/external/flume/pom.xml b/external/flume/pom.xml index 57f83607365d..c74505e7175b 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml index a9ed39ef8c9a..1f4b7b4cbd39 100644 --- a/external/kafka-assembly/pom.xml +++ b/external/kafka-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml 
index 79258c126e04..3fe72c6bc996 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml index 89713a28ca6a..3aac9651f584 100644 --- a/external/mqtt-assembly/pom.xml +++ b/external/mqtt-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index 59fba8b826b4..c062b1e7db46 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml index 087270de90b3..c45fc4f29cdc 100644 --- a/external/twitter/pom.xml +++ b/external/twitter/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml index 02d6b8128157..9eddd9a39fd9 100644 --- a/external/zeromq/pom.xml +++ b/external/zeromq/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml index 4ce90e75fd35..4e8a2f34cac9 100644 --- a/extras/java8-tests/pom.xml +++ b/extras/java8-tests/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml index 61ba4787fbf9..6bd0cb148ae4 100644 --- a/extras/kinesis-asl-assembly/pom.xml +++ b/extras/kinesis-asl-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml index 519a920279c9..7756fa26cdd0 100644 --- a/extras/kinesis-asl/pom.xml +++ b/extras/kinesis-asl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml index 87a4f05a0596..de8747e15f95 100644 --- a/extras/spark-ganglia-lgpl/pom.xml +++ b/extras/spark-ganglia-lgpl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/graphx/pom.xml b/graphx/pom.xml index 8cd66c5b2e82..a66c86e7a843 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../pom.xml diff --git a/launcher/pom.xml b/launcher/pom.xml index 5739bfc16958..6bbf7cfa8325 100644 --- a/launcher/pom.xml +++ b/launcher/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../pom.xml diff --git a/mllib/pom.xml b/mllib/pom.xml index 70139121d8c7..d1b3dbc35928 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../pom.xml diff --git a/network/common/pom.xml b/network/common/pom.xml index 9af6cc5e925f..6d84f93984be 100644 --- a/network/common/pom.xml +++ b/network/common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml index 70ba5cb1995b..00c141eadb39 100644 --- a/network/shuffle/pom.xml +++ b/network/shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git 
a/network/yarn/pom.xml b/network/yarn/pom.xml index e2360eff5cfe..e3644a4787ea 100644 --- a/network/yarn/pom.xml +++ b/network/yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/pom.xml b/pom.xml index 4050b43d6856..bbee213d0665 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 pom Spark Project Parent POM http://spark.apache.org/ diff --git a/repl/pom.xml b/repl/pom.xml index 154c99d23c7f..c663702ddb13 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../pom.xml diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index 61d6fc63554b..33ef0d6d7df0 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 06841b094562..84ac87b21804 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index b5b2143292a6..5e0e51381f3e 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index d96f3e2b9f62..291a676f247c 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../../pom.xml diff --git a/streaming/pom.xml b/streaming/pom.xml index 435e16db13ab..5177baf0892a 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../pom.xml diff --git a/tags/pom.xml b/tags/pom.xml index ca93722e7334..8d1a107d0705 100644 --- a/tags/pom.xml +++ b/tags/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../pom.xml diff --git a/tools/pom.xml b/tools/pom.xml index 1e64f280e5be..fee0b3dce840 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../pom.xml diff --git a/unsafe/pom.xml b/unsafe/pom.xml index a1c1111364ee..4e9c73cd5c13 100644 --- a/unsafe/pom.xml +++ b/unsafe/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../pom.xml diff --git a/yarn/pom.xml b/yarn/pom.xml index 989b820bec9e..8bc520d3c548 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0-SNAPSHOT + 1.6.0 ../pom.xml From 5b19e7cfded0e2e41b6f427b4c3cfc3f06f85466 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 21 Dec 2015 17:50:36 -0800 Subject: [PATCH 02/16] Preparing development version 1.6.0-SNAPSHOT --- assembly/pom.xml | 2 +- bagel/pom.xml | 2 +- core/pom.xml | 2 +- docker-integration-tests/pom.xml | 2 +- examples/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt-assembly/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml | 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml | 2 +- 
mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tags/pom.xml | 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml | 2 +- 35 files changed, 35 insertions(+), 35 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index fbabaa5424f1..4b60ee00ffbe 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../pom.xml diff --git a/bagel/pom.xml b/bagel/pom.xml index 1b3e417b8cc5..672e9469aec9 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index 15b8d755a129..61744bb5c7bf 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../pom.xml diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml index d579879b71e1..39d3f344615e 100644 --- a/docker-integration-tests/pom.xml +++ b/docker-integration-tests/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../pom.xml diff --git a/examples/pom.xml b/examples/pom.xml index 37b15bb3c604..f5ab2a7fdc09 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../pom.xml diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index 295455acf2d6..dceedcf23ed5 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 31b907f189a6..75113ff753e7 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/external/flume/pom.xml b/external/flume/pom.xml index c74505e7175b..57f83607365d 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml index 1f4b7b4cbd39..a9ed39ef8c9a 100644 --- a/external/kafka-assembly/pom.xml +++ b/external/kafka-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index 3fe72c6bc996..79258c126e04 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml index 3aac9651f584..89713a28ca6a 100644 --- a/external/mqtt-assembly/pom.xml +++ b/external/mqtt-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index c062b1e7db46..59fba8b826b4 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml index c45fc4f29cdc..087270de90b3 
100644 --- a/external/twitter/pom.xml +++ b/external/twitter/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml index 9eddd9a39fd9..02d6b8128157 100644 --- a/external/zeromq/pom.xml +++ b/external/zeromq/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml index 4e8a2f34cac9..4ce90e75fd35 100644 --- a/extras/java8-tests/pom.xml +++ b/extras/java8-tests/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml index 6bd0cb148ae4..61ba4787fbf9 100644 --- a/extras/kinesis-asl-assembly/pom.xml +++ b/extras/kinesis-asl-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml index 7756fa26cdd0..519a920279c9 100644 --- a/extras/kinesis-asl/pom.xml +++ b/extras/kinesis-asl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml index de8747e15f95..87a4f05a0596 100644 --- a/extras/spark-ganglia-lgpl/pom.xml +++ b/extras/spark-ganglia-lgpl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/graphx/pom.xml b/graphx/pom.xml index a66c86e7a843..8cd66c5b2e82 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../pom.xml diff --git a/launcher/pom.xml b/launcher/pom.xml index 6bbf7cfa8325..5739bfc16958 100644 --- a/launcher/pom.xml +++ b/launcher/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../pom.xml diff --git a/mllib/pom.xml b/mllib/pom.xml index d1b3dbc35928..70139121d8c7 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../pom.xml diff --git a/network/common/pom.xml b/network/common/pom.xml index 6d84f93984be..9af6cc5e925f 100644 --- a/network/common/pom.xml +++ b/network/common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml index 00c141eadb39..70ba5cb1995b 100644 --- a/network/shuffle/pom.xml +++ b/network/shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml index e3644a4787ea..e2360eff5cfe 100644 --- a/network/yarn/pom.xml +++ b/network/yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/pom.xml b/pom.xml index bbee213d0665..4050b43d6856 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT pom Spark Project Parent POM http://spark.apache.org/ diff --git a/repl/pom.xml b/repl/pom.xml index c663702ddb13..154c99d23c7f 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../pom.xml diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index 33ef0d6d7df0..61d6fc63554b 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -22,7 +22,7 
@@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 84ac87b21804..06841b094562 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index 5e0e51381f3e..b5b2143292a6 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 291a676f247c..d96f3e2b9f62 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../../pom.xml diff --git a/streaming/pom.xml b/streaming/pom.xml index 5177baf0892a..435e16db13ab 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../pom.xml diff --git a/tags/pom.xml b/tags/pom.xml index 8d1a107d0705..ca93722e7334 100644 --- a/tags/pom.xml +++ b/tags/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../pom.xml diff --git a/tools/pom.xml b/tools/pom.xml index fee0b3dce840..1e64f280e5be 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../pom.xml diff --git a/unsafe/pom.xml b/unsafe/pom.xml index 4e9c73cd5c13..a1c1111364ee 100644 --- a/unsafe/pom.xml +++ b/unsafe/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../pom.xml diff --git a/yarn/pom.xml b/yarn/pom.xml index 8bc520d3c548..989b820bec9e 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.6.0 + 1.6.0-SNAPSHOT ../pom.xml From 309ef355fc511b70765983358d5c92b5f1a26bce Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 21 Dec 2015 22:28:18 -0800 Subject: [PATCH 03/16] [MINOR] Fix typos in JavaStreamingContext Author: Shixiong Zhu Closes #10424 from zsxwing/typo. (cherry picked from commit 93da8565fea42d8ac978df411daced4a9ea3a9c8) Signed-off-by: Reynold Xin --- .../spark/streaming/api/java/JavaStreamingContext.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 8f21c79a760c..7a5013502546 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -695,9 +695,9 @@ object JavaStreamingContext { * * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext - * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactor. + * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory. 
*/ - @deprecated("use getOrCreate without JavaStreamingContextFactor", "1.4.0") + @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0") def getOrCreate( checkpointPath: String, factory: JavaStreamingContextFactory @@ -718,7 +718,7 @@ object JavaStreamingContext { * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible * file system - * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactor. + * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory. */ @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0") def getOrCreate( @@ -744,7 +744,7 @@ object JavaStreamingContext { * file system * @param createOnError Whether to create a new JavaStreamingContext if there is an * error in reading checkpoint data. - * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactor. + * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory. */ @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0") def getOrCreate( From 0f905d7df43b20d9335ec880b134d8d4f962c297 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 21 Dec 2015 23:12:05 -0800 Subject: [PATCH 04/16] [SPARK-11823][SQL] Fix flaky JDBC cancellation test in HiveThriftBinaryServerSuite This patch fixes a flaky "test jdbc cancel" test in HiveThriftBinaryServerSuite. This test is prone to a race-condition which causes it to block indefinitely with while waiting for an extremely slow query to complete, which caused many Jenkins builds to time out. For more background, see my comments on #6207 (the PR which introduced this test). Author: Josh Rosen Closes #10425 from JoshRosen/SPARK-11823. 
(cherry picked from commit 2235cd44407e3b6b401fb84a2096ade042c51d36) Signed-off-by: Josh Rosen --- .../HiveThriftServer2Suites.scala | 85 ++++++++++++------- 1 file changed, 56 insertions(+), 29 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 139d8e897ba1..ebb2575416b7 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -23,9 +23,8 @@ import java.sql.{Date, DriverManager, SQLException, Statement} import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ -import scala.concurrent.{Await, Promise, future} +import scala.concurrent.{Await, ExecutionContext, Promise, future} import scala.io.Source import scala.util.{Random, Try} @@ -43,7 +42,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} import org.apache.spark.{Logging, SparkFunSuite} object TestData { @@ -356,31 +355,54 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map") queries.foreach(statement.execute) - - val largeJoin = "SELECT COUNT(*) FROM test_map " + - List.fill(10)("join test_map").mkString(" ") - val f = future { Thread.sleep(100); statement.cancel(); } - val e = intercept[SQLException] { - statement.executeQuery(largeJoin) + implicit val ec = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonSingleThreadExecutor("test-jdbc-cancel")) + try { + // Start a very-long-running query that will take hours to finish, then cancel it in order + // to demonstrate that cancellation works. + val f = future { + statement.executeQuery( + "SELECT COUNT(*) FROM test_map " + + List.fill(10)("join test_map").mkString(" ")) + } + // Note that this is slightly race-prone: if the cancel is issued before the statement + // begins executing then we'll fail with a timeout. As a result, this fixed delay is set + // slightly more conservatively than may be strictly necessary. + Thread.sleep(1000) + statement.cancel() + val e = intercept[SQLException] { + Await.result(f, 3.minute) + } + assert(e.getMessage.contains("cancelled")) + + // Cancellation is a no-op if spark.sql.hive.thriftServer.async=false + statement.executeQuery("SET spark.sql.hive.thriftServer.async=false") + try { + val sf = future { + statement.executeQuery( + "SELECT COUNT(*) FROM test_map " + + List.fill(4)("join test_map").mkString(" ") + ) + } + // Similarly, this is also slightly race-prone on fast machines where the query above + // might race and complete before we issue the cancel. 
+ Thread.sleep(1000) + statement.cancel() + val rs1 = Await.result(sf, 3.minute) + rs1.next() + assert(rs1.getInt(1) === math.pow(5, 5)) + rs1.close() + + val rs2 = statement.executeQuery("SELECT COUNT(*) FROM test_map") + rs2.next() + assert(rs2.getInt(1) === 5) + rs2.close() + } finally { + statement.executeQuery("SET spark.sql.hive.thriftServer.async=true") + } + } finally { + ec.shutdownNow() } - assert(e.getMessage contains "cancelled") - Await.result(f, 3.minute) - - // cancel is a noop - statement.executeQuery("SET spark.sql.hive.thriftServer.async=false") - val sf = future { Thread.sleep(100); statement.cancel(); } - val smallJoin = "SELECT COUNT(*) FROM test_map " + - List.fill(4)("join test_map").mkString(" ") - val rs1 = statement.executeQuery(smallJoin) - Await.result(sf, 3.minute) - rs1.next() - assert(rs1.getInt(1) === math.pow(5, 5)) - rs1.close() - - val rs2 = statement.executeQuery("SELECT COUNT(*) FROM test_map") - rs2.next() - assert(rs2.getInt(1) === 5) - rs2.close() } } @@ -817,6 +839,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl } override protected def beforeAll(): Unit = { + super.beforeAll() // Chooses a random port between 10000 and 19999 listeningPort = 10000 + Random.nextInt(10000) diagnosisBuffer.clear() @@ -838,7 +861,11 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl } override protected def afterAll(): Unit = { - stopThriftServer() - logInfo("HiveThriftServer2 stopped") + try { + stopThriftServer() + logInfo("HiveThriftServer2 stopped") + } finally { + super.afterAll() + } } } From 94fb5e870403e19feca8faf7d98bba6d14f7a362 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 22 Dec 2015 15:33:30 -0800 Subject: [PATCH 05/16] [SPARK-12487][STREAMING][DOCUMENT] Add docs for Kafka message handler Author: Shixiong Zhu Closes #10439 from zsxwing/kafka-message-handler-doc. (cherry picked from commit 93db50d1c2ff97e6eb9200a995e4601f752968ae) Signed-off-by: Tathagata Das --- docs/streaming-kafka-integration.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md index 5be73c42560f..9454714eeb9c 100644 --- a/docs/streaming-kafka-integration.md +++ b/docs/streaming-kafka-integration.md @@ -104,6 +104,7 @@ Next, we discuss how to use this approach in your streaming application. [key class], [value class], [key decoder class], [value decoder class] ]( streamingContext, [map of Kafka parameters], [set of topics to consume]) + You can also pass a `messageHandler` to `createDirectStream` to access `MessageAndMetadata` that contains metadata about the current message and transform it to any desired type. See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$) and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala). @@ -115,6 +116,7 @@ Next, we discuss how to use this approach in your streaming application. [key class], [value class], [key decoder class], [value decoder class], [map of Kafka parameters], [set of topics to consume]); + You can also pass a `messageHandler` to `createDirectStream` to access `MessageAndMetadata` that contains metadata about the current message and transform it to any desired type. 
See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html) and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java). @@ -123,6 +125,7 @@ Next, we discuss how to use this approach in your streaming application. from pyspark.streaming.kafka import KafkaUtils directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) + You can also pass a `messageHandler` to `createDirectStream` to access `KafkaMessageAndMetadata` that contains metadata about the current message and transform it to any desired type. By default, the Python API will decode Kafka data as UTF8 encoded strings. You can specify your custom decoding function to decode the byte arrays in Kafka records to any arbitrary data type. See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils) and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py). From 942c0577b201a08fffdcaf71e4d1867266ae309e Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 22 Dec 2015 16:39:10 -0800 Subject: [PATCH 06/16] [SPARK-12429][STREAMING][DOC] Add Accumulator and Broadcast example for Streaming This PR adds Scala, Java and Python examples to show how to use Accumulator and Broadcast in Spark Streaming to support checkpointing. Author: Shixiong Zhu Closes #10385 from zsxwing/accumulator-broadcast-example. (cherry picked from commit 20591afd790799327f99485c5a969ed7412eca45) Signed-off-by: Tathagata Das --- docs/programming-guide.md | 6 +- docs/streaming-programming-guide.md | 165 ++++++++++++++++++ .../JavaRecoverableNetworkWordCount.java | 71 +++++++- .../recoverable_network_wordcount.py | 30 +++- .../RecoverableNetworkWordCount.scala | 66 ++++++- 5 files changed, 325 insertions(+), 13 deletions(-) diff --git a/docs/programming-guide.md b/docs/programming-guide.md index f823b89a4b5e..010346e1e81d 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -806,7 +806,7 @@ However, in `cluster` mode, what happens is more complicated, and the above may What is happening here is that the variables within the closure sent to each executor are now copies and thus, when **counter** is referenced within the `foreach` function, it's no longer the **counter** on the driver node. There is still a **counter** in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of **counter** will still be zero since all operations on **counter** were referencing the value within the serialized closure. -To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#AccumLink). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail. +To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#accumulators). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail. In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. 
Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that's just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed. @@ -1091,7 +1091,7 @@ for details. foreach(func) - Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. + Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details. @@ -1336,7 +1336,7 @@ run on the cluster so that `v` is not shipped to the nodes more than once. In ad `v` should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later). -## Accumulators +## Accumulators Accumulators are variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index ed6b28c28213..3b071c7da559 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1415,6 +1415,171 @@ Note that the connections in the pool should be lazily created on demand and tim *** +## Accumulators and Broadcast Variables + +[Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use [Accumulators](programming-guide.html#accumulators) or [Broadcast variables](programming-guide.html#broadcast-variables) as well, you'll have to create lazily instantiated singleton instances for [Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example. + +
+<div data-lang="scala" markdown="1">
+{% highlight scala %} + +object WordBlacklist { + + @volatile private var instance: Broadcast[Seq[String]] = null + + def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { + if (instance == null) { + synchronized { + if (instance == null) { + val wordBlacklist = Seq("a", "b", "c") + instance = sc.broadcast(wordBlacklist) + } + } + } + instance + } +} + +object DroppedWordsCounter { + + @volatile private var instance: Accumulator[Long] = null + + def getInstance(sc: SparkContext): Accumulator[Long] = { + if (instance == null) { + synchronized { + if (instance == null) { + instance = sc.accumulator(0L, "WordsInBlacklistCounter") + } + } + } + instance + } +} + +wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { + // Get or register the blacklist Broadcast + val blacklist = WordBlacklist.getInstance(rdd.sparkContext) + // Get or register the droppedWordsCounter Accumulator + val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) + // Use blacklist to drop words and use droppedWordsCounter to count them + val counts = rdd.filter { case (word, count) => + if (blacklist.value.contains(word)) { + droppedWordsCounter += count + false + } else { + true + } + }.collect() + val output = "Counts at time " + time + " " + counts +}) + +{% endhighlight %} + +See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala). +
+<div data-lang="java" markdown="1">
+{% highlight java %} + +class JavaWordBlacklist { + + private static volatile Broadcast> instance = null; + + public static Broadcast> getInstance(JavaSparkContext jsc) { + if (instance == null) { + synchronized (JavaWordBlacklist.class) { + if (instance == null) { + List wordBlacklist = Arrays.asList("a", "b", "c"); + instance = jsc.broadcast(wordBlacklist); + } + } + } + return instance; + } +} + +class JavaDroppedWordsCounter { + + private static volatile Accumulator instance = null; + + public static Accumulator getInstance(JavaSparkContext jsc) { + if (instance == null) { + synchronized (JavaDroppedWordsCounter.class) { + if (instance == null) { + instance = jsc.accumulator(0, "WordsInBlacklistCounter"); + } + } + } + return instance; + } +} + +wordCounts.foreachRDD(new Function2, Time, Void>() { + @Override + public Void call(JavaPairRDD rdd, Time time) throws IOException { + // Get or register the blacklist Broadcast + final Broadcast> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context())); + // Get or register the droppedWordsCounter Accumulator + final Accumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context())); + // Use blacklist to drop words and use droppedWordsCounter to count them + String counts = rdd.filter(new Function, Boolean>() { + @Override + public Boolean call(Tuple2 wordCount) throws Exception { + if (blacklist.value().contains(wordCount._1())) { + droppedWordsCounter.add(wordCount._2()); + return false; + } else { + return true; + } + } + }).collect().toString(); + String output = "Counts at time " + time + " " + counts; + } +} + +{% endhighlight %} + +See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java). +
+<div data-lang="python" markdown="1">
+{% highlight python %} + +def getWordBlacklist(sparkContext): + if ('wordBlacklist' not in globals()): + globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"]) + return globals()['wordBlacklist'] + +def getDroppedWordsCounter(sparkContext): + if ('droppedWordsCounter' not in globals()): + globals()['droppedWordsCounter'] = sparkContext.accumulator(0) + return globals()['droppedWordsCounter'] + +def echo(time, rdd): + # Get or register the blacklist Broadcast + blacklist = getWordBlacklist(rdd.context) + # Get or register the droppedWordsCounter Accumulator + droppedWordsCounter = getDroppedWordsCounter(rdd.context) + + # Use blacklist to drop words and use droppedWordsCounter to count them + def filterFunc(wordCount): + if wordCount[0] in blacklist.value: + droppedWordsCounter.add(wordCount[1]) + False + else: + True + + counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect()) + +wordCounts.foreachRDD(echo) + +{% endhighlight %} + +See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/recoverable_network_wordcount.py). + +
+</div>
+ +*** + ## DataFrame and SQL Operations You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore this has to done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL. diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java index bceda97f058e..90d473703ec5 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java @@ -21,17 +21,22 @@ import java.io.IOException; import java.nio.charset.Charset; import java.util.Arrays; +import java.util.List; import java.util.regex.Pattern; import scala.Tuple2; import com.google.common.collect.Lists; import com.google.common.io.Files; +import org.apache.spark.Accumulator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.broadcast.Broadcast; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaDStream; @@ -41,7 +46,48 @@ import org.apache.spark.streaming.api.java.JavaStreamingContextFactory; /** - * Counts words in text encoded with UTF8 received from the network every second. + * Use this singleton to get or register a Broadcast variable. + */ +class JavaWordBlacklist { + + private static volatile Broadcast> instance = null; + + public static Broadcast> getInstance(JavaSparkContext jsc) { + if (instance == null) { + synchronized (JavaWordBlacklist.class) { + if (instance == null) { + List wordBlacklist = Arrays.asList("a", "b", "c"); + instance = jsc.broadcast(wordBlacklist); + } + } + } + return instance; + } +} + +/** + * Use this singleton to get or register an Accumulator. + */ +class JavaDroppedWordsCounter { + + private static volatile Accumulator instance = null; + + public static Accumulator getInstance(JavaSparkContext jsc) { + if (instance == null) { + synchronized (JavaDroppedWordsCounter.class) { + if (instance == null) { + instance = jsc.accumulator(0, "WordsInBlacklistCounter"); + } + } + } + return instance; + } +} + +/** + * Counts words in text encoded with UTF8 received from the network every second. This example also + * shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that + * they can be registered on driver failures. 
* * Usage: JavaRecoverableNetworkWordCount * and describe the TCP server that Spark Streaming would connect to receive @@ -111,10 +157,27 @@ public Integer call(Integer i1, Integer i2) { wordCounts.foreachRDD(new Function2, Time, Void>() { @Override public Void call(JavaPairRDD rdd, Time time) throws IOException { - String counts = "Counts at time " + time + " " + rdd.collect(); - System.out.println(counts); + // Get or register the blacklist Broadcast + final Broadcast> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context())); + // Get or register the droppedWordsCounter Accumulator + final Accumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context())); + // Use blacklist to drop words and use droppedWordsCounter to count them + String counts = rdd.filter(new Function, Boolean>() { + @Override + public Boolean call(Tuple2 wordCount) throws Exception { + if (blacklist.value().contains(wordCount._1())) { + droppedWordsCounter.add(wordCount._2()); + return false; + } else { + return true; + } + } + }).collect().toString(); + String output = "Counts at time " + time + " " + counts; + System.out.println(output); + System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally"); System.out.println("Appending to " + outputFile.getAbsolutePath()); - Files.append(counts + "\n", outputFile, Charset.defaultCharset()); + Files.append(output + "\n", outputFile, Charset.defaultCharset()); return null; } }); diff --git a/examples/src/main/python/streaming/recoverable_network_wordcount.py b/examples/src/main/python/streaming/recoverable_network_wordcount.py index ac91f0a06b17..52b2639cdf55 100644 --- a/examples/src/main/python/streaming/recoverable_network_wordcount.py +++ b/examples/src/main/python/streaming/recoverable_network_wordcount.py @@ -44,6 +44,20 @@ from pyspark.streaming import StreamingContext +# Get or register a Broadcast variable +def getWordBlacklist(sparkContext): + if ('wordBlacklist' not in globals()): + globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"]) + return globals()['wordBlacklist'] + + +# Get or register an Accumulator +def getDroppedWordsCounter(sparkContext): + if ('droppedWordsCounter' not in globals()): + globals()['droppedWordsCounter'] = sparkContext.accumulator(0) + return globals()['droppedWordsCounter'] + + def createContext(host, port, outputPath): # If you do not see this printed, that means the StreamingContext has been loaded # from the new checkpoint @@ -60,8 +74,22 @@ def createContext(host, port, outputPath): wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y) def echo(time, rdd): - counts = "Counts at time %s %s" % (time, rdd.collect()) + # Get or register the blacklist Broadcast + blacklist = getWordBlacklist(rdd.context) + # Get or register the droppedWordsCounter Accumulator + droppedWordsCounter = getDroppedWordsCounter(rdd.context) + + # Use blacklist to drop words and use droppedWordsCounter to count them + def filterFunc(wordCount): + if wordCount[0] in blacklist.value: + droppedWordsCounter.add(wordCount[1]) + False + else: + True + + counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect()) print(counts) + print("Dropped %d word(s) totally" % droppedWordsCounter.value) print("Appending to " + os.path.abspath(outputPath)) with open(outputPath, 'a') as f: f.write(counts + "\n") diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala 
b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index 9916882e4f94..38d4fd11f97d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -23,13 +23,55 @@ import java.nio.charset.Charset import com.google.common.io.Files -import org.apache.spark.SparkConf +import org.apache.spark.{Accumulator, SparkConf, SparkContext} +import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Time, Seconds, StreamingContext} import org.apache.spark.util.IntParam /** - * Counts words in text encoded with UTF8 received from the network every second. + * Use this singleton to get or register a Broadcast variable. + */ +object WordBlacklist { + + @volatile private var instance: Broadcast[Seq[String]] = null + + def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { + if (instance == null) { + synchronized { + if (instance == null) { + val wordBlacklist = Seq("a", "b", "c") + instance = sc.broadcast(wordBlacklist) + } + } + } + instance + } +} + +/** + * Use this singleton to get or register an Accumulator. + */ +object DroppedWordsCounter { + + @volatile private var instance: Accumulator[Long] = null + + def getInstance(sc: SparkContext): Accumulator[Long] = { + if (instance == null) { + synchronized { + if (instance == null) { + instance = sc.accumulator(0L, "WordsInBlacklistCounter") + } + } + } + instance + } +} + +/** + * Counts words in text encoded with UTF8 received from the network every second. This example also + * shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that + * they can be registered on driver failures. * * Usage: RecoverableNetworkWordCount * and describe the TCP server that Spark Streaming would connect to receive @@ -75,10 +117,24 @@ object RecoverableNetworkWordCount { val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { - val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]") - println(counts) + // Get or register the blacklist Broadcast + val blacklist = WordBlacklist.getInstance(rdd.sparkContext) + // Get or register the droppedWordsCounter Accumulator + val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) + // Use blacklist to drop words and use droppedWordsCounter to count them + val counts = rdd.filter { case (word, count) => + if (blacklist.value.contains(word)) { + droppedWordsCounter += count + false + } else { + true + } + }.collect().mkString("[", ", ", "]") + val output = "Counts at time " + time + " " + counts + println(output) + println("Dropped " + droppedWordsCounter.value + " word(s) totally") println("Appending to " + outputFile.getAbsolutePath) - Files.append(counts + "\n", outputFile, Charset.defaultCharset()) + Files.append(output + "\n", outputFile, Charset.defaultCharset()) }) ssc } From c6c9bf99af0ee0559248ad772460e9b2efde5861 Mon Sep 17 00:00:00 2001 From: pierre-borckmans Date: Tue, 22 Dec 2015 23:00:42 -0800 Subject: [PATCH 07/16] [SPARK-12477][SQL] - Tungsten projection fails for null values in array fields Accessing null elements in an array field fails when tungsten is enabled. It works in Spark 1.3.1, and in Spark > 1.5 with Tungsten disabled. 
This PR solves this by checking if the accessed element in the array field is null, in the generated code. Example: ``` // Array of String case class AS( as: Seq[String] ) val dfAS = sc.parallelize( Seq( AS ( Seq("a",null,"b") ) ) ).toDF dfAS.registerTempTable("T_AS") for (i <- 0 to 2) { println(i + " = " + sqlContext.sql(s"select as[$i] from T_AS").collect.mkString(","))} ``` With Tungsten disabled: ``` 0 = [a] 1 = [null] 2 = [b] ``` With Tungsten enabled: ``` 0 = [a] 15/12/22 09:32:50 ERROR Executor: Exception in task 7.0 in stage 1.0 (TID 15) java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters$UTF8StringWriter.getSize(UnsafeRowWriters.java:90) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.TungstenProject$$anonfun$3$$anonfun$apply$3.apply(basicOperators.scala:90) at org.apache.spark.sql.execution.TungstenProject$$anonfun$3$$anonfun$apply$3.apply(basicOperators.scala:88) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ``` Author: pierre-borckmans Closes #10429 from pierre-borckmans/SPARK-12477_Tungsten-Projection-Null-Element-In-Array. (cherry picked from commit 43b2a6390087b7ce262a54dc8ab8dd825db62e21) Signed-off-by: Reynold Xin --- .../sql/catalyst/expressions/complexTypeExtractors.scala | 2 +- .../org/apache/spark/sql/DataFrameComplexTypeSuite.scala | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala index 10ce10aaf6da..31520f5241d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala @@ -222,7 +222,7 @@ case class GetArrayItem(child: Expression, ordinal: Expression) nullSafeCodeGen(ctx, ev, (eval1, eval2) => { s""" final int index = (int) $eval2; - if (index >= $eval1.numElements() || index < 0) { + if (index >= $eval1.numElements() || index < 0 || $eval1.isNullAt(index)) { ${ev.isNull} = true; } else { ${ev.value} = ${ctx.getValue(eval1, dataType, "index")}; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala index 09f7b507670c..b76fc73b7fa0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala @@ -43,4 +43,13 @@ class DataFrameComplexTypeSuite extends QueryTest with SharedSQLContext { val df = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b") df.select(array($"a").as("s")).select(f(expr("s[0]"))).collect() } + + test("SPARK-12477 accessing null element in array field") { + val df = sparkContext.parallelize(Seq((Seq("val1", null, "val2"), + Seq(Some(1), None, Some(2))))).toDF("s", "i") + val nullStringRow = df.selectExpr("s[1]").collect()(0) + assert(nullStringRow == org.apache.spark.sql.Row(null)) + val nullIntRow = df.selectExpr("i[1]").collect()(0) + assert(nullIntRow == org.apache.spark.sql.Row(null)) + } } From 
5987b1658b837400691160c38ba6eedc47274ee4 Mon Sep 17 00:00:00 2001 From: Adrian Bridgett Date: Wed, 23 Dec 2015 16:00:03 -0800 Subject: [PATCH 08/16] [SPARK-12499][BUILD] don't force MAVEN_OPTS allow the user to override MAVEN_OPTS (2GB wasn't sufficient for me) Author: Adrian Bridgett Closes #10448 from abridgett/feature/do_not_force_maven_opts. (cherry picked from commit ead6abf7e7fc14b451214951d4991d497aa65e63) Signed-off-by: Josh Rosen --- make-distribution.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/make-distribution.sh b/make-distribution.sh index c949e94a1226..48c7c8bf1eba 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -162,7 +162,7 @@ fi # Build uber fat JAR cd "$SPARK_HOME" -export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" +export MAVEN_OPTS="${MAVEN_OPTS:--Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m}" # Store the command as an array because $MVN variable might have spaces in it. # Normal quoting tricks don't work. From b49856ae5983aca8ed7df2f478fc5f399ec34ce8 Mon Sep 17 00:00:00 2001 From: Nong Li Date: Fri, 18 Dec 2015 16:05:18 -0800 Subject: [PATCH 09/16] [SPARK-12411][CORE] Decrease executor heartbeat timeout to match heartbeat interval Previously, the rpc timeout was the default network timeout, which is the same value the driver uses to determine dead executors. This means if there is a network issue, the executor is determined dead after one heartbeat attempt. There is a separate config for the heartbeat interval which is a better value to use for the heartbeat RPC. With this change, the executor will make multiple heartbeat attempts even with RPC issues. Author: Nong Li Closes #10365 from nongli/spark-12411. --- core/src/main/scala/org/apache/spark/executor/Executor.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 7b68dfe5ad06..a7bb412e1c94 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -30,6 +30,7 @@ import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.rpc.RpcTimeout import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task} import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{StorageLevel, TaskResultBlockId} @@ -445,7 +446,8 @@ private[spark] class Executor( val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId) try { - val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](message) + val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse]( + message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s")) if (response.reregisterBlockManager) { logInfo("Told to re-register on heartbeat") env.blockManager.reregister() From 4dd8712c1b64a64da0fa0413e2c9be68ad0ddc17 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 24 Dec 2015 21:27:55 +0900 Subject: [PATCH 10/16] [SPARK-12502][BUILD][PYTHON] Script /dev/run-tests fails when IBM Java is used fix an exception with IBM JDK by removing update field from a JavaVersion tuple. This is because IBM JDK does not have information on update '_xx' Author: Kazuaki Ishizaki Closes #10463 from kiszk/SPARK-12502. 
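For illustration, here is a minimal, self-contained sketch of the parsing change described above. The version strings are assumed examples (not taken from the patch); the IBM-style string simply lacks the `_update` suffix, which is what the commit message describes.

```
import re
from collections import namedtuple

# Assumed example strings: the Oracle/OpenJDK style carries an "_update" suffix,
# while the IBM JDK style reported in SPARK-12502 does not.
oracle_style = 'java version "1.8.0_25"'
ibm_style = 'java version "1.8.0"'

# The old pattern in dev/run-tests.py required the "_update" group, so it finds
# no match for the IBM-style string and the script then raised an exception on
# match.group(...).
print(re.search(r'(\d+)\.(\d+)\.(\d+)_(\d+)', ibm_style))  # None

# The patched pattern captures only major.minor.patch, which both strings satisfy.
JavaVersion = namedtuple('JavaVersion', ['major', 'minor', 'patch'])
match = re.search(r'(\d+)\.(\d+)\.(\d+)', ibm_style)
print(JavaVersion(*(int(g) for g in match.groups())))  # JavaVersion(major=1, minor=8, patch=0)
```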
(cherry picked from commit 9e85bb71ad2d7d3a9da0cb8853f3216d37e6ff47) Signed-off-by: Kousuke Saruta --- dev/run-tests.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/dev/run-tests.py b/dev/run-tests.py index 9e1abb069719..a2bc6ca3254c 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -148,7 +148,7 @@ def determine_java_executable(): return java_exe if java_exe else which("java") -JavaVersion = namedtuple('JavaVersion', ['major', 'minor', 'patch', 'update']) +JavaVersion = namedtuple('JavaVersion', ['major', 'minor', 'patch']) def determine_java_version(java_exe): @@ -164,14 +164,13 @@ def determine_java_version(java_exe): # find raw version string, eg 'java version "1.8.0_25"' raw_version_str = next(x for x in raw_output_lines if " version " in x) - match = re.search('(\d+)\.(\d+)\.(\d+)_(\d+)', raw_version_str) + match = re.search('(\d+)\.(\d+)\.(\d+)', raw_version_str) major = int(match.group(1)) minor = int(match.group(2)) patch = int(match.group(3)) - update = int(match.group(4)) - return JavaVersion(major, minor, patch, update) + return JavaVersion(major, minor, patch) # ------------------------------------------------------------------------------------------------- # Functions for running the other build and test scripts From 865dd8bccfc994310ad6664151d469043706ef3b Mon Sep 17 00:00:00 2001 From: CK50 Date: Thu, 24 Dec 2015 13:39:11 +0000 Subject: [PATCH 11/16] [SPARK-12010][SQL] Spark JDBC requires support for column-name-free INSERT syntax In the past, Spark JDBC writes only worked with technologies that support the following INSERT statement syntax (JdbcUtils.scala: insertStatement()): INSERT INTO $table VALUES ( ?, ?, ..., ? ) But some technologies require a list of column names: INSERT INTO $table ( $colNameList ) VALUES ( ?, ?, ..., ? ) This was blocking the use of e.g. the Progress JDBC Driver for Cassandra. Another limitation is that syntax 1 relies on the dataframe field ordering matching that of the target table. This works fine as long as the target table has been created by writer.jdbc(). If the target table contains more columns (not created by writer.jdbc()), then the insert fails due to a mismatch in the number of columns or their data types. This PR switches to the recommended second INSERT syntax. Column names are taken from dataframe field names. Author: CK50 Closes #10380 from CK50/master-SPARK-12010-2. (cherry picked from commit 502476e45c314a1229b3bce1c61f5cb94a9fc04b) Signed-off-by: Sean Owen --- .../sql/execution/datasources/jdbc/JdbcUtils.scala | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 252f1cfd5d9c..28cd688ef7d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -63,14 +63,10 @@ object JdbcUtils extends Logging { * Returns a PreparedStatement that inserts a row into table via conn.
*/ def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = { - val sql = new StringBuilder(s"INSERT INTO $table VALUES (") - var fieldsLeft = rddSchema.fields.length - while (fieldsLeft > 0) { - sql.append("?") - if (fieldsLeft > 1) sql.append(", ") else sql.append(")") - fieldsLeft = fieldsLeft - 1 - } - conn.prepareStatement(sql.toString()) + val columns = rddSchema.fields.map(_.name).mkString(",") + val placeholders = rddSchema.fields.map(_ => "?").mkString(",") + val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)" + conn.prepareStatement(sql) } /** From b8da77ef776ab9cdc130a70293d75e7bdcdf95b0 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 27 Dec 2015 23:18:48 -0800 Subject: [PATCH 12/16] [SPARK-12520] [PYSPARK] Correct Descriptions and Add Use Cases in Equi-Join After reading the JIRA https://issues.apache.org/jira/browse/SPARK-12520, I double checked the code. For example, users can do the Equi-Join like ```df.join(df2, 'name', 'outer').select('name', 'height').collect()``` - There exists a bug in 1.5 and 1.4. The code just ignores the third parameter (join type) users pass. However, the join type we called is `Inner`, even if the user-specified type is the other type (e.g., `Outer`). - After a PR: https://github.com/apache/spark/pull/8600, the 1.6 does not have such an issue, but the description has not been updated. Plan to submit another PR to fix 1.5 and issue an error message if users specify a non-inner join type when using Equi-Join. Author: gatorsmile Closes #10477 from gatorsmile/pyOuterJoin. --- python/pyspark/sql/dataframe.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index b15b8d7b087d..a0fdaf34a185 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -608,13 +608,16 @@ def join(self, other, on=None, how=None): :param on: a string for join column name, a list of column names, , a join expression (Column) or a list of Columns. If `on` is a string or a list of string indicating the name of the join column(s), - the column(s) must exist on both sides, and this performs an inner equi-join. + the column(s) must exist on both sides, and this performs an equi-join. :param how: str, default 'inner'. One of `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`. >>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect() [Row(name=None, height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)] + >>> df.join(df2, 'name', 'outer').select('name', 'height').collect() + [Row(name=u'Tom', height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)] + >>> cond = [df.name == df3.name, df.age == df3.age] >>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect() [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)] From 1fbcb6e7be9cd9fa5255837cfc5358f2283f4aaf Mon Sep 17 00:00:00 2001 From: Yaron Weinsberg Date: Tue, 29 Dec 2015 05:19:11 +0900 Subject: [PATCH 13/16] [SPARK-12517] add default RDD name for one created via sc.textFile The feature was first added at commit: 7b877b27053bfb7092e250e01a3b887e1b50a109 but was later removed (probably by mistake) at commit: fc8b58195afa67fbb75b4c8303e022f703cbf007. This change sets the default path of RDDs created via sc.textFile(...) to the path argument. 
Here is the symptom: * Using spark-1.5.2-bin-hadoop2.6: scala> sc.textFile("/home/root/.bashrc").name res5: String = null scala> sc.binaryFiles("/home/root/.bashrc").name res6: String = /home/root/.bashrc * while using Spark 1.3.1: scala> sc.textFile("/home/root/.bashrc").name res0: String = /home/root/.bashrc scala> sc.binaryFiles("/home/root/.bashrc").name res1: String = /home/root/.bashrc Author: Yaron Weinsberg Author: yaron Closes #10456 from wyaron/master. (cherry picked from commit 73b70f076d4e22396b7e145f2ce5974fbf788048) Signed-off-by: Kousuke Saruta --- .../scala/org/apache/spark/SparkContext.scala | 4 +-- .../org/apache/spark/SparkContextSuite.scala | 25 +++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 8a62b71c3fa6..add3f0444e53 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -830,7 +830,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], - minPartitions).map(pair => pair._2.toString) + minPartitions).map(pair => pair._2.toString).setName(path) } /** @@ -879,7 +879,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli classOf[Text], classOf[Text], updateConf, - minPartitions).setName(path).map(record => (record._1.toString, record._2.toString)) + minPartitions).map(record => (record._1.toString, record._2.toString)).setName(path) } /** diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index d4f2ea87650a..172ef050cc27 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -274,6 +274,31 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { } } + test("Default path for file based RDDs is properly set (SPARK-12517)") { + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + + // Test filetextFile, wholeTextFiles, binaryFiles, hadoopFile and + // newAPIHadoopFile for setting the default path as the RDD name + val mockPath = "default/path/for/" + + var targetPath = mockPath + "textFile" + assert(sc.textFile(targetPath).name === targetPath) + + targetPath = mockPath + "wholeTextFiles" + assert(sc.wholeTextFiles(targetPath).name === targetPath) + + targetPath = mockPath + "binaryFiles" + assert(sc.binaryFiles(targetPath).name === targetPath) + + targetPath = mockPath + "hadoopFile" + assert(sc.hadoopFile(targetPath).name === targetPath) + + targetPath = mockPath + "newAPIHadoopFile" + assert(sc.newAPIHadoopFile(targetPath).name === targetPath) + + sc.stop() + } + test("calling multiple sc.stop() must not throw any exception") { noException should be thrownBy { sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) From 7c7d76f34c0e09aae12f03e7c2922d4eb50d1830 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Tue, 29 Dec 2015 05:33:19 +0900 Subject: [PATCH 14/16] [SPARK-12424][ML] The implementation of ParamMap#filter is wrong. ParamMap#filter uses `mutable.Map#filterKeys`. 
The return type of `filterKeys` is collection.Map, not mutable.Map, but the result is cast to mutable.Map using `asInstanceOf`, so we get a `ClassCastException`. Also, the return type of Map#filterKeys is not serializable; this is a known Scala issue (https://issues.scala-lang.org/browse/SI-6654). Author: Kousuke Saruta Closes #10381 from sarutak/SPARK-12424. (cherry picked from commit 07165ca06fe0866677525f85fec25e4dbd336674) Signed-off-by: Kousuke Saruta --- .../org/apache/spark/ml/param/params.scala | 8 ++++-- .../apache/spark/ml/param/ParamsSuite.scala | 28 +++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala index d182b0a98896..149c46a4e87e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala @@ -857,8 +857,12 @@ final class ParamMap private[ml] (private val map: mutable.Map[Param[Any], Any]) * Filters this param map for the given parent. */ def filter(parent: Params): ParamMap = { - val filtered = map.filterKeys(_.parent == parent) - new ParamMap(filtered.asInstanceOf[mutable.Map[Param[Any], Any]]) + // Don't use filterKeys because mutable.Map#filterKeys + // returns the instance of collections.Map, not mutable.Map. + // Otherwise, we get ClassCastException. + // Not using filterKeys also avoid SI-6654 + val filtered = map.filter { case (k, _) => k.parent == parent.uid } + new ParamMap(filtered) } /** diff --git a/mllib/src/test/scala/org/apache/spark/ml/param/ParamsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/param/ParamsSuite.scala index a1878be747ce..748868554fe6 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/param/ParamsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/param/ParamsSuite.scala @@ -17,7 +17,10 @@ package org.apache.spark.ml.param +import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream} + import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.util.MyParams import org.apache.spark.mllib.linalg.{Vector, Vectors} class ParamsSuite extends SparkFunSuite { @@ -349,6 +352,31 @@ class ParamsSuite extends SparkFunSuite { val t3 = t.copy(ParamMap(t.maxIter -> 20)) assert(t3.isSet(t3.maxIter)) } + + test("Filtering ParamMap") { + val params1 = new MyParams("my_params1") + val params2 = new MyParams("my_params2") + val paramMap = ParamMap( + params1.intParam -> 1, + params2.intParam -> 1, + params1.doubleParam -> 0.2, + params2.doubleParam -> 0.2) + val filteredParamMap = paramMap.filter(params1) + + assert(filteredParamMap.size === 2) + filteredParamMap.toSeq.foreach { + case ParamPair(p, _) => + assert(p.parent === params1.uid) + } + + // At the previous implementation of ParamMap#filter, + // mutable.Map#filterKeys was used internally but + // the return type of the method is not serializable (see SI-6654). + // Now mutable.Map#filter is used instead of filterKeys and the return type is serializable. + // So let's ensure serializability.
+ val objOut = new ObjectOutputStream(new ByteArrayOutputStream()) + objOut.writeObject(filteredParamMap) + } } object ParamsSuite extends SparkFunSuite { From a9c52d4954aa445ab751b38ddbfd8fb6f84d7c14 Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Tue, 29 Dec 2015 07:02:30 +0900 Subject: [PATCH 15/16] [SPARK-12222][CORE] Deserialize RoaringBitmap using Kryo serializer throw Buffer underflow exception Since we only need to implement `def skipBytes(n: Int)`, code in #10213 could be simplified. davies scwf Author: Daoyuan Wang Closes #10253 from adrian-wang/kryo. (cherry picked from commit a6d385322e7dfaff600465fa5302010a5f122c6b) Signed-off-by: Kousuke Saruta --- .../scala/org/apache/spark/serializer/KryoSerializer.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index ebed766d7bd5..e8268a577df7 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -399,12 +399,7 @@ private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends Dat override def readInt(): Int = input.readInt() override def readUnsignedShort(): Int = input.readShortUnsigned() override def skipBytes(n: Int): Int = { - var remaining: Long = n - while (remaining > 0) { - val skip = Math.min(Integer.MAX_VALUE, remaining).asInstanceOf[Int] - input.skip(skip) - remaining -= skip - } + input.skip(n) n } override def readFully(b: Array[Byte]): Unit = input.read(b) From fd202485ace613d9930d0ede48ba8a65920004db Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 28 Dec 2015 15:01:51 -0800 Subject: [PATCH 16/16] [SPARK-12489][CORE][SQL][MLIB] Fix minor issues found by FindBugs Include the following changes: 1. Close `java.sql.Statement` 2. Fix incorrect `asInstanceOf`. 3. Remove unnecessary `synchronized` and `ReentrantLock`. Author: Shixiong Zhu Closes #10440 from zsxwing/findbugs. 
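For item 1 above, the change applied throughout the diff below is the usual create/try/finally shape; a minimal, self-contained Scala sketch follows (the runUpdate helper name is illustrative, not part of the patch):
```
import java.sql.Connection

// Minimal sketch of the Statement handling this patch introduces in
// DataFrameWriter and JdbcUtils: close the Statement in a finally block
// so it is released even when executeUpdate throws.
def runUpdate(conn: Connection, sql: String): Int = {
  val statement = conn.createStatement()
  try {
    statement.executeUpdate(sql)
  } finally {
    statement.close()
  }
}
```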
(cherry picked from commit 710b41172958a0b3a2b70c48821aefc81893731b) Signed-off-by: Shixiong Zhu --- .../cluster/mesos/MesosClusterScheduler.scala | 3 +- .../apache/spark/launcher/LauncherServer.java | 4 +- .../java/org/apache/spark/launcher/Main.java | 2 +- .../scala/org/apache/spark/ml/tree/Node.scala | 4 +- .../apache/spark/sql/DataFrameWriter.scala | 7 ++- .../execution/datasources/jdbc/JDBCRDD.scala | 47 ++++++++++--------- .../datasources/jdbc/JdbcUtils.scala | 16 ++++++- 7 files changed, 51 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index a6d9374eb9e8..16815d51d4c6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -18,7 +18,6 @@ package org.apache.spark.scheduler.cluster.mesos import java.io.File -import java.util.concurrent.locks.ReentrantLock import java.util.{Collections, Date, List => JList} import scala.collection.JavaConverters._ @@ -126,7 +125,7 @@ private[spark] class MesosClusterScheduler( private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200) private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute private val schedulerState = engineFactory.createEngine("scheduler") - private val stateLock = new ReentrantLock() + private val stateLock = new Object() private val finishedDrivers = new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers) private var frameworkId: String = null diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java index d099ee9aa9da..414ffc2c84e5 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java +++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java @@ -293,9 +293,7 @@ private class ServerConnection extends LauncherConnection { protected void handle(Message msg) throws IOException { try { if (msg instanceof Hello) { - synchronized (timeout) { - timeout.cancel(); - } + timeout.cancel(); timeout = null; Hello hello = (Hello) msg; ChildProcAppHandle handle = pending.remove(hello.secret); diff --git a/launcher/src/main/java/org/apache/spark/launcher/Main.java b/launcher/src/main/java/org/apache/spark/launcher/Main.java index a4e3acc674f3..e751e948e356 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/Main.java +++ b/launcher/src/main/java/org/apache/spark/launcher/Main.java @@ -151,7 +151,7 @@ private static class MainClassOptionParser extends SparkSubmitOptionParser { @Override protected boolean handle(String opt, String value) { - if (opt == CLASS) { + if (CLASS.equals(opt)) { className = value; } return false; diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala index d89682611e3f..9cfd466294b9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala @@ -386,9 +386,9 @@ private[tree] object LearningNode { var levelsToGo = indexToLevel(nodeIndex) while (levelsToGo > 0) { if ((nodeIndex & (1 << levelsToGo - 1)) == 0) { - tmpNode = tmpNode.leftChild.asInstanceOf[LearningNode] + tmpNode = tmpNode.leftChild.get } else { - tmpNode = 
tmpNode.rightChild.asInstanceOf[LearningNode] + tmpNode = tmpNode.rightChild.get } levelsToGo -= 1 } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 03867beb7822..ab362539e298 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -297,7 +297,12 @@ final class DataFrameWriter private[sql](df: DataFrame) { if (!tableExists) { val schema = JdbcUtils.schemaString(df, url) val sql = s"CREATE TABLE $table ($schema)" - conn.createStatement.executeUpdate(sql) + val statement = conn.createStatement + try { + statement.executeUpdate(sql) + } finally { + statement.close() + } } } finally { conn.close() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index b2b5b2658e9d..c2f2a312a4fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -120,30 +120,35 @@ private[sql] object JDBCRDD extends Logging { val dialect = JdbcDialects.get(url) val conn: Connection = getConnector(properties.getProperty("driver"), url, properties)() try { - val rs = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0").executeQuery() + val statement = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0") try { - val rsmd = rs.getMetaData - val ncols = rsmd.getColumnCount - val fields = new Array[StructField](ncols) - var i = 0 - while (i < ncols) { - val columnName = rsmd.getColumnLabel(i + 1) - val dataType = rsmd.getColumnType(i + 1) - val typeName = rsmd.getColumnTypeName(i + 1) - val fieldSize = rsmd.getPrecision(i + 1) - val fieldScale = rsmd.getScale(i + 1) - val isSigned = rsmd.isSigned(i + 1) - val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls - val metadata = new MetadataBuilder().putString("name", columnName) - val columnType = - dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse( - getCatalystType(dataType, fieldSize, fieldScale, isSigned)) - fields(i) = StructField(columnName, columnType, nullable, metadata.build()) - i = i + 1 + val rs = statement.executeQuery() + try { + val rsmd = rs.getMetaData + val ncols = rsmd.getColumnCount + val fields = new Array[StructField](ncols) + var i = 0 + while (i < ncols) { + val columnName = rsmd.getColumnLabel(i + 1) + val dataType = rsmd.getColumnType(i + 1) + val typeName = rsmd.getColumnTypeName(i + 1) + val fieldSize = rsmd.getPrecision(i + 1) + val fieldScale = rsmd.getScale(i + 1) + val isSigned = rsmd.isSigned(i + 1) + val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls + val metadata = new MetadataBuilder().putString("name", columnName) + val columnType = + dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse( + getCatalystType(dataType, fieldSize, fieldScale, isSigned)) + fields(i) = StructField(columnName, columnType, nullable, metadata.build()) + i = i + 1 + } + return new StructType(fields) + } finally { + rs.close() } - return new StructType(fields) } finally { - rs.close() + statement.close() } } finally { conn.close() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 28cd688ef7d7..46f2670eee01 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -49,14 +49,26 @@ object JdbcUtils extends Logging { // Somewhat hacky, but there isn't a good way to identify whether a table exists for all // SQL database systems using JDBC meta data calls, considering "table" could also include // the database name. Query used to find table exists can be overriden by the dialects. - Try(conn.prepareStatement(dialect.getTableExistsQuery(table)).executeQuery()).isSuccess + Try { + val statement = conn.prepareStatement(dialect.getTableExistsQuery(table)) + try { + statement.executeQuery() + } finally { + statement.close() + } + }.isSuccess } /** * Drops a table from the JDBC database. */ def dropTable(conn: Connection, table: String): Unit = { - conn.createStatement.executeUpdate(s"DROP TABLE $table") + val statement = conn.createStatement + try { + statement.executeUpdate(s"DROP TABLE $table") + } finally { + statement.close() + } } /**