1 file changed: +9 -2 lines changed
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate
@@ -86,8 +86,15 @@ class SortBasedAggregationIterator(
   // The aggregation buffer used by the sort-based aggregation.
   private[this] val sortBasedAggregationBuffer: MutableRow = newBuffer
 
-  // A SafeProjection to turn UnsafeRow into GenericInternalRow, because UnsafeRow can't be
-  // compared to MutableRow (aggregation buffer) directly.
+  // This safe projection is used to turn the input row into safe row. This is necessary
+  // because the input row may be produced by unsafe projection in child operator and all the
+  // produced rows share one byte array. However, when we update the aggregate buffer according to
+  // the input row, we may cache some values from input row, e.g. `Max` will keep the max value from
+  // input row via MutableProjection, `CollectList` will keep all values in an array via
+  // ImperativeAggregate framework. These values may get changed unexpectedly if the underlying
+  // unsafe projection update the shared byte array. By applying a safe projection to the input row,
+  // we can cut down the connection from input row to the shared byte array, and thus it's safe to
+  // cache values from input row while updating the aggregation buffer.
   private[this] val safeProj: Projection = FromUnsafeProjection(valueAttributes.map(_.dataType))
 
   protected def initialize(): Unit = {
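
The hazard described in the new comment can be reproduced without Spark. The sketch below is a minimal illustration only: `RowView`, `ReusingProducer`, `toSafe`, and `maxBy` are hypothetical stand-ins, not Spark classes, and the copy in `toSafe` merely plays the role that the safe projection plays in the patch. It assumes a producer that writes every row into one shared byte array and a `Max`-like aggregate that keeps a reference to the best row seen so far.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Arrays

// Hypothetical stand-ins for illustration; none of these are Spark APIs.
object SharedBufferSketch {

  // A "row" that is only a view over a byte array it does not own.
  final class RowView(val bytes: Array[Byte]) {
    def value: String = new String(bytes, UTF_8).trim

    // Analogue of a safe projection: copy the data into a fresh array
    // so the result no longer depends on the producer's buffer.
    def toSafe: RowView = new RowView(bytes.clone())
  }

  // Analogue of a child operator using an unsafe projection:
  // every row it emits is the same view over the same byte array.
  final class ReusingProducer(inputs: Seq[String], width: Int) {
    private val shared = new Array[Byte](width)
    private val view = new RowView(shared)

    def iterator: Iterator[RowView] = inputs.iterator.map { s =>
      Arrays.fill(shared, ' '.toByte)            // overwrite the previous row
      val src = s.getBytes(UTF_8)
      System.arraycopy(src, 0, shared, 0, math.min(src.length, width))
      view
    }
  }

  // A Max-like aggregate that caches the largest row seen so far.
  def maxBy(rows: Iterator[RowView], makeSafe: Boolean): String = {
    var best: RowView = null
    rows.foreach { r =>
      val row = if (makeSafe) r.toSafe else r
      if (best == null || row.value > best.value) best = row
    }
    best.value
  }

  def main(args: Array[String]): Unit = {
    val inputs = Seq("b", "c", "a")
    // Cached reference aliases the shared array, so it tracks the last row.
    println(maxBy(new ReusingProducer(inputs, 4).iterator, makeSafe = false)) // prints "a"
    // Copying first detaches the cached value from the shared array.
    println(maxBy(new ReusingProducer(inputs, 4).iterator, makeSafe = true))  // prints "c"
  }
}
```

Without the copy, the cached "maximum" is the same object as every incoming row, so each comparison is a row against itself and the final value is whatever the producer wrote last. With the copy, the cached value is stable, at the cost of one allocation per input row.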