From e4c7e027e9216bad42ac85916de05e26163452ea Mon Sep 17 00:00:00 2001
From: Daoyuan Wang
Date: Wed, 13 Jul 2016 03:29:48 -0700
Subject: [PATCH 1/4] set default record reader and writer in script transformation

---
 .../spark/sql/execution/SparkSqlParser.scala | 11 +++++++--
 sql/hive/src/test/resources/test_script.sh   | 23 +++++++++++++++++++
 .../sql/hive/execution/SQLQuerySuite.scala   | 11 +++++++++
 3 files changed, 43 insertions(+), 2 deletions(-)
 create mode 100755 sql/hive/src/test/resources/test_script.sh

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 066ff57721a3..29772395130c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1329,7 +1329,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
         // SPARK-10310: Special cases LazySimpleSerDe
         val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
-          Try(conf.getConfString(configKey)).toOption
+          defaultRecordHandler(configKey)
         } else {
           None
         }
@@ -1340,10 +1340,17 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         val name = conf.getConfString("hive.script.serde",
           "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
         val props = Seq("field.delim" -> "\t")
-        val recordHandler = Try(conf.getConfString(configKey)).toOption
+        val recordHandler = defaultRecordHandler(configKey)
         (Nil, Option(name), props, recordHandler)
     }
 
+    def defaultRecordHandler(configKey: String): Option[String] = {
+      Try(conf.getConfString(configKey)).orElse(Try(configKey match {
+        case "hive.script.recordreader" => "org.apache.hadoop.hive.ql.exec.TextRecordReader"
+        case "hive.script.recordwriter" => "org.apache.hadoop.hive.ql.exec.TextRecordWriter"
+      })).toOption
+    }
+
     val (inFormat, inSerdeClass, inSerdeProps, reader) =
       format(inRowFormat, "hive.script.recordreader")
 
diff --git a/sql/hive/src/test/resources/test_script.sh b/sql/hive/src/test/resources/test_script.sh
new file mode 100755
index 000000000000..ab998c41b22c
--- /dev/null
+++ b/sql/hive/src/test/resources/test_script.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+while read line
+do
+  echo "$line" | sed 's/\t/_/'
+done < /dev/stdin
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 9c1f21825315..bdae9999b300 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -63,6 +63,17 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   import hiveContext._
   import spark.implicits._
 
+  test("script") {
+    val df = Seq(("x1", "y1", "z1"), ("x2", "y2", "z2")).toDF("c1", "c2", "c3")
+    df.createOrReplaceTempView("script_table")
+    val query1 = sql(
+      """
+        |SELECT col1 FROM (from(SELECT c1, c2, c3 FROM script_table) tempt_table
+        |REDUCE c1, c2, c3 USING 'bash src/test/resources/test_script.sh' AS
+        |(col1 STRING, col2 STRING)) script_test_table""".stripMargin)
+    checkAnswer(query1, Row("x1_y1") :: Row("x2_y2") :: Nil)
+  }
+
   test("UDTF") {
     withUserDefinedFunction("udtf_count2" -> true) {
       sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}")

From 8048732c56bd43dd0ca175a0a047a495ab06fcea Mon Sep 17 00:00:00 2001
From: Daoyuan Wang
Date: Mon, 18 Jul 2016 15:45:28 +0800
Subject: [PATCH 2/4] address comment

---
 .../spark/sql/execution/SparkSqlParser.scala | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 29772395130c..20ba1e2acba9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1306,7 +1306,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     // Decode and input/output format.
     type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
 
-    def format(fmt: RowFormatContext, configKey: String): Format = fmt match {
+    def format(fmt: RowFormatContext, configKey: String, configValue: String): Format = fmt match {
       case c: RowFormatDelimitedContext =>
         // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
         // expects a seq of pairs in which the old parsers' token names are used as keys.
@@ -1329,6 +1329,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
         // SPARK-10310: Special cases LazySimpleSerDe
         val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
+          Try(conf.getConfString(configKey, configValue)).toOption
           defaultRecordHandler(configKey)
         } else {
           None
@@ -1344,18 +1345,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         (Nil, Option(name), props, recordHandler)
     }
 
-    def defaultRecordHandler(configKey: String): Option[String] = {
-      Try(conf.getConfString(configKey)).orElse(Try(configKey match {
-        case "hive.script.recordreader" => "org.apache.hadoop.hive.ql.exec.TextRecordReader"
-        case "hive.script.recordwriter" => "org.apache.hadoop.hive.ql.exec.TextRecordWriter"
-      })).toOption
-    }
-
     val (inFormat, inSerdeClass, inSerdeProps, reader) =
-      format(inRowFormat, "hive.script.recordreader")
+      format(
+        inRowFormat, "hive.script.recordreader", "org.apache.hadoop.hive.ql.exec.TextRecordReader")
 
     val (outFormat, outSerdeClass, outSerdeProps, writer) =
-      format(outRowFormat, "hive.script.recordwriter")
+      format(
+        outRowFormat, "hive.script.recordwriter",
+        "org.apache.hadoop.hive.ql.exec.TextRecordWriter")
 
     ScriptInputOutputSchema(
       inFormat, outFormat,

From 814f67926038ef136efa12b8e41d8c252411b9c9 Mon Sep 17 00:00:00 2001
From: Daoyuan Wang
Date: Mon, 18 Jul 2016 15:45:56 +0800
Subject: [PATCH 3/4] Update SparkSqlParser.scala

---
 .../scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 20ba1e2acba9..51344d8484c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1330,7 +1330,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         // SPARK-10310: Special cases LazySimpleSerDe
         val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
           Try(conf.getConfString(configKey, configValue)).toOption
-          defaultRecordHandler(configKey)
         } else {
           None
         }

From 0edfed48a1e6a438a18d488404b900a51475a2d5 Mon Sep 17 00:00:00 2001
From: Daoyuan Wang
Date: Mon, 18 Jul 2016 15:46:56 +0800
Subject: [PATCH 4/4] Update SparkSqlParser.scala

---
 .../scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 51344d8484c2..c231c83bcfd9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1340,7 +1340,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         val name = conf.getConfString("hive.script.serde",
           "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
         val props = Seq("field.delim" -> "\t")
-        val recordHandler = defaultRecordHandler(configKey)
+        val recordHandler = Try(conf.getConfString(configKey, configValue)).toOption
         (Nil, Option(name), props, recordHandler)
     }
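
Net behavior after patch 4: when hive.script.recordreader or hive.script.recordwriter is unset, the record handler no longer resolves to None, because conf.getConfString(configKey, configValue) falls back to the supplied Hive text reader/writer class name rather than throwing. Below is a minimal standalone Scala sketch of that lookup semantics, not part of the patch series: the ScriptConfSketch object, its Map-backed conf, and "my.custom.Writer" are hypothetical stand-ins; only the conf key names and default class names come from the patches.

    // Sketch of the post-patch lookup semantics using a Map as a
    // hypothetical stand-in for SQLConf.
    import scala.util.Try

    object ScriptConfSketch {
      // Stand-in for conf.getConfString(key, default): returns the
      // user-set value if present, otherwise the supplied default.
      def getConfString(conf: Map[String, String], key: String, default: String): String =
        conf.getOrElse(key, default)

      def main(args: Array[String]): Unit = {
        // Unset conf: before this series the handler was None; now it
        // falls back to the Hive text record reader named in the patch.
        val empty = Map.empty[String, String]
        val reader = Try(getConfString(empty, "hive.script.recordreader",
          "org.apache.hadoop.hive.ql.exec.TextRecordReader")).toOption
        println(reader) // Some(org.apache.hadoop.hive.ql.exec.TextRecordReader)

        // A user-set conf value still wins over the default.
        // ("my.custom.Writer" is an illustrative placeholder class name.)
        val custom = Map("hive.script.recordwriter" -> "my.custom.Writer")
        val writer = Try(getConfString(custom, "hive.script.recordwriter",
          "org.apache.hadoop.hive.ql.exec.TextRecordWriter")).toOption
        println(writer) // Some(my.custom.Writer)
      }
    }

This is the behavior the new "script" test relies on: its REDUCE query specifies no RECORD READER or RECORD WRITER clause, so the transform only works once the text defaults are in place.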