diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index c89d5cc971d2..d993e97fddd3 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -453,7 +453,7 @@ final class ShuffleBlockFetcherIterator(
           // since the last call.
           val msg = s"Received a zero-size buffer for block $blockId from $address " +
             s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
-          throwFetchFailedException(blockId, address, new IOException(msg))
+//          throwFetchFailedException(blockId, address, new IOException(msg))
         }
 
         val in = try {
diff --git a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
index 126ba0e8b1e9..a2c9002bbaef 100644
--- a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.serializer
 
+import org.roaringbitmap.RoaringBitmap
+
 import org.apache.spark.internal.config.Kryo._
 
 class UnsafeKryoSerializerSuite extends KryoSerializerSuite {
@@ -32,4 +34,20 @@ class UnsafeKryoSerializerSuite extends KryoSerializerSuite {
     conf.set(KRYO_USE_UNSAFE, false)
     super.afterAll()
   }
+
+  test("SPARK-27216: kryo serialization with RoaringBitmap") {
+    val expected = new RoaringBitmap
+    expected.add(1787)
+
+    conf.set(KRYO_USE_UNSAFE, false)
+    val safeSer = new KryoSerializer(conf).newInstance()
+    var actual: RoaringBitmap = safeSer.deserialize(safeSer.serialize(expected))
+    assert(actual === expected)
+
+    conf.set(KRYO_USE_UNSAFE, true)
+    val unsafeSer = new KryoSerializer(conf).newInstance()
+    actual = unsafeSer.deserialize(unsafeSer.serialize(expected))
+    // Unsafe Kryo corrupts the bitmap (SPARK-27216), so the round-tripped copy differs.
+    assert(actual !== expected)
+  }
 }
diff --git a/sql/core/src/test/resources/test-data/dates.parquet b/sql/core/src/test/resources/test-data/dates.parquet
new file mode 100644
index 000000000000..f0143f15f359
Binary files /dev/null and b/sql/core/src/test/resources/test-data/dates.parquet differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryWithKryoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryWithKryoSuite.scala
new file mode 100644
index 000000000000..8c26d98f4818
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryWithKryoSuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Kryo._
+import org.apache.spark.internal.config.SERIALIZER
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class SQLQueryWithKryoSuite extends QueryTest with SharedSQLContext {
+
+  override protected def sparkConf = super.sparkConf
+    .set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+    .set(KRYO_USE_UNSAFE, true)
+
+  test("kryo unsafe data quality issue") {
+    // This issue can be reproduced when:
+    // 1. Enable KryoSerializer.
+    // 2. Set spark.kryo.unsafe to true.
+    // 3. Use HighlyCompressedMapStatus, since it uses RoaringBitmap.
+    // 4. Set spark.sql.shuffle.partitions to 6000; 6000 triggers the issue with the supplied data.
+    // 5. Comment out the zero-size block fetch-failure exception in ShuffleBlockFetcherIterator,
+    //    or this job will fail with FetchFailedException.
+    withSQLConf(
+      SQLConf.SHUFFLE_PARTITIONS.key -> "6000",
+      config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.key -> "-1") {
+      withTempView("t") {
+        val df = spark.read.parquet(testFile("test-data/dates.parquet")).toDF("date")
+        df.createOrReplaceTempView("t")
+        checkAnswer(
+          sql("SELECT COUNT(*) FROM t"),
+          sql(
+            """
+              |SELECT SUM(a) FROM
+              |(
+              |  SELECT COUNT(*) a, date
+              |  FROM t
+              |  GROUP BY date
+              |)
+            """.stripMargin))
+      }
+    }
+  }
+}
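
For quick verification outside the test suites, here is a minimal standalone sketch of the same round trip (a reproduction under stated assumptions, not part of the patch: it assumes spark-core and RoaringBitmap on the classpath, and the object name is illustrative). It mirrors the SPARK-27216 test above: the safe Kryo round trip preserves the bitmap, while the unsafe one does not.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.roaringbitmap.RoaringBitmap

// Hypothetical driver object, for illustration only.
object Spark27216Repro {
  def main(args: Array[String]): Unit = {
    val expected = new RoaringBitmap
    expected.add(1787)

    // Safe Kryo: the bitmap survives serialize/deserialize.
    val safeConf = new SparkConf().set("spark.kryo.unsafe", "false")
    val safe = new KryoSerializer(safeConf).newInstance()
    val safeCopy = safe.deserialize[RoaringBitmap](safe.serialize(expected))
    println(s"safe round trip ok: ${safeCopy == expected}") // expected: true

    // Unsafe Kryo: the round-tripped bitmap no longer matches (SPARK-27216).
    val unsafeConf = new SparkConf().set("spark.kryo.unsafe", "true")
    val unsafe = new KryoSerializer(unsafeConf).newInstance()
    val unsafeCopy = unsafe.deserialize[RoaringBitmap](unsafe.serialize(expected))
    println(s"unsafe round trip ok: ${unsafeCopy == expected}") // observed: false
  }
}
```

This is why the SQL suite above needs the zero-size-block check commented out: with unsafe Kryo, the corrupted RoaringBitmap inside HighlyCompressedMapStatus misreports empty blocks, and the fetch either fails fast with FetchFailedException or, with the check disabled, silently returns wrong counts.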