@@ -49,8 +49,16 @@ private[sql] class UnsafeRowSerializer(numFields: Int) extends Serializer with S
4949
5050private class UnsafeRowSerializerInstance (numFields : Int ) extends SerializerInstance {
5151
52+ /**
53+ * Marks the end of a stream written with [[serializeStream() ]].
54+ */
5255 private [this ] val EOF : Int = - 1
5356
57+ /**
58+ * Serializes a stream of UnsafeRows. Within the stream, each record consists of a record
59+ * length (stored as a 4-byte integer, written high byte first), followed by the record's bytes.
60+ * The end of the stream is denoted by a record with the special length `EOF` (-1).
61+ */
5462 override def serializeStream (out : OutputStream ): SerializationStream = new SerializationStream {
5563 private [this ] var writeBuffer : Array [Byte ] = new Array [Byte ](4096 )
5664 private [this ] val dOut : DataOutputStream = new DataOutputStream (out)
@@ -62,15 +70,28 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
6270 row.writeToStream(out, writeBuffer)
6371 this
6472 }
73+
6574 override def writeKey [T : ClassTag ](key : T ): SerializationStream = {
75+ // The key is only needed on the map side when computing partition ids. It does not need to
76+ // be shuffled.
6677 assert(key.isInstanceOf [Int ])
6778 this
6879 }
69- override def writeAll [T : ClassTag ](iter : Iterator [T ]): SerializationStream =
80+
81+ override def writeAll [T : ClassTag ](iter : Iterator [T ]): SerializationStream = {
82+ // This method is never called by shuffle code.
7083 throw new UnsupportedOperationException
71- override def writeObject [T : ClassTag ](t : T ): SerializationStream =
84+ }
85+
86+ override def writeObject [T : ClassTag ](t : T ): SerializationStream = {
87+ // This method is never called by shuffle code.
7288 throw new UnsupportedOperationException
73- override def flush (): Unit = dOut.flush()
89+ }
90+
91+ override def flush (): Unit = {
92+ dOut.flush()
93+ }
94+
7495 override def close (): Unit = {
7596 writeBuffer = null
7697 dOut.writeInt(EOF )
@@ -81,6 +102,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
81102 override def deserializeStream (in : InputStream ): DeserializationStream = {
82103 new DeserializationStream {
83104 private [this ] val dIn : DataInputStream = new DataInputStream (in)
105+ // 1024 is a default buffer size; this buffer will grow to accommodate larger rows
84106 private [this ] var rowBuffer : Array [Byte ] = new Array [Byte ](1024 )
85107 private [this ] var row : UnsafeRow = new UnsafeRow ()
86108 private [this ] var rowTuple : (Int , UnsafeRow ) = (0 , row)
@@ -112,14 +134,40 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
112134 }
113135 }
114136 }
115- override def asIterator : Iterator [Any ] = throw new UnsupportedOperationException
116- override def readKey [T : ClassTag ](): T = throw new UnsupportedOperationException
117- override def readValue [T : ClassTag ](): T = throw new UnsupportedOperationException
118- override def readObject [T : ClassTag ](): T = throw new UnsupportedOperationException
119- override def close (): Unit = dIn.close()
137+
138+ override def asIterator : Iterator [Any ] = {
139+ // This method is never called by shuffle code.
140+ throw new UnsupportedOperationException
141+ }
142+
143+ override def readKey [T : ClassTag ](): T = {
144+ // We skipped serialization of the key in writeKey(), so just return a dummy value since
145+ // this is going to be discarded anyways.
146+ null .asInstanceOf [T ]
147+ }
148+
149+ override def readValue [T : ClassTag ](): T = {
150+ val rowSize = dIn.readInt()
151+ if (rowBuffer.length < rowSize) {
152+ rowBuffer = new Array [Byte ](rowSize)
153+ }
154+ ByteStreams .readFully(in, rowBuffer, 0 , rowSize)
155+ row.pointTo(rowBuffer, PlatformDependent .BYTE_ARRAY_OFFSET , numFields, rowSize, null )
156+ row.asInstanceOf [T ]
157+ }
158+
159+ override def readObject [T : ClassTag ](): T = {
160+ // This method is never called by shuffle code.
161+ throw new UnsupportedOperationException
162+ }
163+
164+ override def close (): Unit = {
165+ dIn.close()
166+ }
120167 }
121168 }
122169
170+ // These methods are never called by shuffle code.
123171 override def serialize [T : ClassTag ](t : T ): ByteBuffer = throw new UnsupportedOperationException
124172 override def deserialize [T : ClassTag ](bytes : ByteBuffer ): T =
125173 throw new UnsupportedOperationException
0 commit comments