Commit 2f10a99

Clean up CSV and enable JSON types for timestamp
1 parent 9b5a1d1 commit 2f10a99

4 files changed, +23 -32 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala

Lines changed: 0 additions & 12 deletions
@@ -84,7 +84,6 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    verifySchema(dataSchema)
     val conf = job.getConfiguration
     val csvOptions = new CSVOptions(options)
     csvOptions.compressionCodec.foreach { codec =>
@@ -184,15 +183,4 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
         .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
     }
   }
-
-  private def verifySchema(schema: StructType): Unit = {
-    schema.foreach { field =>
-      field.dataType match {
-        case _: ArrayType | _: MapType | _: StructType =>
-          throw new UnsupportedOperationException(
-            s"CSV data source does not support ${field.dataType.simpleString} data type.")
-        case _ =>
-      }
-    }
-  }
 }
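
With verifySchema gone from prepareWrite, unsupported column types are no longer rejected on the driver at planning time; the check moves into the per-type converter in CSVRelation.scala below, so the failure surfaces only when a write task actually runs. For contrast, a minimal standalone sketch of the eager, driver-side walk this commit deletes (same logic as the removed method, renamed to make clear it is not the Spark source):

import org.apache.spark.sql.types._

// Walk every field up front and reject complex types before any task launches.
def verifySchemaEagerly(schema: StructType): Unit = schema.foreach { field =>
  field.dataType match {
    case _: ArrayType | _: MapType | _: StructType =>
      throw new UnsupportedOperationException(
        s"CSV data source does not support ${field.dataType.simpleString} data type.")
    case _ => // atomic types can be rendered as CSV values
  }
}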

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala

Lines changed: 9 additions & 5 deletions
@@ -227,6 +227,12 @@ private[csv] class CsvOutputWriter(
   }
 
   private def makeConverter(dataType: DataType): ValueConverter = dataType match {
+    case ByteType | ShortType | IntegerType | LongType =>
+      (row: InternalRow, ordinal: Int) => row.get(ordinal, dataType).toString
+
+    case FloatType | DoubleType | _: DecimalType | BooleanType | StringType =>
+      (row: InternalRow, ordinal: Int) => row.get(ordinal, dataType).toString
+
     case DateType =>
       (row: InternalRow, ordinal: Int) =>
         params.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
@@ -235,11 +241,9 @@ private[csv] class CsvOutputWriter(
       (row: InternalRow, ordinal: Int) =>
         params.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
 
-    case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
-
-    case dt: DataType =>
-      (row: InternalRow, ordinal: Int) =>
-        row.get(ordinal, dt).toString
+    case _ =>
+      throw new UnsupportedOperationException(
+        s"CSV data source does not support ${dataType.simpleString} data type.")
   }
 
   override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
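
The rewritten makeConverter enumerates the supported types explicitly, and the catch-all now fails loudly instead of silently falling back to toString (previously, UserDefinedType columns were also converted via their underlying sqlType; they are rejected now). A minimal standalone sketch of the dispatch pattern, simplified to Any => String and not the Spark source:

import org.apache.spark.sql.types._

// Resolve the converter once from the column's DataType, so the per-row hot
// path is a plain function call; unmatched types fail fast.
def makeConverterSketch(dataType: DataType): Any => String = dataType match {
  case ByteType | ShortType | IntegerType | LongType |
       FloatType | DoubleType | _: DecimalType | BooleanType | StringType =>
    (value: Any) => value.toString
  case _ =>
    throw new UnsupportedOperationException(
      s"CSV data source does not support ${dataType.simpleString} data type.")
}

makeConverterSketch(IntegerType)(42)                     // returns "42"
// makeConverterSketch(MapType(StringType, IntegerType)) // would throw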

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 14 additions & 11 deletions
@@ -665,21 +665,24 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   }
 
   test("error handling for unsupported data types.") {
-    withTempDir { dir =>
-      val csvDir = new File(dir, "csv").getCanonicalPath
-      var msg = intercept[UnsupportedOperationException] {
-        Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, b)").write.csv(csvDir)
-      }.getMessage
+    withTempPath { path =>
+      val msg = intercept[SparkException] {
+        Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, b)").write.csv(path.getAbsolutePath)
+      }.getCause.getMessage
       assert(msg.contains("CSV data source does not support struct<a:int,b:string> data type"))
+    }
 
-      msg = intercept[UnsupportedOperationException] {
-        Seq((1, Map("Tesla" -> 3))).toDF("id", "cars").write.csv(csvDir)
-      }.getMessage
+    withTempPath { path =>
+      val msg = intercept[SparkException] {
+        Seq((1, Map("Tesla" -> 3))).toDF().write.csv(path.getAbsolutePath)
+      }.getCause.getMessage
       assert(msg.contains("CSV data source does not support map<string,int> data type"))
+    }
 
-      msg = intercept[UnsupportedOperationException] {
-        Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands").write.csv(csvDir)
-      }.getMessage
+    withTempPath { path =>
+      val msg = intercept[SparkException] {
+        Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF().write.csv(path.getAbsolutePath)
+      }.getCause.getMessage
       assert(msg.contains("CSV data source does not support array<string> data type"))
     }
   }
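
Because validation now happens inside the write task, the updated test intercepts a SparkException and inspects its cause instead of catching UnsupportedOperationException directly. A hedged sketch of how that looks from user code (assumes a live SparkSession named spark; the output path is illustrative):

import org.apache.spark.SparkException
import spark.implicits._

try {
  Seq((1, Map("Tesla" -> 3))).toDF("id", "cars").write.csv("/tmp/csv-unsupported")
} catch {
  case e: SparkException =>
    // The task failure wraps the writer's UnsupportedOperationException.
    println(e.getCause.getMessage) // "... does not support map<string,int> data type."
}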

sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala

Lines changed: 0 additions & 4 deletions
@@ -32,10 +32,6 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
   override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
     case _: NullType => false
     case _: BinaryType => false
-    // `TimestampType` is disabled because `DatatypeConverter.parseDateTime()`
-    // in `DateTimeUtils` parses the formatted string wrongly when the date is
-    // too early. (e.g. "1600-07-13T08:36:32.847").
-    case _: TimestampType => false
     case _: CalendarIntervalType => false
     case _ => true
   }
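
Dropping the TimestampType exclusion puts timestamp columns back into the JSON round-trip test matrix, including the early dates the deleted comment called out. A hedged round-trip sketch (assumes a live SparkSession named spark; the path is illustrative):

import java.sql.Timestamp
import spark.implicits._

// The problem date from the deleted comment: early timestamps used to be
// parsed incorrectly on the way back in.
val df = Seq((1, Timestamp.valueOf("1600-07-13 08:36:32.847"))).toDF("id", "ts")
df.write.mode("overwrite").json("/tmp/json-ts")

// Read back with an explicit schema so "ts" is interpreted as TimestampType.
val readBack = spark.read.schema(df.schema).json("/tmp/json-ts")
readBack.show(truncate = false)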
