Changes from all commits (42 commits)
461714d  SPARK-10807. Added as.data.frame as a synonym for collect(). (Sep 24, 2015)
e9e34b5  Removed operator %++%, which is a synonym for paste() (Sep 24, 2015)
c65b682  Removed extra blank space. (Sep 24, 2015)
cee871c  Removed extra spaces to comply with R style (Sep 24, 2015)
0851163  Moved setGeneric declaration to generics.R. (Sep 28, 2015)
7a8e62a  Added test cases for as.data.frame (Sep 28, 2015)
de6d164  Merge remote-tracking branch 'origin/SPARK-10807' into SPARK-10807 (Sep 28, 2015)
a346cc6  Changed setMethod declaration to comply with standard (Sep 28, 2015)
6c4dcbc  Removed changes to .gitignore (Sep 30, 2015)
99e6304  Merge remote-tracking branch 'upstream/master' (Oct 5, 2015)
30c5d26  coltypes (Oct 5, 2015)
4a92d99  Merged (Nov 6, 2015)
a68f97a  coltypes (Oct 5, 2015)
360156c  coltypes (Oct 5, 2015)
0c2da6c  Added more types. Scala types that can't be mapped to R will remain a… (Oct 9, 2015)
909e4e3  Removed white space (Oct 9, 2015)
3cd2079  Added more tests (Oct 9, 2015)
a7723d9  Fixed typo (Oct 9, 2015)
0a0b278  Fixed typo (Oct 9, 2015)
7e89935  Moved coltypes to new file types.R and refactored schema.R (Oct 19, 2015)
21c0799  Updated DESCRIPTION file to add types.R (Oct 19, 2015)
fee5a2e  Updated DESCRIPTION file (Oct 19, 2015)
e1056ab  Coding style for setGeneric definition (Oct 20, 2015)
75f5ced  Coding style (Oct 20, 2015)
908abf4  Coding style (Oct 20, 2015)
37bdc46  Fixed data type mapping. Put data types in an environment for more ef… (Nov 3, 2015)
3b5c2d5  Removed unnecessary cat (Nov 3, 2015)
001884a  Removed white space (Nov 3, 2015)
eaaf178  Removed blank space (Nov 3, 2015)
9a9618e  Update DataFrame.R (Nov 3, 2015)
25faa4e  Update types.R (Nov 4, 2015)
57a47a4  Update types.R (Nov 4, 2015)
e5ab466  Update DataFrame.R (Nov 4, 2015)
772de99  Added tests for complex types (Nov 4, 2015)
67b12a4  Update types.R (Nov 4, 2015)
0bb39dc  Update types.R (Nov 4, 2015)
8aa13ef  Update test_sparkSQL.R (Nov 4, 2015)
9b36955  Removed for loop (Nov 5, 2015)
95a8ece  Update DataFrame.R (Nov 5, 2015)
462b1f1  Update DataFrame.R (Nov 5, 2015)
cd033c0  Removed blank space (Nov 5, 2015)
ba091fb  Merge tests and description files (Nov 6, 2015)
694 changes: 678 additions & 16 deletions LICENSE

Large diffs are not rendered by default.

35 changes: 0 additions & 35 deletions NOTICE
Original file line number Diff line number Diff line change
@@ -572,38 +572,3 @@ Copyright 2009-2013 The Apache Software Foundation

Apache Avro IPC
Copyright 2009-2013 The Apache Software Foundation


Vis.js
Copyright 2010-2015 Almende B.V.

Vis.js is dual licensed under both

* The Apache 2.0 License
http://www.apache.org/licenses/LICENSE-2.0

and

* The MIT License
http://opensource.org/licenses/MIT

Vis.js may be distributed under either license.


Vis.js uses and redistributes the following third-party libraries:

- component-emitter
https://github.com/component/emitter
The MIT License

- hammer.js
http://hammerjs.github.io/
The MIT License

- moment.js
http://momentjs.com/
The MIT License

- keycharm
https://github.com/AlexDM0/keycharm
The MIT License
1 change: 1 addition & 0 deletions R/pkg/DESCRIPTION
@@ -34,4 +34,5 @@ Collate:
'serialize.R'
'sparkR.R'
'stats.R'
'types.R'
'utils.R'
5 changes: 3 additions & 2 deletions R/pkg/NAMESPACE
@@ -23,9 +23,11 @@ export("setJobGroup",
exportClasses("DataFrame")

exportMethods("arrange",
"attach",
"as.data.frame",
"attach",
"cache",
"collect",
"coltypes",
"columns",
"count",
"cov",
@@ -264,4 +266,3 @@ export("structField",
"structType.structField",
"print.structType")

export("as.data.frame")
55 changes: 55 additions & 0 deletions R/pkg/R/DataFrame.R
@@ -2152,3 +2152,58 @@ setMethod("with",
newEnv <- assignNewEnv(data)
eval(substitute(expr), envir = newEnv, enclos = newEnv)
})

#' Returns the column types of a DataFrame.
#'
#' @name coltypes
#' @title Get column types of a DataFrame
#' @param x (DataFrame)
Contributor: Could you update the style of the function description to be more consistent with other existing ones?

Member: I can change this when updating my PR #9218

#' @return value (character) A character vector with the column types of the given DataFrame
#' @rdname coltypes
setMethod("coltypes",
signature(x = "DataFrame"),
function(x) {
# TODO: This may be moved to a global parameter
# These are the supported data types and how they map to
# R's data types
DATA_TYPES <- c("string"="character",
"long"="integer",
"tinyint"="integer",
"short"="integer",
"integer"="integer",
"byte"="integer",
"double"="numeric",
"float"="numeric",
"decimal"="numeric",
"boolean"="logical"
)
Contributor: You only handle primitive types here, but no complex types, like Array, Struct and Map.

It would be better if you refactored the type mapping related code here together with that in the SerDe.

Author: @sun-rui For complex types (Array/Struct/Map), I can't think of any mapping to R types. Therefore, as agreed with @felixcheung and @shivaram, these will remain the same. For example:

Original column types: ["string", "boolean", "map..."]
Result of coltypes(): ["character", "logical", "map..."]

Contributor: @olarayej I think the fallback mechanism here is good. But @sun-rui makes another good point that it would be good to have one unified place where we do the mapping from R types to Java types. Right now part of that is in serialize.R / deserialize.R.

Could you see if there is some refactoring we could do for this to not be duplicated?

Author: @sun-rui @shivaram The notion of coltypes is actually spread across three files: schema.R, serialize.R, and deserialize.R.

In serialize.R, the method writeType (see below) turns the full data type into a one-character string. Then the method readTypedObject (see below) uses this one-character type to read accordingly. I suspect this is because complex types can take forms like map<String,String>?

In my opinion, it would be better to use the full data type, as opposed to the first letter (which could be especially confusing since we support data types starting with the same letter: Date/Double, String/Struct). Also, having the full data type would allow for centralizing the data types in one place, though this would require some major changes.

We could have mapping arrays:

PRIMITIVE_TYPES <- c("string"="character",
"long"="integer",
"tinyint"="integer",
"short"="integer",
"integer"="integer",
"byte"="integer",
"double"="numeric",
"float"="numeric",
"decimal"="numeric",
"boolean"="logical")

COMPLEX_TYPES <- c("map", "array", "struct", ...)

DATA_TYPES <- c(PRIMITIVE_TYPES, COMPLEX_TYPES)

And then we'd need to modify deserialize.R, serialize.R, and schema.R to acknowledge these accordingly.

Thoughts?

writeType <- function(con, class) {
type <- switch(class,
NULL = "n",
integer = "i",
character = "c",
logical = "b",
double = "d",
numeric = "d",
raw = "r",
array = "a",
list = "l",
struct = "s",
jobj = "j",
environment = "e",
Date = "D",
POSIXlt = "t",
POSIXct = "t",
stop(paste("Unsupported type for serialization", class)))
writeBin(charToRaw(type), con)
}

readTypedObject <- function(con, type) {
switch (type,
"i" = readInt(con),
"c" = readString(con),
"b" = readBoolean(con),
"d" = readDouble(con),
"r" = readRaw(con),
"D" = readDate(con),
"t" = readTime(con),
"a" = readArray(con),
"l" = readList(con),
"e" = readEnv(con),
"s" = readStruct(con),
"n" = NULL,
"j" = getJobj(readString(con)),
stop(paste("Unsupported type for deserialization", type)))
}

Contributor: The single-character names are there to reduce the amount of data serialized when we transfer these data types to the JVM. They're not meant to be remembered by anybody, so I don't see them being a source of confusion. @sun-rui also added tests which ensure these mappings don't break.

However, I think having the list of primitive types, complex types and the mapping in a common file (types.R?) sounds good to me.
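The size argument for one-character tags is easy to check in plain base R. This is a minimal, hypothetical sketch of writing one type tag to a raw connection, not SparkR's actual serialization path:

```r
# One-character type tags: writeType-style serialization sends a single
# byte per type ("i" for integer, "c" for character, ...) instead of
# the full type name.
con <- rawConnection(raw(0), "r+")
writeBin(charToRaw("i"), con)
tag <- rawConnectionValue(con)
close(con)

length(tag)       # 1 -- one byte on the wire
rawToChar(tag)    # "i"
nchar("integer")  # 7 -- what the full type name would take
```

The saving per value is small, but these tags are written for every serialized value crossing the R/JVM boundary, which is the trade-off described above.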


# Get the data types of the DataFrame by invoking dtypes() function
types <- sapply(dtypes(x), function(x) {x[[2]]})

# Map Spark data types to R's data types using the PRIMITIVE_TYPES
# and COMPLEX_TYPES mappings defined in types.R
rTypes <- sapply(types, USE.NAMES = FALSE, FUN = function(x) {

# Check for primitive types
type <- PRIMITIVE_TYPES[[x]]
if (is.null(type)) {
# Check for complex types
typeName <- Filter(function(t) { substring(x, 1, nchar(t)) == t},
names(COMPLEX_TYPES))
if (length(typeName) > 0) {
type <- COMPLEX_TYPES[[typeName]]
} else {
stop(paste("Unsupported data type: ", x))
}
}
type
})

# Find which types don't have mapping to R
naIndices <- which(is.na(rTypes))

# Assign the original scala data types to the unmatched ones
rTypes[naIndices] <- types[naIndices]

rTypes
})
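The lookup-then-prefix-match flow of coltypes above can be sketched standalone in base R. The mappings below are abbreviated for illustration (the full tables live in types.R), and toRType is a hypothetical helper name, not a SparkR function:

```r
# Sketch of coltypes' mapping logic: exact lookup for primitive types,
# prefix matching for complex types such as "map<string,string>",
# which fall back to the original Spark type string.
PRIMITIVE_TYPES <- as.environment(list(
  "string"  = "character",
  "double"  = "numeric",
  "boolean" = "logical",
  "integer" = "integer"))
COMPLEX_TYPES <- c("map", "array", "struct")

toRType <- function(sparkType) {
  if (exists(sparkType, envir = PRIMITIVE_TYPES, inherits = FALSE)) {
    return(get(sparkType, envir = PRIMITIVE_TYPES))
  }
  # Complex types keep the original Spark type string (the NA fallback)
  if (any(sapply(COMPLEX_TYPES, function(t) startsWith(sparkType, t)))) {
    sparkType
  } else {
    stop(paste("Unsupported data type:", sparkType))
  }
}

sapply(c("string", "map<string,string>", "double"), toRType, USE.NAMES = FALSE)
# "character" "map<string,string>" "numeric"
```

This mirrors the behavior agreed in the thread: primitives translate to R types, while map/array/struct columns surface their Spark type string unchanged.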
5 changes: 4 additions & 1 deletion R/pkg/R/generics.R
@@ -1027,7 +1026,6 @@ setGeneric("weekofyear", function(x) { standardGeneric("weekofyear") })
#' @export
setGeneric("year", function(x) { standardGeneric("year") })


#' @rdname glm
#' @export
setGeneric("glm")
@@ -1047,3 +1046,7 @@ setGeneric("attach")
#' @rdname with
#' @export
setGeneric("with")

#' @rdname coltypes
#' @export
setGeneric("coltypes", function(x) { standardGeneric("coltypes") })
5 changes: 2 additions & 3 deletions R/pkg/R/mllib.R
@@ -27,7 +27,7 @@ setClass("PipelineModel", representation(model = "jobj"))
#' Fits a generalized linear model, similarly to R's glm(). Also see the glmnet package.
#'
#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
#' operators are supported, including '~', '.', ':', '+', and '-'.
#' operators are supported, including '~', '+', '-', and '.'.
#' @param data DataFrame for training
#' @param family Error distribution. "gaussian" -> linear regression, "binomial" -> logistic reg.
#' @param lambda Regularization parameter
@@ -41,8 +41,7 @@ setClass("PipelineModel", representation(model = "jobj"))
#' sqlContext <- sparkRSQL.init(sc)
#' data(iris)
#' df <- createDataFrame(sqlContext, iris)
#' model <- glm(Sepal_Length ~ Sepal_Width, df, family="gaussian")
#' summary(model)
#' model <- glm(Sepal_Length ~ Sepal_Width, df)
#'}
setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFrame"),
function(formula, family = c("gaussian", "binomial"), data, lambda = 0, alpha = 0,
15 changes: 1 addition & 14 deletions R/pkg/R/schema.R
@@ -115,20 +115,7 @@ structField.jobj <- function(x) {
}

checkType <- function(type) {
primtiveTypes <- c("byte",
"integer",
"float",
"double",
"numeric",
"character",
"string",
"binary",
"raw",
"logical",
"boolean",
"timestamp",
"date")
if (type %in% primtiveTypes) {
if (type %in% names(PRIMITIVE_TYPES)) {
return()
} else {
# Check complex types
43 changes: 43 additions & 0 deletions R/pkg/R/types.R
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# types.R. This file handles the data type mapping between Spark and R

# The primitive data types, where names(PRIMITIVE_TYPES) are Scala types whereas
# values are equivalent R types. This is stored in an environment to allow for
# more efficient look up (environments use hashmaps).
PRIMITIVE_TYPES <- as.environment(list(
"byte"="integer",
Contributor: "byte" should be "tinyint"

"tinyint"="integer",
Contributor: "smallint"="integer"

"smallint"="integer",
"integer"="integer",
Contributor: "int"="integer"

Contributor: "bigint"="numeric"

"bigint"="numeric",
"float"="numeric",
"double"="numeric",
"decimal"="numeric",
"string"="character",
"binary"="raw",
"boolean"="logical",
"timestamp"="POSIXct",
"date"="Date"))

# The complex data types. These do not have any direct mapping to R's types.
COMPLEX_TYPES <- list(
"map"=NA,
"array"=NA,
"struct"=NA)

# The full list of data types.
DATA_TYPES <- as.environment(c(as.list(PRIMITIVE_TYPES), COMPLEX_TYPES))
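The comment above about environments using hashmaps can be demonstrated in base R. A small sketch (illustrative keys only) of why an environment-backed table suits this lookup:

```r
# Environments give hashed key lookup, a clean "is this key present"
# test, and a NULL result on a miss -- exactly what the coltypes code
# relies on when it checks `is.null(PRIMITIVE_TYPES[[x]])`.
mapping <- list("string" = "character", "boolean" = "logical")
env <- as.environment(mapping)

get("boolean", envir = env)                   # "logical"
exists("map", envir = env, inherits = FALSE)  # FALSE -- no error raised
env[["map"]]                                  # NULL, the miss case
```

By contrast, indexing a named list or vector with a missing name either errors or returns an unnamed NA, which would complicate the primitive/complex dispatch.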
10 changes: 1 addition & 9 deletions R/pkg/inst/tests/test_mllib.R
@@ -61,14 +61,6 @@ test_that("dot minus and intercept vs native glm", {
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
})

test_that("feature interaction vs native glm", {
training <- createDataFrame(sqlContext, iris)
model <- glm(Sepal_Width ~ Species:Sepal_Length, data = training)
vals <- collect(select(predict(model, training), "prediction"))
rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
})

test_that("summary coefficients match with native glm", {
training <- createDataFrame(sqlContext, iris)
stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training, solver = "l-bfgs"))
@@ -77,7 +69,7 @@ test_that("summary coefficients match with native glm", {
expect_true(all(abs(rCoefs - coefs) < 1e-6))
expect_true(all(
as.character(stats$features) ==
c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica")))
c("(Intercept)", "Sepal_Length", "Species__versicolor", "Species__virginica")))
})

test_that("summary coefficients match with native glm of family 'binomial'", {
31 changes: 23 additions & 8 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -695,13 +695,6 @@ test_that("select with column", {
expect_equal(columns(df3), c("x"))
expect_equal(count(df3), 3)
expect_equal(collect(select(df3, "x"))[[1, 1]], "x")

df4 <- select(df, c("name", "age"))
expect_equal(columns(df4), c("name", "age"))
expect_equal(count(df4), 3)

expect_error(select(df, c("name", "age"), "name"),
"To select multiple columns, use a character vector or list for col")
})

test_that("subsetting", {
Expand Down Expand Up @@ -1467,8 +1460,9 @@ test_that("SQL error message is returned from JVM", {
expect_equal(grepl("Table not found: blah", retError), TRUE)
})

irisDF <- createDataFrame(sqlContext, iris)

test_that("Method as.data.frame as a synonym for collect()", {
irisDF <- createDataFrame(sqlContext, iris)
expect_equal(as.data.frame(irisDF), collect(irisDF))
irisDF2 <- irisDF[irisDF$Species == "setosa", ]
expect_equal(as.data.frame(irisDF2), collect(irisDF2))
@@ -1503,6 +1497,27 @@ test_that("with() on a DataFrame", {
expect_equal(nrow(sum2), 35)
})

test_that("Method coltypes() to get R's data types of a DataFrame", {
expect_equal(coltypes(irisDF), c(rep("numeric", 4), "character"))

data <- data.frame(c1=c(1,2,3),
c2=c(T,F,T),
c3=c("2015/01/01 10:00:00", "2015/01/02 10:00:00", "2015/01/03 10:00:00"))

schema <- structType(structField("c1", "byte"),
structField("c3", "boolean"),
structField("c4", "timestamp"))

# Test primitive types
DF <- createDataFrame(sqlContext, data, schema)
expect_equal(coltypes(DF), c("integer", "logical", "POSIXct"))

# Test complex types
x <- createDataFrame(sqlContext, list(list(as.environment(
list("a"="b", "c"="d", "e"="f")))))
expect_equal(coltypes(x), "map<string,string>")
})
Contributor: Could you add a test with some other types? Also, another one which runs into the NA case and uses the SQL type would be useful.


unlink(parquetPath)
unlink(jsonPath)
unlink(jsonPathNa)
2 changes: 1 addition & 1 deletion README.md
@@ -59,7 +59,7 @@ will run the Pi example locally.

You can set the MASTER environment variable when running examples to submit
examples to a cluster. This can be a mesos:// or spark:// URL,
"yarn" to run on YARN, and "local" to run
"yarn-cluster" or "yarn-client" to run on YARN, and "local" to run
locally with one thread, or "local[N]" to run locally with N threads. You
can also use an abbreviated class name if the class is in the `examples`
package. For instance:
@@ -192,7 +192,7 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
} finally {
Closeables.close(in, copyThrewException);
}
if (!partitionWriters[i].fileSegment().file().delete()) {
if (!blockManager.diskBlockManager().getFile(partitionWriters[i].blockId()).delete()) {
logger.error("Unable to delete file for partition {}", i);
}
}
18 changes: 0 additions & 18 deletions core/src/main/java/org/apache/spark/util/collection/TimSort.java
@@ -15,24 +15,6 @@
* limitations under the License.
*/

/*
* Based on TimSort.java from the Android Open Source Project
*
* Copyright (C) 2008 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.collection;

import java.util.Comparator;
@@ -21,7 +21,6 @@

import org.apache.spark.annotation.Private;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.types.ByteArray;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.util.Utils;

@@ -63,7 +62,21 @@ public int compare(long aPrefix, long bPrefix) {
}

public static long computePrefix(byte[] bytes) {
return ByteArray.getPrefix(bytes);
if (bytes == null) {
return 0L;
} else {
/**
* TODO: If a wrapper for BinaryType is created (SPARK-8786),
* these codes below will be in the wrapper class.
*/
final int minLen = Math.min(bytes.length, 8);
long p = 0;
for (int i = 0; i < minLen; ++i) {
p |= (128L + Platform.getByte(bytes, Platform.BYTE_ARRAY_OFFSET + i))
<< (56 - 8 * i);
}
return p;
}
}
}
