From 6d55393f20e16e9e43e466f474617dc0b90bb533 Mon Sep 17 00:00:00 2001
From: "zhichao.li"
Date: Wed, 4 Nov 2015 16:28:08 +0800
Subject: [PATCH 1/4] parallel

---
 .../main/scala/org/apache/spark/rdd/RDD.scala |  2 +-
 .../spark/sql/hive/ParallelUnionRDD.scala     | 52 +++++++++++++++++++
 .../apache/spark/sql/hive/TableReader.scala   |  2 +-
 3 files changed, 54 insertions(+), 2 deletions(-)
 create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a81a98b526b5a..89353862e6f41 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -214,7 +214,7 @@ abstract class RDD[T: ClassTag](
   // Our dependencies and partitions will be gotten by calling subclass's methods below, and will
   // be overwritten when we're checkpointed
   private var dependencies_ : Seq[Dependency[_]] = null
-  @transient private var partitions_ : Array[Partition] = null
+  @transient @volatile private var partitions_ : Array[Partition] = null
 
   /** An Option holding our checkpoint RDD, if we are checkpointed */
   private def checkpointRDD: Option[CheckpointRDD[T]] = checkpointData.flatMap(_.checkpointRDD)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala
new file mode 100644
index 0000000000000..35b9e8a716d9b
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.util.concurrent.Callable
+
+import org.apache.spark.rdd.{RDD, UnionPartition, UnionRDD}
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.{Partition, SparkContext}
+
+import scala.reflect.ClassTag
+
+class ParallelUnionRDD[T: ClassTag](
+    sc: SparkContext,
+    rdds: Seq[RDD[T]]) extends UnionRDD[T](sc, rdds){
+  // TODO: We might need to guess a more reasonable thread pool size here
+  @transient val executorService = ThreadUtils.newDaemonFixedThreadPool(
+    Math.min(rdds.size, Runtime.getRuntime.availableProcessors()), "ParallelUnionRDD")
+
+  override def getPartitions: Array[Partition] = {
+    // Calc partitions field for each RDD in parallel.
+    val rddPartitions = rdds.map {rdd =>
+      (rdd, executorService.submit(new Callable[Array[Partition]] {
+        override def call(): Array[Partition] = rdd.partitions
+      }))
+    }.map {case(r, f) => (r, f.get())}
+
+    val array = new Array[Partition](rddPartitions.map(_._2.length).sum)
+    var pos = 0
+    for (((rdd, partitions), rddIndex) <- rddPartitions.zipWithIndex; split <- partitions) {
+      array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
+      pos += 1
+    }
+    array
+  }
+
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index fd465e80a87e5..8cb5dd4d9d09c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -246,7 +246,7 @@ class HadoopTableReader(
     if (hivePartitionRDDs.size == 0) {
       new EmptyRDD[InternalRow](sc.sparkContext)
     } else {
-      new UnionRDD(hivePartitionRDDs(0).context, hivePartitionRDDs)
+      new ParallelUnionRDD(hivePartitionRDDs(0).context, hivePartitionRDDs)
    }
  }
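
Why patch 1 helps: UnionRDD.getPartitions evaluates each child's partitions inline, one after another, and for a Hive table with many partitions each child RDD's getPartitions typically costs a round trip to list the partition's files, so the latencies add up. ParallelUnionRDD submits every rdd.partitions call to a thread pool before blocking on any result, letting those round trips overlap; the @volatile added to RDD.partitions_ guards the lazily cached field now that pool threads populate it (patch 2 below reverts that part). Here is a minimal self-contained sketch of the submit-all-then-get pattern, with Thread.sleep standing in for an expensive getPartitions; every name in it is illustrative, not part of the patch:

    import java.util.concurrent.{Callable, Executors, Future}

    object SubmitThenGetSketch {
      def main(args: Array[String]): Unit = {
        // Stand-ins for rdd.partitions: each call costs ~100 ms, like a remote listing.
        val children: Seq[() => Array[Int]] =
          (1 to 8).map(n => () => { Thread.sleep(100); Array.tabulate(n)(identity) })

        val pool = Executors.newFixedThreadPool(
          math.min(children.size, Runtime.getRuntime.availableProcessors()))
        try {
          val start = System.nanoTime()
          // Submit every computation first...
          val futures: Seq[Future[Array[Int]]] = children.map { f =>
            pool.submit(new Callable[Array[Int]] { override def call(): Array[Int] = f() })
          }
          // ...then block on the results, so the waits overlap instead of adding up.
          val totalSplits = futures.map(_.get().length).sum
          println(s"$totalSplits splits in ${(System.nanoTime() - start) / 1e6} ms")
        } finally {
          pool.shutdown()
        }
      }
    }

One subtlety the pattern relies on: rdds.map(submit).map(get) only overlaps work when the collection is strict. If rdds were ever a lazy Seq (a Stream or a view), each get would run right after its submit and partition computation would quietly serialize again.
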
From 6456f12c3d4554a03d18f9d8d26ad315e33753d8 Mon Sep 17 00:00:00 2001
From: "zhichao.li"
Date: Wed, 24 Feb 2016 15:11:30 +0800
Subject: [PATCH 2/4] address comment

---
 .../main/scala/org/apache/spark/rdd/RDD.scala |  2 +-
 .../spark/sql/hive/ParallelUnionRDD.scala     |  9 ++++----
 .../hive/execution/HiveTableScanSuite.scala   | 21 +++++++++++++++++++
 3 files changed, 27 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 89353862e6f41..a81a98b526b5a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -214,7 +214,7 @@ abstract class RDD[T: ClassTag](
   // Our dependencies and partitions will be gotten by calling subclass's methods below, and will
   // be overwritten when we're checkpointed
   private var dependencies_ : Seq[Dependency[_]] = null
-  @transient @volatile private var partitions_ : Array[Partition] = null
+  @transient private var partitions_ : Array[Partition] = null
 
   /** An Option holding our checkpoint RDD, if we are checkpointed */
   private def checkpointRDD: Option[CheckpointRDD[T]] = checkpointData.flatMap(_.checkpointRDD)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala
index 35b9e8a716d9b..defa695ec327e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala
@@ -25,17 +25,18 @@ import org.apache.spark.{Partition, SparkContext}
 
 import scala.reflect.ClassTag
 
+object ParallelUnionRDD {
+  lazy val executorService = ThreadUtils.newDaemonFixedThreadPool(16, "ParallelUnionRDD")
+}
+
 class ParallelUnionRDD[T: ClassTag](
     sc: SparkContext,
     rdds: Seq[RDD[T]]) extends UnionRDD[T](sc, rdds){
-  // TODO: We might need to guess a more reasonable thread pool size here
-  @transient val executorService = ThreadUtils.newDaemonFixedThreadPool(
-    Math.min(rdds.size, Runtime.getRuntime.availableProcessors()), "ParallelUnionRDD")
 
   override def getPartitions: Array[Partition] = {
     // Calc partitions field for each RDD in parallel.
     val rddPartitions = rdds.map {rdd =>
-      (rdd, executorService.submit(new Callable[Array[Partition]] {
+      (rdd, ParallelUnionRDD.executorService.submit(new Callable[Array[Partition]] {
         override def call(): Array[Partition] = rdd.partitions
       }))
     }.map {case(r, f) => (r, f.get())}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index b0c0dcbe5c25c..e14c562908478 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -89,4 +89,25 @@ class HiveTableScanSuite extends HiveComparisonTest {
     assert(sql("select CaseSensitiveColName from spark_4959_2").head() === Row("hi"))
     assert(sql("select casesensitivecolname from spark_4959_2").head() === Row("hi"))
   }
+
+
+  test("Spark-11517: calc partitions in parallel") {
+    val partitionNum = 500
+    val partitionTable = "combine"
+    sql("set hive.exec.dynamic.partition.mode=nonstrict")
+    val df = (1 to 500).map { i => (i, i)}.toDF("a", "b").coalesce(500)
+    df.registerTempTable("temp")
+    sql(s"""create table $partitionTable (a int, b string)
+      |partitioned by (c int)
+      |stored as orc""".stripMargin)
+    sql(
+      s"""insert into table $partitionTable partition(c)
+      |select a, b, (b % $partitionNum) as c from temp""".stripMargin)
+
+    // Ensure that the result is the same as the original
+    assert(
+      sql( s"""select * from $partitionTable order by a""").collect().map(_.toString()).deep
+        == (1 to 500).map{i => s"[$i,$i,${i % partitionNum}]"}.toArray.deep
+    )
+  }
 }
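
Patch 2 folds in the review feedback. The pool moves from a per-instance field, sized min(rdds.size, availableProcessors), to a single lazily created JVM-wide pool of 16 daemon threads in a companion object, so repeated table scans share one executor instead of each creating their own; that also removes the need for @transient, which only existed because the pool had been a field of a serializable RDD, and the @volatile on RDD.partitions_ is reverted. A regression test scanning an ORC table with 500 dynamically inserted partitions is added. ThreadUtils.newDaemonFixedThreadPool is Spark's helper for a fixed-size pool of named daemon threads; a rough equivalent in plain java.util.concurrent, shown only to make the companion-object pattern concrete (SharedPoolSketch is an illustrative name, not part of the patch):

    import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
    import java.util.concurrent.atomic.AtomicInteger

    object SharedPoolSketch {
      // A lazy val initializes thread-safely on first access, so every
      // instance in the JVM ends up sharing this one pool.
      lazy val executorService: ExecutorService = {
        val counter = new AtomicInteger(0)
        Executors.newFixedThreadPool(16, new ThreadFactory {
          override def newThread(r: Runnable): Thread = {
            val t = new Thread(r, s"ParallelUnionRDD-${counter.getAndIncrement()}")
            t.setDaemon(true) // daemon threads never block JVM shutdown
            t
          }
        })
      }
    }

The daemon flag matters because the shared pool is never shut down explicitly; non-daemon worker threads would keep the driver JVM alive at exit.
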
From db84ab94d26e945fc44ef2adb789eb85ad229a3c Mon Sep 17 00:00:00 2001
From: "zhichao.li"
Date: Wed, 24 Feb 2016 15:41:14 +0800
Subject: [PATCH 3/4] style

---
 .../scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala
index defa695ec327e..fc075406fc61f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.hive
 
 import java.util.concurrent.Callable
 
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Partition, SparkContext}
 import org.apache.spark.rdd.{RDD, UnionPartition, UnionRDD}
 import org.apache.spark.util.ThreadUtils
-import org.apache.spark.{Partition, SparkContext}
-
-import scala.reflect.ClassTag
From fdac95bb06546b5d92b8c5dda5ee633f2221d347 Mon Sep 17 00:00:00 2001
From: "zhichao.li"
Date: Mon, 29 Feb 2016 08:58:04 +0800
Subject: [PATCH 4/4] style

---
 .../scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala | 6 +++---
 .../spark/sql/hive/execution/HiveTableScanSuite.scala      | 1 -
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala
index fc075406fc61f..dacc6fa546e17 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala
@@ -25,11 +25,11 @@ import org.apache.spark.{Partition, SparkContext}
 import org.apache.spark.rdd.{RDD, UnionPartition, UnionRDD}
 import org.apache.spark.util.ThreadUtils
 
-object ParallelUnionRDD {
+private[hive] object ParallelUnionRDD {
   lazy val executorService = ThreadUtils.newDaemonFixedThreadPool(16, "ParallelUnionRDD")
 }
 
-class ParallelUnionRDD[T: ClassTag](
+private[hive] class ParallelUnionRDD[T: ClassTag](
     sc: SparkContext,
     rdds: Seq[RDD[T]]) extends UnionRDD[T](sc, rdds){
 
@@ -39,7 +39,7 @@ class ParallelUnionRDD[T: ClassTag](
       (rdd, ParallelUnionRDD.executorService.submit(new Callable[Array[Partition]] {
         override def call(): Array[Partition] = rdd.partitions
       }))
-    }.map {case(r, f) => (r, f.get())}
+    }.map { case(r, f) => (r, f.get()) }
 
     val array = new Array[Partition](rddPartitions.map(_._2.length).sum)
     var pos = 0
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index e14c562908478..a67c22747efd0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -90,7 +90,6 @@ class HiveTableScanSuite extends HiveComparisonTest {
     assert(sql("select casesensitivecolname from spark_4959_2").head() === Row("hi"))
   }
 
-
   test("Spark-11517: calc partitions in parallel") {
     val partitionNum = 500
     val partitionTable = "combine"
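
Patches 3 and 4 are pure style passes: import ordering (java, then scala, then spark packages), private[hive] visibility, pattern-match brace spacing, and a stray blank line in the test suite. For reading convenience, here is the net ParallelUnionRDD.scala the four patches produce, assembled from the hunks above, with the Apache license header elided and one descriptive comment added:

    package org.apache.spark.sql.hive

    import java.util.concurrent.Callable

    import scala.reflect.ClassTag

    import org.apache.spark.{Partition, SparkContext}
    import org.apache.spark.rdd.{RDD, UnionPartition, UnionRDD}
    import org.apache.spark.util.ThreadUtils

    private[hive] object ParallelUnionRDD {
      lazy val executorService = ThreadUtils.newDaemonFixedThreadPool(16, "ParallelUnionRDD")
    }

    private[hive] class ParallelUnionRDD[T: ClassTag](
        sc: SparkContext,
        rdds: Seq[RDD[T]]) extends UnionRDD[T](sc, rdds){

      override def getPartitions: Array[Partition] = {
        // Calc partitions field for each RDD in parallel.
        val rddPartitions = rdds.map {rdd =>
          (rdd, ParallelUnionRDD.executorService.submit(new Callable[Array[Partition]] {
            override def call(): Array[Partition] = rdd.partitions
          }))
        }.map { case(r, f) => (r, f.get()) }

        // One UnionPartition per child split, numbered consecutively across children.
        val array = new Array[Partition](rddPartitions.map(_._2.length).sum)
        var pos = 0
        for (((rdd, partitions), rddIndex) <- rddPartitions.zipWithIndex; split <- partitions) {
          array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
          pos += 1
        }
        array
      }

    }

HadoopTableReader constructs this subclass in place of UnionRDD (patch 1's TableReader hunk), so the overridden getPartitions is the only behavioral change: it builds the same UnionPartition array as UnionRDD.getPartitions but evaluates the children's partitions concurrently rather than inline.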