@@ -65,13 +65,21 @@ private[spark] class HashShuffleReader[K, C](
         readMetrics.incRecordsRead(1)
         delegate.next()
       }
-    }.asInstanceOf[Iterator[Nothing]]
+    }
 
     val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
       if (dep.mapSideCombine) {
-        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
+        // We are reading values that are already combined
+        val combinedKeyValuesIterator = iter.asInstanceOf[Iterator[(K, C)]]
+        new InterruptibleIterator(context,
+          dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context))
       } else {
-        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
+        // We don't know the value type, but also don't care -- the dependency *should*
+        // have made sure it's compatible with this aggregator, which will convert the value
+        // type to the combined type C
+        val keyValuesIterator = iter.asInstanceOf[Iterator[(K, Nothing)]]
+        new InterruptibleIterator(context,
+          dep.aggregator.get.combineValuesByKey(keyValuesIterator, context))
       }
     } else {
       require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
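
The change above only casts the shuffled record iterator to the element type each Aggregator method expects; the combining logic itself lives in the Aggregator's createCombiner/mergeValue/mergeCombiners functions. As a rough, self-contained illustration (plain Scala with hypothetical names, not Spark's actual Aggregator/TaskContext API), the sketch below shows why the map-side-combined branch sees (K, C) pairs that only need mergeCombiners, while the uncombined branch sees raw (K, V) pairs that must first go through createCombiner and mergeValue:

// Toy "aggregator" that sums Int values (V) into Long combiners (C).
object CombinePathsSketch {
  val createCombiner: Int => Long = v => v.toLong
  val mergeValue: (Long, Int) => Long = (c, v) => c + v
  val mergeCombiners: (Long, Long) => Long = (c1, c2) => c1 + c2

  // Reader-side path when map-side combine already ran: values are combiners (C).
  def combineCombinersByKey[K](iter: Iterator[(K, Long)]): Map[K, Long] =
    iter.foldLeft(Map.empty[K, Long]) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).map(mergeCombiners(_, c)).getOrElse(c))
    }

  // Reader-side path when no map-side combine ran: values are raw (V).
  def combineValuesByKey[K](iter: Iterator[(K, Int)]): Map[K, Long] =
    iter.foldLeft(Map.empty[K, Long]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(mergeValue(_, v)).getOrElse(createCombiner(v)))
    }

  def main(args: Array[String]): Unit = {
    val raw = Iterator(("a", 1), ("b", 2), ("a", 3))   // (K, V) pairs
    val preCombined = Iterator(("a", 4L), ("b", 2L))   // (K, C) pairs
    println(combineValuesByKey(raw))                   // Map(a -> 4, b -> 2)
    println(combineCombinersByKey(preCombined))        // Map(a -> 4, b -> 2)
  }
}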