From 67cd3322b409931dfe704f04f2b467bbf06dddbe Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 2 Mar 2016 11:20:14 +0900
Subject: [PATCH 1/3] Add read.csv and write.csv for SparkR

---
 R/pkg/NAMESPACE                           |  4 ++-
 R/pkg/R/DataFrame.R                       | 27 ++++++++++++++++++++
 R/pkg/R/SQLContext.R                      | 30 +++++++++++++++++++++--
 R/pkg/R/generics.R                        |  4 +++
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 25 +++++++++++++++++++
 5 files changed, 87 insertions(+), 3 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 636d39e1e9ca..1f8c18de29cd 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -101,7 +101,8 @@ exportMethods("arrange",
               "write.df",
               "write.json",
               "write.parquet",
-              "write.text")
+              "write.text",
+              "write.csv")
 
 exportClasses("Column")
 
@@ -284,6 +285,7 @@ export("as.DataFrame",
        "read.json",
        "read.parquet",
        "read.text",
+       "read.csv",
        "sql",
        "str",
        "tableToDF",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 3b7b8250b94f..02101bcdc0a6 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -692,6 +692,33 @@ setMethod("write.text",
             invisible(callJMethod(write, "text", path))
           })
 
+#' write.csv
+#'
+#' Save the contents of a DataFrame as a CSV file. Files written out
+#' with this method can be read back in as a DataFrame using read.csv().
+#'
+#' @param x A SparkSQL DataFrame
+#' @param path The directory where the file is saved
+#'
+#' @family DataFrame functions
+#' @rdname write.csv
+#' @name write.csv
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.csv"
+#' df <- read.csv(sqlContext, path)
+#' write.csv(df, "/tmp/sparkr-tmp/")
+#'}
+setMethod("write.csv",
+          signature(x = "DataFrame", path = "character"),
+          function(x, path) {
+            write <- callJMethod(x@sdf, "write")
+            invisible(callJMethod(write, "csv", path))
+          })
+
 #' Distinct
 #'
 #' Return a new DataFrame containing the distinct rows in this DataFrame.
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 16a2578678cd..b6a3b145005e 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -220,7 +220,7 @@ setMethod("toDF", signature(x = "RDD"),
 #' df <- jsonFile(sqlContext, path)
 #' }
 read.json <- function(sqlContext, path) {
-  # Allow the user to have a more flexible definiton of the text file path
+  # Allow the user to have a more flexible definition of the JSON file path
   paths <- as.list(suppressWarnings(normalizePath(path)))
   read <- callJMethod(sqlContext, "read")
   sdf <- callJMethod(read, "json", paths)
@@ -279,7 +279,7 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
 #' @name read.parquet
 #' @export
 read.parquet <- function(sqlContext, path) {
-  # Allow the user to have a more flexible definiton of the text file path
+  # Allow the user to have a more flexible definition of the Parquet file path
   paths <- as.list(suppressWarnings(normalizePath(path)))
   read <- callJMethod(sqlContext, "read")
   sdf <- callJMethod(read, "parquet", paths)
@@ -321,6 +321,32 @@ read.text <- function(sqlContext, path) {
   dataFrame(sdf)
 }
 
+#' Create a DataFrame from a CSV file.
+#'
+#' Loads a CSV file, returning the result as a DataFrame. By default the
+#' columns are named C0, C1, and so on.
+#'
+#' @param sqlContext SQLContext to use
+#' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @return DataFrame
+#' @rdname read.csv
+#' @name read.csv
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.csv"
+#' df <- read.csv(sqlContext, path)
+#' }
+read.csv <- function(sqlContext, path) {
+  # Allow the user to have a more flexible definition of the CSV file path
+  paths <- as.list(suppressWarnings(normalizePath(path)))
+  read <- callJMethod(sqlContext, "read")
+  sdf <- callJMethod(read, "csv", paths)
+  dataFrame(sdf)
+}
+
 #' SQL Query
 #'
 #' Executes a SQL query using Spark, returning the result as a DataFrame.
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 3db72b57954d..13fa9ba67d83 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -593,6 +593,10 @@ setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParqu
 
 #' @export
 setGeneric("write.text", function(x, path) { standardGeneric("write.text") })
 
+#' @rdname write.csv
+#' @export
+setGeneric("write.csv", function(x, path) { standardGeneric("write.csv") })
+
 #' @rdname schema
 #' @export
 setGeneric("schema", function(x) { standardGeneric("schema") })
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 236bae6bded2..c74adbe49cd1 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -42,6 +42,7 @@ mockLines <- c("{\"name\":\"Michael\"}",
                "{\"name\":\"Andy\", \"age\":30}",
                "{\"name\":\"Justin\", \"age\":19}")
 jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
+csvPath <- tempfile(pattern="sparkr-test", fileext=".csv")
 parquetPath <- tempfile(pattern="sparkr-test", fileext=".parquet")
 writeLines(mockLines, jsonPath)
 
@@ -1585,6 +1586,30 @@ test_that("read/write text files", {
   unlink(textPath2)
 })
 
+test_that("read/write csv files", {
+  df <- read.df(sqlContext, jsonPath, "json")
+  write.df(df, csvPath, "csv", mode = "overwrite")
+
+  # Test write.df and read.df
+  dfCSV <- read.df(sqlContext, csvPath, "csv")
+  expect_is(dfCSV, "DataFrame")
+  expect_equal(colnames(dfCSV), c("C0", "C1"))
+  expect_equal(count(dfCSV), 3)
+  csvPath2 <- tempfile(pattern = "csvPath2", fileext = ".csv")
+  write.df(dfCSV, csvPath2, "csv", mode = "overwrite")
+
+  # Test write.csv and read.csv
+  csvPath3 <- tempfile(pattern = "csvPath3", fileext = ".csv")
+  write.csv(dfCSV, csvPath3)
+  dfCSV2 <- read.csv(sqlContext, c(csvPath2, csvPath3))
+  expect_is(dfCSV2, "DataFrame")
+  expect_equal(colnames(dfCSV2), c("C0", "C1"))
+  expect_equal(count(dfCSV2), count(dfCSV) * 2)
+
+  unlink(csvPath2)
+  unlink(csvPath3)
+})
+
 test_that("describe() and summarize() on a DataFrame", {
   df <- read.json(sqlContext, jsonPath)
   stats <- describe(df, "age")

From f6017480aeac10f914259034763cdfaed7447d36 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 2 Mar 2016 13:08:44 +0900
Subject: [PATCH 2/3] Modify masked functions test

---
 R/pkg/inst/tests/testthat/test_context.R | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index ad3f9722a480..700a5b85a31d 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -24,7 +24,7 @@ test_that("Check masked functions", {
   func <- lapply(masked, function(x) { capture.output(showMethods(x))[[1]] })
   funcSparkROrEmpty <- grepl("\\(package SparkR\\)$|^$", func)
   maskedBySparkR <- masked[funcSparkROrEmpty]
-  namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
"sd", "var", + namesOfMasked <- c("read.csv", "write.csv", "describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var", "colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset", "summary", "transform", "drop") expect_equal(length(maskedBySparkR), length(namesOfMasked)) @@ -36,7 +36,7 @@ test_that("Check masked functions", { any(grepl("=\"ANY\"", capture.output(showMethods(x)[-1]))) })) maskedCompletely <- masked[!funcHasAny] - namesOfMaskedCompletely <- c("cov", "filter", "sample") + namesOfMaskedCompletely <- c("read.csv", "write.csv", "cov", "filter", "sample") expect_equal(length(maskedCompletely), length(namesOfMaskedCompletely)) expect_equal(sort(maskedCompletely), sort(namesOfMaskedCompletely)) }) From 8bb455edc98ec11fa843863a3ea266e1bb17c115 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 2 Mar 2016 13:18:53 +0900 Subject: [PATCH 3/3] Correct style --- R/pkg/inst/tests/testthat/test_context.R | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R index 700a5b85a31d..41d18fd692ba 100644 --- a/R/pkg/inst/tests/testthat/test_context.R +++ b/R/pkg/inst/tests/testthat/test_context.R @@ -24,9 +24,9 @@ test_that("Check masked functions", { func <- lapply(masked, function(x) { capture.output(showMethods(x))[[1]] }) funcSparkROrEmpty <- grepl("\\(package SparkR\\)$|^$", func) maskedBySparkR <- masked[funcSparkROrEmpty] - namesOfMasked <- c("read.csv", "write.csv", "describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var", + namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var", "colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset", - "summary", "transform", "drop") + "summary", "transform", "drop", "read.csv", "write.csv") expect_equal(length(maskedBySparkR), length(namesOfMasked)) expect_equal(sort(maskedBySparkR), sort(namesOfMasked)) # above are those reported as masked when `library(SparkR)` @@ -36,7 +36,7 @@ test_that("Check masked functions", { any(grepl("=\"ANY\"", capture.output(showMethods(x)[-1]))) })) maskedCompletely <- masked[!funcHasAny] - namesOfMaskedCompletely <- c("read.csv", "write.csv", "cov", "filter", "sample") + namesOfMaskedCompletely <- c("cov", "filter", "sample", "read.csv", "write.csv") expect_equal(length(maskedCompletely), length(namesOfMaskedCompletely)) expect_equal(sort(maskedCompletely), sort(namesOfMaskedCompletely)) })