Changes from all commits (30 commits)
8ad83b8
[SPARK-23462][SQL] improve missing field error message in `StructType`
xysun Mar 12, 2018
041a416
[SPARK-23618][K8S][BUILD] Initialize BUILD_ARGS in docker-image-tool.sh
Mar 12, 2018
31814c7
[SPARK-23412][ML] Add cosine distance to BisectingKMeans
mgaido91 Mar 12, 2018
1c05d46
[SPARK-23656][TEST] Perform assertions in XXH64Suite.testKnownByteArr…
kiszk Mar 13, 2018
5348c55
[MINOR][SQL][TEST] Create table using `dataSourceName` in `HadoopFsRe…
jiangxb1987 Mar 13, 2018
0f78b51
[SPARK-23547][SQL] Cleanup the .pipeout file when the Hive Session cl…
Mar 13, 2018
8288d9d
[SPARK-23598][SQL] Make methods in BufferedRowIterator public to avoi…
kiszk Mar 13, 2018
26e5385
[SPARK-22915][MLLIB] Streaming tests for spark.ml.feature, from N to Z
attilapiros Mar 15, 2018
02fb832
[SPARK-23642][DOCS] AccumulatorV2 subclass isZero scaladoc fix
Mar 15, 2018
824ab5c
[SPARK-23533][SS] Add support for changing ContinuousDataReader's sta…
xuanyuanking Mar 15, 2018
a30bd0d
[SPARK-23695][PYTHON] Fix the error message for Kinesis streaming tests
HyukjinKwon Mar 15, 2018
89d4534
[HOT-FIX] Fix SparkOutOfMemoryError: Unable to acquire 262144 bytes o…
wangyum Mar 15, 2018
cbb79a6
[SPARK-23658][LAUNCHER] InProcessAppHandle uses the wrong class in ge…
Mar 16, 2018
e1343b8
[SPARK-23671][CORE] Fix condition to enable the SHS thread pool.
Mar 16, 2018
5dd8fde
[SPARK-23608][CORE][WEBUI] Add synchronization in SHS between attachS…
zhouyejoe Mar 16, 2018
52e4e7e
[SPARK-23670][SQL] Fix memory leak on SparkPlanGraphWrapper
myroslavlisniak Mar 16, 2018
5aa3850
[SPARK-23644][CORE][UI] Use absolute path for REST call in SHS
mgaido91 Mar 16, 2018
fa28e04
[SPARK-23635][YARN] AM env variable should not overwrite same name en…
jerryshao Mar 16, 2018
36166b8
[SPARK-23553][TESTS] Tests should not assume the default value of `sp…
dongjoon-hyun Mar 16, 2018
2295556
[SPARK-18371][STREAMING] Spark Streaming backpressure generates batch…
Mar 16, 2018
cf011c2
[SPARK-23581][SQL] Add interpreted unsafe projection
hvanhovell Mar 16, 2018
7e6a978
[SPARK-23680] Fix entrypoint.sh to properly support Arbitrary UIDs
Mar 16, 2018
43d5f0f
[SPARK-23623][SS] Avoid concurrent use of cached consumers in CachedK…
tdas Mar 16, 2018
269c789
[SPARK-15009][PYTHON][ML] Construct a CountVectorizerModel from a voc…
BryanCutler Mar 16, 2018
ac14bbc
[SPARK-23683][SQL] FileCommitProtocol.instantiate() hardening
steveloughran Mar 16, 2018
f10f59c
[SPARK-23706][PYTHON] spark.conf.get(value, default=None) should prod…
HyukjinKwon Mar 18, 2018
2bcbfd2
[SPARK-23645][PYTHON] Allow python udfs to be called with keyword arg…
mstewart141 Mar 11, 2018
a503af4
Incomplete / Show issue with partial fn in pandas_udf
mstewart141 Mar 18, 2018
e2a90d5
Add note RE no keyword args in python UDFs
mstewart141 Mar 18, 2018
b4f7056
Address comments
mstewart141 Mar 24, 2018
1 change: 1 addition & 0 deletions bin/docker-image-tool.sh
@@ -57,6 +57,7 @@ function build {
else
# Not passed as an argument to docker, but used to validate the Spark directory.
IMG_PATH="kubernetes/dockerfiles"
BUILD_ARGS=()
fi

if [ ! -d "$IMG_PATH" ]; then
@@ -113,7 +113,7 @@ $(document).ready(function() {
status: (requestedIncomplete ? "running" : "completed")
};

$.getJSON("api/v1/applications", appParams, function(response,status,jqXHR) {
$.getJSON(uiRoot + "/api/v1/applications", appParams, function(response,status,jqXHR) {
var array = [];
var hasMultipleAttempts = false;
for (i in response) {
@@ -151,7 +151,7 @@ $(document).ready(function() {
"showCompletedColumns": !requestedIncomplete,
}

$.get("static/historypage-template.html", function(template) {
$.get(uiRoot + "/static/historypage-template.html", function(template) {
var sibling = historySummary.prev();
historySummary.detach();
var apps = $(Mustache.render($(template).filter("#history-summary-template").html(),data));
@@ -173,7 +173,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
* Fixed size thread pool to fetch and parse log files.
*/
private val replayExecutor: ExecutorService = {
if (Utils.isTesting) {
if (!Utils.isTesting) {
ThreadUtils.newDaemonFixedThreadPool(NUM_PROCESSING_THREADS, "log-replay-executor")
} else {
MoreExecutors.sameThreadExecutor()
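
The fix above restores the intended branch: the fixed-size replay pool is the non-test path. A minimal sketch of that selection in isolation, assuming a local isTesting flag standing in for Utils.isTesting and a plain JDK single-thread executor in place of Guava's MoreExecutors.sameThreadExecutor():

import java.util.concurrent.{ExecutorService, Executors}

object ReplayExecutorSketch {
  // Production gets a parallel pool for parsing event logs; tests get a
  // single-thread executor so behaviour stays deterministic.
  def select(isTesting: Boolean, numThreads: Int): ExecutorService =
    if (!isTesting) {
      Executors.newFixedThreadPool(numThreads)
    } else {
      Executors.newSingleThreadExecutor()
    }
}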
@@ -150,14 +150,18 @@ class HistoryServer(
ui: SparkUI,
completed: Boolean) {
assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs")
ui.getHandlers.foreach(attachHandler)
addFilters(ui.getHandlers, conf)
handlers.synchronized {
ui.getHandlers.foreach(attachHandler)
addFilters(ui.getHandlers, conf)
}
}

/** Detach a reconstructed UI from this server. Only valid after bind(). */
override def detachSparkUI(appId: String, attemptId: Option[String], ui: SparkUI): Unit = {
assert(serverInfo.isDefined, "HistoryServer must be bound before detaching SparkUIs")
ui.getHandlers.foreach(detachHandler)
handlers.synchronized {
ui.getHandlers.foreach(detachHandler)
}
provider.onUIDetached(appId, attemptId, ui)
}
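
Both attach and detach now mutate the shared handler collection inside handlers.synchronized, closing the race between SparkUIs being attached and detached concurrently. A minimal sketch of the same locking pattern, using simplified stand-in types rather than the real Jetty/SHS handlers:

import scala.collection.mutable.ArrayBuffer

class HandlerRegistrySketch[Handler] {
  private val handlers = ArrayBuffer.empty[Handler]

  // All mutations take the same monitor, so attaching one UI's handlers
  // cannot interleave with detaching another's.
  def attachAll(hs: Seq[Handler]): Unit = handlers.synchronized {
    hs.foreach(handlers += _)
  }

  def detachAll(hs: Seq[Handler]): Unit = handlers.synchronized {
    handlers --= hs
  }
}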

@@ -20,6 +20,7 @@ package org.apache.spark.internal.io
import org.apache.hadoop.fs._
import org.apache.hadoop.mapreduce._

import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils


@@ -132,7 +133,7 @@ abstract class FileCommitProtocol {
}


object FileCommitProtocol {
object FileCommitProtocol extends Logging {
class TaskCommitMessage(val obj: Any) extends Serializable

object EmptyTaskCommitMessage extends TaskCommitMessage(null)
@@ -145,15 +146,23 @@ object FileCommitProtocol {
jobId: String,
outputPath: String,
dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = {

logDebug(s"Creating committer $className; job $jobId; output=$outputPath;" +
s" dynamic=$dynamicPartitionOverwrite")
val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
// First try the constructor with arguments (jobId: String, outputPath: String,
// dynamicPartitionOverwrite: Boolean).
// If that doesn't exist, try the one with (jobId: string, outputPath: String).
try {
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
logDebug("Using (String, String, Boolean) constructor")
ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
} catch {
case _: NoSuchMethodException =>
logDebug("Falling back to (String, String) constructor")
require(!dynamicPartitionOverwrite,
"Dynamic Partition Overwrite is enabled but" +
s" the committer ${className} does not have the appropriate constructor")
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
ctor.newInstance(jobId, outputPath)
}
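
The hardened instantiate() prefers the three-argument constructor and only falls back to the two-argument one when dynamic partition overwrite is not requested. A standalone sketch of the same reflection fallback, not the Spark method itself; className is assumed to name a class exposing one of the two constructor shapes:

object CommitterFactorySketch {
  def instantiate[T](
      className: String,
      jobId: String,
      outputPath: String,
      dynamicPartitionOverwrite: Boolean): T = {
    val clazz = Class.forName(className).asInstanceOf[Class[T]]
    try {
      // Preferred: (jobId, outputPath, dynamicPartitionOverwrite)
      val ctor = clazz.getDeclaredConstructor(
        classOf[String], classOf[String], classOf[Boolean])
      ctor.newInstance(jobId, outputPath,
        java.lang.Boolean.valueOf(dynamicPartitionOverwrite))
    } catch {
      case _: NoSuchMethodException =>
        // Fallback: (jobId, outputPath), only legal without dynamic overwrite.
        require(!dynamicPartitionOverwrite,
          s"$className lacks the constructor needed for dynamic partition overwrite")
        clazz.getDeclaredConstructor(classOf[String], classOf[String])
          .newInstance(jobId, outputPath)
    }
  }
}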
@@ -290,7 +290,8 @@ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
private var _count = 0L

/**
* Adds v to the accumulator, i.e. increment sum by v and count by 1.
* Returns false if this accumulator has had any values added to it or the sum is non-zero.
*
* @since 2.0.0
*/
override def isZero: Boolean = _sum == 0L && _count == 0
@@ -368,6 +369,9 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
private var _sum = 0.0
private var _count = 0L

/**
* Returns false if this accumulator has had any values added to it or the sum is non-zero.
*/
override def isZero: Boolean = _sum == 0.0 && _count == 0

override def copy(): DoubleAccumulator = {
@@ -441,6 +445,9 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
private val _list: java.util.List[T] = Collections.synchronizedList(new ArrayList[T]())

/**
* Returns false if this accumulator instance has any values in it.
*/
override def isZero: Boolean = _list.isEmpty

override def copyAndReset(): CollectionAccumulator[T] = new CollectionAccumulator
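
The corrected scaladoc matches how isZero actually behaves: it stays true only until any value is added, because both the sum and the element count must be in their initial state. A short usage sketch against the public LongAccumulator API:

import org.apache.spark.util.LongAccumulator

val acc = new LongAccumulator
assert(acc.isZero)    // nothing added yet: sum == 0 and count == 0
acc.add(0L)
assert(!acc.isZero)   // sum is still 0, but count is now 1
acc.reset()
assert(acc.isZero)    // reset returns the accumulator to its zero state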
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.internal.io

import org.apache.spark.SparkFunSuite

/**
* Unit tests for instantiation of FileCommitProtocol implementations.
*/
class FileCommitProtocolInstantiationSuite extends SparkFunSuite {

test("Dynamic partitions require appropriate constructor") {

// you cannot instantiate a two-arg client with dynamic partitions
// enabled.
val ex = intercept[IllegalArgumentException] {
instantiateClassic(true)
}
// check the contents of the message and rethrow if unexpected.
// this preserves the stack trace of the unexpected
// exception.
if (!ex.toString.contains("Dynamic Partition Overwrite")) {
fail(s"Wrong text in caught exception $ex", ex)
}
}

test("Standard partitions work with classic constructor") {
instantiateClassic(false)
}

test("Three arg constructors have priority") {
assert(3 == instantiateNew(false).argCount,
"Wrong constructor argument count")
}

test("Three arg constructors have priority when dynamic") {
assert(3 == instantiateNew(true).argCount,
"Wrong constructor argument count")
}

test("The protocol must be of the correct class") {
intercept[ClassCastException] {
FileCommitProtocol.instantiate(
classOf[Other].getCanonicalName,
"job",
"path",
false)
}
}

test("If there is no matching constructor, class hierarchy is irrelevant") {
intercept[NoSuchMethodException] {
FileCommitProtocol.instantiate(
classOf[NoMatchingArgs].getCanonicalName,
"job",
"path",
false)
}
}

/**
* Create a classic two-arg protocol instance.
* @param dynamic dynamic partitioning mode
* @return the instance
*/
private def instantiateClassic(dynamic: Boolean): ClassicConstructorCommitProtocol = {
FileCommitProtocol.instantiate(
classOf[ClassicConstructorCommitProtocol].getCanonicalName,
"job",
"path",
dynamic).asInstanceOf[ClassicConstructorCommitProtocol]
}

/**
* Create a three-arg protocol instance.
* @param dynamic dynamic partitioning mode
* @return the instance
*/
private def instantiateNew(
dynamic: Boolean): FullConstructorCommitProtocol = {
FileCommitProtocol.instantiate(
classOf[FullConstructorCommitProtocol].getCanonicalName,
"job",
"path",
dynamic).asInstanceOf[FullConstructorCommitProtocol]
}

}

/**
* This protocol implementation does not have the new three-arg
* constructor.
*/
private class ClassicConstructorCommitProtocol(arg1: String, arg2: String)
extends HadoopMapReduceCommitProtocol(arg1, arg2) {
}

/**
* This protocol implementation does have the new three-arg constructor
* alongside the original, and a 4 arg one for completeness.
* The final value of the real constructor is the number of arguments
* used in the 2- and 3- constructor, for test assertions.
*/
private class FullConstructorCommitProtocol(
arg1: String,
arg2: String,
b: Boolean,
val argCount: Int)
extends HadoopMapReduceCommitProtocol(arg1, arg2, b) {

def this(arg1: String, arg2: String) = {
this(arg1, arg2, false, 2)
}

def this(arg1: String, arg2: String, b: Boolean) = {
this(arg1, arg2, false, 3)
}
}

/**
* This has the 2-arity constructor, but isn't the right class.
*/
private class Other(arg1: String, arg2: String) {

}

/**
* This has no matching arguments as well as being the wrong class.
*/
private class NoMatchingArgs() {

}

@@ -164,7 +164,16 @@ case class KafkaContinuousDataReaderFactory(
startOffset: Long,
kafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
failOnDataLoss: Boolean) extends ContinuousDataReaderFactory[UnsafeRow] {

override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[UnsafeRow] = {
val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
require(kafkaOffset.topicPartition == topicPartition,
s"Expected topicPartition: $topicPartition, but got: ${kafkaOffset.topicPartition}")
new KafkaContinuousDataReader(
topicPartition, kafkaOffset.partitionOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
}

override def createDataReader(): KafkaContinuousDataReader = {
new KafkaContinuousDataReader(
topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
@@ -187,8 +196,7 @@ class KafkaContinuousDataReader(
kafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean) extends ContinuousDataReader[UnsafeRow] {
private val consumer =
CachedKafkaConsumer.createUncached(topicPartition.topic, topicPartition.partition, kafkaParams)
private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
private val converter = new KafkaRecordToUnsafeRowConverter

private var nextKafkaOffset = startOffset
@@ -236,6 +244,6 @@ class KafkaContinuousDataReader(
}

override def close(): Unit = {
consumer.close()
consumer.release()
}
}
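
KafkaContinuousDataReader now borrows its consumer through KafkaDataConsumer.acquire and hands it back with release() instead of closing it, letting the pool decide whether the underlying Kafka consumer survives. A minimal sketch of that borrow-and-release discipline with illustrative stand-in types (the real KafkaDataConsumer is internal to the connector):

trait PooledConsumer {
  def poll(timeoutMs: Long): Unit
  def release(): Unit   // return to the pool; the pool may reuse or close it
}

trait ConsumerPool {
  def acquire(useCache: Boolean): PooledConsumer
}

object ConsumerUseSketch {
  def readOnce(pool: ConsumerPool, pollTimeoutMs: Long): Unit = {
    val consumer = pool.acquire(useCache = false)  // uncached, like the continuous reader
    try {
      consumer.poll(pollTimeoutMs)
    } finally {
      consumer.release()  // never close() a pooled consumer directly
    }
  }
}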