
Commit 00a2911

liancheng authored and yhuai committed
[SPARK-10540] Fixes flaky all-data-type test
This PR breaks the original test case into multiple ones (one test case for each data type). In this way, test failure output becomes much more readable.

Within each test case, we build a table with two columns: one holds the data type under test, the other is an "index" column, which is used to sort the DataFrame and work around SPARK-10591 [1].

[1]: https://issues.apache.org/jira/browse/SPARK-10591

Author: Cheng Lian <[email protected]>

Closes #8768 from liancheng/spark-10540/test-all-data-types.
1 parent 35e8ab9 commit 00a2911
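
The change below hinges on two ideas: a for-loop over supportedDataTypes registers one ScalaTest case per data type while the suite is being constructed, and every generated row carries an "index" column so that both the written and the reloaded DataFrame can be sorted into the same order before checkAnswer compares them. Here is a minimal, self-contained sketch of that pattern, assuming a local SparkSession, a few hand-rolled sample values in place of Spark's RandomDataGenerator, and Parquet as a stand-in data source; the suite and helper names are illustrative, not the real HadoopFsRelationTest harness.

import java.nio.file.Files

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import org.scalatest.funsuite.AnyFunSuite

class PerTypeRoundTripSuite extends AnyFunSuite {

  private lazy val spark = SparkSession.builder().master("local[2]").getOrCreate()

  // Hand-rolled sample values per type, standing in for Spark's RandomDataGenerator.
  private val samples: Seq[(DataType, Seq[Any])] = Seq(
    StringType  -> Seq("a", "b", null),
    IntegerType -> Seq(1, 2, null),
    DoubleType  -> Seq(1.5, -2.5, null)
  )

  // The loop body runs while the suite is constructed, so every data type
  // registers its own, independently reported test case.
  for ((dataType, values) <- samples) {
    test(s"round-trip - $dataType") {
      val schema = new StructType()
        .add("index", IntegerType, nullable = false)
        .add("col", dataType, nullable = true)

      val rows = values.zipWithIndex.map { case (v, i) => Row(i, v) }
      val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

      val path = Files.createTempDirectory("round-trip-").toString
      df.write.mode("overwrite").parquet(path)

      // Sort both sides by the index column so nondeterministic row order
      // cannot cause a spurious mismatch (the SPARK-10591 workaround).
      val expected = df.orderBy("index").collect()
      val actual = spark.read.schema(schema).parquet(path).orderBy("index").collect()
      assert(actual.sameElements(expected))
    }
  }
}

Because each type registers its own test, a failure now names the offending type directly instead of burying it in one monolithic all-types assertion.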


sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala

Lines changed: 43 additions & 66 deletions
@@ -100,80 +100,57 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 
-  ignore("test all data types") {
-    withTempPath { file =>
-      // Create the schema.
-      val struct =
-        StructType(
-          StructField("f1", FloatType, true) ::
-          StructField("f2", ArrayType(BooleanType), true) :: Nil)
-      // TODO: add CalendarIntervalType to here once we can save it out.
-      val dataTypes =
-        Seq(
-          StringType, BinaryType, NullType, BooleanType,
-          ByteType, ShortType, IntegerType, LongType,
-          FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
-          DateType, TimestampType,
-          ArrayType(IntegerType), MapType(StringType, LongType), struct,
-          new MyDenseVectorUDT())
-      val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
-        StructField(s"col$index", dataType, nullable = true)
-      }
-      val schema = StructType(fields)
-
-      // Generate data at the driver side. We need to materialize the data first and then
-      // create RDD.
-      val maybeDataGenerator =
-        RandomDataGenerator.forType(
-          dataType = schema,
+  private val supportedDataTypes = Seq(
+    StringType, BinaryType,
+    NullType, BooleanType,
+    ByteType, ShortType, IntegerType, LongType,
+    FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+    DateType, TimestampType,
+    ArrayType(IntegerType),
+    MapType(StringType, LongType),
+    new StructType()
+      .add("f1", FloatType, nullable = true)
+      .add("f2", ArrayType(BooleanType, containsNull = true), nullable = true),
+    new MyDenseVectorUDT()
+  ).filter(supportsDataType)
+
+  for (dataType <- supportedDataTypes) {
+    test(s"test all data types - $dataType") {
+      withTempPath { file =>
+        val path = file.getCanonicalPath
+
+        val dataGenerator = RandomDataGenerator.forType(
+          dataType = dataType,
           nullable = true,
-          seed = Some(System.nanoTime()))
-      val dataGenerator =
-        maybeDataGenerator
-          .getOrElse(fail(s"Failed to create data generator for schema $schema"))
-      val data = (1 to 10).map { i =>
-        dataGenerator.apply() match {
-          case row: Row => row
-          case null => Row.fromSeq(Seq.fill(schema.length)(null))
-          case other =>
-            fail(s"Row or null is expected to be generated, " +
-              s"but a ${other.getClass.getCanonicalName} is generated.")
+          seed = Some(System.nanoTime())
+        ).getOrElse {
+          fail(s"Failed to create data generator for schema $dataType")
         }
-      }
 
-      // Create a DF for the schema with random data.
-      val rdd = sqlContext.sparkContext.parallelize(data, 10)
-      val df = sqlContext.createDataFrame(rdd, schema)
+        // Create a DF for the schema with random data. The index field is used to sort the
+        // DataFrame. This is a workaround for SPARK-10591.
+        val schema = new StructType()
+          .add("index", IntegerType, nullable = false)
+          .add("col", dataType, nullable = true)
+        val rdd = sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+        val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
 
-      // All columns that have supported data types of this source.
-      val supportedColumns = schema.fields.collect {
-        case StructField(name, dataType, _, _) if supportsDataType(dataType) => name
-      }
-      val selectedColumns = util.Random.shuffle(supportedColumns.toSeq)
-
-      val dfToBeSaved = df.selectExpr(selectedColumns: _*)
-
-      // Save the data out.
-      dfToBeSaved
-        .write
-        .format(dataSourceName)
-        .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-        .save(file.getCanonicalPath)
+        df.write
+          .mode("overwrite")
+          .format(dataSourceName)
+          .option("dataSchema", df.schema.json)
+          .save(path)
 
-      val loadedDF =
-        sqlContext
+        val loadedDF = sqlContext
           .read
           .format(dataSourceName)
-          .schema(dfToBeSaved.schema)
-          .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-          .load(file.getCanonicalPath)
-          .selectExpr(selectedColumns: _*)
+          .option("dataSchema", df.schema.json)
+          .schema(df.schema)
+          .load(path)
+          .orderBy("index")
 
-      // Read the data back.
-      checkAnswer(
-        loadedDF,
-        dfToBeSaved
-      )
+        checkAnswer(loadedDF, df)
+      }
     }
   }
 
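One design choice worth calling out in the diff: the shared type list is passed through .filter(supportsDataType) before any tests are generated, so a concrete source suite never even registers tests for types it cannot round-trip. A hedged sketch of how such a hook can work, with illustrative class names rather than the real Spark suites:

import org.apache.spark.sql.types._

// Sketch of the supportsDataType hook: the abstract suite filters the shared
// type list, and each concrete source suite overrides the predicate to drop
// types it cannot round-trip. Names here are illustrative assumptions.
abstract class FsRelationTestSketch {
  // Concrete suites override this to veto unsupported types.
  def supportsDataType(dataType: DataType): Boolean = true

  lazy val supportedDataTypes: Seq[DataType] = Seq(
    StringType, BinaryType, NullType, BooleanType,
    IntegerType, DoubleType, DecimalType(25, 5),
    ArrayType(IntegerType), MapType(StringType, LongType)
  ).filter(supportsDataType)
}

// A hypothetical text-based source that cannot store binary or nested values.
class SimpleTextSuiteSketch extends FsRelationTestSketch {
  override def supportsDataType(dataType: DataType): Boolean = dataType match {
    case BinaryType | NullType => false
    case _: ArrayType | _: MapType | _: StructType => false
    case _ => true
  }
}

Declaring the filtered list as a lazy val in this sketch keeps Scala's superclass initialization order from evaluating the filter before the subclass override is in effect.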