4 changes: 3 additions & 1 deletion R/pkg/NAMESPACE
@@ -94,7 +94,8 @@ exportMethods("arrange",
"withColumnRenamed",
"write.df",
"write.json",
"write.parquet")
"write.parquet",
"write.text")

exportClasses("Column")

@@ -274,6 +275,7 @@ export("as.DataFrame",
"parquetFile",
"read.df",
"read.parquet",
"read.text",
"sql",
"table",
"tableNames",
28 changes: 28 additions & 0 deletions R/pkg/R/DataFrame.R
@@ -661,6 +661,34 @@ setMethod("saveAsParquetFile",
write.parquet(x, path)
})

#' write.text
#'
#' Saves the content of the DataFrame in a text file at the specified path.
Member: Might want to add that the DataFrame must have a column with the name "value". I recall there is a similar doc clarification in Scala recently.

#' The DataFrame must have only one column of string type with the name "value".
#' Each row becomes a new line in the output file.
#'
#' @param x A SparkSQL DataFrame
#' @param path The directory where the file is saved
#'
#' @family DataFrame functions
#' @rdname write.text
#' @name write.text
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.txt"
#' df <- read.text(sqlContext, path)
#' write.text(df, "/tmp/sparkr-tmp/")
#'}
setMethod("write.text",
signature(x = "DataFrame", path = "character"),
function(x, path) {
write <- callJMethod(x@sdf, "write")
invisible(callJMethod(write, "text", path))
})

#' Distinct
#'
#' Return a new DataFrame containing the distinct rows in this DataFrame.
26 changes: 26 additions & 0 deletions R/pkg/R/SQLContext.R
@@ -295,6 +295,32 @@ parquetFile <- function(sqlContext, ...) {
dataFrame(sdf)
}

#' Create a DataFrame from a text file.
#'
#' Loads a text file and returns a DataFrame with a single string column named "value".
#' Each line in the text file is a new row in the resulting DataFrame.
#'
#' @param sqlContext SQLContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @return DataFrame
#' @rdname read.text
#' @name read.text
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.txt"
#' df <- read.text(sqlContext, path)
#' }
read.text <- function(sqlContext, path) {
  # Allow the user to have a more flexible definition of the text file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sqlContext, "read")
sdf <- callJMethod(read, "text", paths)
dataFrame(sdf)
}

#' SQL Query
#'
#' Executes a SQL query using Spark, returning the result as a DataFrame.
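Taken together, the two new functions give a simple round trip: `read.text` loads each line of a text file as a row in a single string column named "value", and `write.text` writes each row of such a DataFrame back out as a line. A minimal usage sketch, assuming a running SparkR session as in the roxygen examples (the paths here are illustrative, not from the PR):

```r
# Round-trip sketch of the API added in this PR.
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

# Load a text file: one row per line, single string column named "value".
df <- read.text(sqlContext, "/tmp/input.txt")
printSchema(df)

# Write it back out: each row of the "value" column becomes one line.
write.text(df, "/tmp/output-dir/")
```

Note that `write.text` requires the DataFrame to have exactly one string column, which is why the docs call out the "value" column by name.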
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -549,6 +549,10 @@ setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet")
#' @export
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })

#' @rdname write.text
#' @export
setGeneric("write.text", function(x, path) { standardGeneric("write.text") })

Member: Shouldn't read.text be added too?

Contributor Author: I found that none of the functions in SQLContext.R use setGeneric. Is this on purpose, or a bug? If it's a bug, I can fix it.

Contributor: This is historical legacy :) (you can find the same cases in context.R). I guess it is because we did not implement SparkContext and SQLContext as S4 classes. If there is no strong reason, we can keep it as is.
cc @shivaram.

#' @rdname schema
#' @export
setGeneric("schema", function(x) { standardGeneric("schema") })
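The review thread above turns on how S4 dispatch works: `setGeneric` plus `setMethod` only pays off when dispatching on an S4 class like `DataFrame`, whereas `SQLContext` is a plain Java object reference, so `read.text` can stay an ordinary function. A minimal plain-R sketch of the mechanism (no Spark required; the class `Frame` and generic `write.text2` are hypothetical names for illustration):

```r
library(methods)

# A toy S4 class standing in for DataFrame.
setClass("Frame", representation(name = "character"))

# Declare the generic, then register a method dispatched on the S4 class,
# mirroring the setGeneric/setMethod pair in generics.R and DataFrame.R.
setGeneric("write.text2", function(x, path) { standardGeneric("write.text2") })
setMethod("write.text2",
          signature(x = "Frame", path = "character"),
          function(x, path) {
            paste("writing", x@name, "to", path)
          })

f <- new("Frame", name = "df1")
write.text2(f, "/tmp/out")
```

Calling `write.text2` on a non-`Frame` object would raise a dispatch error, which is exactly the type safety the generic buys for `write.text(x = "DataFrame", ...)`.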
21 changes: 21 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1481,6 +1481,27 @@ test_that("read/write Parquet files", {
unlink(parquetPath4)
})

test_that("read/write text files", {
# Test write.df and read.df
df <- read.df(sqlContext, jsonPath, "text")
expect_is(df, "DataFrame")
expect_equal(colnames(df), c("value"))
expect_equal(count(df), 3)
textPath <- tempfile(pattern = "textPath", fileext = ".txt")
  write.df(df, textPath, "text", mode = "overwrite")

# Test write.text and read.text
textPath2 <- tempfile(pattern = "textPath2", fileext = ".txt")
write.text(df, textPath2)
df2 <- read.text(sqlContext, c(textPath, textPath2))
expect_is(df2, "DataFrame")
expect_equal(colnames(df2), c("value"))
expect_equal(count(df2), count(df) * 2)

Member: Should unlink(textPath) and unlink(textPath2) here.

Contributor Author: Thanks for catching this, I will update it.

unlink(textPath)
unlink(textPath2)
})

test_that("describe() and summarize() on a DataFrame", {
df <- read.json(sqlContext, jsonPath)
stats <- describe(df, "age")