Skip to content

Commit d6667da

Browse files
author
Marcelo Vanzin
committed
Merge branch 'master' into SPARK-9416
Conflicts: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
2 parents 8df6b09 + e375456 commit d6667da

File tree

628 files changed

+31068
-7528
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

628 files changed

+31068
-7528
lines changed

R/pkg/NAMESPACE

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ export("print.jobj")
1212

1313
# MLlib integration
1414
exportMethods("glm",
15-
"predict")
15+
"predict",
16+
"summary")
1617

1718
# Job group lifecycle management methods
1819
export("setJobGroup",
@@ -28,6 +29,7 @@ exportMethods("arrange",
2829
"count",
2930
"crosstab",
3031
"describe",
32+
"dim",
3133
"distinct",
3234
"dropna",
3335
"dtypes",
@@ -44,11 +46,16 @@ exportMethods("arrange",
4446
"isLocal",
4547
"join",
4648
"limit",
49+
"merge",
50+
"names",
51+
"ncol",
52+
"nrow",
4753
"orderBy",
4854
"mutate",
4955
"names",
5056
"persist",
5157
"printSchema",
58+
"rbind",
5259
"registerTempTable",
5360
"rename",
5461
"repartition",
@@ -63,8 +70,10 @@ exportMethods("arrange",
6370
"show",
6471
"showDF",
6572
"summarize",
73+
"summary",
6674
"take",
6775
"unionAll",
76+
"unique",
6877
"unpersist",
6978
"where",
7079
"withColumn",

R/pkg/R/DataFrame.R

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,16 @@ setMethod("names",
255255
columns(x)
256256
})
257257

258+
#' @rdname columns
259+
setMethod("names<-",
260+
signature(x = "DataFrame"),
261+
function(x, value) {
262+
if (!is.null(value)) {
263+
sdf <- callJMethod(x@sdf, "toDF", listToSeq(as.list(value)))
264+
dataFrame(sdf)
265+
}
266+
})
267+
258268
#' Register Temporary Table
259269
#'
260270
#' Registers a DataFrame as a Temporary Table in the SQLContext
@@ -473,6 +483,18 @@ setMethod("distinct",
473483
dataFrame(sdf)
474484
})
475485

486+
#' @title Distinct rows in a DataFrame
487+
#
488+
#' @description Returns a new DataFrame containing distinct rows in this DataFrame
489+
#'
490+
#' @rdname unique
491+
#' @aliases unique
492+
setMethod("unique",
493+
signature(x = "DataFrame"),
494+
function(x) {
495+
distinct(x)
496+
})
497+
476498
#' Sample
477499
#'
478500
#' Return a sampled subset of this DataFrame using a random seed.
@@ -534,6 +556,58 @@ setMethod("count",
534556
callJMethod(x@sdf, "count")
535557
})
536558

559+
#' @title Number of rows for a DataFrame
560+
#' @description Returns number of rows in a DataFrames
561+
#'
562+
#' @name nrow
563+
#'
564+
#' @rdname nrow
565+
#' @aliases count
566+
setMethod("nrow",
567+
signature(x = "DataFrame"),
568+
function(x) {
569+
count(x)
570+
})
571+
572+
#' Returns the number of columns in a DataFrame
573+
#'
574+
#' @param x a SparkSQL DataFrame
575+
#'
576+
#' @rdname ncol
577+
#' @export
578+
#' @examples
579+
#'\dontrun{
580+
#' sc <- sparkR.init()
581+
#' sqlContext <- sparkRSQL.init(sc)
582+
#' path <- "path/to/file.json"
583+
#' df <- jsonFile(sqlContext, path)
584+
#' ncol(df)
585+
#' }
586+
setMethod("ncol",
587+
signature(x = "DataFrame"),
588+
function(x) {
589+
length(columns(x))
590+
})
591+
592+
#' Returns the dimentions (number of rows and columns) of a DataFrame
593+
#' @param x a SparkSQL DataFrame
594+
#'
595+
#' @rdname dim
596+
#' @export
597+
#' @examples
598+
#'\dontrun{
599+
#' sc <- sparkR.init()
600+
#' sqlContext <- sparkRSQL.init(sc)
601+
#' path <- "path/to/file.json"
602+
#' df <- jsonFile(sqlContext, path)
603+
#' dim(df)
604+
#' }
605+
setMethod("dim",
606+
signature(x = "DataFrame"),
607+
function(x) {
608+
c(count(x), ncol(x))
609+
})
610+
537611
#' Collects all the elements of a Spark DataFrame and coerces them into an R data.frame.
538612
#'
539613
#' @param x A SparkSQL DataFrame
@@ -1205,6 +1279,15 @@ setMethod("join",
12051279
dataFrame(sdf)
12061280
})
12071281

1282+
#' rdname merge
1283+
#' aliases join
1284+
setMethod("merge",
1285+
signature(x = "DataFrame", y = "DataFrame"),
1286+
function(x, y, joinExpr = NULL, joinType = NULL, ...) {
1287+
join(x, y, joinExpr, joinType)
1288+
})
1289+
1290+
12081291
#' UnionAll
12091292
#'
12101293
#' Return a new DataFrame containing the union of rows in this DataFrame
@@ -1231,6 +1314,22 @@ setMethod("unionAll",
12311314
dataFrame(unioned)
12321315
})
12331316

1317+
#' @title Union two or more DataFrames
1318+
#
1319+
#' @description Returns a new DataFrame containing rows of all parameters.
1320+
#
1321+
#' @rdname rbind
1322+
#' @aliases unionAll
1323+
setMethod("rbind",
1324+
signature(... = "DataFrame"),
1325+
function(x, ..., deparse.level = 1) {
1326+
if (nargs() == 3) {
1327+
unionAll(x, ...)
1328+
} else {
1329+
unionAll(x, Recall(..., deparse.level = 1))
1330+
}
1331+
})
1332+
12341333
#' Intersect
12351334
#'
12361335
#' Return a new DataFrame containing rows only in both this DataFrame
@@ -1322,9 +1421,11 @@ setMethod("write.df",
13221421
"org.apache.spark.sql.parquet")
13231422
}
13241423
allModes <- c("append", "overwrite", "error", "ignore")
1424+
# nolint start
13251425
if (!(mode %in% allModes)) {
13261426
stop('mode should be one of "append", "overwrite", "error", "ignore"')
13271427
}
1428+
# nolint end
13281429
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
13291430
options <- varargsToEnv(...)
13301431
if (!is.null(path)) {
@@ -1384,9 +1485,11 @@ setMethod("saveAsTable",
13841485
"org.apache.spark.sql.parquet")
13851486
}
13861487
allModes <- c("append", "overwrite", "error", "ignore")
1488+
# nolint start
13871489
if (!(mode %in% allModes)) {
13881490
stop('mode should be one of "append", "overwrite", "error", "ignore"')
13891491
}
1492+
# nolint end
13901493
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
13911494
options <- varargsToEnv(...)
13921495
callJMethod(df@sdf, "saveAsTable", tableName, source, jmode, options)
@@ -1430,6 +1533,19 @@ setMethod("describe",
14301533
dataFrame(sdf)
14311534
})
14321535

1536+
#' @title Summary
1537+
#'
1538+
#' @description Computes statistics for numeric columns of the DataFrame
1539+
#'
1540+
#' @rdname summary
1541+
#' @aliases describe
1542+
setMethod("summary",
1543+
signature(x = "DataFrame"),
1544+
function(x) {
1545+
describe(x)
1546+
})
1547+
1548+
14331549
#' dropna
14341550
#'
14351551
#' Returns a new DataFrame omitting rows with null values.

R/pkg/R/RDD.R

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,9 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
8585

8686
isPipelinable <- function(rdd) {
8787
e <- rdd@env
88+
# nolint start
8889
!(e$isCached || e$isCheckpointed)
90+
# nolint end
8991
}
9092

9193
if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
@@ -97,7 +99,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
9799
# prev_serializedMode is used during the delayed computation of JRDD in getJRDD
98100
} else {
99101
pipelinedFunc <- function(partIndex, part) {
100-
func(partIndex, prev@func(partIndex, part))
102+
f <- prev@func
103+
func(partIndex, f(partIndex, part))
101104
}
102105
.Object@func <- cleanClosure(pipelinedFunc)
103106
.Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
@@ -841,7 +844,7 @@ setMethod("sampleRDD",
841844
if (withReplacement) {
842845
count <- rpois(1, fraction)
843846
if (count > 0) {
844-
res[(len + 1):(len + count)] <- rep(list(elem), count)
847+
res[ (len + 1) : (len + count) ] <- rep(list(elem), count)
845848
len <- len + count
846849
}
847850
} else {
@@ -1261,12 +1264,12 @@ setMethod("pipeRDD",
12611264
signature(x = "RDD", command = "character"),
12621265
function(x, command, env = list()) {
12631266
func <- function(part) {
1264-
trim.trailing.func <- function(x) {
1267+
trim_trailing_func <- function(x) {
12651268
sub("[\r\n]*$", "", toString(x))
12661269
}
1267-
input <- unlist(lapply(part, trim.trailing.func))
1270+
input <- unlist(lapply(part, trim_trailing_func))
12681271
res <- system2(command, stdout = TRUE, input = input, env = env)
1269-
lapply(res, trim.trailing.func)
1272+
lapply(res, trim_trailing_func)
12701273
}
12711274
lapplyPartition(x, func)
12721275
})

R/pkg/R/backend.R

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ invokeJava <- function(isStatic, objId, methodName, ...) {
110110

111111
# TODO: check the status code to output error information
112112
returnStatus <- readInt(conn)
113-
stopifnot(returnStatus == 0)
113+
if (returnStatus != 0) {
114+
stop(readString(conn))
115+
}
114116
readObject(conn)
115117
}

R/pkg/R/column.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ functions <- c("min", "max", "sum", "avg", "mean", "count", "abs", "sqrt",
6565
"acos", "asin", "atan", "cbrt", "ceiling", "cos", "cosh", "exp",
6666
"expm1", "floor", "log", "log10", "log1p", "rint", "sign",
6767
"sin", "sinh", "tan", "tanh", "toDegrees", "toRadians")
68-
binary_mathfunctions<- c("atan2", "hypot")
68+
binary_mathfunctions <- c("atan2", "hypot")
6969

7070
createOperator <- function(op) {
7171
setMethod(op,

R/pkg/R/context.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ parallelize <- function(sc, coll, numSlices = 1) {
121121
numSlices <- length(coll)
122122

123123
sliceLen <- ceiling(length(coll) / numSlices)
124-
slices <- split(coll, rep(1:(numSlices + 1), each = sliceLen)[1:length(coll)])
124+
slices <- split(coll, rep(1: (numSlices + 1), each = sliceLen)[1:length(coll)])
125125

126126
# Serialize each slice: obtain a list of raws, or a list of lists (slices) of
127127
# 2-tuples of raws

R/pkg/R/generics.R

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -254,8 +254,10 @@ setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues")
254254

255255
# @rdname intersection
256256
# @export
257-
setGeneric("intersection", function(x, other, numPartitions = 1) {
258-
standardGeneric("intersection") })
257+
setGeneric("intersection",
258+
function(x, other, numPartitions = 1) {
259+
standardGeneric("intersection")
260+
})
259261

260262
# @rdname keys
261263
# @export
@@ -459,6 +461,10 @@ setGeneric("isLocal", function(x) { standardGeneric("isLocal") })
459461
#' @export
460462
setGeneric("limit", function(x, num) {standardGeneric("limit") })
461463

464+
#' rdname merge
465+
#' @export
466+
setGeneric("merge")
467+
462468
#' @rdname withColumn
463469
#' @export
464470
setGeneric("mutate", function(x, ...) {standardGeneric("mutate") })
@@ -489,9 +495,7 @@ setGeneric("sample",
489495
#' @rdname sample
490496
#' @export
491497
setGeneric("sample_frac",
492-
function(x, withReplacement, fraction, seed) {
493-
standardGeneric("sample_frac")
494-
})
498+
function(x, withReplacement, fraction, seed) { standardGeneric("sample_frac") })
495499

496500
#' @rdname saveAsParquetFile
497501
#' @export
@@ -531,6 +535,10 @@ setGeneric("showDF", function(x,...) { standardGeneric("showDF") })
531535
#' @export
532536
setGeneric("summarize", function(x,...) { standardGeneric("summarize") })
533537

538+
##' rdname summary
539+
##' @export
540+
setGeneric("summary", function(x, ...) { standardGeneric("summary") })
541+
534542
# @rdname tojson
535543
# @export
536544
setGeneric("toJSON", function(x) { standardGeneric("toJSON") })
@@ -553,8 +561,8 @@ setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn
553561

554562
#' @rdname withColumnRenamed
555563
#' @export
556-
setGeneric("withColumnRenamed", function(x, existingCol, newCol) {
557-
standardGeneric("withColumnRenamed") })
564+
setGeneric("withColumnRenamed",
565+
function(x, existingCol, newCol) { standardGeneric("withColumnRenamed") })
558566

559567

560568
###################### Column Methods ##########################
@@ -669,3 +677,7 @@ setGeneric("upper", function(x) { standardGeneric("upper") })
669677
#' @rdname glm
670678
#' @export
671679
setGeneric("glm")
680+
681+
#' @rdname rbind
682+
#' @export
683+
setGeneric("rbind", signature = "...")

0 commit comments

Comments
 (0)