From fa18d262513d763ac0616971d9288a94a54bc88f Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 27 Jul 2015 12:22:47 -0700 Subject: [PATCH 01/12] Add basic unit test for script transform with 'cat' command. --- .../spark/sql/execution/SparkPlanTest.scala | 27 ++++++---- .../execution/ScriptTransformationSuite.scala | 54 +++++++++++++++++++ 2 files changed, 70 insertions(+), 11 deletions(-) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala index 6a8f394545816..f46855edfe0de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala @@ -21,7 +21,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.test.TestSQLContext -import org.apache.spark.sql.{DataFrame, DataFrameHolder, Row} +import org.apache.spark.sql.{SQLContext, DataFrame, DataFrameHolder, Row} import scala.language.implicitConversions import scala.reflect.runtime.universe.TypeTag @@ -33,11 +33,13 @@ import scala.util.control.NonFatal */ class SparkPlanTest extends SparkFunSuite { + protected def sqlContext: SQLContext = TestSQLContext + /** * Creates a DataFrame from a local Seq of Product. */ implicit def localSeqToDataFrameHolder[A <: Product : TypeTag](data: Seq[A]): DataFrameHolder = { - TestSQLContext.implicits.localSeqToDataFrameHolder(data) + sqlContext.implicits.localSeqToDataFrameHolder(data) } /** @@ -98,7 +100,7 @@ class SparkPlanTest extends SparkFunSuite { planFunction: Seq[SparkPlan] => SparkPlan, expectedAnswer: Seq[Row], sortAnswers: Boolean = true): Unit = { - SparkPlanTest.checkAnswer(input, planFunction, expectedAnswer, sortAnswers) match { + SparkPlanTest.checkAnswer(input, planFunction, expectedAnswer, sortAnswers, sqlContext) match { case Some(errorMessage) => fail(errorMessage) case None => } @@ -121,7 +123,8 @@ class SparkPlanTest extends SparkFunSuite { planFunction: SparkPlan => SparkPlan, expectedPlanFunction: SparkPlan => SparkPlan, sortAnswers: Boolean = true): Unit = { - SparkPlanTest.checkAnswer(input, planFunction, expectedPlanFunction, sortAnswers) match { + SparkPlanTest.checkAnswer( + input, planFunction, expectedPlanFunction, sortAnswers, sqlContext) match { case Some(errorMessage) => fail(errorMessage) case None => } @@ -147,13 +150,14 @@ object SparkPlanTest { input: DataFrame, planFunction: SparkPlan => SparkPlan, expectedPlanFunction: SparkPlan => SparkPlan, - sortAnswers: Boolean): Option[String] = { + sortAnswers: Boolean, + sqlContext: SQLContext): Option[String] = { val outputPlan = planFunction(input.queryExecution.sparkPlan) val expectedOutputPlan = expectedPlanFunction(input.queryExecution.sparkPlan) val expectedAnswer: Seq[Row] = try { - executePlan(expectedOutputPlan) + executePlan(expectedOutputPlan, sqlContext) } catch { case NonFatal(e) => val errorMessage = @@ -168,7 +172,7 @@ object SparkPlanTest { } val actualAnswer: Seq[Row] = try { - executePlan(outputPlan) + executePlan(outputPlan, sqlContext) } catch { case NonFatal(e) => val errorMessage = @@ -207,12 +211,13 @@ object SparkPlanTest { input: Seq[DataFrame], planFunction: Seq[SparkPlan] => SparkPlan, expectedAnswer: Seq[Row], - sortAnswers: Boolean): Option[String] = 
{ + sortAnswers: Boolean, + sqlContext: SQLContext): Option[String] = { val outputPlan = planFunction(input.map(_.queryExecution.sparkPlan)) val sparkAnswer: Seq[Row] = try { - executePlan(outputPlan) + executePlan(outputPlan, sqlContext) } catch { case NonFatal(e) => val errorMessage = @@ -275,10 +280,10 @@ object SparkPlanTest { } } - private def executePlan(outputPlan: SparkPlan): Seq[Row] = { + private def executePlan(outputPlan: SparkPlan, sqlContext: SQLContext): Seq[Row] = { // A very simple resolver to make writing tests easier. In contrast to the real resolver // this is always case sensitive and does not try to handle scoping or complex type resolution. - val resolvedPlan = TestSQLContext.prepareForExecution.execute( + val resolvedPlan = sqlContext.prepareForExecution.execute( outputPlan transform { case plan: SparkPlan => val inputMap = plan.children.flatMap(_.output).map(a => (a.name, a)).toMap diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala new file mode 100644 index 0000000000000..363b9b20001a2 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest} +import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.types.StringType + +class ScriptTransformationSuite extends SparkPlanTest { + + override def sqlContext: SQLContext = TestHive + + private val ioschema = HiveScriptIOSchema( + inputRowFormat = Seq.empty, + outputRowFormat = Seq.empty, + inputSerdeClass = "", + outputSerdeClass = "", + inputSerdeProps = Seq.empty, + outputSerdeProps = Seq.empty, + schemaLess = false + ) + + test("basic test with 'cat' command") { + val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") + checkAnswer( + rowsDf, + (child: SparkPlan) => new ScriptTransformation( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = child, + ioschema = ioschema + )(TestHive), + rowsDf.collect()) + } + +} From b43e4ec31b3bd4030c9e9985a64b2f186df17825 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 27 Jul 2015 14:36:55 -0700 Subject: [PATCH 02/12] Clean up nullability in ScriptTransformation --- .../org/apache/spark/sql/hive/HiveQl.scala | 10 +- .../hive/execution/ScriptTransformation.scala | 100 ++++++++---------- .../execution/ScriptTransformationSuite.scala | 4 +- 3 files changed, 54 insertions(+), 60 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 620b8a44d8a9b..69c9cd3543a77 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -874,15 +874,15 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C } def matchSerDe(clause: Seq[ASTNode]) - : (Seq[(String, String)], String, Seq[(String, String)]) = clause match { + : (Seq[(String, String)], Option[String], Seq[(String, String)]) = clause match { case Token("TOK_SERDEPROPS", propsClause) :: Nil => val rowFormat = propsClause.map { case Token(name, Token(value, Nil) :: Nil) => (name, value) } - (rowFormat, "", Nil) + (rowFormat, None, Nil) case Token("TOK_SERDENAME", Token(serdeClass, Nil) :: Nil) :: Nil => - (Nil, serdeClass, Nil) + (Nil, Some(serdeClass), Nil) case Token("TOK_SERDENAME", Token(serdeClass, Nil) :: Token("TOK_TABLEPROPERTIES", @@ -891,9 +891,9 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C case Token("TOK_TABLEPROPERTY", Token(name, Nil) :: Token(value, Nil) :: Nil) => (name, value) } - (Nil, serdeClass, serdeProps) + (Nil, Some(serdeClass), serdeProps) - case Nil => (Nil, "", Nil) + case Nil => (Nil, None, Nil) } val (inRowFormat, inSerdeClass, inSerdeProps) = matchSerDe(inputSerdeClause) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 205e622195f09..06c363ff090a3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution import java.io.{BufferedReader, DataInputStream, DataOutputStream, EOFException, InputStreamReader} import java.util.Properties +import javax.annotation.Nullable 
import scala.collection.JavaConversions._ @@ -68,7 +69,11 @@ case class ScriptTransformation( val errorStream = proc.getErrorStream val reader = new BufferedReader(new InputStreamReader(inputStream)) - val (outputSerde, outputSoi) = ioschema.initOutputSerDe(output) + // This nullability is a performance optimization in order to avoid an Option.foreach() call + // inside of a loop + @Nullable val (outputSerde, outputSoi) = { + ioschema.initOutputSerDe(output).getOrElse((null, null)) + } val iterator: Iterator[InternalRow] = new Iterator[InternalRow] with HiveInspectors { var cacheRow: InternalRow = null @@ -146,7 +151,9 @@ case class ScriptTransformation( } } - val (inputSerde, inputSoi) = ioschema.initInputSerDe(input) + // This nullability is a performance optimization in order to avoid an Option.foreach() call + // inside of a loop + @Nullable val (inputSerde, inputSoi) = ioschema.initInputSerDe(input).getOrElse((null, null)) val dataOutputStream = new DataOutputStream(outputStream) val outputProjection = new InterpretedProjection(input, child.output) @@ -200,33 +207,43 @@ private[hive] case class HiveScriptIOSchema ( inputRowFormat: Seq[(String, String)], outputRowFormat: Seq[(String, String)], - inputSerdeClass: String, - outputSerdeClass: String, + inputSerdeClass: Option[String], + outputSerdeClass: Option[String], inputSerdeProps: Seq[(String, String)], outputSerdeProps: Seq[(String, String)], schemaLess: Boolean) extends ScriptInputOutputSchema with HiveInspectors { - val defaultFormat = Map(("TOK_TABLEROWFORMATFIELD", "\t"), - ("TOK_TABLEROWFORMATLINES", "\n")) + private val defaultFormat = Map( + ("TOK_TABLEROWFORMATFIELD", "\t"), + ("TOK_TABLEROWFORMATLINES", "\n") + ) val inputRowFormatMap = inputRowFormat.toMap.withDefault((k) => defaultFormat(k)) val outputRowFormatMap = outputRowFormat.toMap.withDefault((k) => defaultFormat(k)) - def initInputSerDe(input: Seq[Expression]): (AbstractSerDe, ObjectInspector) = { - val (columns, columnTypes) = parseAttrs(input) - val serde = initSerDe(inputSerdeClass, columns, columnTypes, inputSerdeProps) - (serde, initInputSoi(serde, columns, columnTypes)) + def initInputSerDe(input: Seq[Expression]): Option[(AbstractSerDe, ObjectInspector)] = { + inputSerdeClass.map { serdeClass => + val (columns, columnTypes) = parseAttrs(input) + val serde = initSerDe(serdeClass, columns, columnTypes, inputSerdeProps) + val fieldObjectInspectors = columnTypes.map(toInspector) + val objectInspector = ObjectInspectorFactory + .getStandardStructObjectInspector(columns, fieldObjectInspectors) + .asInstanceOf[ObjectInspector] + (serde, objectInspector) + } } - def initOutputSerDe(output: Seq[Attribute]): (AbstractSerDe, StructObjectInspector) = { - val (columns, columnTypes) = parseAttrs(output) - val serde = initSerDe(outputSerdeClass, columns, columnTypes, outputSerdeProps) - (serde, initOutputputSoi(serde)) + def initOutputSerDe(output: Seq[Attribute]): Option[(AbstractSerDe, StructObjectInspector)] = { + outputSerdeClass.map { serdeClass => + val (columns, columnTypes) = parseAttrs(output) + val serde = initSerDe(serdeClass, columns, columnTypes, outputSerdeProps) + val structObjectInspector = serde.getObjectInspector().asInstanceOf[StructObjectInspector] + (serde, structObjectInspector) + } } - def parseAttrs(attrs: Seq[Expression]): (Seq[String], Seq[DataType]) = { - + private def parseAttrs(attrs: Seq[Expression]): (Seq[String], Seq[DataType]) = { val columns = attrs.map { case aref: AttributeReference => aref.name case e: NamedExpression => e.name @@ 
-242,52 +259,29 @@ case class HiveScriptIOSchema ( (columns, columnTypes) } - def initSerDe(serdeClassName: String, columns: Seq[String], - columnTypes: Seq[DataType], serdeProps: Seq[(String, String)]): AbstractSerDe = { + private def initSerDe( + serdeClassName: String, + columns: Seq[String], + columnTypes: Seq[DataType], + serdeProps: Seq[(String, String)]): AbstractSerDe = { - val serde: AbstractSerDe = if (serdeClassName != "") { + val serde: AbstractSerDe = { val trimed_class = serdeClassName.split("'")(1) Utils.classForName(trimed_class) .newInstance.asInstanceOf[AbstractSerDe] - } else { - null } - if (serde != null) { - val columnTypesNames = columnTypes.map(_.toTypeInfo.getTypeName()).mkString(",") + val columnTypesNames = columnTypes.map(_.toTypeInfo.getTypeName()).mkString(",") - var propsMap = serdeProps.map(kv => { - (kv._1.split("'")(1), kv._2.split("'")(1)) - }).toMap + (serdeConstants.LIST_COLUMNS -> columns.mkString(",")) - propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES -> columnTypesNames) + var propsMap = serdeProps.map(kv => { + (kv._1.split("'")(1), kv._2.split("'")(1)) + }).toMap + (serdeConstants.LIST_COLUMNS -> columns.mkString(",")) + propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES -> columnTypesNames) - val properties = new Properties() - properties.putAll(propsMap) - serde.initialize(null, properties) - } + val properties = new Properties() + properties.putAll(propsMap) + serde.initialize(null, properties) serde } - - def initInputSoi(inputSerde: AbstractSerDe, columns: Seq[String], columnTypes: Seq[DataType]) - : ObjectInspector = { - - if (inputSerde != null) { - val fieldObjectInspectors = columnTypes.map(toInspector(_)) - ObjectInspectorFactory - .getStandardStructObjectInspector(columns, fieldObjectInspectors) - .asInstanceOf[ObjectInspector] - } else { - null - } - } - - def initOutputputSoi(outputSerde: AbstractSerDe): StructObjectInspector = { - if (outputSerde != null) { - outputSerde.getObjectInspector().asInstanceOf[StructObjectInspector] - } else { - null - } - } } - diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index 363b9b20001a2..aca7dbcff10c1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -30,8 +30,8 @@ class ScriptTransformationSuite extends SparkPlanTest { private val ioschema = HiveScriptIOSchema( inputRowFormat = Seq.empty, outputRowFormat = Seq.empty, - inputSerdeClass = "", - outputSerdeClass = "", + inputSerdeClass = None, + outputSerdeClass = None, inputSerdeProps = Seq.empty, outputSerdeProps = Seq.empty, schemaLess = false From bd4c94887a3dbe63a4794c1ac8ea4457a0925887 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 27 Jul 2015 16:21:44 -0700 Subject: [PATCH 03/12] Skip launching of external command for empty partitions. 
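
Without this change, doExecute() forked a "/bin/bash -c <script>" child
process (plus its writer and stderr-consumer threads) for every partition,
including empty ones, where it is pure overhead. The guard defers all of
that setup until the partition is known to be non-empty. A minimal sketch
of the pattern, using a hypothetical helper name that is not part of this
patch:

    // Defer expensive per-partition setup until we know the partition is
    // non-empty; empty partitions then cost nothing.
    def withLazySetup[T, U](iter: Iterator[T])
                           (setup: () => Unit)
                           (transform: Iterator[T] => Iterator[U]): Iterator[U] = {
      if (iter.hasNext) {
        setup() // e.g. new ProcessBuilder("/bin/bash", "-c", script).start()
        transform(iter)
      } else {
        Iterator.empty
      }
    }
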
--- .../hive/execution/ScriptTransformation.scala | 10 ++++++- .../execution/ScriptTransformationSuite.scala | 26 ++++++++++++++++--- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 06c363ff090a3..7343fe69e65aa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -57,7 +57,7 @@ case class ScriptTransformation( override def otherCopyArgs: Seq[HiveContext] = sc :: Nil protected override def doExecute(): RDD[InternalRow] = { - child.execute().mapPartitions { iter => + def processIterator(iter: Iterator[InternalRow]): Iterator[InternalRow] = { val cmd = List("/bin/bash", "-c", script) val builder = new ProcessBuilder(cmd) // We need to start threads connected to the process pipeline: @@ -197,6 +197,14 @@ case class ScriptTransformation( iterator } + + child.execute().mapPartitions { iter => + if (iter.hasNext) { + processIterator(iter) + } else { + Iterator.empty + } + } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index aca7dbcff10c1..6b7fff1c0994f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive.execution +import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe + import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest} @@ -27,7 +29,7 @@ class ScriptTransformationSuite extends SparkPlanTest { override def sqlContext: SQLContext = TestHive - private val ioschema = HiveScriptIOSchema( + private val noSerdeIOSchema = HiveScriptIOSchema( inputRowFormat = Seq.empty, outputRowFormat = Seq.empty, inputSerdeClass = None, @@ -37,7 +39,12 @@ class ScriptTransformationSuite extends SparkPlanTest { schemaLess = false ) - test("basic test with 'cat' command") { + val serdeIOSchema = noSerdeIOSchema.copy( + inputSerdeClass = Some(s"'${classOf[LazySimpleSerDe].getCanonicalName}"), + outputSerdeClass = Some(s"'${classOf[LazySimpleSerDe].getCanonicalName}") + ) + + test("cat without SerDe") { val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") checkAnswer( rowsDf, @@ -46,9 +53,22 @@ class ScriptTransformationSuite extends SparkPlanTest { script = "cat", output = Seq(AttributeReference("a", StringType)()), child = child, - ioschema = ioschema + ioschema = noSerdeIOSchema )(TestHive), rowsDf.collect()) } + test("cat with LazySimpleSerDe") { + val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") + checkAnswer( + rowsDf, + (child: SparkPlan) => new ScriptTransformation( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = child, + ioschema = serdeIOSchema + )(TestHive), + rowsDf.collect()) + } } From 4ee36a209a22ed35f51704fc2002c52646d0b70e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 27 Jul 2015 16:48:33 -0700 Subject: [PATCH 04/12] Kill script transform subprocess when error occurs in input writer. 
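
If an exception is thrown while feeding rows to the subprocess's stdin, we
now destroy() the child so the read side does not hang waiting for output
that will never arrive, and we take care that the cleanup in the finally
block (waitFor() plus stderr logging) cannot mask the original failure. A
condensed sketch of the masking-avoidance pattern, with assumed simplified
signatures rather than the real ones:

    import scala.util.control.NonFatal

    // Run `body`, then `cleanup`, ensuring that a failure during cleanup
    // never hides the body's own exception.
    def runThenCleanup[T](body: => T)(cleanup: => Unit): T = {
      var threwException = true
      try {
        val result = body
        threwException = false
        result
      } finally {
        try {
          cleanup // here: proc.waitFor() and logging the stderr buffer
        } catch {
          case NonFatal(fromFinally) =>
            // Rethrow only if the body succeeded; if the body already threw,
            // swallow (after logging) so the primary exception propagates.
            if (!threwException) throw fromFinally
        }
      }
    }
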
--- .../hive/execution/ScriptTransformation.scala | 33 +++++++++++++++---- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 7343fe69e65aa..10d1174c2b69b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -22,6 +22,7 @@ import java.util.Properties import javax.annotation.Nullable import scala.collection.JavaConversions._ +import scala.util.control.NonFatal import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.AbstractSerDe @@ -171,10 +172,11 @@ case class ScriptTransformation( // the pipeline / buffer capacity. new Thread(new Runnable() { override def run(): Unit = { - Utils.tryWithSafeFinally { - iter - .map(outputProjection) - .foreach { row => + // We can't use Utils.tryWithSafeFinally here because we also need a `catch` block, so + // let's use a variable to record whether the `finally` block was hit due to an exception + var threwException: Boolean = true + try { + iter.map(outputProjection).foreach { row => if (inputSerde == null) { val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"), ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8") @@ -187,9 +189,25 @@ case class ScriptTransformation( } } outputStream.close() - } { - if (proc.waitFor() != 0) { - logError(stderrBuffer.toString) // log the stderr circular buffer + threwException = false + } catch { + case NonFatal(e) => + // An error occurred while writing input, so kill the child process. According to the + // Javadoc this call will not throw an exception: + proc.destroy() + throw e + } finally { + try { + if (proc.waitFor() != 0) { + logError(stderrBuffer.toString) // log the stderr circular buffer + } + } catch { + case NonFatal(exceptionFromFinallyBlock) => + if (!threwException) { + throw exceptionFromFinallyBlock + } else { + log.error("Exception in finally block", exceptionFromFinallyBlock) + } } } } @@ -202,6 +220,7 @@ case class ScriptTransformation( if (iter.hasNext) { processIterator(iter) } else { + // If the input iterator has no rows then do not launch the external script. 
Iterator.empty } } From 8b162b6776f5cda19a0e80147e27a4fa53d1780f Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 27 Jul 2015 18:32:38 -0700 Subject: [PATCH 05/12] Add failing test which demonstrates exception masking issue --- .../execution/ScriptTransformationSuite.scala | 33 +++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index 6b7fff1c0994f..2cd5f09135c88 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -19,9 +19,11 @@ package org.apache.spark.sql.hive.execution import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe +import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.execution.{UnaryNode, SparkPlan, SparkPlanTest} import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.types.StringType @@ -71,4 +73,31 @@ class ScriptTransformationSuite extends SparkPlanTest { )(TestHive), rowsDf.collect()) } + + test("script transformation should not swallow errors from upstream pipelined operators") { + val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") + val e = intercept[Exception] { + checkAnswer( + rowsDf, + (child: SparkPlan) => new ScriptTransformation( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = serdeIOSchema + )(TestHive), + rowsDf.collect()) + } + assert(e.getMessage === "intentional exception") + } +} + +private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryNode { + override protected def doExecute(): RDD[InternalRow] = { + child.execute().map { x => + Thread.sleep(1000) // This sleep gives the external process time to start. + throw new Exception("intentional exception") + } + } + override def output: Seq[Attribute] = child.output } From 88278de8d7539c63c67db79da798a850fc15c4c3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 27 Jul 2015 18:47:06 -0700 Subject: [PATCH 06/12] Split ScriptTransformation writer thread into own class. 
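
Extracting the anonymous `new Thread(new Runnable { ... })` into a named
class with explicit constructor parameters makes the writer independently
constructible and gives it a natural place to hang state (a later patch in
this series attaches the write-side exception to it so the read side can
rethrow it). A trimmed sketch of the shape, using simplified types (String
rows instead of InternalRow, with most constructor parameters omitted):

    import java.io.OutputStream

    class WriterThreadSketch(
        iter: Iterator[String],
        outputStream: OutputStream)
      extends Thread("Thread-ScriptTransformation-Feed") {

      // A stuck writer must not keep the executor JVM alive.
      setDaemon(true)

      override def run(): Unit = {
        // Drain the rows into the child's stdin, then close it to signal EOF.
        iter.foreach(line => outputStream.write((line + "\n").getBytes("utf-8")))
        outputStream.close()
      }
    }
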
--- .../hive/execution/ScriptTransformation.scala | 120 +++++++++++------- .../execution/ScriptTransformationSuite.scala | 6 +- 2 files changed, 76 insertions(+), 50 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 10d1174c2b69b..1cfb3030d38aa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.execution -import java.io.{BufferedReader, DataInputStream, DataOutputStream, EOFException, InputStreamReader} +import java.io._ import java.util.Properties import javax.annotation.Nullable @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.AbstractSerDe import org.apache.hadoop.hive.serde2.objectinspector._ +import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.CatalystTypeConverters @@ -152,10 +153,6 @@ case class ScriptTransformation( } } - // This nullability is a performance optimization in order to avoid an Option.foreach() call - // inside of a loop - @Nullable val (inputSerde, inputSoi) = ioschema.initInputSerDe(input).getOrElse((null, null)) - val dataOutputStream = new DataOutputStream(outputStream) val outputProjection = new InterpretedProjection(input, child.output) // TODO make the 2048 configurable? @@ -166,52 +163,25 @@ case class ScriptTransformation( stderrBuffer, // output to a circular buffer "Thread-ScriptTransformation-STDERR-Consumer").start() + // This nullability is a performance optimization in order to avoid an Option.foreach() call + // inside of a loop + @Nullable val (inputSerde, inputSoi) = ioschema.initInputSerDe(input).getOrElse((null, null)) + // Put the write(output to the pipeline) into a single thread // and keep the collector as remain in the main thread. // otherwise it will causes deadlock if the data size greater than // the pipeline / buffer capacity. - new Thread(new Runnable() { - override def run(): Unit = { - // We can't use Utils.tryWithSafeFinally here because we also need a `catch` block, so - // let's use a variable to record whether the `finally` block was hit due to an exception - var threwException: Boolean = true - try { - iter.map(outputProjection).foreach { row => - if (inputSerde == null) { - val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"), - ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8") - - outputStream.write(data) - } else { - val writable = inputSerde.serialize( - row.asInstanceOf[GenericInternalRow].values, inputSoi) - prepareWritable(writable).write(dataOutputStream) - } - } - outputStream.close() - threwException = false - } catch { - case NonFatal(e) => - // An error occurred while writing input, so kill the child process. 
According to the - // Javadoc this call will not throw an exception: - proc.destroy() - throw e - } finally { - try { - if (proc.waitFor() != 0) { - logError(stderrBuffer.toString) // log the stderr circular buffer - } - } catch { - case NonFatal(exceptionFromFinallyBlock) => - if (!threwException) { - throw exceptionFromFinallyBlock - } else { - log.error("Exception in finally block", exceptionFromFinallyBlock) - } - } - } - } - }, "Thread-ScriptTransformation-Feed").start() + val writerThread = new ScriptTransformationWriterThread( + iter, + outputProjection, + inputSerde, + inputSoi, + ioschema, + outputStream, + proc, + stderrBuffer + ) + writerThread.start() iterator } @@ -227,6 +197,62 @@ case class ScriptTransformation( } } +private class ScriptTransformationWriterThread( + iter: Iterator[InternalRow], + outputProjection: Projection, + @Nullable inputSerde: AbstractSerDe, + @Nullable inputSoi: ObjectInspector, + ioschema: HiveScriptIOSchema, + outputStream: OutputStream, + proc: Process, + stderrBuffer: CircularBuffer + ) extends Thread("Thread-ScriptTransformation-Feed") with Logging { + + setDaemon(true) + + override def run(): Unit = Utils.logUncaughtExceptions { + val dataOutputStream = new DataOutputStream(outputStream) + + // We can't use Utils.tryWithSafeFinally here because we also need a `catch` block, so + // let's use a variable to record whether the `finally` block was hit due to an exception + var threwException: Boolean = true + try { + iter.map(outputProjection).foreach { row => + if (inputSerde == null) { + val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"), + ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8") + outputStream.write(data) + } else { + val writable = inputSerde.serialize( + row.asInstanceOf[GenericInternalRow].values, inputSoi) + prepareWritable(writable).write(dataOutputStream) + } + } + outputStream.close() + threwException = false + } catch { + case NonFatal(e) => + // An error occurred while writing input, so kill the child process. 
According to the + // Javadoc this call will not throw an exception: + proc.destroy() + throw e + } finally { + try { + if (proc.waitFor() != 0) { + logError(stderrBuffer.toString) // log the stderr circular buffer + } + } catch { + case NonFatal(exceptionFromFinallyBlock) => + if (!threwException) { + throw exceptionFromFinallyBlock + } else { + log.error("Exception in finally block", exceptionFromFinallyBlock) + } + } + } + } +} + /** * The wrapper class of Hive input and output schema properties */ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index 2cd5f09135c88..89409418f0f32 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -41,7 +41,7 @@ class ScriptTransformationSuite extends SparkPlanTest { schemaLess = false ) - val serdeIOSchema = noSerdeIOSchema.copy( + private val serdeIOSchema = noSerdeIOSchema.copy( inputSerdeClass = Some(s"'${classOf[LazySimpleSerDe].getCanonicalName}"), outputSerdeClass = Some(s"'${classOf[LazySimpleSerDe].getCanonicalName}") ) @@ -76,7 +76,7 @@ class ScriptTransformationSuite extends SparkPlanTest { test("script transformation should not swallow errors from upstream pipelined operators") { val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") - val e = intercept[Exception] { + val e = intercept[IllegalArgumentException] { checkAnswer( rowsDf, (child: SparkPlan) => new ScriptTransformation( @@ -96,7 +96,7 @@ private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryNod override protected def doExecute(): RDD[InternalRow] = { child.execute().map { x => Thread.sleep(1000) // This sleep gives the external process time to start. - throw new Exception("intentional exception") + throw new IllegalArgumentException("intentional exception") } } override def output: Seq[Attribute] = child.output From b31258dec638dcce37ff1542c9b31b20aedeaad7 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 27 Jul 2015 18:48:04 -0700 Subject: [PATCH 07/12] Rename iterator variables to disambiguate. 
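
processIterator() deals with two easily-confused iterators: the incoming
rows that the writer thread drains into the subprocess's stdin, and the
rows parsed back out of the subprocess's stdout. The new names make the
direction explicit. A hypothetical standalone demo of the same
two-iterator shape (none of these names appear in the patch itself):

    import java.io.{BufferedReader, InputStreamReader, PrintWriter}

    object PipeThroughCat {
      def main(args: Array[String]): Unit = {
        val inputIterator = Iterator("a", "b", "c")
        val proc = new ProcessBuilder("cat").start()

        // Feeder thread: drains inputIterator into the child's stdin.
        new Thread(new Runnable {
          override def run(): Unit = {
            val writer = new PrintWriter(proc.getOutputStream)
            inputIterator.foreach(line => writer.println(line))
            writer.close() // closing stdin lets `cat` finish and exit
          }
        }, "feeder").start()

        // Main thread: exposes the child's stdout as outputIterator.
        val reader = new BufferedReader(new InputStreamReader(proc.getInputStream))
        val outputIterator = Iterator.continually(reader.readLine()).takeWhile(_ != null)
        outputIterator.foreach(println) // prints a, b, c
      }
    }
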
--- .../spark/sql/hive/execution/ScriptTransformation.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 1cfb3030d38aa..64119a5672ee4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -59,7 +59,7 @@ case class ScriptTransformation( override def otherCopyArgs: Seq[HiveContext] = sc :: Nil protected override def doExecute(): RDD[InternalRow] = { - def processIterator(iter: Iterator[InternalRow]): Iterator[InternalRow] = { + def processIterator(inputIterator: Iterator[InternalRow]): Iterator[InternalRow] = { val cmd = List("/bin/bash", "-c", script) val builder = new ProcessBuilder(cmd) // We need to start threads connected to the process pipeline: @@ -77,7 +77,7 @@ case class ScriptTransformation( ioschema.initOutputSerDe(output).getOrElse((null, null)) } - val iterator: Iterator[InternalRow] = new Iterator[InternalRow] with HiveInspectors { + val outputIterator: Iterator[InternalRow] = new Iterator[InternalRow] with HiveInspectors { var cacheRow: InternalRow = null var curLine: String = null var eof: Boolean = false @@ -172,7 +172,7 @@ case class ScriptTransformation( // otherwise it will causes deadlock if the data size greater than // the pipeline / buffer capacity. val writerThread = new ScriptTransformationWriterThread( - iter, + inputIterator, outputProjection, inputSerde, inputSoi, @@ -183,7 +183,7 @@ case class ScriptTransformation( ) writerThread.start() - iterator + outputIterator } child.execute().mapPartitions { iter => From 323bb2b0e860ba64032a51632f6ba0ca0b9e45d9 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 27 Jul 2015 19:02:59 -0700 Subject: [PATCH 08/12] Fix error-swallowing bug --- .../hive/execution/ScriptTransformation.scala | 86 ++++++++++++------- .../execution/ScriptTransformationSuite.scala | 24 +++++- 2 files changed, 75 insertions(+), 35 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 64119a5672ee4..a89d604fdb908 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -71,6 +71,36 @@ case class ScriptTransformation( val errorStream = proc.getErrorStream val reader = new BufferedReader(new InputStreamReader(inputStream)) + // TODO make the 2048 configurable? + val stderrBuffer = new CircularBuffer(2048) + + // Consume the error stream from the pipeline, otherwise it will be blocked if + // the pipeline is full. + new RedirectThread(errorStream, // input stream from the pipeline + stderrBuffer, // output to a circular buffer + "Thread-ScriptTransformation-STDERR-Consumer").start() + + val outputProjection = new InterpretedProjection(input, child.output) + + // This nullability is a performance optimization in order to avoid an Option.foreach() call + // inside of a loop + @Nullable val (inputSerde, inputSoi) = ioschema.initInputSerDe(input).getOrElse((null, null)) + + // Put the write(output to the pipeline) into a single thread + // and keep the collector as remain in the main thread. 
+ // otherwise it will causes deadlock if the data size greater than + // the pipeline / buffer capacity. + val writerThread = new ScriptTransformationWriterThread( + inputIterator, + outputProjection, + inputSerde, + inputSoi, + ioschema, + outputStream, + proc, + stderrBuffer + ) + // This nullability is a performance optimization in order to avoid an Option.foreach() call // inside of a loop @Nullable val (outputSerde, outputSoi) = { @@ -86,12 +116,26 @@ case class ScriptTransformation( if (outputSerde == null) { if (curLine == null) { curLine = reader.readLine() - curLine != null + if (curLine == null) { + if (writerThread.exception.isDefined) { + throw writerThread.exception.get + } + false + } else { + true + } } else { true } } else { - !eof + if (eof) { + if (writerThread.exception.isDefined) { + throw writerThread.exception.get + } + false + } else { + true + } } } @@ -117,11 +161,11 @@ case class ScriptTransformation( } i += 1 }) - return mutableRow + mutableRow } catch { case e: EOFException => eof = true - return null + null } } @@ -153,34 +197,6 @@ case class ScriptTransformation( } } - val outputProjection = new InterpretedProjection(input, child.output) - - // TODO make the 2048 configurable? - val stderrBuffer = new CircularBuffer(2048) - // Consume the error stream from the pipeline, otherwise it will be blocked if - // the pipeline is full. - new RedirectThread(errorStream, // input stream from the pipeline - stderrBuffer, // output to a circular buffer - "Thread-ScriptTransformation-STDERR-Consumer").start() - - // This nullability is a performance optimization in order to avoid an Option.foreach() call - // inside of a loop - @Nullable val (inputSerde, inputSoi) = ioschema.initInputSerDe(input).getOrElse((null, null)) - - // Put the write(output to the pipeline) into a single thread - // and keep the collector as remain in the main thread. - // otherwise it will causes deadlock if the data size greater than - // the pipeline / buffer capacity. - val writerThread = new ScriptTransformationWriterThread( - inputIterator, - outputProjection, - inputSerde, - inputSoi, - ioschema, - outputStream, - proc, - stderrBuffer - ) writerThread.start() outputIterator @@ -210,6 +226,11 @@ private class ScriptTransformationWriterThread( setDaemon(true) + @volatile private var _exception: Throwable = null + + /** Contains the exception thrown while writing the parent iterator to the external process. */ + def exception: Option[Throwable] = Option(_exception) + override def run(): Unit = Utils.logUncaughtExceptions { val dataOutputStream = new DataOutputStream(outputStream) @@ -234,6 +255,7 @@ private class ScriptTransformationWriterThread( case NonFatal(e) => // An error occurred while writing input, so kill the child process. 
According to the // Javadoc this call will not throw an exception: + _exception = e proc.destroy() throw e } finally { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index 89409418f0f32..b47b9ae4b5286 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.execution import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe +import org.scalatest.exceptions.TestFailedException import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext @@ -74,9 +75,26 @@ class ScriptTransformationSuite extends SparkPlanTest { rowsDf.collect()) } - test("script transformation should not swallow errors from upstream pipelined operators") { + test("script transformation should not swallow errors from upstream operators (no serde)") { val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") - val e = intercept[IllegalArgumentException] { + val e = intercept[TestFailedException] { + checkAnswer( + rowsDf, + (child: SparkPlan) => new ScriptTransformation( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = noSerdeIOSchema + )(TestHive), + rowsDf.collect()) + } + assert(e.getMessage().contains("intentional exception")) + } + + test("script transformation should not swallow errors from upstream operators (with serde)") { + val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") + val e = intercept[TestFailedException] { checkAnswer( rowsDf, (child: SparkPlan) => new ScriptTransformation( @@ -88,7 +106,7 @@ class ScriptTransformationSuite extends SparkPlanTest { )(TestHive), rowsDf.collect()) } - assert(e.getMessage === "intentional exception") + assert(e.getMessage().contains("intentional exception")) } } From 494cde023ffb87a9308709dd536d9957105969d0 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 27 Jul 2015 19:10:14 -0700 Subject: [PATCH 09/12] Propagate TaskContext to writer thread --- .../sql/hive/execution/ScriptTransformation.scala | 10 +++++++--- .../sql/hive/execution/ScriptTransformationSuite.scala | 2 ++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index a89d604fdb908..7b82479064935 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.AbstractSerDe import org.apache.hadoop.hive.serde2.objectinspector._ -import org.apache.spark.Logging +import org.apache.spark.{TaskContext, Logging} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.CatalystTypeConverters @@ -98,7 +98,8 @@ case class ScriptTransformation( ioschema, outputStream, proc, - stderrBuffer + stderrBuffer, + TaskContext.get() ) // This nullability is a performance optimization in order to avoid an Option.foreach() call @@ -221,7 +222,8 @@ private class 
ScriptTransformationWriterThread( ioschema: HiveScriptIOSchema, outputStream: OutputStream, proc: Process, - stderrBuffer: CircularBuffer + stderrBuffer: CircularBuffer, + taskContext: TaskContext ) extends Thread("Thread-ScriptTransformation-Feed") with Logging { setDaemon(true) @@ -232,6 +234,8 @@ private class ScriptTransformationWriterThread( def exception: Option[Throwable] = Option(_exception) override def run(): Unit = Utils.logUncaughtExceptions { + TaskContext.setTaskContext(taskContext) + val dataOutputStream = new DataOutputStream(outputStream) // We can't use Utils.tryWithSafeFinally here because we also need a `catch` block, so diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index b47b9ae4b5286..77b88b5581c59 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.scalatest.exceptions.TestFailedException +import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow @@ -113,6 +114,7 @@ class ScriptTransformationSuite extends SparkPlanTest { private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryNode { override protected def doExecute(): RDD[InternalRow] = { child.execute().map { x => + assert(TaskContext.get() != null) // Make sure that TaskContext is defined. Thread.sleep(1000) // This sleep gives the external process time to start. 
throw new IllegalArgumentException("intentional exception") } From 6a06a8c91640e1361383e40f42696fac78091720 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 27 Jul 2015 19:40:50 -0700 Subject: [PATCH 10/12] Clean up handling of quotes in serde class name --- .../src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 4 ++-- .../spark/sql/hive/execution/ScriptTransformation.scala | 6 +----- .../sql/hive/execution/ScriptTransformationSuite.scala | 4 ++-- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 69c9cd3543a77..60c05954502f8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -882,7 +882,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C (rowFormat, None, Nil) case Token("TOK_SERDENAME", Token(serdeClass, Nil) :: Nil) :: Nil => - (Nil, Some(serdeClass), Nil) + (Nil, Some(PlanUtils.stripQuotes(serdeClass)), Nil) case Token("TOK_SERDENAME", Token(serdeClass, Nil) :: Token("TOK_TABLEPROPERTIES", @@ -891,7 +891,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C case Token("TOK_TABLEPROPERTY", Token(name, Nil) :: Token(value, Nil) :: Nil) => (name, value) } - (Nil, Some(serdeClass), serdeProps) + (Nil, Some(PlanUtils.stripQuotes(serdeClass)), serdeProps) case Nil => (Nil, None, Nil) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 7b82479064935..98089fe69ea25 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -344,11 +344,7 @@ case class HiveScriptIOSchema ( columnTypes: Seq[DataType], serdeProps: Seq[(String, String)]): AbstractSerDe = { - val serde: AbstractSerDe = { - val trimed_class = serdeClassName.split("'")(1) - Utils.classForName(trimed_class) - .newInstance.asInstanceOf[AbstractSerDe] - } + val serde = Utils.classForName(serdeClassName).newInstance.asInstanceOf[AbstractSerDe] val columnTypesNames = columnTypes.map(_.toTypeInfo.getTypeName()).mkString(",") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index 77b88b5581c59..0875232aede3e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -44,8 +44,8 @@ class ScriptTransformationSuite extends SparkPlanTest { ) private val serdeIOSchema = noSerdeIOSchema.copy( - inputSerdeClass = Some(s"'${classOf[LazySimpleSerDe].getCanonicalName}"), - outputSerdeClass = Some(s"'${classOf[LazySimpleSerDe].getCanonicalName}") + inputSerdeClass = Some(classOf[LazySimpleSerDe].getCanonicalName), + outputSerdeClass = Some(classOf[LazySimpleSerDe].getCanonicalName) ) test("cat without SerDe") { From 983f200b73b9c04432c3cf086586e5bcaef0658f Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 28 Jul 2015 08:53:27 -0700 Subject: [PATCH 11/12] Use unescapeSQLString instead of stripQuotes --- .../src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 4 ++-- 1 
file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 60c05954502f8..f911e3ef45187 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -882,7 +882,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C (rowFormat, None, Nil) case Token("TOK_SERDENAME", Token(serdeClass, Nil) :: Nil) :: Nil => - (Nil, Some(PlanUtils.stripQuotes(serdeClass)), Nil) + (Nil, Some(BaseSemanticAnalyzer.unescapeSQLString(serdeClass)), Nil) case Token("TOK_SERDENAME", Token(serdeClass, Nil) :: Token("TOK_TABLEPROPERTIES", @@ -891,7 +891,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C case Token("TOK_TABLEPROPERTY", Token(name, Nil) :: Token(value, Nil) :: Nil) => (name, value) } - (Nil, Some(PlanUtils.stripQuotes(serdeClass)), serdeProps) + (Nil, Some(BaseSemanticAnalyzer.unescapeSQLString(serdeClass)), serdeProps) case Nil => (Nil, None, Nil) } From 16c44e253998c9a0ced7fcde6a16ed7fea17392f Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 28 Jul 2015 13:40:35 -0700 Subject: [PATCH 12/12] Update some comments --- .../hive/execution/ScriptTransformation.scala | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 98089fe69ea25..741c705e2a253 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -62,22 +62,20 @@ case class ScriptTransformation( def processIterator(inputIterator: Iterator[InternalRow]): Iterator[InternalRow] = { val cmd = List("/bin/bash", "-c", script) val builder = new ProcessBuilder(cmd) - // We need to start threads connected to the process pipeline: - // 1) The error msg generated by the script process would be hidden. - // 2) If the error msg is too big to chock up the buffer, the input logic would be hung + val proc = builder.start() val inputStream = proc.getInputStream val outputStream = proc.getOutputStream val errorStream = proc.getErrorStream - val reader = new BufferedReader(new InputStreamReader(inputStream)) - // TODO make the 2048 configurable? + // In order to avoid deadlocks, we need to consume the error output of the child process. + // To avoid issues caused by large error output, we use a circular buffer to limit the amount + // of error output that we retain. See SPARK-7862 for more discussion of the deadlock / hang + // that motivates this. val stderrBuffer = new CircularBuffer(2048) - - // Consume the error stream from the pipeline, otherwise it will be blocked if - // the pipeline is full. 
- new RedirectThread(errorStream, // input stream from the pipeline - stderrBuffer, // output to a circular buffer + new RedirectThread( + errorStream, + stderrBuffer, "Thread-ScriptTransformation-STDERR-Consumer").start() val outputProjection = new InterpretedProjection(input, child.output) @@ -86,10 +84,8 @@ case class ScriptTransformation( // inside of a loop @Nullable val (inputSerde, inputSoi) = ioschema.initInputSerDe(input).getOrElse((null, null)) - // Put the write(output to the pipeline) into a single thread - // and keep the collector as remain in the main thread. - // otherwise it will causes deadlock if the data size greater than - // the pipeline / buffer capacity. + // This new thread will consume the ScriptTransformation's input rows and write them to the + // external process. That process's output will be read by this current thread. val writerThread = new ScriptTransformationWriterThread( inputIterator, outputProjection, @@ -108,6 +104,7 @@ case class ScriptTransformation( ioschema.initOutputSerDe(output).getOrElse((null, null)) } + val reader = new BufferedReader(new InputStreamReader(inputStream)) val outputIterator: Iterator[InternalRow] = new Iterator[InternalRow] with HiveInspectors { var cacheRow: InternalRow = null var curLine: String = null