Commit c1f85fc

Marcelo Vanzin committed

[SPARK-11956][CORE] Fix a few bugs in network lib-based file transfer.

- NettyRpcEnv::openStream() now correctly propagates errors to the read side of the pipe.
- NettyStreamManager now throws if the file being transferred does not exist.
- The network library now correctly handles zero-sized streams.

Author: Marcelo Vanzin <[email protected]>

Closes #9941 from vanzin/SPARK-11956.

1 parent 0a5aef7 commit c1f85fc

File tree

5 files changed: +75 -24 lines changed

core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala

Lines changed: 14 additions & 5 deletions

@@ -27,7 +27,7 @@ import javax.annotation.Nullable
 
 import scala.concurrent.{Future, Promise}
 import scala.reflect.ClassTag
-import scala.util.{DynamicVariable, Failure, Success}
+import scala.util.{DynamicVariable, Failure, Success, Try}
 import scala.util.control.NonFatal
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -368,13 +368,22 @@ private[netty] class NettyRpcEnv(
 
     @volatile private var error: Throwable = _
 
-    def setError(e: Throwable): Unit = error = e
+    def setError(e: Throwable): Unit = {
+      error = e
+      source.close()
+    }
 
     override def read(dst: ByteBuffer): Int = {
-      if (error != null) {
-        throw error
+      val result = if (error == null) {
+        Try(source.read(dst))
+      } else {
+        Failure(error)
+      }
+
+      result match {
+        case Success(bytesRead) => bytesRead
+        case Failure(error) => throw error
       }
-      source.read(dst)
     }
 
     override def close(): Unit = source.close()
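
The heart of this fix is the pattern in the second hunk: the writer side records the failure and eagerly closes the pipe source, so a reader blocked in read() wakes up and rethrows the original error rather than a generic "pipe closed". Here is that pattern as a minimal, self-contained sketch; the decision logic mirrors the hunk, but the class name and everything around it are illustrative, not the Spark source:

```scala
import java.nio.ByteBuffer
import java.nio.channels.ReadableByteChannel

import scala.util.{Failure, Success, Try}

// Sketch only: writer calls setError() when the download fails; closing the
// source unblocks any reader stuck in read(), which then sees `error`.
class ErrorPropagatingChannel(source: ReadableByteChannel) extends ReadableByteChannel {

  @volatile private var error: Throwable = _

  def setError(e: Throwable): Unit = {
    error = e
    source.close()
  }

  override def read(dst: ByteBuffer): Int = {
    // Prefer the recorded error; otherwise surface any read failure as-is.
    val result = if (error == null) Try(source.read(dst)) else Failure(error)
    result match {
      case Success(bytesRead) => bytesRead
      case Failure(t) => throw t
    }
  }

  override def close(): Unit = source.close()

  override def isOpen(): Boolean = source.isOpen()
}
```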

core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala

Lines changed: 1 addition & 1 deletion

@@ -44,7 +44,7 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
       case _ => throw new IllegalArgumentException(s"Invalid file type: $ftype")
     }
 
-    require(file != null, s"File not found: $streamId")
+    require(file != null && file.isFile(), s"File not found: $streamId")
    new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length())
   }
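
Why the null check alone was insufficient: it only catches names that were never registered, while a registered file that has since been deleted (or is a directory) still yields a non-null `File` whose `length()` silently returns 0, so the old code would serve a bogus zero-byte stream. A small sketch of that failure mode, using a hypothetical path:

```scala
import java.io.File

val missing = new File("/tmp/does-not-exist")  // hypothetical path
assert(missing != null)         // constructing a File never yields null
assert(!missing.isFile())       // this is what the new check catches
assert(missing.length() == 0L)  // old code would serve an empty stream

// Mirrors the patched check: throws
// IllegalArgumentException("requirement failed: File not found: ...")
// so the stream request fails up front on the server side.
require(missing != null && missing.isFile(), s"File not found: $missing")
```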

core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala

Lines changed: 20 additions & 7 deletions

@@ -729,23 +729,36 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     val tempDir = Utils.createTempDir()
     val file = new File(tempDir, "file")
     Files.write(UUID.randomUUID().toString(), file, UTF_8)
+    val empty = new File(tempDir, "empty")
+    Files.write("", empty, UTF_8);
     val jar = new File(tempDir, "jar")
     Files.write(UUID.randomUUID().toString(), jar, UTF_8)
 
     val fileUri = env.fileServer.addFile(file)
+    val emptyUri = env.fileServer.addFile(empty)
     val jarUri = env.fileServer.addJar(jar)
 
     val destDir = Utils.createTempDir()
-    val destFile = new File(destDir, file.getName())
-    val destJar = new File(destDir, jar.getName())
-
     val sm = new SecurityManager(conf)
     val hc = SparkHadoopUtil.get.conf
-    Utils.fetchFile(fileUri, destDir, conf, sm, hc, 0L, false)
-    Utils.fetchFile(jarUri, destDir, conf, sm, hc, 0L, false)
 
-    assert(Files.equal(file, destFile))
-    assert(Files.equal(jar, destJar))
+    val files = Seq(
+      (file, fileUri),
+      (empty, emptyUri),
+      (jar, jarUri))
+    files.foreach { case (f, uri) =>
+      val destFile = new File(destDir, f.getName())
+      Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
+      assert(Files.equal(f, destFile))
+    }
+
+    // Try to download files that do not exist.
+    Seq("files", "jars").foreach { root =>
+      intercept[Exception] {
+        val uri = env.address.toSparkURL + s"/$root/doesNotExist"
+        Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
+      }
+    }
   }
 
 }
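
The refactor above replaces per-file fetch/assert pairs with one loop over (source, uri) tuples, which is what makes adding the `empty` case a two-line change. A standalone sketch of that table-driven shape, with a `fetch` parameter standing in for `Utils.fetchFile` (all names here are illustrative):

```scala
import java.io.File
import java.nio.file.Files

// Download every uri into destDir and check the copy is byte-identical to
// its source; `fetch` is a stand-in for Utils.fetchFile.
def fetchAndCompare(pairs: Seq[(File, String)], destDir: File)
                   (fetch: (String, File) => Unit): Unit = {
  pairs.foreach { case (src, uri) =>
    fetch(uri, destDir)
    val dest = new File(destDir, src.getName)
    assert(java.util.Arrays.equals(
      Files.readAllBytes(src.toPath), Files.readAllBytes(dest.toPath)),
      s"$uri was not downloaded correctly")
  }
}
```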

network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java

Lines changed: 18 additions & 10 deletions

@@ -185,16 +185,24 @@ public void handle(ResponseMessage message) {
       StreamResponse resp = (StreamResponse) message;
       StreamCallback callback = streamCallbacks.poll();
       if (callback != null) {
-        StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
-          callback);
-        try {
-          TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
-            channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
-          frameDecoder.setInterceptor(interceptor);
-          streamActive = true;
-        } catch (Exception e) {
-          logger.error("Error installing stream handler.", e);
-          deactivateStream();
+        if (resp.byteCount > 0) {
+          StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
+            callback);
+          try {
+            TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
+              channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
+            frameDecoder.setInterceptor(interceptor);
+            streamActive = true;
+          } catch (Exception e) {
+            logger.error("Error installing stream handler.", e);
+            deactivateStream();
+          }
+        } else {
+          try {
+            callback.onComplete(resp.streamId);
+          } catch (Exception e) {
+            logger.warn("Error in stream handler onComplete().", e);
+          }
         }
       } else {
         logger.error("Could not find callback for StreamResponse.");

network/common/src/test/java/org/apache/spark/network/StreamSuite.java

Lines changed: 22 additions & 1 deletion

@@ -51,13 +51,14 @@
 import org.apache.spark.network.util.TransportConf;
 
 public class StreamSuite {
-  private static final String[] STREAMS = { "largeBuffer", "smallBuffer", "file" };
+  private static final String[] STREAMS = { "largeBuffer", "smallBuffer", "emptyBuffer", "file" };
 
   private static TransportServer server;
   private static TransportClientFactory clientFactory;
   private static File testFile;
   private static File tempDir;
 
+  private static ByteBuffer emptyBuffer;
   private static ByteBuffer smallBuffer;
   private static ByteBuffer largeBuffer;
 
@@ -73,6 +74,7 @@ private static ByteBuffer createBuffer(int bufSize) {
   @BeforeClass
   public static void setUp() throws Exception {
     tempDir = Files.createTempDir();
+    emptyBuffer = createBuffer(0);
     smallBuffer = createBuffer(100);
     largeBuffer = createBuffer(100000);
 
@@ -103,6 +105,8 @@ public ManagedBuffer openStream(String streamId) {
             case "largeBuffer":
               return new NioManagedBuffer(largeBuffer);
             case "smallBuffer":
              return new NioManagedBuffer(smallBuffer);
+            case "emptyBuffer":
+              return new NioManagedBuffer(emptyBuffer);
             case "file":
               return new FileSegmentManagedBuffer(conf, testFile, 0, testFile.length());
             default:
@@ -138,6 +142,18 @@ public static void tearDown() {
     }
   }
 
+  @Test
+  public void testZeroLengthStream() throws Throwable {
+    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
+    try {
+      StreamTask task = new StreamTask(client, "emptyBuffer", TimeUnit.SECONDS.toMillis(5));
+      task.run();
+      task.check();
+    } finally {
+      client.close();
+    }
+  }
+
   @Test
   public void testSingleStream() throws Throwable {
     TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
@@ -226,6 +242,11 @@ public void run() {
           outFile = File.createTempFile("data", ".tmp", tempDir);
           out = new FileOutputStream(outFile);
           break;
+        case "emptyBuffer":
+          baos = new ByteArrayOutputStream();
+          out = baos;
+          srcBuffer = emptyBuffer;
+          break;
         default:
           throw new IllegalArgumentException(streamId);
       }
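
The new test simply registers a zero-capacity buffer under "emptyBuffer" and reuses the existing StreamTask round-trip check, so a successful run proves the client now fires onComplete() for a stream with byteCount == 0. For intuition, a Scala sketch of what the zero-sized fixture looks like (createBuffer here is a guess at the Java helper's behavior, not its actual code):

```scala
import java.nio.ByteBuffer

// Assumed shape of the test helper: fill an n-byte buffer with data and
// flip it for reading. With n == 0 it is immediately exhausted.
def createBuffer(bufSize: Int): ByteBuffer = {
  val buf = ByteBuffer.allocate(bufSize)
  for (i <- 0 until bufSize) buf.put((i % 256).toByte)
  buf.flip()
  buf
}

val empty = createBuffer(0)
assert(!empty.hasRemaining)
// The server answers "emptyBuffer" with a StreamResponse whose byteCount is
// 0 and no payload; before the fix the client never completed such streams.
```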
