Skip to content

Commit 51a9868

Browse files
committed
Move RDD-related methods to own package.
1 parent f5a2c2c commit 51a9868

File tree

2 files changed

+47
-39
lines changed

2 files changed

+47
-39
lines changed

src/main/scala/com/databricks/spark/csv/package.scala

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@ package com.databricks.spark
1818
import org.apache.commons.csv.CSVFormat
1919
import org.apache.hadoop.io.compress.CompressionCodec
2020

21-
import org.apache.spark.rdd.RDD
2221
import org.apache.spark.sql.{SQLContext, DataFrame, Row}
23-
import org.apache.spark.sql.catalyst.ScalaReflection
2422
import org.apache.spark.sql.types.StructType
2523

2624
package object csv {
@@ -57,43 +55,6 @@ package object csv {
5755
parseMode = "PERMISSIVE")(sqlContext)
5856
sqlContext.baseRelationToDataFrame(csvRelation)
5957
}
60-
61-
def csvFileToRDD[T: scala.reflect.runtime.universe.TypeTag : scala.reflect.ClassTag](
62-
filePath: String,
63-
useHeader: Boolean = true,
64-
delimiter: Char = ',',
65-
quote: Char = '"',
66-
escape: Char = '\\',
67-
mode: String = "PERMISSIVE"): RDD[T] = {
68-
69-
val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]
70-
71-
val df = csvFile(filePath, useHeader, delimiter, quote, escape, mode, Some(schema))
72-
df.mapPartitions[T] { iter =>
73-
val rowConverter = RowConverter[T]()
74-
iter.map { row => rowConverter.convert(row) }
75-
}
76-
}
77-
}
78-
79-
case class RowConverter[T]()(implicit ct: scala.reflect.ClassTag[T]) {
80-
// http://docs.scala-lang.org/overviews/reflection/environment-universes-mirrors.html#types-of-mirrors-their-use-cases--examples
81-
82-
// For Scala 2.10, because we're initializing the runtime universe, this is not thread-safe.
83-
// http://docs.scala-lang.org/overviews/reflection/thread-safety.html
84-
val ru = scala.reflect.runtime.universe
85-
86-
val mirror = ru.runtimeMirror(getClass.getClassLoader)
87-
val classSymbol = mirror.classSymbol(ct.runtimeClass)
88-
val classMirror = mirror.reflectClass(classSymbol)
89-
val constructorSymbol = classSymbol.toType.declaration(ru.nme.CONSTRUCTOR).asMethod
90-
val constructorMirror = classMirror.reflectConstructor(constructorSymbol)
91-
92-
def convert(row: Row): T = {
93-
val args = row.toSeq
94-
require(constructorSymbol.paramss.head.size == args.size)
95-
constructorMirror.apply(row.toSeq: _*).asInstanceOf[T]
96-
}
9758
}
9859

9960
implicit class CsvSchemaRDD(dataFrame: DataFrame) {
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.databricks.spark.csv
2+
3+
import org.apache.spark.rdd.RDD
4+
import org.apache.spark.sql.{SQLContext, DataFrame, Row}
5+
import org.apache.spark.sql.catalyst.ScalaReflection
6+
import org.apache.spark.sql.types.StructType
7+
8+
package object rdd {
9+
implicit class CsvContextRDD(sqlContext: SQLContext) extends CsvContext(sqlContext) {
10+
def csvFileToRDD[T: scala.reflect.runtime.universe.TypeTag : scala.reflect.ClassTag](
11+
filePath: String,
12+
useHeader: Boolean = true,
13+
delimiter: Char = ',',
14+
quote: Char = '"',
15+
escape: Char = '\\',
16+
mode: String = "PERMISSIVE"): RDD[T] = {
17+
18+
val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]
19+
20+
val df = csvFile(filePath, useHeader, delimiter, quote, escape, mode, Some(schema))
21+
df.mapPartitions[T] { iter =>
22+
val rowConverter = RowConverter[T]()
23+
iter.map { row => rowConverter.convert(row) }
24+
}
25+
}
26+
}
27+
28+
case class RowConverter[T]()(implicit ct: scala.reflect.ClassTag[T]) {
29+
// http://docs.scala-lang.org/overviews/reflection/environment-universes-mirrors.html#types-of-mirrors-their-use-cases--examples
30+
31+
// For Scala 2.10, because we're initializing the runtime universe, this is not thread-safe.
32+
// http://docs.scala-lang.org/overviews/reflection/thread-safety.html
33+
val ru = scala.reflect.runtime.universe
34+
35+
val mirror = ru.runtimeMirror(getClass.getClassLoader)
36+
val classSymbol = mirror.classSymbol(ct.runtimeClass)
37+
val classMirror = mirror.reflectClass(classSymbol)
38+
val constructorSymbol = classSymbol.toType.declaration(ru.nme.CONSTRUCTOR).asMethod
39+
val constructorMirror = classMirror.reflectConstructor(constructorSymbol)
40+
41+
def convert(row: Row): T = {
42+
val args = row.toSeq
43+
require(constructorSymbol.paramss.head.size == args.size)
44+
constructorMirror.apply(row.toSeq: _*).asInstanceOf[T]
45+
}
46+
}
47+
}

0 commit comments

Comments
 (0)