Merged
Changes from all commits
18 commits
a367840
[SPARK-10859] [SQL] fix stats of StringType in columnar cache
Sep 28, 2015
9b3014b
[SPARK-10833] [BUILD] Inline, organize BSD/MIT licenses in LICENSE
srowen Sep 29, 2015
d544932
[SPARK-10825] [CORE] [TESTS] Fix race conditions in StandaloneDynamic…
zsxwing Sep 29, 2015
3b23873
[SPARK-10871] include number of executor failures in error msg
ryan-williams Sep 29, 2015
cbc6aec
[SPARK-10058] [CORE] [TESTS] Fix the flaky tests in HeartbeatReceiver…
zsxwing Oct 1, 2015
8836ac3
[SPARK-10904] [SPARKR] Fix to support `select(df, c("col1", "col2"))`
felixcheung Oct 4, 2015
d323e5e
[SPARK-10889] [STREAMING] Bump KCL to add MillisBehindLatest metric
akatz Oct 4, 2015
c8392cd
[SPARK-10934] [SQL] handle hashCode of unsafe array correctly
cloud-fan Oct 6, 2015
6847be6
[SPARK-10901] [YARN] spark.yarn.user.classpath.first doesn't work
Oct 6, 2015
84f510c
[SPARK-10885] [STREAMING] Display the failed output op in Streaming UI
zsxwing Oct 6, 2015
b6a0933
[SPARK-10952] Only add hive to classpath if HIVE_HOME is set.
kevincox Oct 7, 2015
57978ae
[SPARK-10980] [SQL] fix bug in create Decimal
Oct 7, 2015
ba601b1
[SPARK-10914] UnsafeRow serialization breaks when two machines have d…
rxin Oct 9, 2015
3df7500
[SPARK-10955] [STREAMING] Add a warning if dynamic allocation for Str…
harishreedharan Oct 9, 2015
a3b4b93
Merge branch 'branch-1.5' of github.com:apache/spark into csd-1.5
markhamstra Oct 9, 2015
f95129c
[SPARK-10959] [PYSPARK] StreamingLogisticRegressionWithSGD does not t…
BryanCutler Oct 9, 2015
9a625f3
Merge branch 'branch-1.5' of github.com:apache/spark into csd-1.5
markhamstra Oct 9, 2015
5a10e10
[SPARK-10389] [SQL] support order by non-attribute grouping expressio…
cloud-fan Sep 2, 2015
699 changes: 20 additions & 679 deletions LICENSE

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions NOTICE
@@ -572,3 +572,38 @@ Copyright 2009-2013 The Apache Software Foundation

Apache Avro IPC
Copyright 2009-2013 The Apache Software Foundation


Vis.js
Copyright 2010-2015 Almende B.V.

Vis.js is dual licensed under both

* The Apache 2.0 License
http://www.apache.org/licenses/LICENSE-2.0

and

* The MIT License
http://opensource.org/licenses/MIT

Vis.js may be distributed under either license.


Vis.js uses and redistributes the following third-party libraries:

- component-emitter
https://github.com/component/emitter
The MIT License

- hammer.js
http://hammerjs.github.io/
The MIT License

- moment.js
http://momentjs.com/
The MIT License

- keycharm
https://github.com/AlexDM0/keycharm
The MIT License
14 changes: 11 additions & 3 deletions R/pkg/R/DataFrame.R
@@ -1044,12 +1044,20 @@ setMethod("subset", signature(x = "DataFrame"),
#' select(df, c("col1", "col2"))
#' select(df, list(df$name, df$age + 1))
#' # Similar to R data frames columns can also be selected using `$`
#' df$age
#' df[,df$age]
#' }
setMethod("select", signature(x = "DataFrame", col = "character"),
function(x, col, ...) {
sdf <- callJMethod(x@sdf, "select", col, toSeq(...))
dataFrame(sdf)
if (length(col) > 1) {
if (length(list(...)) > 0) {
stop("To select multiple columns, use a character vector or list for col")
}

select(x, as.list(col))
} else {
sdf <- callJMethod(x@sdf, "select", col, toSeq(...))
dataFrame(sdf)
}
})

#' @rdname select
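Why the R fix above dispatches through a list: callJMethod(x@sdf, "select", col, toSeq(...)) targets a JVM-side varargs overload, so a multi-element character vector passed as the single col argument would bind only the first parameter instead of expanding into one name per argument. Below is a minimal Scala sketch of the two overloads involved; these are simplified stand-ins assuming the shape of the Spark 1.5 DataFrame API, not the real org.apache.spark.sql classes.

// Hypothetical, simplified stand-ins for the Spark 1.5 DataFrame API,
// for illustration only; the real signatures live in org.apache.spark.sql.
case class Column(name: String)

class DataFrame(val columns: Seq[String]) {
  // One Column per selected expression; select(x, as.list(col)) in R
  // eventually reaches this overload via a list of Column objects.
  def select(cols: Column*): DataFrame = new DataFrame(cols.map(_.name))

  // Varargs overload taking column names. An R character vector passed as
  // the single `col` argument binds only the first parameter, which is why
  // the R fix expands c("col1", "col2") into a list before dispatching.
  def select(col: String, cols: String*): DataFrame =
    select((col +: cols).map(Column(_)): _*)
}

Under this sketch, new DataFrame(Seq("name", "age", "city")).select("name", "age") keeps two columns, matching what select(df, c("name", "age")) now does from R.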
7 changes: 7 additions & 0 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -585,6 +585,13 @@ test_that("select with column", {
expect_equal(columns(df3), c("x"))
expect_equal(count(df3), 3)
expect_equal(collect(select(df3, "x"))[[1, 1]], "x")

df4 <- select(df, c("name", "age"))
expect_equal(columns(df4), c("name", "age"))
expect_equal(count(df4), 3)

expect_error(select(df, c("name", "age"), "name"),
"To select multiple columns, use a character vector or list for col")
})

test_that("subsetting", {
10 changes: 6 additions & 4 deletions build/sbt
@@ -20,10 +20,12 @@
# When creating new tests for Spark SQL Hive, the HADOOP_CLASSPATH must contain the hive jars so
# that we can run Hive to generate the golden answer. This is not required for normal development
# or testing.
for i in "$HIVE_HOME"/lib/*
do HADOOP_CLASSPATH="$HADOOP_CLASSPATH:$i"
done
export HADOOP_CLASSPATH
if [ -n "$HIVE_HOME" ]; then
for i in "$HIVE_HOME"/lib/*
do HADOOP_CLASSPATH="$HADOOP_CLASSPATH:$i"
done
export HADOOP_CLASSPATH
fi

realpath () {
(
18 changes: 18 additions & 0 deletions core/src/main/java/org/apache/spark/util/collection/TimSort.java
@@ -15,6 +15,24 @@
* limitations under the License.
*/

/*
* Based on TimSort.java from the Android Open Source Project
*
* Copyright (C) 2008 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.collection;

import java.util.Comparator;
25 changes: 23 additions & 2 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import java.util.concurrent.{ScheduledFuture, TimeUnit}

import scala.collection.mutable
import scala.concurrent.Future

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext}
@@ -147,11 +148,31 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
}
}

/**
* Send ExecutorRegistered to the event loop to add a new executor. For tests only.
*
* @return None if the HeartbeatReceiver is stopped; otherwise a Some(Future) that
* indicates whether the operation succeeded.
*/
def addExecutor(executorId: String): Option[Future[Boolean]] = {
Option(self).map(_.ask[Boolean](ExecutorRegistered(executorId)))
}

/**
* If the heartbeat receiver is not stopped, notify it of executor registrations.
*/
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
Option(self).foreach(_.ask[Boolean](ExecutorRegistered(executorAdded.executorId)))
addExecutor(executorAdded.executorId)
}

/**
* Send ExecutorRemoved to the event loop to remove an executor. For tests only.
*
* @return None if the HeartbeatReceiver is stopped; otherwise a Some(Future) that
* indicates whether the operation succeeded.
*/
def removeExecutor(executorId: String): Option[Future[Boolean]] = {
Option(self).map(_.ask[Boolean](ExecutorRemoved(executorId)))
}

/**
@@ -165,7 +186,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
* and expire it with loud error messages.
*/
override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
Option(self).foreach(_.ask[Boolean](ExecutorRemoved(executorRemoved.executorId)))
removeExecutor(executorRemoved.executorId)
}

private def expireDeadHosts(): Unit = {
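The addExecutor/removeExecutor hooks above return Option[Future[Boolean]] so a test can distinguish a stopped receiver (None) from a message still in flight, and then block until the event loop has actually processed it. A hedged sketch of how a test might drive them, mirroring the addExecutorAndVerify helper added to the suite below; the receiver value and the 10-second timeout are assumptions for this sketch:

import scala.concurrent.Await
import scala.concurrent.duration._
import org.scalatest.Assertions._

// Hypothetical test helper; `receiver` stands in for an already-started
// HeartbeatReceiver inside the org.apache.spark test package.
def verifyExecutorAdded(receiver: HeartbeatReceiver, executorId: String): Unit = {
  receiver.addExecutor(executorId) match {
    case Some(future) =>
      // Block until the event loop has handled ExecutorRegistered, so later
      // assertions cannot race the listener event.
      assert(Await.result(future, 10.seconds))
    case None =>
      fail(s"HeartbeatReceiver was stopped before $executorId could register")
  }
}

Awaiting the returned Future is what removes the race fixed in SPARK-10825: assertions no longer run before the ExecutorRegistered message has been handled.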
51 changes: 37 additions & 14 deletions core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -19,7 +19,10 @@ package org.apache.spark

import java.util.concurrent.{ExecutorService, TimeUnit}

import scala.collection.Map
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps

import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
@@ -96,18 +99,18 @@ class HeartbeatReceiverSuite

test("normal heartbeat") {
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
triggerHeartbeat(executorId1, executorShouldReregister = false)
triggerHeartbeat(executorId2, executorShouldReregister = false)
val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen())
val trackedExecutors = getTrackedExecutors
assert(trackedExecutors.size === 2)
assert(trackedExecutors.contains(executorId1))
assert(trackedExecutors.contains(executorId2))
}

test("reregister if scheduler is not ready yet") {
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
addExecutorAndVerify(executorId1)
// Task scheduler is not set yet in HeartbeatReceiver, so executors should reregister
triggerHeartbeat(executorId1, executorShouldReregister = true)
}
@@ -116,20 +119,20 @@
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
// Received heartbeat from unknown executor, so we ask it to re-register
triggerHeartbeat(executorId1, executorShouldReregister = true)
assert(heartbeatReceiver.invokePrivate(_executorLastSeen()).isEmpty)
assert(getTrackedExecutors.isEmpty)
}

test("reregister if heartbeat from removed executor") {
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
// Remove the second executor but not the first
heartbeatReceiver.onExecutorRemoved(SparkListenerExecutorRemoved(0, executorId2, "bad boy"))
removeExecutorAndVerify(executorId2)
// Now trigger the heartbeats
// A heartbeat from the second executor should require reregistering
triggerHeartbeat(executorId1, executorShouldReregister = false)
triggerHeartbeat(executorId2, executorShouldReregister = true)
val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen())
val trackedExecutors = getTrackedExecutors
assert(trackedExecutors.size === 1)
assert(trackedExecutors.contains(executorId1))
assert(!trackedExecutors.contains(executorId2))
@@ -138,8 +141,8 @@
test("expire dead hosts") {
val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
triggerHeartbeat(executorId1, executorShouldReregister = false)
triggerHeartbeat(executorId2, executorShouldReregister = false)
// Advance the clock and only trigger a heartbeat for the first executor
@@ -149,7 +152,7 @@
heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts)
// Only the second executor should be expired as a dead host
verify(scheduler).executorLost(Matchers.eq(executorId2), any())
val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen())
val trackedExecutors = getTrackedExecutors
assert(trackedExecutors.size === 1)
assert(trackedExecutors.contains(executorId1))
assert(!trackedExecutors.contains(executorId2))
@@ -175,8 +178,8 @@
fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty))
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
triggerHeartbeat(executorId1, executorShouldReregister = false)
triggerHeartbeat(executorId2, executorShouldReregister = false)

@@ -222,6 +225,26 @@
}
}

private def addExecutorAndVerify(executorId: String): Unit = {
assert(
heartbeatReceiver.addExecutor(executorId).map { f =>
Await.result(f, 10.seconds)
} === Some(true))
}

private def removeExecutorAndVerify(executorId: String): Unit = {
assert(
heartbeatReceiver.removeExecutor(executorId).map { f =>
Await.result(f, 10.seconds)
} === Some(true))
}

private def getTrackedExecutors: Map[String, Long] = {
// We may receive undesired SparkListenerExecutorAdded from LocalBackend, so exclude it from
// the map. See SPARK-10800.
heartbeatReceiver.invokePrivate(_executorLastSeen()).
filterKeys(_ != SparkContext.DRIVER_IDENTIFIER)
}
}
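getTrackedExecutors filters SparkContext.DRIVER_IDENTIFIER out of executorLastSeen because, per the SPARK-10800 note above, LocalBackend registers the driver itself as an executor. A toy sketch of that filtering; the literal "driver" string is an assumption standing in for the value of SparkContext.DRIVER_IDENTIFIER:

// Toy illustration of the driver filtering in getTrackedExecutors above.
// "driver" stands in for SparkContext.DRIVER_IDENTIFIER (an assumption).
val driverId = "driver"
val executorLastSeen = Map(driverId -> 1000L, "exec-1" -> 2000L, "exec-2" -> 2500L)

// In local mode the driver registers itself as an executor, so drop it
// before asserting on the executors the test actually started.
val tracked = executorLastSeen.filterKeys(_ != driverId)
assert(tracked.keySet == Set("exec-1", "exec-2"))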

// TODO: use these classes to add end-to-end tests for dynamic allocation!