diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 67e993c7f02e2..7aecd3c9668ea 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -99,7 +99,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   private def calcChecksum(block: ByteBuffer): Int = {
     val adler = new Adler32()
     if (block.hasArray) {
-      adler.update(block.array, block.arrayOffset + block.position, block.limit - block.position)
+      adler.update(block.array, block.arrayOffset + block.position(), block.limit()
+        - block.position())
     } else {
       val bytes = new Array[Byte](block.remaining())
       block.duplicate.get(bytes)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e3e555eaa0277..af0a0ab656564 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -452,7 +452,7 @@ private[spark] class Executor(
         // TODO: do not serialize value twice
         val directResult = new DirectTaskResult(valueBytes, accumUpdates)
         val serializedDirectResult = ser.serialize(directResult)
-        val resultSize = serializedDirectResult.limit
+        val resultSize = serializedDirectResult.limit()
 
         // directSend = sending directly back to the driver
         val serializedResult: ByteBuffer = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 22d9c4cf81c55..bb4683caa1d8a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -288,13 +288,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
         val serializedTask = TaskDescription.encode(task)
-        if (serializedTask.limit >= maxRpcMessageSize) {
+        if (serializedTask.limit() >= maxRpcMessageSize) {
           scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
             try {
               var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                 "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
                 "spark.rpc.message.maxSize or using broadcast variables for large values."
-              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
+              msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
               taskSetMgr.abort(msg)
             } catch {
               case e: Exception => logError("Exception in error callback", e)
diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
index a938cb07724c7..a5ee0ff16b5df 100644
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
@@ -54,7 +54,7 @@ class ByteBufferInputStream(private var buffer: ByteBuffer)
   override def skip(bytes: Long): Long = {
     if (buffer != null) {
       val amountToSkip = math.min(bytes, buffer.remaining).toInt
-      buffer.position(buffer.position + amountToSkip)
+      buffer.position(buffer.position() + amountToSkip)
       if (buffer.remaining() == 0) {
         cleanUp()
       }
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index c28570fb24560..7367af7888bd8 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -65,7 +65,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     for (bytes <- getChunks()) {
       while (bytes.remaining() > 0) {
         val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
-        bytes.limit(bytes.position + ioSize)
+        bytes.limit(bytes.position() + ioSize)
         channel.write(bytes)
       }
     }
@@ -206,7 +206,7 @@ private[spark] class ChunkedByteBufferInputStream(
   override def skip(bytes: Long): Long = {
     if (currentChunk != null) {
       val amountToSkip = math.min(bytes, currentChunk.remaining).toInt
-      currentChunk.position(currentChunk.position + amountToSkip)
+      currentChunk.position(currentChunk.position() + amountToSkip)
       if (currentChunk.remaining() == 0) {
         if (chunks.hasNext) {
           currentChunk = chunks.next()
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index eaec098b8d785..fc78655bf52ec 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -199,7 +199,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     def check[T: ClassTag](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
       // Check that very long ranges don't get written one element at a time
-      assert(ser.serialize(t).limit < 100)
+      assert(ser.serialize(t).limit() < 100)
     }
     check(1 to 1000000)
     check(1 to 1000000 by 2)
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
index 7258fdf5efc0d..efdd02fff7871 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -118,7 +118,7 @@ class DiskStoreSuite extends SparkFunSuite {
     val chunks = chunkedByteBuffer.chunks
     assert(chunks.size === 2)
     for (chunk <- chunks) {
-      assert(chunk.limit === 10 * 1024)
+      assert(chunk.limit() === 10 * 1024)
     }
 
     val e = intercept[IllegalArgumentException]{
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 2df8352f48660..08db4d827e400 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -296,7 +296,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
     props.put("replica.socket.timeout.ms", "1500")
     props.put("delete.topic.enable", "true")
     props.put("offsets.topic.num.partitions", "1")
-    props.putAll(withBrokerProps.asJava)
+    // Can not use properties.putAll(propsMap.asJava) in scala-2.12
+    // See https://github.com/scala/bug/issues/10418
+    withBrokerProps.foreach { case (k, v) => props.put(k, v) }
     props
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
index 2f09757aa341c..341ade1a5c613 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
@@ -35,7 +35,7 @@ private[columnar] trait NullableColumnAccessor extends ColumnAccessor {
     nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1
     pos = 0
 
-    underlyingBuffer.position(underlyingBuffer.position + 4 + nullCount * 4)
+    underlyingBuffer.position(underlyingBuffer.position() + 4 + nullCount * 4)
     super.initialize()
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
index bf00ad997c76e..79dcf3a6105ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
@@ -112,7 +112,7 @@ private[columnar] case object PassThrough extends CompressionScheme {
       var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else capacity
       var pos = 0
       var seenNulls = 0
-      var bufferPos = buffer.position
+      var bufferPos = buffer.position()
       while (pos < capacity) {
         if (pos != nextNullIndex) {
           val len = nextNullIndex - pos
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
index d786a610f1535..3328400b214fb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
@@ -412,7 +412,9 @@ case class HiveScriptIOSchema (
     propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES -> columnTypesNames)
 
     val properties = new Properties()
-    properties.putAll(propsMap.asJava)
+    // Can not use properties.putAll(propsMap.asJava) in scala-2.12
+    // See https://github.com/scala/bug/issues/10418
+    propsMap.foreach { case (k, v) => properties.put(k, v) }
     serde.initialize(null, properties)
 
     serde
@@ -424,7 +426,9 @@ case class HiveScriptIOSchema (
     recordReaderClass.map { klass =>
       val instance = Utils.classForName(klass).newInstance().asInstanceOf[RecordReader]
       val props = new Properties()
-      props.putAll(outputSerdeProps.toMap.asJava)
+      // Can not use props.putAll(outputSerdeProps.toMap.asJava) in scala-2.12
+      // See https://github.com/scala/bug/issues/10418
+      outputSerdeProps.toMap.foreach { case (k, v) => props.put(k, v) }
       instance.initialize(inputStream, conf, props)
       instance
     }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index b2ec33e82ddaa..b22bbb79a5cc9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -98,7 +98,7 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)
 
   /** Read a buffer fully from a given Channel */
   private def readFully(channel: ReadableByteChannel, dest: ByteBuffer) {
-    while (dest.position < dest.limit) {
+    while (dest.position() < dest.limit()) {
      if (channel.read(dest) == -1) {
        throw new EOFException("End of channel")
      }
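
Side note, not part of the patch: a minimal standalone Scala sketch of the ByteBuffer call style the diff converges on, calling position() and limit() with explicit parentheses. The object name and buffer contents below are illustrative assumptions; the intent is only to show that the parenthesized forms read the current indices, whereas the bare names of these overloaded Java methods can be reported as ambiguous when building with Scala 2.12.

import java.nio.ByteBuffer

// Illustrative sketch only (not from the patch): explicit position()/limit() calls.
object ByteBufferCallStyleSketch {
  def main(args: Array[String]): Unit = {
    val buf = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4, 5))
    buf.position(1)                                // advance past the first byte
    val remaining = buf.limit() - buf.position()   // explicit (), as in the changes above
    assert(remaining == 4)
  }
}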
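
Likewise, a standalone sketch (assumed names, not part of the patch) of the putAll workaround applied in KafkaTestUtils and HiveScriptIOSchema above: since props.putAll(map.asJava) cannot be used under Scala 2.12 (see scala/bug#10418), the entries are copied into the Properties one at a time.

import java.util.Properties

// Illustrative sketch only: copy a Scala Map into java.util.Properties entry by entry,
// instead of props.putAll(map.asJava), which can not be used in scala-2.12
// (https://github.com/scala/bug/issues/10418).
object PutAllWorkaroundSketch {
  def toProperties(map: Map[String, String]): Properties = {
    val props = new Properties()
    map.foreach { case (k, v) => props.put(k, v) }
    props
  }

  def main(args: Array[String]): Unit = {
    val props = toProperties(Map("delete.topic.enable" -> "true"))
    assert(props.getProperty("delete.topic.enable") == "true")
  }
}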