@@ -157,8 +157,14 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {

File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
File tmp = Utils.tempFileWith(output);
partitionLengths = writePartitionedFile(tmp);
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
try {
partitionLengths = writePartitionedFile(tmp);
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
} finally {
if (tmp.exists() && !tmp.delete()) {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
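
The change above stages the map output in a temporary file created next to the final data file and wraps the write in try/finally, so the temporary file is removed no matter where the write or the commit fails. On the success path writeIndexFileAndCommit renames (or deletes) the temporary file, so the tmp.exists() guard makes the finally block a no-op. A minimal sketch of the same stage-then-commit shape, where writeAndCommit, writePartitions and commit are hypothetical stand-ins for the Spark methods:

    import java.io.File

    // Sketch only, not Spark code: stage into a temp file next to the target,
    // commit by rename, and always clean the temp file up on the way out.
    def writeAndCommit(
        output: File,
        writePartitions: File => Array[Long],  // stand-in for writePartitionedFile
        commit: (Array[Long], File) => Unit    // stand-in for writeIndexFileAndCommit
      ): Array[Long] = {
      val tmp = File.createTempFile(output.getName, ".tmp", output.getParentFile)
      try {
        val lengths = writePartitions(tmp)     // may throw partway through
        commit(lengths, tmp)                   // renames tmp into place or deletes it
        lengths
      } finally {
        // A no-op after a successful commit (the rename removed tmp); on failure
        // it deletes the orphaned temp file so it cannot pile up on disk.
        if (tmp.exists() && !tmp.delete()) {
          System.err.println(s"Error while deleting temp file ${tmp.getAbsolutePath}")
        }
      }
    }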

@@ -209,15 +209,21 @@ void closeAndWriteOutput() throws IOException {
final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
final File tmp = Utils.tempFileWith(output);
try {
partitionLengths = mergeSpills(spills, tmp);
} finally {
for (SpillInfo spill : spills) {
if (spill.file.exists() && ! spill.file.delete()) {
logger.error("Error while deleting spill file {}", spill.file.getPath());
try {
partitionLengths = mergeSpills(spills, tmp);
} finally {
for (SpillInfo spill : spills) {
if (spill.file.exists() && ! spill.file.delete()) {
logger.error("Error while deleting spill file {}", spill.file.getPath());
}
}
}
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
} finally {
if (tmp.exists() && !tmp.delete()) {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
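
Here the cleanup is split into two independent finally blocks: the inner one removes the on-disk spill files whether or not mergeSpills succeeds, and the outer one removes the merged temporary file whether or not the index commit succeeds. Roughly, the structure is the one sketched below, with merge and commit again as hypothetical stand-ins:

    import java.io.File

    // Sketch of the nested-cleanup structure only, not the actual Spark code.
    def mergeAndCommit(
        spills: Seq[File],
        tmp: File,
        merge: (Seq[File], File) => Array[Long],  // stand-in for mergeSpills
        commit: (Array[Long], File) => Unit       // stand-in for writeIndexFileAndCommit
      ): Array[Long] = {
      try {
        val lengths =
          try merge(spills, tmp)
          finally spills.filter(_.exists()).foreach(_.delete())  // spills always removed
        commit(lengths, tmp)
        lengths
      } finally {
        if (tmp.exists() && !tmp.delete()) {  // tmp always removed; a no-op after the rename
          System.err.println(s"Error while deleting temp file ${tmp.getAbsolutePath}")
        }
      }
    }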

core/src/main/scala/org/apache/spark/scheduler/Task.scala (1 change: 1 addition & 0 deletions)
@@ -104,6 +104,7 @@ private[spark] abstract class Task[T](
Utils.tryLogNonFatalError {
// Release memory used by this thread for unrolling blocks
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
SparkEnv.get.blockManager.memoryStore.releasePendingUnrollMemoryForThisTask()
// Notify any tasks waiting for execution memory to be freed to wake up and try to
// acquire memory again. This makes impossible the scenario where a task sleeps forever
// because there are no other tasks left to notify it. Since this is safe to do but may
@@ -138,48 +138,54 @@ private[spark] class IndexShuffleBlockResolver(
dataTmp: File): Unit = {
val indexFile = getIndexFile(shuffleId, mapId)
val indexTmp = Utils.tempFileWith(indexFile)
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
Utils.tryWithSafeFinally {
// We take in lengths of each block, and we need to convert them to offsets.
var offset = 0L
out.writeLong(offset)
for (length <- lengths) {
offset += length
try {
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
Utils.tryWithSafeFinally {
// We take in lengths of each block, and we need to convert them to offsets.
var offset = 0L
out.writeLong(offset)
for (length <- lengths) {
offset += length
out.writeLong(offset)
}
} {
out.close()
}
} {
out.close()
}

val dataFile = getDataFile(shuffleId, mapId)
// There is only one IndexShuffleBlockResolver per executor; this synchronization makes sure
// the following check and rename are atomic.
synchronized {
val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
if (existingLengths != null) {
// Another attempt for the same task has already written our map outputs successfully,
// so just use the existing partition lengths and delete our temporary map outputs.
System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
if (dataTmp != null && dataTmp.exists()) {
dataTmp.delete()
}
indexTmp.delete()
} else {
// This is the first successful attempt in writing the map outputs for this task,
// so override any existing index and data files with the ones we wrote.
if (indexFile.exists()) {
indexFile.delete()
}
if (dataFile.exists()) {
dataFile.delete()
}
if (!indexTmp.renameTo(indexFile)) {
throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
}
if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
val dataFile = getDataFile(shuffleId, mapId)
// There is only one IndexShuffleBlockResolver per executor; this synchronization makes sure
// the following check and rename are atomic.
synchronized {
val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
if (existingLengths != null) {
// Another attempt for the same task has already written our map outputs successfully,
// so just use the existing partition lengths and delete our temporary map outputs.
System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
if (dataTmp != null && dataTmp.exists()) {
dataTmp.delete()
}
indexTmp.delete()
} else {
// This is the first successful attempt in writing the map outputs for this task,
// so override any existing index and data files with the ones we wrote.
if (indexFile.exists()) {
indexFile.delete()
}
if (dataFile.exists()) {
dataFile.delete()
}
if (!indexTmp.renameTo(indexFile)) {
throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
}
if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
}
}
}
} finally {
if (indexTmp.exists() && !indexTmp.delete()) {
logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
}
}
}
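
Two things happen in this commit path. First, it is a "first attempt wins" protocol: under the resolver-wide lock, if a concurrent attempt of the same task already left a consistent index/data pair behind, the current attempt adopts those partition lengths and discards its own temporary files; otherwise it renames its temporaries into place. Second, the new outer try/finally guarantees indexTmp is deleted even when the rename fails or an exception escapes the synchronized block. The decision between the two branches rests on checkIndexAndDataFile; a hedged sketch of what such a check has to verify (readExistingLengths is illustrative, not the real implementation) is:

    import java.io.{DataInputStream, File, FileInputStream}

    // Sketch only: trust an existing index file iff it holds blocks + 1 offsets,
    // starts at 0, never decreases, and its last offset matches the data file's
    // length. If it does, return the per-partition lengths it encodes.
    def readExistingLengths(index: File, data: File, blocks: Int): Option[Array[Long]] = {
      if (!index.exists() || index.length() != (blocks + 1) * 8L) return None
      val in = new DataInputStream(new FileInputStream(index))
      try {
        val offsets = Array.fill(blocks + 1)(in.readLong())
        val lengths = offsets.indices.tail.map(i => offsets(i) - offsets(i - 1)).toArray
        if (offsets.head == 0L && lengths.forall(_ >= 0L) && data.length() == offsets.last) {
          Some(lengths)
        } else {
          None
        }
      } finally {
        in.close()
      }
    }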

@@ -68,10 +68,16 @@ private[spark] class SortShuffleWriter[K, V, C](
// (see SPARK-3570).
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
try {
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
}
}
}

/** Close this writer, passing along whether the map completed */
core/src/main/scala/org/apache/spark/storage/MemoryStore.scala (12 changes: 6 additions & 6 deletions)
@@ -511,11 +511,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
if (memoryToRelease > 0) {
unrollMemoryMap(taskAttemptId) -= memoryToRelease
if (unrollMemoryMap(taskAttemptId) == 0) {
unrollMemoryMap.remove(taskAttemptId)
}
memoryManager.releaseUnrollMemory(memoryToRelease)
}
if (unrollMemoryMap(taskAttemptId) == 0) {
unrollMemoryMap.remove(taskAttemptId)
}
}
}
}
@@ -530,11 +530,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId))
if (memoryToRelease > 0) {
pendingUnrollMemoryMap(taskAttemptId) -= memoryToRelease
if (pendingUnrollMemoryMap(taskAttemptId) == 0) {
pendingUnrollMemoryMap.remove(taskAttemptId)
}
memoryManager.releaseUnrollMemory(memoryToRelease)
}
if (pendingUnrollMemoryMap(taskAttemptId) == 0) {
pendingUnrollMemoryMap.remove(taskAttemptId)
}
}
}
}
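
The MemoryStore change is subtle: previously the "remove the entry once it reaches zero" check sat inside if (memoryToRelease > 0), so a task whose recorded unroll memory was already zero never had its map entry removed, and together with pending unroll memory that was not released at task end (the Task.scala change above) the per-task maps could keep growing. Moving the check outside the guard means a finishing task always drops its entry. A toy model of the corrected bookkeeping, with illustrative names rather than Spark APIs:

    import scala.collection.mutable

    object UnrollBookkeeping {
      private val unrollMemory = mutable.HashMap[Long, Long]()

      def release(taskId: Long, requested: Long = Long.MaxValue): Unit = synchronized {
        if (unrollMemory.contains(taskId)) {
          val toRelease = math.min(requested, unrollMemory(taskId))
          if (toRelease > 0) {
            unrollMemory(taskId) -= toRelease
            // hand toRelease bytes back to the memory manager here
          }
          // Checked unconditionally: a task already at 0 bytes still gets its entry
          // dropped, so the map cannot keep one stale entry per finished task.
          if (unrollMemory(taskId) == 0) {
            unrollMemory.remove(taskId)
          }
        }
      }
    }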
extras/kinesis-asl-assembly/pom.xml (15 changes: 15 additions & 0 deletions)
@@ -132,6 +132,21 @@
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<!-- SPARK-17418: prevent the kinesis-asl-assembly from being published to Maven -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
@@ -273,7 +273,11 @@ case class Remainder(left: Expression, right: Expression) extends BinaryArithmet
if (input1 == null) {
null
} else {
integral.rem(input1, input2)
input1 match {
case d: Double => d % input2.asInstanceOf[java.lang.Double]
case f: Float => f % input2.asInstanceOf[java.lang.Float]
case _ => integral.rem(input1, input2)
}
}
}
}
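
The Remainder fix changes which machinery computes the modulo for fractional types. Before, every type went through integral.rem; for a fractional type that Integral instance is a wrapper view rather than the primitive operator (for Double it is along the lines of Scala's Numeric.DoubleAsIfIntegral), and for very large magnitudes its result can differ from the IEEE remainder. The new match sends Double and Float through the JVM's % and keeps integral.rem for the exact integral types. A quick check with the value used in the test below:

    import scala.math.Numeric.DoubleAsIfIntegral

    // The nearest representable double to -5083676433652386516 ends in ...816,
    // so the IEEE remainder by 10 is -6.0.
    val x = -5083676433652386516D
    val viaPrimitive = x % 10D  // -6.0, what the new Double branch returns
    // The Integral view's rem is not the primitive % on the stored bits; for a
    // double this large it may not equal viaPrimitive.
    val viaIntegral = DoubleAsIfIntegral.rem(x, 10D)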
@@ -172,6 +172,17 @@ class ArithmeticExpressionSuite extends SparkFunSuite with ExpressionEvalHelper
// }
}

test("SPARK-17617: % (Remainder) double % double on super big double") {
val leftDouble = Literal(-5083676433652386516D)
val rightDouble = Literal(10D)
checkEvaluation(Remainder(leftDouble, rightDouble), -6.0D)

// Float has smaller precision
val leftFloat = Literal(-5083676433652386516F)
val rightFloat = Literal(10F)
checkEvaluation(Remainder(leftFloat, rightFloat), -2.0F)
}

test("Abs") {
testNumericDataTypes { convert =>
val input = Literal(convert(1))
@@ -790,6 +790,13 @@ private[hive] object HiveContext {
// hive.metastore.uris is not set.
propMap.put(ConfVars.METASTOREURIS.varname, "")

// The execution client will generate garbage events; therefore, the listeners that are generated
// for the execution clients are useless. To avoid producing that garbage, we don't configure
// these listeners.
propMap.put(ConfVars.METASTORE_PRE_EVENT_LISTENERS.varname, "")
propMap.put(ConfVars.METASTORE_EVENT_LISTENERS.varname, "")
propMap.put(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname, "")

propMap.toMap
}

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive

import org.apache.hadoop.hive.conf.HiveConf.ConfVars

import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.QueryTest

class HiveContextSuite extends QueryTest with TestHiveSingleton {
test("newTemporaryConfiguration overwrites listener configurations") {
val conf = HiveContext.newTemporaryConfiguration()
assert(conf(ConfVars.METASTORE_PRE_EVENT_LISTENERS.varname) === "")
assert(conf(ConfVars.METASTORE_EVENT_LISTENERS.varname) === "")
assert(conf(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname) === "")
}
}