From 9e2942d92aac77d5543e6e8e0a08ba1009490dbf Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Thu, 17 Dec 2015 14:37:34 +0800
Subject: [PATCH 1/4] Add read.text and write.text for SparkR

---
 R/pkg/NAMESPACE                           |  4 +++-
 R/pkg/R/DataFrame.R                       | 28 +++++++++++++++++++++++
 R/pkg/R/SQLContext.R                      | 26 +++++++++++++++++++++
 R/pkg/R/generics.R                        |  4 ++++
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 19 +++++++++++++++
 5 files changed, 80 insertions(+), 1 deletion(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index ccc01fe169601..beacc39500aaa 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -94,7 +94,8 @@ exportMethods("arrange",
               "withColumnRenamed",
               "write.df",
               "write.json",
-              "write.parquet")
+              "write.parquet",
+              "write.text")
 
 exportClasses("Column")
 
@@ -274,6 +275,7 @@ export("as.DataFrame",
        "parquetFile",
        "read.df",
        "read.parquet",
+       "read.text",
        "sql",
        "table",
        "tableNames",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 0cfa12b997d69..6af848d051a36 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -661,6 +661,34 @@ setMethod("saveAsParquetFile",
             write.parquet(x, path)
           })
 
+#' write.text
+#'
+#' Saves the content of the DataFrame in a text file at the specified path.
+#' The DataFrame must have only one column that is of string type.
+#' Each row becomes a new line in the output file.
+#'
+#' @param x A SparkSQL DataFrame
+#' @param path The directory where the file is saved
+#'
+#' @family DataFrame functions
+#' @rdname write.text
+#' @name write.text
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- read.json(sqlContext, path)
+#' write.text(df, "/tmp/sparkr-tmp/")
+#'}
+setMethod("write.text",
+          signature(x = "DataFrame", path = "character"),
+          function(x, path) {
+            write <- callJMethod(x@sdf, "write")
+            invisible(callJMethod(write, "text", path))
+          })
+
 #' Distinct
 #'
 #' Return a new DataFrame containing the distinct rows in this DataFrame.
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 9243d70e66f75..443666164b2b4 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -295,6 +295,32 @@ parquetFile <- function(sqlContext, ...) {
   dataFrame(sdf)
 }
 
+#' Create a DataFrame from a text file.
+#'
+#' Loads a text file and returns a DataFrame with a single string column named "value".
+#' Each line in the text file is a new row in the resulting DataFrame.
+#'
+#' @param sqlContext SQLContext to use
+#' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @return DataFrame
+#' @rdname read.text
+#' @name read.text
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.txt"
+#' df <- read.text(sqlContext, path)
+#' }
+read.text <- function(sqlContext, path) {
+  # Allow the user to have a more flexible definition of the text file path
+  paths <- as.list(suppressWarnings(normalizePath(path)))
+  read <- callJMethod(sqlContext, "read")
+  sdf <- callJMethod(read, "text", paths)
+  dataFrame(sdf)
+}
+
 #' SQL Query
 #'
 #' Executes a SQL query using Spark, returning the result as a DataFrame.
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 62be2ddc8f522..ba6861709754d 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -549,6 +549,10 @@ setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet")
 #' @export
 setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
 
+#' @rdname write.text
+#' @export
+setGeneric("write.text", function(x, path) { standardGeneric("write.text") })
+
 #' @rdname schema
 #' @export
 setGeneric("schema", function(x) { standardGeneric("schema") })
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 135c7576e5291..366acfb40ace9 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1481,6 +1481,25 @@ test_that("read/write Parquet files", {
   unlink(parquetPath4)
 })
 
+test_that("read/write text files", {
+  # Test write.df and read.df
+  df <- read.df(sqlContext, jsonPath, "text")
+  expect_is(df, "DataFrame")
+  expect_equal(colnames(df), c("value"))
+  expect_equal(count(df), 3)
+  textPath <- tempfile(pattern = "textPath", fileext = ".txt")
+  write.df(df, textPath, "text", mode="overwrite")
+
+  # Test write.text and read.text
+  textPath2 <- tempfile(pattern = "textPath2", fileext = ".txt")
+  write.text(df, textPath2)
+  df2 <- read.text(sqlContext, c(textPath, textPath2))
+  expect_is(df2, "DataFrame")
+  expect_equal(colnames(df2), c("value"))
+  expect_equal(count(df2), count(df) * 2)
+
+})
+
 test_that("describe() and summarize() on a DataFrame", {
   df <- read.json(sqlContext, jsonPath)
   stats <- describe(df, "age")
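
For reference, a minimal end-to-end sketch of how the API added in PATCH 1/4 is intended to be used, assuming a local SparkR (1.6-era) session; the file paths here are illustrative only and not part of the patch:

    library(SparkR)

    # Start a local Spark context and SQL context (same names as in the roxygen examples above).
    sc <- sparkR.init(master = "local")
    sqlContext <- sparkRSQL.init(sc)

    # read.text: one row per input line, a single string column named "value".
    df <- read.text(sqlContext, "/tmp/people.txt")
    printSchema(df)    # root |-- value: string
    count(df)

    # write.text: the path is treated as an output directory, one line per row.
    write.text(df, "/tmp/sparkr-text-out")

    # read.text also accepts a vector of paths, as documented for the path parameter.
    both <- read.text(sqlContext, c("/tmp/people.txt", "/tmp/sparkr-text-out"))
    count(both)
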
From c9f9bfee5cc3e12425632623b748b42574ac6df8 Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Fri, 18 Dec 2015 19:47:40 +0800
Subject: [PATCH 2/4] unlink temporary paths

---
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 366acfb40ace9..a251fd932d7a8 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1498,6 +1498,8 @@ test_that("read/write text files", {
   expect_equal(colnames(df2), c("value"))
   expect_equal(count(df2), count(df) * 2)
 
+  unlink(textPath)
+  unlink(textPath2)
 })
 
 test_that("describe() and summarize() on a DataFrame", {

From 85c3c4a140a904c38c5e6a2ab1329fbbfeff3c35 Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Tue, 22 Dec 2015 18:02:51 +0800
Subject: [PATCH 3/4] update example codes

---
 R/pkg/R/DataFrame.R | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 6af848d051a36..1021238ab487e 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -678,8 +678,8 @@ setMethod("saveAsParquetFile",
 #'\dontrun{
 #' sc <- sparkR.init()
 #' sqlContext <- sparkRSQL.init(sc)
-#' path <- "path/to/file.json"
-#' df <- read.json(sqlContext, path)
+#' path <- "path/to/file.txt"
+#' df <- read.text(sqlContext, path)
 #' write.text(df, "/tmp/sparkr-tmp/")
 #'}
 setMethod("write.text",
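
The test added in PATCH 1/4 also exercises the same "text" data source through the generic read.df/write.df API. A hedged sketch of that equivalent path, with illustrative input and output locations:

    # Reading through the generic data source API yields the same
    # single-column ("value") DataFrame as read.text.
    df <- read.df(sqlContext, "/tmp/people.txt", source = "text")
    collect(df)

    # Writing with source = "text" mirrors write.text.
    write.df(df, path = "/tmp/sparkr-df-text-out", source = "text", mode = "overwrite")
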
From 5063f5f34aeee4ccce3124d5015152de68f0a7d7 Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Wed, 6 Jan 2016 10:56:29 +0800
Subject: [PATCH 4/4] update doc

---
 R/pkg/R/DataFrame.R | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 1021238ab487e..7a7aef27ccb24 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -664,7 +664,7 @@ setMethod("saveAsParquetFile",
 #' write.text
 #'
 #' Saves the content of the DataFrame in a text file at the specified path.
-#' The DataFrame must have only one column that is of string type.
+#' The DataFrame must have only one column of string type with the name "value".
 #' Each row becomes a new line in the output file.
 #'
 #' @param x A SparkSQL DataFrame
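
Given the requirement documented in PATCH 4/4 (a single string column named "value"), a hedged sketch of preparing a multi-column DataFrame before calling write.text; the input file and the "name" column are hypothetical, and selectExpr is an existing SparkR function not touched by this patch series:

    # Project a multi-column DataFrame down to one string column named "value"
    # before writing it out as text. (Input file and column name are assumed.)
    people <- read.json(sqlContext, "/tmp/people.json")
    values <- selectExpr(people, "name AS value")
    write.text(values, "/tmp/people-names")
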