@@ -53,7 +53,7 @@ abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate w

   override def children: Seq[Expression] = Seq(child)

-  override def nullable: Boolean = false
+  override def nullable: Boolean = true

Comment (Contributor Author): This fixes SPARK-12335.

   override def dataType: DataType = DoubleType
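
A quick way to see the motivation (a minimal sketch, assuming Spark 1.6 and a SQLContext named `sqlContext` in scope): central moment aggregates such as stddev evaluate to null when a group has no input rows, so the result column must be declared nullable.

    import org.apache.spark.sql.functions.{col, stddev}

    // A global aggregate over empty input still produces a single row, and
    // stddev (a central moment aggregate) evaluates to null in that row.
    val empty = sqlContext.range(0)
    empty.select(stddev(col("id"))).collect()  // Array([null])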
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.types._

 /**
@@ -42,7 +41,7 @@ case class Corr(

   override def children: Seq[Expression] = Seq(left, right)

-  override def nullable: Boolean = false
+  override def nullable: Boolean = true

Comment (Contributor Author): This fixes SPARK-12342.

   override def dataType: DataType = DoubleType
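
The same reasoning applies to corr (again a sketch, assuming a `sqlContext` in scope): with no complete (x, y) pairs the aggregate evaluates to null, and after this change the declared schema agrees.

    import org.apache.spark.sql.functions.{col, corr}

    val result = sqlContext.range(0).select(corr(col("id"), col("id")))
    result.schema.head.nullable  // true after this change
    result.collect()             // Array([null])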
@@ -42,7 +42,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
   protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] =
     in.map(BindReferences.bindReference(_, inputSchema))

-  // Make Mutablility optional...
+  // Make Mutability optional...
   protected def create(expressions: Seq[Expression]): Projection = {
     val ctx = newCodeGenContext()
     val columns = expressions.zipWithIndex.map {
@@ -65,7 +65,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
       """
     }.mkString("\n")

-    val getCases = (0 until expressions.size).map { i =>
+    val getCases = expressions.indices.map { i =>
       s"case $i: return c$i;"
     }.mkString("\n")
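
`indices` is just the idiomatic spelling of `0 until size` on Scala collections; a standalone sketch of the equivalence:

    // Plain Scala, no Spark required.
    val expressions = Seq("a", "b", "c")  // stand-ins for the bound expressions
    assert(expressions.indices == (0 until expressions.size))
    val getCases = expressions.indices.map(i => s"case $i: return c$i;").mkString("\n")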
@@ -358,10 +358,10 @@ object StructType extends AbstractDataType {
       case leftField @ StructField(leftName, leftType, leftNullable, _) =>
         rightMapped.get(leftName)
           .map { case rightField @ StructField(_, rightType, rightNullable, _) =>
-          leftField.copy(
-            dataType = merge(leftType, rightType),
-            nullable = leftNullable || rightNullable)
-        }
+            leftField.copy(
+              dataType = merge(leftType, rightType),
+              nullable = leftNullable || rightNullable)
+          }
           .orElse(Some(leftField))
           .foreach(newFields += _)
     }
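
The merge rule in this block is easy to state in isolation (a standalone sketch using only public types): a field defined on both sides stays non-nullable only if both copies are non-nullable.

    import org.apache.spark.sql.types._

    val left   = StructField("x", IntegerType, nullable = false)
    val right  = StructField("x", IntegerType, nullable = true)
    val merged = left.copy(nullable = left.nullable || right.nullable)
    assert(merged.nullable)  // nullability is the union of both sides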
sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala (9 additions, 8 deletions)

@@ -499,10 +499,16 @@ class DataFrame private[sql](
     // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
     // by creating a new instance for one of the branch.
     val joined = sqlContext.executePlan(
-      Join(logicalPlan, right.logicalPlan, joinType = Inner, None)).analyzed.asInstanceOf[Join]
+      Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None)
+    ).analyzed.asInstanceOf[Join]

     // Project only one of the join columns.
-    val joinedCols = usingColumns.map(col => withPlan(joined.right).resolve(col))
+    //
+    // SPARK-12336: For outer joins, attributes of at least one child plan output will be forced to
+    // be nullable. An `AttributeSet` is necessary so that we are not affected by different
+    // nullability values.
+    val joinedCols = AttributeSet(usingColumns.map(col => withPlan(joined.right).resolve(col)))
Comment (Contributor): should we rename this to joinedColsFromRight?

Comment (Contributor Author): I think it's OK. The first line of the comment above already explained the purpose of this variable.
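
Why an `AttributeSet` is insensitive to the nullability flip: membership is decided by expression ID, not by full attribute equality. A minimal sketch against catalyst's internal API (internal, so subject to change between versions):

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet}
    import org.apache.spark.sql.types.IntegerType

    val a         = AttributeReference("int", IntegerType, nullable = false)()
    val aNullable = a.withNullability(true)  // same exprId, different nullability

    assert(!Seq(a).contains(aNullable))                 // full equality: no match
    assert(AttributeSet(a :: Nil).contains(aNullable))  // exprId equality: match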


     val condition = usingColumns.map { col =>
       catalyst.expressions.EqualTo(
         withPlan(joined.left).resolve(col),

@@ -514,12 +520,7 @@ class DataFrame private[sql](
     withPlan {
       Project(
         joined.output.filterNot(joinedCols.contains(_)),
-        Join(
-          joined.left,
-          joined.right,
-          joinType = JoinType(joinType),
-          condition)
-      )
+        joined.copy(condition = condition))
Comment (Contributor Author): Changes in this file fix SPARK-12336.

     }
   }
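
Since `Join` is a case class, `copy` keeps the analyzed (and already disambiguated) children and join type while filling in the condition built afterwards; a sketch with catalyst internals (again internal API):

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo}
    import org.apache.spark.sql.catalyst.plans.Inner
    import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation}
    import org.apache.spark.sql.types.IntegerType

    val left   = LocalRelation(AttributeReference("a", IntegerType)())
    val right  = LocalRelation(AttributeReference("a", IntegerType)())
    val joined = Join(left, right, Inner, None)  // stands in for the analyzed join
    val withCondition =
      joined.copy(condition = Some(EqualTo(left.output.head, right.output.head)))
    assert(withCondition.left eq joined.left)    // children reused, not rebuilt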
@@ -42,7 +42,7 @@ case class DescribeCommand(
       new MetadataBuilder().putString("comment", "name of the column").build())(),
     AttributeReference("data_type", StringType, nullable = false,
       new MetadataBuilder().putString("comment", "data type of the column").build())(),
-    AttributeReference("comment", StringType, nullable = false,
+    AttributeReference("comment", StringType, nullable = true,
       new MetadataBuilder().putString("comment", "comment of the column").build())()
   )
 }

Comment (Contributor Author): This fixes SPARK-12341.
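
A compact way to read the change (a sketch of the declared output schema, using only public types): a column often carries no comment, so only `comment` may be null in DESCRIBE output.

    import org.apache.spark.sql.types._

    // Mirror of the three AttributeReferences above as a plain schema.
    val describeSchema = StructType(Seq(
      StructField("col_name", StringType, nullable = false),
      StructField("data_type", StringType, nullable = false),
      StructField("comment", StringType, nullable = true)))  // may be absent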
@@ -43,15 +43,19 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
   }

   test("join - join using multiple columns and specifying join type") {
-    val df = Seq(1, 2, 3).map(i => (i, i + 1, i.toString)).toDF("int", "int2", "str")
+    val df1 = Seq(1, 2, 3).map(i => (i, i + 1, i.toString)).toDF("int", "int2", "str")
     val df2 = Seq(1, 2, 3).map(i => (i, i + 1, (i + 1).toString)).toDF("int", "int2", "str")

+    val join1 = df1.join(df2, Seq("int", "str"), "left")
+    assert(join1.schema.map(_.nullable) === Seq(false, false, true, true))
     checkAnswer(
-      df.join(df2, Seq("int", "str"), "left"),
+      join1,
       Row(1, 2, "1", null) :: Row(2, 3, "2", null) :: Row(3, 4, "3", null) :: Nil)

+    val join2 = df1.join(df2, Seq("int", "str"), "right")
+    assert(join2.schema.map(_.nullable) === Seq(true, true, true, false))
     checkAnswer(
-      df.join(df2, Seq("int", "str"), "right"),
+      join2,
       Row(null, null, null, 2) :: Row(null, null, null, 3) :: Row(null, null, null, 4) :: Nil)
   }
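
The expected masks encode one rule: an outer join forces the non-preserved side's columns to be nullable, while String columns built via `toDF` are nullable to begin with (which is why `str` is `true` in both masks). A hypothetical helper, not part of the patch, just to spell the rule out:

    // leftNullable covers df1's surviving columns, rightNullable df2's.
    def expectedNullability(
        leftNullable: Seq[Boolean],
        rightNullable: Seq[Boolean],
        joinType: String): Seq[Boolean] = joinType match {
      case "left"  => leftNullable ++ rightNullable.map(_ => true)
      case "right" => leftNullable.map(_ => true) ++ rightNullable
      case other   => sys.error(s"sketch handles only left/right, got $other")
    }

    assert(expectedNullability(Seq(false, false, true), Seq(false), "left") ==
      Seq(false, false, true, true))
    assert(expectedNullability(Seq(false, false, true), Seq(false), "right") ==
      Seq(true, true, true, false))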