Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hadoop-common-project/hadoop-nfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<artifactId>netty-all</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ abstract public class MountdBase {
private final RpcProgram rpcProgram;
private int udpBoundPort; // Will set after server starts
private int tcpBoundPort; // Will set after server starts
private SimpleUdpServer udpServer = null;
private SimpleTcpServer tcpServer = null;

public RpcProgram getRpcProgram() {
return rpcProgram;
Expand All @@ -57,7 +59,7 @@ public MountdBase(RpcProgram program) throws IOException {

/* Start UDP server */
private void startUDPServer() {
SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(),
udpServer = new SimpleUdpServer(rpcProgram.getPort(),
rpcProgram, 1);
rpcProgram.startDaemons();
try {
Expand All @@ -76,7 +78,7 @@ private void startUDPServer() {

/* Start TCP server */
private void startTCPServer() {
SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
rpcProgram, 1);
rpcProgram.startDaemons();
try {
Expand Down Expand Up @@ -118,6 +120,14 @@ public void stop() {
rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP, tcpBoundPort);
tcpBoundPort = 0;
}
if (udpServer != null) {
udpServer.shutdown();
udpServer = null;
}
if (tcpServer != null) {
tcpServer.shutdown();
tcpServer = null;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public abstract class Nfs3Base {
public static final Logger LOG = LoggerFactory.getLogger(Nfs3Base.class);
private final RpcProgram rpcProgram;
private int nfsBoundPort; // Will set after server starts
private SimpleTcpServer tcpServer = null;

public RpcProgram getRpcProgram() {
return rpcProgram;
Expand All @@ -61,7 +62,7 @@ public void start(boolean register) {
}

private void startTCPServer() {
SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
rpcProgram, 0);
rpcProgram.startDaemons();
try {
Expand All @@ -84,6 +85,10 @@ public void stop() {
nfsBoundPort = 0;
}
rpcProgram.stopDaemons();
if (tcpServer != null) {
tcpServer.shutdown();
tcpServer = null;
}
}
/**
* Priority of the nfsd shutdown hook.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

import java.util.Arrays;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -58,10 +57,10 @@ private boolean validMessageLength(int len) {
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
ChannelBuffer buf = (ChannelBuffer) e.getMessage(); // Read reply
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg; // Read reply
if (!validMessageLength(buf.readableBytes())) {
e.getChannel().close();
ctx.channel().close();
return;
}

Expand All @@ -83,7 +82,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
RpcDeniedReply deniedReply = (RpcDeniedReply) reply;
handle(deniedReply);
}
e.getChannel().close(); // shutdown now that request is complete
ctx.channel().close(); // shutdown now that request is complete
}

private void handle(RpcDeniedReply deniedReply) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@

import java.net.SocketAddress;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;

/**
* RpcInfo records all contextual information of an RPC message. It contains
* the RPC header, the parameters, and the information of the remote peer.
*/
public final class RpcInfo {
private final RpcMessage header;
private final ChannelBuffer data;
private final ByteBuf data;
private final Channel channel;
private final SocketAddress remoteAddress;

public RpcInfo(RpcMessage header, ChannelBuffer data,
public RpcInfo(RpcMessage header, ByteBuf data,
ChannelHandlerContext channelContext, Channel channel,
SocketAddress remoteAddress) {
this.header = header;
Expand All @@ -46,7 +46,7 @@ public RpcMessage header() {
return header;
}

public ChannelBuffer data() {
public ByteBuf data() {
return data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,23 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.portmap.PortmapMapping;
import org.apache.hadoop.portmap.PortmapRequest;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class for writing RPC server programs based on RFC 1050. Extend this class
* and implement {@link #handleInternal} to handle the requests received.
*/
public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
public abstract class RpcProgram extends ChannelInboundHandlerAdapter {
static final Logger LOG = LoggerFactory.getLogger(RpcProgram.class);
public static final int RPCB_PORT = 111;
private final String program;
Expand Down Expand Up @@ -161,9 +160,9 @@ public void startDaemons() {}
public void stopDaemons() {}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
RpcInfo info = (RpcInfo) e.getMessage();
RpcInfo info = (RpcInfo) msg;
RpcCall call = (RpcCall) info.header();

SocketAddress remoteAddress = info.remoteAddress();
Expand Down Expand Up @@ -221,7 +220,7 @@ private void sendAcceptedReply(RpcCall call, SocketAddress remoteAddress,
out.writeInt(lowProgVersion);
out.writeInt(highProgVersion);
}
ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
ByteBuf b = Unpooled.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(b, remoteAddress);
RpcUtil.sendRpcResponse(ctx, rsp);
Expand All @@ -234,7 +233,7 @@ protected static void sendRejectedReply(RpcCall call,
RpcReply.ReplyState.MSG_DENIED,
RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
reply.write(out);
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
ByteBuf buf = Unpooled.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(buf, remoteAddress);
RpcUtil.sendRpcResponse(ctx, rsp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,30 @@

import java.net.SocketAddress;

import org.jboss.netty.buffer.ChannelBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.DefaultAddressedEnvelope;

/**
* RpcResponse encapsulates a response to a RPC request. It contains the data
* that is going to cross the wire, as well as the information of the remote
* peer.
*/
public class RpcResponse {
private final ChannelBuffer data;
private final SocketAddress remoteAddress;
public class RpcResponse extends
DefaultAddressedEnvelope<ByteBuf, SocketAddress> {
public RpcResponse(ByteBuf message, SocketAddress recipient) {
super(message, recipient, null);
}

public RpcResponse(ChannelBuffer data, SocketAddress remoteAddress) {
this.data = data;
this.remoteAddress = remoteAddress;
public RpcResponse(ByteBuf message, SocketAddress recipient,
SocketAddress sender) {
super(message, recipient, sender);
}

public ChannelBuffer data() {
return data;
public ByteBuf data() {
return this.content();
}

public SocketAddress remoteAddress() {
return remoteAddress;
return this.recipient();
}
}
Loading