From 480f9cdaf69254dd429b949d9ccc6d0b2c617ad0 Mon Sep 17 00:00:00 2001
From: Dubovsky Jakub
Date: Wed, 8 Oct 2014 15:49:41 +0200
Subject: [PATCH 1/5] SPARK-3121 fixed

---
 .../scala/org/apache/spark/SparkContext.scala |  2 +-
 .../org/apache/spark/SparkContextSuite.scala  | 34 +++++++++++++++++++
 2 files changed, 35 insertions(+), 1 deletion(-)
 create mode 100644 core/src/test/scala/org/apache/spark/SparkContextSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 979d178c35969..ccaf8a0754e23 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1409,7 +1409,7 @@ object SparkContext extends Logging {
     simpleWritableConverter[Boolean, BooleanWritable](_.get)
 
   implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
-    simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
+    simpleWritableConverter[Array[Byte], BytesWritable](bw => bw.getBytes.take(bw.getLength))
   }
 
   implicit def stringWritableConverter(): WritableConverter[String] =
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
new file mode 100644
index 0000000000000..a50c24ae6e939
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.FunSuite
+
+import org.apache.hadoop.io.BytesWritable
+
+class SparkContextSuite extends FunSuite {
+  test("test of writing spark scala test") {
+    val bytesWritable = new BytesWritable()
+    bytesWritable.set((1 to 10).map(_.toByte).toArray, 0, 10)
+    bytesWritable.set((1 to 5).map(_.toByte).toArray, 0, 5)
+
+    val converter = SparkContext.bytesWritableConverter()
+    val byteArray = converter.convert(bytesWritable)
+    assert(byteArray.length === 5)
+  }
+}

From f92ffa64c9587057dce5a4b51d32074b68aeab23 Mon Sep 17 00:00:00 2001
From: Dubovsky Jakub
Date: Thu, 9 Oct 2014 16:05:43 +0200
Subject: [PATCH 2/5] performance tuning

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 4 +++-
 .../test/scala/org/apache/spark/SparkContextSuite.scala | 9 +++++++--
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ccaf8a0754e23..9b62520d55038 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark
 
+import java.util.Arrays
+
 import scala.language.implicitConversions
 
 import java.io._
@@ -1409,7 +1411,7 @@ object SparkContext extends Logging {
     simpleWritableConverter[Boolean, BooleanWritable](_.get)
 
   implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
-    simpleWritableConverter[Array[Byte], BytesWritable](bw => bw.getBytes.take(bw.getLength))
+    simpleWritableConverter[Array[Byte], BytesWritable](bw => Arrays.copyOfRange(bw.getBytes, 0, bw.getLength))
   }
 
   implicit def stringWritableConverter(): WritableConverter[String] =
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index a50c24ae6e939..dc31a0aa07704 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -24,11 +24,16 @@ import org.apache.hadoop.io.BytesWritable
 class SparkContextSuite extends FunSuite {
   test("test of writing spark scala test") {
     val bytesWritable = new BytesWritable()
-    bytesWritable.set((1 to 10).map(_.toByte).toArray, 0, 10)
-    bytesWritable.set((1 to 5).map(_.toByte).toArray, 0, 5)
+    val inputArray = (1 to 10).map(_.toByte).toArray
+    bytesWritable.set(inputArray, 0, 10)
+    bytesWritable.set(inputArray, 0, 5)
 
     val converter = SparkContext.bytesWritableConverter()
     val byteArray = converter.convert(bytesWritable)
     assert(byteArray.length === 5)
+
+    bytesWritable.set(inputArray, 0, 0)
+    val byteArray2 = converter.convert(bytesWritable)
+    assert(byteArray2.length === 0)
   }
 }

From 406e26c5aa9a28b76a69565e939dc1dc557ccc4d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jakub=20Dubovsk=C3=BD?=
Date: Sat, 11 Oct 2014 00:00:59 +0200
Subject: [PATCH 3/5] Scala style fixed

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9b62520d55038..e6f48856443b1 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1411,7 +1411,9 @@ object SparkContext extends Logging {
     simpleWritableConverter[Boolean, BooleanWritable](_.get)
 
   implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
-    simpleWritableConverter[Array[Byte], BytesWritable](bw => Arrays.copyOfRange(bw.getBytes, 0, bw.getLength))
+    simpleWritableConverter[Array[Byte], BytesWritable](bw =>
+      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
+    )
   }
 
   implicit def stringWritableConverter(): WritableConverter[String] =

From 1b20d5193fa149347f9c8c05bb25298992324d4a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jakub=20Dubovsk=C3=BD?=
Date: Sat, 11 Oct 2014 10:58:02 +0200
Subject: [PATCH 4/5] Import placed correctly

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e6f48856443b1..9e011a5c512d9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark
 
-import java.util.Arrays
-
 import scala.language.implicitConversions
 
 import java.io._
 import java.net.URI
+import java.util.Arrays
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, UUID}
 import java.util.UUID.randomUUID

From f85d24c954be419045236cfabc5613aab4c2a169 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jakub=20Dubovsk=C3=BD?=
Date: Sun, 12 Oct 2014 01:16:18 +0200
Subject: [PATCH 5/5] Test name changed, comments added

---
 core/src/main/scala/org/apache/spark/SparkContext.scala      | 1 +
 core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 3 ++-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9e011a5c512d9..95c0520f14994 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1411,6 +1411,7 @@ object SparkContext extends Logging {
 
   implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
     simpleWritableConverter[Array[Byte], BytesWritable](bw =>
+      // getBytes method returns an array which may be longer than the data to be returned
       Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
     )
   }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index dc31a0aa07704..31edad1c56c73 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -22,7 +22,8 @@ import org.scalatest.FunSuite
 import org.apache.hadoop.io.BytesWritable
 
 class SparkContextSuite extends FunSuite {
-  test("test of writing spark scala test") {
+  // Regression test for SPARK-3121
+  test("BytesWritable implicit conversion is correct") {
     val bytesWritable = new BytesWritable()
     val inputArray = (1 to 10).map(_.toByte).toArray
     bytesWritable.set(inputArray, 0, 10)
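
The change in this series works around a Hadoop API quirk: BytesWritable keeps a backing array whose capacity can exceed the amount of data actually stored, so getBytes alone hands back trailing garbage bytes and only getLength says how many bytes are valid. Below is a standalone sketch of that behavior and of the copy the converter now performs; it is illustrative only, assumes hadoop-common on the classpath, and the object name is made up.

import java.util.Arrays

import org.apache.hadoop.io.BytesWritable

object BytesWritableLengthDemo {
  def main(args: Array[String]): Unit = {
    val bw = new BytesWritable()
    // Store 10 bytes, then shrink the logical size to 5. The backing array
    // keeps its old capacity, so getBytes still returns at least 10 slots.
    bw.set((1 to 10).map(_.toByte).toArray, 0, 10)
    bw.set((1 to 5).map(_.toByte).toArray, 0, 5)

    val padded = bw.getBytes                                      // capacity-sized array
    val valid = Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)  // only the 5 valid bytes

    println(s"getBytes length = ${padded.length}, getLength = ${bw.getLength}")
    println(s"copied length   = ${valid.length}")
  }
}

Relative to the bw.getBytes.take(bw.getLength) used in the first commit, Arrays.copyOfRange copies the bytes directly into a new array without going through Scala's collection operations, which is presumably what the "performance tuning" commit is after.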