
Commit 9e8e085

Author: Davies Liu (committed)

    Merge branch 'master' of github.com:apache/spark into date_add

    Conflicts:
        sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala

2 parents: 6224ce4 + 6d94bf6

138 files changed: +4859 -1031 lines


R/pkg/R/generics.R

Lines changed: 7 additions & 7 deletions
@@ -254,8 +254,10 @@ setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues")
 
 # @rdname intersection
 # @export
-setGeneric("intersection", function(x, other, numPartitions = 1) {
-              standardGeneric("intersection") })
+setGeneric("intersection",
+           function(x, other, numPartitions = 1) {
+             standardGeneric("intersection")
+           })
 
 # @rdname keys
 # @export
@@ -489,9 +491,7 @@ setGeneric("sample",
 #' @rdname sample
 #' @export
 setGeneric("sample_frac",
-           function(x, withReplacement, fraction, seed) {
-             standardGeneric("sample_frac")
-           })
+           function(x, withReplacement, fraction, seed) { standardGeneric("sample_frac") })
 
 #' @rdname saveAsParquetFile
 #' @export
@@ -553,8 +553,8 @@ setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn
 
 #' @rdname withColumnRenamed
 #' @export
-setGeneric("withColumnRenamed", function(x, existingCol, newCol) {
-              standardGeneric("withColumnRenamed") })
+setGeneric("withColumnRenamed",
+           function(x, existingCol, newCol) { standardGeneric("withColumnRenamed") })
 
 
 ###################### Column Methods ##########################

R/pkg/R/pairRDD.R

Lines changed: 2 additions & 2 deletions
@@ -202,8 +202,8 @@ setMethod("partitionBy",
 
             packageNamesArr <- serialize(.sparkREnv$.packages,
                                          connection = NULL)
-            broadcastArr <- lapply(ls(.broadcastNames), function(name) {
-                                   get(name, .broadcastNames) })
+            broadcastArr <- lapply(ls(.broadcastNames),
+                                   function(name) { get(name, .broadcastNames) })
             jrdd <- getJRDD(x)
 
             # We create a PairwiseRRDD that extends RDD[(Int, Array[Byte])],

R/pkg/R/sparkR.R

Lines changed: 6 additions & 3 deletions
@@ -22,7 +22,8 @@
 connExists <- function(env) {
   tryCatch({
     exists(".sparkRCon", envir = env) && isOpen(env[[".sparkRCon"]])
-  }, error = function(err) {
+  },
+  error = function(err) {
     return(FALSE)
   })
 }
@@ -153,7 +154,8 @@ sparkR.init <- function(
   .sparkREnv$backendPort <- backendPort
   tryCatch({
     connectBackend("localhost", backendPort)
-  }, error = function(err) {
+  },
+  error = function(err) {
     stop("Failed to connect JVM\n")
   })
 
@@ -264,7 +266,8 @@ sparkRHive.init <- function(jsc = NULL) {
   ssc <- callJMethod(sc, "sc")
   hiveCtx <- tryCatch({
     newJObject("org.apache.spark.sql.hive.HiveContext", ssc)
-  }, error = function(err) {
+  },
+  error = function(err) {
     stop("Spark SQL is not built with Hive support")
   })
 

R/pkg/inst/tests/test_sparkSQL.R

Lines changed: 4 additions & 2 deletions
@@ -112,7 +112,8 @@ test_that("create DataFrame from RDD", {
   df <- jsonFile(sqlContext, jsonPathNa)
   hiveCtx <- tryCatch({
     newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
-  }, error = function(err) {
+  },
+  error = function(err) {
     skip("Hive is not build with SparkSQL, skipped")
   })
   sql(hiveCtx, "CREATE TABLE people (name string, age double, height float)")
@@ -602,7 +603,8 @@ test_that("write.df() as parquet file", {
 test_that("test HiveContext", {
   hiveCtx <- tryCatch({
     newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
-  }, error = function(err) {
+  },
+  error = function(err) {
     skip("Hive is not build with SparkSQL, skipped")
   })
   df <- createExternalTable(hiveCtx, "json", jsonPath, "json")

R/run-tests.sh

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ FAILED=0
 LOGFILE=$FWDIR/unit-tests.out
 rm -f $LOGFILE
 
-SPARK_TESTING=1 $FWDIR/../bin/sparkR --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
+SPARK_TESTING=1 $FWDIR/../bin/sparkR --conf spark.buffer.pageSize=4m --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
 FAILED=$((PIPESTATUS[0]||$FAILED))
 
 if [[ $FAILED != 0 ]]; then
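
Note: the only functional change here is threading --conf spark.buffer.pageSize=4m through to the SparkR test run, matching the new configurable page size used by the unsafe sorters below. The same setting can be applied programmatically; a minimal sketch, assuming a local master and otherwise default configuration (illustrative only, not part of this commit):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class PageSizeExample {
  public static void main(String[] args) {
    // "spark.buffer.pageSize" is the knob read by the unsafe sorters (default "64m" in this commit).
    SparkConf conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("page-size-demo")
        .set("spark.buffer.pageSize", "4m");   // same value run-tests.sh passes via --conf
    JavaSparkContext sc = new JavaSparkContext(conf);
    // ... run jobs that exercise the unsafe shuffle / sort paths ...
    sc.stop();
  }
}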

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java

Lines changed: 21 additions & 14 deletions
@@ -59,14 +59,14 @@ final class UnsafeShuffleExternalSorter {
 
   private final Logger logger = LoggerFactory.getLogger(UnsafeShuffleExternalSorter.class);
 
-  private static final int PAGE_SIZE = PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES;
   @VisibleForTesting
   static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
-  @VisibleForTesting
-  static final int MAX_RECORD_SIZE = PAGE_SIZE - 4;
 
   private final int initialSize;
   private final int numPartitions;
+  private final int pageSizeBytes;
+  @VisibleForTesting
+  final int maxRecordSizeBytes;
   private final TaskMemoryManager memoryManager;
   private final ShuffleMemoryManager shuffleMemoryManager;
   private final BlockManager blockManager;
@@ -109,7 +109,10 @@ public UnsafeShuffleExternalSorter(
     this.numPartitions = numPartitions;
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
-
+    this.pageSizeBytes = (int) Math.min(
+      PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,
+      conf.getSizeAsBytes("spark.buffer.pageSize", "64m"));
+    this.maxRecordSizeBytes = pageSizeBytes - 4;
     this.writeMetrics = writeMetrics;
     initializeForWriting();
   }
@@ -272,7 +275,11 @@ void spill() throws IOException {
   }
 
   private long getMemoryUsage() {
-    return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
+    long totalPageSize = 0;
+    for (MemoryBlock page : allocatedPages) {
+      totalPageSize += page.size();
+    }
+    return sorter.getMemoryUsage() + totalPageSize;
   }
 
   private long freeMemory() {
@@ -346,23 +353,23 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException {
     // TODO: we should track metrics on the amount of space wasted when we roll over to a new page
     // without using the free space at the end of the current page. We should also do this for
     // BytesToBytesMap.
-    if (requiredSpace > PAGE_SIZE) {
+    if (requiredSpace > pageSizeBytes) {
       throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
-        PAGE_SIZE + ")");
+        pageSizeBytes + ")");
     } else {
-      final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
-      if (memoryAcquired < PAGE_SIZE) {
+      final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+      if (memoryAcquired < pageSizeBytes) {
         shuffleMemoryManager.release(memoryAcquired);
         spill();
-        final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
-        if (memoryAcquiredAfterSpilling != PAGE_SIZE) {
+        final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+        if (memoryAcquiredAfterSpilling != pageSizeBytes) {
           shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
-          throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory");
+          throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
         }
       }
-      currentPage = memoryManager.allocatePage(PAGE_SIZE);
+      currentPage = memoryManager.allocatePage(pageSizeBytes);
       currentPagePosition = currentPage.getBaseOffset();
-      freeSpaceInCurrentPage = PAGE_SIZE;
+      freeSpaceInCurrentPage = pageSizeBytes;
       allocatedPages.add(currentPage);
     }
   }
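
The core of this change is replacing the fixed PAGE_SIZE constant (previously PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES) with a pageSizeBytes value read from spark.buffer.pageSize and capped at that maximum; the maximum record size and the memory accounting follow from it. The allocation protocol itself is unchanged: try to reserve a full page from the shuffle memory manager, and if less is granted, return the partial grant, spill, and retry once. A self-contained sketch of that protocol, with a toy memory manager standing in for ShuffleMemoryManager (names here are illustrative, not Spark's API):

import java.io.IOException;

// Toy stand-in for Spark's ShuffleMemoryManager: hands out memory up to a fixed budget
// and may grant less than was requested.
final class ToyMemoryManager {
  private long free;
  ToyMemoryManager(long budget) { this.free = budget; }
  long tryToAcquire(long requested) {
    long granted = Math.min(requested, free);
    free -= granted;
    return granted;
  }
  void release(long bytes) { free += bytes; }
}

public class AcquireSpillRetry {
  private final ToyMemoryManager memoryManager = new ToyMemoryManager(96L << 20);
  private final long pageSizeBytes = 64L << 20;  // would come from spark.buffer.pageSize
  private long bytesHeldInPages = 0;

  // In the real sorter, spill() writes buffered records to disk and frees their pages.
  private void spill() {
    memoryManager.release(bytesHeldInPages);
    bytesHeldInPages = 0;
  }

  // Mirrors the control flow of allocateSpaceForRecord(): acquire a full page,
  // otherwise give back the partial grant, spill, and retry exactly once.
  void allocatePage() throws IOException {
    long acquired = memoryManager.tryToAcquire(pageSizeBytes);
    if (acquired < pageSizeBytes) {
      memoryManager.release(acquired);
      spill();
      acquired = memoryManager.tryToAcquire(pageSizeBytes);
      if (acquired != pageSizeBytes) {
        memoryManager.release(acquired);
        throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
      }
    }
    bytesHeldInPages += pageSizeBytes;  // page is now owned by the sorter
  }
}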

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java

Lines changed: 5 additions & 0 deletions
@@ -129,6 +129,11 @@ public UnsafeShuffleWriter(
     open();
   }
 
+  @VisibleForTesting
+  public int maxRecordSizeBytes() {
+    return sorter.maxRecordSizeBytes;
+  }
+
   /**
    * This convenience method should only be called in test code.
    */

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java

Lines changed: 3 additions & 26 deletions
@@ -17,9 +17,7 @@
 
 package org.apache.spark.util.collection.unsafe.sort;
 
-import com.google.common.base.Charsets;
-import com.google.common.primitives.Longs;
-import com.google.common.primitives.UnsignedBytes;
+import com.google.common.primitives.UnsignedLongs;
 
 import org.apache.spark.annotation.Private;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -37,32 +35,11 @@ private PrefixComparators() {}
   public static final class StringPrefixComparator extends PrefixComparator {
     @Override
     public int compare(long aPrefix, long bPrefix) {
-      // TODO: can done more efficiently
-      byte[] a = Longs.toByteArray(aPrefix);
-      byte[] b = Longs.toByteArray(bPrefix);
-      for (int i = 0; i < 8; i++) {
-        int c = UnsignedBytes.compare(a[i], b[i]);
-        if (c != 0) return c;
-      }
-      return 0;
-    }
-
-    public long computePrefix(byte[] bytes) {
-      if (bytes == null) {
-        return 0L;
-      } else {
-        byte[] padded = new byte[8];
-        System.arraycopy(bytes, 0, padded, 0, Math.min(bytes.length, 8));
-        return Longs.fromByteArray(padded);
-      }
-    }
-
-    public long computePrefix(String value) {
-      return value == null ? 0L : computePrefix(value.getBytes(Charsets.UTF_8));
+      return UnsignedLongs.compare(aPrefix, bPrefix);
     }
 
     public long computePrefix(UTF8String value) {
-      return value == null ? 0L : computePrefix(value.getBytes());
+      return value == null ? 0L : value.getPrefix();
     }
   }
 
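
The rewritten StringPrefixComparator leans on a property of the prefix encoding: when the first eight bytes of a string are packed big-endian into a long, comparing those longs as unsigned values gives the same order as an unsigned byte-by-byte comparison, so the old loop over Longs.toByteArray collapses into a single UnsignedLongs.compare call. A small self-contained check of that equivalence (illustrative only; it packs the prefix locally rather than calling UTF8String.getPrefix()):

import java.nio.charset.StandardCharsets;
import com.google.common.primitives.UnsignedLongs;

public class PrefixOrderCheck {
  // Pack the first 8 bytes of a UTF-8 string big-endian into a long, zero-padded on the right.
  static long prefix(String s) {
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    long p = 0L;
    for (int i = 0; i < 8; i++) {
      long b = (i < bytes.length) ? (bytes[i] & 0xFFL) : 0L;
      p = (p << 8) | b;
    }
    return p;
  }

  public static void main(String[] args) {
    String a = "apple";
    String b = "äpple";  // 'ä' encodes as 0xC3 0xA4, so this prefix has its top bit set
    // Unsigned comparison matches lexicographic byte order ...
    System.out.println(UnsignedLongs.compare(prefix(a), prefix(b)) < 0);  // true: "apple" sorts first
    // ... while a signed comparison of the same prefixes would flip the order.
    System.out.println(Long.compare(prefix(a), prefix(b)) < 0);           // false
  }
}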

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 30 additions & 14 deletions
@@ -20,6 +20,9 @@
 import java.io.IOException;
 import java.util.LinkedList;
 
+import scala.runtime.AbstractFunction0;
+import scala.runtime.BoxedUnit;
+
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,10 +44,7 @@ public final class UnsafeExternalSorter {
 
   private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class);
 
-  private static final int PAGE_SIZE = 1 << 27;  // 128 megabytes
-  @VisibleForTesting
-  static final int MAX_RECORD_SIZE = PAGE_SIZE - 4;
-
+  private final long pageSizeBytes;
   private final PrefixComparator prefixComparator;
   private final RecordComparator recordComparator;
   private final int initialSize;
@@ -91,7 +91,19 @@ public UnsafeExternalSorter(
     this.initialSize = initialSize;
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
     this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+    this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
     initializeForWriting();
+
+    // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
+    // the end of the task. This is necessary to avoid memory leaks in when the downstream operator
+    // does not fully consume the sorter's output (e.g. sort followed by limit).
+    taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
+      @Override
+      public BoxedUnit apply() {
+        freeMemory();
+        return null;
+      }
+    });
   }
 
   // TODO: metrics tracking + integration with shuffle write metrics
@@ -147,7 +159,11 @@ public void spill() throws IOException {
   }
 
   private long getMemoryUsage() {
-    return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
+    long totalPageSize = 0;
+    for (MemoryBlock page : allocatedPages) {
+      totalPageSize += page.size();
+    }
+    return sorter.getMemoryUsage() + totalPageSize;
   }
 
   @VisibleForTesting
@@ -214,23 +230,23 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException {
     // TODO: we should track metrics on the amount of space wasted when we roll over to a new page
     // without using the free space at the end of the current page. We should also do this for
    // BytesToBytesMap.
-    if (requiredSpace > PAGE_SIZE) {
+    if (requiredSpace > pageSizeBytes) {
       throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
-        PAGE_SIZE + ")");
+        pageSizeBytes + ")");
     } else {
-      final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
-      if (memoryAcquired < PAGE_SIZE) {
+      final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+      if (memoryAcquired < pageSizeBytes) {
         shuffleMemoryManager.release(memoryAcquired);
         spill();
-        final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
-        if (memoryAcquiredAfterSpilling != PAGE_SIZE) {
+        final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+        if (memoryAcquiredAfterSpilling != pageSizeBytes) {
           shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
-          throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory");
+          throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
         }
       }
-      currentPage = memoryManager.allocatePage(PAGE_SIZE);
+      currentPage = memoryManager.allocatePage(pageSizeBytes);
       currentPagePosition = currentPage.getBaseOffset();
-      freeSpaceInCurrentPage = PAGE_SIZE;
+      freeSpaceInCurrentPage = pageSizeBytes;
       allocatedPages.add(currentPage);
     }
   }
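
Besides picking up the configurable page size, this file gains a second safeguard: the constructor registers an on-complete callback with the TaskContext so the sorter's pages are freed even when a downstream operator stops consuming early (for example a sort followed by a limit). A self-contained sketch of the same pattern, with a minimal callback registry standing in for TaskContext (names are illustrative, not Spark's API):

import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for TaskContext: collects callbacks and runs them when the task ends.
final class ToyTaskContext {
  private final List<Runnable> onComplete = new ArrayList<>();
  void addOnCompleteCallback(Runnable callback) { onComplete.add(callback); }
  void markTaskCompleted() { onComplete.forEach(Runnable::run); }
}

public class CleanupOnTaskEnd {
  // A resource owner, like the sorter, that frees its memory in exactly one place.
  static final class Sorter {
    private boolean freed = false;
    Sorter(ToyTaskContext ctx) {
      // Register cleanup at construction time, so memory is reclaimed even if the
      // consumer never drains the sorter's output.
      ctx.addOnCompleteCallback(this::freeMemory);
    }
    void freeMemory() {
      if (!freed) {
        freed = true;
        System.out.println("pages released");
      }
    }
  }

  public static void main(String[] args) {
    ToyTaskContext ctx = new ToyTaskContext();
    new Sorter(ctx);             // registers its cleanup hook
    // Downstream operator abandons the output early (e.g. a limit) ...
    ctx.markTaskCompleted();     // ... but the callback still frees the sorter's memory.
  }
}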

core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 0 additions & 3 deletions
@@ -341,7 +341,4 @@ private[spark] object Accumulators extends Logging {
     }
   }
 
-  def stringifyPartialValue(partialValue: Any): String = "%s".format(partialValue)
-
-  def stringifyValue(value: Any): String = "%s".format(value)
 }
