diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 1b34ba9f03c44..5183c80ab4526 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -99,7 +99,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val parameterTaskPageSize = request.getParameter("task.pageSize")
val taskPage = Option(parameterTaskPage).map(_.toInt).getOrElse(1)
- val taskSortColumn = Option(parameterTaskSortColumn).getOrElse("Index")
+ val taskSortColumn = Option(parameterTaskSortColumn).map { sortColumn =>
+ UIUtils.decodeURLParameter(sortColumn)
+ }.getOrElse("Index")
val taskSortDesc = Option(parameterTaskSortDesc).map(_.toBoolean).getOrElse(false)
val taskPageSize = Option(parameterTaskPageSize).map(_.toInt).getOrElse(100)
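
Behind the YARN proxy, the `task.sort` parameter can reach the Stages page URL-encoded an extra time (for example `%252F` instead of `/`), which breaks task sorting; the hunk above therefore routes the raw parameter through `UIUtils.decodeURLParameter` before it is used. As a rough illustration only (the PR's actual helper lives in UIUtils and is not shown in this hunk), a decoder that is safe on both encoded and already-decoded values could be sketched like this:

    import java.net.URLDecoder

    // Sketch, not Spark's code: decode repeatedly until the value stops changing,
    // so singly and doubly encoded parameters both end up plain.
    def decodeUntilStable(param: String): String = {
      var current = param
      var decoded = URLDecoder.decode(current, "UTF-8")
      while (decoded != current) {
        current = decoded
        decoded = URLDecoder.decode(current, "UTF-8")
      }
      current
    }

    decodeUntilStable("%252F")  // "/"
    decodeUntilStable("/")      // "/" -- already-decoded values pass through unchanged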
diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index 14b6ba4af489a..86bbaa20f6cf2 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -17,7 +17,7 @@
package org.apache.spark.util.logging
-import java.io.{File, FileOutputStream, InputStream}
+import java.io.{File, FileOutputStream, InputStream, IOException}
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.util.{IntParam, Utils}
@@ -29,7 +29,6 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
extends Logging {
@volatile private var outputStream: FileOutputStream = null
@volatile private var markedForStop = false // has the appender been asked to stopped
- @volatile private var stopped = false // has the appender stopped
// Thread that reads the input stream and writes to file
private val writingThread = new Thread("File appending thread for " + file) {
@@ -47,11 +46,7 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
* or because of any error in appending
*/
def awaitTermination() {
- synchronized {
- if (!stopped) {
- wait()
- }
- }
+ writingThread.join()
}
/** Stop the appender */
@@ -63,24 +58,28 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
protected def appendStreamToFile() {
try {
logDebug("Started appending thread")
- openFile()
- val buf = new Array[Byte](bufferSize)
- var n = 0
- while (!markedForStop && n != -1) {
- n = inputStream.read(buf)
- if (n != -1) {
- appendToFile(buf, n)
+ Utils.tryWithSafeFinally {
+ openFile()
+ val buf = new Array[Byte](bufferSize)
+ var n = 0
+ while (!markedForStop && n != -1) {
+ try {
+ n = inputStream.read(buf)
+ } catch {
+                // An InputStream can throw an IOException during read if the stream is closed
+                // asynchronously, so once the appender has been flagged to stop, these exceptions are ignored
+ case _: IOException if markedForStop => // do nothing and proceed to stop appending
+ }
+ if (n > 0) {
+ appendToFile(buf, n)
+ }
}
+ } {
+ closeFile()
}
} catch {
case e: Exception =>
logError(s"Error writing stream to file $file", e)
- } finally {
- closeFile()
- synchronized {
- stopped = true
- notifyAll()
- }
}
}
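
The FileAppender rewrite drops the hand-rolled stopped/notifyAll handshake in favor of `writingThread.join()`, and wraps the read loop in `Utils.tryWithSafeFinally` so the output file is always closed without masking whatever failed first. A minimal sketch of that "safe finally" shape, assuming the usual semantics (an illustration, not Spark's own Utils implementation):

    // Always run the cleanup block; if both the body and the cleanup throw,
    // keep the body's exception as primary and attach the cleanup failure as suppressed.
    def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
      var originalThrowable: Throwable = null
      try {
        block
      } catch {
        case t: Throwable =>
          originalThrowable = t
          throw t
      } finally {
        try {
          finallyBlock
        } catch {
          case t: Throwable if originalThrowable != null =>
            // cleanup failed after the body already failed: don't mask the original error
            originalThrowable.addSuppressed(t)
        }
      }
    }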
diff --git a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
index dd8d5ec27f87e..bc8a5d494dbd3 100644
--- a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
@@ -67,6 +67,20 @@ class UIUtilsSuite extends SparkFunSuite {
s"\nRunning progress bar should round down\n\nExpected:\n$expected\nGenerated:\n$generated")
}
+ test("decodeURLParameter (SPARK-12708: Sorting task error in Stages Page when yarn mode.)") {
+ val encoded1 = "%252F"
+ val decoded1 = "/"
+ val encoded2 = "%253Cdriver%253E"
+ val decoded2 = ""
+
+ assert(decoded1 === decodeURLParameter(encoded1))
+ assert(decoded2 === decodeURLParameter(encoded2))
+
+    // verify that decoding an already-decoded URL has no effect
+ assert(decoded1 === decodeURLParameter(decoded1))
+ assert(decoded2 === decodeURLParameter(decoded2))
+ }
+
private def verify(
desc: String, expected: Elem, errorMsg: String = "", baseUrl: String = ""): Unit = {
val generated = makeDescription(desc, baseUrl)
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 2b76ae1f8a24b..5a14fc7b1d38a 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -18,14 +18,18 @@
package org.apache.spark.util
import java.io._
+import java.util.concurrent.CountDownLatch
import scala.collection.mutable.HashSet
import scala.reflect._
-import org.scalatest.BeforeAndAfter
-
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
+import org.apache.log4j.{Appender, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
+import org.mockito.ArgumentCaptor
+import org.mockito.Mockito.{atLeast, mock, verify}
+import org.scalatest.BeforeAndAfter
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
import org.apache.spark.util.logging.{RollingFileAppender, SizeBasedRollingPolicy, TimeBasedRollingPolicy, FileAppender}
@@ -189,6 +193,67 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
testAppenderSelection[FileAppender, Any](rollingStrategy("xyz"))
}
+ test("file appender async close stream abruptly") {
+    // Test the FileAppender's reaction to the InputStream being closed, using a mock logging appender
+ val mockAppender = mock(classOf[Appender])
+ val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+
+ // Make sure only logging errors
+ val logger = Logger.getRootLogger
+ logger.setLevel(Level.ERROR)
+ logger.addAppender(mockAppender)
+
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream)
+
+    // Closing the stream before the appender tries to read will cause an IOException
+ testInputStream.close()
+ testOutputStream.close()
+ val appender = FileAppender(testInputStream, testFile, new SparkConf)
+
+ appender.awaitTermination()
+
+    // If the InputStream was closed without first stopping the appender, an exception will be logged
+ verify(mockAppender, atLeast(1)).doAppend(loggingEventCaptor.capture)
+ val loggingEvent = loggingEventCaptor.getValue
+ assert(loggingEvent.getThrowableInformation !== null)
+ assert(loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+ }
+
+ test("file appender async close stream gracefully") {
+    // Test the FileAppender's reaction to the InputStream being closed, using a mock logging appender
+ val mockAppender = mock(classOf[Appender])
+ val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+
+ // Make sure only logging errors
+ val logger = Logger.getRootLogger
+ logger.setLevel(Level.ERROR)
+ logger.addAppender(mockAppender)
+
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream) with LatchedInputStream
+
+    // Closing the stream before the appender tries to read will cause an IOException
+ testInputStream.close()
+ testOutputStream.close()
+ val appender = FileAppender(testInputStream, testFile, new SparkConf)
+
+    // Stop the appender before the IOException is thrown during read
+ testInputStream.latchReadStarted.await()
+ appender.stop()
+ testInputStream.latchReadProceed.countDown()
+
+ appender.awaitTermination()
+
+    // Make sure no IOException errors have been logged as a result of the appender closing gracefully
+ verify(mockAppender, atLeast(0)).doAppend(loggingEventCaptor.capture)
+ import scala.collection.JavaConverters._
+ loggingEventCaptor.getAllValues.asScala.foreach { loggingEvent =>
+ assert(loggingEvent.getThrowableInformation === null
+ || !loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+ }
+ }
+
/**
* Run the rolling file appender with data and see whether all the data was written correctly
* across rolled over files.
@@ -229,4 +294,15 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
file.getName.startsWith(testFile.getName)
}.foreach { _.delete() }
}
+
+ /** Used to synchronize when read is called on a stream */
+ private trait LatchedInputStream extends PipedInputStream {
+ val latchReadStarted = new CountDownLatch(1)
+ val latchReadProceed = new CountDownLatch(1)
+ abstract override def read(): Int = {
+ latchReadStarted.countDown()
+ latchReadProceed.await()
+ super.read()
+ }
+ }
}
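
The `LatchedInputStream` helper above uses Scala's stackable-trait pattern: `abstract override` lets a trait wrap a concrete method of whatever class it is mixed into at instantiation time, which is how the graceful-close test pauses `PipedInputStream#read` at a known point. A tiny self-contained illustration of the same mechanism (the trait name here is made up for the example):

    import java.io.{ByteArrayInputStream, InputStream}

    // Counts calls to read() on whatever InputStream it is mixed into,
    // delegating the real work through super.read().
    trait CountingReads extends InputStream {
      var reads = 0
      abstract override def read(): Int = {
        reads += 1
        super.read()
      }
    }

    val in = new ByteArrayInputStream(Array[Byte](7, 8, 9)) with CountingReads
    in.read()   // returns 7
    in.reads    // 1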
diff --git a/docs/ml-guide.md b/docs/ml-guide.md
index 44a316a07dfef..6d9659686f96c 100644
--- a/docs/ml-guide.md
+++ b/docs/ml-guide.md
@@ -428,7 +428,7 @@ This example follows the simple text document `Pipeline` illustrated in the figu
{% highlight scala %}
-import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.mllib.linalg.Vector
@@ -466,7 +466,7 @@ model.save("/tmp/spark-logistic-regression-model")
pipeline.save("/tmp/unfit-lr-model")
// and load it back in during production
-val sameModel = Pipeline.load("/tmp/spark-logistic-regression-model")
+val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")
// Prepare test documents, which are unlabeled (id, text) tuples.
val test = sqlContext.createDataFrame(Seq(
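
The doc fix above matters because `pipeline.fit(...)` returns a `PipelineModel`, not a `Pipeline`, and each type only reads back what it wrote. A condensed recap of the two load calls, reusing the guide's own paths:

    import org.apache.spark.ml.{Pipeline, PipelineModel}

    // An unfit Pipeline round-trips through Pipeline.load ...
    val unfitPipeline: Pipeline = Pipeline.load("/tmp/unfit-lr-model")
    // ... while the fitted model saved via model.save comes back through PipelineModel.load
    val fittedModel: PipelineModel = PipelineModel.load("/tmp/spark-logistic-regression-model")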
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 3193e17853483..ed720f1039f94 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -202,7 +202,7 @@ where each application gets more or fewer machines as it ramps up and down, but
additional overhead in launching each task. This mode may be inappropriate for low-latency
requirements like interactive queries or serving web requests.
-To run in coarse-grained mode, set the `spark.mesos.coarse` property to false in your
+To run in fine-grained mode, set the `spark.mesos.coarse` property to false in your
[SparkConf](configuration.html#spark-properties):
{% highlight scala %}
@@ -266,13 +266,11 @@ See the [configuration page](configuration.html) for information on Spark config
| Property Name | Default | Meaning |
spark.mesos.coarse |
- false |
+ true |
- If set to true, runs over Mesos clusters in
- "coarse-grained" sharing mode,
- where Spark acquires one long-lived Mesos task on each machine instead of one Mesos task per
- Spark task. This gives lower-latency scheduling for short queries, but leaves resources in use
- for the whole duration of the Spark job.
+ If set to true, runs over Mesos clusters in "coarse-grained" sharing mode, where Spark acquires one long-lived Mesos task on each machine.
+ If set to false, runs over Mesos clusters in "fine-grained" sharing mode, where one Mesos task is created per Spark task.
+ See the 'Mesos Run Modes' section above for more details.
|
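
With the documented default flipped to `true`, fine-grained mode now has to be requested explicitly. A minimal sketch of doing so through SparkConf (the app name and Mesos master URL below are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("fine-grained-example")                 // placeholder app name
      .setMaster("mesos://zk://zk1:2181,zk2:2181/mesos")  // placeholder Mesos master URL
      .set("spark.mesos.coarse", "false")                 // opt back into fine-grained mode
    val sc = new SparkContext(conf)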
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index 1dbedaaca3d67..30a184901925c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -152,7 +152,7 @@ class Word2Vec extends Serializable with Logging {
/** context words from [-window, window] */
private var window = 5
- private var trainWordsCount = 0
+ private var trainWordsCount = 0L
private var vocabSize = 0
@transient private var vocab: Array[VocabWord] = null
@transient private var vocabHash = mutable.HashMap.empty[String, Int]
@@ -160,13 +160,13 @@ class Word2Vec extends Serializable with Logging {
private def learnVocab(words: RDD[String]): Unit = {
vocab = words.map(w => (w, 1))
.reduceByKey(_ + _)
+ .filter(_._2 >= minCount)
.map(x => VocabWord(
x._1,
x._2,
new Array[Int](MAX_CODE_LENGTH),
new Array[Int](MAX_CODE_LENGTH),
0))
- .filter(_.cn >= minCount)
.collect()
.sortWith((a, b) => a.cn > b.cn)
@@ -180,7 +180,7 @@ class Word2Vec extends Serializable with Logging {
trainWordsCount += vocab(a).cn
a += 1
}
- logInfo("trainWordsCount = " + trainWordsCount)
+ logInfo(s"vocabSize = $vocabSize, trainWordsCount = $trainWordsCount")
}
private def createExpTable(): Array[Float] = {
@@ -330,7 +330,7 @@ class Word2Vec extends Serializable with Logging {
val random = new XORShiftRandom(seed ^ ((idx + 1) << 16) ^ ((-k - 1) << 8))
val syn0Modify = new Array[Int](vocabSize)
val syn1Modify = new Array[Int](vocabSize)
- val model = iter.foldLeft((syn0Global, syn1Global, 0, 0)) {
+ val model = iter.foldLeft((syn0Global, syn1Global, 0L, 0L)) {
case ((syn0, syn1, lastWordCount, wordCount), sentence) =>
var lwc = lastWordCount
var wc = wordCount
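
The switch from `Int` to `Long` for `trainWordsCount` (and from `0` to `0L` in the fold's accumulators) is an overflow fix: on corpora with more than Int.MaxValue (about 2.15 billion) training words, an Int counter wraps to a negative value and breaks anything computed from the total, such as the learning-rate decay. The wrap-around itself is easy to demonstrate:

    // Int arithmetic silently wraps past Int.MaxValue; Long keeps counting.
    val overflowed: Int = Int.MaxValue + 1        // -2147483648
    val correct: Long   = Int.MaxValue.toLong + 1 //  2147483648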
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
index 23c8d7c7c8075..1c583a45153ee 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
@@ -109,7 +109,9 @@ private[stat] object ChiSqTest extends Logging {
}
i += 1
distinctLabels += label
- features.toArray.view.zipWithIndex.slice(startCol, endCol).map { case (feature, col) =>
+ val brzFeatures = features.toBreeze
+ (startCol until endCol).map { col =>
+ val feature = brzFeatures(col)
allDistinctFeatures(col) += feature
(col, feature, label)
}
@@ -122,7 +124,7 @@ private[stat] object ChiSqTest extends Logging {
pairCounts.keys.filter(_._1 == startCol).map(_._3).toArray.distinct.zipWithIndex.toMap
}
val numLabels = labels.size
- pairCounts.keys.groupBy(_._1).map { case (col, keys) =>
+ pairCounts.keys.groupBy(_._1).foreach { case (col, keys) =>
val features = keys.map(_._2).toArray.distinct.zipWithIndex.toMap
val numRows = features.size
val contingency = new BDM(numRows, numLabels, new Array[Double](numRows * numLabels))
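
Two things change in this ChiSqTest hunk: columns are read directly from the Breeze vector instead of materializing `features.toArray` plus a view and a slice for every row, and the outer `map` becomes `foreach` because the loop body is executed only for its side effects, so the collection that `map` would build was never used. The map-versus-foreach point in isolation:

    val xs = Seq(1, 2, 3)
    xs.map(println)      // prints 1, 2, 3 but also builds a Seq[Unit] that is thrown away
    xs.foreach(println)  // same output, no intermediate collection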
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index 99331297c19f0..26cafca8b8381 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -76,4 +76,6 @@
# which allows us to execute the user's PYTHONSTARTUP file:
_pythonstartup = os.environ.get('OLD_PYTHONSTARTUP')
if _pythonstartup and os.path.isfile(_pythonstartup):
- execfile(_pythonstartup)
+ with open(_pythonstartup) as f:
+ code = compile(f.read(), _pythonstartup, 'exec')
+ exec(code)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index f0697613cff3b..f7596300e89f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -28,11 +28,13 @@ import scala.reflect.ClassTag
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
+import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
import org.apache.hadoop.hive.serde2.avro.{AvroGenericRecordWritable, AvroSerdeUtils}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector
@@ -47,6 +49,7 @@ private[hive] object HiveShim {
// scale Hive 0.13 infers for BigDecimals from sources that don't specify them (e.g. UDFs)
val UNLIMITED_DECIMAL_PRECISION = 38
val UNLIMITED_DECIMAL_SCALE = 18
+ val HIVE_GENERIC_UDF_MACRO_CLS = "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro"
/*
* This function in hive-0.13 become private, but we have to do this to walkaround hive bug
@@ -125,6 +128,26 @@ private[hive] object HiveShim {
// for Serialization
def this() = this(null)
+ override def hashCode(): Int = {
+ if (functionClassName == HIVE_GENERIC_UDF_MACRO_CLS) {
+ Objects.hashCode(functionClassName, instance.asInstanceOf[GenericUDFMacro].getBody())
+ } else {
+ functionClassName.hashCode()
+ }
+ }
+
+ override def equals(other: Any): Boolean = other match {
+ case a: HiveFunctionWrapper if functionClassName == a.functionClassName =>
+      // In the case of a UDF macro, make sure both wrappers point to the same underlying UDF body
+ if (functionClassName == HIVE_GENERIC_UDF_MACRO_CLS) {
+ a.instance.asInstanceOf[GenericUDFMacro].getBody() ==
+ instance.asInstanceOf[GenericUDFMacro].getBody()
+ } else {
+ true
+ }
+ case _ => false
+ }
+
@transient
def deserializeObjectByKryo[T: ClassTag](
kryo: Kryo,
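
The new `hashCode`/`equals` give `HiveFunctionWrapper` value semantics for `GenericUDFMacro`, so two wrapper instances built from the same macro (for example one in the SELECT list and one in the GROUP BY clause) compare equal and can be matched up during analysis. Default reference equality would keep them distinct, which a plain class versus a case class shows directly (the names below are illustrative only):

    // Reference equality: two wrappers around the same name stay distinct set elements.
    class RefWrapped(val name: String)
    Set(new RefWrapped("macro"), new RefWrapped("macro")).size  // 2

    // Value equality (what the override above provides for macros): duplicates collapse.
    case class ValWrapped(name: String)
    Set(ValWrapped("macro"), ValWrapped("macro")).size          // 1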
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 9deb1a6db15ad..f8b0f01f9a873 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -350,6 +350,13 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
sqlContext.dropTempTable("testUDF")
}
+ test("Hive UDF in group by") {
+ Seq(Tuple1(1451400761)).toDF("test_date").registerTempTable("tab1")
+ val count = sql("select date(cast(test_date as timestamp))" +
+ " from tab1 group by date(cast(test_date as timestamp))").count()
+ assert(count == 1)
+ }
+
test("SPARK-11522 select input_file_name from non-parquet table"){
withTempDir { tempDir =>
|