Closed
Changes from all commits
48 commits
028ad4b
[SPARK-7509][SQL] DataFrame.drop in Python for dropping columns.
rxin May 12, 2015
b94a933
[SPARK-7435] [SPARKR] Make DataFrame.show() consistent with that of S…
rekhajoshm May 12, 2015
1669675
[SQL] Rename Dialect -> ParserDialect.
rxin May 12, 2015
640f63b
[SPARK-6994][SQL] Update docs for fetching Row fields by name
May 12, 2015
8a4edec
[SPARK-7534] [CORE] [WEBUI] Fix the Stage table when a stage is missing
zsxwing May 12, 2015
9847875
[MINOR] [PYSPARK] Set PYTHONPATH to python/lib/pyspark.zip rather tha…
Sephiroth-Lin May 12, 2015
82e890f
[SPARK-7485] [BUILD] Remove pyspark files from assembly.
May 12, 2015
f3e8e60
[SPARK-7467] Dag visualization: treat checkpoint as an RDD operation
May 12, 2015
ec6f2a9
[SPARK-7532] [STREAMING] StreamingContext.start() made to logWarning …
tdas May 12, 2015
d86ce84
[SPARK-6876] [PySpark] [SQL] add DataFrame na.replace in pyspark
adrian-wang May 12, 2015
831504c
[DataFrame][minor] cleanup unapply methods in DataTypes
cloud-fan May 12, 2015
0595b6d
[SPARK-3928] [SPARK-5182] [SQL] Partitioning support for the data sou…
liancheng May 12, 2015
bfcaf8a
[DataFrame][minor] support column in field accessor
cloud-fan May 12, 2015
65697bb
[SPARK-7500] DAG visualization: move cluster labeling to dagre-d3
May 12, 2015
4e29052
[SPARK-7276] [DATAFRAME] speed up DataFrame.select by collapsing Project
cloud-fan May 12, 2015
b9b01f4
[HOT FIX #6076] DAG visualization: curve the edges
May 12, 2015
8e935b0
[SPARK-7487] [ML] Feature Parity in PySpark for ml.regression
brkyvz May 12, 2015
5438f49
[SPARK-2018] [CORE] Upgrade LZF library to fix endian serialization p…
tellison May 12, 2015
595a675
[SPARK-7015] [MLLIB] [WIP] Multiclass to Binary Reduction: One Agains…
May 12, 2015
2a41c0d
[SPARK-7569][SQL] Better error for invalid binary expressions
marmbrus May 12, 2015
23b9863
[SPARK-7559] [MLLIB] Bucketizer should include the right most boundar…
mengxr May 12, 2015
455551d
[SPARK-7484][SQL]Support jdbc connection properties
gvramana May 12, 2015
a4874b0
[SPARK-7571] [MLLIB] rename Math to math
mengxr May 12, 2015
1422e79
[SPARK-7406] [STREAMING] [WEBUI] Add tooltips for "Scheduling Delay",…
zsxwing May 12, 2015
1d70366
[SPARK-7496] [MLLIB] Update Programming guide with Online LDA
hhbyyh May 12, 2015
f0c1bc3
[SPARK-7557] [ML] [DOC] User guide for spark.ml HashingTF, Tokenizer
jkbradley May 12, 2015
96c4846
[SPARK-7573] [ML] OneVsRest cleanups
jkbradley May 12, 2015
00e7b09
[SPARK-7553] [STREAMING] Added methods to maintain a singleton Stream…
tdas May 12, 2015
2713bc6
[SPARK-7528] [MLLIB] make RankingMetrics Java-friendly
mengxr May 12, 2015
23f7d66
[SPARK-7554] [STREAMING] Throw exception when an active/stopped Strea…
tdas May 13, 2015
77f64c7
[SPARK-7572] [MLLIB] do not import Param/Params under pyspark.ml
mengxr May 13, 2015
247b703
[HOTFIX] Use the old Job API to support old Hadoop versions
zsxwing May 13, 2015
1b9e434
[SPARK-7592] Always set resolution to "Fixed" in PR merge script.
pwendell May 13, 2015
8fd5535
[SPARK-7588] Document all SQL/DataFrame public methods with @since tag
rxin May 13, 2015
97dee31
[SPARK-7321][SQL] Add Column expression for conditional statements (w…
rxin May 13, 2015
208b902
[SPARK-7566][SQL] Add type to HiveContext.analyzer
smola May 13, 2015
df9b94a
[SPARK-7482] [SPARKR] Rename some DataFrame API methods in SparkR to …
May 13, 2015
98195c3
[SPARK-7526] [SPARKR] Specify ip of RBackend, MonitorServer and RRDD …
Sephiroth-Lin May 13, 2015
50c7270
[SPARK-6568] spark-shell.cmd --jars option does not accept the jar th…
tsudukim May 13, 2015
10c546e
[SPARK-7599] [SQL] Don't restrict customized output committers to be …
liancheng May 13, 2015
b061bd5
[SQL] In InsertIntoFSBasedRelation.insert, log cause before abort job…
yhuai May 13, 2015
aa6ba3f
[MINOR] [SQL] Removes debugging println
liancheng May 13, 2015
0da254f
[SPARK-6734] [SQL] Add UDTF.close support in Generate
chenghao-intel May 13, 2015
db65660
Migrates Parquet data source to FSBasedRelation
liancheng May 10, 2015
261d8c1
Minor bug fix and more tests
liancheng May 11, 2015
f9ea56e
Adds ParquetRelation2 related classes to MiMa check whitelist
liancheng May 12, 2015
bfd1cf0
Fixes compilation error introduced while rebasing
liancheng May 13, 2015
6063f87
Casts to OutputCommitter rather than FileOutputCommitter
liancheng May 13, 2015
6 changes: 4 additions & 2 deletions R/pkg/NAMESPACE
@@ -37,7 +37,7 @@ exportMethods("arrange",
"registerTempTable",
"rename",
"repartition",
"sampleDF",
"sample",
"sample_frac",
"saveAsParquetFile",
"saveAsTable",
@@ -53,7 +53,8 @@ exportMethods("arrange",
"unpersist",
"where",
"withColumn",
"withColumnRenamed")
"withColumnRenamed",
"write.df")

exportClasses("Column")

@@ -101,6 +102,7 @@ export("cacheTable",
"jsonFile",
"loadDF",
"parquetFile",
"read.df",
"sql",
"table",
"tableNames",
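The NAMESPACE changes above export the renamed DataFrame API (sample, write.df, read.df) alongside the old names. A minimal usage sketch of the renamed functions follows; the JSON/Parquet paths and session setup are illustrative assumptions, not part of this diff.

library(SparkR)
sc <- sparkR.init()
sqlCtx <- sparkRSQL.init(sc)
# read.df() replaces loadDF(); the source name is passed explicitly
df <- read.df(sqlCtx, "path/to/people.json", "json")
# sample() replaces sampleDF(); sample_frac() remains available as an alias
half <- sample(df, withReplacement = FALSE, fraction = 0.5)
# write.df() replaces saveDF()
write.df(half, "path/to/people.parquet", "parquet", "overwrite")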
42 changes: 26 additions & 16 deletions R/pkg/R/DataFrame.R
@@ -150,7 +150,7 @@ setMethod("isLocal",
callJMethod(x@sdf, "isLocal")
})

#' ShowDF
#' showDF
#'
#' Print the first numRows rows of a DataFrame
#'
@@ -170,7 +170,8 @@ setMethod("isLocal",
setMethod("showDF",
signature(x = "DataFrame"),
function(x, numRows = 20) {
callJMethod(x@sdf, "showString", numToInt(numRows))
s <- callJMethod(x@sdf, "showString", numToInt(numRows))
cat(s)
})

#' show
@@ -187,7 +188,7 @@ setMethod("showDF",
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' show(df)
#' df
#'}
setMethod("show", "DataFrame",
function(object) {
@@ -293,8 +294,8 @@ setMethod("registerTempTable",
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' df <- loadDF(sqlCtx, path, "parquet")
#' df2 <- loadDF(sqlCtx, path2, "parquet")
#' df <- read.df(sqlCtx, path, "parquet")
#' df2 <- read.df(sqlCtx, path2, "parquet")
#' registerTempTable(df, "table1")
#' insertInto(df2, "table1", overwrite = TRUE)
#'}
@@ -472,14 +473,14 @@ setMethod("distinct",
dataFrame(sdf)
})

#' SampleDF
#' Sample
#'
#' Return a sampled subset of this DataFrame using a random seed.
#'
#' @param x A SparkSQL DataFrame
#' @param withReplacement Sampling with replacement or not
#' @param fraction The (rough) sample target fraction
#' @rdname sampleDF
#' @rdname sample
#' @aliases sample_frac
#' @export
#' @examples
@@ -488,10 +489,10 @@ setMethod("distinct",
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' collect(sampleDF(df, FALSE, 0.5))
#' collect(sampleDF(df, TRUE, 0.5))
#' collect(sample(df, FALSE, 0.5))
#' collect(sample(df, TRUE, 0.5))
#'}
setMethod("sampleDF",
setMethod("sample",
# TODO : Figure out how to send integer as java.lang.Long to JVM so
# we can send seed as an argument through callJMethod
signature(x = "DataFrame", withReplacement = "logical",
@@ -502,13 +503,13 @@ setMethod("sampleDF",
dataFrame(sdf)
})

#' @rdname sampleDF
#' @aliases sampleDF
#' @rdname sample
#' @aliases sample
setMethod("sample_frac",
signature(x = "DataFrame", withReplacement = "logical",
fraction = "numeric"),
function(x, withReplacement, fraction) {
sampleDF(x, withReplacement, fraction)
sample(x, withReplacement, fraction)
})

#' Count
@@ -1302,17 +1303,17 @@ setMethod("except",
#' @param source A name for external data source
#' @param mode One of 'append', 'overwrite', 'error', 'ignore'
#'
#' @rdname saveAsTable
#' @rdname write.df
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' saveAsTable(df, "myfile")
#' write.df(df, "myfile", "parquet", "overwrite")
#' }
setMethod("saveDF",
setMethod("write.df",
signature(df = "DataFrame", path = 'character', source = 'character',
mode = 'character'),
function(df, path = NULL, source = NULL, mode = "append", ...){
@@ -1333,6 +1334,15 @@ setMethod("saveDF",
callJMethod(df@sdf, "save", source, jmode, options)
})

#' @rdname write.df
#' @aliases saveDF
#' @export
setMethod("saveDF",
signature(df = "DataFrame", path = 'character', source = 'character',
mode = 'character'),
function(df, path = NULL, source = NULL, mode = "append", ...){
write.df(df, path, source, mode, ...)
})

#' saveAsTable
#'
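Two behavioural points from the DataFrame.R changes are worth illustrating: showDF() now cat()s the formatted string returned by showString(), so the table is printed (and capturable) rather than returned, and saveDF() is kept as a thin wrapper that forwards to write.df(). A short sketch, reusing the sqlCtx from the \dontrun examples above and a hypothetical JSON path:

df <- jsonFile(sqlCtx, "path/to/people.json")
showDF(df)                              # prints the first 20 rows to the console
printed <- capture.output(showDF(df))   # the same table as a character vector
# The old name still works; both calls have the same effect
write.df(df, "people.parquet", "parquet", "overwrite")
saveDF(df, "people.parquet", "parquet", "overwrite")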
4 changes: 2 additions & 2 deletions R/pkg/R/RDD.R
@@ -927,7 +927,7 @@ setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
MAXINT)))))

# TODO(zongheng): investigate if this call is an in-place shuffle?
sample(samples)[1:total]
base::sample(samples)[1:total]
})

# Creates tuples of the elements in this RDD by applying a function.
@@ -996,7 +996,7 @@ setMethod("coalesce",
if (shuffle || numPartitions > SparkR:::numPartitions(x)) {
func <- function(partIndex, part) {
set.seed(partIndex) # partIndex as seed
start <- as.integer(sample(numPartitions, 1) - 1)
start <- as.integer(base::sample(numPartitions, 1) - 1)
lapply(seq_along(part),
function(i) {
pos <- (start + i) %% numPartitions
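The RDD.R change is a namespace disambiguation: now that SparkR defines its own sample() generic for DataFrames, internal code that wants R's built-in sampler qualifies it explicitly. A standalone sketch of the distinction (plain R, no Spark objects involved):

permuted <- base::sample(1:10)        # base R: random permutation of 1..10
idx      <- base::sample(5, 1) - 1    # random value in 0..4, as in coalesce() above
# sample(someDataFrame, FALSE, 0.5) would dispatch to the SparkR method instead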
13 changes: 10 additions & 3 deletions R/pkg/R/SQLContext.R
@@ -421,7 +421,7 @@ clearCache <- function(sqlCtx) {
#' \dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' df <- loadDF(sqlCtx, path, "parquet")
#' df <- read.df(sqlCtx, path, "parquet")
#' registerTempTable(df, "table")
#' dropTempTable(sqlCtx, "table")
#' }
@@ -450,10 +450,10 @@ dropTempTable <- function(sqlCtx, tableName) {
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' df <- load(sqlCtx, "path/to/file.json", source = "json")
#' df <- read.df(sqlCtx, "path/to/file.json", source = "json")
#' }

loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
read.df <- function(sqlCtx, path = NULL, source = NULL, ...) {
options <- varargsToEnv(...)
if (!is.null(path)) {
options[['path']] <- path
@@ -462,6 +462,13 @@ loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
dataFrame(sdf)
}

#' @aliases loadDF
#' @export

loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
read.df(sqlCtx, path, source, ...)
}

#' Create an external table
#'
#' Creates an external table based on the dataset in a data source,
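SQLContext.R renames loadDF() to read.df() and keeps loadDF() as a forwarding wrapper, so both spellings load the same DataFrame. A brief sketch; the path is a placeholder:

df1 <- read.df(sqlCtx, "data/people.json", source = "json")
df2 <- loadDF(sqlCtx, "data/people.json", source = "json")   # forwards to read.df()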
22 changes: 13 additions & 9 deletions R/pkg/R/generics.R
@@ -456,19 +456,19 @@ setGeneric("rename", function(x, ...) { standardGeneric("rename") })
#' @export
setGeneric("registerTempTable", function(x, tableName) { standardGeneric("registerTempTable") })

#' @rdname sampleDF
#' @rdname sample
#' @export
setGeneric("sample_frac",
setGeneric("sample",
function(x, withReplacement, fraction, seed) {
standardGeneric("sample_frac")
})
standardGeneric("sample")
})

#' @rdname sampleDF
#' @rdname sample
#' @export
setGeneric("sampleDF",
setGeneric("sample_frac",
function(x, withReplacement, fraction, seed) {
standardGeneric("sampleDF")
})
standardGeneric("sample_frac")
})

#' @rdname saveAsParquetFile
#' @export
@@ -480,7 +480,11 @@ setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
standardGeneric("saveAsTable")
})

#' @rdname saveAsTable
#' @rdname write.df
#' @export
setGeneric("write.df", function(df, path, source, mode, ...) { standardGeneric("write.df") })

#' @rdname write.df
#' @export
setGeneric("saveDF", function(df, path, source, mode, ...) { standardGeneric("saveDF") })

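Because generics.R now declares sample() as an S4 generic with DataFrame-oriented formals, attaching SparkR masks base::sample(), which is why RDD.R above switches to the qualified call. A hedged sketch of the resulting call patterns (the masking consequence is an inference from the generic's signature, not something this diff states):

sample(df, FALSE, 0.1)        # dispatches the new SparkR DataFrame method
sample_frac(df, FALSE, 0.1)   # equivalent; delegates to sample()
base::sample(1:10, 3)         # base R sampling, still available when qualified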
43 changes: 22 additions & 21 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -209,18 +209,18 @@ test_that("registerTempTable() results in a queryable table and sql() results in
})

test_that("insertInto() on a registered table", {
df <- loadDF(sqlCtx, jsonPath, "json")
saveDF(df, parquetPath, "parquet", "overwrite")
dfParquet <- loadDF(sqlCtx, parquetPath, "parquet")
df <- read.df(sqlCtx, jsonPath, "json")
write.df(df, parquetPath, "parquet", "overwrite")
dfParquet <- read.df(sqlCtx, parquetPath, "parquet")

lines <- c("{\"name\":\"Bob\", \"age\":24}",
"{\"name\":\"James\", \"age\":35}")
jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".tmp")
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
writeLines(lines, jsonPath2)
df2 <- loadDF(sqlCtx, jsonPath2, "json")
saveDF(df2, parquetPath2, "parquet", "overwrite")
dfParquet2 <- loadDF(sqlCtx, parquetPath2, "parquet")
df2 <- read.df(sqlCtx, jsonPath2, "json")
write.df(df2, parquetPath2, "parquet", "overwrite")
dfParquet2 <- read.df(sqlCtx, parquetPath2, "parquet")

registerTempTable(dfParquet, "table1")
insertInto(dfParquet2, "table1")
@@ -421,12 +421,12 @@ test_that("distinct() on DataFrames", {
expect_true(count(uniques) == 3)
})

test_that("sampleDF on a DataFrame", {
test_that("sample on a DataFrame", {
df <- jsonFile(sqlCtx, jsonPath)
sampled <- sampleDF(df, FALSE, 1.0)
sampled <- sample(df, FALSE, 1.0)
expect_equal(nrow(collect(sampled)), count(df))
expect_true(inherits(sampled, "DataFrame"))
sampled2 <- sampleDF(df, FALSE, 0.1)
sampled2 <- sample(df, FALSE, 0.1)
expect_true(count(sampled2) < 3)

# Also test sample_frac
@@ -491,16 +491,16 @@ test_that("column calculation", {
expect_true(count(df2) == 3)
})

test_that("load() from json file", {
df <- loadDF(sqlCtx, jsonPath, "json")
test_that("read.df() from json file", {
df <- read.df(sqlCtx, jsonPath, "json")
expect_true(inherits(df, "DataFrame"))
expect_true(count(df) == 3)
})

test_that("save() as parquet file", {
df <- loadDF(sqlCtx, jsonPath, "json")
saveDF(df, parquetPath, "parquet", mode="overwrite")
df2 <- loadDF(sqlCtx, parquetPath, "parquet")
test_that("write.df() as parquet file", {
df <- read.df(sqlCtx, jsonPath, "json")
write.df(df, parquetPath, "parquet", mode="overwrite")
df2 <- read.df(sqlCtx, parquetPath, "parquet")
expect_true(inherits(df2, "DataFrame"))
expect_true(count(df2) == 3)
})
@@ -653,7 +653,8 @@ test_that("toJSON() returns an RDD of the correct values", {

test_that("showDF()", {
df <- jsonFile(sqlCtx, jsonPath)
expect_output(showDF(df), "+----+-------+\n| age| name|\n+----+-------+\n|null|Michael|\n| 30| Andy|\n| 19| Justin|\n+----+-------+\n")
s <- capture.output(showDF(df))
expect_output(s , "+----+-------+\n| age| name|\n+----+-------+\n|null|Michael|\n| 30| Andy|\n| 19| Justin|\n+----+-------+\n")
})

test_that("isLocal()", {
@@ -669,7 +670,7 @@ test_that("unionAll(), except(), and intersect() on a DataFrame", {
"{\"name\":\"James\", \"age\":35}")
jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(lines, jsonPath2)
df2 <- loadDF(sqlCtx, jsonPath2, "json")
df2 <- read.df(sqlCtx, jsonPath2, "json")

unioned <- arrange(unionAll(df, df2), df$age)
expect_true(inherits(unioned, "DataFrame"))
@@ -711,19 +712,19 @@ test_that("mutate() and rename()", {
expect_true(columns(newDF2)[1] == "newerAge")
})

test_that("saveDF() on DataFrame and works with parquetFile", {
test_that("write.df() on DataFrame and works with parquetFile", {
df <- jsonFile(sqlCtx, jsonPath)
saveDF(df, parquetPath, "parquet", mode="overwrite")
write.df(df, parquetPath, "parquet", mode="overwrite")
parquetDF <- parquetFile(sqlCtx, parquetPath)
expect_true(inherits(parquetDF, "DataFrame"))
expect_equal(count(df), count(parquetDF))
})

test_that("parquetFile works with multiple input paths", {
df <- jsonFile(sqlCtx, jsonPath)
saveDF(df, parquetPath, "parquet", mode="overwrite")
write.df(df, parquetPath, "parquet", mode="overwrite")
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
saveDF(df, parquetPath2, "parquet", mode="overwrite")
write.df(df, parquetPath2, "parquet", mode="overwrite")
parquetDF <- parquetFile(sqlCtx, parquetPath, parquetPath2)
expect_true(inherits(parquetDF, "DataFrame"))
expect_true(count(parquetDF) == count(df)*2)