9 changes: 8 additions & 1 deletion python/pyspark/sql/dataframe.py
@@ -459,16 +459,23 @@ def join(self, other, joinExprs=None, joinType=None):
The following performs a full outer join between ``df1`` and ``df2``.

:param other: Right side of the join
:param joinExprs: Join expression
:param joinExprs: a string for the join column name, or a join expression (Column).
If joinExprs is a string indicating the name of the join column,
the column must exist on both sides, and this performs an inner equi-join.
:param joinType: str, default 'inner'.
One of `inner`, `outer`, `left_outer`, `right_outer`, `semijoin`.

>>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
[Row(name=None, height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)]

>>> df.join(df2, 'name').select(df.name, df2.height).collect()
[Row(name=u'Bob', height=85)]
"""

if joinExprs is None:
jdf = self._jdf.join(other._jdf)
elif isinstance(joinExprs, basestring):
jdf = self._jdf.join(other._jdf, joinExprs)
else:
assert isinstance(joinExprs, Column), "joinExprs should be Column"
if joinType is None:
37 changes: 37 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -342,6 +342,43 @@ class DataFrame private[sql](
Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
}

/**
* Inner equi-join with another [[DataFrame]] using the given column.
*
* Different from other join functions, the join column will only appear once in the output,
* i.e. similar to SQL's `JOIN USING` syntax.
*
* {{{
* // Joining df1 and df2 using the column "user_id"
* df1.join(df2, "user_id")
* }}}
*
Contributor review comment: I would also state that, similar to SQL USING, the join column will only appear once in the output.

* Note that if you perform a self-join using this function without aliasing the input
* [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
* there is no way to disambiguate which side of the join you would like to reference.
*
* @param right Right side of the join operation.
* @param usingColumn Name of the column to join on. This column must exist on both sides.
* @group dfops
*/
def join(right: DataFrame, usingColumn: String): DataFrame = {
// Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
// by creating a new instance for one of the branches.
val joined = sqlContext.executePlan(
Join(logicalPlan, right.logicalPlan, joinType = Inner, None)).analyzed.asInstanceOf[Join]

// Project only one of the join columns.
val joinedCol = joined.right.resolve(usingColumn)
Project(
joined.output.filterNot(_ == joinedCol),
Join(
joined.left,
joined.right,
joinType = Inner,
Some(EqualTo(joined.left.resolve(usingColumn), joined.right.resolve(usingColumn))))
)
}

/**
* Inner join with another [[DataFrame]], using the given join expression.
*
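To make the `JOIN USING` semantics concrete: unlike the expression-based `join`, the column named in `usingColumn` appears only once in the result schema. Below is a minimal sketch of the difference, assuming a Spark 1.3-era `SQLContext` with its implicits imported; the `people`/`salaries` DataFrames are illustrative and not part of this patch.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Hypothetical local setup, only for illustration.
val sc = new SparkContext("local[2]", "join-using-sketch")
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val people   = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")
val salaries = Seq((1, 100), (2, 200)).toDF("id", "salary")

// Expression-based join: "id" appears twice in the result schema.
people.join(salaries, people("id") === salaries("id")).columns
// Array(id, name, id, salary)

// String-based join added by this patch (JOIN USING semantics): "id" appears once.
people.join(salaries, "id").columns
// Array(id, name, salary)
```

The `Project` in the new method above is what drops the duplicate copy of the join column from the right side.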
40 changes: 29 additions & 11 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -109,15 +109,6 @@ class DataFrameSuite extends QueryTest {
assert(testData.head(2).head.schema === testData.schema)
}

test("self join") {
val df1 = testData.select(testData("key")).as('df1)
val df2 = testData.select(testData("key")).as('df2)

checkAnswer(
df1.join(df2, $"df1.key" === $"df2.key"),
sql("SELECT a.key, b.key FROM testData a JOIN testData b ON a.key = b.key").collect().toSeq)
}

test("simple explode") {
val df = Seq(Tuple1("a b c"), Tuple1("d e")).toDF("words")

@@ -127,8 +118,35 @@
)
}

test("self join with aliases") {
val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
test("join - join using") {
val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
val df2 = Seq(1, 2, 3).map(i => (i, (i + 1).toString)).toDF("int", "str")

checkAnswer(
df.join(df2, "int"),
Row(1, "1", "2") :: Row(2, "2", "3") :: Row(3, "3", "4") :: Nil)
}

test("join - join using self join") {
val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")

// self join
checkAnswer(
df.join(df, "int"),
Row(1, "1", "1") :: Row(2, "2", "2") :: Row(3, "3", "3") :: Nil)
}

test("join - self join") {
val df1 = testData.select(testData("key")).as('df1)
val df2 = testData.select(testData("key")).as('df2)

checkAnswer(
df1.join(df2, $"df1.key" === $"df2.key"),
sql("SELECT a.key, b.key FROM testData a JOIN testData b ON a.key = b.key").collect().toSeq)
}

test("join - using aliases after self join") {
val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
checkAnswer(
df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("x.str").count(),
Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
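The scaladoc's warning about un-aliased self-joins is worth spelling out next to these tests: after `df.join(df, "int")` neither side's columns can be referenced unambiguously, which is exactly why the last test aliases both sides before joining. A small sketch of that workaround, assuming the same `sqlContext.implicits._` setup the suite uses:

```scala
val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")

// After an un-aliased self-join there is no way to say which side "str" refers to:
// df.join(df, "int").select(df("str"))   // cannot be disambiguated, per the scaladoc note

// Aliasing each side first makes both copies of "str" addressable.
df.as('x)
  .join(df.as('y), $"x.int" === $"y.int")
  .select($"x.str", $"y.str")
```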