@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
-import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
 import org.apache.spark.sql.types.DataType

@@ -110,9 +110,7 @@ private[sql] case class PhysicalRDD(
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

   protected override def doExecute(): RDD[InternalRow] = {
-    val conf = SQLContext.getActive().get
-    // The vectorized reader does not produce UnsafeRows. In this case we will convert.
-    val unsafeRow = if (isUnsafeRow && !conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)) {
+    val unsafeRow = if (isUnsafeRow) {
       rdd
     } else {
       rdd.mapPartitionsInternal { iter =>
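
The removed lines mean doExecute no longer looks up the active SQLContext or the PARQUET_VECTORIZED_READER_ENABLED flag; it trusts isUnsafeRow alone, and the else branch (truncated by the diff context) still converts each InternalRow to an UnsafeRow, presumably via an UnsafeProjection. A minimal standalone sketch of that conversion idiom, with the object name, schema, and sample rows assumed for illustration rather than taken from this commit:

// Sketch of the InternalRow -> UnsafeRow conversion idiom.
// Object name, schema, and sample rows are assumptions for this example.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object UnsafeRowSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)))
    // Compiles a projection that copies any InternalRow into UnsafeRow format.
    val toUnsafe = UnsafeProjection.create(schema)
    val rows = Seq(
      InternalRow(1, UTF8String.fromString("a")),
      InternalRow(2, UTF8String.fromString("b")))
    // The projection reuses a single buffer, so copy() when materializing a collection.
    val unsafeRows: Seq[UnsafeRow] = rows.map(r => toUnsafe(r).copy())
    unsafeRows.foreach(r => println(s"${r.getInt(0)} ${r.getUTF8String(1)}"))
  }
}
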
@@ -142,12 +140,14 @@ private[sql] case class PhysicalRDD(
   override protected def doProduce(ctx: CodegenContext): String = {
     val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true))
     val row = ctx.freshName("row")
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
     ctx.INPUT_ROW = row
     ctx.currentVars = null
     val columns = exprs.map(_.gen(ctx))
     s"""
        | while (input.hasNext()) {
        |   InternalRow $row = (InternalRow) input.next();
+       |   $numOutputRows.add(1);
        |   ${columns.map(_.code).mkString("\n").trim}
        |   ${consume(ctx, columns).trim}
        |   if (shouldStop()) {
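
The doProduce change binds the existing numOutputRows metric into the generated code via metricTerm, so the whole-stage-codegen loop increments the same counter once per produced row, matching the interpreted doExecute path. SQLMetrics and LongSQLMetric are private[sql], so the standalone sketch below shows the same per-row counting pattern with the public accumulator API instead; the app name, local master, and data are assumptions for the example:

// Sketch only: per-row counting with a named accumulator, mirroring the
// "increment once per output row" pattern; not the SQLMetrics code itself.
import org.apache.spark.{SparkConf, SparkContext}

object NumOutputRowsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("num-output-rows-sketch").setMaster("local[2]"))
    // Named accumulators are reported to the driver and shown in the web UI,
    // much like SQL metrics are.
    val numOutputRows = sc.accumulator(0L, "number of output rows")
    sc.parallelize(1 to 1000, 4).foreach { _ =>
      numOutputRows += 1L  // bumped on executors, merged back on the driver
    }
    println(numOutputRows.value)  // 1000 once the job has finished
    sc.stop()
  }
}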