ExternalSorter.scala
@@ -188,6 +188,12 @@ private[spark] class ExternalSorter[K, V, C](

private val spills = new ArrayBuffer[SpilledFile]

/**
* Number of files this sorter has spilled so far.
* Exposed for testing.
*/
private[spark] def numSpills: Int = spills.size

override def insertAll(records: Iterator[Product2[K, V]]): Unit = {
// TODO: stop combining if we find that the reduction factor isn't high
val shouldCombine = aggregator.isDefined
UnsafeRowSerializer.scala
@@ -72,7 +72,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInstance
override def writeKey[T: ClassTag](key: T): SerializationStream = {
// The key is only needed on the map side when computing partition ids. It does not need to
// be shuffled.
assert(key.isInstanceOf[Int])
assert(null == key || key.isInstanceOf[Int])
Contributor: Wouldn't the right thing to do here be to allow nulls as well? In general it's a bad idea to remove assertions.

assert(key == null || key.isInstanceOf[Int])

Contributor: How about changing the dummy value to a number (-1) instead of null?

Contributor: Yeah, I like that better. We can't have a partition ID of -1, whereas null.asInstanceOf[Int] may be confused with the partition ID of 0.
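A minimal, self-contained sketch of the unboxing behavior behind this exchange (illustration only, not part of the patch; the object name is invented): a null key unboxes to 0 and is therefore indistinguishable from a record headed for partition 0, while -1 can never be a real partition ID.

// Illustration only -- shows why -1 is a safer dummy key than null
// once the key is treated as an Int partition ID.
object SentinelKeySketch {
  def main(args: Array[String]): Unit = {
    val nullKey: Any = null
    // Unboxing a null reference to Int silently yields 0.
    println(nullKey.asInstanceOf[Int]) // prints 0
    // Partition IDs are always >= 0, so -1 cannot collide with a real one.
    val dummyKey: Int = -1
    println(dummyKey) // prints -1
  }
}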

this
}

UnsafeRowSerializerSuite.scala
@@ -17,13 +17,17 @@

package org.apache.spark.sql.execution

import java.io.{DataOutputStream, ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{File, DataOutputStream, ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.SparkFunSuite
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.collection.ExternalSorter
import org.apache.spark.util.Utils
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types._
import org.apache.spark._


/**
@@ -40,9 +44,15 @@ class ClosableByteArrayInputStream(buf: Array[Byte]) extends ByteArrayInputStream
class UnsafeRowSerializerSuite extends SparkFunSuite {

private def toUnsafeRow(row: Row, schema: Array[DataType]): UnsafeRow = {
val internalRow = CatalystTypeConverters.convertToCatalyst(row).asInstanceOf[InternalRow]
val converter = unsafeRowConverter(schema)
converter(row)
}

private def unsafeRowConverter(schema: Array[DataType]): Row => UnsafeRow = {
Contributor: This method seems strictly unnecessary... we can just remove it in the future.

Contributor Author: Actually, UnsafeProjection.create(schema) does the codegen work, and that takes a long time if we have to generate a large number of UnsafeRows.

Contributor: I mean we can just inline it in toUnsafeRow. There's no reason why it needs to be its own method.

Contributor Author: Yes, I see what you mean. If we inline it in toUnsafeRow, then every call to toUnsafeRow creates a new converter instance for the schema, which is very expensive because building the converter involves code generation internally.

We'd probably be better off removing toUnsafeRow entirely in the future, since it always causes a performance problem that people may not even notice.
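To make the cost difference concrete, here is a small sketch of the two call patterns being discussed (illustration only; the object and method names are invented, and it assumes the same UnsafeProjection and CatalystTypeConverters calls used in the diff below): UnsafeProjection.create runs code generation, so it should be called once per schema and the resulting projection reused for every row, which is what unsafeRowConverter does.

// Illustration only: per-row converter creation vs. a cached converter.
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.DataType

object ConverterReuseSketch {
  // Expensive in a loop: code generation runs on every call.
  def toUnsafeRowNaive(row: Row, schema: Array[DataType]): UnsafeRow = {
    val converter = UnsafeProjection.create(schema) // codegen per call
    converter(CatalystTypeConverters.convertToCatalyst(row).asInstanceOf[InternalRow])
  }

  // Cheap in a loop: code generation runs once; the returned closure is reused.
  def cachedConverter(schema: Array[DataType]): Row => UnsafeRow = {
    val converter = UnsafeProjection.create(schema) // codegen once
    (row: Row) =>
      converter(CatalystTypeConverters.convertToCatalyst(row).asInstanceOf[InternalRow])
  }
}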

val converter = UnsafeProjection.create(schema)
converter.apply(internalRow)
(row: Row) => {
converter(CatalystTypeConverters.convertToCatalyst(row).asInstanceOf[InternalRow])
}
}

test("toUnsafeRow() test helper method") {
@@ -87,4 +97,50 @@ class UnsafeRowSerializerSuite extends SparkFunSuite {
assert(!deserializerIter.hasNext)
assert(input.closed)
}

test("SPARK-10466: external sorter spilling with unsafe row serializer") {
var sc: SparkContext = null
var outputFile: File = null
val oldEnv = SparkEnv.get // save the old SparkEnv, as it will be overwritten
Utils.tryWithSafeFinally {
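// These settings force the sorter to spill frequently (tiny spill threshold
// and memory fraction) and avoid the bypass-merge shuffle path.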
val conf = new SparkConf()
.set("spark.shuffle.spill.initialMemoryThreshold", "1024")
.set("spark.shuffle.sort.bypassMergeThreshold", "0")
.set("spark.shuffle.memoryFraction", "0.0001")

sc = new SparkContext("local", "test", conf)
outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")
// prepare data
val converter = unsafeRowConverter(Array(IntegerType))
val data = (1 to 1000).iterator.map { i =>
(i, converter(Row(i)))
}
val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
partitioner = Some(new HashPartitioner(10)),
serializer = Some(new UnsafeRowSerializer(numFields = 1)))

// Ensure we spilled something and have to merge them later
assert(sorter.numSpills === 0)
sorter.insertAll(data)
assert(sorter.numSpills > 0)

// Merging spilled files should not throw assertion error
val taskContext =
new TaskContextImpl(0, 0, 0, 0, null, null, InternalAccumulator.create(sc))
taskContext.taskMetrics.shuffleWriteMetrics = Some(new ShuffleWriteMetrics)
sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), taskContext, outputFile)
} {
// Clean up
if (sc != null) {
sc.stop()
}

// restore the spark env
SparkEnv.set(oldEnv)

if (outputFile != null) {
outputFile.delete()
}
}
}
}