From a275120c2ea4c878e52d3f4ea5bd97be20d0bcd2 Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Fri, 29 Jan 2016 19:55:30 +0000 Subject: [PATCH 01/20] Fix schema/type inference issue #216 --- .../databricks/spark/csv/util/InferSchema.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala b/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala index d9991a2..7d558c8 100644 --- a/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala +++ b/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala @@ -42,7 +42,11 @@ private[csv] object InferSchema { mergeRowTypes) val structFields = header.zip(rootTypes).map { case (thisHeader, rootType) => - StructField(thisHeader, rootType, nullable = true) + val dType = rootType match { + case z: NullType => StringType + case other => other + } + StructField(thisHeader, dType, nullable = true) } StructType(structFields) @@ -62,11 +66,7 @@ private[csv] object InferSchema { first: Array[DataType], second: Array[DataType]): Array[DataType] = { first.zipAll(second, NullType, NullType).map { case ((a, b)) => - val tpe = findTightestCommonType(a, b).getOrElse(StringType) - tpe match { - case _: NullType => StringType - case other => other - } + findTightestCommonType(a, b).getOrElse(NullType) } } @@ -93,7 +93,6 @@ private[csv] object InferSchema { } } - private def tryParseInteger(field: String): DataType = if ((allCatch opt field.toInt).isDefined) { IntegerType } else { @@ -152,6 +151,8 @@ private[csv] object InferSchema { case (t1, t2) if t1 == t2 => Some(t1) case (NullType, t1) => Some(t1) case (t1, NullType) => Some(t1) + case (StringType, t2) => Some(StringType) + case (t1, StringType) => Some(StringType) // Promote numeric types to the highest of the two and all numeric types to unlimited decimal case (t1, t2) if Seq(t1, t2).forall(numericPrecedence.contains) => From 8b4dbdd59c55e3d4bb76554518264f6d833ad0c1 Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Fri, 29 Jan 2016 19:55:47 +0000 Subject: [PATCH 02/20] Add testcases for #216 --- .../com/databricks/spark/csv/util/InferSchemaSuite.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala index d713649..40b3b30 100644 --- a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala @@ -40,6 +40,14 @@ class InferSchemaSuite extends FunSuite { assert(InferSchema.inferField(LongType, "2015-08 14:49:00") == StringType) } + test("Merging NullTypes should yield NullType.") + { + assert( + InferSchema.mergeRowTypes(Array(NullType), + Array(NullType)).deep == Array(NullType).deep) + + } + test("Type arrays are merged to highest common type") { assert( InferSchema.mergeRowTypes(Array(StringType), From b150c556cde863f0734dc5c090dd37c17895aed3 Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Sat, 30 Jan 2016 14:31:23 +0000 Subject: [PATCH 03/20] Adding a simple parse dataset for testing type/schema inference #216 --- src/test/resources/simple.csv | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 src/test/resources/simple.csv diff --git a/src/test/resources/simple.csv b/src/test/resources/simple.csv new file mode 100644 index 0000000..02d29ca --- /dev/null +++ b/src/test/resources/simple.csv @@ -0,0 +1,5 @@ +A,B,C,D +1,,, +,1,, +,,1, +,,,1 From 
9be0313f6cd323e3f5d28ffae2ee8f53d78e8e66 Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Sat, 30 Jan 2016 14:31:41 +0000 Subject: [PATCH 04/20] Bit of refactoring --- .../com/databricks/spark/csv/CsvParser.scala | 35 ++++++------------- 1 file changed, 11 insertions(+), 24 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvParser.scala b/src/main/scala/com/databricks/spark/csv/CsvParser.scala index 370ab3c..e8c63b6 100644 --- a/src/main/scala/com/databricks/spark/csv/CsvParser.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvParser.scala @@ -15,7 +15,6 @@ */ package com.databricks.spark.csv - import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.types.StructType @@ -117,12 +116,11 @@ class CsvParser extends Serializable { this } - /** Returns a Schema RDD for the given CSV path. */ - @throws[RuntimeException] - def csvFile(sqlContext: SQLContext, path: String): DataFrame = { - val relation: CsvRelation = CsvRelation( - () => TextFile.withCharset(sqlContext.sparkContext, path, charset), - Some(path), + /** Returns a csvRelation instance based on the state definition of csv parser.*/ + private[csv] def csvRelation(sqlContext: SQLContext, csvRDD: RDD[String], path: Option[String]): CsvRelation = { + CsvRelation( + () => csvRDD, + path, useHeader, delimiter, quote, @@ -137,27 +135,16 @@ class CsvParser extends Serializable { inferSchema, codec, nullValue)(sqlContext) + } + /** Returns a Schema RDD for the given CSV path. */ + @throws[RuntimeException] + def csvFile(sqlContext: SQLContext, path: String): DataFrame = { + val relation: CsvRelation = csvRelation(sqlContext, TextFile.withCharset(sqlContext.sparkContext, path, charset), Some(path)) sqlContext.baseRelationToDataFrame(relation) } def csvRdd(sqlContext: SQLContext, csvRDD: RDD[String]): DataFrame = { - val relation: CsvRelation = CsvRelation( - () => csvRDD, - None, - useHeader, - delimiter, - quote, - escape, - comment, - parseMode, - parserLib, - ignoreLeadingWhiteSpace, - ignoreTrailingWhiteSpace, - treatEmptyValuesAsNulls, - schema, - inferSchema, - codec, - nullValue)(sqlContext) + val relation: CsvRelation = csvRelation(sqlContext, csvRDD, None) sqlContext.baseRelationToDataFrame(relation) } } From ff9617212e879cf55b7f59d12f82a40a010ec911 Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Sat, 30 Jan 2016 14:32:31 +0000 Subject: [PATCH 05/20] Adding an end to end test case for type inference of simple parse dataset #216 --- .../spark/csv/util/InferSchemaSuite.scala | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala index 40b3b30..8e3890a 100644 --- a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala @@ -2,8 +2,31 @@ package com.databricks.spark.csv.util import org.apache.spark.sql.types._ import org.scalatest.FunSuite +import org.scalatest.BeforeAndAfterAll +import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext +import com.databricks.spark.csv.CsvParser +import com.databricks.spark.csv.CsvRelation -class InferSchemaSuite extends FunSuite { +class InferSchemaSuite extends FunSuite with BeforeAndAfterAll { + + private val simpleDatasetFile = "src/test/resources/simple.csv" + private val utf8Charset = "utf-8" + private var sqlContext: SQLContext = _ + + override def 
beforeAll(): Unit = + { + super.beforeAll() + sqlContext = new SQLContext(new SparkContext("local[2]", "InferSchemaSuite")) + } + + override def afterAll(): Unit = { + try { + sqlContext.sparkContext.stop() + } finally { + super.afterAll() + } + } test("String fields types are inferred correctly from null types") { assert(InferSchema.inferField(NullType, "") == NullType) @@ -60,4 +83,13 @@ class InferSchemaSuite extends FunSuite { Array(LongType)).deep == Array(DoubleType).deep) } + test("Type/Schema inference works as expected for the simple parse dataset.") + { + val df = new CsvParser().withUseHeader(true).withInferSchema(true).csvFile(sqlContext, simpleDatasetFile) + assert( + df.schema.fields.map{field => field.dataType}.deep == + Array(IntegerType, IntegerType, IntegerType, IntegerType).deep + ) + + } } From d36b4a80626b68eaf01c0e8c8d572bf74e13c368 Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Sat, 30 Jan 2016 14:51:03 +0000 Subject: [PATCH 06/20] Revert "Bit of refactoring" This reverts commit 9be0313f6cd323e3f5d28ffae2ee8f53d78e8e66. --- .../com/databricks/spark/csv/CsvParser.scala | 35 +++++++++++++------ 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvParser.scala b/src/main/scala/com/databricks/spark/csv/CsvParser.scala index e8c63b6..370ab3c 100644 --- a/src/main/scala/com/databricks/spark/csv/CsvParser.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvParser.scala @@ -15,6 +15,7 @@ */ package com.databricks.spark.csv + import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.types.StructType @@ -116,11 +117,12 @@ class CsvParser extends Serializable { this } - /** Returns a csvRelation instance based on the state definition of csv parser.*/ - private[csv] def csvRelation(sqlContext: SQLContext, csvRDD: RDD[String], path: Option[String]): CsvRelation = { - CsvRelation( - () => csvRDD, - path, + /** Returns a Schema RDD for the given CSV path. */ + @throws[RuntimeException] + def csvFile(sqlContext: SQLContext, path: String): DataFrame = { + val relation: CsvRelation = CsvRelation( + () => TextFile.withCharset(sqlContext.sparkContext, path, charset), + Some(path), useHeader, delimiter, quote, @@ -135,16 +137,27 @@ class CsvParser extends Serializable { inferSchema, codec, nullValue)(sqlContext) - } - /** Returns a Schema RDD for the given CSV path. 
*/ - @throws[RuntimeException] - def csvFile(sqlContext: SQLContext, path: String): DataFrame = { - val relation: CsvRelation = csvRelation(sqlContext, TextFile.withCharset(sqlContext.sparkContext, path, charset), Some(path)) sqlContext.baseRelationToDataFrame(relation) } def csvRdd(sqlContext: SQLContext, csvRDD: RDD[String]): DataFrame = { - val relation: CsvRelation = csvRelation(sqlContext, csvRDD, None) + val relation: CsvRelation = CsvRelation( + () => csvRDD, + None, + useHeader, + delimiter, + quote, + escape, + comment, + parseMode, + parserLib, + ignoreLeadingWhiteSpace, + ignoreTrailingWhiteSpace, + treatEmptyValuesAsNulls, + schema, + inferSchema, + codec, + nullValue)(sqlContext) sqlContext.baseRelationToDataFrame(relation) } } From c20b852a4938642d147d1f5109e5f8f8d6200d02 Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Sat, 30 Jan 2016 14:57:28 +0000 Subject: [PATCH 07/20] Fix scalastyle issue - max length 100 --- .../scala/com/databricks/spark/csv/util/InferSchemaSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala index 8e3890a..2d1039d 100644 --- a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala @@ -85,7 +85,8 @@ class InferSchemaSuite extends FunSuite with BeforeAndAfterAll { test("Type/Schema inference works as expected for the simple parse dataset.") { - val df = new CsvParser().withUseHeader(true).withInferSchema(true).csvFile(sqlContext, simpleDatasetFile) + val df = new CsvParser().withUseHeader(true).withInferSchema(true) + .csvFile(sqlContext, simpleDatasetFile) assert( df.schema.fields.map{field => field.dataType}.deep == Array(IntegerType, IntegerType, IntegerType, IntegerType).deep From 18957e2decf43086a779e5ee5c96d7a49aa4e57e Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Sat, 30 Jan 2016 14:31:41 +0000 Subject: [PATCH 08/20] Bit of refactoring --- .../com/databricks/spark/csv/CsvParser.scala | 35 ++++++------------- 1 file changed, 11 insertions(+), 24 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvParser.scala b/src/main/scala/com/databricks/spark/csv/CsvParser.scala index 370ab3c..e8c63b6 100644 --- a/src/main/scala/com/databricks/spark/csv/CsvParser.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvParser.scala @@ -15,7 +15,6 @@ */ package com.databricks.spark.csv - import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.types.StructType @@ -117,12 +116,11 @@ class CsvParser extends Serializable { this } - /** Returns a Schema RDD for the given CSV path. */ - @throws[RuntimeException] - def csvFile(sqlContext: SQLContext, path: String): DataFrame = { - val relation: CsvRelation = CsvRelation( - () => TextFile.withCharset(sqlContext.sparkContext, path, charset), - Some(path), + /** Returns a csvRelation instance based on the state definition of csv parser.*/ + private[csv] def csvRelation(sqlContext: SQLContext, csvRDD: RDD[String], path: Option[String]): CsvRelation = { + CsvRelation( + () => csvRDD, + path, useHeader, delimiter, quote, @@ -137,27 +135,16 @@ class CsvParser extends Serializable { inferSchema, codec, nullValue)(sqlContext) + } + /** Returns a Schema RDD for the given CSV path. 
*/ + @throws[RuntimeException] + def csvFile(sqlContext: SQLContext, path: String): DataFrame = { + val relation: CsvRelation = csvRelation(sqlContext, TextFile.withCharset(sqlContext.sparkContext, path, charset), Some(path)) sqlContext.baseRelationToDataFrame(relation) } def csvRdd(sqlContext: SQLContext, csvRDD: RDD[String]): DataFrame = { - val relation: CsvRelation = CsvRelation( - () => csvRDD, - None, - useHeader, - delimiter, - quote, - escape, - comment, - parseMode, - parserLib, - ignoreLeadingWhiteSpace, - ignoreTrailingWhiteSpace, - treatEmptyValuesAsNulls, - schema, - inferSchema, - codec, - nullValue)(sqlContext) + val relation: CsvRelation = csvRelation(sqlContext, csvRDD, None) sqlContext.baseRelationToDataFrame(relation) } } From d41d5fb9aa76552c113d864c3ab05b2244bb9980 Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Sat, 30 Jan 2016 15:08:41 +0000 Subject: [PATCH 09/20] Fix scalastyle issue - max length 100 --- src/main/scala/com/databricks/spark/csv/CsvParser.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvParser.scala b/src/main/scala/com/databricks/spark/csv/CsvParser.scala index e8c63b6..9f62127 100644 --- a/src/main/scala/com/databricks/spark/csv/CsvParser.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvParser.scala @@ -139,7 +139,8 @@ class CsvParser extends Serializable { /** Returns a Schema RDD for the given CSV path. */ @throws[RuntimeException] def csvFile(sqlContext: SQLContext, path: String): DataFrame = { - val relation: CsvRelation = csvRelation(sqlContext, TextFile.withCharset(sqlContext.sparkContext, path, charset), Some(path)) + val relation: CsvRelation = csvRelation(sqlContext, + TextFile.withCharset(sqlContext.sparkContext, path, charset), Some(path)) sqlContext.baseRelationToDataFrame(relation) } From 9a1f428d58e7c7d5f24c399cd2719c01257b8130 Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Sat, 30 Jan 2016 17:06:15 +0000 Subject: [PATCH 10/20] Fix scalastyle issue - max length 100 --- src/main/scala/com/databricks/spark/csv/CsvParser.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvParser.scala b/src/main/scala/com/databricks/spark/csv/CsvParser.scala index 9f62127..8ce0f0b 100644 --- a/src/main/scala/com/databricks/spark/csv/CsvParser.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvParser.scala @@ -116,8 +116,9 @@ class CsvParser extends Serializable { this } - /** Returns a csvRelation instance based on the state definition of csv parser.*/ - private[csv] def csvRelation(sqlContext: SQLContext, csvRDD: RDD[String], path: Option[String]): CsvRelation = { + /** Returns a csvRelation instance based on the state definition of csv parser. 
*/ + private[csv] def csvRelation(sqlContext: SQLContext, csvRDD: RDD[String], + path: Option[String]): CsvRelation = { CsvRelation( () => csvRDD, path, From 80fbac45e6ec15239d4b1c9c486c17b78d197eb1 Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Mon, 1 Feb 2016 06:38:31 +0000 Subject: [PATCH 11/20] Fix typo --- .../scala/com/databricks/spark/csv/util/InferSchemaSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala index 2d1039d..9f35305 100644 --- a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala @@ -83,7 +83,7 @@ class InferSchemaSuite extends FunSuite with BeforeAndAfterAll { Array(LongType)).deep == Array(DoubleType).deep) } - test("Type/Schema inference works as expected for the simple parse dataset.") + test("Type/Schema inference works as expected for the simple sparse dataset.") { val df = new CsvParser().withUseHeader(true).withInferSchema(true) .csvFile(sqlContext, simpleDatasetFile) From bb510a6b878dbea0553999b2353079f8a94d2f2c Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Mon, 1 Feb 2016 08:52:55 +0000 Subject: [PATCH 12/20] Reverting the refactoring part --- .../com/databricks/spark/csv/CsvParser.scala | 37 ++++++++++++------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvParser.scala b/src/main/scala/com/databricks/spark/csv/CsvParser.scala index 8ce0f0b..370ab3c 100644 --- a/src/main/scala/com/databricks/spark/csv/CsvParser.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvParser.scala @@ -15,6 +15,7 @@ */ package com.databricks.spark.csv + import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.types.StructType @@ -116,12 +117,12 @@ class CsvParser extends Serializable { this } - /** Returns a csvRelation instance based on the state definition of csv parser. */ - private[csv] def csvRelation(sqlContext: SQLContext, csvRDD: RDD[String], - path: Option[String]): CsvRelation = { - CsvRelation( - () => csvRDD, - path, + /** Returns a Schema RDD for the given CSV path. */ + @throws[RuntimeException] + def csvFile(sqlContext: SQLContext, path: String): DataFrame = { + val relation: CsvRelation = CsvRelation( + () => TextFile.withCharset(sqlContext.sparkContext, path, charset), + Some(path), useHeader, delimiter, quote, @@ -136,17 +137,27 @@ class CsvParser extends Serializable { inferSchema, codec, nullValue)(sqlContext) - } - /** Returns a Schema RDD for the given CSV path. 
*/ - @throws[RuntimeException] - def csvFile(sqlContext: SQLContext, path: String): DataFrame = { - val relation: CsvRelation = csvRelation(sqlContext, - TextFile.withCharset(sqlContext.sparkContext, path, charset), Some(path)) sqlContext.baseRelationToDataFrame(relation) } def csvRdd(sqlContext: SQLContext, csvRDD: RDD[String]): DataFrame = { - val relation: CsvRelation = csvRelation(sqlContext, csvRDD, None) + val relation: CsvRelation = CsvRelation( + () => csvRDD, + None, + useHeader, + delimiter, + quote, + escape, + comment, + parseMode, + parserLib, + ignoreLeadingWhiteSpace, + ignoreTrailingWhiteSpace, + treatEmptyValuesAsNulls, + schema, + inferSchema, + codec, + nullValue)(sqlContext) sqlContext.baseRelationToDataFrame(relation) } } From 2c24965f8db7a460878f7b68c9990e452dbda1c9 Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Thu, 4 Feb 2016 09:51:16 +0000 Subject: [PATCH 13/20] Move end to end test case to CSV suite; Fix indentation comments --- .../com/databricks/spark/csv/CsvSuite.scala | 13 +++++- .../spark/csv/util/InferSchemaSuite.scala | 41 ++----------------- 2 files changed, 14 insertions(+), 40 deletions(-) diff --git a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala index 6fddab5..4a0a81e 100755 --- a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala @@ -41,6 +41,7 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll { val tempEmptyDir = "target/test/empty/" val commentsFile = "src/test/resources/comments.csv" val disableCommentsFile = "src/test/resources/disable_comments.csv" + private val simpleDatasetFile = "src/test/resources/simple.csv" val numCars = 3 @@ -473,7 +474,6 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll { assert(carsCopy.collect.map(_.toString).toSet == cars.collect.map(_.toString).toSet) } - test("DSL save with quoting") { // Create temp directory TestUtils.deleteRecursively(new File(tempEmptyDir)) @@ -637,7 +637,6 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll { assert(results.toSeq.map(_.toSeq) === expected) } - test("Setting comment to null disables comment support") { val results: Array[Row] = new CsvParser() .withDelimiter(',') @@ -696,6 +695,16 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll { assert(results.size === numCars) } + + test("Type/Schema inference works as expected for the simple sparse dataset.") + { + val df = new CsvParser().withUseHeader(true).withInferSchema(true) + .csvFile(sqlContext, simpleDatasetFile) + + assert( + df.schema.fields.map{field => field.dataType}.deep == + Array(IntegerType, IntegerType, IntegerType, IntegerType).deep) + } } class CsvSuite extends AbstractCsvSuite { diff --git a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala index 9f35305..e3cacdf 100644 --- a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala @@ -2,31 +2,8 @@ package com.databricks.spark.csv.util import org.apache.spark.sql.types._ import org.scalatest.FunSuite -import org.scalatest.BeforeAndAfterAll -import org.apache.spark.SparkContext -import org.apache.spark.sql.SQLContext -import com.databricks.spark.csv.CsvParser -import com.databricks.spark.csv.CsvRelation -class InferSchemaSuite extends FunSuite with BeforeAndAfterAll { - - 
private val simpleDatasetFile = "src/test/resources/simple.csv" - private val utf8Charset = "utf-8" - private var sqlContext: SQLContext = _ - - override def beforeAll(): Unit = - { - super.beforeAll() - sqlContext = new SQLContext(new SparkContext("local[2]", "InferSchemaSuite")) - } - - override def afterAll(): Unit = { - try { - sqlContext.sparkContext.stop() - } finally { - super.afterAll() - } - } test("String fields types are inferred correctly from null types") { assert(InferSchema.inferField(NullType, "") == NullType) @@ -66,7 +43,8 @@ class InferSchemaSuite extends FunSuite with BeforeAndAfterAll { test("Merging NullTypes should yield NullType.") { assert( - InferSchema.mergeRowTypes(Array(NullType), - Array(NullType)).deep == Array(NullType).deep) - + InferSchema.mergeRowTypes(Array(NullType), + Array(NullType)).deep == Array(NullType).deep) } test("Type arrays are merged to highest common type") { From 890fadd1ee9140da5d0e4282b721c0f902d052cd Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Thu, 4 Feb 2016 09:52:50 +0000 Subject: [PATCH 14/20] Resolving merge conflicts --- build.sbt | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/build.sbt b/build.sbt index e556940..7d9e7bc 100755 --- a/build.sbt +++ b/build.sbt @@ -8,9 +8,9 @@ scalaVersion := "2.11.7" spName := "databricks/spark-csv" -crossScalaVersions := Seq("2.10.5", "2.11.7") +crossScalaVersions := Seq("2.10.4", "2.11.7") -sparkVersion := "1.6.0" +sparkVersion := "1.5.2" val testSparkVersion = settingKey[String]("The version of Spark to test against.") @@ -18,6 +18,8 @@ testSparkVersion := sys.props.get("spark.testVersion").getOrElse(sparkVersion.va sparkComponents := Seq("core", "sql") +assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false) + libraryDependencies ++= Seq( "org.apache.commons" % "commons-csv" % "1.1", "com.univocity" % "univocity-parsers" % "1.5.1", @@ -29,7 +31,7 @@ libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % testSparkVersion.value % "test" force(), "org.apache.spark" %% "spark-sql" % testSparkVersion.value % "test" force(), - "org.scala-lang" % "scala-library" % scalaVersion.value % "compile" + "org.scala-lang" % "scala-library" % scalaVersion.value % "provided" ) // This is necessary because of how we explicitly specify Spark dependencies From 6f90a6bef4e5b6b5e664c3237b4e7bd77c526614 Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Thu, 4 Feb 2016 10:11:37 +0000 Subject: [PATCH 15/20] Fix indentation issue --- .../com/databricks/spark/csv/util/InferSchemaSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala index 643d04e..17db8bd 100644 --- a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala +++ 
b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala @@ -57,9 +57,9 @@ class InferSchemaSuite extends FunSuite { test("Merging NullTypes should yield NullType.") { - assert( - InferSchema.mergeRowTypes(Array(NullType), - Array(NullType)).deep == Array(NullType).deep) + assert( + InferSchema.mergeRowTypes(Array(NullType), + Array(NullType)).deep == Array(NullType).deep) } test("Type arrays are merged to highest common type") { From 3d07d824d735a91f1013712681a682204110ebaa Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Thu, 4 Feb 2016 10:13:51 +0000 Subject: [PATCH 16/20] Revert build.sbt changes --- build.sbt | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/build.sbt b/build.sbt index 7d9e7bc..e556940 100755 --- a/build.sbt +++ b/build.sbt @@ -8,9 +8,9 @@ scalaVersion := "2.11.7" spName := "databricks/spark-csv" -crossScalaVersions := Seq("2.10.4", "2.11.7") +crossScalaVersions := Seq("2.10.5", "2.11.7") -sparkVersion := "1.5.2" +sparkVersion := "1.6.0" val testSparkVersion = settingKey[String]("The version of Spark to test against.") @@ -18,8 +18,6 @@ testSparkVersion := sys.props.get("spark.testVersion").getOrElse(sparkVersion.va sparkComponents := Seq("core", "sql") -assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false) - libraryDependencies ++= Seq( "org.apache.commons" % "commons-csv" % "1.1", "com.univocity" % "univocity-parsers" % "1.5.1", @@ -31,7 +29,7 @@ libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % testSparkVersion.value % "test" force(), "org.apache.spark" %% "spark-sql" % testSparkVersion.value % "test" force(), - "org.scala-lang" % "scala-library" % scalaVersion.value % "provided" + "org.scala-lang" % "scala-library" % scalaVersion.value % "compile" ) // This is necessary because of how we explicitly specify Spark dependencies From 718b4670f58383639b213313a0988ee2b148cb40 Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Thu, 4 Feb 2016 12:12:36 +0000 Subject: [PATCH 17/20] Fix code style --- src/test/scala/com/databricks/spark/csv/CsvSuite.scala | 3 +-- .../scala/com/databricks/spark/csv/util/InferSchemaSuite.scala | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala index 4761e57..9ff49f6 100755 --- a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala @@ -718,8 +718,7 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll { assert(results.size === numCars) } - test("Type/Schema inference works as expected for the simple sparse dataset.") - { + test("Type/Schema inference works as expected for the simple sparse dataset.") { val df = new CsvParser().withUseHeader(true).withInferSchema(true) .csvFile(sqlContext, simpleDatasetFile) diff --git a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala index 17db8bd..7c11302 100644 --- a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala @@ -55,8 +55,7 @@ class InferSchemaSuite extends FunSuite { assert(InferSchema.inferField(LongType, "2015-08 14:49:00") == StringType) } - test("Merging NullTypes should yield NullType.") - { + test("Merging NullTypes should yield NullType.") { assert( 
InferSchema.mergeRowTypes(Array(NullType), Array(NullType)).deep == Array(NullType).deep) From 5ef29b84cb04ebb9e172e09140a9103e340ee923 Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Fri, 5 Feb 2016 05:37:29 +0000 Subject: [PATCH 18/20] Indentation --- .../scala/com/databricks/spark/csv/util/InferSchema.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala b/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala index 7d558c8..f138264 100644 --- a/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala +++ b/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala @@ -43,8 +43,8 @@ private[csv] object InferSchema { val structFields = header.zip(rootTypes).map { case (thisHeader, rootType) => val dType = rootType match { - case z: NullType => StringType - case other => other + case z: NullType => StringType + case other => other } StructField(thisHeader, dType, nullable = true) } From 6771184e3b9dfffcf89745da277d7ca70185b0b6 Mon Sep 17 00:00:00 2001 From: Rahul Tanwani Date: Wed, 10 Feb 2016 19:36:46 +0000 Subject: [PATCH 19/20] Fix indentation issue --- .../scala/com/databricks/spark/csv/util/InferSchemaSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala index 7c11302..4bf578c 100644 --- a/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala @@ -72,4 +72,5 @@ class InferSchemaSuite extends FunSuite { InferSchema.mergeRowTypes(Array(DoubleType), Array(LongType)).deep == Array(DoubleType).deep) } + } From 7141f3593028c9162cad8d20fc910689d540a28a Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 11 Feb 2016 15:02:54 +0900 Subject: [PATCH 20/20] Stylistic corrections --- src/test/scala/com/databricks/spark/csv/CsvSuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala index 9ff49f6..2494be7 100755 --- a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala @@ -719,11 +719,13 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll { } test("Type/Schema inference works as expected for the simple sparse dataset.") { - val df = new CsvParser().withUseHeader(true).withInferSchema(true) + val df = new CsvParser() + .withUseHeader(true) + .withInferSchema(true) .csvFile(sqlContext, simpleDatasetFile) assert( - df.schema.fields.map{field => field.dataType}.deep == + df.schema.fields.map(_.dataType).deep == Array(IntegerType, IntegerType, IntegerType, IntegerType).deep) } }