6 changes: 5 additions & 1 deletion python/pyspark/sql/dataframe.py
@@ -567,7 +567,11 @@ def join(self, other, on=None, how=None):
         if on is None or len(on) == 0:
             jdf = self._jdf.join(other._jdf)
         elif isinstance(on[0], basestring):
-            jdf = self._jdf.join(other._jdf, self._jseq(on))
+            if how is None:
+                jdf = self._jdf.join(other._jdf, self._jseq(on), "inner")
+            else:
+                assert isinstance(how, basestring), "how should be basestring"
+                jdf = self._jdf.join(other._jdf, self._jseq(on), how)
         else:
             assert isinstance(on[0], Column), "on should be Column or list of Column"
             if len(on) > 1:
22 changes: 21 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -484,6 +484,26 @@ class DataFrame private[sql](
    * @since 1.4.0
    */
   def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
+    join(right, usingColumns, "inner")
+  }
+
+  /**
+   * Equi-join with another [[DataFrame]] using the given columns.
+   *
+   * Different from other join functions, the join columns will only appear once in the output,
+   * i.e. similar to SQL's `JOIN USING` syntax.
+   *
+   * Note that if you perform a self-join using this function without aliasing the input
+   * [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
+   * there is no way to disambiguate which side of the join you would like to reference.
+   *
+   * @param right Right side of the join operation.
+   * @param usingColumns Names of the columns to join on. These columns must exist on both sides.
+   * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
+   * @group dfops
+   * @since 1.6.0
+   */
+  def join(right: DataFrame, usingColumns: Seq[String], joinType: String): DataFrame = {
     // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
     // by creating a new instance for one of the branches.
     val joined = sqlContext.executePlan(
@@ -502,7 +522,7 @@ class DataFrame private[sql](
       Join(
         joined.left,
         joined.right,
-        joinType = Inner,
+        joinType = JoinType(joinType),
         condition)
     )
   }
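A minimal usage sketch of the overload added above, assuming a SQLContext named `sqlContext` with its implicits imported; the `people` and `orders` DataFrames and their column names are hypothetical, not part of this patch:

import sqlContext.implicits._

// Hypothetical example data; any two DataFrames sharing an "id" column would do.
val people = Seq((1, "alice"), (2, "bob")).toDF("id", "name")
val orders = Seq((1, 100.0)).toDF("id", "amount")

// Two-argument form is unchanged for callers: it now simply delegates with "inner".
people.join(orders, Seq("id")).show()

// New three-argument form: keep unmatched `people` rows; "id" appears only once in the output.
people.join(orders, Seq("id"), "left_outer").show()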
@@ -42,6 +42,19 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       Row(1, 2, "1", "2") :: Row(2, 3, "2", "3") :: Row(3, 4, "3", "4") :: Nil)
   }
 
+  test("join - join using multiple columns and specifying join type") {
+    val df = Seq(1, 2, 3).map(i => (i, i + 1, i.toString)).toDF("int", "int2", "str")
+    val df2 = Seq(1, 2, 3).map(i => (i, i + 1, (i + 1).toString)).toDF("int", "int2", "str")
+
+    checkAnswer(
+      df.join(df2, Seq("int", "str"), "left"),
+      Row(1, 2, "1", null) :: Row(2, 3, "2", null) :: Row(3, 4, "3", null) :: Nil)
+
+    checkAnswer(
+      df.join(df2, Seq("int", "str"), "right"),
+      Row(null, null, null, 2) :: Row(null, null, null, 3) :: Row(null, null, null, 4) :: Nil)
+  }
+
   test("join - join using self join") {
     val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
 
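The new test passes "left" and "right" even though the Scaladoc above lists `left_outer` and `right_outer`; both spellings are accepted because `JoinType(joinType)` normalizes the string before matching it. A rough sketch of that normalization, written as a standalone function (an assumption about Catalyst's `JoinType` factory, not code from this patch):

// Hypothetical sketch: lower-case the string, drop underscores, then match.
// The real resolution lives in org.apache.spark.sql.catalyst.plans.JoinType.
def resolveJoinTypeName(typ: String): String = typ.toLowerCase.replaceAll("_", "") match {
  case "inner" => "Inner"
  case "outer" | "full" | "fullouter" => "FullOuter"
  case "leftouter" | "left" => "LeftOuter"
  case "rightouter" | "right" => "RightOuter"
  case "leftsemi" => "LeftSemi"
  case other => throw new IllegalArgumentException(s"Unsupported join type '$other'")
}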