@@ -21,6 +21,7 @@
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -521,4 +522,66 @@ public long getKeyPrefix() {
return upstream.getKeyPrefix();
}
}

/**
* Returns an iterator that returns the rows in the order in which they were inserted.
*
* It is the caller's responsibility to call `cleanupResources()`
* after consuming this iterator.
*/
public UnsafeSorterIterator getIterator() throws IOException {
if (spillWriters.isEmpty()) {
assert(inMemSorter != null);
return inMemSorter.getIterator();
} else {
LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
queue.add(spillWriter.getReader(blockManager));
}
if (inMemSorter != null) {
queue.add(inMemSorter.getIterator());
}
return new ChainedIterator(queue);
}
}
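
As a usage sketch of the contract described above (illustrative only; countRecords is a made-up caller, not code from this PR): fully consume the iterator returned by getIterator(), then call cleanupResources(), here from a finally block.

```scala
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

// Assumed caller-side usage of getIterator(): count the inserted records,
// then release the sorter's memory and spill files in a finally block.
def countRecords(sorter: UnsafeExternalSorter): Long = {
  var count = 0L
  try {
    val iter = sorter.getIterator
    while (iter.hasNext) {
      iter.loadNext()
      count += 1
    }
  } finally {
    sorter.cleanupResources()
  }
  count
}
```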

/**
* Chains multiple UnsafeSorterIterators together as a single one.
*/
class ChainedIterator extends UnsafeSorterIterator {

private final Queue<UnsafeSorterIterator> iterators;
private UnsafeSorterIterator current;

public ChainedIterator(Queue<UnsafeSorterIterator> iterators) {
assert iterators.size() > 0;
this.iterators = iterators;
this.current = iterators.remove();
}

@Override
public boolean hasNext() {
while (!current.hasNext() && !iterators.isEmpty()) {
current = iterators.remove();
Contributor:
This doesn't work if iterators contains empty iterators. Fix it, or assert that that can't happen.

Contributor Author:
It checks that iterators is not empty.

Contributor:
That's not what I mean. If iterators contains an empty one, so iterators is:
(1, 2) : empty : (3, 4)

When you move to the second iterator (current is empty) you will stop and not iterate over the iterator containing (3, 4).

Contributor Author:
Oh, I see, thanks, will fix it.

Contributor Author:
For UnsafeExternalSorter it's not possible to have an empty iterator in the middle, since they are spilled files. It's still good to be defensive about it, though.

Contributor:
Yeah, I figured it would not be empty, but I agree about being defensive. If the implementation of UnsafeExternalSorter changes, we don't want to have to debug this.

}
return current.hasNext();
}
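
Following up on the thread above, here is a minimal, self-contained Scala sketch (illustrative only, not the actual ChainedIterator and not part of this PR) of the same chaining logic. It shows why hasNext() must keep advancing in a while loop: with an empty iterator in the middle of the queue, a single if would stop there and drop the remaining rows.

```scala
import scala.collection.mutable

// Minimal sketch (not the real ChainedIterator): chains plain Scala iterators
// using the same advance-in-a-loop logic as hasNext() above.
class Chained[T](iters: mutable.Queue[Iterator[T]]) extends Iterator[T] {
  private var current: Iterator[T] = iters.dequeue()

  override def hasNext: Boolean = {
    // A single `if` would stop at an empty iterator; the `while` keeps
    // advancing until a non-empty iterator is found or the queue is drained.
    while (!current.hasNext && iters.nonEmpty) {
      current = iters.dequeue()
    }
    current.hasNext
  }

  override def next(): T = current.next()
}

object ChainedDemo {
  def main(args: Array[String]): Unit = {
    val q = mutable.Queue[Iterator[Int]](Iterator(1, 2), Iterator.empty, Iterator(3, 4))
    new Chained(q).foreach(n => print(s"$n "))  // prints: 1 2 3 4
  }
}
```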

@Override
public void loadNext() throws IOException {
current.loadNext();
}

@Override
public Object getBaseObject() { return current.getBaseObject(); }

@Override
public long getBaseOffset() { return current.getBaseOffset(); }

@Override
public int getRecordLength() { return current.getRecordLength(); }

@Override
public long getKeyPrefix() { return current.getKeyPrefix(); }
}
}
@@ -226,4 +226,11 @@ public SortedIterator getSortedIterator() {
sorter.sort(array, 0, pos / 2, sortComparator);
return new SortedIterator(pos / 2);
}

/**
* Returns an iterator over the record pointers in their original insertion order.
*/
public SortedIterator getIterator() {
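// Each record occupies two entries in the array (record pointer and key prefix),
// so pos / 2 is the number of records inserted so far.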
return new SortedIterator(pos / 2);
}
}
@@ -17,16 +17,75 @@

package org.apache.spark.sql.execution.joins

import org.apache.spark.rdd.RDD
import org.apache.spark._
import org.apache.spark.rdd.{CartesianPartition, CartesianRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter


/**
* An optimized CartesianRDD for UnsafeRow, which caches the rows from the second child RDD.
* This is much faster than rebuilding the right partition for every row in the left RDD,
* and it also materializes the right RDD (in case the right RDD is nondeterministic).
*/
private[spark]
class UnsafeCartesianRDD(left: RDD[UnsafeRow], right: RDD[UnsafeRow], numFieldsOfRight: Int)
extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {

override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = {
// We will not sort the rows, so prefixComparator and recordComparator are null.
val sorter = UnsafeExternalSorter.create(
context.taskMemoryManager(),
SparkEnv.get.blockManager,
context,
null,
null,
1024,
SparkEnv.get.memoryManager.pageSizeBytes)

val partition = split.asInstanceOf[CartesianPartition]
for (y <- rdd2.iterator(partition.s2, context)) {
sorter.insertRecord(y.getBaseObject, y.getBaseOffset, y.getSizeInBytes, 0)
}

// Create an iterator from the sorter and wrap it as an Iterator[UnsafeRow]
def createIter(): Iterator[UnsafeRow] = {
Contributor:
Does it make sense to move this into the sorter? Seems other callers might want this.

Something like UnsafeRowSorter.unsafeRowIterator().

Contributor Author:
We use UnsafeExternalSorter here, and it does not know the meaning of the pointers. Otherwise we would need UnsafeExternalRowSorter, which requires more changes.

Contributor:
I see. Ok.

val iter = sorter.getIterator
val unsafeRow = new UnsafeRow
new Iterator[UnsafeRow] {
override def hasNext: Boolean = {
iter.hasNext
}
override def next(): UnsafeRow = {
iter.loadNext()
unsafeRow.pointTo(iter.getBaseObject, iter.getBaseOffset, numFieldsOfRight,
iter.getRecordLength)
unsafeRow
}
}
}
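
For reference, a hoisted helper along the lines suggested in the thread above might look roughly like the sketch below. It is hypothetical (no such method exists on the sorter) and simply mirrors createIter(); note it still needs numFields passed in, which is the author's point about UnsafeExternalSorter not knowing the row schema.

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

// Hypothetical helper sketched from the review suggestion above; it mirrors
// createIter() but takes the number of fields explicitly, since the sorter
// only tracks raw (baseObject, baseOffset, length) records.
def unsafeRowIterator(sorter: UnsafeExternalSorter, numFields: Int): Iterator[UnsafeRow] = {
  val iter = sorter.getIterator
  val row = new UnsafeRow
  new Iterator[UnsafeRow] {
    override def hasNext: Boolean = iter.hasNext
    override def next(): UnsafeRow = {
      iter.loadNext()
      row.pointTo(iter.getBaseObject, iter.getBaseOffset, numFields, iter.getRecordLength)
      row
    }
  }
}
```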

val resultIter =
for (x <- rdd1.iterator(partition.s1, context);
y <- createIter()) yield (x, y)
Contributor:
Does the UnsafeExternalSorter preserve record order if it spills?

Contributor:
And we may also need to update the CartesianProduct strategy to put the smaller child on the right side.

Contributor Author:
As we discussed in #7417, right now it's not clear which metric could be used as the size of a table; that could be another story.

Even if the right table is larger than the left, this approach is still much better than the current one (building the partition is usually much more expensive than loading the rows from memory or disk), and it also fixes another problem: the right table could be nondeterministic.

Contributor Author:
@cloud-fan For the first question, yes.

CompletionIterator[(UnsafeRow, UnsafeRow), Iterator[(UnsafeRow, UnsafeRow)]](
resultIter, sorter.cleanupResources)
}
}


case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output ++ right.output

override def canProcessSafeRows: Boolean = false
override def canProcessUnsafeRows: Boolean = true
override def outputsUnsafeRows: Boolean = true

override private[sql] lazy val metrics = Map(
"numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
"numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
@@ -39,18 +98,19 @@ case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode

val leftResults = left.execute().map { row =>
numLeftRows += 1
row.copy()
row.asInstanceOf[UnsafeRow]
}
val rightResults = right.execute().map { row =>
numRightRows += 1
row.copy()
row.asInstanceOf[UnsafeRow]
}

leftResults.cartesian(rightResults).mapPartitionsInternal { iter =>
val joinedRow = new JoinedRow
val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size)
pair.mapPartitionsInternal { iter =>
val joiner = GenerateUnsafeRowJoiner.create(left.schema, right.schema)
iter.map { r =>
numOutputRows += 1
joinedRow(r._1, r._2)
joiner.join(r._1, r._2)
}
}
}
@@ -317,7 +317,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
testSparkPlanMetrics(df, 1, Map(
1L -> ("CartesianProduct", Map(
"number of left rows" -> 12L, // left needs to be scanned twice
"number of right rows" -> 12L, // right is read 6 times
"number of right rows" -> 4L, // right is read twice
"number of output rows" -> 12L)))
)
}