@@ -329,83 +329,97 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
   test("SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc") {
     withTempDir { dir =>
       val tempDir = new File(dir, "files").getCanonicalPath
-      // TODO(SPARK-26744): support data type validating in V2 data source, and test V2 as well.
-      withSQLConf(SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> "orc") {
-        // write path
-        Seq("csv", "json", "parquet", "orc").foreach { format =>
-          var msg = intercept[AnalysisException] {
-            sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir)
-          }.getMessage
-          assert(msg.contains("Cannot save interval data type into external storage."))
-
-          msg = intercept[AnalysisException] {
-            spark.udf.register("testType", () => new IntervalData())
-            sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
-          }.getMessage
-          assert(msg.toLowerCase(Locale.ROOT)
-            .contains(s"$format data source does not support calendarinterval data type."))
+      Seq(true, false).foreach { useV1 =>
+        val useV1List = if (useV1) {
+          "orc"
+        } else {
+          ""
         }
+        def errorMessage(format: String, isWrite: Boolean): String = {
+          if (isWrite && (useV1 || format != "orc")) {
+            "cannot save interval data type into external storage."
+          } else {
+            s"$format data source does not support calendarinterval data type."
+          }
+        }
+
+        withSQLConf(SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> useV1List) {
+          // write path
+          Seq("csv", "json", "parquet", "orc").foreach { format =>
+            var msg = intercept[AnalysisException] {
+              sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir)
+            }.getMessage
+            assert(msg.toLowerCase(Locale.ROOT).contains(errorMessage(format, true)))
+          }
 
-        // read path
-        Seq("parquet", "csv").foreach { format =>
-          var msg = intercept[AnalysisException] {
-            val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil)
-            spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-            spark.read.schema(schema).format(format).load(tempDir).collect()
-          }.getMessage
-          assert(msg.toLowerCase(Locale.ROOT)
-            .contains(s"$format data source does not support calendarinterval data type."))
-
-          msg = intercept[AnalysisException] {
-            val schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil)
-            spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-            spark.read.schema(schema).format(format).load(tempDir).collect()
-          }.getMessage
-          assert(msg.toLowerCase(Locale.ROOT)
-            .contains(s"$format data source does not support calendarinterval data type."))
+          // read path
+          Seq("parquet", "csv").foreach { format =>
+            var msg = intercept[AnalysisException] {
+              val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil)
+              spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+              spark.read.schema(schema).format(format).load(tempDir).collect()
+            }.getMessage
+            assert(msg.toLowerCase(Locale.ROOT).contains(errorMessage(format, false)))
+
+            msg = intercept[AnalysisException] {
+              val schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil)
+              spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+              spark.read.schema(schema).format(format).load(tempDir).collect()
+            }.getMessage
+            assert(msg.toLowerCase(Locale.ROOT).contains(errorMessage(format, false)))
+          }
         }
       }
     }
   }
 
   test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") {
-    // TODO(SPARK-26744): support data type validating in V2 data source, and test V2 as well.
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "orc",
-      SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> "orc") {
-      withTempDir { dir =>
-        val tempDir = new File(dir, "files").getCanonicalPath
-
-        Seq("parquet", "csv", "orc").foreach { format =>
-          // write path
-          var msg = intercept[AnalysisException] {
-            sql("select null").write.format(format).mode("overwrite").save(tempDir)
-          }.getMessage
-          assert(msg.toLowerCase(Locale.ROOT)
-            .contains(s"$format data source does not support null data type."))
-
-          msg = intercept[AnalysisException] {
-            spark.udf.register("testType", () => new NullData())
-            sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
-          }.getMessage
-          assert(msg.toLowerCase(Locale.ROOT)
-            .contains(s"$format data source does not support null data type."))
-
-          // read path
-          msg = intercept[AnalysisException] {
-            val schema = StructType(StructField("a", NullType, true) :: Nil)
-            spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-            spark.read.schema(schema).format(format).load(tempDir).collect()
-          }.getMessage
-          assert(msg.toLowerCase(Locale.ROOT)
-            .contains(s"$format data source does not support null data type."))
-
-          msg = intercept[AnalysisException] {
-            val schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
-            spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-            spark.read.schema(schema).format(format).load(tempDir).collect()
-          }.getMessage
-          assert(msg.toLowerCase(Locale.ROOT)
-            .contains(s"$format data source does not support null data type."))
+    Seq(true, false).foreach { useV1 =>
+      val useV1List = if (useV1) {
+        "orc"
+      } else {
+        ""
+      }
+      def errorMessage(format: String): String = {
+        s"$format data source does not support null data type."
+      }
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1List,
+        SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> useV1List) {
+        withTempDir { dir =>
+          val tempDir = new File(dir, "files").getCanonicalPath
+
+          Seq("parquet", "csv", "orc").foreach { format =>
+            // write path
+            var msg = intercept[AnalysisException] {
+              sql("select null").write.format(format).mode("overwrite").save(tempDir)
+            }.getMessage
+            assert(msg.toLowerCase(Locale.ROOT)
+              .contains(errorMessage(format)))
+
+            msg = intercept[AnalysisException] {
+              spark.udf.register("testType", () => new NullData())
+              sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
+            }.getMessage
+            assert(msg.toLowerCase(Locale.ROOT)
+              .contains(errorMessage(format)))
+
+            // read path
+            msg = intercept[AnalysisException] {
+              val schema = StructType(StructField("a", NullType, true) :: Nil)
+              spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+              spark.read.schema(schema).format(format).load(tempDir).collect()
+            }.getMessage
+            assert(msg.toLowerCase(Locale.ROOT)
+              .contains(errorMessage(format)))
+
+            msg = intercept[AnalysisException] {
+              val schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
+              spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+              spark.read.schema(schema).format(format).load(tempDir).collect()
+            }.getMessage
+            assert(msg.toLowerCase(Locale.ROOT)
+              .contains(errorMessage(format)))
+          }
         }
       }
     }
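
Both hunks apply the same pattern: each test body now runs twice, once with ORC pinned to the V1 file source through USE_V1_SOURCE_WRITER_LIST / USE_V1_SOURCE_READER_LIST and once with the list left empty so ORC exercises its V2 implementation, with the expected error text computed per run. The fragment below is a minimal sketch of how that plays out for an ORC write, condensed from the diff rather than copied from it; the path value stands in for a temp directory and is an assumption.

    // Sketch only: mirrors the useV1List/errorMessage logic added above for the
    // write path with format = "orc"; `path` is a placeholder temp directory.
    Seq(true, false).foreach { useV1 =>
      val useV1List = if (useV1) "orc" else ""  // "" leaves ORC on the V2 write path
      withSQLConf(SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> useV1List) {
        val expected = if (useV1) {
          // V1 write path rejects interval columns before reaching the data source.
          "cannot save interval data type into external storage."
        } else {
          // V2 ORC write path fails in the per-format schema check instead.
          "orc data source does not support calendarinterval data type."
        }
        val msg = intercept[AnalysisException] {
          sql("select interval 1 days").write.format("orc").mode("overwrite").save(path)
        }.getMessage
        assert(msg.toLowerCase(Locale.ROOT).contains(expected))
      }
    }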