From 0da6fc44f866194f4a4a62eb1e94b2d08de1ba14 Mon Sep 17 00:00:00 2001 From: Xiu Guo Date: Tue, 29 Dec 2015 17:27:11 -0800 Subject: [PATCH 1/3] [SPARK-12562][SQL] DataFrame.write.format(text) requires the column name to be called value --- .../sql/execution/datasources/text/DefaultSource.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index 4a1cbe4c38fa2..93ccccd97ffd6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -50,7 +50,7 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { partitionColumns: Option[StructType], parameters: Map[String, String]): HadoopFsRelation = { dataSchema.foreach(verifySchema) - new TextRelation(None, partitionColumns, paths)(sqlContext) + new TextRelation(None, dataSchema, partitionColumns, paths)(sqlContext) } override def shortName(): String = "text" @@ -70,6 +70,7 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { private[sql] class TextRelation( val maybePartitionSpec: Option[PartitionSpec], + val textSchema: Option[StructType], override val userDefinedPartitionColumns: Option[StructType], override val paths: Array[String] = Array.empty[String], parameters: Map[String, String] = Map.empty[String, String]) @@ -77,8 +78,8 @@ private[sql] class TextRelation( extends HadoopFsRelation(maybePartitionSpec, parameters) { /** Data schema is always a single column, named "value". */ - override def dataSchema: StructType = new StructType().add("value", StringType) - + override def dataSchema: StructType = + textSchema.getOrElse(new StructType().add("value", StringType)) /** This is an internal data source that outputs internal row format. */ override val needConversion: Boolean = false From 5f9aaa9cd277a2972af566dfdec14c1010a8dd80 Mon Sep 17 00:00:00 2001 From: Xiu Guo Date: Tue, 29 Dec 2015 23:15:36 -0800 Subject: [PATCH 2/3] [SPARK-12562] add unit test --- .../execution/datasources/text/DefaultSource.scala | 2 +- .../sql/execution/datasources/text/TextSuite.scala | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index 93ccccd97ffd6..10529f2bdfd12 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -77,7 +77,7 @@ private[sql] class TextRelation( (@transient val sqlContext: SQLContext) extends HadoopFsRelation(maybePartitionSpec, parameters) { - /** Data schema is always a single column, named "value". */ + /** Data schema is always a single column, named "value" if original Data source has no schema. */ override def dataSchema: StructType = textSchema.getOrElse(new StructType().add("value", StringType)) /** This is an internal data source that outputs internal row format. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 914e516613f9e..90738c79fabda 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -58,6 +58,17 @@ class TextSuite extends QueryTest with SharedSQLContext { } } + test("SPARK-12562 verify write.text() can handle column name beyond `value`") { + val df = sqlContext.read.text(testFile).withColumnRenamed("value", "col1") + + val tempFile = Utils.createTempDir() + tempFile.delete() + df.write.text(tempFile.getCanonicalPath) + verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath)) + + Utils.deleteRecursively(tempFile) + } + private def testFile: String = { Thread.currentThread().getContextClassLoader.getResource("text-suite.txt").toString } From d01ecac4966896cdd05698006c032d58f511c36b Mon Sep 17 00:00:00 2001 From: Xiu Guo Date: Tue, 29 Dec 2015 23:44:04 -0800 Subject: [PATCH 3/3] [SPARK-12562] modify unit test --- .../execution/datasources/text/TextSuite.scala | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 90738c79fabda..02c416af50cd7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -33,8 +33,8 @@ class TextSuite extends QueryTest with SharedSQLContext { verifyFrame(sqlContext.read.text(testFile)) } - test("writing") { - val df = sqlContext.read.text(testFile) + test("SPARK-12562 verify write.text() can handle column name beyond `value`") { + val df = sqlContext.read.text(testFile).withColumnRenamed("value", "adwrasdf") val tempFile = Utils.createTempDir() tempFile.delete() @@ -58,17 +58,6 @@ class TextSuite extends QueryTest with SharedSQLContext { } } - test("SPARK-12562 verify write.text() can handle column name beyond `value`") { - val df = sqlContext.read.text(testFile).withColumnRenamed("value", "col1") - - val tempFile = Utils.createTempDir() - tempFile.delete() - df.write.text(tempFile.getCanonicalPath) - verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath)) - - Utils.deleteRecursively(tempFile) - } - private def testFile: String = { Thread.currentThread().getContextClassLoader.getResource("text-suite.txt").toString }