3 changes: 3 additions & 0 deletions R/pkg/R/DataFrame.R
@@ -1174,6 +1174,9 @@ setMethod("collect",
vec <- do.call(c, col)
stopifnot(class(vec) != "list")
class(vec) <- PRIMITIVE_TYPES[[colType]]
if (is.character(vec) && stringsAsFactors) {
vec <- as.factor(vec)
}
df[[colIndex]] <- vec
} else {
df[[colIndex]] <- col
6 changes: 4 additions & 2 deletions R/pkg/inst/tests/testthat/test_basic.R
@@ -18,7 +18,8 @@
context("basic tests for CRAN")

test_that("create DataFrame from list or data.frame", {
sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE,
sparkConfig = sparkRTestConfig)

i <- 4
df <- createDataFrame(data.frame(dummy = 1:i))
@@ -49,7 +50,8 @@ test_that("create DataFrame from list or data.frame", {
})

test_that("spark.glm and predict", {
sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE,
sparkConfig = sparkRTestConfig)

training <- suppressWarnings(createDataFrame(iris))
# gaussian family
6 changes: 6 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -483,6 +483,12 @@ test_that("create DataFrame with different data types", {
expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE))
})

test_that("SPARK-17902: collect() with stringsAsFactors enabled", {
df <- suppressWarnings(collect(createDataFrame(iris), stringsAsFactors = TRUE))
expect_equal(class(iris$Species), class(df$Species))
expect_equal(iris$Species, df$Species)
})

test_that("SPARK-17811: can create DataFrame containing NA as date and time", {
df <- data.frame(
id = 1:2,
9 changes: 9 additions & 0 deletions R/pkg/tests/run-all.R
@@ -36,8 +36,17 @@ invisible(lapply(sparkRWhitelistSQLDirs,
sparkRFilesBefore <- list.files(path = sparkRDir, all.files = TRUE)

sparkRTestMaster <- "local[1]"
sparkRTestConfig <- list()
if (identical(Sys.getenv("NOT_CRAN"), "true")) {
sparkRTestMaster <- ""
} else {
# Disable hsperfdata on CRAN
old_java_opt <- Sys.getenv("_JAVA_OPTIONS")
Sys.setenv("_JAVA_OPTIONS" = paste("-XX:-UsePerfData", old_java_opt))
tmpDir <- tempdir()
tmpArg <- paste0("-Djava.io.tmpdir=", tmpDir)
sparkRTestConfig <- list(spark.driver.extraJavaOptions = tmpArg,
spark.executor.extraJavaOptions = tmpArg)
}

test_package("SparkR")
8 changes: 7 additions & 1 deletion R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -36,6 +36,12 @@ opts_hooks$set(eval = function(options) {
}
options
})
r_tmp_dir <- tempdir()
tmp_arg <- paste("-Djava.io.tmpdir=", r_tmp_dir, sep = "")
sparkSessionConfig <- list(spark.driver.extraJavaOptions = tmp_arg,
spark.executor.extraJavaOptions = tmp_arg)
old_java_opt <- Sys.getenv("_JAVA_OPTIONS")
Sys.setenv("_JAVA_OPTIONS" = paste("-XX:-UsePerfData", old_java_opt, sep = " "))
```

## Overview
@@ -57,7 +63,7 @@ We use default settings in which it runs in local mode. It auto downloads Spark

```{r, include=FALSE}
install.spark()
sparkR.session(master = "local[1]")
sparkR.session(master = "local[1]", sparkConfig = sparkSessionConfig, enableHiveSupport = FALSE)
```
```{r, eval=FALSE}
sparkR.session()
16 changes: 13 additions & 3 deletions core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage

import java.util.UUID

import org.apache.spark.SparkException
import org.apache.spark.annotation.DeveloperApi

/**
@@ -100,6 +101,10 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
override def name: String = "test_" + id
}

@DeveloperApi
class UnrecognizedBlockId(name: String)
extends SparkException(s"Failed to parse $name into a block ID")

@DeveloperApi
object BlockId {
val RDD = "rdd_([0-9]+)_([0-9]+)".r
@@ -109,10 +114,11 @@ object BlockId {
val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
val TASKRESULT = "taskresult_([0-9]+)".r
val STREAM = "input-([0-9]+)-([0-9]+)".r
val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
val TEST = "test_(.*)".r

/** Converts a BlockId "name" String back into a BlockId. */
def apply(id: String): BlockId = id match {
def apply(name: String): BlockId = name match {
case RDD(rddId, splitIndex) =>
RDDBlockId(rddId.toInt, splitIndex.toInt)
case SHUFFLE(shuffleId, mapId, reduceId) =>
@@ -127,9 +133,13 @@
TaskResultBlockId(taskId.toLong)
case STREAM(streamId, uniqueId) =>
StreamBlockId(streamId.toInt, uniqueId.toLong)
case TEMP_LOCAL(uuid) =>
TempLocalBlockId(UUID.fromString(uuid))
case TEMP_SHUFFLE(uuid) =>
TempShuffleBlockId(UUID.fromString(uuid))
case TEST(value) =>
TestBlockId(value)
case _ =>
throw new IllegalStateException("Unrecognized BlockId: " + id)
throw new UnrecognizedBlockId(name)
}
}
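
For context, a minimal stand-alone sketch (not part of this PR; the object name `BlockIdParsingSketch` is invented for illustration) of the new parsing behavior: well-formed names still round-trip through `BlockId.apply`, the new `TEMP_SHUFFLE`/`TEMP_LOCAL` patterns make temporary block names parseable, and an unrecognized name now raises the dedicated `UnrecognizedBlockId` (a `SparkException` subclass) instead of `IllegalStateException`.

```scala
import org.apache.spark.storage.{BlockId, RDDBlockId, UnrecognizedBlockId}

object BlockIdParsingSketch {
  def main(args: Array[String]): Unit = {
    // Well-formed names parse back into their concrete BlockId subtypes.
    assert(BlockId("rdd_1_2") == RDDBlockId(1, 2))

    // The new TEMP_SHUFFLE pattern makes temporary shuffle file names parseable too.
    val tempName = "temp_shuffle_4722992b-42b4-4d9e-aeeb-2b1f55e654da"
    assert(BlockId(tempName).name == tempName)

    // A name that matches no known pattern now raises UnrecognizedBlockId,
    // which is what DiskBlockManager.getAllBlocks relies on to skip non-block files.
    try {
      BlockId("myblock")
    } catch {
      case _: UnrecognizedBlockId => println("not a block file; skipping")
    }
  }
}
```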
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -100,7 +100,16 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea

/** List all the blocks currently stored on disk by the disk manager. */
def getAllBlocks(): Seq[BlockId] = {
getAllFiles().map(f => BlockId(f.getName))
getAllFiles().flatMap { f =>
try {
Some(BlockId(f.getName))
} catch {
case _: UnrecognizedBlockId =>
// Skip files which do not correspond to blocks, for example temporary
// files created by [[SortShuffleWriter]].
None
}
}
}

/** Produces a unique block id and File suitable for storing local intermediate results. */
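
The same filtering could be written with `scala.util.Try`; a sketch is below (the helper `parseBlockNames` and its enclosing object are invented for illustration, not taken from the PR). The actual change keeps an explicit `try`/`catch` so that only `UnrecognizedBlockId` is swallowed and any other failure still propagates.

```scala
import scala.util.Try

import org.apache.spark.storage.BlockId

object BlockFileFilterSketch {
  // Try(...).toOption drops every non-fatal failure, not just UnrecognizedBlockId,
  // which is why the actual change prefers the narrower try/catch above.
  def parseBlockNames(fileNames: Seq[String]): Seq[BlockId] =
    fileNames.flatMap(name => Try(BlockId(name)).toOption)

  def main(args: Array[String]): Unit = {
    // "unmanaged_file" matches no block-name pattern and is silently dropped,
    // mirroring the getAllBlocks behavior exercised by the SPARK-22227 test below.
    val parsed = parseBlockNames(Seq("rdd_0_0", "unmanaged_file", "broadcast_2"))
    assert(parsed.map(_.name) == Seq("rdd_0_0", "broadcast_2"))
  }
}
```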
73 changes: 61 additions & 12 deletions core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -91,6 +91,54 @@ private[spark] object ClosureCleaner extends Logging {
(seen - obj.getClass).toList
}

/** Initializes the accessed fields for outer classes and their super classes. */
private def initAccessedFields(
accessedFields: Map[Class[_], Set[String]],
outerClasses: Seq[Class[_]]): Unit = {
for (cls <- outerClasses) {
var currentClass = cls
assert(currentClass != null, "The outer class can't be null.")

while (currentClass != null) {
accessedFields(currentClass) = Set.empty[String]
currentClass = currentClass.getSuperclass()
}
}
}

/** Sets accessed fields for given class in clone object based on given object. */
private def setAccessedFields(
outerClass: Class[_],
clone: AnyRef,
obj: AnyRef,
accessedFields: Map[Class[_], Set[String]]): Unit = {
for (fieldName <- accessedFields(outerClass)) {
val field = outerClass.getDeclaredField(fieldName)
field.setAccessible(true)
val value = field.get(obj)
field.set(clone, value)
}
}

/** Clones a given object and sets accessed fields in cloned object. */
private def cloneAndSetFields(
parent: AnyRef,
obj: AnyRef,
outerClass: Class[_],
accessedFields: Map[Class[_], Set[String]]): AnyRef = {
val clone = instantiateClass(outerClass, parent)

var currentClass = outerClass
assert(currentClass != null, "The outer class can't be null.")

while (currentClass != null) {
setAccessedFields(currentClass, clone, obj, accessedFields)
currentClass = currentClass.getSuperclass()
}

clone
}

/**
* Clean the given closure in place.
*
@@ -200,9 +248,8 @@
logDebug(s" + populating accessed fields because this is the starting closure")
// Initialize accessed fields with the outer classes first
// This step is needed to associate the fields to the correct classes later
for (cls <- outerClasses) {
accessedFields(cls) = Set[String]()
}
initAccessedFields(accessedFields, outerClasses)

// Populate accessed fields by visiting all fields and methods accessed by this and
// all of its inner closures. If transitive cleaning is enabled, this may recursively
// visit methods that belong to other classes in search of transitively referenced fields.
@@ -248,13 +295,8 @@
// required fields from the original object. We need the parent here because the Java
// language specification requires the first constructor parameter of any closure to be
// its enclosing object.
val clone = instantiateClass(cls, parent)
for (fieldName <- accessedFields(cls)) {
val field = cls.getDeclaredField(fieldName)
field.setAccessible(true)
val value = field.get(obj)
field.set(clone, value)
}
val clone = cloneAndSetFields(parent, obj, cls, accessedFields)

// If transitive cleaning is enabled, we recursively clean any enclosing closure using
// the already populated accessed fields map of the starting closure
if (cleanTransitively && isClosure(clone.getClass)) {
@@ -393,8 +435,15 @@ private[util] class FieldAccessFinder(
if (!visitedMethods.contains(m)) {
// Keep track of visited methods to avoid potential infinite cycles
visitedMethods += m
ClosureCleaner.getClassReader(cl).accept(
new FieldAccessFinder(fields, findTransitively, Some(m), visitedMethods), 0)

var currentClass = cl
assert(currentClass != null, "The outer class can't be null.")

while (currentClass != null) {
ClosureCleaner.getClassReader(currentClass).accept(
new FieldAccessFinder(fields, findTransitively, Some(m), visitedMethods), 0)
currentClass = currentClass.getSuperclass()
}
}
}
}
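
The reason every loop above climbs `getSuperclass` is that Java reflection's `getDeclaredField(s)` only sees fields declared directly on a class, so fields defined in an abstract parent were previously missed (SPARK-22328). A stand-alone sketch with made-up class names shows the difference:

```scala
object SuperclassFieldSketch {
  abstract class Parent { val n1 = 111 }        // the n1 field lives in Parent's class file
  class Child extends Parent { val n2 = 222 }   // Child only declares n2

  // Same traversal pattern as initAccessedFields/cloneAndSetFields: start at the
  // class itself and keep climbing until getSuperclass returns null.
  def declaredFieldNames(start: Class[_]): Set[String] = {
    var names = Set.empty[String]
    var current: Class[_] = start
    while (current != null) {
      names ++= current.getDeclaredFields.map(_.getName)
      current = current.getSuperclass
    }
    names
  }

  def main(args: Array[String]): Unit = {
    val childOnly = classOf[Child].getDeclaredFields.map(_.getName).toSet
    assert(!childOnly.contains("n1"))                 // invisible without the loop
    assert(declaredFieldNames(classOf[Child])("n1"))  // found once superclasses are walked
  }
}
```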
core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
@@ -33,13 +33,8 @@ class BlockIdSuite extends SparkFunSuite {
}

test("test-bad-deserialization") {
try {
// Try to deserialize an invalid block id.
intercept[UnrecognizedBlockId] {
BlockId("myblock")
fail()
} catch {
case e: IllegalStateException => // OK
case _: Throwable => fail()
}
}

core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.storage

import java.io.{File, FileWriter}
import java.util.UUID

import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

@@ -79,6 +80,12 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
assert(diskBlockManager.getAllBlocks.toSet === ids.toSet)
}

test("SPARK-22227: non-block files are skipped") {
val file = diskBlockManager.getFile("unmanaged_file")
writeToFile(file, 10)
assert(diskBlockManager.getAllBlocks().isEmpty)
}

def writeToFile(file: File, numBytes: Int) {
val writer = new FileWriter(file, true)
for (i <- 0 until numBytes) writer.write(i)
core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -119,6 +119,63 @@ class ClosureCleanerSuite extends SparkFunSuite {
test("createNullValue") {
new TestCreateNullValue().run()
}

test("SPARK-22328: ClosureCleaner misses referenced superclass fields: case 1") {
val concreteObject = new TestAbstractClass {
val n2 = 222
val s2 = "bbb"
val d2 = 2.0d

def run(): Seq[(Int, Int, String, String, Double, Double)] = {
withSpark(new SparkContext("local", "test")) { sc =>
val rdd = sc.parallelize(1 to 1)
body(rdd)
}
}

def body(rdd: RDD[Int]): Seq[(Int, Int, String, String, Double, Double)] = rdd.map { _ =>
(n1, n2, s1, s2, d1, d2)
}.collect()
}
assert(concreteObject.run() === Seq((111, 222, "aaa", "bbb", 1.0d, 2.0d)))
}

test("SPARK-22328: ClosureCleaner misses referenced superclass fields: case 2") {
val concreteObject = new TestAbstractClass2 {
val n2 = 222
val s2 = "bbb"
val d2 = 2.0d
def getData: Int => (Int, Int, String, String, Double, Double) = _ => (n1, n2, s1, s2, d1, d2)
}
withSpark(new SparkContext("local", "test")) { sc =>
val rdd = sc.parallelize(1 to 1).map(concreteObject.getData)
assert(rdd.collect() === Seq((111, 222, "aaa", "bbb", 1.0d, 2.0d)))
}
}

test("SPARK-22328: multiple outer classes have the same parent class") {
val concreteObject = new TestAbstractClass2 {

val innerObject = new TestAbstractClass2 {
override val n1 = 222
override val s1 = "bbb"
}

val innerObject2 = new TestAbstractClass2 {
override val n1 = 444
val n3 = 333
val s3 = "ccc"
val d3 = 3.0d

def getData: Int => (Int, Int, String, String, Double, Double, Int, String) =
_ => (n1, n3, s1, s3, d1, d3, innerObject.n1, innerObject.s1)
}
}
withSpark(new SparkContext("local", "test")) { sc =>
val rdd = sc.parallelize(1 to 1).map(concreteObject.innerObject2.getData)
assert(rdd.collect() === Seq((444, 333, "aaa", "ccc", 1.0d, 3.0d, 222, "bbb")))
}
}
}

// A non-serializable class we create in closures to make sure that we aren't
@@ -377,3 +434,18 @@ class TestCreateNullValue {
nestedClosure()
}
}

abstract class TestAbstractClass extends Serializable {
val n1 = 111
val s1 = "aaa"
protected val d1 = 1.0d

def run(): Seq[(Int, Int, String, String, Double, Double)]
def body(rdd: RDD[Int]): Seq[(Int, Int, String, String, Double, Double)]
}

abstract class TestAbstractClass2 extends Serializable {
val n1 = 111
val s1 = "aaa"
protected val d1 = 1.0d
}