diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index c8993e17bba67..ad9c979d8d36b 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -416,8 +416,9 @@ package object config {
       .internal()
       .doc("The chunk size in bytes during writing out the bytes of ChunkedByteBuffer.")
       .bytesConf(ByteUnit.BYTE)
-      .checkValue(_ <= Int.MaxValue, "The chunk size during writing out the bytes of" +
-        " ChunkedByteBuffer should not larger than Int.MaxValue.")
+      .checkValue(_ <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+        "The chunk size during writing out the bytes of" +
+        " ChunkedByteBuffer should not larger than Int.MaxValue - 15.")
       .createWithDefault(64 * 1024 * 1024)
 
   private[spark] val CHECKPOINT_COMPRESS =
@@ -488,8 +489,9 @@ package object config {
         "otherwise specified. These buffers reduce the number of disk seeks and system calls " +
         "made in creating intermediate shuffle files.")
       .bytesConf(ByteUnit.KiB)
-      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
-        s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+      .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
+        s"The file buffer size must be greater than 0 and less than" +
+        s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")
 
   private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
@@ -497,8 +499,9 @@ package object config {
       .doc("The file system for this buffer size after each partition " +
         "is written in unsafe shuffle writer. In KiB unless otherwise specified.")
       .bytesConf(ByteUnit.KiB)
-      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
-        s"The buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+      .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
+        s"The buffer size must be greater than 0 and less than" +
+        s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")
 
   private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
@@ -610,7 +613,7 @@ package object config {
       .internal()
       .doc("For testing only, controls the size of chunks when memory mapping a file")
       .bytesConf(ByteUnit.BYTE)
-      .createWithDefault(Int.MaxValue)
+      .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
 
   private[spark] val BARRIER_SYNC_TIMEOUT =
     ConfigBuilder("spark.barrier.sync.timeout")
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 841e16afc7549..29963a95cb074 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -33,6 +33,7 @@ import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils}
 import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils
 import org.apache.spark.util.io.ChunkedByteBuffer
 
@@ -217,7 +218,7 @@ private[spark] class EncryptedBlockData(
     var remaining = blockSize
     val chunks = new ListBuffer[ByteBuffer]()
     while (remaining > 0) {
-      val chunkSize = math.min(remaining, Int.MaxValue)
+      val chunkSize = math.min(remaining, ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
       val chunk = allocator(chunkSize.toInt)
       remaining -= chunkSize
       JavaUtils.readFully(source, chunk)
@@ -235,7 +236,8 @@ private[spark] class EncryptedBlockData(
     // This is used by the block transfer service to replicate blocks. The upload code reads
     // all bytes into memory to send the block to the remote executor, so it's ok to do this
     // as long as the block fits in a Java array.
-    assert(blockSize <= Int.MaxValue, "Block is too large to be wrapped in a byte buffer.")
+    assert(blockSize <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+      "Block is too large to be wrapped in a byte buffer.")
     val dst = ByteBuffer.allocate(blockSize.toInt)
     val in = open()
     try {
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 06fd56e54d9c8..8513359934bec 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -34,6 +34,7 @@ import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
 import org.apache.spark.storage._
 import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
 import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
@@ -333,11 +334,11 @@ private[spark] class MemoryStore(
     // Initial per-task memory to request for unrolling blocks (bytes).
     val initialMemoryThreshold = unrollMemoryThreshold
 
-    val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
+    val chunkSize = if (initialMemoryThreshold > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
       logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " +
         s"is too large to be set as chunk size. Chunk size has been capped to " +
-        s"${Utils.bytesToString(Int.MaxValue)}")
-      Int.MaxValue
+        s"${Utils.bytesToString(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)}")
+      ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
     } else {
       initialMemoryThreshold.toInt
     }
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index da2be84723a07..870830fff4c3e 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -97,7 +97,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
    * @throws UnsupportedOperationException if this buffer's size exceeds the maximum array size.
    */
   def toArray: Array[Byte] = {
-    if (size >= Integer.MAX_VALUE) {
+    if (size >= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
       throw new UnsupportedOperationException(
         s"cannot call toArray because buffer size ($size bytes) exceeds maximum array size")
     }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index bf9b4cfe15b2c..e474cfa002fad 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -30,6 +30,7 @@ import org.apache.spark.ml.{linalg => newlinalg}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.array.ByteArrayMethods
 
 /**
  * Trait for a local matrix.
@@ -456,7 +457,7 @@ object DenseMatrix {
    */
   @Since("1.3.0")
   def zeros(numRows: Int, numCols: Int): DenseMatrix = {
-    require(numRows.toLong * numCols <= Int.MaxValue,
+    require(numRows.toLong * numCols <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
       s"$numRows x $numCols dense matrix is too large to allocate")
     new DenseMatrix(numRows, numCols, new Array[Double](numRows * numCols))
   }
@@ -469,7 +470,7 @@ object DenseMatrix {
    */
   @Since("1.3.0")
   def ones(numRows: Int, numCols: Int): DenseMatrix = {
-    require(numRows.toLong * numCols <= Int.MaxValue,
+    require(numRows.toLong * numCols <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
       s"$numRows x $numCols dense matrix is too large to allocate")
     new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(1.0))
   }
@@ -499,7 +500,7 @@ object DenseMatrix {
    */
   @Since("1.3.0")
   def rand(numRows: Int, numCols: Int, rng: Random): DenseMatrix = {
-    require(numRows.toLong * numCols <= Int.MaxValue,
+    require(numRows.toLong * numCols <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
       s"$numRows x $numCols dense matrix is too large to allocate")
     new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(rng.nextDouble()))
   }
@@ -513,7 +514,7 @@ object DenseMatrix {
    */
   @Since("1.3.0")
   def randn(numRows: Int, numCols: Int, rng: Random): DenseMatrix = {
-    require(numRows.toLong * numCols <= Int.MaxValue,
+    require(numRows.toLong * numCols <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
       s"$numRows x $numCols dense matrix is too large to allocate")
     new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(rng.nextGaussian()))
   }
@@ -846,8 +847,8 @@ object SparseMatrix {
       s"density must be a double in the range 0.0 <= d <= 1.0. Currently, density: $density")
     val size = numRows.toLong * numCols
     val expected = size * density
-    assert(expected < Int.MaxValue,
-      "The expected number of nonzeros cannot be greater than Int.MaxValue.")
+    assert(expected < ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+      "The expected number of nonzeros cannot be greater than Int.MaxValue - 15.")
     val nnz = math.ceil(expected).toInt
     if (density == 0.0) {
       new SparseMatrix(numRows, numCols, new Array[Int](numCols + 1), Array.empty, Array.empty)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fa59fa578a969..518115dafd011 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -27,7 +27,6 @@ import scala.collection.immutable
 import scala.util.matching.Regex
 
 import org.apache.hadoop.fs.Path
-import org.tukaani.xz.LZMA2Options
 
 import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.internal.Logging
@@ -36,6 +35,7 @@ import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
 import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -1246,7 +1246,7 @@ object SQLConf {
       .doc("Threshold for number of rows guaranteed to be held in memory by the sort merge " +
         "join operator")
       .intConf
-      .createWithDefault(Int.MaxValue)
+      .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
 
   val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
     buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
@@ -1480,7 +1480,7 @@ object SQLConf {
        "'SELECT x FROM t ORDER BY y LIMIT m', if m is under this threshold, do a top-K sort" +
        " in memory, otherwise do a global sort which spills to disk if necessary.")
      .intConf
-     .createWithDefault(Int.MaxValue)
+     .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
 
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c91b0d778fab1..d53400512cb84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql
 
 import java.io.CharArrayWriter
-import java.sql.{Date, Timestamp}
 
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
@@ -46,7 +45,6 @@ import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
 import org.apache.spark.sql.execution.command._
@@ -57,6 +55,7 @@ import org.apache.spark.sql.streaming.DataStreamWriter
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.Utils
 
@@ -287,7 +286,7 @@ class Dataset[T] private[sql](
       _numRows: Int,
       truncate: Int = 20,
       vertical: Boolean = false): String = {
-    val numRows = _numRows.max(0).min(Int.MaxValue - 1)
+    val numRows = _numRows.max(0).min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH - 1)
     // Get rows represented by Seq[Seq[String]], we may get one more line if it has more data.
     val tmpRows = getRows(numRows, truncate)
 
@@ -3264,7 +3263,7 @@ class Dataset[T] private[sql](
       _numRows: Int,
       truncate: Int): Array[Any] = {
     EvaluatePython.registerPicklers()
-    val numRows = _numRows.max(0).min(Int.MaxValue - 1)
+    val numRows = _numRows.max(0).min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH - 1)
     val rows = getRows(numRows, truncate).map(_.toArray).toArray
     val toJava: (Any) => Any = EvaluatePython.toJava(_, ArrayType(ArrayType(StringType)))
     val iter: Iterator[Array[Byte]] = new SerDeUtil.AutoBatchedPickler(
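
Note on the constant (not part of the patch): ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH lives in org.apache.spark.unsafe.array, and the error messages in this diff put its value at Int.MaxValue - 15, a word-rounded length kept safely below the practical JVM cap on array sizes, since allocations requested right at Int.MaxValue can be rejected by some VMs. The standalone Scala sketch below is illustrative only and does not use Spark; ArrayCapSketch and SafeArrayMax are made-up names, with SafeArrayMax standing in for the Spark constant. It mirrors the chunking pattern in EncryptedBlockData.toChunkedByteBuffer above: clamp each allocation to the safe maximum rather than Int.MaxValue.

// Illustrative sketch only; not part of the patch. SafeArrayMax is a hypothetical local
// stand-in for org.apache.spark.unsafe.array.ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
// whose value the error messages above give as Int.MaxValue - 15.
object ArrayCapSketch {
  val SafeArrayMax: Int = Int.MaxValue - 15

  // Split a byte count that may exceed 2 GB into chunk sizes that each fit in a Java array,
  // mirroring the while loop in EncryptedBlockData.toChunkedByteBuffer above.
  def chunkSizes(totalBytes: Long): List[Int] = {
    require(totalBytes >= 0, s"negative size: $totalBytes")
    val chunks = scala.collection.mutable.ListBuffer.empty[Int]
    var remaining = totalBytes
    while (remaining > 0) {
      val chunk = math.min(remaining, SafeArrayMax.toLong).toInt
      chunks += chunk
      remaining -= chunk
    }
    chunks.toList
  }

  def main(args: Array[String]): Unit = {
    // A 5 GiB block becomes two max-sized chunks plus a remainder, instead of a single
    // allocation request near Int.MaxValue.
    println(chunkSizes(5L * 1024 * 1024 * 1024))
    // List(2147483632, 2147483632, 1073741856)
  }
}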