Skip to content
6 changes: 5 additions & 1 deletion R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ exportMethods("arrange",
"attach",
"cache",
"collect",
"colnames",
"colnames<-",
"coltypes",
"coltypes<-",
"columns",
"count",
"cov",
Expand Down Expand Up @@ -56,6 +59,7 @@ exportMethods("arrange",
"mutate",
"na.omit",
"names",
"names<-",
"ncol",
"nrow",
"orderBy",
Expand Down Expand Up @@ -276,4 +280,4 @@ export("structField",
"structType",
"structType.jobj",
"structType.structField",
"print.structType")
"print.structType")
166 changes: 117 additions & 49 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ setMethod("dtypes",
#' @family DataFrame functions
#' @rdname columns
#' @name columns

#' @export
#' @examples
#'\dontrun{
Expand All @@ -262,6 +263,7 @@ setMethod("dtypes",
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlContext, path)
#' columns(df)
#' colnames(df)
#'}
setMethod("columns",
signature(x = "DataFrame"),
Expand Down Expand Up @@ -290,6 +292,121 @@ setMethod("names<-",
}
})

#' @rdname columns
#' @name colnames
setMethod("colnames",
signature(x = "DataFrame"),
function(x) {
columns(x)
})

#' @rdname columns
#' @name colnames<-
setMethod("colnames<-",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

colnames<- also needs to be added to the exported list of functions ?

signature(x = "DataFrame", value = "character"),
function(x, value) {
sdf <- callJMethod(x@sdf, "toDF", as.list(value))
dataFrame(sdf)
})

#' coltypes
#'
#' Get column types of a DataFrame
#'
#' @param x A SparkSQL DataFrame
#' @return value A character vector with the column types of the given DataFrame
#' @rdname coltypes
#' @name coltypes
#' @family DataFrame functions
#' @export
#' @examples
#'\dontrun{
#' irisDF <- createDataFrame(sqlContext, iris)
#' coltypes(irisDF)
#'}
setMethod("coltypes",
signature(x = "DataFrame"),
function(x) {
# Get the data types of the DataFrame by invoking dtypes() function
types <- sapply(dtypes(x), function(x) {x[[2]]})

# Map Spark data types into R's data types using DATA_TYPES environment
rTypes <- sapply(types, USE.NAMES=F, FUN=function(x) {
# Check for primitive types
type <- PRIMITIVE_TYPES[[x]]

if (is.null(type)) {
# Check for complex types
for (t in names(COMPLEX_TYPES)) {
if (substring(x, 1, nchar(t)) == t) {
type <- COMPLEX_TYPES[[t]]
break
}
}

if (is.null(type)) {
stop(paste("Unsupported data type: ", x))
}
}
type
})

# Find which types don't have mapping to R
naIndices <- which(is.na(rTypes))

# Assign the original scala data types to the unmatched ones
rTypes[naIndices] <- types[naIndices]

rTypes
})

#' coltypes
#'
#' Set the column types of a DataFrame.
#'
#' @param x A SparkSQL DataFrame
#' @param value A character vector with the target column types for the given
#' DataFrame. Column types can be one of integer, numeric/double, character, logical, or NA
#' to keep that column as-is.
#' @rdname coltypes
#' @name coltypes<-
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlContext, path)
#' coltypes(df) <- c("character", "integer")
#' coltypes(df) <- c(NA, "numeric")
#'}
setMethod("coltypes<-",
signature(x = "DataFrame", value = "character"),
function(x, value) {
cols <- columns(x)
ncols <- length(cols)
if (length(value) == 0) {
stop("Cannot set types of an empty DataFrame with no Column")
}
if (length(value) != ncols) {
stop("Length of type vector should match the number of columns for DataFrame")
}
newCols <- lapply(seq_len(ncols), function(i) {
col <- getColumn(x, cols[i])
if (!is.na(value[i])) {
stype <- rToSQLTypes[[value[i]]]
if (is.null(stype)) {
stop("Only atomic type is supported for column types")
}
cast(col, stype)
} else {
col
}
})
nx <- select(x, newCols)
dataFrame(nx@sdf)
})

#' Register Temporary Table
#'
#' Registers a DataFrame as a Temporary Table in the SQLContext
Expand Down Expand Up @@ -2102,52 +2219,3 @@ setMethod("with",
newEnv <- assignNewEnv(data)
eval(substitute(expr), envir = newEnv, enclos = newEnv)
})

#' Returns the column types of a DataFrame.
#'
#' @name coltypes
#' @title Get column types of a DataFrame
#' @family dataframe_funcs
#' @param x (DataFrame)
#' @return value (character) A character vector with the column types of the given DataFrame
#' @rdname coltypes
#' @examples \dontrun{
#' irisDF <- createDataFrame(sqlContext, iris)
#' coltypes(irisDF)
#' }
setMethod("coltypes",
signature(x = "DataFrame"),
function(x) {
# Get the data types of the DataFrame by invoking dtypes() function
types <- sapply(dtypes(x), function(x) {x[[2]]})

# Map Spark data types into R's data types using DATA_TYPES environment
rTypes <- sapply(types, USE.NAMES=F, FUN=function(x) {

# Check for primitive types
type <- PRIMITIVE_TYPES[[x]]

if (is.null(type)) {
# Check for complex types
for (t in names(COMPLEX_TYPES)) {
if (substring(x, 1, nchar(t)) == t) {
type <- COMPLEX_TYPES[[t]]
break
}
}

if (is.null(type)) {
stop(paste("Unsupported data type: ", x))
}
}
type
})

# Find which types don't have mapping to R
naIndices <- which(is.na(rTypes))

# Assign the original scala data types to the unmatched ones
rTypes[naIndices] <- types[naIndices]

rTypes
})
20 changes: 16 additions & 4 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,22 @@ setGeneric("agg", function (x, ...) { standardGeneric("agg") })
#' @export
setGeneric("arrange", function(x, col, ...) { standardGeneric("arrange") })

#' @rdname columns
#' @export
setGeneric("colnames", function(x, do.NULL = TRUE, prefix = "col") { standardGeneric("colnames") })

#' @rdname columns
#' @export
setGeneric("colnames<-", function(x, value) { standardGeneric("colnames<-") })

#' @rdname coltypes
#' @export
setGeneric("coltypes", function(x) { standardGeneric("coltypes") })

#' @rdname coltypes
#' @export
setGeneric("coltypes<-", function(x, value) { standardGeneric("coltypes<-") })

#' @rdname schema
#' @export
setGeneric("columns", function(x) {standardGeneric("columns") })
Expand Down Expand Up @@ -1081,7 +1097,3 @@ setGeneric("attach")
#' @rdname with
#' @export
setGeneric("with")

#' @rdname coltypes
#' @export
setGeneric("coltypes", function(x) { standardGeneric("coltypes") })
8 changes: 8 additions & 0 deletions R/pkg/R/types.R
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,11 @@ COMPLEX_TYPES <- list(

# The full list of data types.
DATA_TYPES <- as.environment(c(as.list(PRIMITIVE_TYPES), COMPLEX_TYPES))

# An environment for mapping R to Scala, names are R types and values are Scala types.
rToSQLTypes <- as.environment(list(
"integer" = "integer", # in R, integer is 32bit
"numeric" = "double", # in R, numeric == double which is 64bit
"double" = "double",
"character" = "string",
"logical" = "boolean"))
40 changes: 39 additions & 1 deletion R/pkg/inst/tests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,26 @@ test_that("schema(), dtypes(), columns(), names() return the correct values/form
expect_equal(testNames[2], "name")
})

test_that("names() colnames() set the column names", {
df <- jsonFile(sqlContext, jsonPath)
names(df) <- c("col1", "col2")
expect_equal(colnames(df)[2], "col2")

colnames(df) <- c("col3", "col4")
expect_equal(names(df)[1], "col3")

# Test base::colnames base::names
m2 <- cbind(1, 1:4)
expect_equal(colnames(m2, do.NULL = FALSE), c("col1", "col2"))
colnames(m2) <- c("x","Y")
expect_equal(colnames(m2), c("x", "Y"))

z <- list(a = 1, b = "c", c = 1:3)
expect_equal(names(z)[3], "c")
names(z)[3] <- "c2"
expect_equal(names(z)[3], "c2")
})

test_that("head() and first() return the correct data", {
df <- jsonFile(sqlContext, jsonPath)
testHead <- head(df)
Expand Down Expand Up @@ -1616,7 +1636,7 @@ test_that("with() on a DataFrame", {
expect_equal(nrow(sum2), 35)
})

test_that("Method coltypes() to get R's data types of a DataFrame", {
test_that("Method coltypes() to get and set R's data types of a DataFrame", {
expect_equal(coltypes(irisDF), c(rep("numeric", 4), "character"))

data <- data.frame(c1=c(1,2,3),
Expand All @@ -1635,6 +1655,24 @@ test_that("Method coltypes() to get R's data types of a DataFrame", {
x <- createDataFrame(sqlContext, list(list(as.environment(
list("a"="b", "c"="d", "e"="f")))))
expect_equal(coltypes(x), "map<string,string>")

df <- selectExpr(jsonFile(sqlContext, jsonPath), "name", "(age * 1.21) as age")
expect_equal(dtypes(df), list(c("name", "string"), c("age", "decimal(24,2)")))

df1 <- select(df, cast(df$age, "integer"))
coltypes(df) <- c("character", "integer")
expect_equal(dtypes(df), list(c("name", "string"), c("age", "int")))
value <- collect(df[, 2])[[3, 1]]
expect_equal(value, collect(df1)[[3, 1]])
expect_equal(value, 22)

coltypes(df) <- c(NA, "numeric")
expect_equal(dtypes(df), list(c("name", "string"), c("age", "double")))

expect_error(coltypes(df) <- c("character"),
"Length of type vector should match the number of columns for DataFrame")
expect_error(coltypes(df) <- c("environment", "list"),
"Only atomic type is supported for column types")
})

unlink(parquetPath)
Expand Down