Commit 3d7a1f2

Add writeToStream() method to UnsafeRow

1 parent c9db8ea
3 files changed (+105, -15 lines)


sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java

Lines changed: 33 additions & 0 deletions
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.catalyst.expressions;
 
+import java.io.IOException;
+import java.io.OutputStream;
+
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.util.ObjectPool;
 import org.apache.spark.unsafe.PlatformDependent;
@@ -365,6 +368,36 @@ public InternalRow copy() {
     }
   }
 
+  /**
+   * Write this UnsafeRow's underlying bytes to the given OutputStream.
+   *
+   * @param out the stream to write to.
+   * @param writeBuffer a byte array for buffering chunks of off-heap data while writing to the
+   *                    output stream. If this row is backed by an on-heap byte array, then this
+   *                    buffer will not be used and may be null.
+   */
+  public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException {
+    if (baseObject instanceof byte[]) {
+      int offsetInByteArray = (int) (baseOffset - PlatformDependent.BYTE_ARRAY_OFFSET);
+      out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes);
+    } else {
+      int dataRemaining = sizeInBytes;
+      long rowReadPosition = baseOffset;
+      while (dataRemaining > 0) {
+        int toTransfer = Math.min(writeBuffer.length, dataRemaining);
+        PlatformDependent.copyMemory(
+          baseObject,
+          rowReadPosition,
+          writeBuffer,
+          PlatformDependent.BYTE_ARRAY_OFFSET,
+          toTransfer);
+        out.write(writeBuffer, 0, toTransfer);
+        rowReadPosition += toTransfer;
+        dataRemaining -= toTransfer;
+      }
+    }
+  }
+
   @Override
   public boolean anyNull() {
     return BitSetMethods.anySet(baseObject, baseOffset, bitSetWidthInBytes);
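
For context on how a caller is expected to use the new method, here is a minimal Scala sketch (not part of this commit; the helper name frameRow and the 4 KB buffer size are illustrative) that mirrors the length-prefixed framing UnsafeRowSerializer uses below:

import java.io.{ByteArrayOutputStream, DataOutputStream}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Hypothetical helper: frame a row as [sizeInBytes][row bytes], matching the
// layout UnsafeRowSerializer writes. The scratch buffer is only used when the
// row is backed by off-heap memory; on-heap rows are written directly.
def frameRow(row: UnsafeRow): Array[Byte] = {
  val baos = new ByteArrayOutputStream()
  val dOut = new DataOutputStream(baos)
  dOut.writeInt(row.getSizeInBytes)  // length prefix, as in UnsafeRowSerializer
  row.writeToStream(dOut, new Array[Byte](4096))
  dOut.flush()
  baos.toByteArray
}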

sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala

Lines changed: 1 addition & 15 deletions
@@ -59,21 +59,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInstance
       val row = value.asInstanceOf[UnsafeRow]
       assert(row.getPool == null, "UnsafeRowSerializer does not support ObjectPool")
       dOut.writeInt(row.getSizeInBytes)
-      var dataRemaining: Int = row.getSizeInBytes
-      val baseObject = row.getBaseObject
-      var rowReadPosition: Long = row.getBaseOffset
-      while (dataRemaining > 0) {
-        val toTransfer: Int = Math.min(writeBuffer.length, dataRemaining)
-        PlatformDependent.copyMemory(
-          baseObject,
-          rowReadPosition,
-          writeBuffer,
-          PlatformDependent.BYTE_ARRAY_OFFSET,
-          toTransfer)
-        out.write(writeBuffer, 0, toTransfer)
-        rowReadPosition += toTransfer
-        dataRemaining -= toTransfer
-      }
+      row.writeToStream(out, writeBuffer)
       this
     }
     override def writeKey[T: ClassTag](key: T): SerializationStream = {
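
Delegating to writeToStream() keeps the chunked PlatformDependent.copyMemory loop out of the serializer and gains the on-heap fast path, where rows are written directly without staging through writeBuffer. The read side is unchanged: the length prefix written above lets the deserializer size a buffer before rebuilding the row. A hedged sketch of that step (assuming a DataInputStream dIn positioned at a length prefix and the three-field schema used in the test below; the pointTo() signature follows the test's usage):

val rowSize = dIn.readInt()
val rowBuffer = new Array[Byte](rowSize)
dIn.readFully(rowBuffer)
val row = new UnsafeRow()
row.pointTo(
  rowBuffer,
  PlatformDependent.BYTE_ARRAY_OFFSET,
  3,     // num fields (schema-dependent)
  rowSize,
  null)  // object pool (unsupported by UnsafeRowSerializer)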
sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala (new file)

Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
+import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.apache.spark.unsafe.PlatformDependent
+import org.apache.spark.unsafe.memory.MemoryAllocator
+import org.apache.spark.unsafe.types.UTF8String
+
+class UnsafeRowSuite extends SparkFunSuite {
+  test("writeToStream") {
+    val row = InternalRow.apply(UTF8String.fromString("hello"), UTF8String.fromString("world"), 123)
+    val arrayBackedUnsafeRow: UnsafeRow =
+      UnsafeProjection.create(Seq(StringType, StringType, IntegerType)).apply(row)
+    assert(arrayBackedUnsafeRow.getBaseObject.isInstanceOf[Array[Byte]])
+    val bytesFromArrayBackedRow: Array[Byte] = {
+      val baos = new ByteArrayOutputStream()
+      arrayBackedUnsafeRow.writeToStream(baos, null)
+      baos.toByteArray
+    }
+    val bytesFromOffheapRow: Array[Byte] = {
+      val offheapRowPage = MemoryAllocator.UNSAFE.allocate(arrayBackedUnsafeRow.getSizeInBytes)
+      try {
+        PlatformDependent.copyMemory(
+          arrayBackedUnsafeRow.getBaseObject,
+          arrayBackedUnsafeRow.getBaseOffset,
+          offheapRowPage.getBaseObject,
+          offheapRowPage.getBaseOffset,
+          arrayBackedUnsafeRow.getSizeInBytes
+        )
+        val offheapUnsafeRow: UnsafeRow = new UnsafeRow()
+        offheapUnsafeRow.pointTo(
+          offheapRowPage.getBaseObject,
+          offheapRowPage.getBaseOffset,
+          3, // num fields
+          arrayBackedUnsafeRow.getSizeInBytes,
+          null // object pool
+        )
+        assert(offheapUnsafeRow.getBaseObject === null)
+        val baos = new ByteArrayOutputStream()
+        val writeBuffer = new Array[Byte](1024)
+        offheapUnsafeRow.writeToStream(baos, writeBuffer)
+        baos.toByteArray
+      } finally {
+        MemoryAllocator.UNSAFE.free(offheapRowPage)
+      }
+    }
+
+    assert(bytesFromArrayBackedRow === bytesFromOffheapRow)
+  }
+}
