Commit 7e0f218

Merge remote-tracking branch 'upstream/master' into migrate-ddl-describe
Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
2 parents: 838f701 + c00744e

85 files changed: +1165 / -1037 lines changed


core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 2 additions & 2 deletions
@@ -143,7 +143,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
     // Creating the file to write to and creating a disk writer both involve interacting with
     // the disk, and can take a long time in aggregate when we open many files, so should be
     // included in the shuffle write time.
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);
+    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

     while (records.hasNext()) {
       final Product2<K, V> record = records.next();
@@ -203,7 +203,7 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
       threwException = false;
     } finally {
       Closeables.close(out, threwException);
-      writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+      writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
     }
     partitionWriters = null;
     return lengths;
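Both hunks are pure renames of the metric setter (incShuffleWriteTime becomes incWriteTime); the timing pattern itself is unchanged. Below is a minimal sketch of that pattern, assuming a hypothetical stripped-down WriteMetricsSketch class in place of Spark's ShuffleWriteMetrics:

```scala
import java.io.{File, FileOutputStream}

// Hypothetical stand-in for ShuffleWriteMetrics; only the renamed methods
// touched in this commit (incWriteTime, incBytesWritten) are modelled.
class WriteMetricsSketch {
  private var writeTimeNanos: Long = 0L
  private var bytesWritten: Long = 0L
  def incWriteTime(nanos: Long): Unit = writeTimeNanos += nanos
  def incBytesWritten(bytes: Long): Unit = bytesWritten += bytes
  override def toString: String = s"writeTime=${writeTimeNanos}ns bytes=$bytesWritten"
}

object TimedWriteExample {
  def main(args: Array[String]): Unit = {
    val metrics = new WriteMetricsSketch
    val file = File.createTempFile("write-metrics-sketch", ".bin")
    // Same shape as openStartTime/writeStartTime above: capture a nanoTime
    // before the disk work, add the delta to the write-time metric afterwards.
    val start = System.nanoTime()
    val out = new FileOutputStream(file)
    try out.write(new Array[Byte](1024)) finally out.close()
    metrics.incWriteTime(System.nanoTime() - start)
    metrics.incBytesWritten(file.length())
    println(metrics)
    file.delete()
  }
}
```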

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 2 additions & 2 deletions
@@ -233,8 +233,8 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
       // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
       // Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
      // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
-      writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
-      taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
+      writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
+      taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
     }
   }
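As the surrounding comments explain, a spill's write metrics are folded into the task in two different places: record counts go to the shuffle write metrics, byte counts go to disk bytes spilled, and the spill's write time is deliberately dropped. A small illustrative sketch of that split, using assumed stand-in counters rather than Spark's ShuffleWriteMetrics/TaskMetrics:

```scala
// Stand-in counters only; not Spark's metrics classes.
class SpillWriteMetricsSketch(val recordsWritten: Long, val bytesWritten: Long)

class TaskMetricsSketch {
  var shuffleRecordsWritten: Long = 0L
  var diskBytesSpilled: Long = 0L

  // Mirrors the two calls in the hunk: records count towards shuffle write,
  // bytes count towards disk bytes spilled, and the write time is dropped.
  def absorbSpill(spill: SpillWriteMetricsSketch): Unit = {
    shuffleRecordsWritten += spill.recordsWritten
    diskBytesSpilled += spill.bytesWritten
  }
}

object SpillAttributionExample {
  def main(args: Array[String]): Unit = {
    val task = new TaskMetricsSketch
    task.absorbSpill(new SpillWriteMetricsSketch(recordsWritten = 1000L, bytesWritten = 4096L))
    println(s"records=${task.shuffleRecordsWritten} spilledBytes=${task.diskBytesSpilled}")
  }
}
```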

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

Lines changed: 3 additions & 3 deletions
@@ -298,8 +298,8 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti
         // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
         // to be counted as shuffle write, but this will lead to double-counting of the final
         // SpillInfo's bytes.
-        writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length());
-        writeMetrics.incShuffleBytesWritten(outputFile.length());
+        writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
+        writeMetrics.incBytesWritten(outputFile.length());
         return partitionLengths;
       }
     } catch (IOException e) {
@@ -411,7 +411,7 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
           spillInputChannelPositions[i] += actualBytesTransferred;
           bytesToTransfer -= actualBytesTransferred;
         }
-        writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+        writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
         bytesWrittenToMergedFile += partitionLengthInSpill;
         partitionLengths[partition] += partitionLengthInSpill;
       }
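The first hunk carries the correction described in the comment: the final spill's bytes were already counted as shuffle write when that spill was produced, so they are subtracted before the merged output file's length is added. A toy walkthrough of that accounting, with made-up spill sizes:

```scala
// Made-up spill sizes; the point is only the dec-then-inc adjustment from the hunk.
object MergeSpillAccountingExample {
  def main(args: Array[String]): Unit = {
    val spillLengths = Seq(100L, 200L, 300L)   // bytes of each spill file (hypothetical)
    val mergedOutputLength = spillLengths.sum  // the merged output contains every spill

    // The final spill's bytes were already recorded as shuffle write when it
    // was produced, so start from that amount ...
    var shuffleBytesWritten = spillLengths.last

    // ... then apply the same correction as mergeSpills:
    shuffleBytesWritten -= spillLengths.last   // decBytesWritten(lastSpill.file.length())
    shuffleBytesWritten += mergedOutputLength  // incBytesWritten(outputFile.length())

    // Net effect: the merged file is counted exactly once.
    assert(shuffleBytesWritten == mergedOutputLength)
    println(s"shuffle bytes written = $shuffleBytesWritten")
  }
}
```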

core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java

Lines changed: 5 additions & 5 deletions
@@ -42,34 +42,34 @@ public TimeTrackingOutputStream(ShuffleWriteMetrics writeMetrics, OutputStream o
   public void write(int b) throws IOException {
     final long startTime = System.nanoTime();
     outputStream.write(b);
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }

   @Override
   public void write(byte[] b) throws IOException {
     final long startTime = System.nanoTime();
     outputStream.write(b);
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }

   @Override
   public void write(byte[] b, int off, int len) throws IOException {
     final long startTime = System.nanoTime();
     outputStream.write(b, off, len);
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }

   @Override
   public void flush() throws IOException {
     final long startTime = System.nanoTime();
     outputStream.flush();
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }

   @Override
   public void close() throws IOException {
     final long startTime = System.nanoTime();
     outputStream.close();
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }
 }
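TimeTrackingOutputStream wraps an OutputStream and charges the elapsed time of every call to the write-time metric; this diff only renames that metric call. A minimal Scala sketch of the same wrapper idea, where the sink is a plain Long => Unit function rather than ShuffleWriteMetrics:

```scala
import java.io.{ByteArrayOutputStream, OutputStream}

// The elapsed-time sink is a plain function here, not Spark's ShuffleWriteMetrics.
class TimedOutputStream(out: OutputStream, incWriteTime: Long => Unit) extends OutputStream {
  // Run `body`, then charge however long it took to the write-time sink.
  private def timed[A](body: => A): A = {
    val start = System.nanoTime()
    try body finally incWriteTime(System.nanoTime() - start)
  }

  override def write(b: Int): Unit = timed(out.write(b))
  override def write(b: Array[Byte]): Unit = timed(out.write(b))
  override def write(b: Array[Byte], off: Int, len: Int): Unit = timed(out.write(b, off, len))
  override def flush(): Unit = timed(out.flush())
  override def close(): Unit = timed(out.close())
}

object TimedOutputStreamExample {
  def main(args: Array[String]): Unit = {
    var writeTimeNanos = 0L
    val out = new TimedOutputStream(new ByteArrayOutputStream(), writeTimeNanos += _)
    out.write(Array[Byte](1, 2, 3))
    out.close()
    println(s"time spent in writes: ${writeTimeNanos}ns")
  }
}
```

Note the sketch uses try/finally, so time is charged even when the wrapped call throws; the Java original above records the elapsed time only on the success path.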

core/src/main/scala/org/apache/spark/Accumulators.scala renamed to core/src/main/scala/org/apache/spark/Accumulable.scala

Lines changed: 3 additions & 176 deletions
@@ -20,14 +20,12 @@ package org.apache.spark
 import java.io.{ObjectInputStream, Serializable}

 import scala.collection.generic.Growable
-import scala.collection.Map
-import scala.collection.mutable
-import scala.ref.WeakReference
 import scala.reflect.ClassTag

 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.util.Utils

+
 /**
  * A data type that can be accumulated, ie has an commutative and associative "add" operation,
  * but where the result type, `R`, may be different from the element type being added, `T`.
@@ -166,6 +164,7 @@ class Accumulable[R, T] private[spark] (
   override def toString: String = if (value_ == null) "null" else value_.toString
 }

+
 /**
  * Helper object defining how to accumulate values of a particular type. An implicit
  * AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type.
@@ -201,6 +200,7 @@ trait AccumulableParam[R, T] extends Serializable {
   def zero(initialValue: R): R
 }

+
 private[spark] class
 GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
   extends AccumulableParam[R, T] {
@@ -224,176 +224,3 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
     copy
   }
 }
-
-/**
- * A simpler value of [[Accumulable]] where the result type being accumulated is the same
- * as the types of elements being merged, i.e. variables that are only "added" to through an
- * associative operation and can therefore be efficiently supported in parallel. They can be used
- * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric
- * value types, and programmers can add support for new types.
- *
- * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
- * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
- * However, they cannot read its value. Only the driver program can read the accumulator's value,
- * using its value method.
- *
- * The interpreter session below shows an accumulator being used to add up the elements of an array:
- *
- * {{{
- * scala> val accum = sc.accumulator(0)
- * accum: spark.Accumulator[Int] = 0
- *
- * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
- * ...
- * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
- *
- * scala> accum.value
- * res2: Int = 10
- * }}}
- *
- * @param initialValue initial value of accumulator
- * @param param helper object defining how to add elements of type `T`
- * @tparam T result type
- */
-class Accumulator[T] private[spark] (
-    @transient private[spark] val initialValue: T,
-    param: AccumulatorParam[T],
-    name: Option[String],
-    internal: Boolean)
-  extends Accumulable[T, T](initialValue, param, name, internal) {
-
-  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
-    this(initialValue, param, name, false)
-  }
-
-  def this(initialValue: T, param: AccumulatorParam[T]) = {
-    this(initialValue, param, None, false)
-  }
-}
-
-/**
- * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
- * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
- * available when you create Accumulators of a specific type.
- *
- * @tparam T type of value to accumulate
- */
-trait AccumulatorParam[T] extends AccumulableParam[T, T] {
-  def addAccumulator(t1: T, t2: T): T = {
-    addInPlace(t1, t2)
-  }
-}
-
-object AccumulatorParam {
-
-  // The following implicit objects were in SparkContext before 1.2 and users had to
-  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
-  // them automatically. However, as there are duplicate codes in SparkContext for backward
-  // compatibility, please update them accordingly if you modify the following implicit objects.
-
-  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
-    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
-    def zero(initialValue: Double): Double = 0.0
-  }
-
-  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
-    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
-    def zero(initialValue: Int): Int = 0
-  }
-
-  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
-    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
-    def zero(initialValue: Long): Long = 0L
-  }
-
-  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
-    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
-    def zero(initialValue: Float): Float = 0f
-  }
-
-  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
-}
-
-// TODO: The multi-thread support in accumulators is kind of lame; check
-// if there's a more intuitive way of doing it right
-private[spark] object Accumulators extends Logging {
-  /**
-   * This global map holds the original accumulator objects that are created on the driver.
-   * It keeps weak references to these objects so that accumulators can be garbage-collected
-   * once the RDDs and user-code that reference them are cleaned up.
-   */
-  val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
-
-  private var lastId: Long = 0
-
-  def newId(): Long = synchronized {
-    lastId += 1
-    lastId
-  }
-
-  def register(a: Accumulable[_, _]): Unit = synchronized {
-    originals(a.id) = new WeakReference[Accumulable[_, _]](a)
-  }
-
-  def remove(accId: Long) {
-    synchronized {
-      originals.remove(accId)
-    }
-  }
-
-  // Add values to the original accumulators with some given IDs
-  def add(values: Map[Long, Any]): Unit = synchronized {
-    for ((id, value) <- values) {
-      if (originals.contains(id)) {
-        // Since we are now storing weak references, we must check whether the underlying data
-        // is valid.
-        originals(id).get match {
-          case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
-          case None =>
-            throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
-        }
-      } else {
-        logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
-      }
-    }
-  }
-
-}
-
-private[spark] object InternalAccumulator {
-  val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
-  val TEST_ACCUMULATOR = "testAccumulator"
-
-  // For testing only.
-  // This needs to be a def since we don't want to reuse the same accumulator across stages.
-  private def maybeTestAccumulator: Option[Accumulator[Long]] = {
-    if (sys.props.contains("spark.testing")) {
-      Some(new Accumulator(
-        0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true))
-    } else {
-      None
-    }
-  }
-
-  /**
-   * Accumulators for tracking internal metrics.
-   *
-   * These accumulators are created with the stage such that all tasks in the stage will
-   * add to the same set of accumulators. We do this to report the distribution of accumulator
-   * values across all tasks within each stage.
-   */
-  def create(sc: SparkContext): Seq[Accumulator[Long]] = {
-    val internalAccumulators = Seq(
-      // Execution memory refers to the memory used by internal data structures created
-      // during shuffles, aggregations and joins. The value of this accumulator should be
-      // approximately the sum of the peak sizes across all such data structures created
-      // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
-      new Accumulator(
-        0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
-    ) ++ maybeTestAccumulator.toSeq
-    internalAccumulators.foreach { accumulator =>
-      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
-    }
-    internalAccumulators
-  }
-}
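The Accumulator class, AccumulatorParam, and the Accumulators/InternalAccumulator objects deleted here appear to be split out into their own files as part of this merge (the remaining file is renamed to Accumulable.scala); the AccumulatorParam contract itself, addInPlace to merge two partial results and zero for the identity value, is unchanged. A hedged sketch of how user code exercises that contract, assuming a Spark version from this era (the accumulator API shown was later deprecated) and an illustrative Set[String] param that is not part of this commit:

```scala
import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

object SetAccumulatorExample {
  // Same contract as the implicit numeric params removed from this file:
  // `addInPlace` merges two partial results, `zero` supplies the identity value.
  implicit object StringSetParam extends AccumulatorParam[Set[String]] {
    def addInPlace(a: Set[String], b: Set[String]): Set[String] = a ++ b
    def zero(initial: Set[String]): Set[String] = Set.empty
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("accum-sketch"))
    val seen = sc.accumulator(Set.empty[String])   // created on the driver
    sc.parallelize(Seq("a", "b", "a", "c")).foreach(x => seen += Set(x))
    println(seen.value)                            // only the driver may read the value
    sc.stop()
  }
}
```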

0 commit comments