File tree Expand file tree Collapse file tree 1 file changed +3
-2
lines changed
streaming/src/main/scala/org/apache/spark/streaming/util Expand file tree Collapse file tree 1 file changed +3
-2
lines changed Original file line number Diff line number Diff line change @@ -26,7 +26,7 @@ import scala.language.postfixOps
2626import org .apache .hadoop .conf .Configuration
2727import org .apache .hadoop .fs .Path
2828
29- import org .apache .spark .util .ThreadUtils
29+ import org .apache .spark .util .{ CompletionIterator , ThreadUtils }
3030import org .apache .spark .{Logging , SparkConf }
3131
3232/**
@@ -124,7 +124,8 @@ private[streaming] class FileBasedWriteAheadLog(
124124
125125 logFilesToRead.iterator.map { file =>
126126 logDebug(s " Creating log reader with $file" )
127- new FileBasedWriteAheadLogReader (file, hadoopConf)
127+ val reader = new FileBasedWriteAheadLogReader (file, hadoopConf)
128+ CompletionIterator [ByteBuffer , Iterator [ByteBuffer ]](reader, reader.close _)
128129 } flatMap { x => x }
129130 }
130131
You can’t perform that action at this time.
0 commit comments