Skip to content

Commit 0e50e7e

Browse files
ODP-1103: HADOOP-11245. Update NFS gateway to use Netty4 (apache#2832)
netty - part1 (cherry picked from commit d94759b) (cherry picked from commit 66544b9)
1 parent bec87a9 commit 0e50e7e

File tree

27 files changed

+474
-359
lines changed

27 files changed

+474
-359
lines changed

hadoop-common-project/hadoop-nfs/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@
9090
</dependency>
9191
<dependency>
9292
<groupId>io.netty</groupId>
93-
<artifactId>netty</artifactId>
93+
<artifactId>netty-all</artifactId>
9494
<scope>compile</scope>
9595
</dependency>
9696
<dependency>

hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ abstract public class MountdBase {
4141
private final RpcProgram rpcProgram;
4242
private int udpBoundPort; // Will set after server starts
4343
private int tcpBoundPort; // Will set after server starts
44+
private SimpleUdpServer udpServer = null;
45+
private SimpleTcpServer tcpServer = null;
4446

4547
public RpcProgram getRpcProgram() {
4648
return rpcProgram;
@@ -57,7 +59,7 @@ public MountdBase(RpcProgram program) throws IOException {
5759

5860
/* Start UDP server */
5961
private void startUDPServer() {
60-
SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(),
62+
udpServer = new SimpleUdpServer(rpcProgram.getPort(),
6163
rpcProgram, 1);
6264
rpcProgram.startDaemons();
6365
try {
@@ -76,7 +78,7 @@ private void startUDPServer() {
7678

7779
/* Start TCP server */
7880
private void startTCPServer() {
79-
SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
81+
tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
8082
rpcProgram, 1);
8183
rpcProgram.startDaemons();
8284
try {
@@ -118,6 +120,14 @@ public void stop() {
118120
rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP, tcpBoundPort);
119121
tcpBoundPort = 0;
120122
}
123+
if (udpServer != null) {
124+
udpServer.shutdown();
125+
udpServer = null;
126+
}
127+
if (tcpServer != null) {
128+
tcpServer.shutdown();
129+
tcpServer = null;
130+
}
121131
}
122132

123133
/**

hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public abstract class Nfs3Base {
3535
public static final Logger LOG = LoggerFactory.getLogger(Nfs3Base.class);
3636
private final RpcProgram rpcProgram;
3737
private int nfsBoundPort; // Will set after server starts
38+
private SimpleTcpServer tcpServer = null;
3839

3940
public RpcProgram getRpcProgram() {
4041
return rpcProgram;
@@ -61,7 +62,7 @@ public void start(boolean register) {
6162
}
6263

6364
private void startTCPServer() {
64-
SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
65+
tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
6566
rpcProgram, 0);
6667
rpcProgram.startDaemons();
6768
try {
@@ -84,6 +85,10 @@ public void stop() {
8485
nfsBoundPort = 0;
8586
}
8687
rpcProgram.stopDaemons();
88+
if (tcpServer != null) {
89+
tcpServer.shutdown();
90+
tcpServer = null;
91+
}
8792
}
8893
/**
8994
* Priority of the nfsd shutdown hook.

hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,9 @@
1919

2020
import java.util.Arrays;
2121

22+
import io.netty.buffer.ByteBuf;
23+
import io.netty.channel.ChannelHandlerContext;
2224
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
23-
import org.jboss.netty.buffer.ChannelBuffer;
24-
import org.jboss.netty.channel.ChannelHandlerContext;
25-
import org.jboss.netty.channel.MessageEvent;
2625
import org.slf4j.Logger;
2726
import org.slf4j.LoggerFactory;
2827

@@ -58,10 +57,10 @@ private boolean validMessageLength(int len) {
5857
}
5958

6059
@Override
61-
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
62-
ChannelBuffer buf = (ChannelBuffer) e.getMessage(); // Read reply
60+
public void channelRead(ChannelHandlerContext ctx, Object msg) {
61+
ByteBuf buf = (ByteBuf) msg; // Read reply
6362
if (!validMessageLength(buf.readableBytes())) {
64-
e.getChannel().close();
63+
ctx.channel().close();
6564
return;
6665
}
6766

@@ -83,7 +82,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
8382
RpcDeniedReply deniedReply = (RpcDeniedReply) reply;
8483
handle(deniedReply);
8584
}
86-
e.getChannel().close(); // shutdown now that request is complete
85+
ctx.channel().close(); // shutdown now that request is complete
8786
}
8887

8988
private void handle(RpcDeniedReply deniedReply) {

hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,21 @@
1919

2020
import java.net.SocketAddress;
2121

22-
import org.jboss.netty.buffer.ChannelBuffer;
23-
import org.jboss.netty.channel.Channel;
24-
import org.jboss.netty.channel.ChannelHandlerContext;
22+
import io.netty.buffer.ByteBuf;
23+
import io.netty.channel.Channel;
24+
import io.netty.channel.ChannelHandlerContext;
2525

2626
/**
2727
* RpcInfo records all contextual information of an RPC message. It contains
2828
* the RPC header, the parameters, and the information of the remote peer.
2929
*/
3030
public final class RpcInfo {
3131
private final RpcMessage header;
32-
private final ChannelBuffer data;
32+
private final ByteBuf data;
3333
private final Channel channel;
3434
private final SocketAddress remoteAddress;
3535

36-
public RpcInfo(RpcMessage header, ChannelBuffer data,
36+
public RpcInfo(RpcMessage header, ByteBuf data,
3737
ChannelHandlerContext channelContext, Channel channel,
3838
SocketAddress remoteAddress) {
3939
this.header = header;
@@ -46,7 +46,7 @@ public RpcMessage header() {
4646
return header;
4747
}
4848

49-
public ChannelBuffer data() {
49+
public ByteBuf data() {
5050
return data;
5151
}
5252

hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,24 @@
2222
import java.net.InetSocketAddress;
2323
import java.net.SocketAddress;
2424

25-
import com.google.common.annotations.VisibleForTesting;
25+
import io.netty.buffer.ByteBuf;
26+
import io.netty.buffer.Unpooled;
27+
import io.netty.channel.ChannelHandlerContext;
28+
import io.netty.channel.ChannelInboundHandlerAdapter;
29+
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
2630
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
31+
import org.apache.hadoop.oncrpc.security.Verifier;
2732
import org.apache.hadoop.oncrpc.security.VerifierNone;
2833
import org.apache.hadoop.portmap.PortmapMapping;
2934
import org.apache.hadoop.portmap.PortmapRequest;
30-
import org.jboss.netty.buffer.ChannelBuffer;
31-
import org.jboss.netty.buffer.ChannelBuffers;
32-
import org.jboss.netty.channel.ChannelHandlerContext;
33-
import org.jboss.netty.channel.MessageEvent;
34-
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
3535
import org.slf4j.Logger;
3636
import org.slf4j.LoggerFactory;
3737

3838
/**
3939
* Class for writing RPC server programs based on RFC 1050. Extend this class
4040
* and implement {@link #handleInternal} to handle the requests received.
4141
*/
42-
public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
42+
public abstract class RpcProgram extends ChannelInboundHandlerAdapter {
4343
static final Logger LOG = LoggerFactory.getLogger(RpcProgram.class);
4444
public static final int RPCB_PORT = 111;
4545
private final String program;
@@ -161,9 +161,9 @@ public void startDaemons() {}
161161
public void stopDaemons() {}
162162

163163
@Override
164-
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
164+
public void channelRead(ChannelHandlerContext ctx, Object msg)
165165
throws Exception {
166-
RpcInfo info = (RpcInfo) e.getMessage();
166+
RpcInfo info = (RpcInfo) msg;
167167
RpcCall call = (RpcCall) info.header();
168168

169169
SocketAddress remoteAddress = info.remoteAddress();
@@ -221,7 +221,7 @@ private void sendAcceptedReply(RpcCall call, SocketAddress remoteAddress,
221221
out.writeInt(lowProgVersion);
222222
out.writeInt(highProgVersion);
223223
}
224-
ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
224+
ByteBuf b = Unpooled.wrappedBuffer(out.asReadOnlyWrap()
225225
.buffer());
226226
RpcResponse rsp = new RpcResponse(b, remoteAddress);
227227
RpcUtil.sendRpcResponse(ctx, rsp);
@@ -234,7 +234,7 @@ protected static void sendRejectedReply(RpcCall call,
234234
RpcReply.ReplyState.MSG_DENIED,
235235
RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
236236
reply.write(out);
237-
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
237+
ByteBuf buf = Unpooled.wrappedBuffer(out.asReadOnlyWrap()
238238
.buffer());
239239
RpcResponse rsp = new RpcResponse(buf, remoteAddress);
240240
RpcUtil.sendRpcResponse(ctx, rsp);

hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,27 +19,30 @@
1919

2020
import java.net.SocketAddress;
2121

22-
import org.jboss.netty.buffer.ChannelBuffer;
22+
import io.netty.buffer.ByteBuf;
23+
import io.netty.channel.DefaultAddressedEnvelope;
2324

2425
/**
2526
* RpcResponse encapsulates a response to a RPC request. It contains the data
2627
* that is going to cross the wire, as well as the information of the remote
2728
* peer.
2829
*/
29-
public class RpcResponse {
30-
private final ChannelBuffer data;
31-
private final SocketAddress remoteAddress;
30+
public class RpcResponse extends
31+
DefaultAddressedEnvelope<ByteBuf, SocketAddress> {
32+
public RpcResponse(ByteBuf message, SocketAddress recipient) {
33+
super(message, recipient, null);
34+
}
3235

33-
public RpcResponse(ChannelBuffer data, SocketAddress remoteAddress) {
34-
this.data = data;
35-
this.remoteAddress = remoteAddress;
36+
public RpcResponse(ByteBuf message, SocketAddress recipient,
37+
SocketAddress sender) {
38+
super(message, recipient, sender);
3639
}
3740

38-
public ChannelBuffer data() {
39-
return data;
41+
public ByteBuf data() {
42+
return this.content();
4043
}
4144

4245
public SocketAddress remoteAddress() {
43-
return remoteAddress;
46+
return this.recipient();
4447
}
4548
}

0 commit comments

Comments
 (0)