From 344c2ab62095f91e161814230082f0d30c257365 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 9 Sep 2018 15:07:34 +0200
Subject: [PATCH 01/44] Initial implementation of CsvToStruct

---
 sql/catalyst/pom.xml                          |   6 +
 .../spark/sql/catalyst}/csv/CSVOptions.scala  |   2 +-
 .../spark/sql/catalyst/csv/CSVUtils.scala     |  57 +++++++++
 .../sql/catalyst}/csv/UnivocityParser.scala   |   7 +-
 .../sql/catalyst/expressions/ExprUtils.scala  |  45 ++++++++
 .../catalyst/expressions/csvExpressions.scala | 109 ++++++++++++++++++
 .../expressions/jsonExpressions.scala         |  18 +--
 .../catalyst/util}/FailureSafeParser.scala    |   5 +-
 .../sql/catalyst}/csv/CSVUtilsSuite.scala     |   2 +-
 .../apache/spark/sql/DataFrameReader.scala    |   6 +-
 .../datasources/csv/CSVDataSource.scala       |   4 +-
 .../datasources/csv/CSVFileFormat.scala       |   1 +
 .../datasources/csv/CSVInferSchema.scala      |   1 +
 .../execution/datasources/csv/CSVUtils.scala  |  12 +-
 .../datasources/csv/UnivocityGenerator.scala  |   1 +
 .../datasources/json/JsonDataSource.scala     |   3 +-
 .../datasources/csv/CSVInferSchemaSuite.scala |   2 +
 .../csv/UnivocityParserSuite.scala            |   2 +
 18 files changed, 243 insertions(+), 40 deletions(-)
 rename sql/{core/src/main/scala/org/apache/spark/sql/execution/datasources => catalyst/src/main/scala/org/apache/spark/sql/catalyst}/csv/CSVOptions.scala (99%)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala
 rename sql/{core/src/main/scala/org/apache/spark/sql/execution/datasources => catalyst/src/main/scala/org/apache/spark/sql/catalyst}/csv/UnivocityParser.scala (98%)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 rename sql/{core/src/main/scala/org/apache/spark/sql/execution/datasources => catalyst/src/main/scala/org/apache/spark/sql/catalyst/util}/FailureSafeParser.scala (95%)
 rename sql/{core/src/test/scala/org/apache/spark/sql/execution/datasources => catalyst/src/test/scala/org/apache/spark/sql/catalyst}/csv/CSVUtilsSuite.scala (96%)

diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 7d23637e28342..d6aa5c4ab19b3 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -103,6 +103,12 @@
       <groupId>commons-codec</groupId>
       <artifactId>commons-codec</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.univocity</groupId>
+      <artifactId>univocity-parsers</artifactId>
+      <version>2.7.3</version>
+      <type>jar</type>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
similarity index 99%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index fab8d62da0c1d..dfb5e8ab6106b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -15,7 +15,7 @@
 * limitations under the License.
*/ -package org.apache.spark.sql.execution.datasources.csv +package org.apache.spark.sql.catalyst.csv import java.nio.charset.StandardCharsets import java.util.{Locale, TimeZone} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala new file mode 100644 index 0000000000000..adbe7c402d515 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.csv + +object CSVUtils { + /** + * Filter ignorable rows for CSV iterator (lines empty and starting with `comment`). + * This is currently being used in CSV reading path and CSV schema inference. + */ + def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = { + iter.filter { line => + line.trim.nonEmpty && !line.startsWith(options.comment.toString) + } + } + + /** + * Helper method that converts string representation of a character to actual character. + * It handles some Java escaped strings and throws exception if given string is longer than one + * character. + */ + @throws[IllegalArgumentException] + def toChar(str: String): Char = { + if (str.charAt(0) == '\\') { + str.charAt(1) + match { + case 't' => '\t' + case 'r' => '\r' + case 'b' => '\b' + case 'f' => '\f' + case '\"' => '\"' // In case user changes quote char and uses \" as delimiter in options + case '\'' => '\'' + case 'u' if str == """\u0000""" => '\u0000' + case _ => + throw new IllegalArgumentException(s"Unsupported special character for delimiter: $str") + } + } else if (str.length == 1) { + str.charAt(0) + } else { + throw new IllegalArgumentException(s"Delimiter cannot be more than one character: $str") + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala similarity index 98% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index e15af425b2649..b5905c0426e7f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.execution.datasources.csv +package org.apache.spark.sql.catalyst.csv import java.io.InputStream import java.math.BigDecimal @@ -28,8 +28,7 @@ import com.univocity.parsers.csv.CsvParser import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils} -import org.apache.spark.sql.execution.datasources.FailureSafeParser +import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, FailureSafeParser} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -259,7 +258,7 @@ class UnivocityParser( } } -private[csv] object UnivocityParser { +private[sql] object UnivocityParser { /** * Parses a stream that contains CSV strings and turns it into an iterator of tokens. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala new file mode 100644 index 0000000000000..e5708894f22b4 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.util.ArrayBasedMapData +import org.apache.spark.sql.types.{MapType, StringType, StructType} + +object ExprUtils { + + def evalSchemaExpr(exp: Expression): StructType = exp match { + case Literal(s, StringType) => StructType.fromDDL(s.toString) + case e => throw new AnalysisException( + s"Schema should be specified in DDL format as a string literal instead of ${e.sql}") + } + + def convertToMapData(exp: Expression): Map[String, String] = exp match { + case m: CreateMap + if m.dataType.acceptsType(MapType(StringType, StringType, valueContainsNull = false)) => + val arrayMap = m.eval().asInstanceOf[ArrayBasedMapData] + ArrayBasedMapData.toScalaMap(arrayMap).map { case (key, value) => + key.toString -> value.toString + } + case m: CreateMap => + throw new AnalysisException( + s"A type of keys and values in map() must be string, but got ${m.dataType.catalogString}") + case _ => + throw new AnalysisException("Must use a map() function for options") + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala new file mode 100644 index 0000000000000..008f4a4147ed5 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ + Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( + schema: StructType, + options: Map[String, String], + child: Expression, + timeZoneId: Option[String] = None) + extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { + + override def nullable: Boolean = true + + // The CSV input data might be missing certain fields. We force the nullability + // of the user-provided schema to avoid data corruptions. 
+ val nullableSchema = schema.asNullable + + // Used in `FunctionRegistry` + def this(child: Expression, schema: Expression, options: Map[String, String]) = + this( + schema = ExprUtils.evalSchemaExpr(schema), + options = options, + child = child, + timeZoneId = None) + + def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String]) + + def this(child: Expression, schema: Expression, options: Expression) = + this( + schema = ExprUtils.evalSchemaExpr(schema), + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + // This converts parsed rows to the desired output by the given schema. + @transient + lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null + + @transient lazy val parser = { + val parsedOptions = new CSVOptions(options, true, timeZoneId.get) + val mode = parsedOptions.parseMode + if (mode != PermissiveMode && mode != FailFastMode) { + throw new AnalysisException(s"from_csv() doesn't support the ${mode.name} mode. " + + s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.") + } + val actualSchema = + StructType(nullableSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) + val rawParser = new UnivocityParser(actualSchema, actualSchema, parsedOptions) + new FailureSafeParser[String]( + input => Seq(rawParser.parse(input)), + mode, + nullableSchema, + parsedOptions.columnNameOfCorruptRecord, + parsedOptions.multiLine) + } + + override def dataType: DataType = nullableSchema + + override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = + copy(timeZoneId = Option(timeZoneId)) + + override def nullSafeEval(input: Any): Any = { + val csv = input.asInstanceOf[UTF8String].toString + if (csv.trim.isEmpty) return null + try { + converter(parser.parse(csv)) + } catch { + case _: BadRecordException => null + } + } + + override def inputTypes: Seq[AbstractDataType] = StringType :: Nil +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index bd9090a07471b..2d8e16f5c8de1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -539,7 +539,7 @@ case class JsonToStructs( def this(child: Expression, schema: Expression, options: Expression) = this( schema = JsonExprUtils.evalSchemaExpr(schema), - options = JsonExprUtils.convertToMapData(options), + options = ExprUtils.convertToMapData(options), child = child, timeZoneId = None) @@ -650,7 +650,7 @@ case class StructsToJson( def this(child: Expression) = this(Map.empty, child, None) def this(child: Expression, options: Expression) = this( - options = JsonExprUtils.convertToMapData(options), + options = ExprUtils.convertToMapData(options), child = child, timeZoneId = None) @@ -771,18 +771,4 @@ object JsonExprUtils { "Schema should be specified in DDL format as a string literal" + s" or output of the schema_of_json function instead of ${e.sql}") } - - def convertToMapData(exp: Expression): Map[String, String] = exp match { - case m: CreateMap - if m.dataType.acceptsType(MapType(StringType, StringType, valueContainsNull = false)) => - val arrayMap = m.eval().asInstanceOf[ArrayBasedMapData] - ArrayBasedMapData.toScalaMap(arrayMap).map { case (key, value) => - key.toString -> value.toString - } - case m: 
CreateMap => - throw new AnalysisException( - s"A type of keys and values in map() must be string, but got ${m.dataType.catalogString}") - case _ => - throw new AnalysisException("Must use a map() function for options") - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala similarity index 95% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index 90e81661bae7a..2751ca32ba65f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -15,13 +15,12 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.datasources +package org.apache.spark.sql.catalyst.util import org.apache.spark.SparkException + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.unsafe.types.UTF8String diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVUtilsSuite.scala similarity index 96% rename from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtilsSuite.scala rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVUtilsSuite.scala index 221e44ce2cff6..3217df9aed335 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVUtilsSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.execution.datasources.csv +package org.apache.spark.sql.catalyst.csv import org.apache.spark.SparkFunSuite diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 0cfcc45fb3d31..e61f2de414e88 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -23,15 +23,17 @@ import scala.collection.JavaConverters._ import com.fasterxml.jackson.databind.ObjectMapper import com.univocity.parsers.csv.CsvParser - import org.apache.spark.Partition + import org.apache.spark.annotation.InterfaceStability import org.apache.spark.api.java.JavaRDD import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.csv.{CSVOptions, UnivocityParser} import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions} +import org.apache.spark.sql.catalyst.util.FailureSafeParser import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser} +import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.datasources.csv._ import org.apache.spark.sql.execution.datasources.jdbc._ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 2b86054c0ffcb..f4a9630d75b80 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -34,6 +34,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.{BinaryFileRDD, RDD} import org.apache.spark.sql.{Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv.{CSVOptions, UnivocityParser} +import org.apache.spark.sql.catalyst.csv.CSVUtils.filterCommentAndEmpty import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextFileFormat import org.apache.spark.sql.types.StructType @@ -247,7 +249,7 @@ object TextInputCSVDataSource extends CSVDataSource { val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions) val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions) val tokenRDD = sampled.rdd.mapPartitions { iter => - val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions) + val filteredLines = filterCommentAndEmpty(iter, parsedOptions) val linesWithoutHeader = CSVUtils.filterHeaderLine(filteredLines, firstLine, parsedOptions) val parser = new CsvParser(parsedOptions.asParserSettings) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 9aad0bd55e736..7310196d0d17f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce._ import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, 
SparkSession} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv.{CSVOptions, UnivocityParser} import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala index a585cbed2551b..ba6c0dde7ee3d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala @@ -23,6 +23,7 @@ import scala.util.control.Exception._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion +import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala index 7ce65fa89b02d..786e5123899fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql.execution.datasources.csv import org.apache.spark.rdd.RDD import org.apache.spark.sql.Dataset +import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types._ object CSVUtils { /** @@ -40,16 +40,6 @@ object CSVUtils { } } - /** - * Filter ignorable rows for CSV iterator (lines empty and starting with `comment`). - * This is currently being used in CSV reading path and CSV schema inference. - */ - def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = { - iter.filter { line => - line.trim.nonEmpty && !line.startsWith(options.comment.toString) - } - } - /** * Skip the given first line so that only data can remain in a dataset. * This is similar with `dropHeaderLine` below and currently being used in CSV schema inference. 
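The remaining hunks of this patch are mechanical import and visibility changes, so this is a good point to see how the pieces above fit together: `CsvToStructs` turns its option map into a `CSVOptions`, wraps a `UnivocityParser` in a `FailureSafeParser`, and converts the single parsed row into the output struct. Below is a minimal sketch (not part of the patch) that drives the new expression directly, the way a Catalyst expression test would; it assumes spark-catalyst is on the classpath, and the object name is illustrative only.

    import org.apache.spark.sql.catalyst.expressions.{CsvToStructs, Literal}
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    object CsvToStructsSketch {
      def main(args: Array[String]): Unit = {
        val schema = StructType(StructField("a", IntegerType) :: Nil)
        // A concrete time zone id is needed here: the lazily built parser calls
        // timeZoneId.get while constructing its CSVOptions.
        val expr = CsvToStructs(schema, Map.empty, Literal("1"), Some("GMT"))
        // eval() runs nullSafeEval -> FailureSafeParser -> UnivocityParser and
        // should print the single parsed row, i.e. [1].
        println(expr.eval(null))
      }
    }

Under the default PERMISSIVE mode an unparseable input comes back as a row of nulls rather than an error, FAILFAST propagates the failure, and any other mode is rejected by the mode check when the lazy parser is first built.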
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala index 4082a0df8ba75..37d9d9abc8680 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala @@ -22,6 +22,7 @@ import java.io.Writer import com.univocity.parsers.csv.CsvWriter import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 76f58371ae264..f7e8dee17656b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -27,13 +27,14 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat - import org.apache.spark.TaskContext + import org.apache.spark.input.{PortableDataStream, StreamInputFormat} import org.apache.spark.rdd.{BinaryFileRDD, RDD} import org.apache.spark.sql.{Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JsonInferSchema, JSONOptions} +import org.apache.spark.sql.catalyst.util.FailureSafeParser import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextFileFormat diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala index 57e36e082653c..f6067cf8e9279 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.datasources.csv import org.apache.spark.SparkFunSuite + +import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.types._ class CSVInferSchemaSuite extends SparkFunSuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala index 458edb253fb33..44bf5643fb9d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala @@ -21,6 +21,8 @@ import java.math.BigDecimal import java.util.Locale import org.apache.spark.SparkFunSuite + +import org.apache.spark.sql.catalyst.csv.{CSVOptions, UnivocityParser} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String From 5905019dd3fbe9dbba20505bf9df2b8ca62b4849 Mon Sep 17 00:00:00 2001 From: 
Maxim Gekk Date: Sun, 9 Sep 2018 20:22:32 +0200 Subject: [PATCH 02/44] Added CSV Expression Test Suite --- .../expressions/CsvExpressionsSuite.scala | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala new file mode 100644 index 0000000000000..a2b6aa216eeaf --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.Calendar + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.plans.PlanTestBase +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with PlanTestBase { + val badCsv = "\u0000\u0000\u0000A\u0001AAA" + + val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID) + + test("from_csv") { + val csvData = "1" + val schema = StructType(StructField("a", IntegerType) :: Nil) + checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData), gmtId), + InternalRow(1) + ) + } + + test("from_csv - invalid data") { + val csvData = "---" + val schema = StructType(StructField("a", DoubleType) :: Nil) + checkEvaluation( + CsvToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(csvData), gmtId), + InternalRow(null)) + + // Default mode is Permissive + checkEvaluation(CsvToStructs(schema, Map.empty, Literal(csvData), gmtId), InternalRow(null)) + } + + test("from_csv null input column") { + val schema = StructType(StructField("a", IntegerType) :: Nil) + checkEvaluation( + CsvToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId), + null + ) + } + + ignore("from_csv bad UTF-8") { + val schema = StructType(StructField("a", IntegerType) :: Nil) + checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(badCsv), gmtId), + null) + } + + test("from_csv with timestamp") { + val schema = StructType(StructField("t", TimestampType) :: Nil) + + val csvData1 = "2016-01-01T00:00:00.123Z" + var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT) + c.set(2016, 0, 1, 0, 0, 0) + c.set(Calendar.MILLISECOND, 123) + checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData1), gmtId), + InternalRow(c.getTimeInMillis * 1000L) + ) + // The result doesn't change because the CSV string includes 
timezone string ("Z" here), + // which means the string represents the timestamp string in the timezone regardless of + // the timeZoneId parameter. + checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData1), Option("PST")), + InternalRow(c.getTimeInMillis * 1000L) + ) + + val csvData2 = "2016-01-01T00:00:00" + for (tz <- DateTimeTestUtils.ALL_TIMEZONES) { + c = Calendar.getInstance(tz) + c.set(2016, 0, 1, 0, 0, 0) + c.set(Calendar.MILLISECOND, 0) + checkEvaluation( + CsvToStructs( + schema, + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"), + Literal(csvData2), + Option(tz.getID)), + InternalRow(c.getTimeInMillis * 1000L) + ) + checkEvaluation( + CsvToStructs( + schema, + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", + DateTimeUtils.TIMEZONE_OPTION -> tz.getID), + Literal(csvData2), + gmtId), + InternalRow(c.getTimeInMillis * 1000L) + ) + } + } + + test("from_csv empty input column") { + val schema = StructType(StructField("a", IntegerType) :: Nil) + checkEvaluation( + CsvToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId), + null + ) + } + + test("from_csv missing fields") { + val input = """1,,"foo"""" + val csvSchema = new StructType() + .add("a", LongType, nullable = false) + .add("b", StringType, nullable = false) + .add("c", StringType, nullable = false) + val output = InternalRow(1L, null, UTF8String.fromString("foo")) + val expr = CsvToStructs(csvSchema, Map.empty, Literal.create(input, StringType), gmtId) + checkEvaluation(expr, output) + val schema = expr.dataType + val schemaToCompare = csvSchema.asNullable + assert(schemaToCompare == schema) + } +} From c5ac43273e70199467c28aaa40591042eac096aa Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 9 Sep 2018 20:42:49 +0200 Subject: [PATCH 03/44] Register from_csv functions and add tests --- .../catalyst/analysis/FunctionRegistry.scala | 5 +- .../org/apache/spark/sql/functions.scala | 16 +++++ .../apache/spark/sql/CsvFunctionsSuite.scala | 72 +++++++++++++++++++ 3 files changed, 92 insertions(+), 1 deletion(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 77860e1584f42..fa6baeb48aec3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -528,7 +528,10 @@ object FunctionRegistry { castAlias("date", DateType), castAlias("timestamp", TimestampType), castAlias("binary", BinaryType), - castAlias("string", StringType) + castAlias("string", StringType), + + // csv + expression[CsvToStructs]("from_csv") ) val builtin: SimpleFunctionRegistry = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 10b67d7a1ca54..410c7df0b9523 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3811,6 +3811,22 @@ object functions { @scala.annotation.varargs def map_concat(cols: Column*): Column = withExpr { MapConcat(cols.map(_.expr)) } + /** + * Parses a column containing a CSV string into a `StringType` with the specified schema. + * Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing CSV data. 
+ * @param schema the schema to use when parsing the CSV string + * @param options options to control how the CSV is parsed. accepts the same options and the + * CSV data source. + * + * @group collection_funcs + * @since 3.0.0 + */ + def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr { + CsvToStructs(schema, options, e.expr) + } + // scalastyle:off line.size.limit // scalastyle:off parameter.number diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala new file mode 100644 index 0000000000000..6e9921d0a9bf4 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ + +class CsvFunctionsSuite extends QueryTest with SharedSQLContext { + import testImplicits._ + + test("from_csv") { + val df = Seq("1").toDS() + val schema = new StructType().add("a", IntegerType) + + checkAnswer( + df.select(from_csv($"value", schema, Map.empty)), + Row(Row(1)) :: Nil) + } + + test("from_csv with option") { + val df = Seq("26/08/2015 18:00").toDS() + val schema = new StructType().add("time", TimestampType) + val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm") + + checkAnswer( + df.select(from_csv($"value", schema, options)), + Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))) + } + + test("from_csv missing columns") { + val df = Seq("1").toDS() + val schema = new StructType() + .add("a", IntegerType) + .add("b", IntegerType) + + checkAnswer( + df.select(from_csv($"value", schema, Map.empty)), + Row(Row(1, null)) :: Nil) + } + + test("from_csv invalid CSV") { + val df = Seq("???").toDS() + val schema = new StructType().add("a", IntegerType) + + checkAnswer( + df.select(from_csv($"value", schema, Map.empty)), + Row(Row(null)) :: Nil) + } + + test("Support from_csv in SQL") { + val df1 = Seq("1").toDS() + checkAnswer( + df1.selectExpr("from_csv(value, 'a INT')"), + Row(Row(1)) :: Nil) + } +} From bd2124cfcf00b8e049154dad9903b22b176347bb Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 9 Sep 2018 21:07:00 +0200 Subject: [PATCH 04/44] Fix imports --- .../org/apache/spark/sql/catalyst/util/FailureSafeParser.scala | 1 - .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 2 +- .../spark/sql/execution/datasources/json/JsonDataSource.scala | 2 +- .../sql/execution/datasources/csv/CSVInferSchemaSuite.scala | 1 - .../sql/execution/datasources/csv/UnivocityParserSuite.scala | 2 -- 5 files changed, 2 insertions(+), 6 deletions(-) diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index 2751ca32ba65f..fecfff5789a5c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.catalyst.util import org.apache.spark.SparkException - import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.types.StructType diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index e61f2de414e88..619448e7812cc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -23,8 +23,8 @@ import scala.collection.JavaConverters._ import com.fasterxml.jackson.databind.ObjectMapper import com.univocity.parsers.csv.CsvParser -import org.apache.spark.Partition +import org.apache.spark.Partition import org.apache.spark.annotation.InterfaceStability import org.apache.spark.api.java.JavaRDD import org.apache.spark.internal.Logging diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index f7e8dee17656b..c7608e2e881ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -27,8 +27,8 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat -import org.apache.spark.TaskContext +import org.apache.spark.TaskContext import org.apache.spark.input.{PortableDataStream, StreamInputFormat} import org.apache.spark.rdd.{BinaryFileRDD, RDD} import org.apache.spark.sql.{Dataset, Encoders, SparkSession} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala index f6067cf8e9279..6b64f2ffa98dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.datasources.csv import org.apache.spark.SparkFunSuite - import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.types._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala index 44bf5643fb9d5..6f231142949d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala @@ -18,10 +18,8 @@ package org.apache.spark.sql.execution.datasources.csv import java.math.BigDecimal -import java.util.Locale import org.apache.spark.SparkFunSuite - import 
org.apache.spark.sql.catalyst.csv.{CSVOptions, UnivocityParser} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ From b9bb081a3d655dbdece9d8a998853eda68171553 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 9 Sep 2018 21:23:16 +0200 Subject: [PATCH 05/44] Adding SQL tests --- .../sql-tests/inputs/csv-functions.sql | 12 ++ .../sql-tests/results/csv-functions.sql.out | 104 ++++++++++++++++++ 2 files changed, 116 insertions(+) create mode 100644 sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql create mode 100644 sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out diff --git a/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql new file mode 100644 index 0000000000000..c77568914cc51 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql @@ -0,0 +1,12 @@ +-- from_csv +describe function from_csv; +describe function extended from_csv; +select from_csv('1', 'a INT'); +select from_csv('1, 3.14', 'a INT, f FLOAT'); +select from_csv('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')); +-- Check if errors handled +select from_csv('1', 1); +select from_csv('1', 'a InvalidType'); +select from_csv('1', 'a INT', named_struct('mode', 'PERMISSIVE')); +select from_csv('1', 'a INT', map('mode', 1)); +select from_csv(); diff --git a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out new file mode 100644 index 0000000000000..4f74bbebd22ee --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out @@ -0,0 +1,104 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 10 + + +-- !query 0 +describe function from_csv +-- !query 0 schema +struct +-- !query 0 output +Class: org.apache.spark.sql.catalyst.expressions.CsvToStructs +Function: from_csv +Usage: from_csv(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`. + + +-- !query 1 +describe function extended from_csv +-- !query 1 schema +struct +-- !query 1 output +Class: org.apache.spark.sql.catalyst.expressions.CsvToStructs +Extended Usage: + Examples: + > SELECT from_csv('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + + Since: 3.0.0 + +Function: from_csv +Usage: from_csv(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`. 
+ + +-- !query 2 +select from_csv('1', 'a INT') +-- !query 2 schema +struct> +-- !query 2 output +{"a":1} + + +-- !query 3 +select from_csv('1, 3.14', 'a INT, f FLOAT') +-- !query 3 schema +struct> +-- !query 3 output +{"a":1,"f":3.14} + + +-- !query 4 +select from_csv('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')) +-- !query 4 schema +struct> +-- !query 4 output +{"time":2015-08-26 00:00:00.0} + + +-- !query 5 +select from_csv('1', 1) +-- !query 5 schema +struct<> +-- !query 5 output +org.apache.spark.sql.AnalysisException +Schema should be specified in DDL format as a string literal instead of 1;; line 1 pos 7 + + +-- !query 6 +select from_csv('1', 'a InvalidType') +-- !query 6 schema +struct<> +-- !query 6 output +org.apache.spark.sql.AnalysisException + +DataType invalidtype is not supported.(line 1, pos 2) + +== SQL == +a InvalidType +--^^^ +; line 1 pos 7 + + +-- !query 7 +select from_csv('1', 'a INT', named_struct('mode', 'PERMISSIVE')) +-- !query 7 schema +struct<> +-- !query 7 output +org.apache.spark.sql.AnalysisException +Must use a map() function for options;; line 1 pos 7 + + +-- !query 8 +select from_csv('1', 'a INT', map('mode', 1)) +-- !query 8 schema +struct<> +-- !query 8 output +org.apache.spark.sql.AnalysisException +A type of keys and values in map() must be string, but got map;; line 1 pos 7 + + +-- !query 9 +select from_csv() +-- !query 9 schema +struct<> +-- !query 9 output +org.apache.spark.sql.AnalysisException +Invalid number of arguments for function from_csv. Expected: one of 2 and 3; Found: 0; line 1 pos 7 From 14ae619e2d6f18156fb24f0e391fb7ad11548ddd Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 9 Sep 2018 22:18:33 +0200 Subject: [PATCH 06/44] from_csv for PySpark --- python/pyspark/sql/functions.py | 21 +++++++++++++++++++ .../org/apache/spark/sql/functions.scala | 18 +++++++++++++++- .../apache/spark/sql/CsvFunctionsSuite.scala | 8 ++++--- 3 files changed, 43 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 9396b16b7ada8..29a49e6f03e17 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2634,6 +2634,27 @@ def sequence(start, stop, step=None): return Column(sc._jvm.functions.sequence( _to_java_column(start), _to_java_column(stop), _to_java_column(step))) +@ignore_unicode_prefix +@since(3.0) +def from_csv(col, schema, options={}): + """ + Parses a column containing a CSV string into a :class:`StructType` + with the specified schema. Returns `null`, in the case of an unparseable string. + + :param col: string column in CSV format + :param schema: a string with schema in DDL format to use when parsing the CSV column. + :param options: options to control parsing. 
accepts the same options as the CSV datasource + + >>> from pyspark.sql.types import * + >>> data = [(1, '1')] + >>> df = spark.createDataFrame(data, ("key", "value")) + >>> df.select(from_csv(df.value, "a INT").alias("csv")).collect() + [Row(csv=Row(a=1))] + """ + + sc = SparkContext._active_spark_context + jc = sc._jvm.functions.from_csv(_to_java_column(col), schema, options) + return Column(jc) # ---------------------------- User Defined Function ---------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 410c7df0b9523..c40f0ebf32cf8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3812,7 +3812,7 @@ object functions { def map_concat(cols: Column*): Column = withExpr { MapConcat(cols.map(_.expr)) } /** - * Parses a column containing a CSV string into a `StringType` with the specified schema. + * Parses a column containing a CSV string into a `StructType` with the specified schema. * Returns `null`, in the case of an unparseable string. * * @param e a string column containing CSV data. @@ -3827,6 +3827,22 @@ object functions { CsvToStructs(schema, options, e.expr) } + /** + * (Java-specific) Parses a column containing a CSV string into a `StructType` + * with the specified schema. Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing CSV data. + * @param schema the schema to use when parsing the CSV string + * @param options options to control how the CSV is parsed. accepts the same options and the + * CSV data source. + * + * @group collection_funcs + * @since 3.0.0 + */ + def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column = { + withExpr(new CsvToStructs(e.expr, lit(schema).expr, options.asScala.toMap)) + } + // scalastyle:off line.size.limit // scalastyle:off parameter.number diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index 6e9921d0a9bf4..1b4bc78632011 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -24,12 +24,14 @@ import org.apache.spark.sql.types._ class CsvFunctionsSuite extends QueryTest with SharedSQLContext { import testImplicits._ + val noOptions = Map[String, String]() + test("from_csv") { val df = Seq("1").toDS() val schema = new StructType().add("a", IntegerType) checkAnswer( - df.select(from_csv($"value", schema, Map.empty)), + df.select(from_csv($"value", schema, noOptions)), Row(Row(1)) :: Nil) } @@ -50,7 +52,7 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext { .add("b", IntegerType) checkAnswer( - df.select(from_csv($"value", schema, Map.empty)), + df.select(from_csv($"value", schema, noOptions)), Row(Row(1, null)) :: Nil) } @@ -59,7 +61,7 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext { val schema = new StructType().add("a", IntegerType) checkAnswer( - df.select(from_csv($"value", schema, Map.empty)), + df.select(from_csv($"value", schema, noOptions)), Row(Row(null)) :: Nil) } From cfb2ac3844ccc7a374b3c3ec58150264b545269f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 10 Sep 2018 10:47:27 +0200 Subject: [PATCH 07/44] from_csv for SparkR --- R/pkg/NAMESPACE | 1 + R/pkg/R/functions.R | 19 +++++++++++++++++++ 
R/pkg/R/generics.R | 4 ++++ R/pkg/tests/fulltests/test_sparkSQL.R | 5 +++++ 4 files changed, 29 insertions(+) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 96ff389faf4a0..0dcef6c79ec34 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -275,6 +275,7 @@ exportMethods("%<=>%", "format_number", "format_string", "from_json", + "from_csv", "from_unixtime", "from_utc_timestamp", "getField", diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 572dee50127b8..22cf5289a32fb 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -3720,3 +3720,22 @@ setMethod("current_timestamp", jc <- callJStatic("org.apache.spark.sql.functions", "current_timestamp") column(jc) }) + +#' @details +#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType} +#' with the specified \code{schema}. +#' If the string is unparseable, the Column will contain the value NA. +#' +#' @rdname column_collection_functions +#' @param schema a DDL-formatted string +#' @aliases from_csv from_csv,Column,character-method +#' +#' @note from_csv since 3.0.0 +setMethod("from_csv", signature(x = "Column", schema = "character"), + function(x, schema, ...) { + options <- varargsToStrEnv(...) + jc <- callJStatic("org.apache.spark.sql.functions", + "from_csv", + x@jc, schema, options) + column(jc) + }) \ No newline at end of file diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 27c1b312d645c..9d8ebc8f69366 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -984,6 +984,10 @@ setGeneric("format_string", function(format, x, ...) { standardGeneric("format_s #' @name NULL setGeneric("from_json", function(x, schema, ...) { standardGeneric("from_json") }) +#' @rdname column_collection_functions +#' @name NULL +setGeneric("from_csv", function(x, schema, ...) { standardGeneric("from_csv") }) + #' @rdname column_datetime_functions #' @name NULL setGeneric("from_unixtime", function(x, ...) 
{ standardGeneric("from_unixtime") }) diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 0c4bdb31b027b..641fb4f11626e 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -1659,6 +1659,11 @@ test_that("column functions", { expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2) expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4) + # Test from_csv() + df <- as.DataFrame(list(list("col" = "1"))) + c <- collect(select(df, alias(from_csv(df$col, "a INT"), "csv"))) + expect_equal(c[[1]][[1]]$a, 1) + # Test to_json(), from_json() df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people") j <- collect(select(df, alias(to_json(df$people), "json"))) From d2bfd9430f05d006accdecb6a62ed659fbd6a2f8 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 10 Sep 2018 18:01:58 +0200 Subject: [PATCH 08/44] Making Python style checker happy --- python/pyspark/sql/functions.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 29a49e6f03e17..1b5066f8b9b9a 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2634,6 +2634,7 @@ def sequence(start, stop, step=None): return Column(sc._jvm.functions.sequence( _to_java_column(start), _to_java_column(stop), _to_java_column(step))) + @ignore_unicode_prefix @since(3.0) def from_csv(col, schema, options={}): @@ -2656,6 +2657,7 @@ def from_csv(col, schema, options={}): jc = sc._jvm.functions.from_csv(_to_java_column(col), schema, options) return Column(jc) + # ---------------------------- User Defined Function ---------------------------------- class PandasUDFType(object): From d19242da0eff30b3149b3fd0e14ccab863f6b698 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 11 Sep 2018 17:02:52 +0200 Subject: [PATCH 09/44] Addressing review comments --- R/pkg/R/functions.R | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 22cf5289a32fb..f46502f0b426d 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -3728,6 +3728,7 @@ setMethod("current_timestamp", #' #' @rdname column_collection_functions #' @param schema a DDL-formatted string +#' @param ... options for CSV parser. Supported the same options as for CSV data source #' @aliases from_csv from_csv,Column,character-method #' #' @note from_csv since 3.0.0 @@ -3738,4 +3739,4 @@ setMethod("from_csv", signature(x = "Column", schema = "character"), "from_csv", x@jc, schema, options) column(jc) - }) \ No newline at end of file + }) From 147c978c19cb3c976f875bf502c50fbd01140251 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 11 Sep 2018 17:07:21 +0200 Subject: [PATCH 10/44] Updating comments --- R/pkg/R/functions.R | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index f46502f0b426d..4108635266804 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -196,10 +196,10 @@ NULL #' \item \code{array_position}: a value to locate in the given array. #' \item \code{array_remove}: a value to remove in the given array. #' } -#' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains -#' additional named properties to control how it is converted, accepts the same -#' options as the JSON data source. In \code{arrays_zip}, this contains additional -#' Columns of arrays to be merged. +#' @param ... 
additional argument(s). In \code{to_json}, \code{from_json} and \code{from_csv}, +#' this contains additional named properties to control how it is converted, accepts +#' the same options as the JSON and CSV data source. In \code{arrays_zip}, +#' this contains additional Columns of arrays to be merged. #' @name column_collection_functions #' @rdname column_collection_functions #' @family collection functions From 42b82274e47541f5f8453000f8645a65ab3bdbff Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 11 Sep 2018 17:11:19 +0200 Subject: [PATCH 11/44] added new line at the end of file --- R/pkg/R/functions.R | 1 + 1 file changed, 1 insertion(+) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 4108635266804..81944b8890ae8 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -3740,3 +3740,4 @@ setMethod("from_csv", signature(x = "Column", schema = "character"), x@jc, schema, options) column(jc) }) + From 2a0b65b7774cfcfeab489795f980ed0e38d225ab Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 11 Sep 2018 22:30:55 +0200 Subject: [PATCH 12/44] Moving from_csv closer to from_json and fixing warnings --- R/pkg/R/functions.R | 41 ++++++++++++++++++++--------------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 81944b8890ae8..b03d17c22f5f1 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -188,6 +188,7 @@ NULL #' \item \code{to_json}: it is the column containing the struct, array of the structs, #' the map or array of maps. #' \item \code{from_json}: it is the column containing the JSON string. +#' \item \code{from_csv}: it is the column containing the CSV string. #' } #' @param y Column to compute on. #' @param value A value to compute on. @@ -2202,6 +2203,25 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType") column(jc) }) +#' @details +#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType} +#' with the specified \code{schema}. +#' If the string is unparseable, the Column will contain the value NA. +#' +#' @rdname column_collection_functions +#' @param schema a DDL-formatted string +#' @aliases from_csv from_csv,Column,character-method +#' +#' @note from_csv since 3.0.0 +setMethod("from_csv", signature(x = "Column", schema = "character"), + function(x, schema, ...) { + options <- varargsToStrEnv(...) + jc <- callJStatic("org.apache.spark.sql.functions", + "from_csv", + x@jc, schema, options) + column(jc) + }) + #' @details #' \code{from_utc_timestamp}: Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a #' time in UTC, and renders that time as a timestamp in the given time zone. For example, 'GMT+1' @@ -3720,24 +3740,3 @@ setMethod("current_timestamp", jc <- callJStatic("org.apache.spark.sql.functions", "current_timestamp") column(jc) }) - -#' @details -#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType} -#' with the specified \code{schema}. -#' If the string is unparseable, the Column will contain the value NA. -#' -#' @rdname column_collection_functions -#' @param schema a DDL-formatted string -#' @param ... options for CSV parser. Supported the same options as for CSV data source -#' @aliases from_csv from_csv,Column,character-method -#' -#' @note from_csv since 3.0.0 -setMethod("from_csv", signature(x = "Column", schema = "character"), - function(x, schema, ...) { - options <- varargsToStrEnv(...) 
- jc <- callJStatic("org.apache.spark.sql.functions", - "from_csv", - x@jc, schema, options) - column(jc) - }) - From 1ccca306be984e9d1a11a1efcc0997ebe4a2fe74 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 12 Sep 2018 09:26:41 +0200 Subject: [PATCH 13/44] Deduplication of schema description --- R/pkg/R/functions.R | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index b03d17c22f5f1..bdabf5a358a18 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -197,6 +197,13 @@ NULL #' \item \code{array_position}: a value to locate in the given array. #' \item \code{array_remove}: a value to remove in the given array. #' } +#' @param schema +#' \itemize{ +#' \item \code{from_json}: a structType object to use as the schema to use +#' when parsing the JSON string. Since Spark 2.3, the DDL-formatted string is +#' also supported for the schema. +#' \item \code{from_csv}: a DDL-formatted string +#' } #' @param ... additional argument(s). In \code{to_json}, \code{from_json} and \code{from_csv}, #' this contains additional named properties to control how it is converted, accepts #' the same options as the JSON and CSV data source. In \code{arrays_zip}, @@ -2165,8 +2172,6 @@ setMethod("date_format", signature(y = "Column", x = "character"), #' to \code{TRUE}. If the string is unparseable, the Column will contain the value NA. #' #' @rdname column_collection_functions -#' @param schema a structType object to use as the schema to use when parsing the JSON string. -#' Since Spark 2.3, the DDL-formatted string is also supported for the schema. #' @param as.json.array indicating if input string is JSON array of objects or a single object. #' @aliases from_json from_json,Column,characterOrstructType-method #' @examples @@ -2209,7 +2214,6 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType") #' If the string is unparseable, the Column will contain the value NA. 
#'
 #' @rdname column_collection_functions
-#' @param schema a DDL-formatted string
 #' @aliases from_csv from_csv,Column,character-method
 #'
 #' @note from_csv since 3.0.0
 setMethod("from_csv", signature(x = "Column", schema = "character"),

From 7063af0a61875078d7eb0b30f56d058a7d704640 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 13 Sep 2018 12:26:01 +0200
Subject: [PATCH 14/44] Enable tests that failed because parseLine returned
 null

---
 .../spark/sql/catalyst/expressions/CsvExpressionsSuite.scala  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
index a2b6aa216eeaf..670b219d9439d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
@@ -59,11 +59,11 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P
     )
   }

-  ignore("from_csv bad UTF-8") {
+  test("from_csv bad UTF-8") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
       CsvToStructs(schema, Map.empty, Literal(badCsv), gmtId),
-      null)
+      InternalRow(null))
   }

   test("from_csv with timestamp") {

From aee472c3e9431858d544b2bad765e582c8a33e91 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 16 Sep 2018 10:47:07 +0200
Subject: [PATCH 15/44] Addressing Felix Cheung's review comments

---
 R/pkg/NAMESPACE     | 2 +-
 R/pkg/R/functions.R | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 0dcef6c79ec34..c5122843ff202 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -274,8 +274,8 @@ exportMethods("%<=>%",
               "floor",
               "format_number",
               "format_string",
-              "from_json",
               "from_csv",
+              "from_json",
               "from_unixtime",
               "from_utc_timestamp",
               "getField",
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index bdabf5a358a18..d578c41e0fb0f 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2215,7 +2215,6 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType")
 #'
 #' @rdname column_collection_functions
 #' @aliases from_csv from_csv,Column,character-method
-#'
 #' @note from_csv since 3.0.0
 setMethod("from_csv", signature(x = "Column", schema = "character"),
           function(x, schema, ...) {
             options <- varargsToStrEnv(...)

From 5e3787be804828c5a8b70ea72804a75f9aef841d Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 20 Sep 2018 22:17:30 +0200
Subject: [PATCH 16/44] Re-targeting to Spark 2.5.0

---
 R/pkg/R/functions.R                                           | 2 +-
 python/pyspark/sql/functions.py                               | 2 +-
 .../spark/sql/catalyst/expressions/csvExpressions.scala       | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/functions.scala  | 4 ++--
 .../test/resources/sql-tests/results/csv-functions.sql.out    | 2 +-
 5 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index d578c41e0fb0f..80df2f2ae13db 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2215,7 +2215,7 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType")
 #'
 #' @rdname column_collection_functions
 #' @aliases from_csv from_csv,Column,character-method
-#' @note from_csv since 3.0.0
+#' @note from_csv since 2.5.0
 setMethod("from_csv", signature(x = "Column", schema = "character"),
           function(x, schema, ...) {
             options <- varargsToStrEnv(...)
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 8a666859d520f..7aeb88d7b2124 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2638,7 +2638,7 @@ def sequence(start, stop, step=None): @ignore_unicode_prefix -@since(3.0) +@since(2.5) def from_csv(col, schema, options={}): """ Parses a column containing a CSV string into a :class:`StructType` diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index 008f4a4147ed5..0c1466de53ac3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -36,7 +36,7 @@ import org.apache.spark.unsafe.types.UTF8String > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); {"a":1, "b":0.8} """, - since = "3.0.0") + since = "2.5.0") // scalastyle:on line.size.limit case class CsvToStructs( schema: StructType, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index c40f0ebf32cf8..754dea79a8d9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3821,7 +3821,7 @@ object functions { * CSV data source. * * @group collection_funcs - * @since 3.0.0 + * @since 2.5.0 */ def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr { CsvToStructs(schema, options, e.expr) @@ -3837,7 +3837,7 @@ object functions { * CSV data source. * * @group collection_funcs - * @since 3.0.0 + * @since 2.5.0 */ def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column = { withExpr(new CsvToStructs(e.expr, lit(schema).expr, options.asScala.toMap)) diff --git a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out index 4f74bbebd22ee..ebb19440c19f0 100644 --- a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out @@ -23,7 +23,7 @@ Extended Usage: > SELECT from_csv('1, 0.8', 'a INT, b DOUBLE'); {"a":1, "b":0.8} - Since: 3.0.0 + Since: 2.5.0 Function: from_csv Usage: from_csv(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`. From 81ae6881ea09ba2caeafc41ff6a4fa097096deac Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 20 Sep 2018 22:26:12 +0200 Subject: [PATCH 17/44] Removing unnecessary trim and exception handling. 
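
The parser is created in PERMISSIVE or FAILFAST mode only, so the extra
trim check and exception handling around the parse call are redundant:
PERMISSIVE surfaces a malformed record as a row of nulls rather than
throwing, and FAILFAST is expected to propagate the exception. A rough
sketch of the behaviour this relies on (illustrative only, assuming a
Spark session with this patch applied; not part of the diff below):

    import org.apache.spark.sql.functions.from_csv
    import org.apache.spark.sql.types._
    import spark.implicits._

    val schema = new StructType().add("a", IntegerType)
    // PERMISSIVE (the default): the unparseable field becomes a null
    // inside the struct; the expression neither returns null nor throws.
    Seq("not-a-number").toDF("value")
      .select(from_csv($"value", schema, Map.empty[String, String]))
      .collect()
    // => Array([[null]])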
--- .../spark/sql/catalyst/expressions/csvExpressions.scala | 7 +------ .../sql/catalyst/expressions/CsvExpressionsSuite.scala | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index 0c1466de53ac3..7ca32a4e61d36 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -97,12 +97,7 @@ case class CsvToStructs( override def nullSafeEval(input: Any): Any = { val csv = input.asInstanceOf[UTF8String].toString - if (csv.trim.isEmpty) return null - try { - converter(parser.parse(csv)) - } catch { - case _: BadRecordException => null - } + converter(parser.parse(csv)) } override def inputTypes: Seq[AbstractDataType] = StringType :: Nil diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala index 670b219d9439d..57cdc71e73b43 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala @@ -114,7 +114,7 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( CsvToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId), - null + InternalRow(null) ) } From 27e162b91214774fac81d0ae99139887865ddd05 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 24 Sep 2018 21:29:47 +0200 Subject: [PATCH 18/44] Removing unnecessary sql tests --- .../sql-tests/inputs/csv-functions.sql | 3 - .../sql-tests/results/csv-functions.sql.out | 77 +++++-------------- 2 files changed, 21 insertions(+), 59 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql index c77568914cc51..d2214fd016028 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql @@ -1,7 +1,4 @@ -- from_csv -describe function from_csv; -describe function extended from_csv; -select from_csv('1', 'a INT'); select from_csv('1, 3.14', 'a INT, f FLOAT'); select from_csv('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')); -- Check if errors handled diff --git a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out index ebb19440c19f0..15dbe36bc0f6a 100644 --- a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out @@ -1,72 +1,37 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 10 +-- Number of queries: 7 -- !query 0 -describe function from_csv --- !query 0 schema -struct --- !query 0 output -Class: org.apache.spark.sql.catalyst.expressions.CsvToStructs -Function: from_csv -Usage: from_csv(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`. 
- - --- !query 1 -describe function extended from_csv --- !query 1 schema -struct --- !query 1 output -Class: org.apache.spark.sql.catalyst.expressions.CsvToStructs -Extended Usage: - Examples: - > SELECT from_csv('1, 0.8', 'a INT, b DOUBLE'); - {"a":1, "b":0.8} - - Since: 2.5.0 - -Function: from_csv -Usage: from_csv(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`. - - --- !query 2 -select from_csv('1', 'a INT') --- !query 2 schema -struct> --- !query 2 output -{"a":1} - - --- !query 3 select from_csv('1, 3.14', 'a INT, f FLOAT') --- !query 3 schema +-- !query 0 schema struct> --- !query 3 output +-- !query 0 output {"a":1,"f":3.14} --- !query 4 +-- !query 1 select from_csv('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')) --- !query 4 schema +-- !query 1 schema struct> --- !query 4 output +-- !query 1 output {"time":2015-08-26 00:00:00.0} --- !query 5 +-- !query 2 select from_csv('1', 1) --- !query 5 schema +-- !query 2 schema struct<> --- !query 5 output +-- !query 2 output org.apache.spark.sql.AnalysisException Schema should be specified in DDL format as a string literal instead of 1;; line 1 pos 7 --- !query 6 +-- !query 3 select from_csv('1', 'a InvalidType') --- !query 6 schema +-- !query 3 schema struct<> --- !query 6 output +-- !query 3 output org.apache.spark.sql.AnalysisException DataType invalidtype is not supported.(line 1, pos 2) @@ -77,28 +42,28 @@ a InvalidType ; line 1 pos 7 --- !query 7 +-- !query 4 select from_csv('1', 'a INT', named_struct('mode', 'PERMISSIVE')) --- !query 7 schema +-- !query 4 schema struct<> --- !query 7 output +-- !query 4 output org.apache.spark.sql.AnalysisException Must use a map() function for options;; line 1 pos 7 --- !query 8 +-- !query 5 select from_csv('1', 'a INT', map('mode', 1)) --- !query 8 schema +-- !query 5 schema struct<> --- !query 8 output +-- !query 5 output org.apache.spark.sql.AnalysisException A type of keys and values in map() must be string, but got map;; line 1 pos 7 --- !query 9 +-- !query 6 select from_csv() --- !query 9 schema +-- !query 6 schema struct<> --- !query 9 output +-- !query 6 output org.apache.spark.sql.AnalysisException Invalid number of arguments for function from_csv. Expected: one of 2 and 3; Found: 0; line 1 pos 7 From 783559a94dded253839a1e2af201c9b7c13ba272 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 24 Sep 2018 21:39:55 +0200 Subject: [PATCH 19/44] Removing unnecessary import in python's example --- python/pyspark/sql/functions.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 7aeb88d7b2124..6984ea0b94379 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2648,7 +2648,6 @@ def from_csv(col, schema, options={}): :param schema: a string with schema in DDL format to use when parsing the CSV column. :param options: options to control parsing. 
accepts the same options as the CSV datasource - >>> from pyspark.sql.types import * >>> data = [(1, '1')] >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(from_csv(df.value, "a INT").alias("csv")).collect() From 5945322234ef9366be06706b003043e9db3234b1 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 25 Sep 2018 22:31:30 +0200 Subject: [PATCH 20/44] Moving tests from csvFunctionsSuite to csvExpressionsSuite --- .../expressions/CsvExpressionsSuite.scala | 13 ++++++- .../apache/spark/sql/CsvFunctionsSuite.scala | 37 +++---------------- 2 files changed, 17 insertions(+), 33 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala index 57cdc71e73b43..d732c06058137 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala @@ -118,7 +118,7 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P ) } - test("from_csv missing fields") { + test("forcing schema nullability") { val input = """1,,"foo"""" val csvSchema = new StructType() .add("a", LongType, nullable = false) @@ -131,4 +131,15 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P val schemaToCompare = csvSchema.asNullable assert(schemaToCompare == schema) } + + + test("from_csv missing columns") { + val schema = new StructType() + .add("a", IntegerType) + .add("b", IntegerType) + checkEvaluation( + CsvToStructs(schema, Map.empty, Literal.create("1"), gmtId), + InternalRow(1, null) + ) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index 1b4bc78632011..7b461797ed0a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import scala.collection.JavaConverters._ + import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -24,14 +26,12 @@ import org.apache.spark.sql.types._ class CsvFunctionsSuite extends QueryTest with SharedSQLContext { import testImplicits._ - val noOptions = Map[String, String]() - - test("from_csv") { + test("from_csv with empty options") { val df = Seq("1").toDS() - val schema = new StructType().add("a", IntegerType) + val schema = "a int" checkAnswer( - df.select(from_csv($"value", schema, noOptions)), + df.select(from_csv($"value", schema, Map[String, String]().asJava)), Row(Row(1)) :: Nil) } @@ -44,31 +44,4 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext { df.select(from_csv($"value", schema, options)), Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))) } - - test("from_csv missing columns") { - val df = Seq("1").toDS() - val schema = new StructType() - .add("a", IntegerType) - .add("b", IntegerType) - - checkAnswer( - df.select(from_csv($"value", schema, noOptions)), - Row(Row(1, null)) :: Nil) - } - - test("from_csv invalid CSV") { - val df = Seq("???").toDS() - val schema = new StructType().add("a", IntegerType) - - checkAnswer( - df.select(from_csv($"value", schema, noOptions)), - Row(Row(null)) :: Nil) - } - - test("Support from_csv in SQL") { - val df1 = 
Seq("1").toDS()
-    checkAnswer(
-      df1.selectExpr("from_csv(value, 'a INT')"),
-      Row(Row(1)) :: Nil)
-  }
 }

From 9102135202139851bde1f4aab11a63de50637753 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 27 Sep 2018 23:02:50 +0200
Subject: [PATCH 21/44] Fix a mistake after merge

---
 R/pkg/R/functions.R | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index a76bceabeb8cf..2851814eacabd 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2226,10 +2226,6 @@ setMethod("from_csv", signature(x = "Column", schema = "character"),
             column(jc)
           })

-#' @details
-#' \code{from_utc_timestamp}: Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a
-#' time in UTC, and renders that time as a timestamp in the given time zone. For example, 'GMT+1'
-#' would yield '2017-07-14 03:40:00.0'.
 #' \code{from_utc_timestamp}: This is a common function for databases supporting TIMESTAMP WITHOUT
 #' TIMEZONE. This function takes a timestamp which is timezone-agnostic, and interprets it as a
 #' timestamp in UTC, and renders that timestamp as a timestamp in the given time zone.

From b36d96af3853701690297e5bd89a37b7209a446e Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 27 Sep 2018 23:04:49 +0200
Subject: [PATCH 22/44] Revert the details tag back

---
 R/pkg/R/functions.R | 1 +
 1 file changed, 1 insertion(+)

diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 2851814eacabd..6d56c2f0076ed 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2226,6 +2226,7 @@ setMethod("from_csv", signature(x = "Column", schema = "character"),
             column(jc)
           })

+#' @details
 #' \code{from_utc_timestamp}: This is a common function for databases supporting TIMESTAMP WITHOUT
 #' TIMEZONE. This function takes a timestamp which is timezone-agnostic, and interprets it as a
 #' timestamp in UTC, and renders that timestamp as a timestamp in the given time zone.

From 2169b860093b2dcd99693c33b836035b42dcd128 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 29 Sep 2018 12:38:42 +0200
Subject: [PATCH 23/44] An example of from_csv in R

---
 R/pkg/R/functions.R | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 6d56c2f0076ed..d760e0d78f2d8 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2216,6 +2216,12 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType")
 #'
 #' @rdname column_collection_functions
 #' @aliases from_csv from_csv,Column,character-method
+#' @examples
+#'
+#' \dontrun{
+#' df <- sql("SELECT 'Amsterdam,2018' as csv")
+#' schema <- "city STRING, year INT"
+#' head(select(df, from_csv(df$csv, schema)))
 #' @note from_csv since 2.5.0
 setMethod("from_csv", signature(x = "Column", schema = "character"),
           function(x, schema, ...)
{ From 826be4e67221c9d2a4d904ff56db48073883715c Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 29 Sep 2018 13:08:00 +0200 Subject: [PATCH 24/44] Missed } --- R/pkg/R/functions.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index d760e0d78f2d8..9989d6bde6e75 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -2221,7 +2221,7 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType") #' \dontrun{ #' df <- sql("SELECT 'Amsterdam,2018' as csv") #' schema <- "city STRING, year INT" -#' head(select(df, from_csv(df$csv, schema))) +#' head(select(df, from_csv(df$csv, schema)))} #' @note from_csv since 2.5.0 setMethod("from_csv", signature(x = "Column", schema = "character"), function(x, schema, ...) { From a5b6f696c5f410d3be93299bb748799d37d04bb9 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 29 Sep 2018 17:11:57 +0200 Subject: [PATCH 25/44] Fix wrong reference --- .../spark/sql/catalyst/expressions/jsonExpressions.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index f33a1961f1741..9f2848365bf40 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -754,7 +754,7 @@ case class SchemaOfJson( def this(child: Expression, options: Expression) = this( child = child, - options = JsonExprUtils.convertToMapData(options)) + options = ExprUtils.convertToMapData(options)) @transient private lazy val jsonOptions = new JSONOptions(options, "UTC") @@ -777,7 +777,6 @@ case class SchemaOfJson( } object JsonExprUtils { - def evalSchemaExpr(exp: Expression): DataType = exp match { case Literal(s, StringType) => DataType.fromDDL(s.toString) case e @ SchemaOfJson(_: Literal, _) => From 0ee4ec166fe16ca76b015c522ed141b0df95043d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 3 Oct 2018 19:44:48 +0100 Subject: [PATCH 26/44] 2.5 -> 3.0 --- R/pkg/R/functions.R | 2 +- python/pyspark/sql/functions.py | 2 +- .../spark/sql/catalyst/expressions/csvExpressions.scala | 2 +- sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 9989d6bde6e75..f5c8bf8d5d7ed 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -2222,7 +2222,7 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType") #' df <- sql("SELECT 'Amsterdam,2018' as csv") #' schema <- "city STRING, year INT" #' head(select(df, from_csv(df$csv, schema)))} -#' @note from_csv since 2.5.0 +#' @note from_csv since 3.0.0 setMethod("from_csv", signature(x = "Column", schema = "character"), function(x, schema, ...) { options <- varargsToStrEnv(...) 
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index dd7ba54d6fb6d..7a6902d80fb1b 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2665,7 +2665,7 @@ def sequence(start, stop, step=None): @ignore_unicode_prefix -@since(2.5) +@since(3.0) def from_csv(col, schema, options={}): """ Parses a column containing a CSV string into a :class:`StructType` diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index 7ca32a4e61d36..7adf01abbee8e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -36,7 +36,7 @@ import org.apache.spark.unsafe.types.UTF8String > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); {"a":1, "b":0.8} """, - since = "2.5.0") + since = "3.0.0") // scalastyle:on line.size.limit case class CsvToStructs( schema: StructType, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 58df378f9506c..87467a6969c66 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3840,7 +3840,7 @@ object functions { * CSV data source. * * @group collection_funcs - * @since 2.5.0 + * @since 3.0.0 */ def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr { CsvToStructs(schema, options, e.expr) @@ -3856,7 +3856,7 @@ object functions { * CSV data source. * * @group collection_funcs - * @since 2.5.0 + * @since 3.0.0 */ def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column = { withExpr(new CsvToStructs(e.expr, lit(schema).expr, options.asScala.toMap)) From 3af7bfa4833e1357a03e71ac230690f9e77d074f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 4 Oct 2018 20:23:25 +0100 Subject: [PATCH 27/44] Added an example with options --- .../apache/spark/sql/catalyst/expressions/csvExpressions.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index 7adf01abbee8e..8e5d3107218c3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -35,6 +35,8 @@ import org.apache.spark.unsafe.types.UTF8String Examples: > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); {"a":1, "b":0.8} + > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')) + {"time":2015-08-26 00:00:00.0} """, since = "3.0.0") // scalastyle:on line.size.limit From 24481c24fea6adaea7d1b20d4d866c91ae06a532 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 4 Oct 2018 20:26:47 +0100 Subject: [PATCH 28/44] Passing a parameter by name --- .../apache/spark/sql/catalyst/expressions/csvExpressions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index 8e5d3107218c3..498917746585d 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -75,7 +75,7 @@ case class CsvToStructs( lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null @transient lazy val parser = { - val parsedOptions = new CSVOptions(options, true, timeZoneId.get) + val parsedOptions = new CSVOptions(options, columnPruning = true, timeZoneId.get) val mode = parsedOptions.parseMode if (mode != PermissiveMode && mode != FailFastMode) { throw new AnalysisException(s"from_csv() doesn't support the ${mode.name} mode. " + From 70149d85cfd0a4e0cedf8ee8bb111dadda2a4bf3 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 4 Oct 2018 20:28:22 +0100 Subject: [PATCH 29/44] Adding {} --- .../apache/spark/sql/catalyst/expressions/csvExpressions.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index 498917746585d..e54e814e4538b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -94,8 +94,9 @@ case class CsvToStructs( override def dataType: DataType = nullableSchema - override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = + override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = { copy(timeZoneId = Option(timeZoneId)) + } override def nullSafeEval(input: Any): Any = { val csv = input.asInstanceOf[UTF8String].toString From d70089689ee1098e7de5de79bdba600db0b5d5eb Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 4 Oct 2018 20:39:16 +0100 Subject: [PATCH 30/44] Test for unsupported mode --- .../catalyst/expressions/CsvExpressionsSuite.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala index d732c06058137..dee1b0c113a49 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions import java.util.Calendar +import org.scalatest.exceptions.TestFailedException + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.plans.PlanTestBase @@ -142,4 +144,15 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P InternalRow(1, null) ) } + + test("unsupported mode") { + val csvData = "---" + val schema = StructType(StructField("a", DoubleType) :: Nil) + val exception = intercept[TestFailedException] { + checkEvaluation( + CsvToStructs(schema, Map("mode" -> DropMalformedMode.name), Literal(csvData), gmtId), + InternalRow(null)) + }.getCause + assert(exception.getMessage.contains("from_csv() doesn't support the DROPMALFORMED mode")) + } } From 643aaeab3d9d50ebe7a53d3d47fff3b3d16975d3 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 4 Oct 2018 21:01:15 +0100 Subject: [PATCH 31/44] Test for corrupted records --- 
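Note below the tear line (dropped by git-am): the new test exercises the
columnNameOfCorruptRecord plumbing in PERMISSIVE mode. A minimal sketch of
the behaviour being asserted, assuming the option is wired through exactly
as in the CSV data source ("_corrupt" is just an illustrative field name):

    import org.apache.spark.sql.functions.from_csv
    import org.apache.spark.sql.types._
    import spark.implicits._

    val schema = new StructType()
      .add("a", IntegerType)
      .add("_corrupt", StringType)  // receives the raw malformed record
    val opts = Map("mode" -> "PERMISSIVE",
                   "columnNameOfCorruptRecord" -> "_corrupt")
    Seq("xyz").toDF("value")
      .select(from_csv($"value", schema, opts))
      .collect()
    // => roughly Array([[null, xyz]]): the unparsed line is kept in the
    //    corrupt-record column instead of being silently lost.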
.../org/apache/spark/sql/CsvFunctionsSuite.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index 7b461797ed0a1..6ccd06be26a50 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -44,4 +44,19 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext { df.select(from_csv($"value", schema, options)), Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))) } + + + test("checking the columnNameOfCorruptRecord option") { + val columnNameOfCorruptRecord = "_unparsed" + val df = Seq("0,2013-111-11 12:13:14", "1,1983-08-04").toDS() + val schema = new StructType().add("a", IntegerType).add("b", TimestampType) + val schemaWithCorrField1 = schema.add(columnNameOfCorruptRecord, StringType) + val df2 = df + .select(from_csv($"value", schemaWithCorrField1, Map( + "mode" -> "Permissive", "columnNameOfCorruptRecord" -> columnNameOfCorruptRecord))) + + checkAnswer(df2, Seq( + Row(Row(null, null, "0,2013-111-11 12:13:14")), + Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null)))) + } } From 3f76ffb9dae5eacaff149f176437a368f27a162a Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 5 Oct 2018 10:35:46 +0100 Subject: [PATCH 32/44] Speed up test by taking only 50 timezones out of 620 --- .../spark/sql/catalyst/expressions/CsvExpressionsSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala index dee1b0c113a49..7f34eaa09c8a0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions import java.util.Calendar +import scala.util.Random + import org.scalatest.exceptions.TestFailedException import org.apache.spark.SparkFunSuite @@ -88,7 +90,7 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P ) val csvData2 = "2016-01-01T00:00:00" - for (tz <- DateTimeTestUtils.ALL_TIMEZONES) { + for (tz <- Random.shuffle(DateTimeTestUtils.ALL_TIMEZONES).take(50)) { c = Calendar.getInstance(tz) c.set(2016, 0, 1, 0, 0, 0) c.set(Calendar.MILLISECOND, 0) From 7d337832c6e68d0d9930419a4dadc99aaa172039 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 5 Oct 2018 10:43:11 +0100 Subject: [PATCH 33/44] Making the comment clear --- python/pyspark/sql/functions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 7a6902d80fb1b..88abaa8e08c21 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2668,8 +2668,8 @@ def sequence(start, stop, step=None): @since(3.0) def from_csv(col, schema, options={}): """ - Parses a column containing a CSV string into a :class:`StructType` - with the specified schema. Returns `null`, in the case of an unparseable string. + Parses a column containing a CSV string to a row with the specified schema. + Returns `null`, in the case of an unparseable string. 
:param col: string column in CSV format :param schema: a string with schema in DDL format to use when parsing the CSV column. From 50916259d49f0466b2b62a2ab581e73eb4e6c36d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 5 Oct 2018 11:10:46 +0100 Subject: [PATCH 34/44] Addressing Wenchen's review comments --- .../catalyst/expressions/csvExpressions.scala | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index e54e814e4538b..2c8994089fbec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -45,13 +45,17 @@ case class CsvToStructs( options: Map[String, String], child: Expression, timeZoneId: Option[String] = None) - extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { + extends UnaryExpression + with TimeZoneAwareExpression + with CodegenFallback + with ExpectsInputTypes + with NullIntolerant { - override def nullable: Boolean = true + override def nullable: Boolean = child.nullable // The CSV input data might be missing certain fields. We force the nullability // of the user-provided schema to avoid data corruptions. - val nullableSchema = schema.asNullable + val nullableSchema: StructType = schema.asNullable // Used in `FunctionRegistry` def this(child: Expression, schema: Expression, options: Map[String, String]) = @@ -72,7 +76,12 @@ case class CsvToStructs( // This converts parsed rows to the desired output by the given schema. @transient - lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null + lazy val converter = (rows: Iterator[InternalRow]) => + if (rows.hasNext) { + rows.next() + } else { + throw new IllegalArgumentException("Expected at least one row from CSV parser.") + } @transient lazy val parser = { val parsedOptions = new CSVOptions(options, columnPruning = true, timeZoneId.get) From b318239f96c8b589ed493ec83e85ea40672647fd Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 6 Oct 2018 00:20:54 +0200 Subject: [PATCH 35/44] Improving the error message --- .../spark/sql/catalyst/expressions/csvExpressions.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index 2c8994089fbec..459a53c7c0fe6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -76,12 +76,13 @@ case class CsvToStructs( // This converts parsed rows to the desired output by the given schema. 
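  // Note: both supported modes yield exactly one parsed row per input
  // record (PERMISSIVE returns a row of nulls for malformed input and
  // FAILFAST throws), so consuming a single row from the iterator below
  // is sufficient.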
@transient
-  lazy val converter = (rows: Iterator[InternalRow]) =>
+  lazy val converter = (rows: Iterator[InternalRow]) => {
     if (rows.hasNext) {
       rows.next()
     } else {
-      throw new IllegalArgumentException("Expected at least one row from CSV parser.")
+      throw new IllegalArgumentException("Expected one row from CSV parser.")
     }
+  }

   @transient lazy val parser = {
     val parsedOptions = new CSVOptions(options, columnPruning = true, timeZoneId.get)

From 3d8a89327c256f7703d7b57b3f5680dc95bd73db Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 11 Oct 2018 10:37:02 +0200
Subject: [PATCH 36/44] Using outstanding timezones instead of all timezones

---
 core/src/test/scala/org/apache/spark/SparkFunSuite.scala      | 10 ++++++++++
 .../sql/catalyst/expressions/CsvExpressionsSuite.scala        |  2 +-
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 31289026b0027..c76b842b876fa 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark

 // scalastyle:off
 import java.io.File
+import java.util.TimeZone

 import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}

@@ -106,4 +107,13 @@ abstract class SparkFunSuite
     }
   }

+  lazy val outstandingTimezones = Seq(
+    TimeZone.getTimeZone("UTC"),
+    TimeZone.getTimeZone("PST"),
+    TimeZone.getTimeZone("CET"),
+    TimeZone.getTimeZone("Africa/Dakar"),
+    TimeZone.getTimeZone("America/Los_Angeles"),
+    TimeZone.getTimeZone("Antarctica/Vostok"),
+    TimeZone.getTimeZone("Asia/Hong_Kong"),
+    TimeZone.getTimeZone("Europe/Amsterdam"))
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
index 7f34eaa09c8a0..756656d1ba825 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
@@ -90,7 +90,7 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P
     )

     val csvData2 = "2016-01-01T00:00:00"
-    for (tz <- Random.shuffle(DateTimeTestUtils.ALL_TIMEZONES).take(50)) {
+    for (tz <- outstandingTimezones) {
       c = Calendar.getInstance(tz)
       c.set(2016, 0, 1, 0, 0, 0)
       c.set(Calendar.MILLISECOND, 0)

From 8d297b2b8a5f1db4830b4868e59bd7e9e162b4cc Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 11 Oct 2018 15:10:42 +0200
Subject: [PATCH 37/44] Moving back to DateTimeTestUtils

---
 core/src/test/scala/org/apache/spark/SparkFunSuite.scala      | 10 ----------
 .../sql/catalyst/expressions/CsvExpressionsSuite.scala        |  4 +---
 .../spark/sql/catalyst/util/DateTimeTestUtils.scala           | 10 ++++++++++
 3 files changed, 11 insertions(+), 13 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index c76b842b876fa..31289026b0027 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark

 // scalastyle:off
 import java.io.File
-import java.util.TimeZone

 import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}

@@ -107,13 +106,4 @@ abstract class SparkFunSuite
     }
   }

-  lazy val outstandingTimezones = Seq(
-    TimeZone.getTimeZone("UTC"),
-    TimeZone.getTimeZone("PST"),
-    
TimeZone.getTimeZone("CET"), - TimeZone.getTimeZone("Africa/Dakar"), - TimeZone.getTimeZone("America/Los_Angeles"), - TimeZone.getTimeZone("Antarctica/Vostok"), - TimeZone.getTimeZone("Asia/Hong_Kong"), - TimeZone.getTimeZone("Europe/Amsterdam")) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala index 756656d1ba825..65987af710750 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.catalyst.expressions import java.util.Calendar -import scala.util.Random - import org.scalatest.exceptions.TestFailedException import org.apache.spark.SparkFunSuite @@ -90,7 +88,7 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P ) val csvData2 = "2016-01-01T00:00:00" - for (tz <- outstandingTimezones) { + for (tz <- DateTimeTestUtils.outstandingTimezones) { c = Calendar.getInstance(tz) c.set(2016, 0, 1, 0, 0, 0) c.set(Calendar.MILLISECOND, 0) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala index 0c1feb3aa0882..dfa0fe93a2f9c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala @@ -26,6 +26,16 @@ object DateTimeTestUtils { val ALL_TIMEZONES: Seq[TimeZone] = TimeZone.getAvailableIDs.toSeq.map(TimeZone.getTimeZone) + val outstandingTimezones: Seq[TimeZone] = Seq( + "UTC", + "PST", + "CET", + "Africa/Dakar", + "America/Los_Angeles", + "Antarctica/Vostok", + "Asia/Hong_Kong", + "Europe/Amsterdam").map(TimeZone.getTimeZone) + def withDefaultTimeZone[T](newDefaultTimeZone: TimeZone)(block: => T): T = { val originalDefaultTimeZone = TimeZone.getDefault try { From c3a31d4ea6f3da305f6ab08eca2484043564bd2f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 12 Oct 2018 18:15:11 +0200 Subject: [PATCH 38/44] Moving CSVHeaderChecker to sql/catalyst --- .../sql/catalyst}/csv/CSVHeaderChecker.scala | 2 +- .../spark/sql/catalyst/csv/CSVUtils.scala | 23 +++++++++++++++++++ .../apache/spark/sql/DataFrameReader.scala | 3 +-- .../datasources/csv/CSVDataSource.scala | 2 +- .../datasources/csv/CSVFileFormat.scala | 2 +- .../execution/datasources/csv/CSVUtils.scala | 23 ------------------- 6 files changed, 27 insertions(+), 28 deletions(-) rename sql/{core/src/main/scala/org/apache/spark/sql/execution/datasources => catalyst/src/main/scala/org/apache/spark/sql/catalyst}/csv/CSVHeaderChecker.scala (98%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala similarity index 98% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala index 558ee91c419b9..b1b010de9f2f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala @@ -15,7 +15,7 @@ * 
limitations under the License. */ -package org.apache.spark.sql.execution.datasources.csv +package org.apache.spark.sql.catalyst.csv import com.univocity.parsers.csv.CsvParser diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala index adbe7c402d515..64fcbf89ee044 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala @@ -28,6 +28,29 @@ object CSVUtils { } } + def skipComments(iter: Iterator[String], options: CSVOptions): Iterator[String] = { + if (options.isCommentSet) { + val commentPrefix = options.comment.toString + iter.dropWhile { line => + line.trim.isEmpty || line.trim.startsWith(commentPrefix) + } + } else { + iter.dropWhile(_.trim.isEmpty) + } + } + + /** + * Extracts header and moves iterator forward so that only data remains in it + */ + def extractHeader(iter: Iterator[String], options: CSVOptions): Option[String] = { + val nonEmptyLines = skipComments(iter, options) + if (nonEmptyLines.hasNext) { + Some(nonEmptyLines.next()) + } else { + None + } + } + /** * Helper method that converts string representation of a character to actual character. * It handles some Java escaped strings and throws exception if given string is longer than one diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 76147d653762e..4f6d8b8a0c34a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -22,14 +22,13 @@ import java.util.{Locale, Properties} import scala.collection.JavaConverters._ import com.fasterxml.jackson.databind.ObjectMapper -import com.univocity.parsers.csv.CsvParser import org.apache.spark.Partition import org.apache.spark.annotation.InterfaceStability import org.apache.spark.api.java.JavaRDD import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.csv.{CSVOptions, UnivocityParser} +import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser} import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions} import org.apache.spark.sql.catalyst.util.FailureSafeParser import org.apache.spark.sql.execution.command.DDLUtils diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index f7e4e9c7fd438..b133bb5a57b27 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -34,7 +34,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.{BinaryFileRDD, RDD} import org.apache.spark.sql.{Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.csv.{CSVOptions, UnivocityParser} +import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser} import org.apache.spark.sql.catalyst.csv.CSVUtils.filterCommentAndEmpty import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextFileFormat diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 0291dc41c1aa9..954a5a9cdecbb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce._ import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.csv.{CSVOptions, UnivocityParser} +import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser} import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala index fa3080b3702b8..84888caffd81d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala @@ -58,29 +58,6 @@ object CSVUtils { } } - def skipComments(iter: Iterator[String], options: CSVOptions): Iterator[String] = { - if (options.isCommentSet) { - val commentPrefix = options.comment.toString - iter.dropWhile { line => - line.trim.isEmpty || line.trim.startsWith(commentPrefix) - } - } else { - iter.dropWhile(_.trim.isEmpty) - } - } - - /** - * Extracts header and moves iterator forward so that only data remains in it - */ - def extractHeader(iter: Iterator[String], options: CSVOptions): Option[String] = { - val nonEmptyLines = skipComments(iter, options) - if (nonEmptyLines.hasNext) { - Some(nonEmptyLines.next()) - } else { - None - } - } - /** * Generates a header from the given row which is null-safe and duplicate-safe. 
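   * For example (assuming case-sensitive parsing), a raw header of
   * ("a", "b", "b", null) is rewritten to ("a", "b1", "b2", "_c3"):
   * duplicates get their column index appended and missing names fall
   * back to the positional "_c<index>" convention.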
*/ From 88e3b10a15e240db3e114f6ef08d2386b55c1e98 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 13 Oct 2018 08:16:21 +0200 Subject: [PATCH 39/44] Moving toChar to sql/catalyst --- .../spark/sql/catalyst/csv/CSVUtils.scala | 36 ++++++++++--------- .../execution/datasources/csv/CSVUtils.scala | 29 --------------- 2 files changed, 19 insertions(+), 46 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala index 64fcbf89ee044..109325089ff88 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala @@ -58,23 +58,25 @@ object CSVUtils { */ @throws[IllegalArgumentException] def toChar(str: String): Char = { - if (str.charAt(0) == '\\') { - str.charAt(1) - match { - case 't' => '\t' - case 'r' => '\r' - case 'b' => '\b' - case 'f' => '\f' - case '\"' => '\"' // In case user changes quote char and uses \" as delimiter in options - case '\'' => '\'' - case 'u' if str == """\u0000""" => '\u0000' - case _ => - throw new IllegalArgumentException(s"Unsupported special character for delimiter: $str") - } - } else if (str.length == 1) { - str.charAt(0) - } else { - throw new IllegalArgumentException(s"Delimiter cannot be more than one character: $str") + (str: Seq[Char]) match { + case Seq() => throw new IllegalArgumentException("Delimiter cannot be empty string") + case Seq('\\') => throw new IllegalArgumentException("Single backslash is prohibited." + + " It has special meaning as beginning of an escape sequence." + + " To get the backslash character, pass a string with two backslashes as the delimiter.") + case Seq(c) => c + case Seq('\\', 't') => '\t' + case Seq('\\', 'r') => '\r' + case Seq('\\', 'b') => '\b' + case Seq('\\', 'f') => '\f' + // In case user changes quote char and uses \" as delimiter in options + case Seq('\\', '\"') => '\"' + case Seq('\\', '\'') => '\'' + case Seq('\\', '\\') => '\\' + case _ if str == """\u0000""" => '\u0000' + case Seq('\\', _) => + throw new IllegalArgumentException(s"Unsupported special character for delimiter: $str") + case _ => + throw new IllegalArgumentException(s"Delimiter cannot be more than one character: $str") } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala index 0defc00aa3cfb..689555f53eb26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala @@ -100,35 +100,6 @@ object CSVUtils { } } - /** - * Helper method that converts string representation of a character to actual character. - * It handles some Java escaped strings and throws exception if given string is longer than one - * character. - */ - @throws[IllegalArgumentException] - def toChar(str: String): Char = { - (str: Seq[Char]) match { - case Seq() => throw new IllegalArgumentException("Delimiter cannot be empty string") - case Seq('\\') => throw new IllegalArgumentException("Single backslash is prohibited." + - " It has special meaning as beginning of an escape sequence." 
+ - " To get the backslash character, pass a string with two backslashes as the delimiter.") - case Seq(c) => c - case Seq('\\', 't') => '\t' - case Seq('\\', 'r') => '\r' - case Seq('\\', 'b') => '\b' - case Seq('\\', 'f') => '\f' - // In case user changes quote char and uses \" as delimiter in options - case Seq('\\', '\"') => '\"' - case Seq('\\', '\'') => '\'' - case Seq('\\', '\\') => '\\' - case _ if str == """\u0000""" => '\u0000' - case Seq('\\', _) => - throw new IllegalArgumentException(s"Unsupported special character for delimiter: $str") - case _ => - throw new IllegalArgumentException(s"Delimiter cannot be more than one character: $str") - } - } - /** * Sample CSV dataset as configured by `samplingRatio`. */ From 2ffed5fdc518c5832ca917722fb197c11f630d6d Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Mon, 15 Oct 2018 18:30:31 +0800 Subject: [PATCH 40/44] Address comments at 22379 --- R/pkg/R/functions.R | 11 ++++++-- R/pkg/tests/fulltests/test_sparkSQL.R | 2 ++ python/pyspark/sql/functions.py | 16 +++++++++++- ...SVUtils.scala => CSVExpressionUtils.scala} | 2 +- .../sql/catalyst/csv/CSVHeaderChecker.scala | 2 +- .../spark/sql/catalyst/csv/CSVOptions.scala | 2 +- .../sql/catalyst/csv/UnivocityParser.scala | 2 +- .../sql/catalyst/csv/CSVUtilsSuite.scala | 26 +++++++++---------- .../datasources/csv/CSVDataSource.scala | 3 +-- .../execution/datasources/csv/CSVUtils.scala | 4 +++ .../org/apache/spark/sql/functions.scala | 4 +-- .../apache/spark/sql/CsvFunctionsSuite.scala | 2 +- 12 files changed, 51 insertions(+), 25 deletions(-) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/{CSVUtils.scala => CSVExpressionUtils.scala} (99%) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 72ae3771211de..d2ca1d6c00bb4 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -2223,12 +2223,19 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType") #' schema <- "city STRING, year INT" #' head(select(df, from_csv(df$csv, schema)))} #' @note from_csv since 3.0.0 -setMethod("from_csv", signature(x = "Column", schema = "character"), +setMethod("from_csv", signature(x = "Column", schema = "characterOrColumn"), function(x, schema, ...) { + if (class(schema) == "Column") { + jschema <- schema@jc + } else if (is.character(schema)) { + jschema <- callJStatic("org.apache.spark.sql.functions", "lit", schema) + } else { + stop("schema argument should be a column or character") + } options <- varargsToStrEnv(...) 
jc <- callJStatic("org.apache.spark.sql.functions", "from_csv", - x@jc, schema, options) + x@jc, jschema, options) column(jc) }) diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 9c790acf7d3bb..5ad5d78d3ed17 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -1651,6 +1651,8 @@ test_that("column functions", { df <- as.DataFrame(list(list("col" = "1"))) c <- collect(select(df, alias(from_csv(df$col, "a INT"), "csv"))) expect_equal(c[[1]][[1]]$a, 1) + c <- collect(select(df, alias(from_csv(df$col, lit("a INT")), "csv"))) + expect_equal(c[[1]][[1]]$a, 1) # Test to_json(), from_json() df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people") diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index b86fd32bca196..9a0ff159f1897 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -25,9 +25,12 @@ if sys.version < "3": from itertools import imap as map +if sys.version >= '3': + basestring = str + from pyspark import since, SparkContext from pyspark.rdd import ignore_unicode_prefix, PythonEvalType -from pyspark.sql.column import Column, _to_java_column, _to_seq +from pyspark.sql.column import Column, _to_java_column, _to_seq, _create_column_from_literal from pyspark.sql.dataframe import DataFrame from pyspark.sql.types import StringType, DataType # Keep UserDefinedFunction import for backwards compatible import; moved in SPARK-22409 @@ -2693,9 +2696,20 @@ def from_csv(col, schema, options={}): >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(from_csv(df.value, "a INT").alias("csv")).collect() [Row(csv=Row(a=1))] + >>> data = [(1, '1')] + >>> df = spark.createDataFrame(data, ("key", "value")) + >>> df.select(from_csv(df.value, lit("a INT")).alias("csv")).collect() + [Row(csv=Row(a=1))] """ sc = SparkContext._active_spark_context + if isinstance(schema, basestring): + schema = _create_column_from_literal(schema) + elif isinstance(schema, Column): + schema = _to_java_column(schema) + else: + raise TypeError("schema argument should be a column or string") + jc = sc._jvm.functions.from_csv(_to_java_column(col), schema, options) return Column(jc) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExpressionUtils.scala similarity index 99% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExpressionUtils.scala index 109325089ff88..22fd5b041b493 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExpressionUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.csv -object CSVUtils { +object CSVExpressionUtils { /** * Filter ignorable rows for CSV iterator (lines empty and starting with `comment`). * This is currently being used in CSV reading path and CSV schema inference. 
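   * For example, with the comment character set to '#', the lines
   * "# header note" and "   " are both dropped before tokenization.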
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala index b1b010de9f2f0..4665537dcdb70 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala @@ -123,7 +123,7 @@ class CSVHeaderChecker( // Note: if there are only comments in the first block, the header would probably // be not extracted. if (options.headerFlag && isStartOfFile) { - CSVUtils.extractHeader(lines, options).foreach { header => + CSVExpressionUtils.extractHeader(lines, options).foreach { header => checkHeaderColumnNames(tokenizer.parseLine(header)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 1f39b20bb2f5e..957238ff881d0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -83,7 +83,7 @@ class CSVOptions( } } - val delimiter = CSVUtils.toChar( + val delimiter = CSVExpressionUtils.toChar( parameters.getOrElse("sep", parameters.getOrElse("delimiter", ","))) val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index f0890e0d36e89..e61771ae714ee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -338,7 +338,7 @@ private[sql] object UnivocityParser { val options = parser.options - val filteredLines: Iterator[String] = CSVUtils.filterCommentAndEmpty(lines, options) + val filteredLines: Iterator[String] = CSVExpressionUtils.filterCommentAndEmpty(lines, options) val safeParser = new FailureSafeParser[String]( input => Seq(parser.parse(input)), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVUtilsSuite.scala index dde46a97e6734..0d4f1f6eaa674 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVUtilsSuite.scala @@ -19,42 +19,42 @@ package org.apache.spark.sql.catalyst.csv import org.apache.spark.SparkFunSuite -class CSVUtilsSuite extends SparkFunSuite { +class CSVExpressionUtilsSuite extends SparkFunSuite { test("Can parse escaped characters") { - assert(CSVUtils.toChar("""\t""") === '\t') - assert(CSVUtils.toChar("""\r""") === '\r') - assert(CSVUtils.toChar("""\b""") === '\b') - assert(CSVUtils.toChar("""\f""") === '\f') - assert(CSVUtils.toChar("""\"""") === '\"') - assert(CSVUtils.toChar("""\'""") === '\'') - assert(CSVUtils.toChar("""\u0000""") === '\u0000') - assert(CSVUtils.toChar("""\\""") === '\\') + assert(CSVExpressionUtils.toChar("""\t""") === '\t') + assert(CSVExpressionUtils.toChar("""\r""") === '\r') + assert(CSVExpressionUtils.toChar("""\b""") === '\b') + assert(CSVExpressionUtils.toChar("""\f""") === '\f') + assert(CSVExpressionUtils.toChar("""\"""") === '\"') + assert(CSVExpressionUtils.toChar("""\'""") === 
'\'') + assert(CSVExpressionUtils.toChar("""\u0000""") === '\u0000') + assert(CSVExpressionUtils.toChar("""\\""") === '\\') } test("Does not accept delimiter larger than one character") { val exception = intercept[IllegalArgumentException]{ - CSVUtils.toChar("ab") + CSVExpressionUtils.toChar("ab") } assert(exception.getMessage.contains("cannot be more than one character")) } test("Throws exception for unsupported escaped characters") { val exception = intercept[IllegalArgumentException]{ - CSVUtils.toChar("""\1""") + CSVExpressionUtils.toChar("""\1""") } assert(exception.getMessage.contains("Unsupported special character for delimiter")) } test("string with one backward slash is prohibited") { val exception = intercept[IllegalArgumentException]{ - CSVUtils.toChar("""\""") + CSVExpressionUtils.toChar("""\""") } assert(exception.getMessage.contains("Single backslash is prohibited")) } test("output proper error message for empty string") { val exception = intercept[IllegalArgumentException]{ - CSVUtils.toChar("") + CSVExpressionUtils.toChar("") } assert(exception.getMessage.contains("Delimiter cannot be empty string")) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index b133bb5a57b27..9e7b45db9f280 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -35,7 +35,6 @@ import org.apache.spark.rdd.{BinaryFileRDD, RDD} import org.apache.spark.sql.{Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser} -import org.apache.spark.sql.catalyst.csv.CSVUtils.filterCommentAndEmpty import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextFileFormat import org.apache.spark.sql.types.StructType @@ -130,7 +129,7 @@ object TextInputCSVDataSource extends CSVDataSource { val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions) val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions) val tokenRDD = sampled.rdd.mapPartitions { iter => - val filteredLines = filterCommentAndEmpty(iter, parsedOptions) + val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions) val linesWithoutHeader = CSVUtils.filterHeaderLine(filteredLines, maybeFirstLine.get, parsedOptions) val parser = new CsvParser(parsedOptions.asParserSettings) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala index 689555f53eb26..8b6945968181f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.csv import org.apache.spark.rdd.RDD import org.apache.spark.sql.Dataset +import org.apache.spark.sql.catalyst.csv.CSVExpressionUtils import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.functions._ @@ -125,4 +126,7 @@ object CSVUtils { csv.sample(withReplacement = false, options.samplingRatio, 1) } } + + def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = + 
CSVExpressionUtils.filterCommentAndEmpty(iter, options) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 466db861ddf3a..8def9967cffb1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3882,8 +3882,8 @@ object functions { * @group collection_funcs * @since 3.0.0 */ - def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column = { - withExpr(new CsvToStructs(e.expr, lit(schema).expr, options.asScala.toMap)) + def from_csv(e: Column, schema: Column, options: java.util.Map[String, String]): Column = { + withExpr(new CsvToStructs(e.expr, schema.expr, options.asScala.toMap)) } // scalastyle:off line.size.limit diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index 6ccd06be26a50..38a2143d6d0f0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -31,7 +31,7 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext { val schema = "a int" checkAnswer( - df.select(from_csv($"value", schema, Map[String, String]().asJava)), + df.select(from_csv($"value", lit(schema), Map[String, String]().asJava)), Row(Row(1)) :: Nil) } From a32bbcb44ae2ca9f5d329e96b7c33d37ed3208a0 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Mon, 15 Oct 2018 18:34:17 +0800 Subject: [PATCH 41/44] nit --- python/pyspark/sql/functions.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 9a0ff159f1897..32d7f02f61883 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2696,7 +2696,6 @@ def from_csv(col, schema, options={}): >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(from_csv(df.value, "a INT").alias("csv")).collect() [Row(csv=Row(a=1))] - >>> data = [(1, '1')] >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(from_csv(df.value, lit("a INT")).alias("csv")).collect() [Row(csv=Row(a=1))] From 93d094f45b02afc0ab2f0650bbde1513823471a2 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Mon, 15 Oct 2018 18:46:53 +0800 Subject: [PATCH 42/44] name nits (#11) --- ...pressionUtils.scala => CSVExprUtils.scala} | 2 +- .../sql/catalyst/csv/CSVHeaderChecker.scala | 2 +- .../spark/sql/catalyst/csv/CSVOptions.scala | 2 +- .../sql/catalyst/csv/UnivocityParser.scala | 2 +- .../sql/catalyst/csv/CSVUtilsSuite.scala | 24 +++++++++---------- .../execution/datasources/csv/CSVUtils.scala | 4 ++-- 6 files changed, 18 insertions(+), 18 deletions(-) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/{CSVExpressionUtils.scala => CSVExprUtils.scala} (99%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExpressionUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtils.scala similarity index 99% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExpressionUtils.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtils.scala index 22fd5b041b493..bbe27831f01df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExpressionUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtils.scala @@ -17,7 
+17,7 @@ package org.apache.spark.sql.catalyst.csv -object CSVExpressionUtils { +object CSVExprUtils { /** * Filter ignorable rows for CSV iterator (lines empty and starting with `comment`). * This is currently being used in CSV reading path and CSV schema inference. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala index 4665537dcdb70..c39f77e891ae1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala @@ -123,7 +123,7 @@ class CSVHeaderChecker( // Note: if there are only comments in the first block, the header would probably // be not extracted. if (options.headerFlag && isStartOfFile) { - CSVExpressionUtils.extractHeader(lines, options).foreach { header => + CSVExprUtils.extractHeader(lines, options).foreach { header => checkHeaderColumnNames(tokenizer.parseLine(header)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 957238ff881d0..3e25d820e9941 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -83,7 +83,7 @@ class CSVOptions( } } - val delimiter = CSVExpressionUtils.toChar( + val delimiter = CSVExprUtils.toChar( parameters.getOrElse("sep", parameters.getOrElse("delimiter", ","))) val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index e61771ae714ee..46ed58ed92830 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -338,7 +338,7 @@ private[sql] object UnivocityParser { val options = parser.options - val filteredLines: Iterator[String] = CSVExpressionUtils.filterCommentAndEmpty(lines, options) + val filteredLines: Iterator[String] = CSVExprUtils.filterCommentAndEmpty(lines, options) val safeParser = new FailureSafeParser[String]( input => Seq(parser.parse(input)), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVUtilsSuite.scala index 0d4f1f6eaa674..1133b3d3355d3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVUtilsSuite.scala @@ -21,40 +21,40 @@ import org.apache.spark.SparkFunSuite class CSVExpressionUtilsSuite extends SparkFunSuite { test("Can parse escaped characters") { - assert(CSVExpressionUtils.toChar("""\t""") === '\t') - assert(CSVExpressionUtils.toChar("""\r""") === '\r') - assert(CSVExpressionUtils.toChar("""\b""") === '\b') - assert(CSVExpressionUtils.toChar("""\f""") === '\f') - assert(CSVExpressionUtils.toChar("""\"""") === '\"') - assert(CSVExpressionUtils.toChar("""\'""") === '\'') - assert(CSVExpressionUtils.toChar("""\u0000""") === '\u0000') - assert(CSVExpressionUtils.toChar("""\\""") === '\\') + assert(CSVExprUtils.toChar("""\t""") === '\t') + 
assert(CSVExprUtils.toChar("""\r""") === '\r') + assert(CSVExprUtils.toChar("""\b""") === '\b') + assert(CSVExprUtils.toChar("""\f""") === '\f') + assert(CSVExprUtils.toChar("""\"""") === '\"') + assert(CSVExprUtils.toChar("""\'""") === '\'') + assert(CSVExprUtils.toChar("""\u0000""") === '\u0000') + assert(CSVExprUtils.toChar("""\\""") === '\\') } test("Does not accept delimiter larger than one character") { val exception = intercept[IllegalArgumentException]{ - CSVExpressionUtils.toChar("ab") + CSVExprUtils.toChar("ab") } assert(exception.getMessage.contains("cannot be more than one character")) } test("Throws exception for unsupported escaped characters") { val exception = intercept[IllegalArgumentException]{ - CSVExpressionUtils.toChar("""\1""") + CSVExprUtils.toChar("""\1""") } assert(exception.getMessage.contains("Unsupported special character for delimiter")) } test("string with one backward slash is prohibited") { val exception = intercept[IllegalArgumentException]{ - CSVExpressionUtils.toChar("""\""") + CSVExprUtils.toChar("""\""") } assert(exception.getMessage.contains("Single backslash is prohibited")) } test("output proper error message for empty string") { val exception = intercept[IllegalArgumentException]{ - CSVExpressionUtils.toChar("") + CSVExprUtils.toChar("") } assert(exception.getMessage.contains("Delimiter cannot be empty string")) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala index 8b6945968181f..21fabac472f4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.csv import org.apache.spark.rdd.RDD import org.apache.spark.sql.Dataset -import org.apache.spark.sql.catalyst.csv.CSVExpressionUtils +import org.apache.spark.sql.catalyst.csv.CSVExprUtils import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.functions._ @@ -128,5 +128,5 @@ object CSVUtils { } def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = - CSVExpressionUtils.filterCommentAndEmpty(iter, options) + CSVExprUtils.filterCommentAndEmpty(iter, options) } From cb23bd7c8c1d91661ff7c8241768df51891f6b7d Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Tue, 16 Oct 2018 12:07:35 +0800 Subject: [PATCH 43/44] Address comments (#12) --- .../spark/sql/catalyst/expressions/csvExpressions.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index 459a53c7c0fe6..3484335353693 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -79,6 +79,10 @@ case class CsvToStructs( lazy val converter = (rows: Iterator[InternalRow]) => { if (rows.hasNext) { rows.next() + val result = rows.next + // CSV's parser produces one record only. 
+ assert(!rows.hasNext) + result } else { throw new IllegalArgumentException("Expected one row from CSV parser.") } From 205e4a474b459d47015d39f696aa444c3894703d Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Tue, 16 Oct 2018 20:28:30 +0800 Subject: [PATCH 44/44] Fix some nits (#13) --- .../apache/spark/sql/catalyst/expressions/csvExpressions.scala | 3 +-- .../csv/{CSVUtilsSuite.scala => CSVExprUtilsSuite.scala} | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) rename sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/{CSVUtilsSuite.scala => CSVExprUtilsSuite.scala} (97%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index 3484335353693..a63b6245c499e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -78,8 +78,7 @@ case class CsvToStructs( @transient lazy val converter = (rows: Iterator[InternalRow]) => { if (rows.hasNext) { - rows.next() - val result = rows.next + val result = rows.next() // CSV's parser produces one record only. assert(!rows.hasNext) result diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtilsSuite.scala similarity index 97% rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVUtilsSuite.scala rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtilsSuite.scala index 1133b3d3355d3..838ac42184fa5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtilsSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.csv import org.apache.spark.SparkFunSuite -class CSVExpressionUtilsSuite extends SparkFunSuite { +class CSVExprUtilsSuite extends SparkFunSuite { test("Can parse escaped characters") { assert(CSVExprUtils.toChar("""\t""") === '\t') assert(CSVExprUtils.toChar("""\r""") === '\r')
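Usage sketch of the final API in this series. This is a minimal, self-contained Scala example, not part of any patch: the SparkSession setup, the FromCsvDemo object, the input data, and the "a INT, b STRING" schema are all illustrative assumptions. It relies only on what the diffs above establish — the `schema: Column` signature of from_csv from [PATCH 40/44] (a DDL string must now be wrapped in lit(...) on the Scala side, as CsvFunctionsSuite does) and the one-record invariant asserted in [PATCH 43/44] and [PATCH 44/44].

    import scala.collection.JavaConverters._

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{from_csv, lit}

    // Illustrative example only; names and data are assumptions.
    object FromCsvDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("from_csv demo")
          .getOrCreate()
        import spark.implicits._

        val df = Seq("1,abc").toDF("value")

        // The schema argument is a Column after [PATCH 40/44], so a DDL
        // string is wrapped in lit(...); options are passed as a Java
        // map, matching the from_csv(Column, Column, java.util.Map)
        // signature exercised in CsvFunctionsSuite.
        val parsed = df.select(
          from_csv($"value", lit("a INT, b STRING"),
            Map.empty[String, String].asJava).as("csv"))

        // from_csv yields exactly one struct per input string; the
        // expression itself asserts this invariant ([PATCH 43/44],
        // cleaned up in [PATCH 44/44]).
        parsed.select($"csv.a", $"csv.b").show()

        spark.stop()
      }
    }

The Python and R wrappers keep accepting a plain string schema: as the hunks in [PATCH 40/44] show, they convert it to a literal Column themselves (via _create_column_from_literal in pyspark and functions.lit in SparkR) before calling the JVM-side from_csv.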