Skip to content

Commit aa735f5

Browse files
committed
2 parents 1cf4ed2 + 1c48d4f commit aa735f5

File tree

192 files changed

+7182
-1431
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

192 files changed

+7182
-1431
lines changed

LICENSE

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -243,18 +243,18 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
243243
(BSD licence) ANTLR ST4 4.0.4 (org.antlr:ST4:4.0.4 - http://www.stringtemplate.org)
244244
(BSD licence) ANTLR StringTemplate (org.antlr:stringtemplate:3.2.1 - http://www.stringtemplate.org)
245245
(BSD License) Javolution (javolution:javolution:5.5.1 - http://javolution.org)
246-
(BSD) JLine (jline:jline:0.9.94 - http://jline.sourceforge.net)
246+
(BSD) JLine (jline:jline:2.14.3 - https://github.com/jline/jline2)
247247
(BSD) ParaNamer Core (com.thoughtworks.paranamer:paranamer:2.3 - http://paranamer.codehaus.org/paranamer)
248248
(BSD) ParaNamer Core (com.thoughtworks.paranamer:paranamer:2.6 - http://paranamer.codehaus.org/paranamer)
249249
(BSD 3 Clause) Scala (http://www.scala-lang.org/download/#License)
250250
(Interpreter classes (all .scala files in repl/src/main/scala
251251
except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala),
252252
and for SerializableMapWrapper in JavaUtils.scala)
253-
(BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.11.8 - http://www.scala-lang.org/)
254-
(BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.11.8 - http://www.scala-lang.org/)
255-
(BSD-like) Scala Compiler (org.scala-lang:scala-reflect:2.11.8 - http://www.scala-lang.org/)
256-
(BSD-like) Scala Library (org.scala-lang:scala-library:2.11.8 - http://www.scala-lang.org/)
257-
(BSD-like) Scalap (org.scala-lang:scalap:2.11.8 - http://www.scala-lang.org/)
253+
(BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.11.12 - http://www.scala-lang.org/)
254+
(BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.11.12 - http://www.scala-lang.org/)
255+
(BSD-like) Scala Compiler (org.scala-lang:scala-reflect:2.11.12 - http://www.scala-lang.org/)
256+
(BSD-like) Scala Library (org.scala-lang:scala-library:2.11.12 - http://www.scala-lang.org/)
257+
(BSD-like) Scalap (org.scala-lang:scalap:2.11.12 - http://www.scala-lang.org/)
258258
(BSD-style) scalacheck (org.scalacheck:scalacheck_2.11:1.10.0 - http://www.scalacheck.org)
259259
(BSD-style) spire (org.spire-math:spire_2.11:0.7.1 - http://spire-math.org)
260260
(BSD-style) spire-macros (org.spire-math:spire-macros_2.11:0.7.1 - http://spire-math.org)

build/mvn

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ install_mvn() {
9393
install_zinc() {
9494
local zinc_path="zinc-0.3.15/bin/zinc"
9595
[ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1
96-
local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.typesafe.com}
96+
local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.lightbend.com}
9797

9898
install_app \
9999
"${TYPESAFE_MIRROR}/zinc/0.3.15" \
@@ -109,7 +109,7 @@ install_scala() {
109109
# determine the Scala version used in Spark
110110
local scala_version=`grep "scala.version" "${_DIR}/../pom.xml" | head -n1 | awk -F '[<>]' '{print $3}'`
111111
local scala_bin="${_DIR}/scala-${scala_version}/bin/scala"
112-
local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.typesafe.com}
112+
local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.lightbend.com}
113113

114114
install_app \
115115
"${TYPESAFE_MIRROR}/scala/${scala_version}" \
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.client;
19+
20+
public interface StreamCallbackWithID extends StreamCallback {
21+
String getID();
22+
}

common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,24 @@
2222

2323
import io.netty.buffer.ByteBuf;
2424

25+
import org.apache.spark.network.protocol.Message;
26+
import org.apache.spark.network.server.MessageHandler;
2527
import org.apache.spark.network.util.TransportFrameDecoder;
2628

2729
/**
2830
* An interceptor that is registered with the frame decoder to feed stream data to a
2931
* callback.
3032
*/
31-
class StreamInterceptor implements TransportFrameDecoder.Interceptor {
33+
public class StreamInterceptor<T extends Message> implements TransportFrameDecoder.Interceptor {
3234

33-
private final TransportResponseHandler handler;
35+
private final MessageHandler<T> handler;
3436
private final String streamId;
3537
private final long byteCount;
3638
private final StreamCallback callback;
3739
private long bytesRead;
3840

39-
StreamInterceptor(
40-
TransportResponseHandler handler,
41+
public StreamInterceptor(
42+
MessageHandler<T> handler,
4143
String streamId,
4244
long byteCount,
4345
StreamCallback callback) {
@@ -50,16 +52,24 @@ class StreamInterceptor implements TransportFrameDecoder.Interceptor {
5052

5153
@Override
5254
public void exceptionCaught(Throwable cause) throws Exception {
53-
handler.deactivateStream();
55+
deactivateStream();
5456
callback.onFailure(streamId, cause);
5557
}
5658

5759
@Override
5860
public void channelInactive() throws Exception {
59-
handler.deactivateStream();
61+
deactivateStream();
6062
callback.onFailure(streamId, new ClosedChannelException());
6163
}
6264

65+
private void deactivateStream() {
66+
if (handler instanceof TransportResponseHandler) {
67+
// we only have to do this for TransportResponseHandler as it exposes numOutstandingFetches
68+
// (there is no extra cleanup that needs to happen)
69+
((TransportResponseHandler) handler).deactivateStream();
70+
}
71+
}
72+
6373
@Override
6474
public boolean handle(ByteBuf buf) throws Exception {
6575
int toRead = (int) Math.min(buf.readableBytes(), byteCount - bytesRead);
@@ -72,10 +82,10 @@ public boolean handle(ByteBuf buf) throws Exception {
7282
RuntimeException re = new IllegalStateException(String.format(
7383
"Read too many bytes? Expected %d, but read %d.", byteCount, bytesRead));
7484
callback.onFailure(streamId, re);
75-
handler.deactivateStream();
85+
deactivateStream();
7686
throw re;
7787
} else if (bytesRead == byteCount) {
78-
handler.deactivateStream();
88+
deactivateStream();
7989
callback.onComplete(streamId);
8090
}
8191

common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java

Lines changed: 107 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,15 @@
3232
import com.google.common.base.Throwables;
3333
import com.google.common.util.concurrent.SettableFuture;
3434
import io.netty.channel.Channel;
35+
import io.netty.util.concurrent.Future;
36+
import io.netty.util.concurrent.GenericFutureListener;
3537
import org.slf4j.Logger;
3638
import org.slf4j.LoggerFactory;
3739

40+
import org.apache.spark.network.buffer.ManagedBuffer;
3841
import org.apache.spark.network.buffer.NioManagedBuffer;
39-
import org.apache.spark.network.protocol.ChunkFetchRequest;
40-
import org.apache.spark.network.protocol.OneWayMessage;
41-
import org.apache.spark.network.protocol.RpcRequest;
42-
import org.apache.spark.network.protocol.StreamChunkId;
43-
import org.apache.spark.network.protocol.StreamRequest;
42+
import org.apache.spark.network.protocol.*;
43+
4444
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
4545

4646
/**
@@ -133,34 +133,21 @@ public void fetchChunk(
133133
long streamId,
134134
int chunkIndex,
135135
ChunkReceivedCallback callback) {
136-
long startTime = System.currentTimeMillis();
137136
if (logger.isDebugEnabled()) {
138137
logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
139138
}
140139

141140
StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
142-
handler.addFetchRequest(streamChunkId, callback);
143-
144-
channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
145-
if (future.isSuccess()) {
146-
long timeTaken = System.currentTimeMillis() - startTime;
147-
if (logger.isTraceEnabled()) {
148-
logger.trace("Sending request {} to {} took {} ms", streamChunkId,
149-
getRemoteAddress(channel), timeTaken);
150-
}
151-
} else {
152-
String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
153-
getRemoteAddress(channel), future.cause());
154-
logger.error(errorMsg, future.cause());
141+
StdChannelListener listener = new StdChannelListener(streamChunkId) {
142+
@Override
143+
void handleFailure(String errorMsg, Throwable cause) {
155144
handler.removeFetchRequest(streamChunkId);
156-
channel.close();
157-
try {
158-
callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
159-
} catch (Exception e) {
160-
logger.error("Uncaught exception in RPC response callback handler!", e);
161-
}
145+
callback.onFailure(chunkIndex, new IOException(errorMsg, cause));
162146
}
163-
});
147+
};
148+
handler.addFetchRequest(streamChunkId, callback);
149+
150+
channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(listener);
164151
}
165152

166153
/**
@@ -170,7 +157,12 @@ public void fetchChunk(
170157
* @param callback Object to call with the stream data.
171158
*/
172159
public void stream(String streamId, StreamCallback callback) {
173-
long startTime = System.currentTimeMillis();
160+
StdChannelListener listener = new StdChannelListener(streamId) {
161+
@Override
162+
void handleFailure(String errorMsg, Throwable cause) throws Exception {
163+
callback.onFailure(streamId, new IOException(errorMsg, cause));
164+
}
165+
};
174166
if (logger.isDebugEnabled()) {
175167
logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel));
176168
}
@@ -180,25 +172,7 @@ public void stream(String streamId, StreamCallback callback) {
180172
// when responses arrive.
181173
synchronized (this) {
182174
handler.addStreamCallback(streamId, callback);
183-
channel.writeAndFlush(new StreamRequest(streamId)).addListener(future -> {
184-
if (future.isSuccess()) {
185-
long timeTaken = System.currentTimeMillis() - startTime;
186-
if (logger.isTraceEnabled()) {
187-
logger.trace("Sending request for {} to {} took {} ms", streamId,
188-
getRemoteAddress(channel), timeTaken);
189-
}
190-
} else {
191-
String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
192-
getRemoteAddress(channel), future.cause());
193-
logger.error(errorMsg, future.cause());
194-
channel.close();
195-
try {
196-
callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
197-
} catch (Exception e) {
198-
logger.error("Uncaught exception in RPC response callback handler!", e);
199-
}
200-
}
201-
});
175+
channel.writeAndFlush(new StreamRequest(streamId)).addListener(listener);
202176
}
203177
}
204178

@@ -211,35 +185,44 @@ public void stream(String streamId, StreamCallback callback) {
211185
* @return The RPC's id.
212186
*/
213187
public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
214-
long startTime = System.currentTimeMillis();
215188
if (logger.isTraceEnabled()) {
216189
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
217190
}
218191

219-
long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
192+
long requestId = requestId();
220193
handler.addRpcRequest(requestId, callback);
221194

195+
RpcChannelListener listener = new RpcChannelListener(requestId, callback);
222196
channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
223-
.addListener(future -> {
224-
if (future.isSuccess()) {
225-
long timeTaken = System.currentTimeMillis() - startTime;
226-
if (logger.isTraceEnabled()) {
227-
logger.trace("Sending request {} to {} took {} ms", requestId,
228-
getRemoteAddress(channel), timeTaken);
229-
}
230-
} else {
231-
String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
232-
getRemoteAddress(channel), future.cause());
233-
logger.error(errorMsg, future.cause());
234-
handler.removeRpcRequest(requestId);
235-
channel.close();
236-
try {
237-
callback.onFailure(new IOException(errorMsg, future.cause()));
238-
} catch (Exception e) {
239-
logger.error("Uncaught exception in RPC response callback handler!", e);
240-
}
241-
}
242-
});
197+
.addListener(listener);
198+
199+
return requestId;
200+
}
201+
202+
/**
203+
* Send data to the remote end as a stream. This differs from stream() in that this is a request
204+
* to *send* data to the remote end, not to receive it from the remote.
205+
*
206+
* @param meta meta data associated with the stream, which will be read completely on the
207+
* receiving end before the stream itself.
208+
* @param data this will be streamed to the remote end to allow for transferring large amounts
209+
* of data without reading into memory.
210+
* @param callback handles the reply -- onSuccess will only be called when both message and data
211+
* are received successfully.
212+
*/
213+
public long uploadStream(
214+
ManagedBuffer meta,
215+
ManagedBuffer data,
216+
RpcResponseCallback callback) {
217+
if (logger.isTraceEnabled()) {
218+
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
219+
}
220+
221+
long requestId = requestId();
222+
handler.addRpcRequest(requestId, callback);
223+
224+
RpcChannelListener listener = new RpcChannelListener(requestId, callback);
225+
channel.writeAndFlush(new UploadStream(requestId, meta, data)).addListener(listener);
243226

244227
return requestId;
245228
}
@@ -319,4 +302,60 @@ public String toString() {
319302
.add("isActive", isActive())
320303
.toString();
321304
}
305+
306+
private static long requestId() {
307+
return Math.abs(UUID.randomUUID().getLeastSignificantBits());
308+
}
309+
310+
private class StdChannelListener
311+
implements GenericFutureListener<Future<? super Void>> {
312+
final long startTime;
313+
final Object requestId;
314+
315+
StdChannelListener(Object requestId) {
316+
this.startTime = System.currentTimeMillis();
317+
this.requestId = requestId;
318+
}
319+
320+
@Override
321+
public void operationComplete(Future future) throws Exception {
322+
if (future.isSuccess()) {
323+
if (logger.isTraceEnabled()) {
324+
long timeTaken = System.currentTimeMillis() - startTime;
325+
logger.trace("Sending request {} to {} took {} ms", requestId,
326+
getRemoteAddress(channel), timeTaken);
327+
}
328+
} else {
329+
String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
330+
getRemoteAddress(channel), future.cause());
331+
logger.error(errorMsg, future.cause());
332+
channel.close();
333+
try {
334+
handleFailure(errorMsg, future.cause());
335+
} catch (Exception e) {
336+
logger.error("Uncaught exception in RPC response callback handler!", e);
337+
}
338+
}
339+
}
340+
341+
void handleFailure(String errorMsg, Throwable cause) throws Exception {}
342+
}
343+
344+
private class RpcChannelListener extends StdChannelListener {
345+
final long rpcRequestId;
346+
final RpcResponseCallback callback;
347+
348+
RpcChannelListener(long rpcRequestId, RpcResponseCallback callback) {
349+
super("RPC " + rpcRequestId);
350+
this.rpcRequestId = rpcRequestId;
351+
this.callback = callback;
352+
}
353+
354+
@Override
355+
void handleFailure(String errorMsg, Throwable cause) {
356+
handler.removeRpcRequest(rpcRequestId);
357+
callback.onFailure(new IOException(errorMsg, cause));
358+
}
359+
}
360+
322361
}

common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.slf4j.LoggerFactory;
3030

3131
import org.apache.spark.network.client.RpcResponseCallback;
32+
import org.apache.spark.network.client.StreamCallbackWithID;
3233
import org.apache.spark.network.client.TransportClient;
3334
import org.apache.spark.network.sasl.SecretKeyHolder;
3435
import org.apache.spark.network.sasl.SaslRpcHandler;
@@ -149,6 +150,14 @@ public void receive(TransportClient client, ByteBuffer message) {
149150
delegate.receive(client, message);
150151
}
151152

153+
@Override
154+
public StreamCallbackWithID receiveStream(
155+
TransportClient client,
156+
ByteBuffer message,
157+
RpcResponseCallback callback) {
158+
return delegate.receiveStream(client, message, callback);
159+
}
160+
152161
@Override
153162
public StreamManager getStreamManager() {
154163
return delegate.getStreamManager();

0 commit comments

Comments
 (0)