-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-10441][SQL] Save data correctly to json. #8597
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes | |
|
|
||
| val dataSourceName: String | ||
|
|
||
| protected def supportsDataType(dataType: DataType): Boolean = true | ||
|
|
||
| val dataSchema = | ||
| StructType( | ||
| Seq( | ||
|
|
@@ -98,6 +100,83 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes | |
| } | ||
| } | ||
|
|
||
| test("test all data types") { | ||
| withTempPath { file => | ||
| // Create the schema. | ||
| val struct = | ||
| StructType( | ||
| StructField("f1", FloatType, true) :: | ||
| StructField("f2", ArrayType(BooleanType), true) :: Nil) | ||
| // TODO: add CalendarIntervalType to here once we can save it out. | ||
| val dataTypes = | ||
| Seq( | ||
| StringType, BinaryType, NullType, BooleanType, | ||
| ByteType, ShortType, IntegerType, LongType, | ||
| FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5), | ||
| DateType, TimestampType, | ||
| ArrayType(IntegerType), MapType(StringType, LongType), struct, | ||
| new MyDenseVectorUDT()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not think we can save it to any data source right now. |
||
| val fields = dataTypes.zipWithIndex.map { case (dataType, index) => | ||
| StructField(s"col$index", dataType, nullable = true) | ||
| } | ||
| val schema = StructType(fields) | ||
|
|
||
| // Generate data at the driver side. We need to materialize the data first and then | ||
| // create RDD. | ||
| val maybeDataGenerator = | ||
| RandomDataGenerator.forType( | ||
| dataType = schema, | ||
| nullable = true, | ||
| seed = Some(System.nanoTime())) | ||
| val dataGenerator = | ||
| maybeDataGenerator | ||
| .getOrElse(fail(s"Failed to create data generator for schema $schema")) | ||
| val data = (1 to 10).map { i => | ||
| dataGenerator.apply() match { | ||
| case row: Row => row | ||
| case null => Row.fromSeq(Seq.fill(schema.length)(null)) | ||
| case other => | ||
| fail(s"Row or null is expected to be generated, " + | ||
| s"but a ${other.getClass.getCanonicalName} is generated.") | ||
| } | ||
| } | ||
|
|
||
| // Create a DF for the schema with random data. | ||
| val rdd = sqlContext.sparkContext.parallelize(data, 10) | ||
| val df = sqlContext.createDataFrame(rdd, schema) | ||
|
|
||
| // All columns that have supported data types of this source. | ||
| val supportedColumns = schema.fields.collect { | ||
| case StructField(name, dataType, _, _) if supportsDataType(dataType) => name | ||
| } | ||
| val selectedColumns = util.Random.shuffle(supportedColumns.toSeq) | ||
|
|
||
| val dfToBeSaved = df.selectExpr(selectedColumns: _*) | ||
|
|
||
| // Save the data out. | ||
| dfToBeSaved | ||
| .write | ||
| .format(dataSourceName) | ||
| .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests. | ||
| .save(file.getCanonicalPath) | ||
|
|
||
| val loadedDF = | ||
| sqlContext | ||
| .read | ||
| .format(dataSourceName) | ||
| .schema(dfToBeSaved.schema) | ||
| .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests. | ||
| .load(file.getCanonicalPath) | ||
| .selectExpr(selectedColumns: _*) | ||
|
|
||
| // Read the data back. | ||
| checkAnswer( | ||
| loadedDF, | ||
| dfToBeSaved | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| test("save()/load() - non-partitioned table - Overwrite") { | ||
| withTempPath { file => | ||
| testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath) | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JoshRosen Can you take a look at my change to
RandomDataGenerator?