@@ -83,6 +83,11 @@ object ParserUtils {
     node.getText.slice(1, node.getText.size - 1)
   }

+  /** Collect the entries if any. */
+  def entry(key: String, value: Token): Seq[(String, String)] = {
+    Option(value).toSeq.map(x => key -> string(x))
+  }
+
   /** Get the origin (line and position) of the token. */
   def position(token: Token): Origin = {
     val opt = Option(token)

Member (on def entry): Ah, I see. This update looks okay.
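For context on the newly shared helper: entry folds an optional token into zero or one key/value pairs, so an absent ROW FORMAT clause simply contributes nothing to the concatenated list. A minimal, self-contained sketch of the same pattern follows; the Token stub and the simplified string are assumptions standing in for the real ANTLR type and ParserUtils.string.

// Sketch of the entry(...) pattern, NOT the real Spark/ANTLR types.
object EntrySketch {
  final case class Token(text: String) // stand-in for org.antlr.v4.runtime.Token

  // Simplified stand-in for ParserUtils.string: strip the surrounding quotes.
  def string(t: Token): String = t.text.stripPrefix("'").stripSuffix("'")

  // Absent (null) tokens contribute nothing; present tokens yield one pair.
  def entry(key: String, value: Token): Seq[(String, String)] =
    Option(value).toSeq.map(x => key -> string(x))

  def main(args: Array[String]): Unit = {
    val entries =
      entry("field.delim", Token("','")) ++
        entry("line.delim", null) // omitted clause: contributes Seq.empty
    println(entries) // List((field.delim,,))
  }
}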
@@ -654,10 +654,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    */
   override def visitRowFormatDelimited(
       ctx: RowFormatDelimitedContext): CatalogStorageFormat = withOrigin(ctx) {
-    // Collect the entries if any.
-    def entry(key: String, value: Token): Seq[(String, String)] = {
-      Option(value).toSeq.map(x => key -> string(x))
-    }
     // TODO we need proper support for the NULL format.
     val entries =
       entry("field.delim", ctx.fieldsTerminatedBy) ++
@@ -756,9 +752,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       // expects a seq of pairs in which the old parsers' token names are used as keys.
       // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
       // retrieving the key value pairs ourselves.
-      def entry(key: String, value: Token): Seq[(String, String)] = {
-        Option(value).map(t => key -> t.getText).toSeq
-      }
       val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
         entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++
         entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
@@ -65,6 +65,14 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils

   def isHive23OrSpark: Boolean

+  // In Hive 1.2, the string representation of a decimal omits trailing zeroes.
+  // But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary.
+  val decimalToString: Column => Column = if (isHive23OrSpark) {
+    c => c.cast("string")
+  } else {
+    c => c.cast("decimal(1, 0)").cast("string")
+  }
+
   def createScriptTransformationExec(
       input: Seq[Expression],
       script: String,
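To make the Hive 1.2 vs. Hive 2.3 comment in the hunk above concrete, here is a hedged illustration using plain java.math.BigDecimal rather than a live Hive session; the scale of 18 matches the Decimal(38, 18) column the tests below create.

// Illustration of the two decimal-to-string behaviors described above.
object DecimalRenderSketch {
  def main(args: Array[String]): Unit = {
    val d = new java.math.BigDecimal("1.0")

    // Hive 1.2-style: trailing zeroes omitted.
    println(d.stripTrailingZeros.toPlainString) // 1

    // Hive 2.3-style: padded to the column's scale (18 for Decimal(38, 18)).
    println(d.setScale(18).toPlainString) // 1.000000000000000000
  }
}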
@@ -130,13 +138,6 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils
         |FROM v
       """.stripMargin)

-    // In Hive 1.2, the string representation of a decimal omits trailing zeroes.
-    // But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary.
-    val decimalToString: Column => Column = if (isHive23OrSpark) {
-      c => c.cast("string")
-    } else {
-      c => c.cast("decimal(1, 0)").cast("string")
-    }
     checkAnswer(query, identity, df.select(
       'a.cast("string"),
       'b.cast("string"),
@@ -311,6 +312,66 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils
       }
     }
   }

test("SPARK-32608: Script Transform ROW FORMAT DELIMIT value should format value") {
withTempView("v") {
val df = Seq(
(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)),
(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)),
(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3))
).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18)
df.createTempView("v")

// input/output with same delimit
checkAnswer(
sql(
s"""
|SELECT TRANSFORM(a, b, c, d, e)
| ROW FORMAT DELIMITED
| FIELDS TERMINATED BY ','
| COLLECTION ITEMS TERMINATED BY '#'
| MAP KEYS TERMINATED BY '@'
| LINES TERMINATED BY '\n'
| NULL DEFINED AS 'null'
| USING 'cat' AS (a, b, c, d, e)
| ROW FORMAT DELIMITED
| FIELDS TERMINATED BY ','
| COLLECTION ITEMS TERMINATED BY '#'
| MAP KEYS TERMINATED BY '@'
| LINES TERMINATED BY '\n'
| NULL DEFINED AS 'NULL'
|FROM v
""".stripMargin), identity, df.select(
'a.cast("string"),
'b.cast("string"),
'c.cast("string"),
decimalToString('d),
'e.cast("string")).collect())

// input/output with different delimit and show result
checkAnswer(
sql(
s"""
|SELECT TRANSFORM(a, b, c, d, e)
| ROW FORMAT DELIMITED
| FIELDS TERMINATED BY ','
| LINES TERMINATED BY '\n'
| NULL DEFINED AS 'null'
| USING 'cat' AS (value)
| ROW FORMAT DELIMITED
| FIELDS TERMINATED BY '&'
| LINES TERMINATED BY '\n'
| NULL DEFINED AS 'NULL'
|FROM v
""".stripMargin), identity, df.select(
concat_ws(",",
'a.cast("string"),
'b.cast("string"),
'c.cast("string"),
decimalToString('d),
'e.cast("string"))).collect())
}
}
}

case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {
@@ -24,8 +24,8 @@ import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAlias, UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Concat, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, RepartitionByExpression, Sort}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Concat, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{CreateTable, RefreshResource}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, StaticSQLConf}
@@ -38,6 +38,7 @@ import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
  * defined in the Catalyst module.
  */
 class SparkSqlParserSuite extends AnalysisTest {
+  import org.apache.spark.sql.catalyst.dsl.expressions._

Member (on the import): Why did you put this import here instead of the top?

Contributor (author):
> Why did you put this import here instead of the top?

Copied from PlanParserSuite. Should I move this line to the top in PlanParserSuite in PR #29414?

Member: Ah, I see. It's okay as it is.
val newConf = new SQLConf
private lazy val parser = new SparkSqlParser(newConf)
@@ -330,4 +331,44 @@ class SparkSqlParserSuite extends AnalysisTest {
assertEqual("ADD FILE /path with space/abc.txt", AddFileCommand("/path with space/abc.txt"))
assertEqual("ADD JAR /path with space/abc.jar", AddJarCommand("/path with space/abc.jar"))
}

test("SPARK-32608: script transform with row format delimit") {
assertEqual(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add end-2-end tests, too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add end-2-end tests, too?

Added in BasicScriptTransformationExecSuite

"""
|SELECT TRANSFORM(a, b, c)
| ROW FORMAT DELIMITED
| FIELDS TERMINATED BY ','
| COLLECTION ITEMS TERMINATED BY '#'
| MAP KEYS TERMINATED BY '@'
| LINES TERMINATED BY '\n'
| NULL DEFINED AS 'null'
| USING 'cat' AS (a, b, c)
| ROW FORMAT DELIMITED
| FIELDS TERMINATED BY ','
| COLLECTION ITEMS TERMINATED BY '#'
| MAP KEYS TERMINATED BY '@'
| LINES TERMINATED BY '\n'
| NULL DEFINED AS 'NULL'
|FROM testData
""".stripMargin,
ScriptTransformation(
Seq('a, 'b, 'c),
"cat",
Seq(AttributeReference("a", StringType)(),
AttributeReference("b", StringType)(),
AttributeReference("c", StringType)()),
UnresolvedRelation(TableIdentifier("testData")),
ScriptInputOutputSchema(
Seq(("TOK_TABLEROWFORMATFIELD", ","),
("TOK_TABLEROWFORMATCOLLITEMS", "#"),
("TOK_TABLEROWFORMATMAPKEYS", "@"),
("TOK_TABLEROWFORMATLINES", "\n"),
("TOK_TABLEROWFORMATNULL", "null")),
Seq(("TOK_TABLEROWFORMATFIELD", ","),
("TOK_TABLEROWFORMATCOLLITEMS", "#"),
("TOK_TABLEROWFORMATMAPKEYS", "@"),
("TOK_TABLEROWFORMATLINES", "\n"),
("TOK_TABLEROWFORMATNULL", "NULL")), None, None,
List.empty, List.empty, None, None, false)))
}
}