diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 378763268acc6..3ab86013f9722 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -624,6 +624,40 @@ class Dataset[T] private[sql](
    *  Joins *
    * ****** */
 
+  /**
+   * Using the specified join type to join this [[Dataset]] returning a [[Tuple2]] for each pair
+   * where `condition` evaluates to true.
+   *
+   * @param other Right side of the join.
+   * @param condition Join expression.
+   * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
+   * @since 1.6.0
+   */
+  def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = {
+    joinWith(other, Some(condition), joinType)
+  }
+
+  /**
+   * Using inner equi-join to join this [[Dataset]] returning a [[Tuple2]] for each pair
+   * where `condition` evaluates to true.
+   *
+   * @param other Right side of the join.
+   * @param condition Join expression.
+   * @since 1.6.0
+   */
+  def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] = {
+    joinWith(other, Some(condition), "inner")
+  }
+
+  /**
+   * Using cartesian inner join to join this [[Dataset]] returning a [[Tuple2]] for each pair.
+   * Note: cartesian joins are very expensive without a filter that can be pushed down.
+   *
+   * @param other Right side of the join.
+   * @since 2.0.0
+   */
+  def joinWith[U](other: Dataset[U]): Dataset[(T, U)] = joinWith(other, None, "inner")
+
   /**
    * Joins this [[Dataset]] returning a [[Tuple2]] for each pair where `condition` evaluates to
    * true.
@@ -641,12 +675,19 @@ class Dataset[T] private[sql](
    * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
    * @since 1.6.0
    */
-  def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = {
+  protected def joinWith[U](
+      other: Dataset[U],
+      condition: Option[Column],
+      joinType: String): Dataset[(T, U)] = {
     val left = this.logicalPlan
     val right = other.logicalPlan
+    val expression = {
+      if (condition.isDefined) Some(condition.get.expr)
+      else None
+    }
 
     val joined = sqlContext.executePlan(Join(left, right, joinType =
-      JoinType(joinType), Some(condition.expr)))
+      JoinType(joinType), expression))
     val leftOutput = joined.analyzed.output.take(left.output.length)
     val rightOutput = joined.analyzed.output.takeRight(right.output.length)
 
@@ -668,18 +709,6 @@ class Dataset[T] private[sql](
     }
   }
 
-  /**
-   * Using inner equi-join to join this [[Dataset]] returning a [[Tuple2]] for each pair
-   * where `condition` evaluates to true.
-   *
-   * @param other Right side of the join.
-   * @param condition Join expression.
-   * @since 1.6.0
-   */
-  def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] = {
-    joinWith(other, condition, "inner")
-  }
-
   /* ************************** *
    *  Gather to Driver Actions  *
    * ************************** */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index f9ba60770022d..f3c1ef68320e7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -257,6 +257,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       ((("b", 2), ("b", 2)), ("b", 2)))
   }
 
+  test("cartesian join") {
+    val ds1 = Seq(("a", 1), ("b", 2)).toDS()
+    val ds2 = Seq(("a", 1)).toDS()
+
+    checkAnswer(
+      ds1.joinWith(ds2),
+      (("a", 1), ("a", 1)), (("b", 2), ("a", 1)))
+  }
+
   test("groupBy function, keys") {
     val ds = Seq(("a", 1), ("b", 1)).toDS()
     val grouped = ds.groupBy(v => (1, v._2))
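
For reference, a minimal usage sketch of the three joinWith overloads above. It is not part of the patch: it assumes `sqlContext.implicits._` is in scope and borrows the aliasing convention used by the existing joinWith tests in DatasetSuite; the names ds1/ds2 and the aliases "a"/"b" are illustrative only.

// Two small Datasets of pairs, aliased so the join condition can refer to
// each side's tuple columns (_1, _2) unambiguously.
val ds1 = Seq(("a", 1), ("b", 2)).toDS().as("a")
val ds2 = Seq(("a", 1)).toDS().as("b")

// Inner equi-join on the first tuple field (the default join type):
// yields a Dataset[((String, Int), (String, Int))] containing (("a", 1), ("a", 1)).
val inner = ds1.joinWith(ds2, $"a._1" === $"b._1")

// The same condition with an explicit join type.
val leftOuter = ds1.joinWith(ds2, $"a._1" === $"b._1", "left_outer")

// New in this patch: no condition at all, i.e. a cartesian product pairing
// every element of ds1 with every element of ds2 -- two rows here, but
// |ds1| * |ds2| rows in general, hence the warning in the scaladoc.
val cartesian = ds1.joinWith(ds2)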