From bf3f6d2f10872567e083467da7951a7111975065 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 13 Sep 2016 16:15:44 -0700 Subject: [PATCH 1/5] [SPARK-17531][BACKPORT] Don't initialize Hive Listeners for the Execution Client ## What changes were proposed in this pull request? If a user provides listeners inside the Hive Conf, the configuration for these listeners are passed to the Hive Execution Client as well. This may cause issues for two reasons: 1. The Execution Client will actually generate garbage 2. The listener class needs to be both in the Spark Classpath and Hive Classpath This PR empties the listener configurations in HiveUtils.newTemporaryConfiguration so that the execution client will not contain the listener confs, but the metadata client will. ## How was this patch tested? Unit tests Author: Burak Yavuz Closes #15087 from brkyvz/overwrite-hive-listeners. --- .../apache/spark/sql/hive/HiveContext.scala | 7 ++++ .../spark/sql/hive/HiveContextSuite.scala | 32 +++++++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index e83941c2ecf6..fa644405de73 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -753,6 +753,13 @@ private[hive] object HiveContext { // hive.metastore.uris is not set. propMap.put(ConfVars.METASTOREURIS.varname, "") + // The execution client will generate garbage events, therefore the listeners that are generated + // for the execution clients are useless. In order to not output garbage, we don't generate + // these listeners. + propMap.put(ConfVars.METASTORE_PRE_EVENT_LISTENERS.varname, "") + propMap.put(ConfVars.METASTORE_EVENT_LISTENERS.varname, "") + propMap.put(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname, "") + propMap.toMap } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala new file mode 100644 index 000000000000..048a0c019b16 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.hadoop.hive.conf.HiveConf.ConfVars + +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.QueryTest + +class HiveContextSuite extends QueryTest with TestHiveSingleton { + test("newTemporaryConfiguration overwrites listener configurations") { + val conf = HiveContext.newTemporaryConfiguration() + assert(conf(ConfVars.METASTORE_PRE_EVENT_LISTENERS.varname) === "") + assert(conf(ConfVars.METASTORE_EVENT_LISTENERS.varname) === "") + assert(conf(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname) === "") + } +} From a447cd88897bc3d76eee0e8757e6545019704f30 Mon Sep 17 00:00:00 2001 From: Xing SHI Date: Wed, 14 Sep 2016 13:46:46 -0700 Subject: [PATCH 2/5] [SPARK-17465][SPARK CORE] Inappropriate memory management in `org.apache.spark.storage.MemoryStore` may lead to memory leak ## What changes were proposed in this pull request? The expression like `if (memoryMap(taskAttemptId) == 0) memoryMap.remove(taskAttemptId)` in method `releaseUnrollMemoryForThisTask` and `releasePendingUnrollMemoryForThisTask` should be called after release memory operation, whatever `memoryToRelease` is > 0 or not. If the memory of a task has been set to 0 when calling a `releaseUnrollMemoryForThisTask` or a `releasePendingUnrollMemoryForThisTask` method, the key in the memory map corresponding to that task will never be removed from the hash map. See the details in [SPARK-17465](https://issues.apache.org/jira/browse/SPARK-17465). Author: Xing SHI Closes #15022 from saturday-shi/SPARK-17465. --- .../main/scala/org/apache/spark/scheduler/Task.scala | 1 + .../scala/org/apache/spark/storage/MemoryStore.scala | 12 ++++++------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index c7b11999fa9f..2f4225e1b5cd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -104,6 +104,7 @@ private[spark] abstract class Task[T]( Utils.tryLogNonFatalError { // Release memory used by this thread for unrolling blocks SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask() + SparkEnv.get.blockManager.memoryStore.releasePendingUnrollMemoryForThisTask() // Notify any tasks waiting for execution memory to be freed to wake up and try to // acquire memory again. This makes impossible the scenario where a task sleeps forever // because there are no other tasks left to notify it. Since this is safe to do but may diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index aed0da96d34c..1113160f930e 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -511,11 +511,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId)) if (memoryToRelease > 0) { unrollMemoryMap(taskAttemptId) -= memoryToRelease - if (unrollMemoryMap(taskAttemptId) == 0) { - unrollMemoryMap.remove(taskAttemptId) - } memoryManager.releaseUnrollMemory(memoryToRelease) } + if (unrollMemoryMap(taskAttemptId) == 0) { + unrollMemoryMap.remove(taskAttemptId) + } } } } @@ -530,11 +530,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId)) if (memoryToRelease > 0) { pendingUnrollMemoryMap(taskAttemptId) -= memoryToRelease - if (pendingUnrollMemoryMap(taskAttemptId) == 0) { - pendingUnrollMemoryMap.remove(taskAttemptId) - } memoryManager.releaseUnrollMemory(memoryToRelease) } + if (pendingUnrollMemoryMap(taskAttemptId) == 0) { + pendingUnrollMemoryMap.remove(taskAttemptId) + } } } } From 8646b84fb8ed319e3a998f93de4821c723f7d419 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 15 Sep 2016 11:22:58 -0700 Subject: [PATCH 3/5] [SPARK-17547] Ensure temp shuffle data file is cleaned up after error SPARK-8029 (#9610) modified shuffle writers to first stage their data to a temporary file in the same directory as the final destination file and then to atomically rename this temporary file at the end of the write job. However, this change introduced the potential for the temporary output file to be leaked if an exception occurs during the write because the shuffle writers' existing error cleanup code doesn't handle deletion of the temp file. This patch avoids this potential cause of disk-space leaks by adding `finally` blocks to ensure that temp files are always deleted if they haven't been renamed. Author: Josh Rosen Closes #15104 from JoshRosen/cleanup-tmp-data-file-in-shuffle-writer. (cherry picked from commit 5b8f7377d54f83b93ef2bfc2a01ca65fae6d3032) Signed-off-by: Josh Rosen --- .../sort/BypassMergeSortShuffleWriter.java | 10 ++- .../shuffle/sort/UnsafeShuffleWriter.java | 18 +++-- .../shuffle/IndexShuffleBlockResolver.scala | 80 ++++++++++--------- .../shuffle/sort/SortShuffleWriter.scala | 14 +++- 4 files changed, 73 insertions(+), 49 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index a1a1fb01426a..80d24b9edd7b 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -157,8 +157,14 @@ public void write(Iterator> records) throws IOException { File output = shuffleBlockResolver.getDataFile(shuffleId, mapId); File tmp = Utils.tempFileWith(output); - partitionLengths = writePartitionedFile(tmp); - shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp); + try { + partitionLengths = writePartitionedFile(tmp); + shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp); + } finally { + if (tmp.exists() && !tmp.delete()) { + logger.error("Error while deleting temp file {}", tmp.getAbsolutePath()); + } + } mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 744c3008ca50..d5e16fcbb7ff 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -209,15 +209,21 @@ void closeAndWriteOutput() throws IOException { final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId); final File tmp = Utils.tempFileWith(output); try { - partitionLengths = mergeSpills(spills, tmp); - } finally { - for (SpillInfo spill : spills) { - if (spill.file.exists() && ! spill.file.delete()) { - logger.error("Error while deleting spill file {}", spill.file.getPath()); + try { + partitionLengths = mergeSpills(spills, tmp); + } finally { + for (SpillInfo spill : spills) { + if (spill.file.exists() && ! spill.file.delete()) { + logger.error("Error while deleting spill file {}", spill.file.getPath()); + } } } + shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp); + } finally { + if (tmp.exists() && !tmp.delete()) { + logger.error("Error while deleting temp file {}", tmp.getAbsolutePath()); + } } - shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp); mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); } diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index fadb8fe7ed0a..cae97ad6d744 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -138,48 +138,54 @@ private[spark] class IndexShuffleBlockResolver( dataTmp: File): Unit = { val indexFile = getIndexFile(shuffleId, mapId) val indexTmp = Utils.tempFileWith(indexFile) - val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp))) - Utils.tryWithSafeFinally { - // We take in lengths of each block, need to convert it to offsets. - var offset = 0L - out.writeLong(offset) - for (length <- lengths) { - offset += length + try { + val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp))) + Utils.tryWithSafeFinally { + // We take in lengths of each block, need to convert it to offsets. + var offset = 0L out.writeLong(offset) + for (length <- lengths) { + offset += length + out.writeLong(offset) + } + } { + out.close() } - } { - out.close() - } - val dataFile = getDataFile(shuffleId, mapId) - // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure - // the following check and rename are atomic. - synchronized { - val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length) - if (existingLengths != null) { - // Another attempt for the same task has already written our map outputs successfully, - // so just use the existing partition lengths and delete our temporary map outputs. - System.arraycopy(existingLengths, 0, lengths, 0, lengths.length) - if (dataTmp != null && dataTmp.exists()) { - dataTmp.delete() - } - indexTmp.delete() - } else { - // This is the first successful attempt in writing the map outputs for this task, - // so override any existing index and data files with the ones we wrote. - if (indexFile.exists()) { - indexFile.delete() - } - if (dataFile.exists()) { - dataFile.delete() - } - if (!indexTmp.renameTo(indexFile)) { - throw new IOException("fail to rename file " + indexTmp + " to " + indexFile) - } - if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) { - throw new IOException("fail to rename file " + dataTmp + " to " + dataFile) + val dataFile = getDataFile(shuffleId, mapId) + // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure + // the following check and rename are atomic. + synchronized { + val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length) + if (existingLengths != null) { + // Another attempt for the same task has already written our map outputs successfully, + // so just use the existing partition lengths and delete our temporary map outputs. + System.arraycopy(existingLengths, 0, lengths, 0, lengths.length) + if (dataTmp != null && dataTmp.exists()) { + dataTmp.delete() + } + indexTmp.delete() + } else { + // This is the first successful attempt in writing the map outputs for this task, + // so override any existing index and data files with the ones we wrote. + if (indexFile.exists()) { + indexFile.delete() + } + if (dataFile.exists()) { + dataFile.delete() + } + if (!indexTmp.renameTo(indexFile)) { + throw new IOException("fail to rename file " + indexTmp + " to " + indexFile) + } + if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) { + throw new IOException("fail to rename file " + dataTmp + " to " + dataFile) + } } } + } finally { + if (indexTmp.exists() && !indexTmp.delete()) { + logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}") + } } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index f83cf8859e58..a78cf4355bd1 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -68,10 +68,16 @@ private[spark] class SortShuffleWriter[K, V, C]( // (see SPARK-3570). val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) val tmp = Utils.tempFileWith(output) - val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) - val partitionLengths = sorter.writePartitionedFile(blockId, tmp) - shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) - mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) + try { + val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) + val partitionLengths = sorter.writePartitionedFile(blockId, tmp) + shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) + mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) + } finally { + if (tmp.exists() && !tmp.delete()) { + logError(s"Error while deleting temp file ${tmp.getAbsolutePath}") + } + } } /** Close this writer, passing along whether the map completed */ From 8f88412c31dc840c15df9822638645381c82a2fe Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Wed, 21 Sep 2016 16:53:34 +0800 Subject: [PATCH 4/5] [SPARK-17617][SQL] Remainder(%) expression.eval returns incorrect result on double value ## What changes were proposed in this pull request? Remainder(%) expression's `eval()` returns incorrect result when the dividend is a big double. The reason is that Remainder converts the double dividend to decimal to do "%", and that lose precision. This bug only affects the `eval()` that is used by constant folding, the codegen path is not impacted. ### Before change ``` scala> -5083676433652386516D % 10 res2: Double = -6.0 scala> spark.sql("select -5083676433652386516D % 10 as a").show +---+ | a| +---+ |0.0| +---+ ``` ### After change ``` scala> spark.sql("select -5083676433652386516D % 10 as a").show +----+ | a| +----+ |-6.0| +----+ ``` ## How was this patch tested? Unit test. Author: Sean Zhong Closes #15171 from clockfly/SPARK-17617. (cherry picked from commit 3977223a3268aaf6913a325ee459139a4a302b1c) Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/expressions/arithmetic.scala | 6 +++++- .../expressions/ArithmeticExpressionSuite.scala | 11 +++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala index cfae28509eec..7905825a7770 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala @@ -273,7 +273,11 @@ case class Remainder(left: Expression, right: Expression) extends BinaryArithmet if (input1 == null) { null } else { - integral.rem(input1, input2) + input1 match { + case d: Double => d % input2.asInstanceOf[java.lang.Double] + case f: Float => f % input2.asInstanceOf[java.lang.Float] + case _ => integral.rem(input1, input2) + } } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ArithmeticExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ArithmeticExpressionSuite.scala index 72285c6a2419..a5930d4a5cfd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ArithmeticExpressionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ArithmeticExpressionSuite.scala @@ -172,6 +172,17 @@ class ArithmeticExpressionSuite extends SparkFunSuite with ExpressionEvalHelper // } } + test("SPARK-17617: % (Remainder) double % double on super big double") { + val leftDouble = Literal(-5083676433652386516D) + val rightDouble = Literal(10D) + checkEvaluation(Remainder(leftDouble, rightDouble), -6.0D) + + // Float has smaller precision + val leftFloat = Literal(-5083676433652386516F) + val rightFloat = Literal(10F) + checkEvaluation(Remainder(leftFloat, rightFloat), -2.0F) + } + test("Abs") { testNumericDataTypes { convert => val input = Literal(convert(1)) From ce0a222f56ffaf85273d2935b3e6d02aa9f6fa48 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 21 Sep 2016 11:38:10 -0700 Subject: [PATCH 5/5] [SPARK-17418] Prevent kinesis-asl-assembly artifacts from being published This patch updates the `kinesis-asl-assembly` build to prevent that module from being published as part of Maven releases and snapshot builds. The `kinesis-asl-assembly` includes classes from the Kinesis Client Library (KCL) and Kinesis Producer Library (KPL), both of which are licensed under the Amazon Software License and are therefore prohibited from being distributed in Apache releases. Author: Josh Rosen Closes #15167 from JoshRosen/stop-publishing-kinesis-assembly. --- extras/kinesis-asl-assembly/pom.xml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml index 98d6d8d8cc32..6528e4edcf4e 100644 --- a/extras/kinesis-asl-assembly/pom.xml +++ b/extras/kinesis-asl-assembly/pom.xml @@ -132,6 +132,21 @@ target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes + + + org.apache.maven.plugins + maven-deploy-plugin + + true + + + + org.apache.maven.plugins + maven-install-plugin + + true + + org.apache.maven.plugins maven-shade-plugin