@@ -75,6 +75,42 @@ static long getPrefix(Object base, long offset, int numBytes) {
     return (IS_LITTLE_ENDIAN ? java.lang.Long.reverseBytes(p) : p) & ~mask;
   }

+  public static int compareBinary(byte[] leftBase, byte[] rightBase) {
+    return compareBinary(leftBase, Platform.BYTE_ARRAY_OFFSET, leftBase.length,
Member

I'm only wondering if this ends up being slower - you already have byte arrays, and now have to go through platform methods to read them?

Contributor

@JoshRosen JoshRosen Oct 18, 2021

+1.

It seems plausible that the new version will be faster, but it's probably a good idea to run a quick benchmark to confirm. There's a UTF8StringBenchmark linked from #19180 (comment): maybe we could adapt that to work on byte arrays and do a quick before-and-after comparison just to double check?

Edit: just to clarify: I noticed that this benchmark is also linked in the PR description. As Sean points out, I think the key difference in this PR is whether we're using getByte() versus directly accessing the on-heap byte array (in the linked UTF8String benchmark, both the old and new code were using getByte()).
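
To make the distinction in this thread concrete, here is a minimal, self-contained sketch of the two strategies on plain on-heap arrays. It is not the PR's code: `ByteBuffer.getLong` stands in for `Platform.getLong` (an assumption, so the example runs without Spark), and the class and method names are hypothetical. A `ByteBuffer` reads big-endian by default, so no `Long.reverseBytes` step is needed here.

```java
import java.nio.ByteBuffer;

final class CompareSketch {
  // Byte-at-a-time comparison reading the arrays directly
  // (the shape of the old TypeUtils.compareBinary path).
  static int compareByteWise(byte[] x, byte[] y) {
    int limit = Math.min(x.length, y.length);
    for (int i = 0; i < limit; i++) {
      int res = (x[i] & 0xFF) - (y[i] & 0xFF);
      if (res != 0) return res;
    }
    return x.length - y.length;
  }

  // Word-at-a-time comparison: 8 bytes per step, then a byte-wise tail.
  static int compareWordWise(byte[] x, byte[] y) {
    int len = Math.min(x.length, y.length);
    int wordMax = (len / 8) * 8;
    ByteBuffer xb = ByteBuffer.wrap(x); // big-endian by default
    ByteBuffer yb = ByteBuffer.wrap(y);
    for (int i = 0; i < wordMax; i += 8) {
      long left = xb.getLong(i);
      long right = yb.getLong(i);
      if (left != right) return Long.compareUnsigned(left, right);
    }
    for (int i = wordMax; i < len; i++) {
      int res = (x[i] & 0xFF) - (y[i] & 0xFF);
      if (res != 0) return res;
    }
    return x.length - y.length;
  }

  public static void main(String[] args) {
    byte[] a = {1, 2, 3, 4, 5, 6, 7, 8, 9};
    byte[] b = {1, 2, 3, 4, 5, 6, 7, (byte) 200, 0};
    // Only the sign of the result is meaningful; both strategies agree on it.
    System.out.println(Integer.signum(compareByteWise(a, b)));
    System.out.println(Integer.signum(compareWordWise(a, b)));
  }
}
```

Whether the word-wise form is actually faster on a given JVM is exactly what the requested benchmark is meant to establish; the sketch only shows that both forms produce the same ordering.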

Contributor Author

Thank you @srowen and @JoshRosen for pointing out the difference. I followed the linked benchmark but added a new "512 byte slow" case in which the first 511 bytes are the same. The results show no regression after this PR, and a big benefit when the byte arrays share a long common prefix.

Before this PR:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_271-b09 on Mac OS X 10.16
Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
Byte Array compareTo:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                            800            861          70         81.9          12.2       1.0X
8-16 byte                                           810            878          59         80.9          12.4       1.0X
16-32 byte                                          804            887          40         81.5          12.3       1.0X
512-1024 byte                                      1050           1181          43         62.4          16.0       0.8X
512 byte slow                                     23593          23698         311          2.8         360.0       0.0X
2-7 byte                                            778            784           5         84.2          11.9       1.0X

After this PR:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_271-b09 on Mac OS X 10.16
Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
Byte Array compareTo:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                            425            471          24        154.2           6.5       1.0X
8-16 byte                                           751            814          40         87.2          11.5       0.5X
16-32 byte                                          789            842          42         83.1          12.0       0.5X
512-1024 byte                                      1038           1175         193         63.1          15.8       0.4X
512 byte slow                                      3419           3924         NaN         19.2          52.2       0.1X
2-7 byte                                            421            424           2        155.6           6.4       1.0X
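
A rough way to see why the "512 byte slow" case improves the most (Per Row drops from 360.0 ns to 52.2 ns in the tables above): in `ByteArrayBenchmark` every "slow" row is 511 zero bytes followed by a 1, so adjacent rows are equal and each comparison has to scan the full 512 bytes. The sketch below only counts loop steps for the two strategies. The class and method names are hypothetical, and `ByteBuffer.getLong` is used as a stand-in for `Platform.getLong`.

```java
import java.nio.ByteBuffer;

final class StepCount {
  // Number of loop steps a byte-at-a-time scan takes before it can return.
  static int byteSteps(byte[] x, byte[] y) {
    int limit = Math.min(x.length, y.length);
    int steps = 0;
    for (int i = 0; i < limit; i++) {
      steps++;
      if (x[i] != y[i]) break;
    }
    return steps;
  }

  // Number of loop steps when 8 bytes are compared per step, plus a byte-wise tail.
  static int wordSteps(byte[] x, byte[] y) {
    int len = Math.min(x.length, y.length);
    int wordMax = (len / 8) * 8;
    ByteBuffer xb = ByteBuffer.wrap(x);
    ByteBuffer yb = ByteBuffer.wrap(y);
    int steps = 0;
    for (int i = 0; i < wordMax; i += 8) {
      steps++;
      if (xb.getLong(i) != yb.getLong(i)) return steps;
    }
    for (int i = wordMax; i < len; i++) {
      steps++;
      if (x[i] != y[i]) break;
    }
    return steps;
  }

  // Same shape as the benchmark's "slow" rows: 511 zero bytes followed by a single 1.
  static byte[] slowRow() {
    byte[] row = new byte[512];
    row[511] = 1;
    return row;
  }

  public static void main(String[] args) {
    byte[] x = slowRow();
    byte[] y = slowRow();
    System.out.println("byte steps: " + byteSteps(x, y)); // 512
    System.out.println("word steps: " + wordSteps(x, y)); // 64
  }
}
```

Step counts are not timings, but an 8x reduction in loop steps is in the same range as the roughly 6.9x per-row improvement reported for this case.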

+      rightBase, Platform.BYTE_ARRAY_OFFSET, rightBase.length);
+  }
+
+  static int compareBinary(
+      Object leftBase,
+      long leftOffset,
+      int leftNumBytes,
+      Object rightBase,
+      long rightOffset,
+      int rightNumBytes) {
+    int len = Math.min(leftNumBytes, rightNumBytes);
+    int wordMax = (len / 8) * 8;
+    for (int i = 0; i < wordMax; i += 8) {
+      long left = Platform.getLong(leftBase, leftOffset + i);
+      long right = Platform.getLong(rightBase, rightOffset + i);
+      if (left != right) {
+        if (IS_LITTLE_ENDIAN) {
+          return Long.compareUnsigned(Long.reverseBytes(left), Long.reverseBytes(right));
+        } else {
+          return Long.compareUnsigned(left, right);
+        }
+      }
+    }
+    for (int i = wordMax; i < len; i++) {
+      // Both UTF-8 and byte array should be compared as unsigned int.
+      int res = (Platform.getByte(leftBase, leftOffset + i) & 0xFF) -
+        (Platform.getByte(rightBase, rightOffset + i) & 0xFF);
+      if (res != 0) {
+        return res;
+      }
+    }
+    return leftNumBytes - rightNumBytes;
+  }
+
   public static byte[] subStringSQL(byte[] bytes, int pos, int len) {
     // This pos calculation is according to UTF8String#subStringSQL
     if (pos > bytes.length) {
@@ -1353,29 +1353,8 @@ public UTF8String copy() {

   @Override
   public int compareTo(@Nonnull final UTF8String other) {
-    int len = Math.min(numBytes, other.numBytes);
-    int wordMax = (len / 8) * 8;
-    long roffset = other.offset;
-    Object rbase = other.base;
-    for (int i = 0; i < wordMax; i += 8) {
-      long left = getLong(base, offset + i);
-      long right = getLong(rbase, roffset + i);
-      if (left != right) {
-        if (IS_LITTLE_ENDIAN) {
-          return Long.compareUnsigned(Long.reverseBytes(left), Long.reverseBytes(right));
-        } else {
-          return Long.compareUnsigned(left, right);
-        }
-      }
-    }
-    for (int i = wordMax; i < len; i++) {
-      // In UTF-8, the byte should be unsigned, so we should compare them as unsigned int.
-      int res = (getByte(i) & 0xFF) - (Platform.getByte(rbase, roffset + i) & 0xFF);
-      if (res != 0) {
-        return res;
-      }
-    }
-    return numBytes - other.numBytes;
+    return ByteArray.compareBinary(
+        base, offset, numBytes, other.base, other.offset, other.numBytes);
   }

   public int compare(final UTF8String other) {
@@ -48,4 +48,23 @@ public void testGetPrefix() {
       Assert.assertEquals(result, expected);
     }
   }
+
+  @Test
+  public void testCompareBinary() {
+    byte[] x1 = new byte[0];
+    byte[] y1 = new byte[]{(byte) 1, (byte) 2, (byte) 3};
+    assert(ByteArray.compareBinary(x1, y1) < 0);
+
+    byte[] x2 = new byte[]{(byte) 200, (byte) 100};
+    byte[] y2 = new byte[]{(byte) 100, (byte) 100};
+    assert(ByteArray.compareBinary(x2, y2) > 0);
+
+    byte[] x3 = new byte[]{(byte) 100, (byte) 200, (byte) 12};
+    byte[] y3 = new byte[]{(byte) 100, (byte) 200};
+    assert(ByteArray.compareBinary(x3, y3) > 0);
+
+    byte[] x4 = new byte[]{(byte) 100, (byte) 200};
+    byte[] y4 = new byte[]{(byte) 100, (byte) 200};
+    assert(ByteArray.compareBinary(x4, y4) == 0);
+  }
 }
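
The `x2`/`y2` case in the test above depends on unsigned comparison: `(byte) 200` is stored as -56 in Java, so a signed comparison would order it before 100. A small sketch of the difference, with a hypothetical class name:

```java
final class UnsignedByteDemo {
  // Signed difference: treats bytes as values in [-128, 127].
  static int signedDiff(byte a, byte b) {
    return a - b;
  }

  // Unsigned difference: masks each byte into [0, 255] first,
  // which is the `& 0xFF` step used by compareBinary.
  static int unsignedDiff(byte a, byte b) {
    return (a & 0xFF) - (b & 0xFF);
  }

  public static void main(String[] args) {
    byte hi = (byte) 200; // stored as -56
    byte lo = (byte) 100;
    System.out.println(signedDiff(hi, lo));   // -156
    System.out.println(unsignedDiff(hi, lo)); // 100
  }
}
```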
@@ -650,7 +650,7 @@ class CodegenContext extends Logging {
       s"$clsName.compareFloats($c1, $c2)"
     // use c1 - c2 may overflow
     case dt: DataType if isPrimitiveType(dt) => s"($c1 > $c2 ? 1 : $c1 < $c2 ? -1 : 0)"
-    case BinaryType => s"org.apache.spark.sql.catalyst.util.TypeUtils.compareBinary($c1, $c2)"
+    case BinaryType => s"org.apache.spark.unsafe.types.ByteArray.compareBinary($c1, $c2)"
     case NullType => "0"
     case array: ArrayType =>
       val elementType = array.elementType
@@ -87,17 +87,6 @@ object TypeUtils {
     }
   }

-  def compareBinary(x: Array[Byte], y: Array[Byte]): Int = {
-    val limit = if (x.length <= y.length) x.length else y.length
-    var i = 0
-    while (i < limit) {
-      val res = (x(i) & 0xff) - (y(i) & 0xff)
-      if (res != 0) return res
-      i += 1
-    }
-    x.length - y.length
-  }
-
   /**
    * Returns true if the equals method of the elements of the data type is implemented properly.
    * This also means that they can be safely used in collections relying on the equals method,
@@ -20,7 +20,7 @@ package org.apache.spark.sql.types
 import scala.reflect.runtime.universe.typeTag

 import org.apache.spark.annotation.Stable
-import org.apache.spark.sql.catalyst.util.TypeUtils
+import org.apache.spark.unsafe.types.ByteArray

 /**
  * The data type representing `Array[Byte]` values.
@@ -37,7 +37,7 @@ class BinaryType private() extends AtomicType {
   @transient private[sql] lazy val tag = typeTag[InternalType]

   private[sql] val ordering =
-    (x: Array[Byte], y: Array[Byte]) => TypeUtils.compareBinary(x, y)
+    (x: Array[Byte], y: Array[Byte]) => ByteArray.compareBinary(x, y)

   /**
    * The default size of a value of the BinaryType is 100 bytes.
@@ -43,22 +43,4 @@ class TypeUtilsSuite extends SparkFunSuite {
     typeCheckPass(ArrayType(StringType, containsNull = true) ::
       ArrayType(StringType, containsNull = false) :: Nil)
   }
-
-  test("compareBinary") {
-    val x1 = Array[Byte]()
-    val y1 = Array(1, 2, 3).map(_.toByte)
-    assert(TypeUtils.compareBinary(x1, y1) < 0)
-
-    val x2 = Array(200, 100).map(_.toByte)
-    val y2 = Array(100, 100).map(_.toByte)
-    assert(TypeUtils.compareBinary(x2, y2) > 0)
-
-    val x3 = Array(100, 200, 12).map(_.toByte)
-    val y3 = Array(100, 200).map(_.toByte)
-    assert(TypeUtils.compareBinary(x3, y3) > 0)
-
-    val x4 = Array(100, 200).map(_.toByte)
-    val y4 = Array(100, 200).map(_.toByte)
-    assert(TypeUtils.compareBinary(x4, y4) == 0)
-  }
 }
16 changes: 16 additions & 0 deletions sql/core/benchmarks/ByteArrayBenchmark-jdk11-results.txt
@@ -0,0 +1,16 @@
================================================================================================
byte array comparisons
================================================================================================

OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Byte Array compareTo:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                            501            514          14        130.9           7.6       1.0X
8-16 byte                                           976            993          10         67.1          14.9       0.5X
16-32 byte                                          985            995           6         66.5          15.0       0.5X
512-1024 byte                                      1260           1282          13         52.0          19.2       0.4X
512 byte slow                                      3114           3193          46         21.0          47.5       0.2X
2-7 byte                                            572            578           7        114.5           8.7       0.9X


16 changes: 16 additions & 0 deletions sql/core/benchmarks/ByteArrayBenchmark-results.txt
@@ -0,0 +1,16 @@
================================================================================================
Member

Do we have 'before' numbers for these? You don't need to include them; I just want to verify that they also show an improvement, like your local laptop run did.

Contributor Author

Here is the old code path benchmark result:

JDK8

================================================================================================
byte array comparisons
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
Byte Array compareTo:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                            799            836          24         82.0          12.2       1.0X
8-16 byte                                           832            906          32         78.8          12.7       1.0X
16-32 byte                                          812            854          28         80.7          12.4       1.0X
512-1024 byte                                      1057           1088          20         62.0          16.1       0.8X
512 byte slow                                     24628          26054         NaN          2.7         375.8       0.0X
2-7 byte                                            811            849          23         80.8          12.4       1.0X

JDK11

================================================================================================
byte array comparisons
================================================================================================

OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
Byte Array compareTo:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                            771            812          28         85.0          11.8       1.0X
8-16 byte                                           839            857          13         78.1          12.8       0.9X
16-32 byte                                          898            926          17         73.0          13.7       0.9X
512-1024 byte                                      1141           1189          23         57.4          17.4       0.7X
512 byte slow                                     40124          40689         495          1.6         612.2       0.0X
2-7 byte                                            827            847          14         79.3          12.6       0.9X

Contributor Author

This shows the benefit still holds in the GA environment.

Contributor Author

Ah, I just noticed that the GA environments still differ. The two benchmark results were produced on:

Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz

Member

I'm inclined to believe it is a win based on your first benchmark. Is there any easy way to run before/after on these Xeons, or is that hard?

Contributor Author

I compared the two code paths within a single patch; here are the results.

JDK8:

================================================================================================
byte array comparisons
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
Byte Array compare offHeap:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                            636            661          14        103.0           9.7       1.0X
8-16 byte                                          1067           1112          21         61.4          16.3       0.6X
16-32 byte                                         1226           1352          98         53.4          18.7       0.5X
512-1024 byte                                      1803           1916          46         36.3          27.5       0.4X
512 byte slow                                      4343           4662         171         15.1          66.3       0.1X
2-7 byte                                           1075           1119          26         61.0          16.4       0.6X

OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
Byte Array compare onHeap:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                           1511           1570          30         43.4          23.1       1.0X
8-16 byte                                          1522           1564          27         43.1          23.2       1.0X
16-32 byte                                         1426           1554          36         46.0          21.8       1.1X
512-1024 byte                                      2080           2198          86         31.5          31.7       0.7X
512 byte slow                                     28498          29222         410          2.3         434.9       0.1X
2-7 byte                                           1382           1485          61         47.4          21.1       1.1X

JDK11

================================================================================================
byte array comparisons
================================================================================================

OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
Byte Array compare offHeap:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                            720            777          21         91.0          11.0       1.0X
8-16 byte                                          1077           1138          32         60.8          16.4       0.7X
16-32 byte                                         1347           1463          84         48.7          20.5       0.5X
512-1024 byte                                      1898           1989          40         34.5          29.0       0.4X
512 byte slow                                      4621           4878         168         14.2          70.5       0.2X
2-7 byte                                           1062           1133          28         61.7          16.2       0.7X

OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
Byte Array compare onHeap:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                           1377           1471          37         47.6          21.0       1.0X
8-16 byte                                          1398           1475          38         46.9          21.3       1.0X
16-32 byte                                         1452           1547          47         45.2          22.1       0.9X
512-1024 byte                                      1826           1953          55         35.9          27.9       0.8X
512 byte slow                                     45883          47146         NaN          1.4         700.1       0.0X
2-7 byte                                           1401           1484          39         46.8          21.4       1.0X

byte array comparisons
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Byte Array compareTo:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                            407            418           9        161.1           6.2       1.0X
8-16 byte                                           867            919          30         75.6          13.2       0.5X
16-32 byte                                          882            916          23         74.3          13.5       0.5X
512-1024 byte                                      1123           1167          31         58.4          17.1       0.4X
512 byte slow                                      4054           4611         506         16.2          61.9       0.1X
2-7 byte                                            430            450          16        152.4           6.6       0.9X


@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.benchmark

import scala.util.Random

import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.unsafe.types.ByteArray

/**
* Benchmark to measure performance for byte array comparisons.
* {{{
* To run this benchmark:
* 1. without sbt:
* bin/spark-submit --class <this class> --jars <spark core test jar> <sql core test jar>
* 2. build/sbt "sql/test:runMain <this class>"
* 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
* Results will be written to "benchmarks/<this class>-results.txt".
* }}}
*/
object ByteArrayBenchmark extends BenchmarkBase {

  def byteArrayComparisons(iters: Long): Unit = {
    val chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    val random = new Random(0)
    def randomBytes(min: Int, max: Int): Array[Byte] = {
      val len = random.nextInt(max - min) + min
      val bytes = new Array[Byte](len)
      var i = 0
      while (i < len) {
        bytes(i) = chars.charAt(random.nextInt(chars.length())).toByte
        i += 1
      }
      bytes
    }

    val count = 16 * 1000
    val dataTiny = Seq.fill(count)(randomBytes(2, 7)).toArray
    val dataSmall = Seq.fill(count)(randomBytes(8, 16)).toArray
    val dataMedium = Seq.fill(count)(randomBytes(16, 32)).toArray
    val dataLarge = Seq.fill(count)(randomBytes(512, 1024)).toArray
    val dataLargeSlow = Seq.fill(count)(
      Array.tabulate(512) {i => if (i < 511) 0.toByte else 1.toByte}).toArray

    def compareBinary(data: Array[Array[Byte]]) = { _: Int =>
      var sum = 0L
      for (_ <- 0L until iters) {
        var i = 0
        while (i < count) {
          sum += ByteArray.compareBinary(data(i), data((i + 1) % count))
          i += 1
        }
      }
    }

    val benchmark = new Benchmark("Byte Array compareTo", count * iters, 25, output = output)
    benchmark.addCase("2-7 byte")(compareBinary(dataTiny))
    benchmark.addCase("8-16 byte")(compareBinary(dataSmall))
    benchmark.addCase("16-32 byte")(compareBinary(dataMedium))
    benchmark.addCase("512-1024 byte")(compareBinary(dataLarge))
    benchmark.addCase("512 byte slow")(compareBinary(dataLargeSlow))
    benchmark.addCase("2-7 byte")(compareBinary(dataTiny))
Contributor

It looks like this case is listed twice. Maybe drop this line?

Contributor Author

The first benchmark case may run slower than later ones because of JIT warm-up, and this case is small enough to finish quickly, which makes it more likely to be affected.

So I run it a second time at the end to account for that.
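
A minimal sketch of the warm-up effect described here: the same workload is timed twice in one JVM, and the first run typically includes interpretation and JIT compilation time. The class and the workload are hypothetical, and actual timings vary by machine and JVM, so no specific numbers are claimed.

```java
import java.util.Arrays;

final class WarmupSketch {
  // A small workload: sum of unsigned byte values, repeated `iters` times.
  static long workload(byte[] data, int iters) {
    long sum = 0;
    for (int n = 0; n < iters; n++) {
      for (byte b : data) {
        sum += b & 0xFF;
      }
    }
    return sum;
  }

  // Times one run of the workload in nanoseconds and stores the checksum in out[0]
  // so the work cannot be optimized away as unused.
  static long timeNanos(byte[] data, int iters, long[] out) {
    long start = System.nanoTime();
    out[0] = workload(data, iters);
    return System.nanoTime() - start;
  }

  public static void main(String[] args) {
    byte[] data = new byte[7];
    Arrays.fill(data, (byte) 3);
    long[] checksum = new long[1];
    long first = timeNanos(data, 1_000_000, checksum);
    long second = timeNanos(data, 1_000_000, checksum);
    // The first figure is often larger; the second is closer to steady state.
    System.out.println("first run:  " + first + " ns");
    System.out.println("second run: " + second + " ns");
  }
}
```

Repeating the smallest case at the end of the suite, as the benchmark does, gives a second reading taken after the JVM has already compiled the comparison code.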

    benchmark.run()
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("byte array comparisons") {
      byteArrayComparisons(1024 * 4)
    }
  }
}