Changes from all commits (36 commits)
d19bccd
[SPARK-10398] [DOCS] Migrate Spark download page to use new lua mirro…
srowen Sep 1, 2015
30efa96
[SPARK-10392] [SQL] Pyspark - Wrong DateType support on JDBC connection
0x0FFF Sep 1, 2015
2fce5d8
[SPARK-10422] [SQL] String column in InMemoryColumnarCache needs to o…
yhuai Sep 3, 2015
b846a9d
[SPARK-10379] preserve first page in UnsafeShuffleExternalSorter
Sep 3, 2015
94404ee
[SPARK-10411] [SQL] Move visualization above explain output and hide …
zsxwing Sep 3, 2015
f01a967
[SPARK-10332] [CORE] Fix yarn spark executor validation
holdenk Sep 3, 2015
f945b64
[SPARK-9869] [STREAMING] Wait for all event notifications before asse…
Sep 3, 2015
4d63335
[SPARK-10431] [CORE] Fix intermittent test failure. Wait for event qu…
Sep 3, 2015
09e08db
[SPARK-10454] [SPARK CORE] wait for empty event queue
Sep 4, 2015
dc39658
[SPARK-10311] [STREAMING] Reload appId and attemptId when app starts …
XuTingjun Sep 4, 2015
cfc5f6f
[SPARK-10402] [DOCS] [ML] Add defaults to the scaladoc for params in ml/
holdenk Sep 5, 2015
ec750a7
[SPARK-10440] [STREAMING] [DOCS] Update python API stuff in the progr…
tdas Sep 5, 2015
640000b
[SPARK-10434] [SQL] Fixes Parquet schema of arrays that may contain null
liancheng Sep 5, 2015
37c5edf
[DOC] Added R to the list of languages with "high-level API" support …
Sep 8, 2015
88a07d8
Docs small fixes
jaceklaskowski Sep 8, 2015
34d417e
[SPARK-10470] [ML] ml.IsotonicRegressionModel.copy should set parent
yanboliang Sep 8, 2015
7fd4674
[SPARK-10441] [SQL] [BRANCH-1.5] Save data correctly to json.
yhuai Sep 8, 2015
63c72b9
[SPARK-10492] [STREAMING] [DOCUMENTATION] Update Streaming documentat…
tdas Sep 8, 2015
fca16c5
[SPARK-10301] [SPARK-10428] [SQL] [BRANCH-1.5] Fixes schema merging f…
liancheng Sep 9, 2015
d4b00c5
[SPARK-10071] [STREAMING] Output a warning when writing QueueInputDSt…
zsxwing Sep 9, 2015
a150625
[SPARK-7736] [CORE] [YARN] Make pyspark fail YARN app on failure.
Aug 17, 2015
d6cd356
[SPARK-7736] [CORE] Fix a race introduced in PythonRunner.
Aug 18, 2015
5e06d41
[MINOR] [MLLIB] [ML] [DOC] fixed typo: label for negative result shou…
sparadiso Sep 10, 2015
bc70043
[SPARK-10466] [SQL] UnsafeRow SerDe exception with data spill
chenghao-intel Sep 10, 2015
bff05aa
[SPARK-10469] [DOC] Try and document the three options
holdenk Sep 10, 2015
8cf1619
[SPARK-6350] [MESOS] Fine-grained mode scheduler respects mesosExecut…
dragos Sep 10, 2015
89d351b
Revert "[SPARK-6350] [MESOS] Fine-grained mode scheduler respects mes…
Sep 10, 2015
4af9256
[SPARK-10556] Remove explicit Scala version for sbt project build files
ahirreddy Sep 11, 2015
295281f
[SPARK-10540] [SQL] Ignore HadoopFsRelationTest's "test all data type…
yhuai Sep 11, 2015
7f10bd6
[SPARK-9924] [WEB UI] Don't schedule checkForLogs while some of them …
Sep 11, 2015
fcb2438
[SPARK-10564] ThreadingSuite: assertion failures in threads don't fai…
Sep 11, 2015
5bf403c
[SPARK-10566] [CORE] SnappyCompressionCodec init exception handling m…
dimfeld Sep 12, 2015
f8909a6
[SPARK-10554] [CORE] Fix NPE with ShutdownHook
Sep 12, 2015
4586f21
[SPARK-6350] [MESOS] [BACKPORT] Fine-grained mode scheduler respects
dragos Sep 13, 2015
3789ac8
Deprecates SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC
liancheng Sep 1, 2015
ab61a59
Fixes compilation error
liancheng Sep 16, 2015
4 changes: 2 additions & 2 deletions README.md
@@ -1,7 +1,7 @@
# Apache Spark

Spark is a fast and general cluster computing system for Big Data. It provides
high-level APIs in Scala, Java, and Python, and an optimized engine that
high-level APIs in Scala, Java, Python, and R, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
rich set of higher-level tools including Spark SQL for SQL and DataFrames,
MLlib for machine learning, GraphX for graph processing,
@@ -94,5 +94,5 @@ distribution.

## Configuration

Please refer to the [Configuration guide](http://spark.apache.org/docs/latest/configuration.html)
Please refer to the [Configuration Guide](http://spark.apache.org/docs/latest/configuration.html)
in the online documentation for an overview on how to configure Spark.
@@ -122,6 +122,10 @@ public UnsafeShuffleExternalSorter(
this.maxRecordSizeBytes = pageSizeBytes - 4;
this.writeMetrics = writeMetrics;
initializeForWriting();

// preserve first page to ensure that we have at least one page to work with. Otherwise,
// other operators in the same task may starve this sorter (SPARK-9709).
acquireNewPageIfNecessary(pageSizeBytes);
}

/**
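The comment above documents the SPARK-9709/SPARK-10379 fix: the sorter now grabs its first page in the constructor, so other memory consumers in the same task cannot starve it before it has buffered anything. A minimal sketch of that reserve-on-construction idea, using a hypothetical `MemoryPool` rather than Spark's actual task memory manager:

```scala
// Hypothetical pool; Spark's real TaskMemoryManager has a different API.
class MemoryPool(private var freeBytes: Long) {
  def tryAcquire(bytes: Long): Boolean = synchronized {
    if (bytes <= freeBytes) { freeBytes -= bytes; true } else false
  }
}

class GreedySorter(pool: MemoryPool, pageSizeBytes: Long) {
  private var pages = 0

  private def acquirePage(): Boolean = {
    val ok = pool.tryAcquire(pageSizeBytes)
    if (ok) pages += 1
    ok
  }

  // Reserve the first page up front, so consumers that share the pool later
  // in the same task cannot leave this sorter with no page at all (cf. SPARK-9709).
  require(acquirePage(), "could not reserve the initial page")
}

object GreedySorterDemo extends App {
  val pool = new MemoryPool(64L * 1024 * 1024)
  new GreedySorter(pool, pageSizeBytes = 32L * 1024 * 1024)
  println(pool.tryAcquire(32L * 1024 * 1024)) // true: the rest is still available
  println(pool.tryAcquire(1))                 // false: but the sorter's page is safe
}
```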
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkException.scala
@@ -30,3 +30,10 @@ class SparkException(message: String, cause: Throwable)
*/
private[spark] class SparkDriverExecutionException(cause: Throwable)
extends SparkException("Execution error", cause)

/**
* Exception thrown when the main user code is run as a child process (e.g. pyspark) and we want
* the parent SparkSubmit process to exit with the same exit code.
*/
private[spark] case class SparkUserAppException(exitCode: Int)
extends SparkException(s"User application exited with $exitCode")
29 changes: 25 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.util.Try

import org.apache.spark.SparkUserAppException
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.util.{RedirectThread, Utils}

@@ -46,7 +47,20 @@ object PythonRunner {
// Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such
val gatewayServer = new py4j.GatewayServer(null, 0)
gatewayServer.start()
val thread = new Thread(new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions {
gatewayServer.start()
}
})
thread.setName("py4j-gateway-init")
thread.setDaemon(true)
thread.start()

// Wait until the gateway server has started, so that we know which port it is bound to.
// `gatewayServer.start()` will start a new thread and run the server code there, after
// initializing the socket, so the thread started above will end as soon as the server is
// ready to serve connections.
thread.join()

// Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
// python directories in SPARK_HOME (if set), and any files in the pyFiles argument
@@ -64,11 +78,18 @@ object PythonRunner {
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
val process = builder.start()
try {
val process = builder.start()

new RedirectThread(process.getInputStream, System.out, "redirect output").start()
new RedirectThread(process.getInputStream, System.out, "redirect output").start()

System.exit(process.waitFor())
val exitCode = process.waitFor()
if (exitCode != 0) {
throw new SparkUserAppException(exitCode)
}
} finally {
gatewayServer.shutdown()
}
}

/**
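The diff above replaces a direct `gatewayServer.start()` call with a named daemon thread plus `join()`: per the comment, Py4J's `start()` returns only after the listening socket is initialized, so joining the short-lived init thread doubles as a "port is bound" barrier before `getListeningPort` is read. A rough sketch of the same pattern with a plain `ServerSocket` (hypothetical names, not the Py4J API):

```scala
import java.net.ServerSocket

object GatewayInitSketch {
  @volatile private var server: ServerSocket = _

  def main(args: Array[String]): Unit = {
    val init = new Thread(new Runnable {
      // Binding happens inside run(); once it returns, the socket is ready.
      override def run(): Unit = { server = new ServerSocket(0) }
    })
    init.setName("gateway-init") // named, like "py4j-gateway-init" above
    init.setDaemon(true)         // must not keep the JVM alive on its own
    init.start()

    // join() is the readiness barrier: when it returns, the port is known.
    init.join()
    println(s"gateway bound to port ${server.getLocalPort}")
    server.close()
  }
}
```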
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -39,8 +39,8 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}

import org.apache.spark.{SparkUserAppException, SPARK_VERSION}
import org.apache.spark.api.r.RUtils
import org.apache.spark.SPARK_VERSION
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

@@ -672,7 +672,13 @@ object SparkSubmit {
mainMethod.invoke(null, childArgs.toArray)
} catch {
case t: Throwable =>
throw findCause(t)
findCause(t) match {
case SparkUserAppException(exitCode) =>
System.exit(exitCode)

case t: Throwable =>
throw t
}
}
}

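Taken together with the `PythonRunner` change, this is how a non-zero exit code from the child process (e.g. pyspark) reaches the parent JVM: the runner throws `SparkUserAppException` instead of calling `System.exit`, and `SparkSubmit` matches on the unwrapped cause and exits with the same code. A condensed, self-contained sketch of that flow (hypothetical names, not the real SparkSubmit entry point):

```scala
// Hypothetical names; the real classes are SparkUserAppException and SparkSubmit.
case class UserAppException(exitCode: Int)
  extends RuntimeException(s"User application exited with $exitCode")

object ExitCodePropagationSketch {
  // Stand-in for PythonRunner.main: surface a failed child process as an exception
  // instead of calling System.exit, so the caller can still clean up (e.g. the gateway).
  private def runChildApp(simulatedExitCode: Int): Unit =
    if (simulatedExitCode != 0) throw UserAppException(simulatedExitCode)

  // Stand-in for SparkSubmit.runMain: translate the exception back into an exit code.
  def main(args: Array[String]): Unit =
    try runChildApp(simulatedExitCode = 42)
    catch {
      case UserAppException(code) => sys.exit(code) // parent exits with the child's code
      case t: Throwable           => throw t
    }
}
```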
@@ -126,11 +126,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// Disable the background thread during tests.
if (!conf.contains("spark.testing")) {
// A task that periodically checks for event log updates on disk.
pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)

if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
// A task that periodically cleans event logs on disk.
pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
}
}
}
@@ -204,11 +204,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
mod1 >= mod2
}

logInfos.sliding(20, 20).foreach { batch =>
replayExecutor.submit(new Runnable {
override def run(): Unit = mergeApplicationListing(batch)
})
}
logInfos.grouped(20)
.map { batch =>
replayExecutor.submit(new Runnable {
override def run(): Unit = mergeApplicationListing(batch)
})
}
.foreach { task =>
try {
// Wait for all tasks to finish. This makes sure that checkForLogs
// is not scheduled again while some tasks are already running in
// the replayExecutor.
task.get()
} catch {
case e: InterruptedException =>
throw e
case e: Exception =>
logError("Exception while merging application listings", e)
}
}

lastModifiedTime = newLastModifiedTime
} catch {
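Two coordinated changes here: the periodic tasks move from `scheduleAtFixedRate` to `scheduleWithFixedDelay`, so the next run is measured from the end of the previous one, and `checkForLogs` now blocks on the future of every 20-log batch it submits, so it cannot be scheduled again while replay tasks are still running in `replayExecutor`. A simplified sketch of that shape (hypothetical `replayBatch`, not the real FsHistoryProvider):

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.util.control.NonFatal

object LogCheckerSketch {
  private val scheduler = Executors.newScheduledThreadPool(1)
  private val replayExecutor = Executors.newFixedThreadPool(2)

  // Hypothetical replacement for mergeApplicationListing.
  private def replayBatch(batch: Seq[Int]): Unit =
    println(s"replaying ${batch.size} logs on ${Thread.currentThread().getName}")

  private def checkForLogs(): Unit = {
    val logs = 1 to 50
    logs.grouped(20)
      .map { batch =>
        replayExecutor.submit(new Runnable {
          override def run(): Unit = replayBatch(batch)
        })
      }
      .foreach { task =>
        // Block until every batch is merged; checkForLogs does not "finish"
        // (and so cannot be rescheduled) while replay tasks are still running.
        try task.get()
        catch {
          case e: InterruptedException => throw e
          case NonFatal(e)             => println(s"batch failed: $e")
        }
      }
  }

  def main(args: Array[String]): Unit = {
    // Fixed *delay*: the 2-second gap starts when the previous run ends,
    // so runs of checkForLogs never overlap (unlike scheduleAtFixedRate).
    scheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = checkForLogs()
    }, 0, 2, TimeUnit.SECONDS)

    Thread.sleep(5000) // let a couple of rounds run, then stop the sketch
    scheduler.shutdown()
    replayExecutor.shutdown()
  }
}
```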
@@ -148,7 +148,7 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
try {
Snappy.getNativeLibraryVersion
} catch {
case e: Error => throw new IllegalArgumentException
case e: Error => throw new IllegalArgumentException(e)
}

override def compressedOutputStream(s: OutputStream): OutputStream = {
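The one-argument constructor is the whole fix: `new IllegalArgumentException` with no arguments discards the original `Error`, while `new IllegalArgumentException(e)` keeps it as the cause, so the underlying failure (for example a missing snappy native library) still shows up in the stack trace. A tiny sketch of the difference, with a stand-in for `Snappy.getNativeLibraryVersion`:

```scala
object CauseChainingSketch extends App {
  // Stand-in for Snappy.getNativeLibraryVersion failing to load the native library.
  private def loadNativeVersion(): String =
    throw new UnsatisfiedLinkError("no snappyjava in java.library.path")

  private def checkCodecAvailable(): String =
    try loadNativeVersion()
    catch {
      // Wrap the Error, but keep it as the cause so the real failure is not lost.
      case e: Error => throw new IllegalArgumentException(e)
    }

  try checkCodecAvailable()
  catch {
    case e: IllegalArgumentException =>
      println(s"codec unavailable, caused by: ${e.getCause}") // the original UnsatisfiedLinkError
  }
}
```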
@@ -41,7 +41,7 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M

// In certain join operations, prepare can be called on the same partition multiple times.
// In this case, we need to ensure that each call to compute gets a separate prepare argument.
private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
private[this] val preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]

/**
* Prepare a partition for a single call to compute.
@@ -32,7 +32,6 @@ import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils


/**
* A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
* separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
@@ -127,7 +126,7 @@ private[spark] class MesosSchedulerBackend(
}
val builder = MesosExecutorInfo.newBuilder()
val (resourcesAfterCpu, usedCpuResources) =
partitionResources(availableResources, "cpus", scheduler.CPUS_PER_TASK)
partitionResources(availableResources, "cpus", mesosExecutorCores)
val (resourcesAfterMem, usedMemResources) =
partitionResources(resourcesAfterCpu, "mem", calculateTotalMemory(sc))

@@ -164,7 +164,9 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon

private def doStop(): Unit = {
// Only perform cleanup if an external service is not serving our shuffle files.
if (!blockManager.externalShuffleServiceEnabled || blockManager.blockManagerId.isDriver) {
// Also blockManagerId could be null if block manager is not initialized properly.
if (!blockManager.externalShuffleServiceEnabled ||
(blockManager.blockManagerId != null && blockManager.blockManagerId.isDriver)) {
localDirs.foreach { localDir =>
if (localDir.isDirectory() && localDir.exists()) {
try {
@@ -188,6 +188,12 @@ private[spark] class ExternalSorter[K, V, C](

private val spills = new ArrayBuffer[SpilledFile]

/**
* Number of files this sorter has spilled so far.
* Exposed for testing.
*/
private[spark] def numSpills: Int = spills.size

override def insertAll(records: Iterator[Product2[K, V]]): Unit = {
// TODO: stop combining if we find that the reduction factor isn't high
val shouldCombine = aggregator.isDefined
@@ -530,8 +530,9 @@ public void testPeakMemoryUsed() throws Exception {
for (int i = 0; i < numRecordsPerPage * 10; i++) {
writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1));
newPeakMemory = writer.getPeakMemoryUsedBytes();
if (i % numRecordsPerPage == 0) {
// We allocated a new page for this record, so peak memory should change
if (i % numRecordsPerPage == 0 && i != 0) {
// The first page is allocated in constructor, another page will be allocated after
// every numRecordsPerPage records (peak memory should change).
assertEquals(previousPeakMemory + pageSizeBytes, newPeakMemory);
} else {
assertEquals(previousPeakMemory, newPeakMemory);
68 changes: 45 additions & 23 deletions core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -119,23 +119,30 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
val nums = sc.parallelize(1 to 2, 2)
val sem = new Semaphore(0)
ThreadingSuiteState.clear()
var throwable: Option[Throwable] = None
for (i <- 0 until 2) {
new Thread {
override def run() {
val ans = nums.map(number => {
val running = ThreadingSuiteState.runningThreads
running.getAndIncrement()
val time = System.currentTimeMillis()
while (running.get() != 4 && System.currentTimeMillis() < time + 1000) {
Thread.sleep(100)
}
if (running.get() != 4) {
ThreadingSuiteState.failed.set(true)
}
number
}).collect()
assert(ans.toList === List(1, 2))
sem.release()
try {
val ans = nums.map(number => {
val running = ThreadingSuiteState.runningThreads
running.getAndIncrement()
val time = System.currentTimeMillis()
while (running.get() != 4 && System.currentTimeMillis() < time + 1000) {
Thread.sleep(100)
}
if (running.get() != 4) {
ThreadingSuiteState.failed.set(true)
}
number
}).collect()
assert(ans.toList === List(1, 2))
} catch {
case t: Throwable =>
throwable = Some(t)
} finally {
sem.release()
}
}
}.start()
}
@@ -145,18 +152,25 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
ThreadingSuiteState.runningThreads.get() + "); failing test")
fail("One or more threads didn't see runningThreads = 4")
}
throwable.foreach { t => throw t }
}

test("set local properties in different thread") {
sc = new SparkContext("local", "test")
val sem = new Semaphore(0)

var throwable: Option[Throwable] = None
val threads = (1 to 5).map { i =>
new Thread() {
override def run() {
sc.setLocalProperty("test", i.toString)
assert(sc.getLocalProperty("test") === i.toString)
sem.release()
try {
sc.setLocalProperty("test", i.toString)
assert(sc.getLocalProperty("test") === i.toString)
} catch {
case t: Throwable =>
throwable = Some(t)
} finally {
sem.release()
}
}
}
}
@@ -165,20 +179,27 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {

sem.acquire(5)
assert(sc.getLocalProperty("test") === null)
throwable.foreach { t => throw t }
}

test("set and get local properties in parent-children thread") {
sc = new SparkContext("local", "test")
sc.setLocalProperty("test", "parent")
val sem = new Semaphore(0)

var throwable: Option[Throwable] = None
val threads = (1 to 5).map { i =>
new Thread() {
override def run() {
assert(sc.getLocalProperty("test") === "parent")
sc.setLocalProperty("test", i.toString)
assert(sc.getLocalProperty("test") === i.toString)
sem.release()
try {
assert(sc.getLocalProperty("test") === "parent")
sc.setLocalProperty("test", i.toString)
assert(sc.getLocalProperty("test") === i.toString)
} catch {
case t: Throwable =>
throwable = Some(t)
} finally {
sem.release()
}
}
}
}
@@ -188,6 +209,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
sem.acquire(5)
assert(sc.getLocalProperty("test") === "parent")
assert(sc.getLocalProperty("Foo") === null)
throwable.foreach { t => throw t }
}

test("mutations to local properties should not affect submitted jobs (SPARK-6629)") {
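The pattern repeated throughout this suite: an assertion that fails inside a spawned thread only kills that thread, so each worker now records the first `Throwable` it sees and the main test thread rethrows it after acquiring the semaphore, with `release()` moved into `finally` so a failure can never leave the test hanging. A stripped-down sketch of the same idea outside ScalaTest:

```scala
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicReference

object ThreadAssertionSketch {
  def main(args: Array[String]): Unit = {
    val sem = new Semaphore(0)
    val failure = new AtomicReference[Throwable](null)

    val workers = (1 to 5).map { i =>
      new Thread {
        override def run(): Unit = {
          try {
            // Any failure here would otherwise die silently with the worker thread.
            assert(i <= 5, s"worker $i saw an unexpected id")
          } catch {
            case t: Throwable => failure.compareAndSet(null, t) // remember the first failure
          } finally {
            sem.release() // always release, even on failure, so acquire() below cannot hang
          }
        }
      }
    }
    workers.foreach(_.start())

    sem.acquire(5)
    // Re-throw in the main thread so the failure actually fails the caller (the test).
    Option(failure.get()).foreach(t => throw t)
  }
}
```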
@@ -286,6 +286,10 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
private def runAndReturnMetrics(job: => Unit,
collector: (SparkListenerTaskEnd) => Option[Long]): Long = {
val taskMetrics = new ArrayBuffer[Long]()

// Avoid receiving earlier taskEnd events
sc.listenerBus.waitUntilEmpty(500)

sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
collector(taskEnd).foreach(taskMetrics += _)
@@ -527,6 +527,7 @@ class DAGSchedulerSuite
}

// The map stage should have been submitted.
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(countSubmittedMapStageAttempts() === 1)

complete(taskSets(0), Seq(
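Both test changes apply the same idea as the SPARK-9869/SPARK-10431/SPARK-10454 commits above: listener events are delivered asynchronously, so a test has to drain the bus (`waitUntilEmpty`) before asserting on listener-derived state, or it races with the delivery thread. A bare-bones sketch of such a drainable event queue (a toy stand-in, not Spark's LiveListenerBus):

```scala
import java.util.concurrent.LinkedBlockingQueue

// Minimal stand-in for an asynchronous listener bus with a drain barrier.
class TinyEventBus[E](handle: E => Unit) {
  private val queue = new LinkedBlockingQueue[E]()
  @volatile private var posted = 0L
  @volatile private var processed = 0L

  private val deliveryThread = new Thread {
    override def run(): Unit =
      while (true) { handle(queue.take()); processed += 1 }
  }
  deliveryThread.setDaemon(true)
  deliveryThread.start()

  def post(event: E): Unit = { posted += 1; queue.put(event) }

  /** Block until every posted event has been handled, or the timeout expires. */
  def waitUntilEmpty(timeoutMillis: Long): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMillis
    while (processed < posted && System.currentTimeMillis() < deadline) Thread.sleep(10)
    processed >= posted
  }
}

object EventBusSketch {
  def main(args: Array[String]): Unit = {
    var seen = 0
    val bus = new TinyEventBus[String](_ => seen += 1)
    (1 to 100).foreach(i => bus.post(s"event-$i"))
    // Without this barrier the assertion below would race with the delivery thread.
    assert(bus.waitUntilEmpty(timeoutMillis = 1000))
    assert(seen == 100)
  }
}
```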