Skip to content

Commit c79ec51

Browse files
committed
[SPARK-13366][SQL] Support Cartesian join for Datasets
1 parent 5f37aad commit c79ec51

File tree

2 files changed

+52
-14
lines changed

2 files changed

+52
-14
lines changed

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,40 @@ class Dataset[T] private[sql](
624624
* Joins *
625625
* ****** */
626626

627+
/**
 * Joins this [[Dataset]] with another [[Dataset]] using the specified join type, returning a
 * [[Tuple2]] for each pair where `condition` evaluates to true.
 *
 * @param other Right side of the join.
 * @param condition Join expression.
 * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
 * @since 1.6.0
 */
def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] =
  // Delegate to the protected Option-based overload; a user-supplied condition is always Some.
  joinWith(other, Some(condition), joinType)
639+
640+
/**
 * Joins this [[Dataset]] with another [[Dataset]] using an inner join, returning a [[Tuple2]]
 * for each pair where `condition` evaluates to true.
 *
 * Note: despite the historical "equi-join" wording, `condition` may be any join expression,
 * not only an equality predicate.
 *
 * @param other Right side of the join.
 * @param condition Join expression.
 * @since 1.6.0
 */
def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] =
  // Convenience overload: fixed "inner" join type, delegates to the Option-based overload.
  joinWith(other, Some(condition), "inner")
651+
652+
/**
 * Joins this [[Dataset]] with another [[Dataset]] using a cartesian (cross) inner join,
 * returning a [[Tuple2]] for each pair of elements from the two Datasets.
 *
 * Note: cartesian joins are very expensive without a filter that can be pushed down.
 *
 * @param other Right side of the join.
 * @since 2.0.0
 */
// No condition (None) with an inner join type yields the full cartesian product.
def joinWith[U](other: Dataset[U]): Dataset[(T, U)] = joinWith(other, None, "inner")
660+
627661
/**
628662
* Joins this [[Dataset]] returning a [[Tuple2]] for each pair where `condition` evaluates to
629663
* true.
@@ -641,12 +675,19 @@ class Dataset[T] private[sql](
641675
* @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
642676
* @since 1.6.0
643677
*/
644-
def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = {
678+
protected def joinWith[U](
679+
other: Dataset[U],
680+
condition: Option[Column],
681+
joinType: String): Dataset[(T, U)] = {
645682
val left = this.logicalPlan
646683
val right = other.logicalPlan
684+
val expression = {
685+
if (condition.isDefined) Some(condition.get.expr)
686+
else None
687+
}
647688

648689
val joined = sqlContext.executePlan(Join(left, right, joinType =
649-
JoinType(joinType), Some(condition.expr)))
690+
JoinType(joinType), expression))
650691
val leftOutput = joined.analyzed.output.take(left.output.length)
651692
val rightOutput = joined.analyzed.output.takeRight(right.output.length)
652693

@@ -668,18 +709,6 @@ class Dataset[T] private[sql](
668709
}
669710
}
670711

671-
/**
672-
* Using inner equi-join to join this [[Dataset]] returning a [[Tuple2]] for each pair
673-
* where `condition` evaluates to true.
674-
*
675-
* @param other Right side of the join.
676-
* @param condition Join expression.
677-
* @since 1.6.0
678-
*/
679-
def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] = {
680-
joinWith(other, condition, "inner")
681-
}
682-
683712
/* ************************** *
684713
* Gather to Driver Actions *
685714
* ************************** */

sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
257257
((("b", 2), ("b", 2)), ("b", 2)))
258258
}
259259

260+
test("cartesian join") {
  // 2-row left side x 1-row right side => exactly 2 joined pairs.
  val left = Seq(("a", 1), ("b", 2)).toDS()
  val right = Seq(("a", 1)).toDS()

  checkAnswer(
    left.joinWith(right),
    (("a", 1), ("a", 1)), (("b", 2), ("a", 1)))
}
268+
260269
test("groupBy function, keys") {
261270
val ds = Seq(("a", 1), ("b", 1)).toDS()
262271
val grouped = ds.groupBy(v => (1, v._2))

0 commit comments

Comments
 (0)