4 changes: 3 additions & 1 deletion R/pkg/NAMESPACE
@@ -101,7 +101,8 @@ exportMethods("arrange",
"write.df",
"write.json",
"write.parquet",
"write.text")
"write.text",
"write.csv")

exportClasses("Column")

@@ -284,6 +285,7 @@ export("as.DataFrame",
"read.json",
"read.parquet",
"read.text",
"read.csv",
"sql",
"str",
"tableToDF",
27 changes: 27 additions & 0 deletions R/pkg/R/DataFrame.R
@@ -692,6 +692,33 @@ setMethod("write.text",
invisible(callJMethod(write, "text", path))
})

#' write.csv
#'
#' Save the contents of a DataFrame as a CSV file. Files written out
#' with this method can be read back in as a DataFrame using read.csv().
#'
#' @param x A SparkSQL DataFrame
#' @param path The directory where the file is saved
#'
#' @family DataFrame functions
#' @rdname write.csv
#' @name write.csv
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.csv"
#' df <- read.csv(sqlContext, path)
#' write.csv(df, "/tmp/sparkr-tmp/")
#'}
setMethod("write.csv",
signature(x = "DataFrame", path = "character"),
function(x, path) {
write <- callJMethod(x@sdf, "write")
invisible(callJMethod(write, "csv", path))
})

#' Distinct
#'
#' Return a new DataFrame containing the distinct rows in this DataFrame.
30 changes: 28 additions & 2 deletions R/pkg/R/SQLContext.R
@@ -220,7 +220,7 @@ setMethod("toDF", signature(x = "RDD"),
#' df <- jsonFile(sqlContext, path)
#' }
read.json <- function(sqlContext, path) {
# Allow the user to have a more flexible definition of the text file path
# Allow the user to have a more flexible definition of the JSON file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sqlContext, "read")
sdf <- callJMethod(read, "json", paths)
@@ -279,7 +279,7 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
#' @name read.parquet
#' @export
read.parquet <- function(sqlContext, path) {
# Allow the user to have a more flexible definition of the text file path
# Allow the user to have a more flexible definition of the Parquet file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sqlContext, "read")
sdf <- callJMethod(read, "parquet", paths)
@@ -321,6 +321,32 @@ read.text <- function(sqlContext, path) {
dataFrame(sdf)
}

#' Create a DataFrame from a CSV file.
#'
#' Loads a CSV file, returning the result as a DataFrame.
#' It goes through the entire dataset once to determine the schema.
#'
#' @param sqlContext SQLContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @return DataFrame
#' @rdname read.csv
#' @name read.csv
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.csv"
#' df <- read.csv(sqlContext, path)
#' }
read.csv <- function(sqlContext, path) {
# Allow the user to have a more flexible definition of the CSV file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sqlContext, "read")
sdf <- callJMethod(read, "csv", paths)
dataFrame(sdf)
}

#' SQL Query
#'
#' Executes a SQL query using Spark, returning the result as a DataFrame.
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -593,6 +593,10 @@ setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParqu
#' @export
setGeneric("write.text", function(x, path) { standardGeneric("write.text") })

#' @rdname write.csv
#' @export
setGeneric("write.csv", function(x, path) { standardGeneric("write.csv") })

#' @rdname schema
#' @export
setGeneric("schema", function(x) { standardGeneric("schema") })
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/testthat/test_context.R
@@ -26,7 +26,7 @@ test_that("Check masked functions", {
maskedBySparkR <- masked[funcSparkROrEmpty]
namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
"colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
"summary", "transform", "drop")
"summary", "transform", "drop", "read.csv", "write.csv")
Member Author:

@felixcheung @sun-rui (First of all, sorry, I am not very fluent in R.)

I am not sure why it says these functions are being masked. I added them to the list here anyway and all tests pass, but I am still wondering why this happens and whether it is correct. When I change the function name to something else, the test is fine, but then there are no such functions to mask.

It looks like it masks the functions called read.csv and write.csv. The console output was as follows:

The following object is masked from ‘package:testthat’:

    describe

The following objects are masked from ‘package:stats’:

    cov, filter, lag, na.omit, predict, sd, var

The following objects are masked from ‘package:utils’:

    read.csv, write.csv

The following objects are masked from ‘package:base’:

    colnames, colnames<-, drop, intersect, rank, rbind, sample, subset,
    summary, transform

Would you please give me some advice on this?
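
As a side note on the masking question above: a masked base function remains reachable through its namespace-qualified name, so both readers can coexist in one session. A minimal illustration (the file paths are placeholders):

    utils::read.csv("local.csv")      # base R reader, reachable even while masked
    read.csv(sqlContext, "data.csv")  # SparkR reader added in this PR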

Contributor:

There are existing read.csv()/csv2() and write.csv()/csv2() functions in the R base package. It would be great if we could implement these functions in SparkR with the same signatures as in the base package, so that users can call them in both SparkR and base R. It seems that the options in the signatures of the base functions are also supported in spark-csv.
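
For reference, this is the usage of the base reader being discussed, as documented in the utils package (write.csv itself is a thin wrapper that forwards its arguments to write.table):

    read.csv(file, header = TRUE, sep = ",", quote = "\"", dec = ".",
             fill = TRUE, comment.char = "", ...)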

Member Author:

@sun-rui Thanks!

Member:

To oversimplify, it masks the other implementation when the names conflict but the parameter signatures are not exactly the same. So please see if you could change it to match, as sun-rui suggests.
https://stat.ethz.ch/R-manual/R-devel/library/utils/html/read.table.html
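
A rough sketch of what "not completely masked" would mean in practice (illustrative only, not code from this PR): keep a method with an "ANY" signature that falls back to the base implementation, so plain file paths keep working after library(SparkR).

    # Illustrative sketch, not the PR's code: a read.csv generic that still
    # reaches utils::read.csv for ordinary arguments.
    setGeneric("read.csv", function(file, ...) { standardGeneric("read.csv") })

    setMethod("read.csv", signature(file = "ANY"),
              function(file, ...) {
                # Anything that is not a SparkR object is delegated back to base R.
                utils::read.csv(file, ...)
              })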

Member Author:

@felixcheung Actually, I have been looking at the link and working on this; however, I realised that it cannot use exactly the same parameters, as spark-csv does not support all of them. So I tried adding some of the possible parameters (locally), but it ended up as an incomplete version.

This makes me think this PR might have to be merged first and the signature dealt with later (also partly because the CSV options are not confirmed and not fully merged yet, SPARK-12420).

Member Author:

Otherwise, would you maybe create another PR based on this one? I think it would take me quite a while to figure out, since this is only my second piece of R code (sorry).

Member:

This is likely an issue, though: base R's read.csv is very commonly used, and a name conflict as detected here will make the base version inaccessible whenever the SparkR package is loaded.
I can probably help to sort out the right signature; I'll try to check this out in the next few days.

Member Author:

@felixcheung Thanks.

Contributor:

I think it's better to match the arguments of read.csv and not mask the local read.csv. One thing is that we don't need to support all the options; if some flags we don't support are set, we can throw a warning or error.
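
A hedged sketch of that suggestion (hypothetical, not part of this PR): keep base-style formals for write.csv, reuse the callJMethod plumbing shown earlier in the diff, and warn or error when an option the CSV data source cannot honour is used.

    # Hypothetical sketch: base-style write.csv formals, with a warning/error
    # for options the CSV data source cannot honour.
    setGeneric("write.csv", function(x, file = "", ...) { standardGeneric("write.csv") })

    setMethod("write.csv", signature(x = "DataFrame"),
              function(x, file = "", row.names = FALSE, ...) {
                if (row.names) {
                  stop("row.names = TRUE is not supported for Spark DataFrames")
                }
                if (length(list(...)) > 0) {
                  warning("ignoring unsupported write.csv options: ",
                          paste(names(list(...)), collapse = ", "))
                }
                write <- callJMethod(x@sdf, "write")
                invisible(callJMethod(write, "csv", file))
              })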

Member Author:

@felixcheung Could you give me the link to the PR once you have opened one? I will close this one after checking that the other PR is open.

expect_equal(length(maskedBySparkR), length(namesOfMasked))
expect_equal(sort(maskedBySparkR), sort(namesOfMasked))
# above are those reported as masked when `library(SparkR)`
@@ -36,7 +36,7 @@ test_that("Check masked functions", {
any(grepl("=\"ANY\"", capture.output(showMethods(x)[-1])))
}))
maskedCompletely <- masked[!funcHasAny]
namesOfMaskedCompletely <- c("cov", "filter", "sample")
namesOfMaskedCompletely <- c("cov", "filter", "sample", "read.csv", "write.csv")
expect_equal(length(maskedCompletely), length(namesOfMaskedCompletely))
expect_equal(sort(maskedCompletely), sort(namesOfMaskedCompletely))
})
25 changes: 25 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -42,6 +42,7 @@ mockLines <- c("{\"name\":\"Michael\"}",
"{\"name\":\"Andy\", \"age\":30}",
"{\"name\":\"Justin\", \"age\":19}")
jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
csvPath <- tempfile(pattern="sparkr-test", fileext=".csv")
parquetPath <- tempfile(pattern="sparkr-test", fileext=".parquet")
writeLines(mockLines, jsonPath)

@@ -1585,6 +1586,30 @@ test_that("read/write text files", {
unlink(textPath2)
})

test_that("read/write csv files", {
df <- read.df(sqlContext, jsonPath, "json")
write.df(df, csvPath, "csv", "overwrite")

# Test write.df and read.df
dfCSV <- read.df(sqlContext, csvPath, "csv")
expect_is(dfCSV, "DataFrame")
expect_equal(colnames(dfCSV), c("C0", "C1"))
expect_equal(count(dfCSV), 3)
csvPath2 <- tempfile(pattern = "csvPath2", fileext = ".csv")
write.df(dfCSV, csvPath2, "csv", mode="overwrite")

# Test write.csv and read.csv
csvPath3 <- tempfile(pattern = "csvPath3", fileext = ".csv")
write.csv(dfCSV, csvPath3)
dfCSV2 <- read.csv(sqlContext, c(csvPath2, csvPath3))
expect_is(dfCSV2, "DataFrame")
expect_equal(colnames(dfCSV2), c("C0", "C1"))
expect_equal(count(dfCSV2), count(dfCSV) * 2)

unlink(csvPath2)
unlink(csvPath3)
})

test_that("describe() and summarize() on a DataFrame", {
df <- read.json(sqlContext, jsonPath)
stats <- describe(df, "age")