Changes from all commits (54 commits)
ba14f28
test
zhzhan Aug 8, 2014
f6a8a40
revert
zhzhan Aug 8, 2014
cb53a2c
Merge branch 'master' of https://github.com/apache/spark
zhzhan Aug 30, 2014
789ea21
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 2, 2014
921e914
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 2, 2014
e4c1982
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 5, 2014
af9feb9
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 10, 2014
1ccd7cc
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 16, 2014
2b0d513
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 17, 2014
3ee3b2b
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 22, 2014
68deb11
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 22, 2014
7e0cc36
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 25, 2014
d10bf00
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 30, 2014
adf4924
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 1, 2014
cedcc6f
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 1, 2014
301eb4a
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 5, 2014
a72c0d4
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 9, 2014
4a2e36d
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 13, 2014
497b0f4
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 13, 2014
a00f60f
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 16, 2014
a9d372b
Merge branch 'master' of https://github.com/zhzhan/spark
zhzhan Oct 16, 2014
3764505
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 17, 2014
f406a83
SPARK-3926 [CORE] Result of JavaRDD.collectAsMap() is not Serializable
srowen Oct 18, 2014
05db2da
[SPARK-3952] [Streaming] [PySpark] add Python examples in Streaming P…
davies Oct 19, 2014
7e63bb4
[SPARK-2546] Clone JobConf for each task (branch-1.0 / 1.1 backport)
JoshRosen Oct 19, 2014
d1966f3
[SPARK-3902] [SPARK-3590] Stabilize AsynRDDActions and add Java API
JoshRosen Oct 20, 2014
c7aeecd
[SPARK-3948][Shuffle]Fix stream corruption bug in sort-based shuffle
jerryshao Oct 20, 2014
51afde9
[SPARK-4010][Web UI]Spark UI returns 500 in yarn-client mode
witgo Oct 20, 2014
ea054e1
[SPARK-3986][SQL] Fix package names to fit their directory names.
ueshin Oct 20, 2014
4afe9a4
[SPARK-3736] Workers reconnect when disassociated from the master.
mccheah Oct 20, 2014
eadc4c5
[SPARK-3207][MLLIB]Choose splits for continuous features in DecisionT…
chouqin Oct 20, 2014
1b3ce61
[SPARK-3906][SQL] Adds multiple join support for SQLContext
liancheng Oct 20, 2014
e9c1afa
[SPARK-3800][SQL] Clean aliases from grouping expressions
marmbrus Oct 20, 2014
364d52b
[SPARK-3966][SQL] Fix nullabilities of Cast related to DateType.
ueshin Oct 20, 2014
fce1d41
[SPARK-3945]Properties of hive-site.xml is invalid in running the Thr…
luogankun Oct 20, 2014
7586e2e
[SPARK-3969][SQL] Optimizer should have a super class as an interface.
ueshin Oct 21, 2014
0fe1c09
[SPARK-3940][SQL] Avoid console printing error messages three times
wangxiaojing Oct 21, 2014
342b57d
Update Building Spark link.
rxin Oct 21, 2014
5a8f64f
[SPARK-3958] TorrentBroadcast cleanup / debugging improvements.
JoshRosen Oct 21, 2014
8570816
[SPARK-4023] [MLlib] [PySpark] convert rdd into RDD of Vector
Oct 21, 2014
2aeb84b
replace awaitTransformation with awaitTermination in scaladoc/javadoc
holdenk Oct 21, 2014
c262cd5
[SPARK-4035] Fix a wrong format specifier
zsxwing Oct 21, 2014
61ca774
[SPARK-4020] Do not rely on timeouts to remove failed block managers
andrewor14 Oct 21, 2014
1a623b2
SPARK-3770: Make userFeatures accessible from python
Oct 21, 2014
5fdaf52
[SPARK-3994] Use standard Aggregator code path for countByKey and cou…
aarondav Oct 21, 2014
814a9cd
SPARK-3568 [mllib] add ranking metrics
coderxiang Oct 21, 2014
856b081
[SQL]redundant methods for broadcast
scwf Oct 21, 2014
6bb56fa
SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy
sryza Oct 22, 2014
bae4ca3
Update JavaCustomReceiver.java
Oct 22, 2014
f05e09b
use isRunningLocally rather than runningLocally
CrazyJvm Oct 22, 2014
97cf19f
Fix for sampling error in NumPy v1.9 [SPARK-3995][PYSPARK]
freeman-lab Oct 22, 2014
93f3081
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 22, 2014
fa2e42b
Merge branch 'orc1' of https://github.com/scwf/spark into orc1
zhzhan Oct 22, 2014
ef84c9d
make wrap consistent with InsertIntoHiveTable.wrapperFor
zhzhan Oct 22, 2014
2 changes: 1 addition & 1 deletion README.md
@@ -25,7 +25,7 @@ To build Spark and its example programs, run:

(You do not need to do this if you downloaded a pre-built package.)
More detailed documentation is available from the project site, at
["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).
["Building Spark with Maven"](http://spark.apache.org/docs/latest/building-with-maven.html).

## Interactive Scala Shell

33 changes: 33 additions & 0 deletions core/src/main/java/org/apache/spark/api/java/JavaFutureAction.java
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java;


import java.util.List;
import java.util.concurrent.Future;

public interface JavaFutureAction<T> extends Future<T> {

/**
* Returns the job IDs run by the underlying async operation.
*
* This returns the current snapshot of the job list. Certain operations may run multiple
* jobs, so multiple calls to this method may return different lists.
*/
List<Integer> jobIds();
}
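
For context, here is a rough usage sketch of this interface, assuming the async operations (such as `collectAsync()`) that this PR exposes on the Java RDD API; the setup and method names are illustrative rather than authoritative:

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.SparkConf
import org.apache.spark.api.java.{JavaFutureAction, JavaSparkContext}

object JavaFutureActionSketch {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSParkContextFix // placeholder removed below
  }
}
```

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.SparkConf
import org.apache.spark.api.java.{JavaFutureAction, JavaSparkContext}

object JavaFutureActionSketch {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext(
      new SparkConf().setMaster("local[2]").setAppName("java-future-sketch"))
    val rdd = jsc.parallelize(
      java.util.Arrays.asList(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3)))

    // collectAsync() is assumed to return a JavaFutureAction, per the Java async API in this PR.
    val future: JavaFutureAction[java.util.List[Integer]] = rdd.collectAsync()

    println(s"jobs submitted so far: ${future.jobIds()}") // snapshot; may grow for multi-job actions
    val result = future.get(30, TimeUnit.SECONDS)          // plain java.util.concurrent.Future semantics
    println(s"collected ${result.size()} elements")

    jsc.stop()
  }
}
```

The point of returning `JavaFutureAction` instead of the Scala `Future`-based `FutureAction` is exactly this: Java callers get ordinary `java.util.concurrent.Future` semantics for `get()`, `cancel()`, and `isDone()`, plus the `jobIds()` snapshot shown above.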
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -61,7 +61,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val computedValues = rdd.computeOrReadCheckpoint(partition, context)

// If the task is running locally, do not persist the result
if (context.runningLocally) {
if (context.isRunningLocally) {
return computedValues
}

86 changes: 71 additions & 15 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -17,20 +17,21 @@

package org.apache.spark

import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.Try
import java.util.Collections
import java.util.concurrent.TimeUnit

import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaFutureAction
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}

import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.{Failure, Try}

/**
* :: Experimental ::
* A future for the result of an action to support cancellation. This is an extension of the
* Scala Future interface to support cancellation.
*/
@Experimental
trait FutureAction[T] extends Future[T] {
// Note that we redefine methods of the Future trait here explicitly so we can specify a different
// documentation (with reference to the word "action").
@@ -69,6 +70,11 @@ trait FutureAction[T] extends Future[T] {
*/
override def isCompleted: Boolean

/**
* Returns whether the action has been cancelled.
*/
def isCancelled: Boolean

/**
* The value of this Future.
*
@@ -96,15 +102,16 @@ trait FutureAction[T] extends Future[T] {


/**
* :: Experimental ::
* A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
* count, collect, reduce.
*/
@Experimental
class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
extends FutureAction[T] {

@volatile private var _cancelled: Boolean = false

override def cancel() {
_cancelled = true
jobWaiter.cancel()
}

@@ -143,6 +150,8 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
}

override def isCompleted: Boolean = jobWaiter.jobFinished

override def isCancelled: Boolean = _cancelled

override def value: Option[Try[T]] = {
if (jobWaiter.jobFinished) {
@@ -164,12 +173,10 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:


/**
* :: Experimental ::
* A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
* takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
* action thread if it is being blocked by a job.
*/
@Experimental
class ComplexFutureAction[T] extends FutureAction[T] {

// Pointer to the thread that is executing the action. It is set when the action is run.
@@ -222,7 +229,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
// If the action hasn't been cancelled yet, submit the job. The check and the submitJob
// command need to be in an atomic block.
val job = this.synchronized {
if (!cancelled) {
if (!isCancelled) {
rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc)
} else {
throw new SparkException("Action has been cancelled")
@@ -243,10 +250,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
}
}

/**
* Returns whether the promise has been cancelled.
*/
def cancelled: Boolean = _cancelled
override def isCancelled: Boolean = _cancelled

@throws(classOf[InterruptedException])
@throws(classOf[scala.concurrent.TimeoutException])
@@ -271,3 +275,55 @@ class ComplexFutureAction[T] extends FutureAction[T] {
def jobIds = jobs

}

private[spark]
class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S => T)
extends JavaFutureAction[T] {

import scala.collection.JavaConverters._

override def isCancelled: Boolean = futureAction.isCancelled

override def isDone: Boolean = {
// According to java.util.Future's Javadoc, this returns True if the task was completed,
// whether that completion was due to successful execution, an exception, or a cancellation.
futureAction.isCancelled || futureAction.isCompleted
}

override def jobIds(): java.util.List[java.lang.Integer] = {
Collections.unmodifiableList(futureAction.jobIds.map(Integer.valueOf).asJava)
}

private def getImpl(timeout: Duration): T = {
// This will throw TimeoutException on timeout:
Await.ready(futureAction, timeout)
futureAction.value.get match {
case scala.util.Success(value) => converter(value)
case Failure(exception) =>
if (isCancelled) {
throw new CancellationException("Job cancelled").initCause(exception)
} else {
// java.util.Future.get() wraps exceptions in ExecutionException
throw new ExecutionException("Exception thrown by job", exception)
}
}
}

override def get(): T = getImpl(Duration.Inf)

override def get(timeout: Long, unit: TimeUnit): T =
getImpl(Duration.fromNanos(unit.toNanos(timeout)))

override def cancel(mayInterruptIfRunning: Boolean): Boolean = synchronized {
if (isDone) {
// According to java.util.Future's Javadoc, this should return false if the task is completed.
false
} else {
// We're limited in terms of the semantics we can provide here; our cancellation is
// asynchronous and doesn't provide a mechanism to not cancel if the job is running.
futureAction.cancel()
true
}
}

}
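
To make the adaptation concrete, here is a minimal sketch of wrapping a Scala `FutureAction` for Java callers. Since `JavaFutureActionWrapper` is `private[spark]`, the sketch assumes code living inside the `org.apache.spark` package and is illustrative only, not part of the PR:

```scala
package org.apache.spark

import org.apache.spark.SparkContext._          // rddToAsyncRDDActions, for countAsync()
import org.apache.spark.api.java.JavaFutureAction

// Illustrative only: this compiles only inside org.apache.spark because the
// wrapper is private[spark].
private[spark] object WrapperSketch {
  def wrapCount(sc: SparkContext): JavaFutureAction[java.lang.Long] = {
    val rdd = sc.parallelize(1 to 1000, numSlices = 4)
    // countAsync() yields a Scala FutureAction[Long]; the wrapper exposes it through
    // java.util.concurrent.Future, converting the result to a boxed java.lang.Long.
    val scalaFuture: FutureAction[Long] = rdd.countAsync()
    new JavaFutureActionWrapper(scalaFuture, (n: Long) => java.lang.Long.valueOf(n))
  }
}
```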
17 changes: 16 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -18,7 +18,8 @@
package org.apache.spark

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.collection.mutable.{HashMap, LinkedHashSet}
import org.apache.spark.serializer.KryoSerializer

/**
* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -140,6 +141,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
this
}

/**
* Use Kryo serialization and register the given set of classes with Kryo.
* If called multiple times, this will append the classes from all calls together.
*/
def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {
val allClassNames = new LinkedHashSet[String]()
allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').filter(!_.isEmpty)
allClassNames ++= classes.map(_.getName)

set("spark.kryo.classesToRegister", allClassNames.mkString(","))
set("spark.serializer", classOf[KryoSerializer].getName)
this
}

/** Remove a parameter from the configuration */
def remove(key: String): SparkConf = {
settings.remove(key)
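
As a usage sketch of the new helper (the event classes below are hypothetical, used only to have something to register), a single `registerKryoClasses` call now stands in for setting `spark.serializer` and `spark.kryo.classesToRegister` by hand:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical application classes, here only for illustration.
case class ClickEvent(userId: Long, url: String)
case class Session(id: String, events: Seq[ClickEvent])

object KryoRegistrationSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("kryo-sketch")
      // Sets spark.serializer to KryoSerializer and appends these class names to
      // spark.kryo.classesToRegister; repeated calls accumulate rather than overwrite.
      .registerKryoClasses(Array(classOf[ClickEvent], classOf[Session]))

    val sc = new SparkContext(conf)
    val sessions = sc.parallelize(Seq(Session("s1", Seq(ClickEvent(1L, "/home")))))
    println(sessions.count())
    sc.stop()
  }
}
```

Because the helper accumulates names in a LinkedHashSet keyed on `spark.kryo.classesToRegister`, a later call from another module appends its classes instead of clobbering the earlier registration.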
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -239,6 +239,10 @@ class SparkContext(config: SparkConf) extends Logging {
None
}

// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
ui.foreach(_.bind())

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

@@ -341,10 +345,6 @@ class SparkContext(config: SparkConf) extends Logging {
postEnvironmentUpdate()
postApplicationStart()

// Bind the SparkUI after starting the task scheduler
// because certain pages and listeners depend on it
ui.foreach(_.bind())

private[spark] var checkpointDir: Option[String] = None

// Thread Local variable that can be used by users to pass information down the stack
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -35,6 +35,7 @@ import org.apache.spark.Partitioner._
import org.apache.spark.SparkContext.rddToPairRDDFunctions
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
@@ -265,10 +266,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* before sending results to a reducer, similarly to a "combiner" in MapReduce.
*/
def reduceByKeyLocally(func: JFunction2[V, V, V]): java.util.Map[K, V] =
mapAsJavaMap(rdd.reduceByKeyLocally(func))
mapAsSerializableJavaMap(rdd.reduceByKeyLocally(func))

/** Count the number of elements for each key, and return the result to the master as a Map. */
def countByKey(): java.util.Map[K, Long] = mapAsJavaMap(rdd.countByKey())
def countByKey(): java.util.Map[K, Long] = mapAsSerializableJavaMap(rdd.countByKey())

/**
* :: Experimental ::
@@ -277,7 +278,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
*/
@Experimental
def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] =
rdd.countByKeyApprox(timeout).map(mapAsJavaMap)
rdd.countByKeyApprox(timeout).map(mapAsSerializableJavaMap)

/**
* :: Experimental ::
@@ -287,7 +288,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
@Experimental
def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
: PartialResult[java.util.Map[K, BoundedDouble]] =
rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)
rdd.countByKeyApprox(timeout, confidence).map(mapAsSerializableJavaMap)

/**
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
@@ -614,7 +615,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
/**
* Return the key-value pairs in this RDD to the master as a Map.
*/
def collectAsMap(): java.util.Map[K, V] = mapAsJavaMap(rdd.collectAsMap())
def collectAsMap(): java.util.Map[K, V] = mapAsSerializableJavaMap(rdd.collectAsMap())


/**
* Pass each value in the key-value pair RDD through a map function without changing the keys;
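
A short sketch of what switching to `mapAsSerializableJavaMap` buys Java-API users (SPARK-3926): the map returned by `collectAsMap()` (and `countByKey()`, etc.) can now round-trip through Java serialization, which is what happens when it is captured in a task closure or broadcast. The setup below is illustrative, written against the Java API from Scala:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}

object SerializableMapSketch {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext(
      new SparkConf().setMaster("local[2]").setAppName("serializable-map-sketch"))

    val pairs: JavaPairRDD[String, Integer] = JavaPairRDD.fromJavaRDD(
      jsc.parallelize(java.util.Arrays.asList(
        ("a", Integer.valueOf(1)), ("b", Integer.valueOf(2)))))

    // Previously a plain mapAsJavaMap wrapper, which Java serialization rejected.
    val lookup: java.util.Map[String, Integer] = pairs.collectAsMap()

    // Demonstrate that the returned map survives Java serialization.
    val bytes = new java.io.ByteArrayOutputStream()
    val oos = new java.io.ObjectOutputStream(bytes)
    oos.writeObject(lookup)
    oos.close()
    println(s"serialized collectAsMap() result to ${bytes.size()} bytes")

    jsc.stop()
  }
}
```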