
Commit cb7ce21

Merge remote-tracking branch 'upstream/master' into refactor-jdbc-filter
2 parents: a7ef79e + be33a0c

185 files changed, +2004 -1529 lines

Large commits have some content hidden by default; only a subset of the 185 changed files is shown below.

.rat-excludes

Lines changed: 1 addition & 0 deletions
@@ -85,3 +85,4 @@ org.apache.spark.sql.sources.DataSourceRegister
 org.apache.spark.scheduler.SparkHistoryListenerFactory
 .*parquet
 LZ4BlockInputStream.java
+spark-deps-.*

R/pkg/R/column.R

Lines changed: 2 additions & 2 deletions
@@ -215,7 +215,7 @@ setMethod("%in%",
 
 #' otherwise
 #'
-#' If values in the specified column are null, returns the value.
+#' If values in the specified column are null, returns the value.
 #' Can be used in conjunction with `when` to specify a default value for expressions.
 #'
 #' @rdname otherwise
@@ -225,7 +225,7 @@ setMethod("%in%",
 setMethod("otherwise",
           signature(x = "Column", value = "ANY"),
           function(x, value) {
-            value <- ifelse(class(value) == "Column", value@jc, value)
+            value <- if (class(value) == "Column") { value@jc } else { value }
             jc <- callJMethod(x@jc, "otherwise", value)
             column(jc)
           })
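
Note on the change above: base R's ifelse() is vectorized. It assembles its result with rep(), which cannot replicate S4 objects or the environment-backed Java references (jobj) behind a SparkR Column, so passing a Column as the value argument used to fail, while a plain if/else evaluates one branch and returns the object untouched. A minimal base-R sketch of the difference, using an environment as a stand-in for a jobj (no Spark required):

    e <- new.env()  # stands in for the Java object reference behind a Column

    # if/else hands the object back unchanged:
    identical(if (TRUE) e else "fallback", e)  # TRUE

    # ifelse() tries to rep()licate the chosen branch and errors out:
    tryCatch(ifelse(TRUE, e, "fallback"),
             error = function(err) conditionMessage(err))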

R/pkg/R/functions.R

Lines changed: 8 additions & 5 deletions
@@ -37,7 +37,7 @@ setMethod("lit", signature("ANY"),
           function(x) {
             jc <- callJStatic("org.apache.spark.sql.functions",
                               "lit",
-                              ifelse(class(x) == "Column", x@jc, x))
+                              if (class(x) == "Column") { x@jc } else { x })
             column(jc)
           })
 
@@ -2262,7 +2262,7 @@ setMethod("unix_timestamp", signature(x = "Column", format = "character"),
 setMethod("when", signature(condition = "Column", value = "ANY"),
           function(condition, value) {
             condition <- condition@jc
-            value <- ifelse(class(value) == "Column", value@jc, value)
+            value <- if (class(value) == "Column") { value@jc } else { value }
             jc <- callJStatic("org.apache.spark.sql.functions", "when", condition, value)
             column(jc)
           })
@@ -2277,13 +2277,16 @@ setMethod("when", signature(condition = "Column", value = "ANY"),
 #' @name ifelse
 #' @seealso \link{when}
 #' @export
-#' @examples \dontrun{ifelse(df$a > 1 & df$b > 2, 0, 1)}
+#' @examples \dontrun{
+#' ifelse(df$a > 1 & df$b > 2, 0, 1)
+#' ifelse(df$a > 1, df$a, 1)
+#' }
 setMethod("ifelse",
           signature(test = "Column", yes = "ANY", no = "ANY"),
           function(test, yes, no) {
             test <- test@jc
-            yes <- ifelse(class(yes) == "Column", yes@jc, yes)
-            no <- ifelse(class(no) == "Column", no@jc, no)
+            yes <- if (class(yes) == "Column") { yes@jc } else { yes }
+            no <- if (class(no) == "Column") { no@jc } else { no }
             jc <- callJMethod(callJStatic("org.apache.spark.sql.functions",
                                           "when",
                                           test, yes),
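
Taken together, the fixed lit(), when(), and ifelse() wrappers let Column values flow through all three, matching the new roxygen examples above. A usage sketch, assuming a SparkR session with a sqlContext as in the tests below:

    df <- createDataFrame(sqlContext, list(list(a = 1, b = 2), list(a = 3, b = 4)))

    # Scalar branches worked before; Column branches such as df$a and lit(...) now work too.
    collect(select(df, ifelse(df$a > 1, df$a, lit(1))))             # 1, 3
    collect(select(df, otherwise(when(df$a > 1, lit(1)), lit(0))))  # 0, 1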

R/pkg/inst/tests/testthat/test_sparkSQL.R

Lines changed: 12 additions & 0 deletions
@@ -62,6 +62,10 @@ mockLinesComplexType <-
 complexTypeJsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
 writeLines(mockLinesComplexType, complexTypeJsonPath)
 
+test_that("calling sparkRSQL.init returns existing SQL context", {
+  expect_equal(sparkRSQL.init(sc), sqlContext)
+})
+
 test_that("infer types and check types", {
   expect_equal(infer_type(1L), "integer")
   expect_equal(infer_type(1.0), "double")
@@ -1120,6 +1124,14 @@ test_that("when(), otherwise() and ifelse() on a DataFrame", {
   expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, 0, 1)))[, 1], c(1, 0))
 })
 
+test_that("when(), otherwise() and ifelse() with column on a DataFrame", {
+  l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
+  df <- createDataFrame(sqlContext, l)
+  expect_equal(collect(select(df, when(df$a > 1 & df$b > 2, lit(1))))[, 1], c(NA, 1))
+  expect_equal(collect(select(df, otherwise(when(df$a > 1, lit(1)), lit(0))))[, 1], c(0, 1))
+  expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, lit(0), lit(1))))[, 1], c(1, 0))
+})
+
 test_that("group by, agg functions", {
   df <- read.json(sqlContext, jsonPath)
   df1 <- agg(df, name = "max", age = "sum")

core/src/main/resources/org/apache/spark/ui/static/webui.css

Lines changed: 10 additions & 1 deletion
@@ -225,4 +225,13 @@ a.expandbutton {
   background-color: #49535a !important;
   color: white;
   cursor:pointer;
-}
+}
+
+.table-head-clickable th a, .table-head-clickable th a:hover {
+  /* Make the entire header clickable, not just the text label */
+  display: block;
+  width: 100%;
+  /* Suppress the default link styling */
+  color: #333;
+  text-decoration: none;
+}

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 4 deletions
@@ -836,7 +836,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
       minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
     assertNotStopped()
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
-      minPartitions).map(pair => pair._2.toString)
+      minPartitions).map(pair => pair._2.toString).setName(path)
   }
 
   /**
@@ -885,7 +885,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
       classOf[Text],
       classOf[Text],
       updateConf,
-      minPartitions).setName(path).map(record => (record._1.toString, record._2.toString))
+      minPartitions).map(record => (record._1.toString, record._2.toString)).setName(path)
   }
 
   /**
@@ -2073,8 +2073,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
     // its own local file system, which is incorrect because the checkpoint files
     // are actually on the executor machines.
     if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
-      logWarning("Checkpoint directory must be non-local " +
-        "if Spark is running on a cluster: " + directory)
+      logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
+        s"must not be on the local filesystem. Directory '$directory' " +
+        "appears to be on the local filesystem.")
     }
 
     checkpointDir = Option(directory).map { dir =>
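
Two user-visible effects in this file: textFile and wholeTextFiles now call setName(path) on the RDD the caller actually receives (after the final map), so the web UI labels it with the input path rather than an anonymous MapPartitionsRDD; and the checkpoint warning now explains why a local directory is a problem on a cluster. A hedged SparkR sketch of the checkpoint rule; setCheckpointDir(sc, dir) is assumed here to be the R wrapper around the Scala method shown above:

    # On a cluster, the checkpoint directory must be on a shared filesystem
    # such as HDFS; a local path triggers the reworded warning above.
    setCheckpointDir(sc, "hdfs:///tmp/checkpoints")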

core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala

Lines changed: 1 addition & 3 deletions
@@ -24,14 +24,12 @@ import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
 
 /**
- * :: DeveloperApi ::
  * An interface for all the broadcast implementations in Spark (to allow
 * multiple broadcast implementations). SparkContext uses a user-specified
 * BroadcastFactory implementation to instantiate a particular broadcast for the
 * entire Spark job.
 */
-@DeveloperApi
-trait BroadcastFactory {
+private[spark] trait BroadcastFactory {
 
   def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
 
core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala

Lines changed: 3 additions & 10 deletions
@@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.reflect.ClassTag
 
-import org.apache.spark._
-import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkConf, SecurityManager}
+
 
 private[spark] class BroadcastManager(
     val isDriver: Boolean,
@@ -39,15 +39,8 @@ private[spark] class BroadcastManager(
   private def initialize() {
     synchronized {
       if (!initialized) {
-        val broadcastFactoryClass =
-          conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
-
-        broadcastFactory =
-          Utils.classForName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
-
-        // Initialize appropriate BroadcastFactory and BroadcastObject
+        broadcastFactory = new TorrentBroadcastFactory
         broadcastFactory.initialize(isDriver, conf, securityManager)
-
         initialized = true
       }
     }
