Commit c13a2cb (2 parents: 8e552b7 + 47ccd5e)

Merge branch 'master' of github.com:apache/spark into submit-driver-extra

76 files changed: +1378 −481 lines (only a subset of the diff is reproduced below)

bin/beeline
Lines changed: 7 additions & 22 deletions

@@ -17,29 +17,14 @@
 # limitations under the License.
 #
 
-# Figure out where Spark is installed
-FWDIR="$(cd `dirname $0`/..; pwd)"
+#
+# Shell script for starting BeeLine
 
-# Find the java binary
-if [ -n "${JAVA_HOME}" ]; then
-  RUNNER="${JAVA_HOME}/bin/java"
-else
-  if [ `command -v java` ]; then
-    RUNNER="java"
-  else
-    echo "JAVA_HOME is not set" >&2
-    exit 1
-  fi
-fi
+# Enter posix mode for bash
+set -o posix
 
-# Compute classpath using external script
-classpath_output=$($FWDIR/bin/compute-classpath.sh)
-if [[ "$?" != "0" ]]; then
-  echo "$classpath_output"
-  exit 1
-else
-  CLASSPATH=$classpath_output
-fi
+
+# Figure out where Spark is installed
+FWDIR="$(cd `dirname $0`/..; pwd)"
 
 CLASS="org.apache.hive.beeline.BeeLine"
-exec "$RUNNER" -cp "$CLASSPATH" $CLASS "$@"
+exec "$FWDIR/bin/spark-class" $CLASS "$@"

bin/spark-sql
Lines changed: 62 additions & 4 deletions

@@ -23,14 +23,72 @@
 # Enter posix mode for bash
 set -o posix
 
+CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
+
 # Figure out where Spark is installed
 FWDIR="$(cd `dirname $0`/..; pwd)"
 
-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  echo "Usage: ./sbin/spark-sql [options]"
+function usage {
+  echo "Usage: ./sbin/spark-sql [options] [cli option]"
+  pattern="usage"
+  pattern+="\|Spark assembly has been built with Hive"
+  pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
+  pattern+="\|Spark Command: "
+  pattern+="\|--help"
+  pattern+="\|======="
+
   $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+  echo
+  echo "CLI options:"
+  $FWDIR/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
+}
+
+function ensure_arg_number {
+  arg_number=$1
+  at_least=$2
+
+  if [[ $arg_number -lt $at_least ]]; then
+    usage
+    exit 1
+  fi
+}
+
+if [[ "$@" = --help ]] || [[ "$@" = -h ]]; then
+  usage
   exit 0
 fi
 
-CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
-exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@
+CLI_ARGS=()
+SUBMISSION_ARGS=()
+
+while (($#)); do
+  case $1 in
+    -d | --define | --database | -f | -h | --hiveconf | --hivevar | -i | -p)
+      ensure_arg_number $# 2
+      CLI_ARGS+=($1); shift
+      CLI_ARGS+=($1); shift
+      ;;
+
+    -e)
+      ensure_arg_number $# 2
+      CLI_ARGS+=($1); shift
+      CLI_ARGS+=(\"$1\"); shift
+      ;;
+
+    -s | --silent)
+      CLI_ARGS+=($1); shift
+      ;;
+
+    -v | --verbose)
+      # Both SparkSubmit and SparkSQLCLIDriver recognizes -v | --verbose
+      CLI_ARGS+=($1)
+      SUBMISSION_ARGS+=($1); shift
+      ;;
+
+    *)
+      SUBMISSION_ARGS+=($1); shift
+      ;;
+  esac
+done
+
+eval exec "$FWDIR"/bin/spark-submit --class $CLASS ${SUBMISSION_ARGS[*]} spark-internal ${CLI_ARGS[*]}
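The new while/case loop is the heart of this change: options that SparkSQLCLIDriver understands are collected into CLI_ARGS, everything else is handed to spark-submit via SUBMISSION_ARGS, and -v/--verbose goes to both. A minimal Scala sketch of the same routing logic (the demo object, function names, and sample invocation are illustrative, not part of the commit):

    object SparkSqlArgSplitDemo extends App {
      // Options that take a value and belong to the SQL CLI (from the case arms above).
      val cliOptsWithValue = Set("-d", "--define", "--database", "-f", "-h",
        "--hiveconf", "--hivevar", "-i", "-p", "-e")
      val cliFlags    = Set("-s", "--silent")   // CLI-only flags
      val sharedFlags = Set("-v", "--verbose")  // recognized by both sides

      def split(args: List[String],
                cli: Vector[String] = Vector.empty,
                submit: Vector[String] = Vector.empty): (Vector[String], Vector[String]) =
        args match {
          case opt :: value :: rest if cliOptsWithValue(opt) =>
            split(rest, cli :+ opt :+ value, submit)
          case flag :: rest if cliFlags(flag) =>
            split(rest, cli :+ flag, submit)
          case flag :: rest if sharedFlags(flag) =>
            split(rest, cli :+ flag, submit :+ flag)
          case other :: rest =>
            split(rest, cli, submit :+ other)   // default: treat as a spark-submit option
          case Nil =>
            (cli, submit)
        }

      val (cli, submit) = split(List("--master", "local", "-e", "SELECT 1"))
      println(cli)     // Vector(-e, SELECT 1)
      println(submit)  // Vector(--master, local)
    }

Note that the real script additionally re-quotes the -e value (CLI_ARGS+=(\"$1\")) because the assembled command line is ultimately run through eval exec.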

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
Lines changed: 14 additions & 25 deletions

@@ -220,6 +220,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   /** Fill in values by parsing user options. */
   private def parseOpts(opts: Seq[String]): Unit = {
     var inSparkOpts = true
+    val EQ_SEPARATED_OPT="""(--[^=]+)=(.+)""".r
 
     // Delineates parsing of Spark options from parsing of user options.
     parse(opts)
@@ -322,33 +323,21 @@
         verbose = true
         parse(tail)
 
+      case EQ_SEPARATED_OPT(opt, value) :: tail =>
+        parse(opt :: value :: tail)
+
+      case value :: tail if value.startsWith("-") =>
+        SparkSubmit.printErrorAndExit(s"Unrecognized option '$value'.")
+
       case value :: tail =>
-        if (inSparkOpts) {
-          value match {
-            // convert --foo=bar to --foo bar
-            case v if v.startsWith("--") && v.contains("=") && v.split("=").size == 2 =>
-              val parts = v.split("=")
-              parse(Seq(parts(0), parts(1)) ++ tail)
-            case v if v.startsWith("-") =>
-              val errMessage = s"Unrecognized option '$value'."
-              SparkSubmit.printErrorAndExit(errMessage)
-            case v =>
-              primaryResource =
-                if (!SparkSubmit.isShell(v) && !SparkSubmit.isInternal(v)) {
-                  Utils.resolveURI(v).toString
-                } else {
-                  v
-                }
-              inSparkOpts = false
-              isPython = SparkSubmit.isPython(v)
-              parse(tail)
+        primaryResource =
+          if (!SparkSubmit.isShell(value) && !SparkSubmit.isInternal(value)) {
+            Utils.resolveURI(value).toString
+          } else {
+            value
           }
-        } else {
-          if (!value.isEmpty) {
-            childArgs += value
-          }
-          parse(tail)
-        }
+        isPython = SparkSubmit.isPython(value)
+        childArgs ++= tail
 
       case Nil =>
     }
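The rewrite replaces the nested value match with dedicated match arms: the new EQ_SEPARATED_OPT extractor normalizes --foo=bar into --foo bar before any other handling, an explicit arm rejects unknown dashed options, and the final arm takes the first bare token as the primary resource and ships the remaining tail straight to childArgs. A small self-contained sketch of the extractor's behavior (the demo object and sample inputs are illustrative):

    object EqSeparatedOptDemo extends App {
      // The same pattern the commit adds to parseOpts.
      val EQ_SEPARATED_OPT = """(--[^=]+)=(.+)""".r

      for (arg <- Seq("--master=local[4]", "--name=my app", "--verbose", "app.jar")) {
        arg match {
          case EQ_SEPARATED_OPT(opt, value) => println(s"$arg  ->  $opt $value")
          case other                        => println(s"$other  ->  unchanged")
        }
      }
      // --master=local[4]  ->  --master local[4]
      // --name=my app      ->  --name my app
      // --verbose and app.jar do not match and fall through to the later arms
    }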

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
Lines changed: 2 additions & 2 deletions

@@ -190,10 +190,10 @@ class ShuffleWriteMetrics extends Serializable {
   /**
    * Number of bytes written for the shuffle by this task
    */
-  var shuffleBytesWritten: Long = _
+  @volatile var shuffleBytesWritten: Long = _
 
   /**
    * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
    */
-  var shuffleWriteTime: Long = _
+  @volatile var shuffleWriteTime: Long = _
 }
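Making the two counters @volatile means a thread other than the one running the task (for example, a metrics poller) is guaranteed to see the task thread's latest writes without any locking. A minimal sketch of that visibility guarantee (the class and values below are illustrative; assumes Scala 2.12+ for the Runnable lambda):

    object VolatileVisibilityDemo extends App {
      class WriteMetrics {
        // Without @volatile a reader thread may legally observe a stale 0.
        @volatile var shuffleBytesWritten: Long = 0L
      }

      val metrics = new WriteMetrics
      val writer = new Thread(() => { metrics.shuffleBytesWritten = 4096L })
      writer.start()
      writer.join()
      // @volatile is what allows a concurrent reader to poll a fresh value
      // mid-task, with no synchronized block around the field access.
      println(metrics.shuffleBytesWritten)  // 4096
    }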

core/src/main/scala/org/apache/spark/network/BufferMessage.scala
Lines changed: 4 additions & 3 deletions

@@ -48,7 +48,7 @@ class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId:
     val security = if (isSecurityNeg) 1 else 0
     if (size == 0 && !gotChunkForSendingOnce) {
       val newChunk = new MessageChunk(
-        new MessageChunkHeader(typ, id, 0, 0, ackId, security, senderAddress), null)
+        new MessageChunkHeader(typ, id, 0, 0, ackId, hasError, security, senderAddress), null)
       gotChunkForSendingOnce = true
       return Some(newChunk)
     }
@@ -66,7 +66,8 @@ class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId:
       }
       buffer.position(buffer.position + newBuffer.remaining)
       val newChunk = new MessageChunk(new MessageChunkHeader(
-        typ, id, size, newBuffer.remaining, ackId, security, senderAddress), newBuffer)
+        typ, id, size, newBuffer.remaining, ackId,
+        hasError, security, senderAddress), newBuffer)
       gotChunkForSendingOnce = true
       return Some(newChunk)
     }
@@ -88,7 +89,7 @@ class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId:
       val newBuffer = buffer.slice().limit(chunkSize).asInstanceOf[ByteBuffer]
       buffer.position(buffer.position + newBuffer.remaining)
       val newChunk = new MessageChunk(new MessageChunkHeader(
-        typ, id, size, newBuffer.remaining, ackId, security, senderAddress), newBuffer)
+        typ, id, size, newBuffer.remaining, ackId, hasError, security, senderAddress), newBuffer)
       return Some(newChunk)
     }
     None
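All three chunk-building paths gain the same new hasError argument, so whichever path produces a chunk, the message's error flag travels inside that chunk's header and the receiving side can surface the failure. A hypothetical, heavily simplified sketch of that propagation (these case classes are not Spark's real types):

    // Illustrative only: Spark's real MessageChunkHeader carries more fields.
    case class Header(ackId: Int, hasError: Boolean)
    case class Chunk(header: Header)

    class Message(val ackId: Int, var hasError: Boolean = false) {
      // Every chunk stamps the current message-level flag into its own header.
      def nextChunk(): Chunk = Chunk(Header(ackId, hasError))
    }

    object ErrorFlagDemo extends App {
      val msg = new Message(ackId = 7)
      msg.hasError = true               // e.g. remote handler failed
      println(msg.nextChunk().header)   // Header(7,true)
    }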
