Commit acd1f1e

Merge remote-tracking branch 'upstream/master' into interval_add_subtract

Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala

2 parents: 5abae28 + 0115516

85 files changed (+2458, −416 lines)

R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -77,6 +77,7 @@ exportMethods("abs",
               "atan",
               "atan2",
               "avg",
+              "between",
               "cast",
               "cbrt",
               "ceiling",

R/pkg/R/column.R

Lines changed: 17 additions & 0 deletions
@@ -187,6 +187,23 @@ setMethod("substr", signature(x = "Column"),
             column(jc)
           })

+#' between
+#'
+#' Test if the column is between the lower bound and upper bound, inclusive.
+#'
+#' @rdname column
+#'
+#' @param bounds lower and upper bounds
+setMethod("between", signature(x = "Column"),
+          function(x, bounds) {
+            if (is.vector(bounds) && length(bounds) == 2) {
+              jc <- callJMethod(x@jc, "between", bounds[1], bounds[2])
+              column(jc)
+            } else {
+              stop("bounds should be a vector of lower and upper bounds")
+            }
+          })
+
 #' Casts the column to a different data type.
 #'
 #' @rdname column
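The R wrapper delegates to the JVM-side Column.between, which Spark also exposes in its Scala API. A minimal Scala sketch of the equivalent call, assuming an existing DataFrame `df` with a numeric `age` column (between is inclusive on both ends):

    // between(a, b) is equivalent to (col >= a) && (col <= b)
    val flagged  = df.select(df("age").between(20, 30)) // boolean column
    val filtered = df.filter(df("age").between(20, 30)) // rows with age in [20, 30]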

R/pkg/R/deserialize.R

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@
 # Int -> integer
 # String -> character
 # Boolean -> logical
+# Float -> double
 # Double -> double
 # Long -> double
 # Array[Byte] -> raw

R/pkg/R/generics.R

Lines changed: 4 additions & 0 deletions
@@ -567,6 +567,10 @@ setGeneric("asc", function(x) { standardGeneric("asc") })
 #' @export
 setGeneric("avg", function(x, ...) { standardGeneric("avg") })

+#' @rdname column
+#' @export
+setGeneric("between", function(x, bounds) { standardGeneric("between") })
+
 #' @rdname column
 #' @export
 setGeneric("cast", function(x, dataType) { standardGeneric("cast") })

R/pkg/R/schema.R

Lines changed: 1 addition & 0 deletions
@@ -123,6 +123,7 @@ structField.character <- function(x, type, nullable = TRUE) {
 }
 options <- c("byte",
              "integer",
+             "float",
              "double",
              "numeric",
              "character",

R/pkg/inst/tests/test_sparkSQL.R

Lines changed: 38 additions & 0 deletions
@@ -108,6 +108,32 @@ test_that("create DataFrame from RDD", {
   expect_equal(count(df), 10)
   expect_equal(columns(df), c("a", "b"))
   expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
+
+  df <- jsonFile(sqlContext, jsonPathNa)
+  hiveCtx <- tryCatch({
+    newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
+  }, error = function(err) {
+    skip("Hive is not build with SparkSQL, skipped")
+  })
+  sql(hiveCtx, "CREATE TABLE people (name string, age double, height float)")
+  insertInto(df, "people")
+  expect_equal(sql(hiveCtx, "SELECT age from people WHERE name = 'Bob'"), c(16))
+  expect_equal(sql(hiveCtx, "SELECT height from people WHERE name ='Bob'"), c(176.5))
+
+  schema <- structType(structField("name", "string"), structField("age", "integer"),
+                       structField("height", "float"))
+  df2 <- createDataFrame(sqlContext, df.toRDD, schema)
+  expect_equal(columns(df2), c("name", "age", "height"))
+  expect_equal(dtypes(df2), list(c("name", "string"), c("age", "int"), c("height", "float")))
+  expect_equal(collect(where(df2, df2$name == "Bob")), c("Bob", 16, 176.5))
+
+  localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18), height=c(164.10, 181.4, 173.7))
+  df <- createDataFrame(sqlContext, localDF, schema)
+  expect_is(df, "DataFrame")
+  expect_equal(count(df), 3)
+  expect_equal(columns(df), c("name", "age", "height"))
+  expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
+  expect_equal(collect(where(df, df$name == "John")), c("John", 19, 164.10))
 })

 test_that("convert NAs to null type in DataFrames", {

@@ -612,6 +638,18 @@ test_that("column functions", {
   c7 <- floor(c) + log(c) + log10(c) + log1p(c) + rint(c)
   c8 <- sign(c) + sin(c) + sinh(c) + tan(c) + tanh(c)
   c9 <- toDegrees(c) + toRadians(c)
+
+  df <- jsonFile(sqlContext, jsonPath)
+  df2 <- select(df, between(df$age, c(20, 30)), between(df$age, c(10, 20)))
+  expect_equal(collect(df2)[[2, 1]], TRUE)
+  expect_equal(collect(df2)[[2, 2]], FALSE)
+  expect_equal(collect(df2)[[3, 1]], FALSE)
+  expect_equal(collect(df2)[[3, 2]], TRUE)
+
+  df3 <- select(df, between(df$name, c("Apache", "Spark")))
+  expect_equal(collect(df3)[[1, 1]], TRUE)
+  expect_equal(collect(df3)[[2, 1]], FALSE)
+  expect_equal(collect(df3)[[3, 1]], TRUE)
 })

 test_that("column binary mathfunctions", {

core/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -372,6 +372,11 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>net.razorvine</groupId>
       <artifactId>pyrolite</artifactId>
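curator-test provides an in-process ZooKeeper server (TestingServer), so ZooKeeper-backed code can be exercised in unit tests without an external cluster. A self-contained sketch of typical usage (the demo object name is illustrative):

    import org.apache.curator.test.TestingServer

    object EmbeddedZkDemo {
      def main(args: Array[String]): Unit = {
        val zk = new TestingServer() // starts an embedded ZooKeeper on a free port
        try {
          // Point ZooKeeper clients at zk.getConnectString during the test.
          println(s"Embedded ZooKeeper at ${zk.getConnectString}")
        } finally {
          zk.close() // stops the server and cleans up its temp data directory
        }
      }
    }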

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 11 additions & 1 deletion
@@ -20,6 +20,7 @@ package org.apache.spark
 import java.util.concurrent.TimeUnit

 import scala.collection.mutable
+import scala.util.control.ControlThrowable

 import com.codahale.metrics.{Gauge, MetricRegistry}

@@ -211,7 +212,16 @@ private[spark] class ExecutorAllocationManager(
     listenerBus.addListener(listener)

     val scheduleTask = new Runnable() {
-      override def run(): Unit = Utils.logUncaughtExceptions(schedule())
+      override def run(): Unit = {
+        try {
+          schedule()
+        } catch {
+          case ct: ControlThrowable =>
+            throw ct
+          case t: Throwable =>
+            logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
+        }
+      }
     }
     executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
   }
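The rationale for the special case: Scala implements non-local control flow (e.g. scala.util.control.Breaks) by throwing ControlThrowable, so a catch-all that logged and swallowed it would silently defeat any enclosing break. A self-contained sketch of the same catch pattern (the helper name runSafely is illustrative):

    import scala.util.control.{Breaks, ControlThrowable}

    object SafeRunDemo {
      // Rethrow ControlThrowable so constructs like Breaks keep working;
      // log (here: print) anything else instead of killing the thread.
      def runSafely(body: => Unit): Unit =
        try body catch {
          case ct: ControlThrowable => throw ct
          case t: Throwable => println(s"Uncaught exception: ${t.getMessage}")
        }

      def main(args: Array[String]): Unit = {
        val b = new Breaks
        b.breakable {
          runSafely(b.break()) // propagates, so the break still exits the block
        }
        runSafely(throw new RuntimeException("boom")) // logged, thread survives
      }
    }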

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 2 additions & 0 deletions
@@ -76,6 +76,8 @@ object Partitioner {
  * produce an unexpected or incorrect result.
  */
 class HashPartitioner(partitions: Int) extends Partitioner {
+  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
+
   def numPartitions: Int = partitions

   def getPartition(key: Any): Int = key match {
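require validates the argument eagerly and throws IllegalArgumentException when the predicate fails, so a bad partition count now surfaces at construction time rather than at first use. Illustrative behavior (comments show the expected outcome):

    import org.apache.spark.HashPartitioner

    val ok = new HashPartitioner(4) // fine
    // new HashPartitioner(-1)      // throws IllegalArgumentException:
    // "requirement failed: Number of partitions (-1) cannot be negative."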

core/src/main/scala/org/apache/spark/TaskContext.scala

Lines changed: 14 additions & 1 deletion
@@ -32,7 +32,20 @@ object TaskContext {
    */
   def get(): TaskContext = taskContext.get

-  private val taskContext: ThreadLocal[TaskContext] = new ThreadLocal[TaskContext]
+  /**
+   * Returns the partition id of currently active TaskContext. It will return 0
+   * if there is no active TaskContext for cases like local execution.
+   */
+  def getPartitionId(): Int = {
+    val tc = taskContext.get()
+    if (tc == null) {
+      0
+    } else {
+      tc.partitionId()
+    }
+  }
+
+  private[this] val taskContext: ThreadLocal[TaskContext] = new ThreadLocal[TaskContext]

   // Note: protected[spark] instead of private[spark] to prevent the following two from
   // showing up in JavaDoc.
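A short, self-contained sketch of the new helper in use (local-mode demo; the object name is illustrative). Inside a running task, getPartitionId() reports that task's partition; called on the driver, where no TaskContext is set, it falls back to 0:

    import org.apache.spark.{SparkConf, SparkContext, TaskContext}

    object PartitionIdDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
        val tagged = sc.parallelize(1 to 4, numSlices = 2).mapPartitions { iter =>
          val pid = TaskContext.getPartitionId() // the helper added in this commit
          iter.map(x => (pid, x))
        }
        tagged.collect().foreach(println) // e.g. (0,1) (0,2) (1,3) (1,4)
        sc.stop()
      }
    }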
