@@ -45,16 +45,9 @@ private[sql] class UnsafeRowSerializer(numFields: Int) extends Serializer with S
4545}
4646
4747private class UnsafeRowSerializerInstance (numFields : Int ) extends SerializerInstance {
48-
49- /**
50- * Marks the end of a stream written with [[serializeStream() ]].
51- */
52- private [this ] val EOF : Int = - 1
53-
5448 /**
5549 * Serializes a stream of UnsafeRows. Within the stream, each record consists of a record
5650 * length (stored as a 4-byte integer, written high byte first), followed by the record's bytes.
57- * The end of the stream is denoted by a record with the special length `EOF` (-1).
5851 */
5952 override def serializeStream (out : OutputStream ): SerializationStream = new SerializationStream {
6053 private [this ] var writeBuffer : Array [Byte ] = new Array [Byte ](4096 )
@@ -92,7 +85,6 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
9285
9386 override def close (): Unit = {
9487 writeBuffer = null
95- dOut.writeInt(EOF )
9688 dOut.close()
9789 }
9890 }
@@ -104,12 +96,20 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
10496 private [this ] var rowBuffer : Array [Byte ] = new Array [Byte ](1024 )
10597 private [this ] var row : UnsafeRow = new UnsafeRow ()
10698 private [this ] var rowTuple : (Int , UnsafeRow ) = (0 , row)
99+ private [this ] val EOF : Int = - 1
107100
108101 override def asKeyValueIterator : Iterator [(Int , UnsafeRow )] = {
109102 new Iterator [(Int , UnsafeRow )] {
110- private [this ] var rowSize : Int = dIn.readInt()
111- if (rowSize == EOF ) dIn.close()
112103
104+ private [this ] def readSize (): Int = try {
105+ dIn.readInt()
106+ } catch {
107+ case e : EOFException =>
108+ dIn.close()
109+ EOF
110+ }
111+
112+ private [this ] var rowSize : Int = readSize()
113113 override def hasNext : Boolean = rowSize != EOF
114114
115115 override def next (): (Int , UnsafeRow ) = {
@@ -118,7 +118,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
118118 }
119119 ByteStreams .readFully(dIn, rowBuffer, 0 , rowSize)
120120 row.pointTo(rowBuffer, Platform .BYTE_ARRAY_OFFSET , numFields, rowSize)
121- rowSize = dIn.readInt() // read the next row's size
121+ rowSize = readSize()
122122 if (rowSize == EOF ) { // We are returning the last row in this stream
123123 dIn.close()
124124 val _rowTuple = rowTuple
0 commit comments