
Commit 7ec04db (2 parents: 1502d13 + a1e3649)

Resolve conflict

183 files changed: +3363 additions, -622 deletions


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -66,6 +66,7 @@ scalastyle-output.xml
 R-unit-tests.log
 R/unit-tests.out
 python/lib/pyspark.zip
+lint-r-report.log
 
 # For Hive
 metastore_db/

.rat-excludes

Lines changed: 1 addition & 0 deletions
@@ -86,3 +86,4 @@ local-1430917381535_2
 DESCRIPTION
 NAMESPACE
 test_support/*
+.lintr

LICENSE

Lines changed: 1 addition & 0 deletions
@@ -950,3 +950,4 @@ The following components are provided under the MIT License. See project link fo
 (MIT License) scopt (com.github.scopt:scopt_2.10:3.2.0 - https://github.com/scopt/scopt)
 (The MIT License) Mockito (org.mockito:mockito-all:1.8.5 - http://www.mockito.org)
 (MIT License) jquery (https://jquery.org/license/)
+(MIT License) AnchorJS (https://github.com/bryanbraun/anchorjs)

R/pkg/.lintr

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+linters: with_defaults(line_length_linter(100), camel_case_linter = NULL)
+exclusions: list("inst/profile/general.R" = 1, "inst/profile/shell.R")
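
In lintr's config format, with_defaults() keeps the default linters while raising the line-length limit to 100 characters and disabling the camel-case linter (SparkR's API uses camelCase names, which the default linter would flag). The exclusions entry suppresses results for line 1 of inst/profile/general.R and for the whole of inst/profile/shell.R.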

R/pkg/NAMESPACE

Lines changed: 5 additions & 0 deletions
@@ -10,6 +10,11 @@ export("sparkR.init")
 export("sparkR.stop")
 export("print.jobj")
 
+# Job group lifecycle management methods
+export("setJobGroup",
+       "clearJobGroup",
+       "cancelJobGroup")
+
 exportClasses("DataFrame")
 
 exportMethods("arrange",

R/pkg/R/sparkR.R

Lines changed: 44 additions & 0 deletions
@@ -278,3 +278,47 @@ sparkRHive.init <- function(jsc = NULL) {
   assign(".sparkRHivesc", hiveCtx, envir = .sparkREnv)
   hiveCtx
 }
+
+#' Assigns a group ID to all the jobs started by this thread until the group ID is set to a
+#' different value or cleared.
+#'
+#' @param sc existing spark context
+#' @param groupId the ID to be assigned to job groups
+#' @param description description for the job group ID
+#' @param interruptOnCancel flag to indicate if the job is interrupted on job cancellation
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' setJobGroup(sc, "myJobGroup", "My job group description", TRUE)
+#'}
+
+setJobGroup <- function(sc, groupId, description, interruptOnCancel) {
+  callJMethod(sc, "setJobGroup", groupId, description, interruptOnCancel)
+}
+
+#' Clear current job group ID and its description
+#'
+#' @param sc existing spark context
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' clearJobGroup(sc)
+#'}
+
+clearJobGroup <- function(sc) {
+  callJMethod(sc, "clearJobGroup")
+}
+
+#' Cancel active jobs for the specified group
+#'
+#' @param sc existing spark context
+#' @param groupId the ID of job group to be cancelled
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' cancelJobGroup(sc, "myJobGroup")
+#'}
+
+cancelJobGroup <- function(sc, groupId) {
+  callJMethod(sc, "cancelJobGroup", groupId)
+}
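
The R wrappers above forward through callJMethod to methods of the same name on the JVM SparkContext. For readers more familiar with the Scala side, here is a minimal sketch of the equivalent calls; the app name and local master are arbitrary choices for illustration, not part of this commit.

import org.apache.spark.{SparkConf, SparkContext}

object JobGroupSketch {
  def main(args: Array[String]): Unit = {
    // Arbitrary local setup, for illustration only.
    val sc = new SparkContext(
      new SparkConf().setAppName("job-group-sketch").setMaster("local[2]"))

    // Every job submitted by this thread is now tagged with "myJobGroup";
    // interruptOnCancel = true asks Spark to interrupt running tasks on cancel.
    sc.setJobGroup("myJobGroup", "My job group description", interruptOnCancel = true)
    sc.parallelize(1 to 1000).map(_ * 2).count()

    // Cancel any still-active jobs in the group, then drop this thread's tag.
    sc.cancelJobGroup("myJobGroup")
    sc.clearJobGroup()

    sc.stop()
  }
}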

R/pkg/inst/tests/test_context.R

Lines changed: 7 additions & 0 deletions
@@ -48,3 +48,10 @@ test_that("rdd GC across sparkR.stop", {
   count(rdd3)
   count(rdd4)
 })
+
+test_that("job group functions can be called", {
+  sc <- sparkR.init()
+  setJobGroup(sc, "groupId", "job description", TRUE)
+  cancelJobGroup(sc, "groupId")
+  clearJobGroup(sc)
+})

core/src/main/scala/org/apache/spark/SerializableWritable.scala

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa
   private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     val ow = new ObjectWritable()
-    ow.setConf(new Configuration())
+    ow.setConf(new Configuration(false))
     ow.readFields(in)
     t = ow.get().asInstanceOf[T]
   }
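
The one-argument constructor passes loadDefaults = false. A plausible reading (the diff itself states no rationale): a no-arg Configuration() loads core-default.xml and core-site.xml from the deserializing JVM's classpath before readFields() merges in the serialized entries, whereas Configuration(false) starts empty, so the restored object holds exactly the pairs that were serialized and skips the relatively expensive default-resource loading. A two-line sketch of the distinction:

import org.apache.hadoop.conf.Configuration

val withDefaults = new Configuration()      // loads core-default.xml, core-site.xml, ...
val empty        = new Configuration(false) // starts with no properties at all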

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -974,7 +974,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
     assertNotStopped()
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
-    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
     val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
     new HadoopRDD(
       this,
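
SerializableWritable is swapped for a dedicated SerializableConfiguration wrapper, whose definition is not part of the hunks shown here. As a rough, self-contained sketch of what such a wrapper can look like, assuming it pairs Java serialization for the envelope with Hadoop's Writable protocol for the payload:

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration

// Illustrative only; not the committed Spark source.
class SerializableConfigurationSketch(@transient var value: Configuration)
  extends Serializable {

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out) // Configuration implements Writable
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false) // don't load local defaults (see the note above)
    value.readFields(in)
  }
}

The SerializableJobConf imported in SparkHadoopWriter below presumably follows the same pattern with a JobConf in place of a plain Configuration.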

core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 2 additions & 1 deletion
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.HadoopRDD
+import org.apache.spark.util.SerializableJobConf
 
 /**
  * Internal helper class that saves an RDD using a Hadoop OutputFormat.
@@ -42,7 +43,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   with Serializable {
 
   private val now = new Date()
-  private val conf = new SerializableWritable(jobConf)
+  private val conf = new SerializableJobConf(jobConf)
 
   private var jobID = 0
   private var splitID = 0
