From 404190221a788ebc3a0cbf5cb47cf532436ce965 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 18 Jan 2016 15:10:04 -0800 Subject: [PATCH 1/8] [SPARK-12882][SQL] simplify bucket tests and add more comments Right now, the bucket tests are kind of hard to understand; this PR simplifies them and adds more comments. Author: Wenchen Fan Closes #10813 from cloud-fan/bucket-comment. --- .../spark/sql/sources/BucketedReadSuite.scala | 56 +++++++++------ .../sql/sources/BucketedWriteSuite.scala | 68 ++++++++++++------- 2 files changed, 78 insertions(+), 46 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 58ecdd3b801d3..150d0c748631e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.{Column, DataFrame, DataFrameWriter, QueryTest, SQLC import org.apache.spark.sql.catalyst.expressions.UnsafeProjection import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.execution.Exchange +import org.apache.spark.sql.execution.datasources.BucketSpec import org.apache.spark.sql.execution.joins.SortMergeJoin import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.test.TestHiveSingleton @@ -61,15 +62,30 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet private val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1") private val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2") + /** + * A helper method to test the bucket read functionality using joins. It saves `df1` and `df2` + * to Hive tables, bucketed or not, according to the given bucket specs. Next we join these two + * tables, first making sure the answer is correct, and then checking whether a shuffle exists + * on each side as expected, according to `shuffleLeft` and `shuffleRight`.
+ */ private def testBucketing( - bucketing1: DataFrameWriter => DataFrameWriter, - bucketing2: DataFrameWriter => DataFrameWriter, + bucketSpecLeft: Option[BucketSpec], + bucketSpecRight: Option[BucketSpec], joinColumns: Seq[String], shuffleLeft: Boolean, shuffleRight: Boolean): Unit = { withTable("bucketed_table1", "bucketed_table2") { - bucketing1(df1.write.format("parquet")).saveAsTable("bucketed_table1") - bucketing2(df2.write.format("parquet")).saveAsTable("bucketed_table2") + def withBucket(writer: DataFrameWriter, bucketSpec: Option[BucketSpec]): DataFrameWriter = { + bucketSpec.map { spec => + writer.bucketBy( + spec.numBuckets, + spec.bucketColumnNames.head, + spec.bucketColumnNames.tail: _*) + }.getOrElse(writer) + } + + withBucket(df1.write.format("parquet"), bucketSpecLeft).saveAsTable("bucketed_table1") + withBucket(df2.write.format("parquet"), bucketSpecRight).saveAsTable("bucketed_table2") withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") { val t1 = hiveContext.table("bucketed_table1") @@ -95,42 +111,42 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet } test("avoid shuffle when join 2 bucketed tables") { - val bucketing = (writer: DataFrameWriter) => writer.bucketBy(8, "i", "j") - testBucketing(bucketing, bucketing, Seq("i", "j"), shuffleLeft = false, shuffleRight = false) + val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) + testBucketing(bucketSpec, bucketSpec, Seq("i", "j"), shuffleLeft = false, shuffleRight = false) } // Enable it after fix https://issues.apache.org/jira/browse/SPARK-12704 ignore("avoid shuffle when join keys are a super-set of bucket keys") { - val bucketing = (writer: DataFrameWriter) => writer.bucketBy(8, "i") - testBucketing(bucketing, bucketing, Seq("i", "j"), shuffleLeft = false, shuffleRight = false) + val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil)) + testBucketing(bucketSpec, bucketSpec, Seq("i", "j"), shuffleLeft = false, shuffleRight = false) } test("only shuffle one side when join bucketed table and non-bucketed table") { - val bucketing = (writer: DataFrameWriter) => writer.bucketBy(8, "i", "j") - testBucketing(bucketing, identity, Seq("i", "j"), shuffleLeft = false, shuffleRight = true) + val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) + testBucketing(bucketSpec, None, Seq("i", "j"), shuffleLeft = false, shuffleRight = true) } test("only shuffle one side when 2 bucketed tables have different bucket number") { - val bucketing1 = (writer: DataFrameWriter) => writer.bucketBy(8, "i", "j") - val bucketing2 = (writer: DataFrameWriter) => writer.bucketBy(5, "i", "j") - testBucketing(bucketing1, bucketing2, Seq("i", "j"), shuffleLeft = false, shuffleRight = true) + val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Nil)) + val bucketSpec2 = Some(BucketSpec(5, Seq("i", "j"), Nil)) + testBucketing(bucketSpec1, bucketSpec2, Seq("i", "j"), shuffleLeft = false, shuffleRight = true) } test("only shuffle one side when 2 bucketed tables have different bucket keys") { - val bucketing1 = (writer: DataFrameWriter) => writer.bucketBy(8, "i") - val bucketing2 = (writer: DataFrameWriter) => writer.bucketBy(8, "j") - testBucketing(bucketing1, bucketing2, Seq("i"), shuffleLeft = false, shuffleRight = true) + val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Nil)) + val bucketSpec2 = Some(BucketSpec(8, Seq("j"), Nil)) + testBucketing(bucketSpec1, bucketSpec2, Seq("i"), shuffleLeft = false, shuffleRight = true) } test("shuffle when join keys are not equal to bucket keys") { - val bucketing = (writer: 
DataFrameWriter) => writer.bucketBy(8, "i") - testBucketing(bucketing, bucketing, Seq("j"), shuffleLeft = true, shuffleRight = true) + val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil)) + testBucketing(bucketSpec, bucketSpec, Seq("j"), shuffleLeft = true, shuffleRight = true) } test("shuffle when join 2 bucketed tables with bucketing disabled") { - val bucketing = (writer: DataFrameWriter) => writer.bucketBy(8, "i", "j") + val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") { - testBucketing(bucketing, bucketing, Seq("i", "j"), shuffleLeft = true, shuffleRight = true) + testBucketing(bucketSpec, bucketSpec, Seq("i", "j"), shuffleLeft = true, shuffleRight = true) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index e812439bed9aa..dad1fc1273810 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -65,39 +65,55 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k") + /** + * A helper method to check the bucket write functionality at a low level, i.e. check the + * written bucket files to see if the data are correct. The caller should pass in the data dir + * that these bucket files are written to, the format of the data (parquet, json, etc.), and + * the bucketing information. + */ private def testBucketing( dataDir: File, source: String, + numBuckets: Int, bucketCols: Seq[String], sortCols: Seq[String] = Nil): Unit = { val allBucketFiles = dataDir.listFiles().filterNot(f => f.getName.startsWith(".") || f.getName.startsWith("_") ) - val groupedBucketFiles = allBucketFiles.groupBy(f => BucketingUtils.getBucketId(f.getName).get) - assert(groupedBucketFiles.size <= 8) - - for ((bucketId, bucketFiles) <- groupedBucketFiles) { - for (bucketFilePath <- bucketFiles.map(_.getAbsolutePath)) { - val types = df.select((bucketCols ++ sortCols).map(col): _*).schema.map(_.dataType) - val columns = (bucketCols ++ sortCols).zip(types).map { - case (colName, dt) => col(colName).cast(dt) - } - val readBack = sqlContext.read.format(source).load(bucketFilePath).select(columns: _*) - if (sortCols.nonEmpty) { - checkAnswer(readBack.sort(sortCols.map(col): _*), readBack.collect()) - } + for (bucketFile <- allBucketFiles) { + val bucketId = BucketingUtils.getBucketId(bucketFile.getName).get + assert(bucketId >= 0 && bucketId < numBuckets) - val qe = readBack.select(bucketCols.map(col): _*).queryExecution - val rows = qe.toRdd.map(_.copy()).collect() - val getBucketId = UnsafeProjection.create( - HashPartitioning(qe.analyzed.output, 8).partitionIdExpression :: Nil, - qe.analyzed.output) + // We may lose the type information after writing (e.g. the json format doesn't keep schema + // information), so here we get the types from the original dataframe. + val types = df.select((bucketCols ++ sortCols).map(col): _*).schema.map(_.dataType) + val columns = (bucketCols ++ sortCols).zip(types).map { + case (colName, dt) => col(colName).cast(dt) + } - for (row <- rows) { - val actualBucketId = getBucketId(row).getInt(0) - assert(actualBucketId == bucketId) - } + // Read the bucket file into a dataframe, so that it's easier to test.
+ val readBack = sqlContext.read.format(source) + .load(bucketFile.getAbsolutePath) + .select(columns: _*) + + // If we specified sort columns while writing the bucketed table, make sure the data in + // this bucket file is already sorted. + if (sortCols.nonEmpty) { + checkAnswer(readBack.sort(sortCols.map(col): _*), readBack.collect()) + } + + // Go through all rows in this bucket file, calculate the bucket id from the bucket column + // values, and make sure it equals the expected bucket id inferred from the file name. + val qe = readBack.select(bucketCols.map(col): _*).queryExecution + val rows = qe.toRdd.map(_.copy()).collect() + val getBucketId = UnsafeProjection.create( + HashPartitioning(qe.analyzed.output, numBuckets).partitionIdExpression :: Nil, + qe.analyzed.output) + + for (row <- rows) { + val actualBucketId = getBucketId(row).getInt(0) + assert(actualBucketId == bucketId) } } } @@ -113,7 +129,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle val tableDir = new File(hiveContext.warehousePath, "bucketed_table") for (i <- 0 until 5) { - testBucketing(new File(tableDir, s"i=$i"), source, Seq("j", "k")) + testBucketing(new File(tableDir, s"i=$i"), source, 8, Seq("j", "k")) } } } @@ -131,7 +147,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle val tableDir = new File(hiveContext.warehousePath, "bucketed_table") for (i <- 0 until 5) { - testBucketing(new File(tableDir, s"i=$i"), source, Seq("j"), Seq("k")) + testBucketing(new File(tableDir, s"i=$i"), source, 8, Seq("j"), Seq("k")) } } } @@ -146,7 +162,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle .saveAsTable("bucketed_table") val tableDir = new File(hiveContext.warehousePath, "bucketed_table") - testBucketing(tableDir, source, Seq("i", "j")) + testBucketing(tableDir, source, 8, Seq("i", "j")) } } } @@ -161,7 +177,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle .saveAsTable("bucketed_table") val tableDir = new File(hiveContext.warehousePath, "bucketed_table") - testBucketing(tableDir, source, Seq("i", "j"), Seq("k")) + testBucketing(tableDir, source, 8, Seq("i", "j"), Seq("k")) } } } From a973f483f6b819ed4ecac27ff5c064ea13a8dd71 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 18 Jan 2016 15:38:03 -0800 Subject: [PATCH 2/8] [SPARK-12814][DOCUMENT] Add deploy instructions for Python in flume integration doc This PR adds instructions for Python users to get the flume assembly jar, on the flume integration page, like the Kafka doc. Author: Shixiong Zhu Closes #10746 from zsxwing/flume-doc. --- docs/streaming-flume-integration.md | 13 +++++++++++-- docs/streaming-kafka-integration.md | 4 ++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/docs/streaming-flume-integration.md b/docs/streaming-flume-integration.md index 383d954409ce4..e2d589b843f99 100644 --- a/docs/streaming-flume-integration.md +++ b/docs/streaming-flume-integration.md @@ -71,7 +71,16 @@ configuring Flume agents. cluster (Mesos, YARN or Spark Standalone), so that resource allocation can match the names and launch the receiver in the right machine. -3. **Deploying:** Package `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR.
Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). +3. **Deploying:** As with any Spark application, `spark-submit` is used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications. + + For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). + + For Python applications which lack SBT/Maven project management, `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies can be directly added to `spark-submit` using `--packages` (see [Application Submission Guide](submitting-applications.html)). That is, + + ./bin/spark-submit --packages org.apache.spark:spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... + + Alternatively, you can also download the JAR of the Maven artifact `spark-streaming-flume-assembly` from the + [Maven repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-flume-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`. ## Approach 2: Pull-based Approach using a Custom Sink Instead of Flume pushing data directly to Spark Streaming, this approach runs a custom Flume sink that allows the following. @@ -157,7 +166,7 @@ configuring Flume agents. Note that each input DStream can be configured to receive data from multiple sinks. -3. **Deploying:** Package `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). +3. **Deploying:** This is the same as the first approach. diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md index 9454714eeb9cb..015a2f1fa0bdc 100644 --- a/docs/streaming-kafka-integration.md +++ b/docs/streaming-kafka-integration.md @@ -71,7 +71,7 @@ Next, we discuss how to use this approach in your streaming application. ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... Alternatively, you can also download the JAR of the Maven artifact `spark-streaming-kafka-assembly` from the - [Maven repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`. + [Maven repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`.
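To make the `provided` scoping described in the deploying instructions above concrete, here is a minimal sketch of the dependency section of an SBT build. The coordinates are the Spark artifacts named in the doc, but the version string is a placeholder and the fragment is illustrative only, not part of the patched docs:

```scala
// Hypothetical build.sbt fragment: the connector is bundled into the
// application JAR, while spark-core and spark-streaming are "provided"
// because spark-submit already puts them on the classpath.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-streaming-flume" % "1.6.0"
)
```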
## Approach 2: Direct Approach (No Receivers) This new receiver-less "direct" approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka's simple consumer API is used to read the defined ranges of offsets from Kafka (similar to reading files from a file system). Note that this is an experimental feature introduced in Spark 1.3 for the Scala and Java API, and in Spark 1.4 for the Python API. @@ -207,4 +207,4 @@ Next, we discuss how to use this approach in your streaming application. Another thing to note is that since this approach does not use Receivers, the standard receiver-related [configurations](configuration.html) (that is, those of the form `spark.streaming.receiver.*`) will not apply to the input DStreams created by this approach (they will apply to other input DStreams though). Instead, use the [configurations](configuration.html) `spark.streaming.kafka.*`. An important one is `spark.streaming.kafka.maxRatePerPartition` which is the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API. -3. **Deploying:** This is same as the first approach, for Scala, Java and Python. +3. **Deploying:** This is the same as the first approach. From 4bcea1b8595424678aa6c92d66ba08c92e0fefe5 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 18 Jan 2016 16:26:52 -0800 Subject: [PATCH 3/8] Revert "[SPARK-12829] Turn Java style checker on" This reverts commit 591c88c9e2a6c2e2ca84f1b66c635f198a16d112. `lint-java` doesn't work on a machine with a clean Maven cache. --- dev/run-tests.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dev/run-tests.py b/dev/run-tests.py index 5b717895106b3..8f47728f206c3 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -538,7 +538,8 @@ def main(): or f.endswith("checkstyle.xml") or f.endswith("checkstyle-suppressions.xml") for f in changed_files): - run_java_style_checks() + # run_java_style_checks() + pass if not changed_files or any(f.endswith(".py") for f in changed_files): run_python_style_checks() if not changed_files or any(f.endswith(".R") for f in changed_files): From 721845c1b64fd6e3b911bd77c94e01dc4e5fd102 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 18 Jan 2016 16:50:05 -0800 Subject: [PATCH 4/8] [SPARK-12894][DOCUMENT] Add deploy instructions for Python in Kinesis integration doc This PR adds instructions for Python users to get the Kinesis assembly jar, on the Kinesis integration page, like the Kafka doc. Author: Shixiong Zhu Closes #10822 from zsxwing/kinesis-doc. --- docs/streaming-kinesis-integration.md | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md index 07194b0a6b758..5f5e2b9087cc9 100644 --- a/docs/streaming-kinesis-integration.md +++ b/docs/streaming-kinesis-integration.md @@ -15,12 +15,13 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m #### Configuring Spark Streaming Application -1. **Linking:** In your SBT/Maven project definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information). +1.
**Linking:** For Scala/Java applications using SBT/Maven project definitions, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information). groupId = org.apache.spark artifactId = spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} version = {{site.SPARK_VERSION_SHORT}} + For Python applications, you will have to add the above library and its dependencies when deploying your application. See the *Deploying* subsection below. **Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your application.** 2. **Programming:** In the streaming application code, import `KinesisUtils` and create the input DStream of byte array as follows: @@ -116,7 +117,16 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m In other versions of the API, you can also specify the AWS access key and secret key directly. -3. **Deploying:** Package `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). +3. **Deploying:** As with any Spark application, `spark-submit` is used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications. + + For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). + + For Python applications which lack SBT/Maven project management, `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies can be directly added to `spark-submit` using `--packages` (see [Application Submission Guide](submitting-applications.html)). That is, + + ./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... + + Alternatively, you can also download the JAR of the Maven artifact `spark-streaming-kinesis-asl-assembly` from the + [Maven repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kinesis-asl-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`. *Points to remember at runtime:* From 39ac56fc60734d0e095314fc38a7b36fbb4c80f7 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 18 Jan 2016 17:10:32 -0800 Subject: [PATCH 5/8] [SPARK-12889][SQL] Rename ParserDialect -> ParserInterface. Based on discussions in #10801, I'm submitting a pull request to rename ParserDialect to ParserInterface. Author: Reynold Xin Closes #10817 from rxin/SPARK-12889.
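For a sense of what the renamed trait demands of an implementer, here is a minimal sketch of a parser that delegates everything to a fallback, mirroring the `fallback: => ParserInterface` pattern visible in the `SparkSQLParser` and `DDLParser` diffs below. The three method signatures are assumptions about the trait at this commit; treat this as an illustration, not the authoritative definition.

```scala
import org.apache.spark.sql.catalyst.{ParserInterface, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// A parser that handles nothing itself and forwards every request to a
// lazily-evaluated fallback parser, the same wrapping idiom the patch keeps.
class DelegatingParser(fallback: => ParserInterface) extends ParserInterface {
  override def parsePlan(sqlText: String): LogicalPlan =
    fallback.parsePlan(sqlText)
  override def parseExpression(sqlText: String): Expression =
    fallback.parseExpression(sqlText)
  override def parseTableIdentifier(sqlText: String): TableIdentifier =
    fallback.parseTableIdentifier(sqlText)
}
```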
--- .../apache/spark/sql/catalyst/AbstractSparkSQLParser.scala | 2 +- .../main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala | 2 +- .../catalyst/{ParserDialect.scala => ParserInterface.scala} | 2 +- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2 +- .../scala/org/apache/spark/sql/execution/SparkSQLParser.scala | 4 ++-- .../apache/spark/sql/execution/datasources/DDLParser.scala | 4 ++-- .../main/scala/org/apache/spark/sql/hive/HiveContext.scala | 4 ++-- 7 files changed, 10 insertions(+), 10 deletions(-) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/{ParserDialect.scala => ParserInterface.scala} (98%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala index 9443369808984..38fa5cb585ee7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala @@ -26,7 +26,7 @@ import scala.util.parsing.input.CharArrayReader.EofCh import org.apache.spark.sql.catalyst.plans.logical._ private[sql] abstract class AbstractSparkSQLParser - extends StandardTokenParsers with PackratParsers with ParserDialect { + extends StandardTokenParsers with PackratParsers with ParserInterface { def parsePlan(input: String): LogicalPlan = synchronized { // Initialize the Keywords. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala index 4bc721d0b2074..5fb41f7e4bf8c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala @@ -33,7 +33,7 @@ import org.apache.spark.util.random.RandomSampler /** * This class translates SQL to Catalyst [[LogicalPlan]]s or [[Expression]]s. */ -private[sql] class CatalystQl(val conf: ParserConf = SimpleParserConf()) extends ParserDialect { +private[sql] class CatalystQl(val conf: ParserConf = SimpleParserConf()) extends ParserInterface { object Token { def unapply(node: ASTNode): Some[(String, List[ASTNode])] = { CurrentOrigin.setPosition(node.line, node.positionInLine) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserInterface.scala similarity index 98% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserInterface.scala index 3aa141af63079..24ec452c4d2ef 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserInterface.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan /** * Interface for a parser. */ -trait ParserDialect { +trait ParserInterface { /** Creates LogicalPlan for a given SQL string. 
*/ def parsePlan(sqlText: String): LogicalPlan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index b8c8c78b91a5e..147e3557b632b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -203,7 +203,7 @@ class SQLContext private[sql]( protected[sql] lazy val optimizer: Optimizer = new SparkOptimizer(this) @transient - protected[sql] val sqlParser: ParserDialect = new SparkSQLParser(new SparkQl(conf)) + protected[sql] val sqlParser: ParserInterface = new SparkSQLParser(new SparkQl(conf)) @transient protected[sql] val ddlParser: DDLParser = new DDLParser(sqlParser) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSQLParser.scala index 1af2c756cdc5a..d2d8271563726 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSQLParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSQLParser.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import scala.util.parsing.combinator.RegexParsers -import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, ParserDialect, TableIdentifier} +import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, ParserInterface, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -33,7 +33,7 @@ import org.apache.spark.sql.types.StringType * parameter because this allows us to return a different dialect if we * have to. */ -class SparkSQLParser(fallback: => ParserDialect) extends AbstractSparkSQLParser { +class SparkSQLParser(fallback: => ParserInterface) extends AbstractSparkSQLParser { override def parseExpression(sql: String): Expression = fallback.parseExpression(sql) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala index 4dea947f6a849..f4766b037027d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala @@ -22,7 +22,7 @@ import scala.util.matching.Regex import org.apache.spark.Logging import org.apache.spark.sql.SaveMode -import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, ParserDialect, TableIdentifier} +import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, ParserInterface, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -32,7 +32,7 @@ import org.apache.spark.sql.types._ /** * A parser for foreign DDL commands. 
*/ -class DDLParser(fallback: => ParserDialect) +class DDLParser(fallback: => ParserInterface) extends AbstractSparkSQLParser with DataTypeParser with Logging { override def parseExpression(sql: String): Expression = fallback.parseExpression(sql) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index dd536b78c7242..eaca3c9269bb7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -42,7 +42,7 @@ import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql._ import org.apache.spark.sql.SQLConf.SQLConfEntry import org.apache.spark.sql.SQLConf.SQLConfEntry._ -import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect} +import org.apache.spark.sql.catalyst.{InternalRow, ParserInterface} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression} import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback @@ -546,7 +546,7 @@ class HiveContext private[hive]( } @transient - protected[sql] override val sqlParser: ParserDialect = { + protected[sql] override val sqlParser: ParserInterface = { new SparkSQLParser(new ExtendedHiveQlParser(this)) } From 323d51f1dadf733e413203d678cb3f76e4d68981 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 18 Jan 2016 17:29:54 -0800 Subject: [PATCH 6/8] [SPARK-12700] [SQL] embed condition into SMJ and BroadcastHashJoin Currently SortMergeJoin and BroadcastHashJoin do not support a join condition; they need a Filter on top for that, and the result projection that generates UnsafeRows can be very expensive when the join produces lots of rows that are mostly filtered out by the condition. This PR brings support for a condition to SortMergeJoin and BroadcastHashJoin, just like the outer joins already have. This could improve the performance of Q72 by 7x (from 120s to 16.5s). Author: Davies Liu Closes #10653 from davies/filter_join.
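The performance argument in the description is that the condition now runs on joined rows before the expensive result projection, so rows that fail it are never materialized as UnsafeRows. The following self-contained toy, using plain Scala collections rather than Spark's actual operators, sketches the shape of the change under that assumption:

```scala
// Toy inner hash join over (key, payload) pairs. The condition is applied
// while probing the hash table, so non-matching pairs never reach the
// (potentially expensive) result-building step; a missing condition
// degenerates to an always-true predicate, as in the patch.
def hashJoinWithCondition[K, L, R](
    buildSide: Seq[(K, R)],
    streamSide: Seq[(K, L)],
    condition: Option[(L, R) => Boolean]): Seq[(L, R)] = {
  val hashedRelation = buildSide.groupBy(_._1)
  val cond = condition.getOrElse((_: L, _: R) => true)
  for {
    (key, left) <- streamSide
    (_, right) <- hashedRelation.getOrElse(key, Seq.empty)
    if cond(left, right) // condition embedded in the join itself
  } yield (left, right)
}

// The pre-patch equivalent builds every joined row first, then filters:
// hashJoinWithCondition(b, s, None).filter { case (l, r) => myCond(l, r) }
```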
--- .../spark/sql/execution/SparkStrategies.scala | 24 ++---- .../execution/joins/BroadcastHashJoin.scala | 1 + .../spark/sql/execution/joins/HashJoin.scala | 81 +++++++++++-------- .../sql/execution/joins/HashOuterJoin.scala | 5 +- .../sql/execution/joins/SortMergeJoin.scala | 46 +++++++---- .../sql/execution/joins/InnerJoinSuite.scala | 11 +-- 6 files changed, 96 insertions(+), 72 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 910519d0e6814..df0f730499211 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight} import org.apache.spark.sql.{execution, Strategy} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -77,33 +78,22 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { */ object EquiJoinSelection extends Strategy with PredicateHelper { - private[this] def makeBroadcastHashJoin( - leftKeys: Seq[Expression], - rightKeys: Seq[Expression], - left: LogicalPlan, - right: LogicalPlan, - condition: Option[Expression], - side: joins.BuildSide): Seq[SparkPlan] = { - val broadcastHashJoin = execution.joins.BroadcastHashJoin( - leftKeys, rightKeys, side, planLater(left), planLater(right)) - condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil - } - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { // --- Inner joins -------------------------------------------------------------------------- case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, CanBroadcast(right)) => - makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildRight) + joins.BroadcastHashJoin( + leftKeys, rightKeys, BuildRight, condition, planLater(left), planLater(right)) :: Nil case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, CanBroadcast(left), right) => - makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft) + joins.BroadcastHashJoin( + leftKeys, rightKeys, BuildLeft, condition, planLater(left), planLater(right)) :: Nil case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) if RowOrdering.isOrderable(leftKeys) => - val mergeJoin = - joins.SortMergeJoin(leftKeys, rightKeys, planLater(left), planLater(right)) - condition.map(Filter(_, mergeJoin)).getOrElse(mergeJoin) :: Nil + joins.SortMergeJoin( + leftKeys, rightKeys, condition, planLater(left), planLater(right)) :: Nil // --- Outer joins -------------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala index 0a818cc2c2a27..c9ea579b5e809 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala @@ -39,6 +39,7 @@ case class BroadcastHashJoin( leftKeys: Seq[Expression], rightKeys: Seq[Expression], buildSide: BuildSide, + condition: Option[Expression], left: SparkPlan, right: SparkPlan) extends BinaryNode with HashJoin { diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 7f9d9daa5ab20..8ef854001f4de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.joins +import java.util.NoSuchElementException + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan @@ -29,6 +31,7 @@ trait HashJoin { val leftKeys: Seq[Expression] val rightKeys: Seq[Expression] val buildSide: BuildSide + val condition: Option[Expression] val left: SparkPlan val right: SparkPlan @@ -50,6 +53,12 @@ trait HashJoin { protected def streamSideKeyGenerator: Projection = UnsafeProjection.create(streamedKeys, streamedPlan.output) + @transient private[this] lazy val boundCondition = if (condition.isDefined) { + newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output) + } else { + (r: InternalRow) => true + } + protected def hashJoin( streamIter: Iterator[InternalRow], numStreamRows: LongSQLMetric, @@ -68,44 +77,52 @@ trait HashJoin { private[this] val joinKeys = streamSideKeyGenerator - override final def hasNext: Boolean = - (currentMatchPosition != -1 && currentMatchPosition < currentHashMatches.size) || - (streamIter.hasNext && fetchNext()) + override final def hasNext: Boolean = { + while (true) { + // check if it's end of current matches + if (currentHashMatches != null && currentMatchPosition == currentHashMatches.length) { + currentHashMatches = null + currentMatchPosition = -1 + } - override final def next(): InternalRow = { - val ret = buildSide match { - case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition)) - case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow) - } - currentMatchPosition += 1 - numOutputRows += 1 - resultProjection(ret) - } + // find the next match + while (currentHashMatches == null && streamIter.hasNext) { + currentStreamedRow = streamIter.next() + numStreamRows += 1 + val key = joinKeys(currentStreamedRow) + if (!key.anyNull) { + currentHashMatches = hashedRelation.get(key) + if (currentHashMatches != null) { + currentMatchPosition = 0 + } + } + } + if (currentHashMatches == null) { + return false + } - /** - * Searches the streamed iterator for the next row that has at least one match in hashtable. - * - * @return true if the search is successful, and false if the streamed iterator runs out of - * tuples. 
- */ - private final def fetchNext(): Boolean = { - currentHashMatches = null - currentMatchPosition = -1 - - while (currentHashMatches == null && streamIter.hasNext) { - currentStreamedRow = streamIter.next() - numStreamRows += 1 - val key = joinKeys(currentStreamedRow) - if (!key.anyNull) { - currentHashMatches = hashedRelation.get(key) + // found some matches + buildSide match { + case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition)) + case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow) + } + if (boundCondition(joinRow)) { + return true + } else { + currentMatchPosition += 1 } } + false // unreachable + } - if (currentHashMatches == null) { - false + override final def next(): InternalRow = { + // next() could be called without calling hasNext() + if (hasNext) { + currentMatchPosition += 1 + numOutputRows += 1 + resultProjection(joinRow) } else { - currentMatchPosition = 0 - true + throw new NoSuchElementException } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala index 6d464d6946b78..9e614309de129 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala @@ -78,8 +78,11 @@ trait HashOuterJoin { @transient private[this] lazy val leftNullRow = new GenericInternalRow(left.output.length) @transient private[this] lazy val rightNullRow = new GenericInternalRow(right.output.length) - @transient private[this] lazy val boundCondition = + @transient private[this] lazy val boundCondition = if (condition.isDefined) { newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output) + } else { + (row: InternalRow) => true + } // TODO we need to rewrite all of the iterators with our own implementation instead of the Scala // iterator for performance purpose. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala index 812f881d06fb8..322a954b4f792 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics} case class SortMergeJoin( leftKeys: Seq[Expression], rightKeys: Seq[Expression], + condition: Option[Expression], left: SparkPlan, right: SparkPlan) extends BinaryNode { @@ -64,6 +65,13 @@ case class SortMergeJoin( val numOutputRows = longMetric("numOutputRows") left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) => + val boundCondition: (InternalRow) => Boolean = { + condition.map { cond => + newPredicate(cond, left.output ++ right.output) + }.getOrElse { + (r: InternalRow) => true + } + } new RowIterator { // The projection used to extract keys from input rows of the left child. 
private[this] val leftKeyGenerator = UnsafeProjection.create(leftKeys, left.output) @@ -89,26 +97,34 @@ case class SortMergeJoin( private[this] val resultProjection: (InternalRow) => InternalRow = UnsafeProjection.create(schema) + if (smjScanner.findNextInnerJoinRows()) { + currentRightMatches = smjScanner.getBufferedMatches + currentLeftRow = smjScanner.getStreamedRow + currentMatchIdx = 0 + } + override def advanceNext(): Boolean = { - if (currentMatchIdx == -1 || currentMatchIdx == currentRightMatches.length) { - if (smjScanner.findNextInnerJoinRows()) { - currentRightMatches = smjScanner.getBufferedMatches - currentLeftRow = smjScanner.getStreamedRow - currentMatchIdx = 0 - } else { - currentRightMatches = null - currentLeftRow = null - currentMatchIdx = -1 + while (currentMatchIdx >= 0) { + if (currentMatchIdx == currentRightMatches.length) { + if (smjScanner.findNextInnerJoinRows()) { + currentRightMatches = smjScanner.getBufferedMatches + currentLeftRow = smjScanner.getStreamedRow + currentMatchIdx = 0 + } else { + currentRightMatches = null + currentLeftRow = null + currentMatchIdx = -1 + return false + } } - } - if (currentLeftRow != null) { joinRow(currentLeftRow, currentRightMatches(currentMatchIdx)) currentMatchIdx += 1 - numOutputRows += 1 - true - } else { - false + if (boundCondition(joinRow)) { + numOutputRows += 1 + return true + } } + false } override def getRow: InternalRow = resultProjection(joinRow) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala index 42fadaa8e2215..ab81b702596af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.joins -import org.apache.spark.sql.{execution, DataFrame, Row, SQLConf} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys import org.apache.spark.sql.catalyst.plans.Inner @@ -25,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Join import org.apache.spark.sql.execution._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StringType, StructType} +import org.apache.spark.sql.{DataFrame, Row, SQLConf} class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { import testImplicits.localSeqToDataFrameHolder @@ -88,9 +88,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { leftPlan: SparkPlan, rightPlan: SparkPlan, side: BuildSide) = { - val broadcastHashJoin = - execution.joins.BroadcastHashJoin(leftKeys, rightKeys, side, leftPlan, rightPlan) - boundCondition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) + joins.BroadcastHashJoin(leftKeys, rightKeys, side, boundCondition, leftPlan, rightPlan) } def makeSortMergeJoin( @@ -100,9 +98,8 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { leftPlan: SparkPlan, rightPlan: SparkPlan) = { val sortMergeJoin = - execution.joins.SortMergeJoin(leftKeys, rightKeys, leftPlan, rightPlan) - val filteredJoin = boundCondition.map(Filter(_, sortMergeJoin)).getOrElse(sortMergeJoin) - EnsureRequirements(sqlContext).apply(filteredJoin) + joins.SortMergeJoin(leftKeys, rightKeys, boundCondition, leftPlan, rightPlan) + EnsureRequirements(sqlContext).apply(sortMergeJoin) } test(s"$testName using BroadcastHashJoin 
(build=left)") { From 2b5d11f34d73eb7117c0c4668c1abb27dcc3a403 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 18 Jan 2016 19:22:29 -0800 Subject: [PATCH 7/8] [SPARK-12885][MINOR] Rename 3 fields in ShuffleWriteMetrics This is a small step in implementing SPARK-10620, which migrates TaskMetrics to accumulators. This patch is strictly a cleanup patch and introduces no change in functionality. It literally just renames 3 fields for consistency. Today we have: ``` inputMetrics.recordsRead outputMetrics.bytesWritten shuffleReadMetrics.localBlocksFetched ... shuffleWriteMetrics.shuffleRecordsWritten shuffleWriteMetrics.shuffleBytesWritten shuffleWriteMetrics.shuffleWriteTime ``` The shuffle write ones are kind of redundant. We can drop the `shuffle` part in the method names. I added backward compatible (but deprecated) methods with the old names. Parent PR: #10717 Author: Andrew Or Closes #10811 from andrewor14/rename-things. --- .../sort/BypassMergeSortShuffleWriter.java | 4 +- .../shuffle/sort/ShuffleExternalSorter.java | 4 +- .../shuffle/sort/UnsafeShuffleWriter.java | 6 +-- .../storage/TimeTrackingOutputStream.java | 10 ++-- .../spark/executor/ShuffleWriteMetrics.scala | 37 ++++++++----- .../spark/scheduler/SparkListener.scala | 2 +- .../shuffle/FileShuffleBlockResolver.scala | 2 +- .../shuffle/sort/SortShuffleWriter.scala | 2 +- .../status/api/v1/AllStagesResource.scala | 12 ++--- .../spark/storage/DiskBlockObjectWriter.scala | 12 ++--- .../apache/spark/ui/exec/ExecutorsTab.scala | 2 +- .../spark/ui/jobs/JobProgressListener.scala | 8 +-- .../org/apache/spark/ui/jobs/StagePage.scala | 14 ++--- .../org/apache/spark/util/JsonProtocol.scala | 13 ++--- .../collection/ExternalAppendOnlyMap.scala | 4 +- .../util/collection/ExternalSorter.scala | 4 +- .../sort/UnsafeShuffleWriterSuite.java | 20 +++---- .../scala/org/apache/spark/ShuffleSuite.scala | 4 +- .../metrics/InputOutputMetricsSuite.scala | 2 +- .../spark/scheduler/SparkListenerSuite.scala | 2 +- .../BypassMergeSortShuffleWriterSuite.scala | 8 +-- .../storage/DiskBlockObjectWriterSuite.scala | 54 +++++++++---------- .../ui/jobs/JobProgressListenerSuite.scala | 2 +- .../apache/spark/util/JsonProtocolSuite.scala | 12 ++--- 24 files changed, 126 insertions(+), 114 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 56cdc22f36261..a06dc1ce91542 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -143,7 +143,7 @@ public void write(Iterator> records) throws IOException { // Creating the file to write to and creating a disk writer both involve interacting with // the disk, and can take a long time in aggregate when we open many files, so should be // included in the shuffle write time. 
- writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime); + writeMetrics.incWriteTime(System.nanoTime() - openStartTime); while (records.hasNext()) { final Product2 record = records.next(); @@ -203,7 +203,7 @@ private long[] writePartitionedFile(File outputFile) throws IOException { threwException = false; } finally { Closeables.close(out, threwException); - writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime); + writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); } partitionWriters = null; return lengths; diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java index 9affff80143d7..2c84de5bf2a5a 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java @@ -233,8 +233,8 @@ private void writeSortedFile(boolean isLastFile) throws IOException { // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`. // Consistent with ExternalSorter, we do not count this IO towards shuffle write time. // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this. - writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten()); - taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten()); + writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten()); + taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten()); } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 744c3008ca50e..c8cc7056975ec 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -298,8 +298,8 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs // to be counted as shuffle write, but this will lead to double-counting of the final // SpillInfo's bytes. 
- writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length()); - writeMetrics.incShuffleBytesWritten(outputFile.length()); + writeMetrics.decBytesWritten(spills[spills.length - 1].file.length()); + writeMetrics.incBytesWritten(outputFile.length()); return partitionLengths; } } catch (IOException e) { @@ -411,7 +411,7 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th spillInputChannelPositions[i] += actualBytesTransferred; bytesToTransfer -= actualBytesTransferred; } - writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime); + writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); bytesWrittenToMergedFile += partitionLengthInSpill; partitionLengths[partition] += partitionLengthInSpill; } diff --git a/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java b/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java index dc2aa30466cc6..5d0555a8c28e1 100644 --- a/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java +++ b/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java @@ -42,34 +42,34 @@ public TimeTrackingOutputStream(ShuffleWriteMetrics writeMetrics, OutputStream o public void write(int b) throws IOException { final long startTime = System.nanoTime(); outputStream.write(b); - writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime); + writeMetrics.incWriteTime(System.nanoTime() - startTime); } @Override public void write(byte[] b) throws IOException { final long startTime = System.nanoTime(); outputStream.write(b); - writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime); + writeMetrics.incWriteTime(System.nanoTime() - startTime); } @Override public void write(byte[] b, int off, int len) throws IOException { final long startTime = System.nanoTime(); outputStream.write(b, off, len); - writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime); + writeMetrics.incWriteTime(System.nanoTime() - startTime); } @Override public void flush() throws IOException { final long startTime = System.nanoTime(); outputStream.flush(); - writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime); + writeMetrics.incWriteTime(System.nanoTime() - startTime); } @Override public void close() throws IOException { final long startTime = System.nanoTime(); outputStream.close(); - writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime); + writeMetrics.incWriteTime(System.nanoTime() - startTime); } } diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala index 469ebe26c7b56..24795f860087f 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala @@ -26,28 +26,39 @@ import org.apache.spark.annotation.DeveloperApi */ @DeveloperApi class ShuffleWriteMetrics extends Serializable { + /** * Number of bytes written for the shuffle by this task */ - @volatile private var _shuffleBytesWritten: Long = _ - def shuffleBytesWritten: Long = _shuffleBytesWritten - private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value - private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value + @volatile private var _bytesWritten: Long = _ + def bytesWritten: Long = _bytesWritten + private[spark] def incBytesWritten(value: Long) = _bytesWritten += value + private[spark] def decBytesWritten(value: Long) 
= _bytesWritten -= value /** * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds */ - @volatile private var _shuffleWriteTime: Long = _ - def shuffleWriteTime: Long = _shuffleWriteTime - private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value - private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value + @volatile private var _writeTime: Long = _ + def writeTime: Long = _writeTime + private[spark] def incWriteTime(value: Long) = _writeTime += value + private[spark] def decWriteTime(value: Long) = _writeTime -= value /** * Total number of records written to the shuffle by this task */ - @volatile private var _shuffleRecordsWritten: Long = _ - def shuffleRecordsWritten: Long = _shuffleRecordsWritten - private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value - private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value - private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value + @volatile private var _recordsWritten: Long = _ + def recordsWritten: Long = _recordsWritten + private[spark] def incRecordsWritten(value: Long) = _recordsWritten += value + private[spark] def decRecordsWritten(value: Long) = _recordsWritten -= value + private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value + + // Legacy methods for backward compatibility. + // TODO: remove these once we make this class private. + @deprecated("use bytesWritten instead", "2.0.0") + def shuffleBytesWritten: Long = bytesWritten + @deprecated("use writeTime instead", "2.0.0") + def shuffleWriteTime: Long = writeTime + @deprecated("use recordsWritten instead", "2.0.0") + def shuffleRecordsWritten: Long = recordsWritten + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 3130a65240a99..f5267f58c2e42 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -271,7 +271,7 @@ class StatsReportListener extends SparkListener with Logging { // Shuffle write showBytesDistribution("shuffle bytes written:", - (_, metric) => metric.shuffleWriteMetrics.map(_.shuffleBytesWritten), taskInfoMetrics) + (_, metric) => metric.shuffleWriteMetrics.map(_.bytesWritten), taskInfoMetrics) // Fetch & I/O showMillisDistribution("fetch wait time:", diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala index 294e16cde1931..2970968f0bd47 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala @@ -90,7 +90,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf) } // Creating the file to write to and creating a disk writer both involve interacting with // the disk, so should be included in the shuffle write time. 
- writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime) + writeMetrics.incWriteTime(System.nanoTime - openStartTime) override def releaseWriters(success: Boolean) { shuffleState.completedMapTasks.add(mapId) diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index f83cf8859e581..5c5a5f5a4cb6a 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -94,7 +94,7 @@ private[spark] class SortShuffleWriter[K, V, C]( val startTime = System.nanoTime() sorter.stop() context.taskMetrics.shuffleWriteMetrics.foreach( - _.incShuffleWriteTime(System.nanoTime - startTime)) + _.incWriteTime(System.nanoTime - startTime)) sorter = null } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index 341ae782362a0..078718ba11260 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -214,9 +214,9 @@ private[v1] object AllStagesResource { raw.shuffleWriteMetrics } def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions( - writeBytes = submetricQuantiles(_.shuffleBytesWritten), - writeRecords = submetricQuantiles(_.shuffleRecordsWritten), - writeTime = submetricQuantiles(_.shuffleWriteTime) + writeBytes = submetricQuantiles(_.bytesWritten), + writeRecords = submetricQuantiles(_.recordsWritten), + writeTime = submetricQuantiles(_.writeTime) ) }.metricOption @@ -283,9 +283,9 @@ private[v1] object AllStagesResource { def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = { new ShuffleWriteMetrics( - bytesWritten = internal.shuffleBytesWritten, - writeTime = internal.shuffleWriteTime, - recordsWritten = internal.shuffleRecordsWritten + bytesWritten = internal.bytesWritten, + writeTime = internal.writeTime, + recordsWritten = internal.recordsWritten ) } } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala index e36a367323b20..c34d49c0d9061 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala @@ -102,7 +102,7 @@ private[spark] class DiskBlockObjectWriter( objOut.flush() val start = System.nanoTime() fos.getFD.sync() - writeMetrics.incShuffleWriteTime(System.nanoTime() - start) + writeMetrics.incWriteTime(System.nanoTime() - start) } } { objOut.close() @@ -132,7 +132,7 @@ private[spark] class DiskBlockObjectWriter( close() finalPosition = file.length() // In certain compression codecs, more bytes are written after close() is called - writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition) + writeMetrics.incBytesWritten(finalPosition - reportedPosition) } else { finalPosition = file.length() } @@ -152,8 +152,8 @@ private[spark] class DiskBlockObjectWriter( // truncating the file to its initial position. 
try { if (initialized) { - writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition) - writeMetrics.decShuffleRecordsWritten(numRecordsWritten) + writeMetrics.decBytesWritten(reportedPosition - initialPosition) + writeMetrics.decRecordsWritten(numRecordsWritten) objOut.flush() bs.flush() close() @@ -201,7 +201,7 @@ private[spark] class DiskBlockObjectWriter( */ def recordWritten(): Unit = { numRecordsWritten += 1 - writeMetrics.incShuffleRecordsWritten(1) + writeMetrics.incRecordsWritten(1) if (numRecordsWritten % 32 == 0) { updateBytesWritten() @@ -226,7 +226,7 @@ private[spark] class DiskBlockObjectWriter( */ private def updateBytesWritten() { val pos = channel.position() - writeMetrics.incShuffleBytesWritten(pos - reportedPosition) + writeMetrics.incBytesWritten(pos - reportedPosition) reportedPosition = pos } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 2d955a66601ee..160d7a4dff2dc 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -129,7 +129,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp } metrics.shuffleWriteMetrics.foreach { shuffleWrite => executorToShuffleWrite(eid) = - executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten + executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.bytesWritten } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index ca37829216f22..4a9f8b30525fe 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -426,14 +426,14 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new ExecutorSummary) val shuffleWriteDelta = - (taskMetrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L) - - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L)) + (taskMetrics.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L) + - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.bytesWritten).getOrElse(0L)) stageData.shuffleWriteBytes += shuffleWriteDelta execSummary.shuffleWrite += shuffleWriteDelta val shuffleWriteRecordsDelta = - (taskMetrics.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L) - - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleRecordsWritten).getOrElse(0L)) + (taskMetrics.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L) + - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.recordsWritten).getOrElse(0L)) stageData.shuffleWriteRecords += shuffleWriteRecordsDelta execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 6d4066a870cdd..914f6183cc2a4 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -500,11 +500,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { getFormattedSizeQuantiles(shuffleReadRemoteSizes) val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) => - 
metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble + metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble } val shuffleWriteRecords = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L).toDouble + metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble } val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +: @@ -619,7 +619,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val shuffleReadTimeProportion = toProportion(shuffleReadTime) val shuffleWriteTime = (metricsOpt.flatMap(_.shuffleWriteMetrics - .map(_.shuffleWriteTime)).getOrElse(0L) / 1e6).toLong + .map(_.writeTime)).getOrElse(0L) / 1e6).toLong val shuffleWriteTimeProportion = toProportion(shuffleWriteTime) val serializationTime = metricsOpt.map(_.resultSerializationTime).getOrElse(0L) @@ -930,13 +930,13 @@ private[ui] class TaskDataSource( val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("") val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics) - val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten).getOrElse(0L) + val shuffleWriteSortable = maybeShuffleWrite.map(_.bytesWritten).getOrElse(0L) val shuffleWriteReadable = maybeShuffleWrite - .map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("") + .map(m => s"${Utils.bytesToString(m.bytesWritten)}").getOrElse("") val shuffleWriteRecords = maybeShuffleWrite - .map(_.shuffleRecordsWritten.toString).getOrElse("") + .map(_.recordsWritten.toString).getOrElse("") - val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime) + val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.writeTime) val writeTimeSortable = maybeWriteTime.getOrElse(0L) val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms => if (ms == 0) "" else UIUtils.formatDuration(ms) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index a6460bc8b8202..b88221a249eb8 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -331,10 +331,11 @@ private[spark] object JsonProtocol { ("Total Records Read" -> shuffleReadMetrics.recordsRead) } + // TODO: Drop the redundant "Shuffle" since it's inconsistent with related classes.
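The TODO above flags that the JSON keys still say "Shuffle ..." even though the Scala accessors no longer do; the serializer that follows keeps the old keys deliberately, because persisted event logs already contain them and must stay readable. A small json4s sketch of that shape, using the same DSL as the surrounding JsonProtocol code — the case class is illustrative, and only the field names come from the patch:

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

object JsonCompatSketch {
  // Illustrative holder; only the JSON field names below are from the patch.
  case class WriteMetrics(bytesWritten: Long, writeTime: Long, recordsWritten: Long)

  // New accessor names on the Scala side, legacy field names on the wire.
  def toJson(m: WriteMetrics) =
    ("Shuffle Bytes Written" -> m.bytesWritten) ~
    ("Shuffle Write Time" -> m.writeTime) ~
    ("Shuffle Records Written" -> m.recordsWritten)

  def main(args: Array[String]): Unit = {
    val json = compact(render(toJson(WriteMetrics(100L, 5L, 3L))))
    assert(json.contains("Shuffle Bytes Written")) // old readers still work
  }
}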
def shuffleWriteMetricsToJson(shuffleWriteMetrics: ShuffleWriteMetrics): JValue = { - ("Shuffle Bytes Written" -> shuffleWriteMetrics.shuffleBytesWritten) ~ - ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime) ~ - ("Shuffle Records Written" -> shuffleWriteMetrics.shuffleRecordsWritten) + ("Shuffle Bytes Written" -> shuffleWriteMetrics.bytesWritten) ~ + ("Shuffle Write Time" -> shuffleWriteMetrics.writeTime) ~ + ("Shuffle Records Written" -> shuffleWriteMetrics.recordsWritten) } def inputMetricsToJson(inputMetrics: InputMetrics): JValue = { @@ -752,9 +753,9 @@ private[spark] object JsonProtocol { def shuffleWriteMetricsFromJson(json: JValue): ShuffleWriteMetrics = { val metrics = new ShuffleWriteMetrics - metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long]) - metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long]) - metrics.setShuffleRecordsWritten((json \ "Shuffle Records Written") + metrics.incBytesWritten((json \ "Shuffle Bytes Written").extract[Long]) + metrics.incWriteTime((json \ "Shuffle Write Time").extract[Long]) + metrics.setRecordsWritten((json \ "Shuffle Records Written") .extractOpt[Long].getOrElse(0)) metrics } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 4a44481cf4e14..ff9dad7d38bf0 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -193,8 +193,8 @@ class ExternalAppendOnlyMap[K, V, C]( val w = writer writer = null w.commitAndClose() - _diskBytesSpilled += curWriteMetrics.shuffleBytesWritten - batchSizes.append(curWriteMetrics.shuffleBytesWritten) + _diskBytesSpilled += curWriteMetrics.bytesWritten + batchSizes.append(curWriteMetrics.bytesWritten) objectsWritten = 0 } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 63ba954a7fa7e..4c7416e00b004 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -262,8 +262,8 @@ private[spark] class ExternalSorter[K, V, C]( val w = writer writer = null w.commitAndClose() - _diskBytesSpilled += spillMetrics.shuffleBytesWritten - batchSizes.append(spillMetrics.shuffleBytesWritten) + _diskBytesSpilled += spillMetrics.bytesWritten + batchSizes.append(spillMetrics.bytesWritten) spillMetrics = null objectsWritten = 0 } diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 5fe64bde3604a..625fdd57eb5d4 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -279,8 +279,8 @@ public void writeEmptyIterator() throws Exception { assertTrue(mapStatus.isDefined()); assertTrue(mergedOutputFile.exists()); assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile); - assertEquals(0, taskMetrics.shuffleWriteMetrics().get().shuffleRecordsWritten()); - assertEquals(0, taskMetrics.shuffleWriteMetrics().get().shuffleBytesWritten()); + assertEquals(0, taskMetrics.shuffleWriteMetrics().get().recordsWritten()); + assertEquals(0, 
taskMetrics.shuffleWriteMetrics().get().bytesWritten()); assertEquals(0, taskMetrics.diskBytesSpilled()); assertEquals(0, taskMetrics.memoryBytesSpilled()); } @@ -311,10 +311,10 @@ public void writeWithoutSpilling() throws Exception { HashMultiset.create(readRecordsFromFile())); assertSpillFilesWereCleanedUp(); ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get(); - assertEquals(dataToWrite.size(), shuffleWriteMetrics.shuffleRecordsWritten()); + assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten()); assertEquals(0, taskMetrics.diskBytesSpilled()); assertEquals(0, taskMetrics.memoryBytesSpilled()); - assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.shuffleBytesWritten()); + assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten()); } private void testMergingSpills( @@ -354,11 +354,11 @@ private void testMergingSpills( assertEquals(HashMultiset.create(dataToWrite), HashMultiset.create(readRecordsFromFile())); assertSpillFilesWereCleanedUp(); ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get(); - assertEquals(dataToWrite.size(), shuffleWriteMetrics.shuffleRecordsWritten()); + assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten()); assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L)); assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length())); assertThat(taskMetrics.memoryBytesSpilled(), greaterThan(0L)); - assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.shuffleBytesWritten()); + assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten()); } @Test @@ -416,11 +416,11 @@ public void writeEnoughDataToTriggerSpill() throws Exception { readRecordsFromFile(); assertSpillFilesWereCleanedUp(); ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get(); - assertEquals(dataToWrite.size(), shuffleWriteMetrics.shuffleRecordsWritten()); + assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten()); assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L)); assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length())); assertThat(taskMetrics.memoryBytesSpilled(), greaterThan(0L)); - assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.shuffleBytesWritten()); + assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten()); } @Test @@ -437,11 +437,11 @@ public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exce readRecordsFromFile(); assertSpillFilesWereCleanedUp(); ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get(); - assertEquals(dataToWrite.size(), shuffleWriteMetrics.shuffleRecordsWritten()); + assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten()); assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L)); assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length())); assertThat(taskMetrics.memoryBytesSpilled(), greaterThan(0L)); - assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.shuffleBytesWritten()); + assertEquals(mergedOutputFile.length(), shuffleWriteMetrics.bytesWritten()); } @Test diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index c45d81459e8e2..6ffa1c8ac1406 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -450,8 +450,8 @@ object ShuffleSuite { val listener = new SparkListener { 
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { taskEnd.taskMetrics.shuffleWriteMetrics.foreach { m => - recordsWritten += m.shuffleRecordsWritten - bytesWritten += m.shuffleBytesWritten + recordsWritten += m.recordsWritten + bytesWritten += m.bytesWritten } taskEnd.taskMetrics.shuffleReadMetrics.foreach { m => recordsRead += m.recordsRead diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index aaf62e0f91067..e5a448298a624 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -212,7 +212,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext metrics.inputMetrics.foreach(inputRead += _.recordsRead) metrics.outputMetrics.foreach(outputWritten += _.recordsWritten) metrics.shuffleReadMetrics.foreach(shuffleRead += _.recordsRead) - metrics.shuffleWriteMetrics.foreach(shuffleWritten += _.shuffleRecordsWritten) + metrics.shuffleWriteMetrics.foreach(shuffleWritten += _.recordsWritten) } }) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index dc15f5932d6f8..c87158d89f3fb 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -269,7 +269,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match taskMetrics.inputMetrics should not be ('defined) taskMetrics.outputMetrics should not be ('defined) taskMetrics.shuffleWriteMetrics should be ('defined) - taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0L) + taskMetrics.shuffleWriteMetrics.get.bytesWritten should be > (0L) } if (stageInfo.rddInfos.exists(_.name == d4.name)) { taskMetrics.shuffleReadMetrics should be ('defined) diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index ef6ce04e3ff28..fdacd8c9f5908 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -145,8 +145,8 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte assert(outputFile.length() === 0) assert(temporaryFilesCreated.isEmpty) val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get - assert(shuffleWriteMetrics.shuffleBytesWritten === 0) - assert(shuffleWriteMetrics.shuffleRecordsWritten === 0) + assert(shuffleWriteMetrics.bytesWritten === 0) + assert(shuffleWriteMetrics.recordsWritten === 0) assert(taskMetrics.diskBytesSpilled === 0) assert(taskMetrics.memoryBytesSpilled === 0) } @@ -169,8 +169,8 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte assert(writer.getPartitionLengths.filter(_ == 0L).size === 4) // should be 4 zero length files assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get - assert(shuffleWriteMetrics.shuffleBytesWritten === outputFile.length()) - assert(shuffleWriteMetrics.shuffleRecordsWritten === records.length) + 
assert(shuffleWriteMetrics.bytesWritten === outputFile.length()) + assert(shuffleWriteMetrics.recordsWritten === records.length) assert(taskMetrics.diskBytesSpilled === 0) assert(taskMetrics.memoryBytesSpilled === 0) } diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala index 5d36617cfc447..8eff3c297035d 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala @@ -50,18 +50,18 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { writer.write(Long.box(20), Long.box(30)) // Record metrics update on every write - assert(writeMetrics.shuffleRecordsWritten === 1) + assert(writeMetrics.recordsWritten === 1) // Metrics don't update on every write - assert(writeMetrics.shuffleBytesWritten == 0) + assert(writeMetrics.bytesWritten == 0) // After 32 writes, metrics should update for (i <- 0 until 32) { writer.flush() writer.write(Long.box(i), Long.box(i)) } - assert(writeMetrics.shuffleBytesWritten > 0) - assert(writeMetrics.shuffleRecordsWritten === 33) + assert(writeMetrics.bytesWritten > 0) + assert(writeMetrics.recordsWritten === 33) writer.commitAndClose() - assert(file.length() == writeMetrics.shuffleBytesWritten) + assert(file.length() == writeMetrics.bytesWritten) } test("verify write metrics on revert") { @@ -72,19 +72,19 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { writer.write(Long.box(20), Long.box(30)) // Record metrics update on every write - assert(writeMetrics.shuffleRecordsWritten === 1) + assert(writeMetrics.recordsWritten === 1) // Metrics don't update on every write - assert(writeMetrics.shuffleBytesWritten == 0) + assert(writeMetrics.bytesWritten == 0) // After 32 writes, metrics should update for (i <- 0 until 32) { writer.flush() writer.write(Long.box(i), Long.box(i)) } - assert(writeMetrics.shuffleBytesWritten > 0) - assert(writeMetrics.shuffleRecordsWritten === 33) + assert(writeMetrics.bytesWritten > 0) + assert(writeMetrics.recordsWritten === 33) writer.revertPartialWritesAndClose() - assert(writeMetrics.shuffleBytesWritten == 0) - assert(writeMetrics.shuffleRecordsWritten == 0) + assert(writeMetrics.bytesWritten == 0) + assert(writeMetrics.recordsWritten == 0) } test("Reopening a closed block writer") { @@ -109,11 +109,11 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { writer.write(i, i) } writer.commitAndClose() - val bytesWritten = writeMetrics.shuffleBytesWritten - assert(writeMetrics.shuffleRecordsWritten === 1000) + val bytesWritten = writeMetrics.bytesWritten + assert(writeMetrics.recordsWritten === 1000) writer.revertPartialWritesAndClose() - assert(writeMetrics.shuffleRecordsWritten === 1000) - assert(writeMetrics.shuffleBytesWritten === bytesWritten) + assert(writeMetrics.recordsWritten === 1000) + assert(writeMetrics.bytesWritten === bytesWritten) } test("commitAndClose() should be idempotent") { @@ -125,13 +125,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { writer.write(i, i) } writer.commitAndClose() - val bytesWritten = writeMetrics.shuffleBytesWritten - val writeTime = writeMetrics.shuffleWriteTime - assert(writeMetrics.shuffleRecordsWritten === 1000) + val bytesWritten = writeMetrics.bytesWritten + val writeTime = writeMetrics.writeTime + assert(writeMetrics.recordsWritten === 1000) 
writer.commitAndClose() - assert(writeMetrics.shuffleRecordsWritten === 1000) - assert(writeMetrics.shuffleBytesWritten === bytesWritten) - assert(writeMetrics.shuffleWriteTime === writeTime) + assert(writeMetrics.recordsWritten === 1000) + assert(writeMetrics.bytesWritten === bytesWritten) + assert(writeMetrics.writeTime === writeTime) } test("revertPartialWritesAndClose() should be idempotent") { @@ -143,13 +143,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { writer.write(i, i) } writer.revertPartialWritesAndClose() - val bytesWritten = writeMetrics.shuffleBytesWritten - val writeTime = writeMetrics.shuffleWriteTime - assert(writeMetrics.shuffleRecordsWritten === 0) + val bytesWritten = writeMetrics.bytesWritten + val writeTime = writeMetrics.writeTime + assert(writeMetrics.recordsWritten === 0) writer.revertPartialWritesAndClose() - assert(writeMetrics.shuffleRecordsWritten === 0) - assert(writeMetrics.shuffleBytesWritten === bytesWritten) - assert(writeMetrics.shuffleWriteTime === writeTime) + assert(writeMetrics.recordsWritten === 0) + assert(writeMetrics.bytesWritten === bytesWritten) + assert(writeMetrics.writeTime === writeTime) } test("fileSegment() can only be called after commitAndClose() has been called") { diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index e02f5a1b20fe3..ee2d56a679395 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -277,7 +277,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with shuffleReadMetrics.incRemoteBytesRead(base + 1) shuffleReadMetrics.incLocalBytesRead(base + 9) shuffleReadMetrics.incRemoteBlocksFetched(base + 2) - shuffleWriteMetrics.incShuffleBytesWritten(base + 3) + shuffleWriteMetrics.incBytesWritten(base + 3) taskMetrics.setExecutorRunTime(base + 4) taskMetrics.incDiskBytesSpilled(base + 5) taskMetrics.incMemoryBytesSpilled(base + 6) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 068e8397c89bb..9dd400fc1c2de 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -227,7 +227,7 @@ class JsonProtocolSuite extends SparkFunSuite { .removeField { case (field, _) => field == "Shuffle Records Written" } val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) assert(newMetrics.shuffleReadMetrics.get.recordsRead == 0) - assert(newMetrics.shuffleWriteMetrics.get.shuffleRecordsWritten == 0) + assert(newMetrics.shuffleWriteMetrics.get.recordsWritten == 0) } test("OutputMetrics backward compatibility") { @@ -568,8 +568,8 @@ class JsonProtocolSuite extends SparkFunSuite { } private def assertEquals(metrics1: ShuffleWriteMetrics, metrics2: ShuffleWriteMetrics) { - assert(metrics1.shuffleBytesWritten === metrics2.shuffleBytesWritten) - assert(metrics1.shuffleWriteTime === metrics2.shuffleWriteTime) + assert(metrics1.bytesWritten === metrics2.bytesWritten) + assert(metrics1.writeTime === metrics2.writeTime) } private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) { @@ -794,9 +794,9 @@ class JsonProtocolSuite extends SparkFunSuite { t.outputMetrics = Some(outputMetrics) } else { val sw = new ShuffleWriteMetrics - 
sw.incShuffleBytesWritten(a + b + c) - sw.incShuffleWriteTime(b + c + d) - sw.setShuffleRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1) + sw.incBytesWritten(a + b + c) + sw.incWriteTime(b + c + d) + sw.setRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1) t.shuffleWriteMetrics = Some(sw) } // Make at most 6 blocks From 74ba84b64cab0bf3828033037267955aca296d3a Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 18 Jan 2016 19:40:10 -0800 Subject: [PATCH 8/8] [HOT][BUILD] Changed the import order This PR fixes the build break in master. The following tests failed due to import order issues in master. https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49651/consoleFull https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49652/consoleFull https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49653/consoleFull Author: gatorsmile Closes #10823 from gatorsmile/importOrder. --- .../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 2 +- .../org/apache/spark/sql/execution/joins/InnerJoinSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index df0f730499211..c4ddb6d76b2c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution -import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight} import org.apache.spark.sql.{execution, Strategy} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -29,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand} import org.apache.spark.sql.execution.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _} +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight} private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SparkPlanner => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala index ab81b702596af..149f34dbd748f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.joins +import org.apache.spark.sql.{DataFrame, Row, SQLConf} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys import org.apache.spark.sql.catalyst.plans.Inner @@ -24,7 +25,6 @@ import org.apache.spark.sql.catalyst.plans.logical.Join import org.apache.spark.sql.execution._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StringType, StructType} -import org.apache.spark.sql.{DataFrame, Row, SQLConf} class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { import testImplicits.localSeqToDataFrameHolder
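For context on why this counted as a build break: Spark's Scala style guide orders imports in groups — java/javax, then scala, then third-party libraries, then org.apache.spark — alphabetized within each group, and the style check run as part of the build fails on violations, which is how the three Jenkins builds above broke. Within the last group the bare org.apache.spark.sql package sorts ahead of org.apache.spark.sql.catalyst and friends, and joins sorts after datasources, which is exactly what the two moves above restore. A schematic layout under those rules; the file and its imports are illustrative, not from the patch:

// Schematic only; this file is not part of the patch.
package org.apache.spark.example

import java.util.Locale

import scala.collection.mutable

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.Expression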