File tree Expand file tree Collapse file tree 1 file changed +5
-5
lines changed
sql/core/src/main/scala/org/apache/spark/sql/execution Expand file tree Collapse file tree 1 file changed +5
-5
lines changed Original file line number Diff line number Diff line change @@ -169,24 +169,24 @@ case class LeftSemiJoinHash(
169169 def execute () = {
170170
171171 buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
172- val hashTable = new java.util.HashSet [Row ]()
172+ val hashSet = new java.util.HashSet [Row ]()
173173 var currentRow : Row = null
174174
175175 // Create a Hash set of buildKeys
176176 while (buildIter.hasNext) {
177177 currentRow = buildIter.next()
178178 val rowKey = buildSideKeyGenerator(currentRow)
179179 if (! rowKey.anyNull) {
180- val keyExists = hashTable .contains(rowKey)
180+ val keyExists = hashSet .contains(rowKey)
181181 if (! keyExists) {
182- hashTable .add(rowKey)
182+ hashSet .add(rowKey)
183183 }
184184 }
185185 }
186186
187+ val joinKeys = streamSideKeyGenerator()
187188 streamIter.filter(current => {
188- val joinKeys = streamSideKeyGenerator()
189- ! joinKeys(current).anyNull && hashTable.contains(joinKeys.currentValue)
189+ ! joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue)
190190 })
191191 }
192192 }
You can’t perform that action at this time.
0 commit comments