Skip to content

Commit af9d7e3

Browse files
author
John Lee
committed
tested cluster & client mode
1 parent 14c680c commit af9d7e3

File tree

3 files changed

+11
-10
lines changed

3 files changed

+11
-10
lines changed

common/network-common/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void receive(TransportClient client, ByteBuffer message, RpcResponseCallb
114114
// method returns. This assumes that the code ensures, through other means, that no outbound
115115
// messages are being written to the channel while negotiation is still going on.
116116
if (saslServer.isComplete()) {
117-
cient.setClientUser(saslServer.getUserName());
117+
client.setClientUser(saslServer.getUserName());
118118
if (!SparkSaslServer.QOP_AUTH_CONF.equals(saslServer.getNegotiatedProperty(Sasl.QOP))) {
119119
logger.debug("SASL authentication successful for channel {}", client);
120120
complete(true);

core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,7 @@ private[netty] class RequestMessage(
544544
val senderAddress: RpcAddress,
545545
val receiver: NettyRpcEndpointRef,
546546
val content: Any,
547-
val senderUserName: String = null){
547+
val senderUserName: String = null) {
548548

549549
/** Manually serialize [[RequestMessage]] to minimize the size. */
550550
def serialize(nettyEnv: NettyRpcEnv): ByteBuffer = {
@@ -590,7 +590,11 @@ private[netty] object RequestMessage {
590590
}
591591
}
592592

593-
def apply(nettyEnv: NettyRpcEnv, client: TransportClient, bytes: ByteBuffer): RequestMessage = {
593+
def apply(
594+
nettyEnv: NettyRpcEnv,
595+
client: TransportClient,
596+
bytes: ByteBuffer,
597+
senderUserName: String = null): RequestMessage = {
594598
val bis = new ByteBufferInputStream(bytes)
595599
val in = new DataInputStream(bis)
596600
try {
@@ -602,7 +606,8 @@ private[netty] object RequestMessage {
602606
senderAddress,
603607
ref,
604608
// The remaining bytes in `bytes` are the message content.
605-
nettyEnv.deserialize(client, bytes))
609+
nettyEnv.deserialize(client, bytes),
610+
senderUserName)
606611
} finally {
607612
in.close()
608613
}
@@ -653,12 +658,8 @@ private[netty] class NettyRpcHandler(
653658
val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
654659
assert(addr != null)
655660
val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
656-
val requestMessage = RequestMessage(nettyEnv, client, message)
661+
var requestMessage = RequestMessage(nettyEnv, client, message, client.getClientUser)
657662

658-
if (client.getClientUser != null) {
659-
requestMessage = RequestMessage(requestMessage.senderAddress, requestMessage.receiver,
660-
requestMessage.content, client.getClientUser)
661-
}
662663
if (requestMessage.senderAddress == null) {
663664
// Create a new message with the socket address of the client as the sender.
664665
new RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content,

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ private[spark] class YarnRMClient extends Logging {
7575

7676
logInfo("Registering the ApplicationMaster")
7777
synchronized {
78-
var response = amClient.registerApplicationMaster(Utils.localHostName(), port, uiAddress)
78+
var response = amClient.registerApplicationMaster(Utils.localHostName(), port, trackingUrl)
7979
registered = true
8080
masterkey = response.getClientToAMTokenMasterKey()
8181
}

0 commit comments

Comments
 (0)