1 change: 1 addition & 0 deletions R/pkg/R/deserialize.R
@@ -23,6 +23,7 @@
# Int -> integer
# String -> character
# Boolean -> logical
# Float -> double
# Double -> double
# Long -> double
# Array[Byte] -> raw
1 change: 1 addition & 0 deletions R/pkg/R/schema.R
@@ -123,6 +123,7 @@ structField.character <- function(x, type, nullable = TRUE) {
}
options <- c("byte",
"integer",
"float",
Contributor:
Could you add a test for creating a DataFrame with float type? It may crash now.

Member Author:
Ok. I added it.

It is OK to create a DataFrame with float type. Inserting data from an RDD into the DataFrame is no problem either. But if you want to insert local data from R into the DataFrame, it will crash, because we serialize an R double to a Double in the JVM.
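A minimal sketch of the failure mode being described, for illustration only (it assumes a SparkR sqlContext is available; the column names and values are made up, not taken from the PR):

```r
# Sketch of the scenario described above (illustrative; assumes a SparkR
# sqlContext is available).
schema <- structType(structField("name", "string"),
                     structField("height", "float"))
localDF <- data.frame(name = c("Bob"), height = c(176.5))

# Local R doubles are serialized as java.lang.Double on the JVM side, so they
# do not match the FloatType column declared in the schema, and inserting this
# local data can crash.
df <- createDataFrame(sqlContext, localDF, schema)
```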

Contributor:
That's the thing I worry about: creating a DataFrame from local data is the most important use case right now. I think we should either not support FloatType or make it actually work.

"double",
"numeric",
"character",
26 changes: 26 additions & 0 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -108,6 +108,32 @@ test_that("create DataFrame from RDD", {
expect_equal(count(df), 10)
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))

df <- jsonFile(sqlContext, jsonPathNa)
hiveCtx <- tryCatch({
newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
}, error = function(err) {
skip("Hive is not build with SparkSQL, skipped")
})
sql(hiveCtx, "CREATE TABLE people (name string, age double, height float)")
insertInto(df, "people")
expect_equal(sql(hiveCtx, "SELECT age from people WHERE name = 'Bob'"), c(16))
expect_equal(sql(hiveCtx, "SELECT height from people WHERE name ='Bob'"), c(176.5))

schema <- structType(structField("name", "string"), structField("age", "integer"),
structField("height", "float"))
df2 <- createDataFrame(sqlContext, df.toRDD, schema)
expect_equal(columns(df2), c("name", "age", "height"))
expect_equal(dtypes(df2), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(collect(where(df2, df2$name == "Bob")), c("Bob", 16, 176.5))

localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18), height=c(164.10, 181.4, 173.7))
df <- createDataFrame(sqlContext, localDF, schema)
Contributor:
Thanks for adding this, but I am not sure this is testing the same bug that was reported. After constructing the DF, if I do show(df) I see the column as double, while in the original bug report the columns were marked as float:

show(result)
DataFrame[offset:float, percentage:float]

Member Author:
I checked this. The column is still double due to another problem I just reported in #7311: in createDataFrame, the given schema gets overwritten.

Although I fixed that in #7311, I just found that with a user-defined schema it is still possible to cause a problem when collecting data from the DataFrame.

That is because we serialize an R double to a Double in Java. If we define a column as float in R and create a DataFrame based on this schema, the serialized and deserialized Double will be stored in the float column. Then, when we collect the data from it, it will throw an error.

@shivaram What do you think? Do we need to fix #7311? Or do you think it is up to users to define a correct schema?
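A rough illustration of the collect() failure being described, under the same assumptions (hypothetical session, not part of the PR's tests):

```r
# Hypothetical session showing the collect() failure described above
# (assumes a SparkR sqlContext; the value is made up).
schema <- structType(structField("height", "float"))
df <- createDataFrame(sqlContext, data.frame(height = c(176.5)), schema)

# The R double was serialized as java.lang.Double but the column is FloatType,
# so reading the rows back is expected to throw an error.
collect(df)
```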

Contributor:
@davies, is there any reason to allow the user to pass in a schema for createDataFrame(), given that we can infer types (R objects have runtime type information)? Even if a user-specified schema is needed in some cases, I think only DataTypes that map to native R types should be supported; types like long and float are not natural to support.

For external sources that have float types, which will be loaded as java.lang.Float on the JVM side, we can support converting them to double on the R side.

Member Author:
If it is loaded on the JVM side, I think there is no problem. We already have serialization/deserialization for values going between R and Java.

Contributor:
I think the main reason for supporting a user-defined schema was to support column names that differ from the ones in the local R data frame. We could of course switch to picking up only the names from the given schema rather than the types -- but I also think specifying a schema is an advanced option, so expecting users to make it match their data types is fine.

As a follow-up JIRA, we could file a new issue to warn or print an error if we find that the specified schema doesn't match the types of the values being serialized.
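A hypothetical sketch of what such a check might look like (the helper name and the R-class-to-Spark-type mapping are assumptions, not existing SparkR code):

```r
# Hypothetical sketch of the suggested warning; not part of the SparkR API.
warnOnSchemaMismatch <- function(localDF, declaredTypes) {
  # declaredTypes: named character vector, e.g. c(age = "integer", height = "float")
  rToSpark <- c(integer = "integer", numeric = "double",
                character = "string", logical = "boolean")
  actual <- rToSpark[as.character(sapply(localDF, class))]
  names(actual) <- names(localDF)
  mismatched <- names(declaredTypes)[declaredTypes != actual[names(declaredTypes)]]
  if (length(mismatched) > 0) {
    warning(sprintf("Declared schema does not match data types for column(s): %s",
                    paste(mismatched, collapse = ", ")))
  }
}
```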

Member Author:
OK, that sounds good. Since #7311 is merged now, I should update this test case, or it will fail due to this issue.

Contributor:
For a user-specified schema in createDataFrame, my point is that we may not want to support DataTypes like byte, long, and float, which are not natural to R users. Alternatively, from the viewpoint of API parity with Scala, we could support these types but internally convert them to natural R types, like:
byte -> integer
long -> double
float -> double
and print a warning message about the conversion.
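A hypothetical R-side sketch of that conversion (the function name and exact mapping are assumptions, not part of SparkR):

```r
# Hypothetical sketch of the proposed conversion; not part of the SparkR API.
normalizeColumnType <- function(type) {
  mapping <- c(byte = "integer", long = "double", float = "double")
  if (type %in% names(mapping)) {
    converted <- mapping[[type]]
    warning(sprintf("Type '%s' is not a natural R type; converting to '%s'.",
                    type, converted))
    converted
  } else {
    type
  }
}

# normalizeColumnType("float") returns "double" and emits a warning about the conversion.
```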

expect_is(df, "DataFrame")
expect_equal(count(df), 3)
expect_equal(columns(df), c("name", "age", "height"))
expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(collect(where(df, df$name == "John")), c("John", 19, 164.10))
})

test_that("convert NAs to null type in DataFrames", {
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -179,6 +179,7 @@ private[spark] object SerDe {
// Int -> integer
// String -> character
// Boolean -> logical
// Float -> double
// Double -> double
// Long -> double
// Array[Byte] -> raw
@@ -215,6 +216,9 @@
case "long" | "java.lang.Long" =>
writeType(dos, "double")
writeDouble(dos, value.asInstanceOf[Long].toDouble)
case "float" | "java.lang.Float" =>
writeType(dos, "double")
writeDouble(dos, value.asInstanceOf[Float].toDouble)
case "double" | "java.lang.Double" =>
writeType(dos, "double")
writeDouble(dos, value.asInstanceOf[Double])
15 changes: 12 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -47,6 +47,7 @@ private[r] object SQLUtils {
dataType match {
case "byte" => org.apache.spark.sql.types.ByteType
case "integer" => org.apache.spark.sql.types.IntegerType
case "float" => org.apache.spark.sql.types.FloatType
case "double" => org.apache.spark.sql.types.DoubleType
case "numeric" => org.apache.spark.sql.types.DoubleType
case "character" => org.apache.spark.sql.types.StringType
@@ -68,20 +69,28 @@

def createDF(rdd: RDD[Array[Byte]], schema: StructType, sqlContext: SQLContext): DataFrame = {
val num = schema.fields.size
val rowRDD = rdd.map(bytesToRow)
val rowRDD = rdd.map(bytesToRow(_, schema))
sqlContext.createDataFrame(rowRDD, schema)
}

def dfToRowRDD(df: DataFrame): JavaRDD[Array[Byte]] = {
df.map(r => rowToRBytes(r))
}

private[this] def bytesToRow(bytes: Array[Byte]): Row = {
private[this] def doConversion(data: Object, dataType: DataType): Object = {
data match {
case d: java.lang.Double if dataType == FloatType =>
new java.lang.Float(d)
case _ => data
}
}

private[this] def bytesToRow(bytes: Array[Byte], schema: StructType): Row = {
val bis = new ByteArrayInputStream(bytes)
val dis = new DataInputStream(bis)
val num = SerDe.readInt(dis)
Row.fromSeq((0 until num).map { i =>
SerDe.readObject(dis)
doConversion(SerDe.readObject(dis), schema.fields(i).dataType)
}.toSeq)
}
