@yhuai
Contributor

@yhuai yhuai commented Aug 3, 2015

https://issues.apache.org/jira/browse/SPARK-7871

This PR adds the concept of nullSafe to ClusteredDistribution and HashPartitioning. A ClusteredDistribution whose nullSafe field is false does not require rows whose clustering expressions contain nulls to be clustered together. Likewise, a HashPartitioning whose nullSafe field is false does not guarantee that rows whose clustering expressions contain nulls are clustered together.
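A rough sketch of how the two nullSafe flags could interact, using simplified stand-in types rather than the actual Catalyst classes in this patch. The String-based expressions and the exact satisfies rule are assumptions made for illustration only:

```scala
// Hypothetical, simplified model; not the Catalyst classes changed by this PR.
object NullSafeSketch {
  // Requirement: rows with equal clustering keys must be co-located.
  // With nullSafe = false, rows whose clustering keys contain nulls are exempt.
  final case class ClusteredDistribution(clustering: Seq[String], nullSafe: Boolean)

  // Guarantee: rows are hashed on `expressions`.
  // With nullSafe = false, nothing is promised about rows whose keys contain nulls.
  final case class HashPartitioning(expressions: Seq[String], nullSafe: Boolean) {
    def satisfies(required: ClusteredDistribution): Boolean = {
      // Hashing on a non-empty subset of the clustering keys still co-locates equal keys.
      val keysCovered =
        expressions.nonEmpty && expressions.forall(required.clustering.contains)
      // The weaker null guarantee cannot serve a requirement that includes nulls.
      val nullsOk = nullSafe || !required.nullSafe
      keysCovered && nullsOk
    }
  }
}
```

Under this model, a HashPartitioning with nullSafe = false satisfies only a ClusteredDistribution with nullSafe = false, while a HashPartitioning with nullSafe = true satisfies either kind.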

This concept applies to equi-joins. A shuffled equi-join operator (ShuffledHashJoin, ShuffledHashOuterJoin, and SortMergeJoin) can use ClusteredDistributions with nullSafe = false, because rows with null join keys never match under an equality condition. With this concept, we can avoid shuffling data when we have outer joins. For example, a query like SELECT ... A LEFT OUTER JOIN B ON (A.key = B.key) LEFT OUTER JOIN C ON (B.key = C.key) needs only three Exchange operators instead of four.
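To make the three-versus-four count concrete, here is a toy model of the planning decision. Everything in it is an assumption for illustration (the String keys, the needsExchange rule, and the way the first join's output partitioning is described); it is not the planner code in this patch:

```scala
// Hypothetical toy model of Exchange planning for
// A LEFT OUTER JOIN B ON (A.key = B.key) LEFT OUTER JOIN C ON (B.key = C.key).
object ExchangeCountSketch {
  final case class Partitioning(key: String, nullSafe: Boolean)
  final case class Required(key: String, nullSafe: Boolean)

  // A child needs an Exchange unless one of its output partitionings
  // already satisfies the requirement.
  def needsExchange(output: Seq[Partitioning], required: Required): Boolean =
    !output.exists(p => p.key == required.key && (p.nullSafe || !required.nullSafe))

  def exchangesForChain(joinRequiresNullSafe: Boolean): Int = {
    // First join: A and B are assumed to start with no useful partitioning.
    val first = Seq(
      needsExchange(Nil, Required("A.key", joinRequiresNullSafe)),
      needsExchange(Nil, Required("B.key", joinRequiresNullSafe)))

    // Assumed output of A LEFT OUTER JOIN B: clustered by A.key, and also by
    // B.key except for unmatched rows, where B.key is null.
    val joinOutput = Seq(
      Partitioning("A.key", nullSafe = true),
      Partitioning("B.key", nullSafe = false))

    // Second join: the left child is the first join's output, the right child is C.
    val second = Seq(
      needsExchange(joinOutput, Required("B.key", joinRequiresNullSafe)),
      needsExchange(Nil, Required("C.key", joinRequiresNullSafe)))

    (first ++ second).count(identity)
  }
}
```

In this model, exchangesForChain(joinRequiresNullSafe = true) counts four Exchanges and exchangesForChain(joinRequiresNullSafe = false) counts three, because the first join's output can be reused as the left input of the second join.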

BTW, this PR does not shuffle rows with null partition keys randomly (#7685 has that part; we can add it later).

@SparkQA

SparkQA commented Aug 3, 2015

Test build #39520 has finished for PR 7886 at commit 2bc9be3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class NaiveBayes(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, HasProbabilityCol,
    • class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow]
    • case class ClusteredDistribution(
    • case class HashPartitioning(
    • case class PartitioningCollection(partitionings: Seq[Partitioning])

@SparkQA

SparkQA commented Aug 3, 2015

Test build #39553 has finished for PR 7886 at commit a1d417b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredDistribution(
    • case class HashPartitioning(

@JoshRosen
Contributor

I'd like to try to review this now since I think it's going to conflict with the SMJ outer join patch.

@JoshRosen
Contributor

One high-level comment: unless I've overlooked it, there doesn't seem to be any documentation in the code to explain what the nullSafe concept means here, although maybe the meaning is clear from usage and context. One potential area of naming confusion is that we already use "null safety" when talking about whether expression evaluation methods can expect to receive null values or not. Our usage here seems almost backwards, though: this PR says that a null-safe partitioning means that the nulls will be shuffled, whereas the unsafe version drops the nulls. Am I overlooking something or is this potentially confusing?

@JoshRosen
Contributor

Expression's use of nullSafe seems to be "safe due to absence of nulls", whereas this patch seems to use it as "safe to receive nulls / shuffle nulls."

Contributor

Why did you use a while loop here instead of a for comprehension or pair of nested for loops?

@JoshRosen
Contributor

Actually I'm going to drop review of this for now and focus on pulling in SMJ first. That will conflict with this patch but we can remember to update SMJ's OutputPartitioning as well.

Contributor

Need to override PartitioningCollection.nullSafe.

@yhuai
Contributor Author

yhuai commented Sep 3, 2015

I am closing it for now. Will reopen it when I get a chance to work on it.

@yhuai yhuai closed this Sep 3, 2015