
Commit b444ff7 (2 parents: 6bbe31c + 4ebff5b)

Merge branch 'master' of https://github.com/apache/spark into SPARK-27463-poc-arrow-stream

# Conflicts:
#	core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
#	python/pyspark/rdd.py
#	python/pyspark/worker.py
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

158 files changed: +5350 −1402 lines changed


R/pkg/R/DataFrame.R

Lines changed: 8 additions & 2 deletions
@@ -1203,7 +1203,8 @@ setMethod("collect",
           requireNamespace1 <- requireNamespace
           if (requireNamespace1("arrow", quietly = TRUE)) {
             read_arrow <- get("read_arrow", envir = asNamespace("arrow"), inherits = FALSE)
-            as_tibble <- get("as_tibble", envir = asNamespace("arrow"))
+            # Arrow drops `as_tibble` since 0.14.0, see ARROW-5190.
+            useAsTibble <- exists("as_tibble", envir = asNamespace("arrow"))

             portAuth <- callJMethod(x@sdf, "collectAsArrowToR")
             port <- portAuth[[1]]
@@ -1213,7 +1214,12 @@ setMethod("collect",
             output <- tryCatch({
               doServerAuth(conn, authSecret)
               arrowTable <- read_arrow(readRaw(conn))
-              as.data.frame(as_tibble(arrowTable), stringsAsFactors = stringsAsFactors)
+              if (useAsTibble) {
+                as_tibble <- get("as_tibble", envir = asNamespace("arrow"))
+                as.data.frame(as_tibble(arrowTable), stringsAsFactors = stringsAsFactors)
+              } else {
+                as.data.frame(arrowTable, stringsAsFactors = stringsAsFactors)
+              }
             }, finally = {
               close(conn)
             })

R/pkg/R/deserialize.R

Lines changed: 10 additions & 3 deletions
@@ -237,7 +237,9 @@ readDeserializeInArrow <- function(inputCon) {
   if (requireNamespace1("arrow", quietly = TRUE)) {
     RecordBatchStreamReader <- get(
       "RecordBatchStreamReader", envir = asNamespace("arrow"), inherits = FALSE)
-    as_tibble <- get("as_tibble", envir = asNamespace("arrow"))
+    # Arrow drops `as_tibble` since 0.14.0, see ARROW-5190.
+    useAsTibble <- exists("as_tibble", envir = asNamespace("arrow"))
+

     # Currently, there looks no way to read batch by batch by socket connection in R side,
     # See ARROW-4512. Therefore, it reads the whole Arrow streaming-formatted binary at once
@@ -246,8 +248,13 @@ readDeserializeInArrow <- function(inputCon) {
     arrowData <- readBin(inputCon, raw(), as.integer(dataLen), endian = "big")
     batches <- RecordBatchStreamReader(arrowData)$batches()

-    # Read all groupped batches. Tibble -> data.frame is cheap.
-    lapply(batches, function(batch) as.data.frame(as_tibble(batch)))
+    if (useAsTibble) {
+      as_tibble <- get("as_tibble", envir = asNamespace("arrow"))
+      # Read all groupped batches. Tibble -> data.frame is cheap.
+      lapply(batches, function(batch) as.data.frame(as_tibble(batch)))
+    } else {
+      lapply(batches, function(batch) as.data.frame(batch))
+    }
   } else {
     stop("'arrow' package should be installed.")
   }

common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java

Lines changed: 10 additions & 5 deletions
@@ -237,11 +237,16 @@ public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs) {
     sendRpc(message, new RpcResponseCallback() {
       @Override
       public void onSuccess(ByteBuffer response) {
-        ByteBuffer copy = ByteBuffer.allocate(response.remaining());
-        copy.put(response);
-        // flip "copy" to make it readable
-        copy.flip();
-        result.set(copy);
+        try {
+          ByteBuffer copy = ByteBuffer.allocate(response.remaining());
+          copy.put(response);
+          // flip "copy" to make it readable
+          copy.flip();
+          result.set(copy);
+        } catch (Throwable t) {
+          logger.warn("Error in responding PRC callback", t);
+          result.setException(t);
+        }
       }

       @Override
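
The rewritten onSuccess above follows a defensive pattern for asynchronous RPC callbacks: any failure while copying the response must complete the pending result exceptionally rather than escape, otherwise the synchronous caller of sendRpcSync would block until its timeout. Below is a minimal, self-contained sketch of that pattern; the ResponseCallback interface and the use of java.util.concurrent.CompletableFuture are stand-ins chosen for the example, not Spark's RpcResponseCallback or the settable future in TransportClient.

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Sketch only: a response callback that never lets an exception escape, so the
// future waited on by the synchronous caller is always completed (normally or
// exceptionally) and cannot hang until the timeout.
public class CallbackSketch {
  interface ResponseCallback {
    void onSuccess(ByteBuffer response);
    void onFailure(Throwable cause);
  }

  public static void main(String[] args) throws Exception {
    CompletableFuture<ByteBuffer> result = new CompletableFuture<>();

    ResponseCallback callback = new ResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) {
        try {
          ByteBuffer copy = ByteBuffer.allocate(response.remaining());
          copy.put(response);
          copy.flip(); // make the copy readable
          result.complete(copy);
        } catch (Throwable t) {
          // Without this catch, the error would be lost and result never completed.
          result.completeExceptionally(t);
        }
      }

      @Override
      public void onFailure(Throwable cause) {
        result.completeExceptionally(cause);
      }
    };

    callback.onSuccess(ByteBuffer.wrap(new byte[] {1, 2, 3}));
    System.out.println(result.get(1, TimeUnit.SECONDS).remaining()); // prints 3
  }
}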
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java

Lines changed: 66 additions & 0 deletions (new file)

@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.File;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.network.util.JavaUtils;
+
+public class ExecutorDiskUtils {
+
+  private static final Pattern MULTIPLE_SEPARATORS = Pattern.compile(File.separator + "{2,}");
+
+  /**
+   * Hashes a filename into the corresponding local directory, in a manner consistent with
+   * Spark's DiskBlockManager.getFile().
+   */
+  public static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
+    int hash = JavaUtils.nonNegativeHash(filename);
+    String localDir = localDirs[hash % localDirs.length];
+    int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
+    return new File(createNormalizedInternedPathname(
+        localDir, String.format("%02x", subDirId), filename));
+  }
+
+  /**
+   * This method is needed to avoid the situation when multiple File instances for the
+   * same pathname "foo/bar" are created, each with a separate copy of the "foo/bar" String.
+   * According to measurements, in some scenarios such duplicate strings may waste a lot
+   * of memory (~ 10% of the heap). To avoid that, we intern the pathname, and before that
+   * we make sure that it's in a normalized form (contains no "//", "///" etc.) Otherwise,
+   * the internal code in java.io.File would normalize it later, creating a new "foo/bar"
+   * String copy. Unfortunately, we cannot just reuse the normalization code that java.io.File
+   * uses, since it is in the package-private class java.io.FileSystem.
+   */
+  @VisibleForTesting
+  static String createNormalizedInternedPathname(String dir1, String dir2, String fname) {
+    String pathname = dir1 + File.separator + dir2 + File.separator + fname;
+    Matcher m = MULTIPLE_SEPARATORS.matcher(pathname);
+    pathname = m.replaceAll("/");
+    // A single trailing slash needs to be taken care of separately
+    if (pathname.length() > 1 && pathname.endsWith("/")) {
+      pathname = pathname.substring(0, pathname.length() - 1);
+    }
+    return pathname.intern();
+  }
+
+}
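
For illustration, the following standalone sketch shows how getFile() spreads a block file across the configured local directories and the "%02x" subdirectories. The hash helper and directory names here are made up for the example; the helper is only a stand-in for JavaUtils.nonNegativeHash() and is not the real implementation.

import java.io.File;

// Illustrative sketch only: mirrors the layout logic of ExecutorDiskUtils.getFile():
// the hash picks one of the local root dirs, and the quotient picks a two-hex-digit
// subdirectory inside it.
public class DiskLayoutSketch {
  // Stand-in for JavaUtils.nonNegativeHash(), an assumption for this example.
  static int nonNegativeHash(String s) {
    int h = s.hashCode();
    return h != Integer.MIN_VALUE ? Math.abs(h) : 0;
  }

  public static void main(String[] args) {
    String[] localDirs = {"/tmp/spark-local-1", "/tmp/spark-local-2"};
    int subDirsPerLocalDir = 64;
    String filename = "shuffle_0_5_0.index";

    int hash = nonNegativeHash(filename);
    String localDir = localDirs[hash % localDirs.length];          // pick a root dir
    int subDirId = (hash / localDirs.length) % subDirsPerLocalDir; // pick a sub dir
    File f = new File(localDir, String.format("%02x", subDirId) + File.separator + filename);
    System.out.println(f); // e.g. <localDir>/<2-hex-digit subdir>/shuffle_0_5_0.index
  }
}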

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 5 additions & 40 deletions
@@ -24,7 +24,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;

 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -298,15 +297,15 @@ private void deleteNonShuffleServiceServedFiles(String[] dirs) {
   */
  private ManagedBuffer getSortBasedShuffleBlockData(
      ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
-    File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
+    File indexFile = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
      "shuffle_" + shuffleId + "_" + mapId + "_0.index");

    try {
      ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
      ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
      return new FileSegmentManagedBuffer(
        conf,
-        getFile(executor.localDirs, executor.subDirsPerLocalDir,
+        ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
          "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
        shuffleIndexRecord.getOffset(),
        shuffleIndexRecord.getLength());
@@ -317,7 +316,7 @@ private ManagedBuffer getSortBasedShuffleBlockData(

  public ManagedBuffer getDiskPersistedRddBlockData(
      ExecutorShuffleInfo executor, int rddId, int splitIndex) {
-    File file = getFile(executor.localDirs, executor.subDirsPerLocalDir,
+    File file = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
      "rdd_" + rddId + "_" + splitIndex);
    long fileLength = file.length();
    ManagedBuffer res = null;
@@ -327,19 +326,6 @@ public ManagedBuffer getDiskPersistedRddBlockData(
    return res;
  }

-  /**
-   * Hashes a filename into the corresponding local directory, in a manner consistent with
-   * Spark's DiskBlockManager.getFile().
-   */
-  @VisibleForTesting
-  static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
-    int hash = JavaUtils.nonNegativeHash(filename);
-    String localDir = localDirs[hash % localDirs.length];
-    int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
-    return new File(createNormalizedInternedPathname(
-        localDir, String.format("%02x", subDirId), filename));
-  }
-
  void close() {
    if (db != null) {
      try {
@@ -350,28 +336,6 @@ void close() {
    }
  }

-  /**
-   * This method is needed to avoid the situation when multiple File instances for the
-   * same pathname "foo/bar" are created, each with a separate copy of the "foo/bar" String.
-   * According to measurements, in some scenarios such duplicate strings may waste a lot
-   * of memory (~ 10% of the heap). To avoid that, we intern the pathname, and before that
-   * we make sure that it's in a normalized form (contains no "//", "///" etc.) Otherwise,
-   * the internal code in java.io.File would normalize it later, creating a new "foo/bar"
-   * String copy. Unfortunately, we cannot just reuse the normalization code that java.io.File
-   * uses, since it is in the package-private class java.io.FileSystem.
-   */
-  @VisibleForTesting
-  static String createNormalizedInternedPathname(String dir1, String dir2, String fname) {
-    String pathname = dir1 + File.separator + dir2 + File.separator + fname;
-    Matcher m = MULTIPLE_SEPARATORS.matcher(pathname);
-    pathname = m.replaceAll("/");
-    // A single trailing slash needs to be taken care of separately
-    if (pathname.length() > 1 && pathname.endsWith("/")) {
-      pathname = pathname.substring(0, pathname.length() - 1);
-    }
-    return pathname.intern();
-  }
-
  public int removeBlocks(String appId, String execId, String[] blockIds) {
    ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
    if (executor == null) {
@@ -380,7 +344,8 @@ public int removeBlocks(String appId, String execId, String[] blockIds) {
    }
    int numRemovedBlocks = 0;
    for (String blockId : blockIds) {
-      File file = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
+      File file =
+          ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
      if (file.delete()) {
        numRemovedBlocks++;
      } else {
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java

Lines changed: 10 additions & 3 deletions
@@ -163,9 +163,16 @@ public Future<Integer> removeBlocks(
     client.sendRpc(removeBlocksMessage, new RpcResponseCallback() {
       @Override
       public void onSuccess(ByteBuffer response) {
-        BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
-        numRemovedBlocksFuture.complete(((BlocksRemoved)msgObj).numRemovedBlocks);
-        client.close();
+        try {
+          BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
+          numRemovedBlocksFuture.complete(((BlocksRemoved) msgObj).numRemovedBlocks);
+        } catch (Throwable t) {
+          logger.warn("Error trying to remove RDD blocks " + Arrays.toString(blockIds) +
+            " via external shuffle service from executor: " + execId, t);
+          numRemovedBlocksFuture.complete(0);
+        } finally {
+          client.close();
+        }
       }

       @Override

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java

Lines changed: 1 addition & 1 deletion
@@ -149,7 +149,7 @@ public void testNormalizeAndInternPathname() {

   private void assertPathsMatch(String p1, String p2, String p3, String expectedPathname) {
     String normPathname =
-        ExternalShuffleBlockResolver.createNormalizedInternedPathname(p1, p2, p3);
+        ExecutorDiskUtils.createNormalizedInternedPathname(p1, p2, p3);
     assertEquals(expectedPathname, normPathname);
     File file = new File(normPathname);
     String returnedPath = file.getPath();

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java

Lines changed: 5 additions & 4 deletions
@@ -76,9 +76,9 @@ public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks) thr

     try {
       dataStream = new FileOutputStream(
-        ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
+        ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
       indexStream = new DataOutputStream(new FileOutputStream(
-        ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));
+        ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));

       long offset = 0;
       indexStream.writeLong(offset);
@@ -121,10 +121,11 @@ private void insertFile(String filename) throws IOException {

   private void insertFile(String filename, byte[] block) throws IOException {
     OutputStream dataStream = null;
-    File file = ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, filename);
+    File file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename);
     assert(!file.exists()) : "this test file has been already generated";
     try {
-      dataStream = new FileOutputStream(file);
+      dataStream = new FileOutputStream(
+        ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename));
       dataStream.write(block);
     } finally {
       Closeables.close(dataStream, false);

common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java

Lines changed: 1 addition & 1 deletion
@@ -304,7 +304,7 @@ public static void throwException(Throwable t) {
   static {
     boolean _unaligned;
     String arch = System.getProperty("os.arch", "");
-    if (arch.equals("ppc64le") || arch.equals("ppc64")) {
+    if (arch.equals("ppc64le") || arch.equals("ppc64") || arch.equals("s390x")) {
       // Since java.nio.Bits.unaligned() doesn't return true on ppc (See JDK-8165231), but
       // ppc64 and ppc64le support it
       _unaligned = true;
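
For context, this is a minimal sketch (not the exact Platform.java logic) of how unaligned-access support can be detected: an allowlist covers architectures such as ppc64, ppc64le and now s390x, where java.nio.Bits.unaligned() is known to under-report (see JDK-8165231 for ppc), and a reflective query of the JDK is the general fallback.

import java.lang.reflect.Method;
import java.util.Arrays;

// Sketch only: allowlist known-good architectures, otherwise ask the JDK via
// reflection and default to false if its internals are not accessible.
public class UnalignedCheckSketch {
  static boolean unalignedSupported() {
    String arch = System.getProperty("os.arch", "");
    if (Arrays.asList("ppc64le", "ppc64", "s390x").contains(arch)) {
      return true;
    }
    try {
      Class<?> bits = Class.forName("java.nio.Bits", false, ClassLoader.getSystemClassLoader());
      Method unaligned = bits.getDeclaredMethod("unaligned");
      unaligned.setAccessible(true); // may require --add-opens java.base/java.nio on newer JDKs
      return Boolean.TRUE.equals(unaligned.invoke(null));
    } catch (Throwable t) {
      return false; // be conservative if the check cannot be performed
    }
  }

  public static void main(String[] args) {
    System.out.println(System.getProperty("os.arch") + " unaligned=" + unalignedSupported());
  }
}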
