@@ -73,22 +73,23 @@ case class SortMergeJoin(
7373 private [this ] var rightElement : Row = _
7474 private [this ] var leftKey : Row = _
7575 private [this ] var rightKey : Row = _
76- private [this ] var leftMatches : CompactBuffer [Row ] = _
7776 private [this ] var rightMatches : CompactBuffer [Row ] = _
78- private [this ] var leftPosition : Int = - 1
7977 private [this ] var rightPosition : Int = - 1
78+ private [this ] var stop : Boolean = false
79+ private [this ] var matchKey : Row = _
8080
81- override final def hasNext : Boolean = leftPosition != - 1 || nextMatchingPair
81+ override final def hasNext : Boolean = nextMatchingPair()
8282
8383 override final def next (): Row = {
8484 if (hasNext) {
85- val joinedRow = joinRow(leftMatches(leftPosition) , rightMatches(rightPosition))
85+ val joinedRow = joinRow(leftElement , rightMatches(rightPosition))
8686 rightPosition += 1
8787 if (rightPosition >= rightMatches.size) {
88- leftPosition += 1
8988 rightPosition = 0
90- if (leftPosition >= leftMatches.size) {
91- leftPosition = - 1
89+ fetchLeft()
90+ if (leftElement == null || ordering.compare(leftKey, matchKey) != 0 ) {
91+ stop = false
92+ rightMatches = null
9293 }
9394 }
9495 joinedRow
@@ -130,9 +131,7 @@ case class SortMergeJoin(
130131 * of tuples.
131132 */
132133 private def nextMatchingPair (): Boolean = {
133- if (leftPosition == - 1 ) {
134- leftMatches = null
135- var stop : Boolean = false
134+ if (! stop && rightElement != null ) {
136135 while (! stop && leftElement != null && rightElement != null ) {
137136 stop = ordering.compare(leftKey, rightKey) == 0 && ! leftKey.anyNull
138137 if (ordering.compare(leftKey, rightKey) > 0 || rightKey.anyNull) {
@@ -142,27 +141,21 @@ case class SortMergeJoin(
142141 }
143142 }
144143 rightMatches = new CompactBuffer [Row ]()
145- while (stop && rightElement != null ) {
146- rightMatches += rightElement
147- fetchRight()
148- // exit loop when run out of right matches
149- stop = ordering.compare(leftKey, rightKey) == 0
150- }
151- if (rightMatches.size > 0 ) {
152- leftMatches = new CompactBuffer [Row ]()
153- val leftMatch = leftKey.copy()
154- while (ordering.compare(leftKey, leftMatch) == 0 && leftElement != null ) {
155- leftMatches += leftElement
156- fetchLeft()
144+ if (stop) {
145+ stop = false
146+ while (! stop && rightElement != null ) {
147+ rightMatches += rightElement
148+ fetchRight()
149+ // exit loop when run out of right matches
150+ stop = ordering.compare(leftKey, rightKey) != 0
151+ }
152+ if (rightMatches.size > 0 ) {
153+ rightPosition = 0
154+ matchKey = leftKey
157155 }
158- }
159-
160- if (leftMatches != null ) {
161- leftPosition = 0
162- rightPosition = 0
163156 }
164157 }
165- leftPosition > - 1
158+ rightMatches != null && rightMatches.size > 0
166159 }
167160 }
168161 }
0 commit comments