Commit accd549

remove CachedColumnarIterator
1 parent 1afbf13 commit accd549

1 file changed: +2 additions, -24 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala

Lines changed: 2 additions & 24 deletions
@@ -50,9 +50,8 @@ private[columnar] class CachedColumnarRDD(
       Iterator[CachedBatch] = {
     val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index)
     val superGetOrCompute: (Partition, TaskContext) => Iterator[CachedBatch] = super.getOrCompute
-    SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(metadataBlock =>
-      new InterruptibleIterator[CachedBatch](context,
-        new CachedColumnarIterator(metadataBlock, split, context, superGetOrCompute))
+    SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(_ =>
+      superGetOrCompute(split, context)
     ).getOrElse {
       val batchIter = superGetOrCompute(split, context)
       if (containsPartitionMetadata && getStorageLevel != StorageLevel.NONE && batchIter.hasNext) {
@@ -98,24 +97,3 @@ private[columnar] object CachedColumnarRDD {
     }
   }
 }
-
-private[columnar] class CachedColumnarIterator(
-    val partitionStats: InternalRow,
-    partition: Partition,
-    context: TaskContext,
-    fetchRDDPartition: (Partition, TaskContext) => Iterator[CachedBatch])
-  extends Iterator[CachedBatch] {
-
-  private var delegate: Iterator[CachedBatch] = _
-
-  override def hasNext: Boolean = {
-    if (delegate == null) {
-      delegate = fetchRDDPartition(partition, context)
-    }
-    delegate.hasNext
-  }
-
-  override def next(): CachedBatch = {
-    delegate.next()
-  }
-}
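
For context, the class removed in this commit wrapped the partition fetch in a lazily initialized delegate, so super.getOrCompute only ran once the iterator was first consumed; after this change the metadata-block branch calls superGetOrCompute(split, context) directly. Below is a minimal, self-contained sketch of that lazy-delegate iterator pattern in isolation (LazyDelegateIterator, fetch, and the example object are illustrative names, not Spark API):

class LazyDelegateIterator[T](fetch: () => Iterator[T]) extends Iterator[T] {
  // The underlying iterator is only created on first access, mirroring how the
  // removed CachedColumnarIterator deferred fetchRDDPartition until hasNext was called.
  private lazy val delegate: Iterator[T] = fetch()

  override def hasNext: Boolean = delegate.hasNext
  override def next(): T = delegate.next()
}

object LazyDelegateExample extends App {
  val it = new LazyDelegateIterator(() => { println("computing partition"); Iterator(1, 2, 3) })
  println("iterator created")   // prints before "computing partition"
  println(it.toList)            // consuming the iterator triggers the deferred fetch
}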
