@@ -24,7 +24,7 @@ import com.fasterxml.jackson.core.JsonFactory
 import org.apache.spark.rdd.RDD
 import org.scalactic.Tolerance._
 
-import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
@@ -1163,4 +1163,105 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
11631163 " SELECT count(a) FROM test_myjson_with_part where d1 = 1" ), Row (9 ))
11641164 })
11651165 }
1166+
+  test("backward compatibility") {
+    // This test makes sure that our JSON support can read JSON data generated by previous
+    // versions of Spark, through both the toJSON method and the JSON data source.
+    // The data is generated by the program below.
+    // A few notes:
+    //  - Spark 1.5.0 cannot save timestamp data, so we manually added the timestamp field
+    //    (col13) to the JSON objects.
+    //  - Spark versions before 1.5.1 do not generate UDTs, so we manually added the UDT value
+    //    (col17) to the JSON objects generated by those versions.
+    //  - If the type is NullType, we do not write data out.
+
+    // Create the schema.
+    val struct =
+      StructType(
+        StructField("f1", FloatType, true) ::
+        StructField("f2", ArrayType(BooleanType), true) :: Nil)
+
+    val dataTypes =
+      Seq(
+        StringType, BinaryType, NullType, BooleanType,
+        ByteType, ShortType, IntegerType, LongType,
+        FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+        DateType, TimestampType,
+        ArrayType(IntegerType), MapType(StringType, LongType), struct,
+        new MyDenseVectorUDT())
+    val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
+      StructField(s"col$index", dataType, nullable = true)
+    }
+    val schema = StructType(fields)
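+    // This yields one field per data type, named col0 through col17.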
+
+    val constantValues =
+      Seq(
+        "a string in binary".getBytes("UTF-8"),
+        null,
+        true,
+        1.toByte,
+        2.toShort,
+        3,
+        Long.MaxValue,
+        0.25.toFloat,
+        0.75,
+        new java.math.BigDecimal("1234.23456"),
+        new java.math.BigDecimal("1.23456"),
+        java.sql.Date.valueOf("2015-01-01"),
+        java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
+        Seq(2, 3, 4),
+        Map("a string" -> 2000L),
+        Row(4.75.toFloat, Seq(false, true)),
+        new MyDenseVector(Array(0.25, 2.25, 4.25)))
+    val data =
+      Row.fromSeq(Seq("Spark " + sqlContext.sparkContext.version) ++ constantValues) :: Nil
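+    // The version string fills col0; constantValues fill col1 through col17 in order. col2
+    // (NullType) is null, so it never appears in the JSON output.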
+
+    // Data generated by previous versions.
+    // scalastyle:off
+    val existingJSONData =
+      """{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
+    // scalastyle:on
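+    // Note: in the last Spark 1.5.0 record above, col12 is "16436", i.e. the date encoded as
+    // days since the Unix epoch (1970-01-01 plus 16436 days is 2015-01-01) rather than as a
+    // date string.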
+
+    // Generate data for the current version.
+    val df = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(data, 1), schema)
+    withTempPath { path =>
+      df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
+
+      // df.toJSON converts internal rows to external rows first and then generates JSON
+      // objects, whereas df.write.format("json") writes internal rows directly.
+      val allJSON =
+        existingJSONData ++
+          df.toJSON.collect() ++
+          sparkContext.textFile(path.getCanonicalPath).collect()
+
+      Utils.deleteRecursively(path)
+      sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
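+      // The combined records (old and current) now live in a single directory, so the read
+      // below covers data from every Spark version at once.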
+
+      // Read the data back with the schema specified.
+      val col0Values =
+        Seq(
+          "Spark 1.2.2",
+          "Spark 1.3.1",
+          "Spark 1.3.1",
+          "Spark 1.4.1",
+          "Spark 1.4.1",
+          "Spark 1.5.0",
+          "Spark 1.5.0",
+          "Spark " + sqlContext.sparkContext.version,
+          "Spark " + sqlContext.sparkContext.version)
+      val expectedResult = col0Values.map { v =>
+        Row.fromSeq(Seq(v) ++ constantValues)
+      }
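+      // Every row should decode to the same constant values; only col0 differs by version.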
+      checkAnswer(
+        sqlContext.read.format("json").schema(schema).load(path.getCanonicalPath),
+        expectedResult
+      )
+    }
+  }
 }