diff --git a/assembly/pom.xml b/assembly/pom.xml index 26f1f804e3203..7370209b42360 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../pom.xml org.apache.spark - spark-assembly_2.11 + spark-assembly_2.10 Spark Project Assembly http://spark.apache.org/ pom diff --git a/bagel/pom.xml b/bagel/pom.xml index 99232b31260be..e49f478cd6170 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../pom.xml org.apache.spark - spark-bagel_2.11 + spark-bagel_2.10 bagel diff --git a/core/pom.xml b/core/pom.xml index 3afa33068104b..5f3d692e49060 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../pom.xml org.apache.spark - spark-core_2.11 + spark-core_2.10 core diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java index 8f4e3229976dc..1e924d2aec442 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java @@ -61,10 +61,10 @@ public void copyElement(LongArray src, int srcPos, LongArray dst, int dstPos) { public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) { Platform.copyMemory( src.getBaseObject(), - src.getBaseOffset() + srcPos * 8, + src.getBaseOffset() + srcPos * 8L, dst.getBaseObject(), - dst.getBaseOffset() + dstPos * 8, - length * 8 + dst.getBaseOffset() + dstPos * 8L, + length * 8L ); } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/RecordPointerAndKeyPrefix.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/RecordPointerAndKeyPrefix.java index dbf6770e07391..a0a2c04cc82bd 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/RecordPointerAndKeyPrefix.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/RecordPointerAndKeyPrefix.java @@ -19,7 +19,7 @@ import org.apache.spark.memory.TaskMemoryManager; -final class RecordPointerAndKeyPrefix { +public final class RecordPointerAndKeyPrefix { /** * A pointer to a record; see {@link TaskMemoryManager} for a * description of how these addresses are encoded. diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java index d3137f5f31c25..8e73631a081cd 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java @@ -29,7 +29,8 @@ * Within each long[] buffer, position {@code 2 * i} holds a pointer to the record at * index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix.
*/ -final class UnsafeSortDataFormat extends SortDataFormat { +public final class UnsafeSortDataFormat + extends SortDataFormat { public static final UnsafeSortDataFormat INSTANCE = new UnsafeSortDataFormat(); @@ -73,10 +74,10 @@ public void copyElement(LongArray src, int srcPos, LongArray dst, int dstPos) { public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) { Platform.copyMemory( src.getBaseObject(), - src.getBaseOffset() + srcPos * 16, + src.getBaseOffset() + srcPos * 16L, dst.getBaseObject(), - dst.getBaseOffset() + dstPos * 16, - length * 16); + dst.getBaseOffset() + dstPos * 16L, + length * 16L); } @Override diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 43c89b258f2fa..871b9d1ad575b 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -22,6 +22,7 @@ import java.net.{URI, URL} import java.nio.charset.StandardCharsets import java.nio.file.Paths import java.util.Arrays +import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.jar.{JarEntry, JarOutputStream} import scala.collection.JavaConverters._ @@ -190,8 +191,14 @@ private[spark] object TestUtils { private class SpillListener extends SparkListener { private val stageIdToTaskMetrics = new mutable.HashMap[Int, ArrayBuffer[TaskMetrics]] private val spilledStageIds = new mutable.HashSet[Int] + private val stagesDone = new CountDownLatch(1) - def numSpilledStages: Int = spilledStageIds.size + def numSpilledStages: Int = { + // Long timeout, just in case somehow the job end isn't notified. + // Fails if a timeout occurs + assert(stagesDone.await(10, TimeUnit.SECONDS)) + spilledStageIds.size + } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { stageIdToTaskMetrics.getOrElseUpdate( @@ -206,4 +213,8 @@ private class SpillListener extends SparkListener { spilledStageIds += stageId } } + + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + stagesDone.countDown() + } } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index ab5bde55e683a..1131f8d5bb3d5 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -114,6 +114,19 @@ private[spark] class Executor( private val heartbeatReceiverRef = RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv) + /** + * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES` + * times, it should kill itself. The default value is 60. This means we will retry sending + * heartbeats for about 10 minutes, because the heartbeat interval is 10s. + */ + private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60) + + /** + * Counts consecutive heartbeat failures. It should only be accessed in the heartbeat thread. Each + * successful heartbeat will reset it to 0.
+ */ + private var heartbeatFailures = 0 + startDriverHeartbeater() def launchTask( @@ -218,6 +231,7 @@ private[spark] class Executor( threwException = false res } finally { + val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId) val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory() if (freedMemory > 0) { val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId" @@ -227,6 +241,17 @@ private[spark] class Executor( logError(errMsg) } } + + if (releasedLocks.nonEmpty) { + val errMsg = + s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" + + releasedLocks.mkString("[", ", ", "]") + if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false) && !threwException) { + throw new SparkException(errMsg) + } else { + logError(errMsg) + } + } } val taskFinish = System.currentTimeMillis() @@ -434,7 +459,9 @@ private[spark] class Executor( // JobProgressListener will hold a reference to it during // onExecutorMetricsUpdate(), then JobProgressListener cannot see // the changes of metrics any more, so make a deep copy of it - val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics)) + val copiedMetrics = Utils.deserialize[TaskMetrics]( + Utils.serialize(metrics), + Utils.getContextOrSparkClassLoader) tasksMetrics += ((taskRunner.taskId, copiedMetrics)) } else { // It will be copied by serialization @@ -452,8 +479,16 @@ private[spark] class Executor( logInfo("Told to re-register on heartbeat") env.blockManager.reregister() } + heartbeatFailures = 0 } catch { - case NonFatal(e) => logWarning("Issue communicating with driver in heartbeater", e) + case NonFatal(e) => + logWarning("Issue communicating with driver in heartbeater", e) + heartbeatFailures += 1 + if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) { + logError(s"Exit as unable to send heartbeats to driver " + + s"more than $HEARTBEAT_MAX_FAILURES times") + System.exit(ExecutorExitCode.HEARTBEAT_FAILURE) + } } } diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala index ea36fb60bd540..99858f785600d 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala @@ -39,6 +39,12 @@ object ExecutorExitCode { /** ExternalBlockStore failed to create a local temporary directory after many attempts. */ val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55 + /** + * Executor is unable to send heartbeats to the driver more than + * "spark.executor.heartbeat.maxFailures" times. + */ + val HEARTBEAT_FAILURE = 56 + def explainExitCode(exitCode: Int): String = { exitCode match { case UNCAUGHT_EXCEPTION => "Uncaught exception" @@ -51,6 +57,8 @@ object ExecutorExitCode { // TODO: replace external block store with concrete implementation name case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR => "ExternalBlockStore failed to create a local temporary directory." + case HEARTBEAT_FAILURE => + "Unable to send heartbeats to driver."
case _ => "Unknown executor exit code (" + exitCode + ")" + ( if (exitCode > 128) { diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala index 70af83b5ee092..89edaf58ebc29 100644 --- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala @@ -119,13 +119,13 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w } /** - * Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number - * of bytes removed from the pool's capacity. + * Free space to shrink the size of this storage memory pool by `spaceToFree` bytes. + * Note: this method doesn't actually reduce the pool size but relies on the caller to do so. + * + * @return number of bytes to be removed from the pool's capacity. */ - def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized { - // First, shrink the pool by reclaiming free memory: + def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized { val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree) - decrementPoolSize(spaceFreedByReleasingUnusedMemory) val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory if (remainingSpaceToFree > 0) { // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks: @@ -134,7 +134,6 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do // not need to decrement _memoryUsed here. However, we do need to decrement the pool size. 
- decrementPoolSize(spaceFreedByEviction) spaceFreedByReleasingUnusedMemory + spaceFreedByEviction } else { spaceFreedByReleasingUnusedMemory diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 829f054dba0e9..802087c82b713 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -57,8 +57,12 @@ private[spark] class UnifiedMemoryManager private[memory] ( storageRegionSize, maxMemory - storageRegionSize) { + assertInvariant() + // We always maintain this invariant: - assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + private def assertInvariant(): Unit = { + assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + } override def maxStorageMemory: Long = synchronized { maxMemory - onHeapExecutionMemoryPool.memoryUsed @@ -77,7 +81,7 @@ private[spark] class UnifiedMemoryManager private[memory] ( numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long = synchronized { - assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + assertInvariant() assert(numBytes >= 0) memoryMode match { case MemoryMode.ON_HEAP => @@ -99,9 +103,10 @@ private[spark] class UnifiedMemoryManager private[memory] ( math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize) if (memoryReclaimableFromStorage > 0) { // Only reclaim as much space as is necessary and available: - val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace( + val spaceToReclaim = storageMemoryPool.freeSpaceToShrinkPool( math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) - onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed) + storageMemoryPool.decrementPoolSize(spaceToReclaim) + onHeapExecutionMemoryPool.incrementPoolSize(spaceToReclaim) } } } @@ -137,7 +142,7 @@ private[spark] class UnifiedMemoryManager private[memory] ( blockId: BlockId, numBytes: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { - assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + assertInvariant() assert(numBytes >= 0) if (numBytes > maxStorageMemory) { // Fail fast if the block simply won't fit diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala index 4e2269f81235b..1499d149391b8 100644 --- a/core/src/main/scala/org/apache/spark/package.scala +++ b/core/src/main/scala/org/apache/spark/package.scala @@ -43,5 +43,5 @@ package org.apache package object spark { // For package docs only - val SPARK_VERSION = "1.6.1" + val SPARK_VERSION = "1.6.2" } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 832eef38eef56..77a8a195ffeb1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -720,7 +720,16 @@ private[spark] class TaskSetManager( failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()). 
put(info.executorId, clock.getTimeMillis()) sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics) - addPendingTask(index) + + if (successful(index)) { + logInfo( + s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " + + "but another instance of the task has already succeeded, " + + "so not re-queuing the task to be re-executed.") + } else { + addPendingTask(index) + } + if (!isZombie && state != TaskState.KILLED && reason.isInstanceOf[TaskFailedReason] && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 505c161141c88..75b1d29ab8878 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -179,6 +179,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp context.reply(true) case RemoveExecutor(executorId, reason) => + // We will remove the executor's state and cannot restore it. However, the connection + // between the driver and the executor may still be alive so that the executor won't exit + // automatically, so try to tell the executor to stop itself. See SPARK-13519. + executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor)) removeExecutor(executorId, reason) context.reply(true) @@ -263,7 +267,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason) listenerBus.post( SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString)) - case None => logInfo(s"Asked to remove non-existent executor $executorId") + case None => + // SPARK-15262: If an executor is still alive even after the scheduler has removed + // its metadata, we may receive a heartbeat from that executor and tell its block + // manager to reregister itself. If that happens, the block manager master will know + // about the executor, but the scheduler will not. Therefore, we should remove the + // executor from the block manager when we hit this case.
+ scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId) + logInfo(s"Asked to remove non-existent executor $executorId") } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 538272dc00db6..339ee1442e158 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -19,12 +19,14 @@ package org.apache.spark.storage import java.io._ import java.nio.{ByteBuffer, MappedByteBuffer} +import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.Random import scala.util.control.NonFatal +import scala.collection.JavaConverters._ import sun.nio.ch.DirectBuffer @@ -65,7 +67,7 @@ private[spark] class BlockManager( val master: BlockManagerMaster, defaultSerializer: Serializer, val conf: SparkConf, - memoryManager: MemoryManager, + val memoryManager: MemoryManager, mapOutputTracker: MapOutputTracker, shuffleManager: ShuffleManager, blockTransferService: BlockTransferService, @@ -164,6 +166,11 @@ private[spark] class BlockManager( * loaded yet. */ private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf) + // Blocks that are being removed by another thread val pendingToRemove = new ConcurrentHashMap[BlockId, Long]() + + private val NON_TASK_WRITER = -1024L + /** * Initializes the BlockManager with the given appId. This is not performed in the constructor as * the appId may not be known at BlockManager instantiation time (in particular for the driver, @@ -500,11 +507,14 @@ private[spark] class BlockManager( // Look for block on disk, potentially storing it back in memory if required if (level.useDisk) { logDebug(s"Getting block $blockId from disk") - val bytes: ByteBuffer = diskStore.getBytes(blockId) match { - case Some(b) => b - case None => - throw new BlockException( - blockId, s"Block $blockId not found on disk, though it should be") + val bytes: ByteBuffer = if (diskStore.contains(blockId)) { + // DiskStore.getBytes() always returns Some, so this .get() is guaranteed to be safe + diskStore.getBytes(blockId).get + } else { + // Remove the missing block so that its unavailability is reported to the driver + removeBlock(blockId) + throw new BlockException( + blockId, s"Block $blockId not found on disk, though it should be") } assert(0 == bytes.position()) @@ -1025,54 +1035,58 @@ private[spark] class BlockManager( val info = blockInfo.get(blockId).orNull // If the block has not already been dropped - if (info != null) { - info.synchronized { - // required ? As of now, this will be invoked only for blocks which are ready - // But in case this changes in future, adding for consistency sake. - if (!info.waitForReady()) { - // If we get here, the block write failed. - logWarning(s"Block $blockId was marked as failure. Nothing to drop") - return None - } else if (blockInfo.get(blockId).isEmpty) { - logWarning(s"Block $blockId was already dropped.") - return None - } - var blockIsUpdated = false - val level = info.level + if (info != null && pendingToRemove.putIfAbsent(blockId, currentTaskAttemptId) == 0L) { + try { + info.synchronized { + // required ? As of now, this will be invoked only for blocks which are ready + // But in case this changes in future, adding for consistency sake.
+ if (!info.waitForReady()) { + // If we get here, the block write failed. + logWarning(s"Block $blockId was marked as failure. Nothing to drop") + return None + } else if (blockInfo.get(blockId).isEmpty) { + logWarning(s"Block $blockId was already dropped.") + return None + } + var blockIsUpdated = false + val level = info.level - // Drop to disk, if storage level requires - if (level.useDisk && !diskStore.contains(blockId)) { - logInfo(s"Writing block $blockId to disk") - data() match { - case Left(elements) => - diskStore.putArray(blockId, elements, level, returnValues = false) - case Right(bytes) => - diskStore.putBytes(blockId, bytes, level) + // Drop to disk, if storage level requires + if (level.useDisk && !diskStore.contains(blockId)) { + logInfo(s"Writing block $blockId to disk") + data() match { + case Left(elements) => + diskStore.putArray(blockId, elements, level, returnValues = false) + case Right(bytes) => + diskStore.putBytes(blockId, bytes, level) + } + blockIsUpdated = true } - blockIsUpdated = true - } - // Actually drop from memory store - val droppedMemorySize = - if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L - val blockIsRemoved = memoryStore.remove(blockId) - if (blockIsRemoved) { - blockIsUpdated = true - } else { - logWarning(s"Block $blockId could not be dropped from memory as it does not exist") - } + // Actually drop from memory store + val droppedMemorySize = + if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L + val blockIsRemoved = memoryStore.remove(blockId) + if (blockIsRemoved) { + blockIsUpdated = true + } else { + logWarning(s"Block $blockId could not be dropped from memory as it does not exist") + } - val status = getCurrentBlockStatus(blockId, info) - if (info.tellMaster) { - reportBlockStatus(blockId, info, status, droppedMemorySize) - } - if (!level.useDisk) { - // The block is completely gone from this node; forget it so we can put() it again later. - blockInfo.remove(blockId) - } - if (blockIsUpdated) { - return Some(status) + val status = getCurrentBlockStatus(blockId, info) + if (info.tellMaster) { + reportBlockStatus(blockId, info, status, droppedMemorySize) + } + if (!level.useDisk) { + // The block is completely gone from this node; forget it so we can put() it again later. + blockInfo.remove(blockId) + } + if (blockIsUpdated) { + return Some(status) + } } + } finally { + pendingToRemove.remove(blockId) } } None @@ -1108,27 +1122,32 @@ private[spark] class BlockManager( def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = { logDebug(s"Removing block $blockId") val info = blockInfo.get(blockId).orNull - if (info != null) { - info.synchronized { - // Removals are idempotent in disk store and memory store. At worst, we get a warning.
- val removedFromMemory = memoryStore.remove(blockId) - val removedFromDisk = diskStore.remove(blockId) - val removedFromExternalBlockStore = - if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false - if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) { - logWarning(s"Block $blockId could not be removed as it was not found in either " + - "the disk, memory, or external block store") - } - blockInfo.remove(blockId) - val status = getCurrentBlockStatus(blockId, info) - if (tellMaster && info.tellMaster) { - reportBlockStatus(blockId, info, status) - } - Option(TaskContext.get()).foreach { tc => - val metrics = tc.taskMetrics() - val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) - metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status))) + if (info != null && pendingToRemove.putIfAbsent(blockId, currentTaskAttemptId) == 0L) { + try { + info.synchronized { + val level = info.level + // Removals are idempotent in disk store and memory store. At worst, we get a warning. + val removedFromMemory = if (level.useMemory) memoryStore.remove(blockId) else false + val removedFromDisk = if (level.useDisk) diskStore.remove(blockId) else false + val removedFromExternalBlockStore = + if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false + if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) { + logWarning(s"Block $blockId could not be removed as it was not found in either " + + "the disk, memory, or external block store") + } + blockInfo.remove(blockId) + val status = getCurrentBlockStatus(blockId, info) + if (tellMaster && info.tellMaster) { + reportBlockStatus(blockId, info, status) + } + Option(TaskContext.get()).foreach { tc => + val metrics = tc.taskMetrics() + val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) + metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status))) + } } + } finally { + pendingToRemove.remove(blockId) } } else { // The block has already been removed; do nothing. @@ -1151,14 +1170,19 @@ private[spark] class BlockManager( while (iterator.hasNext) { val entry = iterator.next() val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp) - if (time < cleanupTime && shouldDrop(id)) { - info.synchronized { - val level = info.level - if (level.useMemory) { memoryStore.remove(id) } - if (level.useDisk) { diskStore.remove(id) } - if (level.useOffHeap) { externalBlockStore.remove(id) } - iterator.remove() - logInfo(s"Dropped block $id") + if (time < cleanupTime && shouldDrop(id) && + pendingToRemove.putIfAbsent(id, currentTaskAttemptId) == 0L) { + try { + info.synchronized { + val level = info.level + if (level.useMemory) { memoryStore.remove(id) } + if (level.useDisk) { diskStore.remove(id) } + if (level.useOffHeap) { externalBlockStore.remove(id) } + iterator.remove() + logInfo(s"Dropped block $id") + } + } finally { + pendingToRemove.remove(id) } val status = getCurrentBlockStatus(id, info) reportBlockStatus(id, info, status) @@ -1166,6 +1190,32 @@ private[spark] class BlockManager( } } + private def currentTaskAttemptId: Long = { + Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(NON_TASK_WRITER) + } + + def getBlockInfo(blockId: BlockId): BlockInfo = { + blockInfo.get(blockId).orNull + } + + /** + * Release all locks held by the given task, clearing that task's pin bookkeeping + * structures and updating the global pin counts.
This method should be called at the + * end of a task (either by a task completion handler or in `TaskRunner.run()`). + * + * @return the ids of blocks whose pins were released + */ + def releaseAllLocksForTask(taskAttemptId: Long): ArrayBuffer[BlockId] = { + var selectLocks = ArrayBuffer[BlockId]() + pendingToRemove.entrySet().asScala.foreach { entry => + if (entry.getValue == taskAttemptId) { + pendingToRemove.remove(entry.getKey) + selectLocks += entry.getKey + } + } + selectLocks + } + private def shouldCompress(blockId: BlockId): Boolean = { blockId match { case _: ShuffleBlockId => compressShuffle @@ -1239,6 +1289,7 @@ private[spark] class BlockManager( rpcEnv.stop(slaveEndpoint) blockInfo.clear() memoryStore.clear() + pendingToRemove.clear() diskStore.clear() if (externalBlockStoreInitialized) { externalBlockStore.clear() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 440c4c18aadd0..10e1d9ec728da 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -41,6 +41,14 @@ class BlockManagerMaster( logInfo("Removed " + execId + " successfully in removeExecutor") } + /** Request removal of a dead executor from the driver endpoint. + * This is only called on the driver side. Non-blocking + */ + def removeExecutorAsync(execId: String) { + driverEndpoint.ask[Boolean](RemoveExecutor(execId)) + logInfo("Removal of executor " + execId + " requested") + } + /** Register the BlockManager's id with the driver. */ def registerBlockManager( blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index 47d6c3646c331..08dc17d5887e9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -23,6 +23,8 @@ import javax.servlet.http.HttpServletRequest import scala.collection.mutable.{HashMap, ListBuffer} import scala.xml._ +import org.apache.commons.lang3.StringEscapeUtils + import org.apache.spark.JobExecutionStatus import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData} import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} @@ -82,9 +84,10 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { case JobExecutionStatus.UNKNOWN => "unknown" } - // The timeline library treats contents as HTML, so we have to escape them; for the - // data-title attribute string we have to escape them twice since that's in a string. + // The timeline library treats contents as HTML, so we have to escape them. We need to add + // extra layers of escaping in order to embed this in a Javascript string literal. val escapedDesc = Utility.escape(displayJobDescription) + val jsEscapedDesc = StringEscapeUtils.escapeEcmaScript(escapedDesc) val jobEventJsonAsStr = s""" |{ @@ -94,7 +97,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { | 'end': new Date(${completionTime}), | 'content': '
' + | 'Status: ${status}
' + | 'Submitted: ${UIUtils.formatDate(new Date(submissionTime))}' + | '${ @@ -104,7 +107,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { "" } }">' + - | '${escapedDesc} (Job ${jobId})
' + | '${jsEscapedDesc} (Job ${jobId})' |} """.stripMargin jobEventJsonAsStr diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index 6a35f0e0a87a0..8c6a6681eabbc 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -24,6 +24,8 @@ import scala.xml.{NodeSeq, Node, Unparsed, Utility} import javax.servlet.http.HttpServletRequest +import org.apache.commons.lang3.StringEscapeUtils + import org.apache.spark.JobExecutionStatus import org.apache.spark.scheduler.StageInfo import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} @@ -64,9 +66,10 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { val submissionTime = stage.submissionTime.get val completionTime = stage.completionTime.getOrElse(System.currentTimeMillis()) - // The timeline library treats contents as HTML, so we have to escape them; for the - // data-title attribute string we have to escape them twice since that's in a string. + // The timeline library treats contents as HTML, so we have to escape them. We need to add + // extra layers of escaping in order to embed this in a Javascript string literal. val escapedName = Utility.escape(name) + val jsEscapedName = StringEscapeUtils.escapeEcmaScript(escapedName) s""" |{ | 'className': 'stage job-timeline-object ${status}', @@ -75,7 +78,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { | 'end': new Date(${completionTime}), | 'content': '
' + | 'Status: ${status.toUpperCase}
' + | 'Submitted: ${UIUtils.formatDate(new Date(submissionTime))}' + | '${ @@ -85,7 +88,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { "" } }">' + - | '${escapedName} (Stage ${stageId}.${attemptId})
', + | '${jsEscapedName} (Stage ${stageId}.${attemptId})', |} """.stripMargin } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0bcbf266dde42..36ab3ac99f111 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory import java.net._ import java.nio.ByteBuffer import java.nio.channels.Channels +import java.nio.charset.StandardCharsets import java.util.concurrent._ import java.util.{Locale, Properties, Random, UUID} import javax.net.ssl.HttpsURLConnection @@ -2308,29 +2309,24 @@ private[spark] class RedirectThread( * the toString method. */ private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream { - var pos: Int = 0 - var buffer = new Array[Int](sizeInBytes) + private var pos: Int = 0 + private var isBufferFull = false + private val buffer = new Array[Byte](sizeInBytes) - def write(i: Int): Unit = { - buffer(pos) = i + def write(input: Int): Unit = { + buffer(pos) = input.toByte pos = (pos + 1) % buffer.length + isBufferFull = isBufferFull || (pos == 0) } override def toString: String = { - val (end, start) = buffer.splitAt(pos) - val input = new java.io.InputStream { - val iterator = (start ++ end).iterator - - def read(): Int = if (iterator.hasNext) iterator.next() else -1 - } - val reader = new BufferedReader(new InputStreamReader(input)) - val stringBuilder = new StringBuilder - var line = reader.readLine() - while (line != null) { - stringBuilder.append(line) - stringBuilder.append("\n") - line = reader.readLine() + if (!isBufferFull) { + return new String(buffer, 0, pos, StandardCharsets.UTF_8) } - stringBuilder.toString() + + val nonCircularBuffer = new Array[Byte](sizeInBytes) + System.arraycopy(buffer, pos, nonCircularBuffer, 0, buffer.length - pos) + System.arraycopy(buffer, 0, nonCircularBuffer, buffer.length - pos, pos) + new String(nonCircularBuffer, StandardCharsets.UTF_8) } } diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala index 203dab934ca1f..85983b278ffa3 100644 --- a/core/src/test/scala/org/apache/spark/FailureSuite.scala +++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark +import org.apache.spark.storage.StorageLevel import org.apache.spark.util.NonSerializable import java.io.{IOException, NotSerializableException, ObjectInputStream} @@ -238,6 +239,17 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext { FailureSuiteState.clear() } + test("failure because cached RDD files are missing") { + sc = new SparkContext("local[1,2]", "test") + val rdd = sc.parallelize(1 to 2, 2).persist(StorageLevel.DISK_ONLY) + rdd.count() + // Directly delete all files from the disk store, triggering failures when reading cached data: + SparkEnv.get.blockManager.diskBlockManager.getAllFiles().foreach(_.delete()) + // Each task should fail once due to missing cached data, but then should succeed on its second + // attempt because the missing cache locations will be purged and the blocks will be recomputed. + rdd.count() + } + // TODO: Need to add tests with shuffle fetch failures. 
} diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index 555b640cb4244..6a195ef7fe5b3 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -76,6 +76,21 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft ms } + /** + * Make a mocked [[MemoryStore]] whose [[MemoryStore.evictBlocksToFreeSpace]] method is + * stubbed to always throw [[RuntimeException]]. + */ + protected def makeBadMemoryStore(mm: MemoryManager): MemoryStore = { + val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS) + when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())).thenAnswer(new Answer[Long] { + override def answer(invocation: InvocationOnMock): Long = { + throw new RuntimeException("bad memory store!") + } + }) + mm.setMemoryStore(ms) + ms + } + /** * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory. * diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 6cc48597d38f9..46b6916a12fc2 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -255,4 +255,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(evictedBlocks.nonEmpty) } + test("SPARK-15260: atomically resize memory pools") { + val conf = new SparkConf() + .set("spark.memory.fraction", "1") + .set("spark.memory.storageFraction", "0") + .set("spark.testing.memory", "1000") + val mm = UnifiedMemoryManager(conf, numCores = 2) + makeBadMemoryStore(mm) + val memoryMode = MemoryMode.ON_HEAP + // Acquire 1000 then release 600 bytes of storage memory, leaving the + // storage memory pool at 1000 bytes but only 400 bytes of which are used. + assert(mm.acquireStorageMemory(dummyBlock, 1000L, evictedBlocks)) + mm.releaseStorageMemory(600L) + // Before the fix for SPARK-15260, we would first shrink the storage pool by the amount of + // unused storage memory (600 bytes), try to evict blocks, then enlarge the execution pool + // by the same amount. If the eviction threw an exception, then we would shrink one pool + // without enlarging the other, resulting in an assertion failure. 
+ intercept[RuntimeException] { + mm.acquireExecutionMemory(1000L, 0, memoryMode) + } + val assertInvariant = PrivateMethod[Unit]('assertInvariant) + mm.invokePrivate[Unit](assertInvariant()) + } + } diff --git a/core/src/test/scala/org/apache/spark/status/api/v1/SimpleDateParamSuite.scala b/core/src/test/scala/org/apache/spark/status/api/v1/SimpleDateParamSuite.scala index 63b0e77629dde..18baeb1cb9c71 100644 --- a/core/src/test/scala/org/apache/spark/status/api/v1/SimpleDateParamSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/api/v1/SimpleDateParamSuite.scala @@ -26,7 +26,8 @@ class SimpleDateParamSuite extends SparkFunSuite with Matchers { test("date parsing") { new SimpleDateParam("2015-02-20T23:21:17.190GMT").timestamp should be (1424474477190L) - new SimpleDateParam("2015-02-20T17:21:17.190EST").timestamp should be (1424470877190L) + // don't use EST, it is ambiguous, use -0500 instead, see SPARK-15723 + new SimpleDateParam("2015-02-20T17:21:17.190-0500").timestamp should be (1424470877190L) new SimpleDateParam("2015-02-20").timestamp should be (1424390400000L) // GMT intercept[WebApplicationException] { new SimpleDateParam("invalid date") diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index c00591fa371aa..47e854596f5dc 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.storage import java.nio.{ByteBuffer, MappedByteBuffer} import java.util.Arrays +import java.util.concurrent.CountDownLatch import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ @@ -424,6 +425,43 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } } + test("deadlock between dropFromMemory and removeBlock") { + store = makeBlockManager(2000) + val a1 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + val lock1 = new CountDownLatch(1) + val lock2 = new CountDownLatch(1) + + val t2 = new Thread { + override def run() = { + val info = store.getBlockInfo("a1") + info.synchronized { + store.pendingToRemove.put("a1", 1L) + lock1.countDown() + lock2.await() + store.pendingToRemove.remove("a1") + } + } + } + + val t1 = new Thread { + override def run() = { + store.memoryManager.synchronized { + t2.start() + lock1.await() + val status = store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer]) + assert(status == None, "this thread can not get block a1") + lock2.countDown() + } + } + } + + t1.start() + t1.join() + t2.join() + store.removeBlock("a1", tellMaster = false) + } + test("correct BlockResult returned from get() calls") { store = makeBlockManager(12000) val list1 = List(new Array[Byte](2000), new Array[Byte](2000)) @@ -1323,4 +1361,45 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(result.data === Right(bytes)) assert(result.droppedBlocks === Nil) } + + private def testReadWithLossOfOnDiskFiles( + storageLevel: StorageLevel, + readMethod: BlockManager => Option[_]): Unit = { + store = makeBlockManager(12000) + assert(store.putSingle("blockId", new Array[Byte](4000), storageLevel).nonEmpty) + assert(store.getStatus("blockId").isDefined) + // Directly delete all files from the disk store, triggering failures when reading blocks: + store.diskBlockManager.getAllFiles().foreach(_.delete()) + // The BlockManager still thinks that 
these blocks exist: + assert(store.getStatus("blockId").isDefined) + // Because the BlockManager's metadata claims that the block exists (i.e. that it's present + // in at least one store), the read attempts to read it and fails when the on-disk file is + // missing. + intercept[BlockException] { + readMethod(store) + } + // Subsequent read attempts will succeed; the block isn't present but we return an expected + // "block not found" response rather than a fatal error: + assert(readMethod(store).isEmpty) + // The reason why this second read succeeded is because the metadata entry for the missing + // block was removed as a result of the read failure: + assert(store.getStatus("blockId").isEmpty) + } + + test("remove cached block if a read fails due to missing on-disk files") { + val storageLevels = Seq( + StorageLevel(useDisk = true, useMemory = false, deserialized = false), + StorageLevel(useDisk = true, useMemory = false, deserialized = true)) + val readMethods = Map[String, BlockManager => Option[_]]( + "getLocalBytes" -> ((m: BlockManager) => m.getLocalBytes("blockId")), + "getLocal" -> ((m: BlockManager) => m.getLocal("blockId")) + ) + testReadWithLossOfOnDiskFiles(StorageLevel.DISK_ONLY, _.getLocalBytes("blockId")) + for ((readMethodName, readMethod) <- readMethods; storageLevel <- storageLevels) { + withClue(s"$readMethodName $storageLevel") { + testReadWithLossOfOnDiskFiles(storageLevel, readMethod) + } + } + } + } diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 7de995af512db..13f85a7fb4097 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.util -import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStream} +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStream, PrintStream} import java.lang.{Double => JDouble, Float => JFloat} import java.net.{BindException, ServerSocket, URI} import java.nio.{ByteBuffer, ByteOrder} @@ -679,14 +679,37 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(!Utils.isInDirectory(nullFile, childFile3)) } - test("circular buffer") { + test("circular buffer: if nothing was written to the buffer, display nothing") { + val buffer = new CircularBuffer(4) + assert(buffer.toString === "") + } + + test("circular buffer: if the buffer isn't full, print only the contents written") { + val buffer = new CircularBuffer(10) + val stream = new PrintStream(buffer, true, "UTF-8") + stream.print("test") + assert(buffer.toString === "test") + } + + test("circular buffer: data written == size of the buffer") { + val buffer = new CircularBuffer(4) + val stream = new PrintStream(buffer, true, "UTF-8") + + // fill the buffer to its exact size so that it just hits overflow + stream.print("test") + assert(buffer.toString === "test") + + // add more data to the buffer + stream.print("12") + assert(buffer.toString === "st12") + } + + test("circular buffer: multiple overflow") { val buffer = new CircularBuffer(25) - val stream = new java.io.PrintStream(buffer, true, "UTF-8") + val stream = new PrintStream(buffer, true, "UTF-8") - // scalastyle:off println - stream.println("test circular test circular test circular test circular test circular") - // scalastyle:on println - assert(buffer.toString === "t circular test circular\n") + stream.print("test circular test circular test 
circular test circular test circular") + assert(buffer.toString === "st circular test circular") } test("nanSafeCompareDoubles") { diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index d7b2d07a40052..dae83fec4fdeb 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -17,13 +17,17 @@ package org.apache.spark.util.collection -import org.apache.spark.memory.MemoryTestingUtils +import java.util.Comparator import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark._ +import org.apache.spark.memory.MemoryTestingUtils import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} +import org.apache.spark.unsafe.array.LongArray +import org.apache.spark.unsafe.memory.MemoryBlock +import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordPointerAndKeyPrefix, UnsafeSortDataFormat} class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { @@ -95,6 +99,25 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { testWithMultipleSer("sort without breaking sorting contracts", loadDefaults = true)( sortWithoutBreakingSortingContracts) + // This test is ignored by default as it requires a fairly large heap size (16GB) + ignore("sort without breaking timsort contracts for large arrays") { + val size = 300000000 + // To manifest the bug observed in SPARK-8428 and SPARK-13850, we explicitly use an array of + // the form [150000000, 150000001, 150000002, ...., 300000000, 0, 1, 2, ..., 149999999] + // that can trigger copyRange() in TimSort.mergeLo() or TimSort.mergeHi() + val ref = Array.tabulate[Long](size) { i => if (i < size / 2) size / 2 + i else i } + val buf = new LongArray(MemoryBlock.fromLongArray(ref)) + + new Sorter(UnsafeSortDataFormat.INSTANCE).sort( + buf, 0, size, new Comparator[RecordPointerAndKeyPrefix] { + override def compare( + r1: RecordPointerAndKeyPrefix, + r2: RecordPointerAndKeyPrefix): Int = { + PrefixComparators.LONG.compare(r1.keyPrefix, r2.keyPrefix) + } + }) + } + test("spilling with hash collisions") { val size = 1000 val conf = createSparkConf(loadDefaults = true, kryo = false) diff --git a/dev/run-tests.py b/dev/run-tests.py index 4a18d1a7469c6..94b517d270dfd 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -113,10 +113,11 @@ def determine_modules_to_test(changed_modules): modules_to_test = set() for module in changed_modules: modules_to_test = modules_to_test.union(determine_modules_to_test(module.dependent_modules)) + modules_to_test = modules_to_test.union(set(changed_modules)) # If we need to run all of the tests, then we should short-circuit and return 'root' if modules.root in modules_to_test: return [modules.root] - return modules_to_test.union(set(changed_modules)) + return modules_to_test def determine_tags_to_exclude(changed_modules): @@ -284,7 +285,7 @@ def exec_sbt(sbt_args=()): print(line, end='') retcode = sbt_proc.wait() - if retcode > 0: + if retcode != 0: exit_from_command_with_retcode(sbt_cmd, retcode) diff --git a/dev/sparktestsupport/shellutils.py b/dev/sparktestsupport/shellutils.py index d280e797077d1..05af87189b18d 100644 --- a/dev/sparktestsupport/shellutils.py +++ b/dev/sparktestsupport/shellutils.py @@ -53,7 +53,10 @@ def subprocess_check_call(*popenargs, **kwargs): def exit_from_command_with_retcode(cmd, 
retcode): - print("[error] running", ' '.join(cmd), "; received return code", retcode) + if retcode < 0: + print("[error] running", ' '.join(cmd), "; process was terminated by signal", -retcode) + else: + print("[error] running", ' '.join(cmd), "; received return code", retcode) sys.exit(int(os.environ.get("CURRENT_BLOCK", 255))) diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh index efb49f74098e0..7367a8bcc7411 100755 --- a/dev/test-dependencies.sh +++ b/dev/test-dependencies.sh @@ -84,7 +84,7 @@ for HADOOP_PROFILE in "${HADOOP_PROFILES[@]}"; do echo "Generating dependency manifest for $HADOOP_PROFILE" mkdir -p dev/pr-deps $MVN $HADOOP_MODULE_PROFILES -P$HADOOP_PROFILE dependency:build-classpath -pl assembly \ - | grep "Building Spark Project Assembly" -A 5 \ + | grep "Dependencies classpath:" -A 1 \ | tail -n 1 | tr ":" "\n" | rev | cut -d "/" -f 1 | rev | sort \ | grep -v spark > dev/pr-deps/spark-deps-$HADOOP_PROFILE done diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml index cd51fd8025ff5..2e8b26e458778 100644 --- a/docker-integration-tests/pom.xml +++ b/docker-integration-tests/pom.xml @@ -20,12 +20,12 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../pom.xml - spark-docker-integration-tests_2.11 + spark-docker-integration-tests_2.10 jar Spark Project Docker Integration Tests http://spark.apache.org/ diff --git a/docs/_config.yml b/docs/_config.yml index 9334516a01e7b..c2ecb59f644c3 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -14,8 +14,8 @@ include: # These allow the documentation to be updated with newer releases # of Spark, Scala, and Mesos. -SPARK_VERSION: 1.6.1 -SPARK_VERSION_SHORT: 1.6.1 +SPARK_VERSION: 1.6.2 +SPARK_VERSION_SHORT: 1.6.2 SCALA_BINARY_VERSION: "2.10" SCALA_VERSION: "2.10.5" MESOS_VERSION: 0.21.0 diff --git a/docs/_plugins/copy_api_dirs.rb b/docs/_plugins/copy_api_dirs.rb index f926d67e6beaf..174c202e37918 100644 --- a/docs/_plugins/copy_api_dirs.rb +++ b/docs/_plugins/copy_api_dirs.rb @@ -37,7 +37,7 @@ # Copy over the unified ScalaDoc for all projects to api/scala. # This directory will be copied over to _site when `jekyll` command is run. - source = "../target/scala-2.11/unidoc" + source = "../target/scala-2.10/unidoc" dest = "api/scala" puts "Making directory " + dest diff --git a/docs/configuration.md b/docs/configuration.md index 64a1899d69769..195fa3c953224 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -284,7 +284,7 @@ Apart from these, the following properties are also available, and may be useful spark.executor.logs.rolling.maxSize (none) - Set the max size of the file by which the executor logs will be rolled over. + Set the max size of the file in bytes by which the executor logs will be rolled over. Rolling is disabled by default. See spark.executor.logs.rolling.maxRetainedFiles for automatic cleaning of old logs. @@ -296,7 +296,7 @@ Apart from these, the following properties are also available, and may be useful Set the strategy of rolling of executor logs. By default it is disabled. It can be set to "time" (time-based rolling) or "size" (size-based rolling). For "time", use spark.executor.logs.rolling.time.interval to set the rolling interval. - For "size", use spark.executor.logs.rolling.size.maxBytes to set + For "size", use spark.executor.logs.rolling.maxSize to set the maximum file size for rolling. 
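
The dev/sparktestsupport/shellutils.py change above distinguishes signal deaths from ordinary failures by the sign of the return code: on POSIX systems, Python's subprocess module reports a child terminated by signal N as a returncode of -N. A minimal standalone sketch of that convention (the sleep command and signal choice are illustrative only, not part of the patch):

```python
import signal
import subprocess

# On POSIX, Popen.returncode is -N when the child was killed by signal N,
# which is what the `retcode < 0` branch in exit_from_command_with_retcode checks.
proc = subprocess.Popen(["sleep", "60"])
proc.send_signal(signal.SIGTERM)
proc.wait()
assert proc.returncode == -signal.SIGTERM  # i.e. -15
print("process was terminated by signal", -proc.returncode)
```
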
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 803701ef73b28..26511b5148ebe 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -2248,7 +2248,7 @@ import pyspark.sql.functions as func # In 1.3.x, in order for the grouping column "department" to show up, # it must be included explicitly as part of the agg function call. -df.groupBy("department").agg("department"), func.max("age"), func.sum("expense")) +df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense")) # In 1.4+, grouping column "department" is included automatically. df.groupBy("department").agg(func.max("age"), func.sum("expense")) diff --git a/examples/pom.xml b/examples/pom.xml index 9a1e89e8cefe8..fe0cc932c837a 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../pom.xml org.apache.spark - spark-examples_2.11 + spark-examples_2.10 examples diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala index 818d4f2b81f82..ead8f46bc4764 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala @@ -34,7 +34,7 @@ object SparkPi { val y = random * 2 - 1 if (x*x + y*y < 1) 1 else 0 }.reduce(_ + _) - println("Pi is roughly " + 4.0 * count / n) + println("Pi is roughly " + 4.0 * count / (n - 1)) spark.stop() } } diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index d5e2f230d92e9..c044f182b5dff 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-streaming-flume-assembly_2.11 + spark-streaming-flume-assembly_2.10 jar Spark Project External Flume Assembly http://spark.apache.org/ diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index c9d9bfcd39a4d..1aba25bf578b4 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-streaming-flume-sink_2.11 + spark-streaming-flume-sink_2.10 streaming-flume-sink diff --git a/external/flume/pom.xml b/external/flume/pom.xml index 4cf800d62cc85..594f446a5d6fe 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-streaming-flume_2.11 + spark-streaming-flume_2.10 streaming-flume diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml index c894b6cd51c07..5d7753af69e9c 100644 --- a/external/kafka-assembly/pom.xml +++ b/external/kafka-assembly/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-streaming-kafka-assembly_2.11 + spark-streaming-kafka-assembly_2.10 jar Spark Project External Kafka Assembly http://spark.apache.org/ diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index 3c29324915c92..f2d3706e45a74 100644 --- a/external/kafka/pom.xml +++ 
b/external/kafka/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-streaming-kafka_2.11 + spark-streaming-kafka_2.10 streaming-kafka diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 02225d5aa7cc5..feea0aed7f16e 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -280,14 +280,20 @@ class DirectKafkaStreamSuite sendDataAndWaitForReceive(i) } + ssc.stop() + // Verify that offset ranges were generated - val offsetRangesBeforeStop = getOffsetRanges(kafkaStream) - assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated") + // Since "offsetRangesAfterStop" will be used to compare with "recoveredOffsetRanges", we should + // collect offset ranges after stopping. Otherwise, because new RDDs keep being generated before + // stopping, we may not be able to get the latest RDDs, then "recoveredOffsetRanges" will + // contain something not in "offsetRangesAfterStop". + val offsetRangesAfterStop = getOffsetRanges(kafkaStream) + assert(offsetRangesAfterStop.size >= 1, "No offset ranges generated") assert( - offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 }, + offsetRangesAfterStop.head._2.forall { _.fromOffset === 0 }, "starting offset not zero" ) - ssc.stop() + logInfo("====== RESTARTING ========") // Recover context from checkpoints @@ -297,12 +303,14 @@ class DirectKafkaStreamSuite // Verify offset ranges have been recovered val recoveredOffsetRanges = getOffsetRanges(recoveredStream) assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered") - val earlierOffsetRangesAsSets = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) } + val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) } assert( recoveredOffsetRanges.forall { or => earlierOffsetRangesAsSets.contains((or._1, or._2.toSet)) }, - "Recovered ranges are not the same as the ones generated" + "Recovered ranges are not the same as the ones generated\n" + + s"recoveredOffsetRanges: $recoveredOffsetRanges\n" + + s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets" ) // Restart context, give more data and verify the total at the end // If the total is right, that means each record has been received only once diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml index b9aceecd89399..45f7e43b35dbb 100644 --- a/external/mqtt-assembly/pom.xml +++ b/external/mqtt-assembly/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-streaming-mqtt-assembly_2.11 + spark-streaming-mqtt-assembly_2.10 jar Spark Project External MQTT Assembly http://spark.apache.org/ diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index 0836a17cb8282..ff6b1654e9abb 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-streaming-mqtt_2.11 + spark-streaming-mqtt_2.10 streaming-mqtt diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml index
94bf00c054dd3..cfa35b5622244 100644 --- a/external/twitter/pom.xml +++ b/external/twitter/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-streaming-twitter_2.11 + spark-streaming-twitter_2.10 streaming-twitter diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml index 47514aa92d609..42ccbeaf8fe8b 100644 --- a/external/zeromq/pom.xml +++ b/external/zeromq/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-streaming-zeromq_2.11 + spark-streaming-zeromq_2.10 streaming-zeromq diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml index e994cad703b31..6261f5cce8d51 100644 --- a/extras/java8-tests/pom.xml +++ b/extras/java8-tests/pom.xml @@ -19,13 +19,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-3-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - java8-tests_2.11 + java8-tests_2.10 pom Spark Project Java8 Tests POM diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml index c13dc994e7e58..b71f8d780f760 100644 --- a/extras/kinesis-asl-assembly/pom.xml +++ b/extras/kinesis-asl-assembly/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-3-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-streaming-kinesis-asl-assembly_2.11 + spark-streaming-kinesis-asl-assembly_2.10 jar Spark Project Kinesis Assembly http://spark.apache.org/ diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml index ee81c6ecd58d0..6f87a0e7765cd 100644 --- a/extras/kinesis-asl/pom.xml +++ b/extras/kinesis-asl/pom.xml @@ -19,14 +19,14 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-3-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-streaming-kinesis-asl_2.11 + spark-streaming-kinesis-asl_2.10 jar Spark Kinesis Integration diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml index 97ed26b8b735b..99eb4cbdec99a 100644 --- a/extras/spark-ganglia-lgpl/pom.xml +++ b/extras/spark-ganglia-lgpl/pom.xml @@ -19,14 +19,14 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-3-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-ganglia-lgpl_2.11 + spark-ganglia-lgpl_2.10 jar Spark Ganglia Integration diff --git a/graphx/pom.xml b/graphx/pom.xml index a8f4cc0d530a1..d897fa8952db9 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../pom.xml org.apache.spark - spark-graphx_2.11 + spark-graphx_2.10 graphx diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 2ca60d51f8331..8a89295444277 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -151,7 +151,7 @@ object Pregel extends Logging { // count the iteration i += 1 } - + messages.unpersist(blocking = false) g } // end of apply diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala index 859f896039047..f72cbb15242ec 100644 --- 
a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala @@ -47,9 +47,11 @@ object ConnectedComponents { } } val initialMessage = Long.MaxValue - Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)( + val pregelGraph = Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)( vprog = (id, attr, msg) => math.min(attr, msg), sendMsg = sendMessage, mergeMsg = (a, b) => math.min(a, b)) + ccGraph.unpersist() + pregelGraph } // end of connectedComponents } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index 9acbd7960e12f..a46c5dac740bb 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -428,6 +428,23 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext { } } + test("unpersist graph RDD") { + withSpark { sc => + val vert = sc.parallelize(List((1L, "a"), (2L, "b"), (3L, "c")), 1) + val edges = sc.parallelize(List(Edge[Long](1L, 2L), Edge[Long](1L, 3L)), 1) + val g0 = Graph(vert, edges) + val g = g0.partitionBy(PartitionStrategy.EdgePartition2D, 2) + val cc = g.connectedComponents() + assert(sc.getPersistentRDDs.nonEmpty) + cc.unpersist() + g.unpersist() + g0.unpersist() + vert.unpersist() + edges.unpersist() + assert(sc.getPersistentRDDs.isEmpty) + } + } + test("SPARK-14219: pickRandomVertex") { withSpark { sc => val vert = sc.parallelize(List((1L, "a")), 1) diff --git a/launcher/pom.xml b/launcher/pom.xml index 0d87ffec0ad16..ae778b1a255ab 100644 --- a/launcher/pom.xml +++ b/launcher/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../pom.xml org.apache.spark - spark-launcher_2.11 + spark-launcher_2.10 jar Spark Project Launcher http://spark.apache.org/ diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java index 414ffc2c84e52..e493514f305d9 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java +++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java @@ -298,8 +298,8 @@ protected void handle(Message msg) throws IOException { Hello hello = (Hello) msg; ChildProcAppHandle handle = pending.remove(hello.secret); if (handle != null) { - handle.setState(SparkAppHandle.State.CONNECTED); handle.setConnection(this); + handle.setState(SparkAppHandle.State.CONNECTED); this.handle = handle; } else { throw new IllegalArgumentException("Received Hello for unknown client."); diff --git a/mllib/pom.xml b/mllib/pom.xml index fc2f7e5ec2218..91254c51fedb0 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../pom.xml org.apache.spark - spark-mllib_2.11 + spark-mllib_2.10 mllib diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index aedfb48058dc5..cc1d19e4a81ff 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -496,7 +496,7 @@ private class AFTAggregator(parameters: 
BDV[Double], fitIntercept: Boolean) * @return This AFTAggregator object. */ def merge(other: AFTAggregator): this.type = { - if (totalCnt != 0) { + if (other.count != 0) { totalCnt += other.totalCnt lossSum += other.lossSum diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala index cde5979396178..c9468606544db 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala @@ -55,7 +55,7 @@ object SVMDataGenerator { val sc = new SparkContext(sparkMaster, "SVMGenerator") val globalRnd = new Random(94720) - val trueWeights = Array.fill[Double](nfeatures + 1)(globalRnd.nextGaussian()) + val trueWeights = Array.fill[Double](nfeatures)(globalRnd.nextGaussian()) val data: RDD[LabeledPoint] = sc.parallelize(0 until nexamples, parts).map { idx => val rnd = new Random(42 + idx) diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala index d718ef63b531a..70f9693b4e96b 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala @@ -346,6 +346,23 @@ class AFTSurvivalRegressionSuite testEstimatorAndModelReadWrite(aft, datasetMultivariate, AFTSurvivalRegressionSuite.allParamSettings, checkModelData) } + + test("SPARK-15892: Incorrectly merged AFTAggregator with zero total count") { + // This `dataset` will contain an empty partition because it has five rows but + // the parallelism is bigger than that. Because the issue was about `AFTAggregator`s + // being merged incorrectly when one side covers an empty partition, the buggy code + // trains a model whose scale is exactly 1.0 (e^0).
+ val points = sc.parallelize(Seq( + AFTPoint(Vectors.dense(1.560, -0.605), 1.218, 1.0), + AFTPoint(Vectors.dense(0.346, 2.158), 2.949, 0.0), + AFTPoint(Vectors.dense(1.380, 0.231), 3.627, 0.0), + AFTPoint(Vectors.dense(0.520, 1.151), 0.273, 1.0), + AFTPoint(Vectors.dense(0.795, -0.226), 4.199, 0.0)), numSlices = 6) + val dataset = sqlContext.createDataFrame(points) + val trainer = new AFTSurvivalRegression() + val model = trainer.fit(dataset) + assert(model.scale != 1) + } } object AFTSurvivalRegressionSuite { diff --git a/network/common/pom.xml b/network/common/pom.xml index ac0cd8ec9f1b6..bf28392c30469 100644 --- a/network/common/pom.xml +++ b/network/common/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-network-common_2.11 + spark-network-common_2.10 jar Spark Project Networking http://spark.apache.org/ diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml index 7d460c857d0b8..077e148a41c51 100644 --- a/network/shuffle/pom.xml +++ b/network/shuffle/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-network-shuffle_2.11 + spark-network-shuffle_2.10 jar Spark Project Shuffle Streaming Service http://spark.apache.org/ diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml index 931bf9d7ba875..8df5ef6d9487d 100644 --- a/network/yarn/pom.xml +++ b/network/yarn/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-network-yarn_2.11 + spark-network-yarn_2.10 jar Spark Project YARN Shuffle Service http://spark.apache.org/ diff --git a/pom.xml b/pom.xml index 86eb8e345dd1c..9d4107907c651 100644 --- a/pom.xml +++ b/pom.xml @@ -24,8 +24,8 @@ 14 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT pom Spark Project Parent POM http://spark.apache.org/ @@ -165,11 +165,11 @@ 3.2.2 2.10.5 - 2.11 + 2.10 ${scala.version} org.scala-lang 1.9.13 - 2.4.4 + 2.7.3 1.1.2.1 1.1.2 1.2.0-incubating diff --git a/project/plugins.sbt b/project/plugins.sbt index c06687d8f197b..6a4e401ea49cc 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -32,3 +32,12 @@ addSbtPlugin("io.spray" % "sbt-revolver" % "0.7.2") libraryDependencies += "org.ow2.asm" % "asm" % "5.0.3" libraryDependencies += "org.ow2.asm" % "asm-commons" % "5.0.3" + +// Spark uses a custom fork of the sbt-pom-reader plugin which contains a patch to fix issues +// related to test-jar dependencies (https://github.com/sbt/sbt-pom-reader/pull/14). The source for +// this fork is published at https://github.com/JoshRosen/sbt-pom-reader/tree/v1.0.0-spark +// and corresponds to commit b160317fcb0b9d1009635a7c5aa05d0f3be61936 in that repository. +// In the long run, we should try to merge our patch upstream and switch to an upstream version of +// the plugin; this is tracked at SPARK-14401. 
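
Stepping back to the AFTSurvivalRegression change above (SPARK-15892): `merge` used to guard on this aggregator's own `totalCnt`, so an aggregator built from an empty partition would skip the merge and silently drop the other side's statistics. A minimal sketch of the corrected guard, using a hypothetical MetricAggregator rather than Spark's private AFTAggregator:

class MetricAggregator(var count: Long = 0L, var lossSum: Double = 0.0) {
  def add(loss: Double): this.type = { count += 1; lossSum += loss; this }
  def merge(other: MetricAggregator): this.type = {
    // The guard must test the incoming side: partitions with no rows produce
    // aggregators with count == 0, and the old `if (count != 0)` guard made an
    // empty left-hand aggregator discard `other`'s data entirely.
    if (other.count != 0) {
      count += other.count
      lossSum += other.lossSum
    }
    this
  }
}

The new test provokes exactly this scenario by spreading five rows over six partitions, then asserts that the fitted scale is no longer the degenerate 1.0.
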
+ +addSbtPlugin("org.spark-project" % "sbt-pom-reader" % "1.0.0-spark") diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py index e56e22a9b920e..822ae46e45111 100644 --- a/python/pyspark/cloudpickle.py +++ b/python/pyspark/cloudpickle.py @@ -169,7 +169,12 @@ def save_function(self, obj, name=None): if name is None: name = obj.__name__ - modname = pickle.whichmodule(obj, name) + try: + # whichmodule() could fail, see + # https://bitbucket.org/gutworth/six/issues/63/importing-six-breaks-pickling + modname = pickle.whichmodule(obj, name) + except Exception: + modname = None # print('which gives %s %s %s' % (modname, obj, name)) try: themodule = sys.modules[modname] @@ -326,7 +331,12 @@ def save_global(self, obj, name=None, pack=struct.pack): modname = getattr(obj, "__module__", None) if modname is None: - modname = pickle.whichmodule(obj, name) + try: + # whichmodule() could fail, see + # https://bitbucket.org/gutworth/six/issues/63/importing-six-breaks-pickling + modname = pickle.whichmodule(obj, name) + except Exception: + modname = '__main__' if modname == '__main__': themodule = None diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py index b525d26204371..118d4ac9b2ff2 100644 --- a/python/pyspark/mllib/linalg/__init__.py +++ b/python/pyspark/mllib/linalg/__init__.py @@ -565,7 +565,7 @@ def parse(s): if start == -1: raise ValueError("Tuple should start with '('") end = s.find(')') - if start == -1: + if end == -1: raise ValueError("Tuple should end with ')'") s = s[start + 1: end].strip() diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 0dc42740abcfb..43eb6eca05d35 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -305,6 +305,11 @@ def test_udf2(self): [res] = self.sqlCtx.sql("SELECT strlen(a) FROM test WHERE strlen(a) > 1").collect() self.assertEqual(4, res[0]) + def test_udf_without_arguments(self): + self.sqlCtx.registerFunction("foo", lambda: "bar") + [row] = self.sqlCtx.sql("SELECT foo()").collect() + self.assertEqual(row[0], "bar") + def test_udf_with_array_type(self): d = [Row(l=list(range(3)), d={"key": list(range(5))})] rdd = self.sc.parallelize(d) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 5bc0773fa8660..211b01fe78429 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1195,11 +1195,7 @@ def __new__(self, *args, **kwargs): if args and kwargs: raise ValueError("Can not use both args " "and kwargs to create Row") - if args: - # create row class or objects - return tuple.__new__(self, args) - - elif kwargs: + if kwargs: # create row objects names = sorted(kwargs.keys()) row = tuple.__new__(self, [kwargs[n] for n in names]) @@ -1207,7 +1203,8 @@ def __new__(self, *args, **kwargs): return row else: - raise ValueError("No args or kwargs") + # create row class or objects + return tuple.__new__(self, args) def asDict(self, recursive=False): """ diff --git a/repl/pom.xml b/repl/pom.xml index 981cb047f8cb8..83558ab45cd8e 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../pom.xml org.apache.spark - spark-repl_2.11 + spark-repl_2.10 jar Spark Project REPL http://spark.apache.org/ diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index 22632fb040ba4..98be81a074072 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - 
spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-catalyst_2.11 + spark-catalyst_2.10 jar Spark Project Catalyst http://spark.apache.org/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index acdf83dd34b11..60e3a55ebd444 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -80,7 +80,6 @@ class Analyzer( ExtractWindowExpressions :: GlobalAggregates :: ResolveAggregateFunctions :: - DistinctAggregationRewriter(conf) :: HiveTypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), Batch("Nondeterministic", Once, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala index 4e7d1341028ca..0cfec43ec72c1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala @@ -102,11 +102,8 @@ import org.apache.spark.sql.types.IntegerType */ case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case p if !p.resolved => p - // We need to wait until this Aggregate operator is resolved. + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case a: Aggregate => rewrite(a) - case p => p } def rewrite(a: Aggregate): Aggregate = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 6d807c9ecf302..09bf2a71a8b7b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -95,15 +95,17 @@ abstract class Expression extends TreeNode[Expression] { ctx.subExprEliminationExprs.get(this).map { subExprState => // This expression is repeated meaning the code to evaluated has already been added // as a function and called in advance. Just use it. - val code = s"/* ${this.toCommentSafeString} */" - GeneratedExpressionCode(code, subExprState.isNull, subExprState.value) + GeneratedExpressionCode( + ctx.registerComment(this.toString), + subExprState.isNull, + subExprState.value) }.getOrElse { val isNull = ctx.freshName("isNull") val primitive = ctx.freshName("primitive") val ve = GeneratedExpressionCode("", isNull, primitive) ve.code = genCode(ctx, ve) // Add `this` in the comment. - ve.copy(s"/* ${this.toCommentSafeString} */\n" + ve.code.trim) + ve.copy(code = s"${ctx.registerComment(this.toString)}\n" + ve.code.trim) } } @@ -215,14 +217,6 @@ abstract class Expression extends TreeNode[Expression] { override def simpleString: String = toString override def toString: String = prettyName + flatArguments.mkString("(", ",", ")") - - /** - * Returns the string representation of this expression that is safe to be put in - * code comments of generated code. 
- */ - protected def toCommentSafeString: String = this.toString - .replace("*/", "\\*\\/") - .replace("\\u", "\\\\u") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala index 9b8b6382d753d..84a456684871f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions.codegen +import org.apache.commons.lang3.StringUtils + /** * A utility class that indents a block of code based on the curly braces and parentheses. * This is used to prettify generated code when in debug mode (or exceptions). @@ -24,7 +26,14 @@ package org.apache.spark.sql.catalyst.expressions.codegen * Written by Matei Zaharia. */ object CodeFormatter { - def format(code: String): String = new CodeFormatter().addLines(code).result() + def format(code: CodeAndComment): String = { + new CodeFormatter().addLines( + StringUtils.replaceEach( + code.body, + code.comment.keys.toArray, + code.comment.values.toArray) + ).result + } } private class CodeFormatter { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 440c7d2fc1156..0dec50e543fd4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -125,6 +125,11 @@ class CodeGenContext { private val curId = new java.util.concurrent.atomic.AtomicInteger() + /** + * The map from a placeholder to its corresponding comment + */ + private val placeHolderToComments = new mutable.HashMap[String, String] + /** * Returns a term name that is unique within this instance of a `CodeGenerator`. * @@ -458,6 +463,35 @@ class CodeGenContext { if (doSubexpressionElimination) subexpressionElimination(expressions) expressions.map(e => e.gen(this)) } + + /** + * Returns the map from placeholders to their corresponding comments + */ + def getPlaceHolderToComments(): collection.Map[String, String] = placeHolderToComments + + /** + * Register a multi-line comment and return the corresponding placeholder + */ + private def registerMultilineComment(text: String): String = { + val placeHolder = s"/*${freshName("c")}*/" + val comment = text.split("(\r\n)|\r|\n").mkString("/**\n * ", "\n * ", "\n */") + placeHolderToComments += (placeHolder -> comment) + placeHolder + } + + /** + * Register a comment and return the corresponding placeholder + */ + def registerComment(text: String): String = { + if (text.contains("\n") || text.contains("\r")) { + registerMultilineComment(text) + } else { + val placeHolder = s"/*${freshName("c")}*/" + val safeComment = s"// $text" + placeHolderToComments += (placeHolder -> safeComment) + placeHolder + } + } } /** @@ -468,6 +502,19 @@ abstract class GeneratedClass { def generate(expressions: Array[Expression]): Any } +/** + * A wrapper for the source code to be compiled by [[CodeGenerator]].
+ */ +class CodeAndComment(val body: String, val comment: collection.Map[String, String]) + extends Serializable { + override def equals(that: Any): Boolean = that match { + case t: CodeAndComment if t.body == body => true + case _ => false + } + + override def hashCode(): Int = body.hashCode +} + /** * A base class for generators of byte code to perform expression evaluation. Includes a set of * helpers for referring to Catalyst types and building trees that perform evaluation of individual @@ -511,14 +558,14 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin /** * Compile the Java source code into a Java class, using Janino. */ - protected def compile(code: String): GeneratedClass = { + protected def compile(code: CodeAndComment): GeneratedClass = { cache.get(code) } /** * Compile the Java source code into a Java class, using Janino. */ - private[this] def doCompile(code: String): GeneratedClass = { + private[this] def doCompile(code: CodeAndComment): GeneratedClass = { val evaluator = new ClassBodyEvaluator() evaluator.setParentClassLoader(Utils.getContextOrSparkClassLoader) // Cannot be under package codegen, or fail with java.lang.InstantiationException @@ -538,7 +585,7 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin )) evaluator.setExtendedClass(classOf[GeneratedClass]) - def formatted = CodeFormatter.format(code) + lazy val formatted = CodeFormatter.format(code) logDebug({ // Only add extra debugging info to byte code when we are going to print the source code. @@ -547,7 +594,7 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin }) try { - evaluator.cook("generated.java", code) + evaluator.cook("generated.java", code.body) } catch { case e: Exception => val msg = s"failed to compile: $e\n$formatted" @@ -569,8 +616,8 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin private val cache = CacheBuilder.newBuilder() .maximumSize(100) .build( - new CacheLoader[String, GeneratedClass]() { - override def load(code: String): GeneratedClass = { + new CacheLoader[CodeAndComment, GeneratedClass]() { + override def load(code: CodeAndComment): GeneratedClass = { val startTime = System.nanoTime() val result = doCompile(code) val endTime = System.nanoTime() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala index 26fb143d1e45c..e78ae7da8a909 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala @@ -32,8 +32,9 @@ trait CodegenFallback extends Expression { ctx.references += this val objectTerm = ctx.freshName("obj") + val placeHolder = ctx.registerComment(this.toString) s""" - /* expression: ${this.toCommentSafeString} */ + $placeHolder java.lang.Object $objectTerm = expressions[${ctx.references.size - 1}].eval(${ctx.INPUT_ROW}); boolean ${ev.isNull} = $objectTerm == null; ${ctx.javaType(this.dataType)} ${ev.value} = ${ctx.defaultValue(this.dataType)}; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala index 40189f0877764..764fbf417b393 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala @@ -81,7 +81,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu val allProjections = ctx.splitExpressions(ctx.INPUT_ROW, projectionCodes) val allUpdates = ctx.splitExpressions(ctx.INPUT_ROW, updates) - val code = s""" + val codeBody = s""" public java.lang.Object generate($exprType[] expr) { return new SpecificMutableProjection(expr); } @@ -119,6 +119,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu } """ + val code = new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()) logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}") val c = compile(code) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala index 1af7c73cd4bf5..1ecebbaeec8ad 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala @@ -110,7 +110,7 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR protected def create(ordering: Seq[SortOrder]): BaseOrdering = { val ctx = newCodeGenContext() val comparisons = genComparisons(ctx, ordering) - val code = s""" + val codeBody = s""" public SpecificOrdering generate($exprType[] expr) { return new SpecificOrdering(expr); } @@ -133,6 +133,7 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR } }""" + val code = new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()) logDebug(s"Generated Ordering: ${CodeFormatter.format(code)}") compile(code).generate(ctx.references.toArray).asInstanceOf[BaseOrdering] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala index 457b4f08424a6..639736749a361 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala @@ -40,7 +40,7 @@ object GeneratePredicate extends CodeGenerator[Expression, (InternalRow) => Bool protected def create(predicate: Expression): ((InternalRow) => Boolean) = { val ctx = newCodeGenContext() val eval = predicate.gen(ctx) - val code = s""" + val codeBody = s""" public SpecificPredicate generate($exprType[] expr) { return new SpecificPredicate(expr); } @@ -61,6 +61,7 @@ object GeneratePredicate extends CodeGenerator[Expression, (InternalRow) => Bool } }""" + val code = new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()) logDebug(s"Generated predicate '$predicate':\n${CodeFormatter.format(code)}") val p = compile(code).generate(ctx.references.toArray).asInstanceOf[Predicate] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala index f229f2000d8e1..f8a3b5489086a 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala @@ -152,7 +152,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] { s"""if (!nullBits[$i]) arr[$i] = c$i;""" }.mkString("\n") - val code = s""" + val codeBody = s""" public SpecificProjection generate($exprType[] expr) { return new SpecificProjection(expr); } @@ -230,6 +230,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] { } """ + val code = new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()) logDebug(s"MutableRow, initExprs: ${expressions.mkString(",")} code:\n" + CodeFormatter.format(code)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala index b7926bda3de19..3ae1450922794 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala @@ -147,7 +147,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection] """ } val allExpressions = ctx.splitExpressions(ctx.INPUT_ROW, expressionCodes) - val code = s""" + val codeBody = s""" public java.lang.Object generate($exprType[] expr) { return new SpecificSafeProjection(expr); } @@ -173,6 +173,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection] } """ + val code = new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()) logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}") val c = compile(code) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index 68005afb21d2e..6e0b5d198e500 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -323,7 +323,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro val ctx = newCodeGenContext() val eval = createCode(ctx, expressions, subexpressionEliminationEnabled) - val code = s""" + val codeBody = s""" public java.lang.Object generate($exprType[] exprs) { return new SpecificUnsafeProjection(exprs); } @@ -353,6 +353,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro } """ + val code = new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()) logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}") val c = compile(code) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala index fb3c7b1bb4f72..b3ebc9cfac2e1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala @@ -158,7 +158,7 @@ object 
GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U }.mkString("\n") // ------------------------ Finally, put everything together --------------------------- // - val code = s""" + val codeBody = s""" |public java.lang.Object generate($exprType[] exprs) { | return new SpecificUnsafeRowJoiner(); |} @@ -195,6 +195,7 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U |} """.stripMargin + val code = new CodeAndComment(codeBody, Map.empty) logDebug(s"SpecificUnsafeRowJoiner($schema1, $schema2):\n${CodeFormatter.format(code)}") val c = compile(code) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 682b860672b2d..676e0b7aceb67 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.catalyst.optimizer import scala.collection.immutable.HashSet -import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries} +import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} +import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubQueries} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.Inner @@ -30,14 +31,13 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.types._ -abstract class Optimizer extends RuleExecutor[LogicalPlan] - -object DefaultOptimizer extends Optimizer { +abstract class Optimizer(conf: CatalystConf) extends RuleExecutor[LogicalPlan] { val batches = // SubQueries are only needed for analysis and can be removed before execution. Batch("Remove SubQueries", FixedPoint(100), EliminateSubQueries) :: Batch("Aggregate", FixedPoint(100), + DistinctAggregationRewriter(conf), ReplaceDistinctWithAggregate, RemoveLiteralFromGroupExpressions) :: Batch("Operator Optimizations", FixedPoint(100), @@ -68,6 +68,18 @@ object DefaultOptimizer extends Optimizer { Batch("LocalRelation", FixedPoint(100), ConvertToLocalRelation) :: Nil } +case class DefaultOptimizer(conf: CatalystConf) extends Optimizer(conf) + +/** + * An optimizer used in test code. + * + * To ensure extendability, we leave the standard rules in the abstract optimizer rules, while + * specific rules go to the subclasses + */ +object SimpleTestOptimizer extends SimpleTestOptimizer + +class SimpleTestOptimizer extends Optimizer( + new SimpleCatalystConf(caseSensitiveAnalysis = true)) /** * Pushes operations down into a Sample. 
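
The string of codegen changes above (CodeAndComment, registerComment, and the new CodeFormatter.format) all serve one idea: expression text is no longer spliced into the generated source inside a /* ... */ comment, where a hostile or merely unlucky literal containing */ could terminate the comment early. Instead the context hands out an inert placeholder, only the placeholder reaches Janino, and the human-readable comment is substituted back purely when formatting for debug logs (GenerateUnsafeRowJoiner passes Map.empty because it registers no comments). A self-contained sketch of the scheme, with illustrative names rather than Catalyst's API:

import scala.collection.mutable

class CommentRegistry {
  private val comments = mutable.HashMap.empty[String, String]
  private var nextId = 0

  // Returns an inert placeholder; the raw text never enters the compiled body.
  def register(text: String): String = {
    nextId += 1
    val placeHolder = s"/*c$nextId*/"
    comments(placeHolder) = s"// $text"
    placeHolder
  }

  // Substitutes the real comments back in, for logging and debugging only.
  def render(body: String): String =
    comments.foldLeft(body) { case (code, (ph, comment)) => code.replace(ph, comment) }
}

object CommentRegistryDemo extends App {
  val reg = new CommentRegistry
  val ph = reg.register("""a literal containing */ cannot break out of a comment""")
  val compiledBody = s"$ph boolean isNull = false;" // what the compiler sees
  println(reg.render(compiledBody))                 // what the debug log shows
}

Note also that the compilation cache is now keyed by CodeAndComment, whose equals and hashCode deliberately look only at the body, so two plans that generate identical code but different comments still share one compiled class.
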
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala index e3e7a11dba973..89981d601c2f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, analysis} import org.apache.spark.sql.types.{StructField, StructType} @@ -56,10 +57,12 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil) override protected def stringArgs = Iterator(output) - override def sameResult(plan: LogicalPlan): Boolean = plan match { - case LocalRelation(otherOutput, otherData) => - otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data - case _ => false + override def sameResult(plan: LogicalPlan): Boolean = { + EliminateSubQueries(plan) match { + case LocalRelation(otherOutput, otherData) => + otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data + case _ => false + } } override lazy val statistics = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 2b93882919487..157ac2ba24ca7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -89,8 +89,8 @@ object DateTimeUtils { // reverse of millisToDays def daysToMillis(days: SQLDate): Long = { - val millisUtc = days.toLong * MILLIS_PER_DAY - millisUtc - threadLocalLocalTimeZone.get().getOffset(millisUtc) + val millisLocal = days.toLong * MILLIS_PER_DAY + millisLocal - getOffsetFromLocalMillis(millisLocal, threadLocalLocalTimeZone.get()) } def dateToString(days: SQLDate): String = @@ -819,6 +819,41 @@ object DateTimeUtils { } } + /** + * Look up the offset for the given milliseconds since 1970-01-01 00:00:00 in the given timezone.
+ */ + private def getOffsetFromLocalMillis(millisLocal: Long, tz: TimeZone): Long = { + var guess = tz.getRawOffset + // the actual offset should be calculated based on milliseconds in UTC + val offset = tz.getOffset(millisLocal - guess) + if (offset != guess) { + guess = tz.getOffset(millisLocal - offset) + if (guess != offset) { + // fallback to do the reverse lookup using java.sql.Timestamp + // this should only happen near the start or end of DST + val days = Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt + val year = getYear(days) + val month = getMonth(days) + val day = getDayOfMonth(days) + + var millisOfDay = (millisLocal % MILLIS_PER_DAY).toInt + if (millisOfDay < 0) { + millisOfDay += MILLIS_PER_DAY.toInt + } + val seconds = (millisOfDay / 1000L).toInt + val hh = seconds / 3600 + val mm = seconds / 60 % 60 + val ss = seconds % 60 + val nano = millisOfDay % 1000 * 1000000 + + // create a Timestamp to get the unix timestamp (in UTC) + val timestamp = new Timestamp(year - 1900, month - 1, day, hh, mm, ss, nano) + guess = (millisLocal - timestamp.getTime).toInt + } + } + guess + } + /** * Returns a timestamp of given timezone from utc timestamp, with the same string * representation in their timezone. @@ -835,7 +870,16 @@ */ def toUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = { val tz = TimeZone.getTimeZone(timeZone) - val offset = tz.getOffset(time / 1000L) + val offset = getOffsetFromLocalMillis(time / 1000L, tz) time - offset * 1000L } + + /** + * Re-initialize the current thread's thread locals. Exposed for testing. + */ + private[util] def resetThreadLocals(): Unit = { + threadLocalLocalTimeZone.remove() + threadLocalTimestampFormat.remove() + threadLocalDateFormat.remove() + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/NumberConverter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/NumberConverter.scala index 9fefc5656aac0..f295e4e210a4d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/NumberConverter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/NumberConverter.scala @@ -21,8 +21,6 @@ import org.apache.spark.unsafe.types.UTF8String object NumberConverter { - private val value = new Array[Byte](64) - /** * Divide x by m as if x is an unsigned 64-bit integer. Examples: * unsignedLongDiv(-1, 2) == Long.MAX_VALUE unsignedLongDiv(6, 3) == 2 @@ -49,7 +47,7 @@ object NumberConverter { * @param v is treated as an unsigned 64-bit integer * @param radix must be between MIN_RADIX and MAX_RADIX */ - private def decode(v: Long, radix: Int): Unit = { + private def decode(v: Long, radix: Int, value: Array[Byte]): Unit = { var tmpV = v java.util.Arrays.fill(value, 0.asInstanceOf[Byte]) var i = value.length - 1 @@ -69,11 +67,9 @@ object NumberConverter { * @param fromPos is the first element that should be considered * @return the result should be treated as an unsigned 64-bit integer.
*/ - private def encode(radix: Int, fromPos: Int): Long = { + private def encode(radix: Int, fromPos: Int, value: Array[Byte]): Long = { var v: Long = 0L val bound = unsignedLongDiv(-1 - radix, radix) // Possible overflow once - // val - // exceeds this value var i = fromPos while (i < value.length && value(i) >= 0) { if (v >= bound) { @@ -94,7 +90,7 @@ object NumberConverter { * @param radix must be between MIN_RADIX and MAX_RADIX * @param fromPos is the first nonzero element */ - private def byte2char(radix: Int, fromPos: Int): Unit = { + private def byte2char(radix: Int, fromPos: Int, value: Array[Byte]): Unit = { var i = fromPos while (i < value.length) { value(i) = Character.toUpperCase(Character.forDigit(value(i), radix)).asInstanceOf[Byte] @@ -109,9 +105,9 @@ object NumberConverter { * @param radix must be between MIN_RADIX and MAX_RADIX * @param fromPos is the first nonzero element */ - private def char2byte(radix: Int, fromPos: Int): Unit = { + private def char2byte(radix: Int, fromPos: Int, value: Array[Byte]): Unit = { var i = fromPos - while ( i < value.length) { + while (i < value.length) { value(i) = Character.digit(value(i), radix).asInstanceOf[Byte] i += 1 } @@ -124,8 +120,8 @@ object NumberConverter { */ def convert(n: Array[Byte] , fromBase: Int, toBase: Int ): UTF8String = { if (fromBase < Character.MIN_RADIX || fromBase > Character.MAX_RADIX - || Math.abs(toBase) < Character.MIN_RADIX - || Math.abs(toBase) > Character.MAX_RADIX) { + || Math.abs(toBase) < Character.MIN_RADIX + || Math.abs(toBase) > Character.MAX_RADIX) { return null } @@ -136,15 +132,16 @@ object NumberConverter { var (negative, first) = if (n(0) == '-') (true, 1) else (false, 0) // Copy the digits in the right side of the array + val temp = new Array[Byte](64) var i = 1 while (i <= n.length - first) { - value(value.length - i) = n(n.length - i) + temp(temp.length - i) = n(n.length - i) i += 1 } - char2byte(fromBase, value.length - n.length + first) + char2byte(fromBase, temp.length - n.length + first, temp) // Do the conversion by going through a 64 bit integer - var v = encode(fromBase, value.length - n.length + first) + var v = encode(fromBase, temp.length - n.length + first, temp) if (negative && toBase > 0) { if (v < 0) { v = -1 @@ -156,21 +153,20 @@ object NumberConverter { v = -v negative = true } - decode(v, Math.abs(toBase)) + decode(v, Math.abs(toBase), temp) // Find the first non-zero digit or the last digits if all are zero. 
val firstNonZeroPos = { - val firstNonZero = value.indexWhere( _ != 0) - if (firstNonZero != -1) firstNonZero else value.length - 1 + val firstNonZero = temp.indexWhere( _ != 0) + if (firstNonZero != -1) firstNonZero else temp.length - 1 } - - byte2char(Math.abs(toBase), firstNonZeroPos) + byte2char(Math.abs(toBase), firstNonZeroPos, temp) var resultStartPos = firstNonZeroPos if (negative && toBase < 0) { resultStartPos = firstNonZeroPos - 1 - value(resultStartPos) = '-' + temp(resultStartPos) = '-' } - UTF8String.fromBytes(java.util.Arrays.copyOfRange(value, resultStartPos, value.length)) + UTF8String.fromBytes(java.util.Arrays.copyOfRange(temp, resultStartPos, temp.length)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala index 1d73e40ffcd36..2c966230e447e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.ScalaReflectionLock * * Please use the singleton [[DataTypes.DateType]]. * - * Internally, this is represented as the number of days from epoch (1970-01-01 00:00:00 UTC). + * Internally, this is represented as the number of days from 1970-01-01. */ @DeveloperApi class DateType private() extends AtomicType { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index 66a7e228f84bb..e35a1b2d7c9a4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -138,4 +138,47 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { true, InternalRow(UTF8String.fromString("\\u"))) } + + test("check compilation error doesn't occur caused by specific literal") { + // The end of comment (*/) should be escaped. + GenerateUnsafeProjection.generate( + Literal.create("*/Compilation error occurs/*", StringType) :: Nil) + + // `\u002A` is `*` and `\u002F` is `/` + // so if the end of comment consists of those characters in queries, we need to escape them. 
+ GenerateUnsafeProjection.generate( + Literal.create("\\u002A/Compilation error occurs/*", StringType) :: Nil) + GenerateUnsafeProjection.generate( + Literal.create("\\\\u002A/Compilation error occurs/*", StringType) :: Nil) + GenerateUnsafeProjection.generate( + Literal.create("\\u002a/Compilation error occurs/*", StringType) :: Nil) + GenerateUnsafeProjection.generate( + Literal.create("\\\\u002a/Compilation error occurs/*", StringType) :: Nil) + GenerateUnsafeProjection.generate( + Literal.create("*\\u002FCompilation error occurs/*", StringType) :: Nil) + GenerateUnsafeProjection.generate( + Literal.create("*\\\\u002FCompilation error occurs/*", StringType) :: Nil) + GenerateUnsafeProjection.generate( + Literal.create("*\\002fCompilation error occurs/*", StringType) :: Nil) + GenerateUnsafeProjection.generate( + Literal.create("*\\\\002fCompilation error occurs/*", StringType) :: Nil) + GenerateUnsafeProjection.generate( + Literal.create("\\002A\\002FCompilation error occurs/*", StringType) :: Nil) + GenerateUnsafeProjection.generate( + Literal.create("\\\\002A\\002FCompilation error occurs/*", StringType) :: Nil) + GenerateUnsafeProjection.generate( + Literal.create("\\002A\\\\002FCompilation error occurs/*", StringType) :: Nil) + + // \ u002X is an invalid unicode literal so it should be escaped. + GenerateUnsafeProjection.generate( + Literal.create("\\u002X/Compilation error occurs", StringType) :: Nil) + GenerateUnsafeProjection.generate( + Literal.create("\\\\u002X/Compilation error occurs", StringType) :: Nil) + + // \ u001 is an invalid unicode literal so it should be escaped. + GenerateUnsafeProjection.generate( + Literal.create("\\u001/Compilation error occurs", StringType) :: Nil) + GenerateUnsafeProjection.generate( + Literal.create("\\\\u001/Compilation error occurs", StringType) :: Nil) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index 465f7d08aa142..074785eb467d2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -24,7 +24,7 @@ import org.scalatest.prop.GeneratorDrivenPropertyChecks import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions.codegen._ -import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer +import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project} import org.apache.spark.sql.types.DataType @@ -189,7 +189,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks { expected: Any, inputRow: InternalRow = EmptyRow): Unit = { val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation) - val optimizedPlan = DefaultOptimizer.execute(plan) + val optimizedPlan = SimpleTestOptimizer.execute(plan) checkEvaluationWithoutCodegen(optimizedPlan.expressions.head, expected, inputRow) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala index aacc56fc44186..90f90965697a9 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateProjection, GenerateMutableProjection} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer +import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project} import org.apache.spark.sql.types._ @@ -150,7 +150,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { expression: Expression, inputRow: InternalRow = EmptyRow): Unit = { val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation) - val optimizedPlan = DefaultOptimizer.execute(plan) + val optimizedPlan = SimpleTestOptimizer.execute(plan) checkNaNWithoutCodegen(optimizedPlan.expressions.head, inputRow) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala index 9da1068e9ca1d..55e4f7561e7b5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala @@ -24,7 +24,8 @@ class CodeFormatterSuite extends SparkFunSuite { def testCase(name: String)(input: String)(expected: String): Unit = { test(name) { - assert(CodeFormatter.format(input).trim === expected.trim) + val sourceCode = new CodeAndComment(input, Map.empty) + assert(CodeFormatter.format(sourceCode).trim === expected.trim) } } diff --git a/project/project/SparkPluginBuild.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala similarity index 57% rename from project/project/SparkPluginBuild.scala rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala index cbb88dc7dd1dd..0c1feb3aa0882 100644 --- a/project/project/SparkPluginBuild.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala @@ -15,14 +15,26 @@ * limitations under the License. */ -import sbt._ -import sbt.Keys._ +package org.apache.spark.sql.catalyst.util + +import java.util.TimeZone /** - * This plugin project is there because we use our custom fork of sbt-pom-reader plugin. This is - * a plugin project so that this gets compiled first and is available on the classpath for SBT build. + * Helper functions for testing date and time functionality. 
*/ -object SparkPluginDef extends Build { - lazy val root = Project("plugins", file(".")) dependsOn(sbtPomReader) - lazy val sbtPomReader = uri("https://github.com/ScrapCodes/sbt-pom-reader.git#ignore_artifact_id") +object DateTimeTestUtils { + + val ALL_TIMEZONES: Seq[TimeZone] = TimeZone.getAvailableIDs.toSeq.map(TimeZone.getTimeZone) + + def withDefaultTimeZone[T](newDefaultTimeZone: TimeZone)(block: => T): T = { + val originalDefaultTimeZone = TimeZone.getDefault + try { + DateTimeUtils.resetThreadLocals() + TimeZone.setDefault(newDefaultTimeZone) + block + } finally { + TimeZone.setDefault(originalDefaultTimeZone) + DateTimeUtils.resetThreadLocals() + } + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 0ce5a2fb69505..6660453da7d4a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -476,6 +476,13 @@ class DateTimeUtilsSuite extends SparkFunSuite { test("2011-12-25 09:00:00.123456", "JST", "2011-12-25 18:00:00.123456") test("2011-12-25 09:00:00.123456", "PST", "2011-12-25 01:00:00.123456") test("2011-12-25 09:00:00.123456", "Asia/Shanghai", "2011-12-25 17:00:00.123456") + + // Daylight Saving Time + test("2016-03-13 09:59:59.0", "PST", "2016-03-13 01:59:59.0") + test("2016-03-13 10:00:00.0", "PST", "2016-03-13 03:00:00.0") + test("2016-11-06 08:59:59.0", "PST", "2016-11-06 01:59:59.0") + test("2016-11-06 09:00:00.0", "PST", "2016-11-06 01:00:00.0") + test("2016-11-06 10:00:00.0", "PST", "2016-11-06 02:00:00.0") } test("to UTC timestamp") { @@ -487,5 +494,38 @@ class DateTimeUtilsSuite extends SparkFunSuite { test("2011-12-25 18:00:00.123456", "JST", "2011-12-25 09:00:00.123456") test("2011-12-25 01:00:00.123456", "PST", "2011-12-25 09:00:00.123456") test("2011-12-25 17:00:00.123456", "Asia/Shanghai", "2011-12-25 09:00:00.123456") + + // Daylight Saving Time + test("2016-03-13 01:59:59", "PST", "2016-03-13 09:59:59.0") + // 2016-03-13 02:00:00 PST does not exist + test("2016-03-13 02:00:00", "PST", "2016-03-13 10:00:00.0") + test("2016-03-13 03:00:00", "PST", "2016-03-13 10:00:00.0") + test("2016-11-06 00:59:59", "PST", "2016-11-06 07:59:59.0") + // 2016-11-06 01:00:00 PST could be 2016-11-06 08:00:00 UTC or 2016-11-06 09:00:00 UTC + test("2016-11-06 01:00:00", "PST", "2016-11-06 09:00:00.0") + test("2016-11-06 01:59:59", "PST", "2016-11-06 09:59:59.0") + test("2016-11-06 02:00:00", "PST", "2016-11-06 10:00:00.0") + } + + test("daysToMillis and millisToDays") { + // Some days are skipped entirely in some timezones; skip them here.
+ val skipped_days = Map[String, Int]( + "Kwajalein" -> 8632, + "Pacific/Apia" -> 15338, + "Pacific/Enderbury" -> 9131, + "Pacific/Fakaofo" -> 15338, + "Pacific/Kiritimati" -> 9131, + "Pacific/Kwajalein" -> 8632, + "MIT" -> 15338) + for (tz <- DateTimeTestUtils.ALL_TIMEZONES) { + DateTimeTestUtils.withDefaultTimeZone(tz) { + val skipped = skipped_days.getOrElse(tz.getID, Int.MinValue) + (-20000 to 20000).foreach { d => + if (d != skipped) { + assert(millisToDays(daysToMillis(d)) === d) + } + } + } + } } } diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 182fb70f36f36..4f8bacac14c43 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-sql_2.11 + spark-sql_2.10 jar Spark Project SQL http://spark.apache.org/ @@ -71,6 +71,10 @@ org.apache.parquet parquet-hadoop + + org.eclipse.jetty + jetty-servlet + com.fasterxml.jackson.core jackson-databind diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 1beb080f81d4a..8884daa301179 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1363,7 +1363,8 @@ class DataFrame private[sql]( // All columns are string type val schema = StructType( StructField("summary", StringType) :: outputCols.map(StructField(_, StringType))).toAttributes - LocalRelation.fromExternalRows(schema, ret) + // `toArray` forces materialization to make the seq serializable + LocalRelation.fromExternalRows(schema, ret.toArray.toSeq) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 47fd7fc1178a9..8a07cee909bc6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -202,7 +202,7 @@ class SQLContext private[sql]( } @transient - protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer + protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer(conf) @transient protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index ea5a9afe03b00..dead63f2e599e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters} -import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation +import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, MultiInstanceRelation} import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} import org.apache.spark.sql.sources.{HadoopFsRelation, BaseRelation} @@ -79,9 +79,11 @@ private[sql] case class LogicalRDD( override def newInstance(): LogicalRDD.this.type = LogicalRDD(output.map(_.newInstance()), rdd)(sqlContext).asInstanceOf[this.type] - override def sameResult(plan: LogicalPlan): Boolean = plan match { - case LogicalRDD(_, otherRDD) => rdd.id == otherRDD.id - case _ => 
false + override def sameResult(plan: LogicalPlan): Boolean = { + EliminateSubQueries(plan) match { + case LogicalRDD(_, otherRDD) => rdd.id == otherRDD.id + case _ => false + } } @transient override lazy val statistics: Statistics = Statistics( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala index 4d01b78c3c10f..4f6f58bde5c54 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala @@ -22,7 +22,7 @@ import scala.collection.mutable import org.apache.spark.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.{UnsafeRowWriter, CodeFormatter, CodeGenerator} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeFormatter, CodeGenerator, UnsafeRowWriter} import org.apache.spark.sql.types._ /** @@ -152,7 +152,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera (0 to groupedAccessorsLength - 1).map { i => s"extractors$i();" }.mkString("\n")) } - val code = s""" + val codeBody = s""" import java.nio.ByteBuffer; import java.nio.ByteOrder; import scala.collection.Iterator; @@ -226,6 +226,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera } }""" + val code = new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()) logDebug(s"Generated ColumnarIterator: ${CodeFormatter.format(code)}") compile(code).generate(ctx.references.toArray).asInstanceOf[ColumnarIterator] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 219dae88e515d..fd299a21011ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.execution.datasources -import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation +import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, MultiInstanceRelation} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.sources.BaseRelation @@ -57,9 +57,11 @@ case class LogicalRelation( com.google.common.base.Objects.hashCode(relation, output) } - override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match { - case LogicalRelation(otherRelation, _) => relation == otherRelation - case _ => false + override def sameResult(otherPlan: LogicalPlan): Boolean = { + EliminateSubQueries(otherPlan) match { + case LogicalRelation(otherRelation, _) => relation == otherRelation + case _ => false + } } // When comparing two LogicalRelations from within LogicalPlan.sameResult, we only need diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 10f650693f288..c0aede0631269 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -50,7 +50,7 @@ object JdbcUtils extends Logging {
 DriverManager.getDriver(url).getClass.getCanonicalName
 }
 () => {
- userSpecifiedDriverClass.foreach(DriverRegistry.register)
+ DriverRegistry.register(driverClass)
 val driver: Driver = DriverManager.getDrivers.asScala.collectFirst {
 case d: DriverWrapper if d.wrapped.getClass.getCanonicalName == driverClass => d
 case d if d.getClass.getCanonicalName == driverClass => d
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index d86df4cfb9b4d..4f83e3356f161 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -506,4 +506,15 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
 sqlContext.uncacheTable("t2")
 }
 }
+
+ test("SPARK-15915 Logical plans should use subqueries eliminated plan when override sameResult") {
+ val localRelation = sqlContext.createDataset(Seq(1, 2, 3)).toDF()
+ localRelation.registerTempTable("localRelation")
+
+ sqlContext.cacheTable("localRelation")
+ assert(
+ localRelation.queryExecution.withCachedData.collect {
+ case i: InMemoryRelation => i
+ }.size == 1)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index f630eb5a1292b..2be834359089d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2037,4 +2037,269 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
 checkAnswer(sql("SELECT value['cba'] FROM maptest where key = 1"), Row(null))
 }
 }
+
+ test("check code injection is prevented") {
+ // The end-of-comment marker (*/) should be escaped.
+ var literal =
+ """|*/
+ |{
+ | new Object() {
+ | void f() { throw new RuntimeException("This exception is injected."); }
+ | }.f();
+ |}
+ |/*""".stripMargin.replaceAll("\n", "")
+ var expected =
+ """|*/
+ |{
+ | new Object() {
+ | void f() { throw new RuntimeException("This exception is injected."); }
+ | }.f();
+ |}
+ |/*""".stripMargin.replaceAll("\n", "")
+ checkAnswer(
+ sql(s"SELECT '$literal' AS DUMMY"),
+ Row(s"$expected") :: Nil)
+
+ // `\u002A` is `*` and `\u002F` is `/`
+ // so if an end-of-comment marker is written with those escapes in a query, we need to escape them as well.
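+ // As a rough sketch of the attack being tested: the compiler decodes \uXXXX
+ // escapes before tokenizing, so "\u002A\u002F" would still close a /* ... */
+ // comment in generated Java and let the text after it compile as injected
+ // statements. Each case below asserts the literal comes back as plain data.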
+ literal = + """|\\u002A/ + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + expected = + """|\\u002A/ + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + checkAnswer( + sql(s"SELECT '$literal' AS DUMMY"), + Row(s"$expected") :: Nil) + + literal = + """|\\\\u002A/ + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + expected = + """|\\\\u002A/ + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + checkAnswer( + sql(s"SELECT '$literal' AS DUMMY"), + Row(s"$expected") :: Nil) + + literal = + """|\\u002a/ + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + expected = + """|\\u002a/ + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + checkAnswer( + sql(s"SELECT '$literal' AS DUMMY"), + Row(s"$expected") :: Nil) + + literal = + """|\\\\u002a/ + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + expected = + """|\\\\u002a/ + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + checkAnswer( + sql(s"SELECT '$literal' AS DUMMY"), + Row(s"$expected") :: Nil) + + literal = + """|*\\u002F + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + expected = + """|*\\u002F + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + checkAnswer( + sql(s"SELECT '$literal' AS DUMMY"), + Row(s"$expected") :: Nil) + + literal = + """|*\\\\u002F + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + expected = + """|*\\\\u002F + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + checkAnswer( + sql(s"SELECT '$literal' AS DUMMY"), + Row(s"$expected") :: Nil) + + literal = + """|*\\u002f + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + expected = + """|*\\u002f + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + checkAnswer( + sql(s"SELECT '$literal' AS DUMMY"), + Row(s"$expected") :: Nil) + + literal = + """|*\\\\u002f + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + expected = + """|*\\\\u002f + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + 
|/*""".stripMargin.replaceAll("\n", "") + checkAnswer( + sql(s"SELECT '$literal' AS DUMMY"), + Row(s"$expected") :: Nil) + + literal = + """|\\u002A\\u002F + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + expected = + """|\\u002A\\u002F + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + checkAnswer( + sql(s"SELECT '$literal' AS DUMMY"), + Row(s"$expected") :: Nil) + + literal = + """|\\\\u002A\\u002F + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + expected = + """|\\\\u002A\\u002F + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + checkAnswer( + sql(s"SELECT '$literal' AS DUMMY"), + Row(s"$expected") :: Nil) + + literal = + """|\\u002A\\\\u002F + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + expected = + """|\\u002A\\\\u002F + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + checkAnswer( + sql(s"SELECT '$literal' AS DUMMY"), + Row(s"$expected") :: Nil) + + literal = + """|\\\\u002A\\\\u002F + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + expected = + """|\\\\u002A\\\\u002F + |{ + | new Object() { + | void f() { throw new RuntimeException("This exception is injected."); } + | }.f(); + |} + |/*""".stripMargin.replaceAll("\n", "") + checkAnswer( + sql(s"SELECT '$literal' AS DUMMY"), + Row(s"$expected") :: Nil) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 2fb439f50117a..7d7c39c9d78b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -44,10 +44,12 @@ class PlannerSuite extends SharedSQLContext { fail(s"Could query play aggregation query $query. Is it an aggregation query?")) val aggregations = planned.collect { case n if n.nodeName contains "Aggregate" => n } - // For the new aggregation code path, there will be four aggregate operator for - // distinct aggregations. + // For the new aggregation code path, there will be three aggregate operator for + // distinct aggregations. There used to be four aggregate operators because single + // distinct aggregate used to trigger DistinctAggregationRewriter rewrite. Now the + // the rewrite only happens when there are multiple distinct aggregations. 
assert( - aggregations.size == 2 || aggregations.size == 4, + aggregations.size == 2 || aggregations.size == 3, s"The plan of query $query does not have partial aggregations.") } diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index 22b959dbfc7f3..0060f0f0bd116 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-hive-thriftserver_2.11 + spark-hive-thriftserver_2.10 jar Spark Project Hive Thrift Server http://spark.apache.org/ diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index ce8d8beb4297e..99d73fe89eece 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../../pom.xml org.apache.spark - spark-hive_2.11 + spark-hive_2.10 jar Spark Project Hive http://spark.apache.org/ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index d0b359ada6881..8bbe178b53ff2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.metadata._ import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog} +import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateSubQueries, MultiInstanceRelation, OverrideCatalog} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical @@ -874,7 +874,7 @@ private[hive] case class MetastoreRelation /** Only compare database and tablename, not alias. */ override def sameResult(plan: LogicalPlan): Boolean = { - plan match { + EliminateSubQueries(plan) match { case mr: MetastoreRelation => mr.databaseName == databaseName && mr.tableName == tableName case _ => false diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala index e9b2e20236da1..075b4cb0e23eb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala @@ -496,12 +496,14 @@ private[hive] class ClientWrapper( // Throw an exception if there is an error in query processing. 
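+ // CommandProcessorFactory appears to cache one Hive Driver per HiveConf in an
+ // internal map, so driver.close() alone leaves that cached entry behind; the
+ // clean(conf) calls added below release it.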
if (response.getResponseCode != 0) { driver.close() + CommandProcessorFactory.clean(conf) throw new QueryExecutionException(response.getErrorMessage) } driver.setMaxRows(maxRows) val results = shim.getDriverResults(driver) driver.close() + CommandProcessorFactory.clean(conf) results case _ => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index b793cccb627bb..25ecdb751f0c1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -922,6 +922,22 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(11) :: Nil) } } + + test("SPARK-14495: distinct aggregate in having clause") { + checkAnswer( + sqlContext.sql( + """ + |select key, count(distinct value1), count(distinct value2) + |from agg2 group by key + |having count(distinct value1) > 0 + """.stripMargin), + Seq( + Row(null, 3, 3), + Row(1, 2, 3), + Row(2, 2, 1) + ) + ) + } } diff --git a/streaming/pom.xml b/streaming/pom.xml index 428ac16697063..4a82994594e10 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../pom.xml org.apache.spark - spark-streaming_2.11 + spark-streaming_2.10 streaming diff --git a/tags/pom.xml b/tags/pom.xml index d502b221a4845..51048340abd66 100644 --- a/tags/pom.xml +++ b/tags/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../pom.xml org.apache.spark - spark-test-tags_2.11 + spark-test-tags_2.10 jar Spark Project Test Tags http://spark.apache.org/ diff --git a/tools/pom.xml b/tools/pom.xml index 8b3cd90f6beb2..215690283d4af 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -19,13 +19,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../pom.xml org.apache.spark - spark-tools_2.11 + spark-tools_2.10 tools diff --git a/unsafe/pom.xml b/unsafe/pom.xml index aa683f0ca76f7..5ff97da2feaa9 100644 --- a/unsafe/pom.xml +++ b/unsafe/pom.xml @@ -20,13 +20,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../pom.xml org.apache.spark - spark-unsafe_2.11 + spark-unsafe_2.10 jar Spark Project Unsafe http://spark.apache.org/ diff --git a/yarn/pom.xml b/yarn/pom.xml index 70c585541c608..ef32d49e6e4a0 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -19,13 +19,13 @@ 4.0.0 org.apache.spark - spark-parent_2.11 - 1.6.1-csd-10-SNAPSHOT + spark-parent_2.10 + 1.6.2-csd-1-SNAPSHOT ../pom.xml org.apache.spark - spark-yarn_2.11 + spark-yarn_2.10 jar Spark Project YARN diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 7631aa34bf34d..508970d35513b 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -882,7 +882,6 @@ private[spark] class Client( amContainer.setApplicationACLs( YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava) setupSecurityToken(amContainer) - UserGroupInformation.getCurrentUser().addCredentials(credentials) amContainer } @@ -910,7 +909,8 @@ private[spark] class 
Client( sparkConf.set("spark.yarn.keytab", keytabFileName) sparkConf.set("spark.yarn.principal", principal) } - credentials = UserGroupInformation.getCurrentUser.getCredentials + // Defensive copy of the credentials + credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials) } /**
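
The defensive copy in the final hunk matters because, at least on some Hadoop versions, the Credentials object returned by UserGroupInformation.getCurrentUser.getCredentials can share state with the login user's UGI, so tokens added to the UGI afterwards would show up in (or mutate under) the snapshot the client already took. A minimal sketch of the pattern, assuming only Hadoop's standard Credentials copy constructor:

    import org.apache.hadoop.security.{Credentials, UserGroupInformation}

    object CredentialsSnapshot {
      // Copy the current user's tokens and secret keys; later mutations of the
      // UGI's live credentials do not affect the returned snapshot.
      def snapshot(): Credentials =
        new Credentials(UserGroupInformation.getCurrentUser.getCredentials)
    }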