
Commit 593f2b2

Read date values stored in Spark 1.5.0.
1 parent 4fbf332 commit 593f2b2

File tree

3 files changed (+151 lines, -2 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala

Lines changed: 36 additions & 0 deletions
@@ -73,6 +73,38 @@ private[sql] object JacksonGenerator {
           valWriter(field.dataType, v)
         }
         gen.writeEndObject()
+
+      // For UDT, udt.serialize will produce SQL types. So, we need the following three cases.
+      case (ArrayType(ty, _), v: ArrayData) =>
+        gen.writeStartArray()
+        v.foreach(ty, (_, value) => valWriter(ty, value))
+        gen.writeEndArray()
+
+      case (MapType(kt, vt, _), v: MapData) =>
+        gen.writeStartObject()
+        v.foreach(kt, vt, { (k, v) =>
+          gen.writeFieldName(k.toString)
+          valWriter(vt, v)
+        })
+        gen.writeEndObject()
+
+      case (StructType(ty), v: InternalRow) =>
+        gen.writeStartObject()
+        var i = 0
+        while (i < ty.length) {
+          val field = ty(i)
+          val value = v.get(i, field.dataType)
+          if (value != null) {
+            gen.writeFieldName(field.name)
+            valWriter(field.dataType, value)
+          }
+          i += 1
+        }
+        gen.writeEndObject()
+
+      case (dt, v) =>
+        sys.error(
+          s"Failed to convert value $v (class of ${v.getClass}) with the type of $dt to JSON.")
     }

     valWriter(rowSchema, row)

@@ -133,6 +165,10 @@ private[sql] object JacksonGenerator {
           i += 1
         }
         gen.writeEndObject()
+
+      case (dt, v) =>
+        sys.error(
+          s"Failed to convert value $v (class of ${v.getClass}) with the type of $dt to JSON.")
     }

     valWriter(rowSchema, row)
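
The three new cases exist because udt.serialize hands the generator Catalyst values (ArrayData, MapData, InternalRow) rather than external Scala objects, and the generator must know how to render those. As a rough illustration of the JSON shapes these cases emit, the sketch below mirrors the Map and Array case bodies with plain Jackson calls; the render helper and the literal values (taken from col15 and col17 of the test data added later in this commit) are illustrative, not code from the commit itself.

import java.io.StringWriter
import com.fasterxml.jackson.core.{JsonFactory, JsonGenerator}

// Illustrative helper (not from the commit): run a block of generator calls and
// return the JSON text it produced.
def render(write: JsonGenerator => Unit): String = {
  val out = new StringWriter()
  val gen = new JsonFactory().createGenerator(out)
  write(gen)
  gen.close()
  out.toString
}

// MapType(StringType, LongType): each key becomes a JSON field name and each value
// goes through the value writer, e.g. Map("a string" -> 2000L) -> {"a string":2000}
val mapJson = render { gen =>
  gen.writeStartObject()
  gen.writeFieldName("a string")
  gen.writeNumber(2000L)
  gen.writeEndObject()
}

// ArrayType(DoubleType): the ArrayData returned by a UDT's serialize (for example
// MyDenseVectorUDT in the test suite) becomes a plain JSON array -> [0.25,2.25,4.25]
val arrayJson = render { gen =>
  gen.writeStartArray()
  Seq(0.25, 2.25, 4.25).foreach(d => gen.writeNumber(d))
  gen.writeEndArray()
}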

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala

Lines changed: 14 additions & 1 deletion
@@ -62,10 +62,23 @@ private[sql] object JacksonParser {
         // guard the non string type
         null

+      case (VALUE_STRING, BinaryType) =>
+        parser.getBinaryValue
+
       case (VALUE_STRING, DateType) =>
-        DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
+        val stringValue = parser.getText
+        if (stringValue.contains("-")) {
+          // The format of this string will probably be "yyyy-mm-dd".
+          DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
+        } else {
+          // In Spark 1.5.0, we store the data as the number of days since the epoch, in a string.
+          // So, we just convert it to an Int.
+          stringValue.toInt
+        }

       case (VALUE_STRING, TimestampType) =>
+        // This one will lose the microseconds part.
+        // See https://issues.apache.org/jira/browse/SPARK-10681.
         DateTimeUtils.stringToTime(parser.getText).getTime * 1000L

       case (VALUE_NUMBER_INT, TimestampType) =>
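
The two date encodings the parser now accepts can be checked with a small standalone sketch. It uses java.time instead of Spark's DateTimeUtils, so parseJsonDate is only an illustration of the branch added above, not code from this commit: a calendar string such as "2015-01-01" goes through date parsing, while a Spark 1.5.0-style value such as "16436" is already the day count since 1970-01-01.

import java.time.LocalDate

// Illustrative stand-in for the new DateType branch in JacksonParser:
// DateType is stored internally as the number of days since the epoch (an Int).
def parseJsonDate(stringValue: String): Int =
  if (stringValue.contains("-")) {
    // Calendar form, e.g. "2015-01-01".
    LocalDate.parse(stringValue).toEpochDay.toInt
  } else {
    // Spark 1.5.0 wrote the day count itself as a string, e.g. "16436".
    stringValue.toInt
  }

// Both encodings of the same date map to the same internal value.
assert(parseJsonDate("2015-01-01") == 16436)
assert(parseJsonDate("16436") == 16436)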

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 101 additions & 1 deletion
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.core.JsonFactory
 import org.apache.spark.rdd.RDD
 import org.scalactic.Tolerance._

-import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType

@@ -1159,4 +1159,104 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         "SELECT count(a) FROM test_myjson_with_part where d1 = 1"), Row(9))
     })
   }
+
+  // scalastyle:off
+  test("backward compatibility") {
+    // This test makes sure our JSON support can read JSON data generated by previous versions
+    // of Spark, both through the toJSON method and the JSON data source.
+    // The data is generated by the following program.
+    // Here are a few notes:
+    // - Spark 1.5.0 cannot save timestamp data. So, we manually added the timestamp field (col13)
+    //   in the JSON object.
+    // - For Spark versions before 1.5.1, we do not generate UDTs. So, we manually added the UDT
+    //   value to JSON objects generated by those Spark versions (col17).
+    // - If the type is NullType, we do not write data out.
+
+    // Create the schema.
+    val struct =
+      StructType(
+        StructField("f1", FloatType, true) ::
+          StructField("f2", ArrayType(BooleanType), true) :: Nil)
+
+    val dataTypes =
+      Seq(
+        StringType, BinaryType, NullType, BooleanType,
+        ByteType, ShortType, IntegerType, LongType,
+        FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+        DateType, TimestampType,
+        ArrayType(IntegerType), MapType(StringType, LongType), struct,
+        new MyDenseVectorUDT())
+    val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
+      StructField(s"col$index", dataType, nullable = true)
+    }
+    val schema = StructType(fields)
+
+    val constantValues =
+      Seq(
+        "a string in binary".getBytes("UTF-8"),
+        null,
+        true,
+        1.toByte,
+        2.toShort,
+        3,
+        Long.MaxValue,
+        0.25.toFloat,
+        0.75,
+        new java.math.BigDecimal("1234.23456"),
+        new java.math.BigDecimal("1.23456"),
+        java.sql.Date.valueOf("2015-01-01"),
+        java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
+        Seq(2, 3, 4),
+        Map("a string" -> 2000L),
+        Row(4.75.toFloat, Seq(false, true)),
+        new MyDenseVector(Array(0.25, 2.25, 4.25)))
+    val data = Row.fromSeq(Seq("Spark " + sqlContext.sparkContext.version) ++ constantValues) :: Nil
+
+    // Data generated by previous versions.
+    val existingJSONData =
+      """{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
+
+    // Generate data for the current version.
+    val df = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(data, 1), schema)
+    withTempPath { path =>
+      df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
+
+      // df.toJSON converts internal rows to external rows first and then generates JSON objects,
+      // while df.write.format("json") writes internal rows directly.
+      val allJSON =
+        existingJSONData ++
+          df.toJSON.collect() ++
+          sparkContext.textFile(path.getCanonicalPath).collect()
+
+      Utils.deleteRecursively(path)
+      sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
+
+      // Read the data back with the schema specified.
+      val col0Values =
+        Seq(
+          "Spark 1.2.2",
+          "Spark 1.3.1",
+          "Spark 1.3.1",
+          "Spark 1.4.1",
+          "Spark 1.4.1",
+          "Spark 1.5.0",
+          "Spark 1.5.0",
+          "Spark " + sqlContext.sparkContext.version,
+          "Spark " + sqlContext.sparkContext.version)
+      val expectedResult = col0Values.map { v =>
+        Row.fromSeq(Seq(v) ++ constantValues)
+      }
+      checkAnswer(
+        sqlContext.read.format("json").schema(schema).load(path.getCanonicalPath),
+        expectedResult
+      )
+    }
+  }
+  // scalastyle:on
 }
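
From user code, the same backward-compatibility path is exercised by reading old JSON with an explicit schema, since without one a value like "16436" would simply be inferred as a string. The file path, column subset, and sqlContext in the sketch below are illustrative (a Spark 1.x session is assumed), not taken from this commit.

import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}

// Hypothetical location of JSON written by Spark 1.5.0, where col12 is a DateType
// stored as a day count in a string ("16436").
val legacySchema = StructType(
  StructField("col0", StringType) ::
  StructField("col12", DateType) :: Nil)

val legacyDF = sqlContext.read
  .format("json")
  .schema(legacySchema)   // the explicit schema triggers the DateType branch above
  .load("/tmp/legacy-spark-1.5.0-json")

legacyDF.select("col12").show()   // "16436" is read back as the date 2015-01-01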
