Skip to content

Commit 0ba3fdd

Browse files
hlin09shivaram
authored andcommitted
[Minor][SparkR] Minor refactor and removes redundancy related to cleanClosure.
1. Only use `cleanClosure` in creation of RRDDs. Normally, user and developer do not need to call `cleanClosure` in their function definition. 2. Removes redundant code (e.g. unnecessary wrapper functions) related to `cleanClosure`. Author: hlin09 <[email protected]> Closes apache#5495 from hlin09/cleanClosureFix and squashes the following commits: 74ec303 [hlin09] Minor refactor and removes redundancy.
1 parent b45059d commit 0ba3fdd

File tree

2 files changed

+4
-16
lines changed

2 files changed

+4
-16
lines changed

R/pkg/R/RDD.R

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
8585

8686
if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
8787
# This transformation is the first in its stage:
88-
.Object@func <- func
88+
.Object@func <- cleanClosure(func)
8989
.Object@prev_jrdd <- getJRDD(prev)
9090
.Object@env$prev_serializedMode <- prev@env$serializedMode
9191
# NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
@@ -94,7 +94,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
9494
pipelinedFunc <- function(split, iterator) {
9595
func(split, prev@func(split, iterator))
9696
}
97-
.Object@func <- pipelinedFunc
97+
.Object@func <- cleanClosure(pipelinedFunc)
9898
.Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
9999
# Get the serialization mode of the parent RDD
100100
.Object@env$prev_serializedMode <- prev@env$prev_serializedMode
@@ -144,17 +144,13 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
144144
return(rdd@env$jrdd_val)
145145
}
146146

147-
computeFunc <- function(split, part) {
148-
rdd@func(split, part)
149-
}
150-
151147
packageNamesArr <- serialize(.sparkREnv[[".packages"]],
152148
connection = NULL)
153149

154150
broadcastArr <- lapply(ls(.broadcastNames),
155151
function(name) { get(name, .broadcastNames) })
156152

157-
serializedFuncArr <- serialize(computeFunc, connection = NULL)
153+
serializedFuncArr <- serialize(rdd@func, connection = NULL)
158154

159155
prev_jrdd <- rdd@prev_jrdd
160156

@@ -551,11 +547,7 @@ setMethod("mapPartitions",
551547
setMethod("lapplyPartitionsWithIndex",
552548
signature(X = "RDD", FUN = "function"),
553549
function(X, FUN) {
554-
FUN <- cleanClosure(FUN)
555-
closureCapturingFunc <- function(split, part) {
556-
FUN(split, part)
557-
}
558-
PipelinedRDD(X, closureCapturingFunc)
550+
PipelinedRDD(X, FUN)
559551
})
560552

561553
#' @rdname lapplyPartitionsWithIndex

R/pkg/R/pairRDD.R

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -694,10 +694,6 @@ setMethod("cogroup",
694694
for (i in 1:rddsLen) {
695695
rdds[[i]] <- lapply(rdds[[i]],
696696
function(x) { list(x[[1]], list(i, x[[2]])) })
697-
# TODO(hao): As issue [SparkR-142] mentions, the right value of i
698-
# will not be captured into UDF if getJRDD is not invoked.
699-
# It should be resolved together with that issue.
700-
getJRDD(rdds[[i]]) # Capture the closure.
701697
}
702698
union.rdd <- Reduce(unionRDD, rdds)
703699
group.func <- function(vlist) {

0 commit comments

Comments
 (0)