@@ -114,7 +114,10 @@ trait CodegenSupport extends SparkPlan {
114114 * Consume the columns generated from current SparkPlan, call it's parent.
115115 */
116116 final def consume (ctx : CodegenContext , input : Seq [ExprCode ], row : String = null ): String = {
117- if (input != null && ! parent.consumeUnsafeRow) {
117+ // We check if input expressions has same length as output when:
118+ // 1. parent can't consume UnsafeRow and input is not null.
119+ // 2. parent consumes UnsafeRow and row is null.
120+ if ((input != null && ! parent.consumeUnsafeRow) || (parent.consumeUnsafeRow && row == null )) {
118121 assert(input.length == output.length)
119122 }
120123 parent.consumeChild(ctx, this , input, row)
@@ -211,12 +214,15 @@ case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport {
211214 ctx.addMutableState(" scala.collection.Iterator" , input, s " $input = inputs[0]; " )
212215 val row = ctx.freshName(" row" )
213216
217+ // If the parent of this InputAdapter can't consume UnsafeRow,
218+ // we unpack variables from the row.
214219 val columns : Seq [ExprCode ] = if (! this .parent.consumeUnsafeRow) {
215220 val exprs = output.zipWithIndex.map(x => new BoundReference (x._2, x._1.dataType, true ))
216221 ctx.INPUT_ROW = row
217222 ctx.currentVars = null
218223 exprs.map(_.gen(ctx))
219224 } else {
225+ // If the parent consumes UnsafeRow, we don't need to unpack the row.
220226 Seq .empty
221227 }
222228 val columnsCode = if (columns.isEmpty) {
0 commit comments