From 9361fe707bf21698cc1a9afe84d0525a41463013 Mon Sep 17 00:00:00 2001
From: Ray Ortigas
Date: Sat, 18 Apr 2015 18:43:18 -0700
Subject: [PATCH 1/2] Support loading of RDDs (of case classes) from CSV.

Squashed commit of the following:

commit e75167f9fb03827757843fd093f454e048c988c5
Author: Ray Ortigas
Date:   Sat Apr 18 15:39:30 2015 -0700

    Test for rejection of case classes with non-primitive fields.

commit c4a1de0fe090eff5d771379c712f373e4d80c63b
Author: Ray Ortigas
Date:   Sat Apr 18 11:54:53 2015 -0700

    Don't inherit from csv.CsvContext.

commit 674672d74190977434d81f70b0f39a1c1da22e7a
Author: Ray Ortigas
Date:   Fri Apr 17 19:37:52 2015 -0700

    Add TSV support.

commit e93ec4c88fd05226fb913dfb8cc9f4c7d66ba36c
Author: Ray Ortigas
Date:   Fri Apr 17 19:22:52 2015 -0700

    Add comment about not handling inner case classes.

commit 1495f51da3c18344152640bfa8fce8fa24dd8b3e
Author: Ray Ortigas
Date:   Fri Apr 17 19:22:38 2015 -0700

    Add test for headerless CSV.

commit 6f7fcf3a2f919584da9187c1f23fa3ee9411dc9f
Author: Ray Ortigas
Date:   Fri Apr 17 19:12:19 2015 -0700

    Add test for permissive mode (which is invalid).

commit ccbb6ba564fc4eeab405e84eae7ff68034758dc7
Author: Ray Ortigas
Date:   Fri Apr 17 19:10:54 2015 -0700

    Add test for fail-fast mode.

commit fb0f50d1e316d8dc5bfb951635675aaebec7543c
Author: Ray Ortigas
Date:   Fri Apr 17 19:04:33 2015 -0700

    Add test.

commit 51a9868df0c8bbe59265e54f59b6c2aee4f6e586
Author: Ray Ortigas
Date:   Fri Apr 17 17:21:13 2015 -0700

    Move RDD-related methods to own package.

commit f5a2c2cffb2ab96f06243e84e5c93227736a4441
Author: Ray Ortigas
Date:   Fri Apr 17 16:31:10 2015 -0700

    Use TypeTag and ClassTag instead of manifest.

commit ffed4fc806b004c07c0c9590e837f0ce56ce0306
Author: Ray Ortigas
Date:   Fri Apr 17 15:41:32 2015 -0700

    Express csvFileToRDD() in terms of csvFile().

commit b52f582c5badc4c1f29e64455bda3bd920fbb154
Author: Ray Ortigas
Date:   Fri Apr 17 15:38:43 2015 -0700

    First cut at typed RDD.
---
 build.sbt                                      |  4 ++
 .../com/databricks/spark/csv/package.scala     |  9 ++-
 .../databricks/spark/csv/rdd/package.scala     | 59 +++++++++++++++++
 ...ars-with-typed-columns-without-headers.csv  |  4 ++
 .../resources/cars-with-typed-columns.csv      |  5 ++
 .../resources/cars-with-typed-columns.tsv      |  5 ++
 .../spark/csv/rdd/CsvToRDDSuite.scala          | 65 +++++++++++++++++++
 7 files changed, 148 insertions(+), 3 deletions(-)
 create mode 100644 src/main/scala/com/databricks/spark/csv/rdd/package.scala
 create mode 100644 src/test/resources/cars-with-typed-columns-without-headers.csv
 create mode 100644 src/test/resources/cars-with-typed-columns.csv
 create mode 100644 src/test/resources/cars-with-typed-columns.tsv
 create mode 100644 src/test/scala/com/databricks/spark/csv/rdd/CsvToRDDSuite.scala

diff --git a/build.sbt b/build.sbt
index 1f511bb..d4f2fc0 100755
--- a/build.sbt
+++ b/build.sbt
@@ -60,3 +60,7 @@ sparkComponents += "sql"
 libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test"
 
 libraryDependencies += "com.novocode" % "junit-interface" % "0.9" % "test"
+
+// Fork to help tests of methods using reflection.
+// See https://issues.apache.org/jira/browse/SPARK-5281.
+fork in Test := true
diff --git a/src/main/scala/com/databricks/spark/csv/package.scala b/src/main/scala/com/databricks/spark/csv/package.scala
index 3d41de0..4cb6abe 100755
--- a/src/main/scala/com/databricks/spark/csv/package.scala
+++ b/src/main/scala/com/databricks/spark/csv/package.scala
@@ -19,6 +19,7 @@
 import org.apache.commons.csv.CSVFormat
 import org.apache.hadoop.io.compress.CompressionCodec
 
 import org.apache.spark.sql.{SQLContext, DataFrame, Row}
+import org.apache.spark.sql.types.StructType
 
 package object csv {
@@ -31,14 +32,16 @@ package object csv {
         delimiter: Char = ',',
         quote: Char = '"',
         escape: Char = '\\',
-        mode: String = "PERMISSIVE") = {
+        mode: String = "PERMISSIVE",
+        schema: Option[StructType] = None) = {
       val csvRelation = CsvRelation(
         location = filePath,
         useHeader = useHeader,
         delimiter = delimiter,
         quote = quote,
         escape = escape,
-        parseMode = mode)(sqlContext)
+        parseMode = mode,
+        userSchema = schema.orNull)(sqlContext)
       sqlContext.baseRelationToDataFrame(csvRelation)
     }
 
@@ -53,7 +56,7 @@
     sqlContext.baseRelationToDataFrame(csvRelation)
   }
 }
-  
+
   implicit class CsvSchemaRDD(dataFrame: DataFrame) {
 
     /**
diff --git a/src/main/scala/com/databricks/spark/csv/rdd/package.scala b/src/main/scala/com/databricks/spark/csv/rdd/package.scala
new file mode 100644
index 0000000..056385b
--- /dev/null
+++ b/src/main/scala/com/databricks/spark/csv/rdd/package.scala
@@ -0,0 +1,59 @@
+package com.databricks.spark.csv
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{SQLContext, DataFrame, Row}
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.types.StructType
+
+package object rdd {
+  implicit class CsvContextRDD(sqlContext: SQLContext) {
+    def csvFileToRDD[T: scala.reflect.runtime.universe.TypeTag : scala.reflect.ClassTag](
+        filePath: String,
+        useHeader: Boolean = true,
+        delimiter: Char = ',',
+        quote: Char = '"',
+        escape: Char = '\\',
+        mode: String = "DROPMALFORMED"): RDD[T] = {
+
+      if (mode == util.ParseModes.PERMISSIVE_MODE)
+        throw new IllegalArgumentException(s"permissive mode is invalid for this method")
+
+      val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]
+      if (schema.exists { structField => !structField.dataType.isPrimitive })
+        throw new IllegalArgumentException(s"type must be a case class with only primitive fields")
+
+      val df = new CsvContext(sqlContext).csvFile(filePath, useHeader, delimiter, quote, escape, mode, Some(schema))
+      df.mapPartitions[T] { iter =>
+        val rowConverter = RowConverter[T]()
+        iter.map { row => rowConverter.convert(row) }
+      }
+    }
+
+    def tsvFileToRDD[T: scala.reflect.runtime.universe.TypeTag : scala.reflect.ClassTag](
+        filePath: String,
+        useHeader: Boolean = true,
+        mode: String = "DROPMALFORMED"): RDD[T] = {
+      csvFileToRDD[T](filePath, useHeader, delimiter = '\t', quote = '"', escape = '\\', mode)
+    }
+  }
+
+  case class RowConverter[T]()(implicit ct: scala.reflect.ClassTag[T]) {
+    // http://docs.scala-lang.org/overviews/reflection/environment-universes-mirrors.html#types-of-mirrors-their-use-cases--examples
+
+    // For Scala 2.10, because we're initializing the runtime universe, this is not thread-safe.
+    // http://docs.scala-lang.org/overviews/reflection/thread-safety.html
+    val ru = scala.reflect.runtime.universe
+
+    val mirror = ru.runtimeMirror(getClass.getClassLoader)
+    val classSymbol = mirror.classSymbol(ct.runtimeClass)
+    val classMirror = mirror.reflectClass(classSymbol)
+    val constructorSymbol = classSymbol.toType.declaration(ru.nme.CONSTRUCTOR).asMethod
+    val constructorMirror = classMirror.reflectConstructor(constructorSymbol)
+
+    def convert(row: Row): T = {
+      val args = row.toSeq
+      require(constructorSymbol.paramss.head.size == args.size)
+      constructorMirror.apply(args: _*).asInstanceOf[T]
+    }
+  }
+}
\ No newline at end of file
diff --git a/src/test/resources/cars-with-typed-columns-without-headers.csv b/src/test/resources/cars-with-typed-columns-without-headers.csv
new file mode 100644
index 0000000..20230ea
--- /dev/null
+++ b/src/test/resources/cars-with-typed-columns-without-headers.csv
@@ -0,0 +1,4 @@
+"2012","Tesla","S","No comment",1,350000.00
+
+1997,Ford,E350,"Go get one now they are going fast",3,25000.00
+2015,Chevy,Volt
\ No newline at end of file
diff --git a/src/test/resources/cars-with-typed-columns.csv b/src/test/resources/cars-with-typed-columns.csv
new file mode 100644
index 0000000..6099267
--- /dev/null
+++ b/src/test/resources/cars-with-typed-columns.csv
@@ -0,0 +1,5 @@
+year,make,model,comment,stocked,price
+"2012","Tesla","S","No comment",1,350000.00
+
+1997,Ford,E350,"Go get one now they are going fast",3,25000.00
+2015,Chevy,Volt
\ No newline at end of file
diff --git a/src/test/resources/cars-with-typed-columns.tsv b/src/test/resources/cars-with-typed-columns.tsv
new file mode 100644
index 0000000..df824a8
--- /dev/null
+++ b/src/test/resources/cars-with-typed-columns.tsv
@@ -0,0 +1,5 @@
+year	make	model	comment	stocked	price
+"2012"	"Tesla"	"S"	"No comment"	1	350000.00
+
+1997	Ford	E350	"Go get one now they are going fast"	3	25000.00
+2015	Chevy	Volt
\ No newline at end of file
diff --git a/src/test/scala/com/databricks/spark/csv/rdd/CsvToRDDSuite.scala b/src/test/scala/com/databricks/spark/csv/rdd/CsvToRDDSuite.scala
new file mode 100644
index 0000000..736e3bc
--- /dev/null
+++ b/src/test/scala/com/databricks/spark/csv/rdd/CsvToRDDSuite.scala
@@ -0,0 +1,65 @@
+package com.databricks.spark.csv.rdd
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.test._
+import org.apache.spark.sql.types._
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+
+// Because this suite tests reflection, the test only works in SBT if the config uses forking.
+// There is no workaround for Eclipse.
+// See https://issues.apache.org/jira/browse/SPARK-5281.
+class CsvToRDDSuite extends FunSuite with Matchers {
+  import CsvToRDDSuite._
+  import TestSQLContext._
+
+  val carsFile = "src/test/resources/cars-with-typed-columns.csv"
+  val carsFileTsv = "src/test/resources/cars-with-typed-columns.tsv"
+  val carsFileWithoutHeaders = "src/test/resources/cars-with-typed-columns-without-headers.csv"
+
+  test("DSL for RDD with DROPMALFORMED parsing mode") {
+    val rdd = TestSQLContext.csvFileToRDD[Car](carsFile)
+    rdd.collect() should contain theSameElementsAs Seq(
+      Car(2012, "Tesla", "S", "No comment", 1, 350000.00),
+      Car(1997, "Ford", "E350", "Go get one now they are going fast", 3, 25000.00))
+  }
+
+  test("DSL for RDD with DROPMALFORMED parsing mode, TSV") {
+    val rdd = TestSQLContext.tsvFileToRDD[Car](carsFileTsv)
+    rdd.collect() should contain theSameElementsAs Seq(
+      Car(2012, "Tesla", "S", "No comment", 1, 350000.00),
+      Car(1997, "Ford", "E350", "Go get one now they are going fast", 3, 25000.00))
+  }
+
+  test("DSL for RDD with DROPMALFORMED parsing mode, without headers") {
+    val rdd = TestSQLContext.csvFileToRDD[Car](carsFileWithoutHeaders, useHeader = false)
+    rdd.collect() should contain theSameElementsAs Seq(
+      Car(2012, "Tesla", "S", "No comment", 1, 350000.00),
+      Car(1997, "Ford", "E350", "Go get one now they are going fast", 3, 25000.00))
+  }
+
+  test("DSL for RDD with FAILFAST parsing mode") {
+    intercept[org.apache.spark.SparkException] {
+      val rdd = TestSQLContext.csvFileToRDD[Car](carsFile, mode = "FAILFAST")
+      println(rdd.collect())
+    }
+  }
+
+  test("DSL for RDD with PERMISSIVE parsing mode") {
+    intercept[IllegalArgumentException] {
+      TestSQLContext.csvFileToRDD[Car](carsFile, mode = "PERMISSIVE")
+    }
+  }
+
+  test("DSL for RDD with invalid type argument") {
+    intercept[IllegalArgumentException] {
+      TestSQLContext.csvFileToRDD[CarWithNonPrimitive](carsFile)
+    }
+  }
+}
+
+object CsvToRDDSuite {
+  case class Car(year: Int, make: String, model: String, comment: String, stocked: Int, price: Double)
+  case class CarWithNonPrimitive(year: Int, makeAndModel: MakeAndModel, comment: String, stocked: Int, price: Double)
+  case class MakeAndModel(make: String, model: String)
+}

From 2649026105611f3ddf608eab2ac5c8f17ff31f39 Mon Sep 17 00:00:00 2001
From: Ray Ortigas
Date: Sat, 18 Apr 2015 19:11:58 -0700
Subject: [PATCH 2/2] Address style checking by spark-csv Travis CI.

---
 .../databricks/spark/csv/rdd/package.scala     | 28 +++++++++++++++----
 1 file changed, 23 insertions(+), 5 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/csv/rdd/package.scala b/src/main/scala/com/databricks/spark/csv/rdd/package.scala
index 056385b..b4f4498 100644
--- a/src/main/scala/com/databricks/spark/csv/rdd/package.scala
+++ b/src/main/scala/com/databricks/spark/csv/rdd/package.scala
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2014 Databricks
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package com.databricks.spark.csv
 
 import org.apache.spark.rdd.RDD
@@ -15,14 +30,17 @@
         escape: Char = '\\',
         mode: String = "DROPMALFORMED"): RDD[T] = {
 
-      if (mode == util.ParseModes.PERMISSIVE_MODE)
+      if (mode == util.ParseModes.PERMISSIVE_MODE) {
         throw new IllegalArgumentException(s"permissive mode is invalid for this method")
+      }
 
       val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]
-      if (schema.exists { structField => !structField.dataType.isPrimitive })
+      if (schema.exists { structField => !structField.dataType.isPrimitive }) {
         throw new IllegalArgumentException(s"type must be a case class with only primitive fields")
+      }
 
-      val df = new CsvContext(sqlContext).csvFile(filePath, useHeader, delimiter, quote, escape, mode, Some(schema))
+      val csvContext = new CsvContext(sqlContext)
+      val df = csvContext.csvFile(filePath, useHeader, delimiter, quote, escape, mode, Some(schema))
       df.mapPartitions[T] { iter =>
         val rowConverter = RowConverter[T]()
         iter.map { row => rowConverter.convert(row) }
@@ -38,7 +56,7 @@
   }
 
   case class RowConverter[T]()(implicit ct: scala.reflect.ClassTag[T]) {
-    // http://docs.scala-lang.org/overviews/reflection/environment-universes-mirrors.html#types-of-mirrors-their-use-cases--examples
+    // http://docs.scala-lang.org/overviews/reflection/environment-universes-mirrors.html
 
     // For Scala 2.10, because we're initializing the runtime universe, this is not thread-safe.
     // http://docs.scala-lang.org/overviews/reflection/thread-safety.html
@@ -56,4 +74,4 @@
       constructorMirror.apply(args: _*).asInstanceOf[T]
     }
   }
-}
\ No newline at end of file
+}
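
Note: a minimal usage sketch of the API these patches add, for reference. It
assumes both patches are applied and that spark-csv plus its Spark 1.3-era
dependencies are on the classpath; the CsvToRddExample object, the master
setting, and the reuse of the test fixture path are illustrative, not part of
the patch.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.databricks.spark.csv.rdd._ // implicit CsvContextRDD adds csvFileToRDD/tsvFileToRDD

// Must be a top-level case class with only primitive fields: inner case
// classes are not handled, and non-primitive fields are rejected up front.
case class Car(year: Int, make: String, model: String, comment: String, stocked: Int, price: Double)

object CsvToRddExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("csv-to-rdd").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    // DROPMALFORMED is the default mode: rows that do not fit Car are dropped,
    // as the blank and short rows in the test fixtures demonstrate.
    val cars = sqlContext.csvFileToRDD[Car]("src/test/resources/cars-with-typed-columns.csv")
    cars.collect().foreach(println)

    sc.stop()
  }
}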
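Note: RowConverter drives case-class construction through Scala runtime
reflection. Below is a self-contained sketch of the same technique outside
Spark, using the Scala 2.10-era API the patch targets (declaration and
nme.CONSTRUCTOR; later Scala versions rename these decl and
termNames.CONSTRUCTOR). ReflectiveBuilder and Point are illustrative names,
not part of the patch.

import scala.reflect.ClassTag
import scala.reflect.runtime.{universe => ru}

// Builds instances of T from positional values, mirroring RowConverter.
// As the patch notes, initializing the runtime universe is not thread-safe
// on Scala 2.10.
class ReflectiveBuilder[T](implicit ct: ClassTag[T]) {
  private val mirror = ru.runtimeMirror(getClass.getClassLoader)
  private val classSymbol = mirror.classSymbol(ct.runtimeClass)
  private val classMirror = mirror.reflectClass(classSymbol)
  private val ctorSymbol = classSymbol.toType.declaration(ru.nme.CONSTRUCTOR).asMethod
  private val ctorMirror = classMirror.reflectConstructor(ctorSymbol)

  def build(values: Seq[Any]): T = {
    // Same arity check RowConverter.convert performs with require().
    require(ctorSymbol.paramss.head.size == values.size, "wrong number of constructor arguments")
    ctorMirror.apply(values: _*).asInstanceOf[T]
  }
}

// Top-level on purpose: reflectClass cannot instantiate inner classes,
// which is why the patch documents that inner case classes are unsupported.
case class Point(x: Int, y: Int)

object ReflectiveBuilderDemo extends App {
  println(new ReflectiveBuilder[Point].build(Seq(1, 2))) // prints Point(1,2)
}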
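Note: on the two up-front checks in csvFileToRDD. PERMISSIVE parsing is
rejected presumably because it can emit rows padded with nulls, which have no
representation in primitive case-class fields (this rationale is inferred,
not stated in the patch). The schema handed to csvFile is derived from the
type parameter; a small sketch of that derivation using the same Spark
1.3-era calls as the patch (SchemaDemo is an illustrative name):

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

case class Car(year: Int, make: String, model: String, comment: String, stocked: Int, price: Double)

object SchemaDemo extends App {
  // The same derivation csvFileToRDD performs before delegating to csvFile.
  val schema = ScalaReflection.schemaFor[Car].dataType.asInstanceOf[StructType]

  // Prints one line per field, e.g. "year: IntegerType"; a field whose
  // dataType is not primitive would make csvFileToRDD throw instead.
  schema.fields.foreach { f => println(s"${f.name}: ${f.dataType}") }
}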