`

使用Netty实现通用二进制协议的高效数据传输

 
阅读更多

Netty是一个高性能的NIO通信框架,提供异步的、事件驱动的网络编程模型。使用Netty可以方便用户开发各种常用协议的网络程序。例如:TCP、UDP、HTTP等等。

Netty的最新版本是3.2.7,官网地址是:http://www.jboss.org/netty

本文的主要目的是基于Netty实现一个通用二进制协议的高效数据传输。协议是通用的二进制协议,高效并且扩展性很好。

一个好的协议有两个标准:

(1)生成的传输数据要少,即数据压缩比要高。这样可以减少网络开销。

(2)传输数据和业务对象之间的转换速度要快。

(友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen

一、协议的定义

无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后。

(1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
       编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、数据包长(4byte)
(2)数据:由数据包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
       数据格式定义:
       字段1键名长度    字段1键名 字段1值长度    字段1值
       字段2键名长度    字段2键名 字段2值长度    字段2值
       字段3键名长度    字段3键名 字段3值长度    字段3值
       …    …    …    …
       长度为整型,占4个字节

  代码中用两个Vo对象来表示:XLRequest和XLResponse。

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />-->  1package org.jboss.netty.example.xlsvr.vo;
  2
  3import java.util.HashMap;
  4import java.util.Map;
  5
  6/** *//**
  7@author hankchen
10 *  2012-2-3 下午02:46:52
11 */

12
13
14/** *//**
15 * 响应数据
16 */

17
18/** *//**
19 * 通用协议介绍
20 *
21 * 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后
22 * (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
23 * 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)
24 * (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
25 * 数据格式定义:
26 * 字段1键名长度    字段1键名 字段1值长度    字段1值
27 * 字段2键名长度    字段2键名 字段2值长度    字段2值
28 * 字段3键名长度    字段3键名 字段3值长度    字段3值
29 * …    …    …    …
30 * 长度为整型,占4个字节
31 */

32public class XLResponse {
33    private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1
34    private byte encrypt;// 加密类型。0表示不加密
35    private byte extend1;// 用于扩展协议。暂未定义任何值
36    private byte extend2;// 用于扩展协议。暂未定义任何值
37    private int sessionid;// 会话ID
38    private int result;// 结果码
39    private int length;// 数据包长
40   
41    private Map<String,String> values=new HashMap<String, String>();
42   
43    private String ip;
44   
45    public void setValue(String key,String value){
46        values.put(key, value);
47    }

48   
49    public String getValue(String key){
50        if (key==null) {
51            return null;
52        }

53        return values.get(key);
54    }

55
56    public byte getEncode() {
57        return encode;
58    }

59
60    public void setEncode(byte encode) {
61        this.encode = encode;
62    }

63
64    public byte getEncrypt() {
65        return encrypt;
66    }

67
68    public void setEncrypt(byte encrypt) {
69        this.encrypt = encrypt;
70    }

71
72    public byte getExtend1() {
73        return extend1;
74    }

75
76    public void setExtend1(byte extend1) {
77        this.extend1 = extend1;
78    }

79
80    public byte getExtend2() {
81        return extend2;
82    }

83
84    public void setExtend2(byte extend2) {
85        this.extend2 = extend2;
86    }

87
88    public int getSessionid() {
89        return sessionid;
90    }

91
92    public void setSessionid(int sessionid) {
93        this.sessionid = sessionid;
94    }

95
96    public int getResult() {
97        return result;
98    }

99
100    public void setResult(int result) {
101        this.result = result;
102    }

103
104    public int getLength() {
105        return length;
106    }

107
108    public void setLength(int length) {
109        this.length = length;
110    }

111
112    public Map<String, String> getValues() {
113        return values;
114    }

115
116    public String getIp() {
117        return ip;
118    }

119
120    public void setIp(String ip) {
121        this.ip = ip;
122    }

123
124    public void setValues(Map<String, String> values) {
125        this.values = values;
126    }

127
128    @Override
129    public String toString() {
130        return "XLResponse [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2
131                + ", sessionid=" + sessionid + ", result=" + result + ", length=" + length + ", values=" + values + ", ip=" + ip + "]";
132    }

133}

 

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />-->  1package org.jboss.netty.example.xlsvr.vo;
  2
  3import java.util.HashMap;
  4import java.util.Map;
  5
  6/** *//**
  7@author hankchen
  8 *  2012-2-3 下午02:46:41
  9 */

10
11/** *//**
12 * 请求数据
13 */

14
15/** *//**
16 * 通用协议介绍
17 *
18 * 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后
19 * (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
20 * 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)
21 * (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
22 * 数据格式定义:
23 * 字段1键名长度    字段1键名 字段1值长度    字段1值
24 * 字段2键名长度    字段2键名 字段2值长度    字段2值
25 * 字段3键名长度    字段3键名 字段3值长度    字段3值
26 * …    …    …    …
27 * 长度为整型,占4个字节
28 */

29public class XLRequest {
30    private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1
31    private byte encrypt;// 加密类型。0表示不加密
32    private byte extend1;// 用于扩展协议。暂未定义任何值
33    private byte extend2;// 用于扩展协议。暂未定义任何值
34    private int sessionid;// 会话ID
35    private int command;// 命令
36    private int length;// 数据包长
37   
38    private Map<String,String> params=new HashMap<String, String>(); //参数
39   
40    private String ip;
41
42    public byte getEncode() {
43        return encode;
44    }

45
46    public void setEncode(byte encode) {
47        this.encode = encode;
48    }

49
50    public byte getEncrypt() {
51        return encrypt;
52    }

53
54    public void setEncrypt(byte encrypt) {
55        this.encrypt = encrypt;
56    }

57
58    public byte getExtend1() {
59        return extend1;
60    }

61
62    public void setExtend1(byte extend1) {
63        this.extend1 = extend1;
64    }

65
66    public byte getExtend2() {
67        return extend2;
68    }

69
70    public void setExtend2(byte extend2) {
71        this.extend2 = extend2;
72    }

73
74    public int getSessionid() {
75        return sessionid;
76    }

77
78    public void setSessionid(int sessionid) {
79        this.sessionid = sessionid;
80    }

81
82    public int getCommand() {
83        return command;
84    }

85
86    public void setCommand(int command) {
87        this.command = command;
88    }

89
90    public int getLength() {
91        return length;
92    }

93
94    public void setLength(int length) {
95        this.length = length;
96    }

97
98    public Map<String, String> getParams() {
99        return params;
100    }

101   
102    public void setValue(String key,String value){
103        params.put(key, value);
104    }

105   
106    public String getValue(String key){
107        if (key==null) {
108            return null;
109        }

110        return params.get(key);
111    }

112
113    public String getIp() {
114        return ip;
115    }

116
117    public void setIp(String ip) {
118        this.ip = ip;
119    }

120
121    public void setParams(Map<String, String> params) {
122        this.params = params;
123    }

124
125    @Override
126    public String toString() {
127        return "XLRequest [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2
128                + ", sessionid=" + sessionid + ", command=" + command + ", length=" + length + ", params=" + params + ", ip=" + ip + "]";
129    }

130}

131

二、协议的编码和解码

对于自定义二进制协议,编码解码器往往是Netty开发的重点。这里直接给出相关类的代码。

 

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />-->1package org.jboss.netty.example.xlsvr.codec;
2
3import java.nio.ByteBuffer;
4
5import org.jboss.netty.buffer.ChannelBuffer;
6import org.jboss.netty.buffer.ChannelBuffers;
7import org.jboss.netty.channel.ChannelHandlerContext;
8import org.jboss.netty.channel.Channels;
9import org.jboss.netty.channel.MessageEvent;
10import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
11import org.jboss.netty.example.xlsvr.util.ProtocolUtil;
12import org.jboss.netty.example.xlsvr.vo.XLResponse;
13import org.slf4j.Logger;
14import org.slf4j.LoggerFactory;
15
16/** *//**
17@author hankchen
18 *  2012-2-3 上午10:48:15
19 */

20
21/** *//**
22 * 服务器端编码器
23 */

24public class XLServerEncoder extends SimpleChannelDownstreamHandler {
25    Logger logger=LoggerFactory.getLogger(XLServerEncoder.class);
26   
27    @Override
28    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
29        XLResponse response=(XLResponse)e.getMessage();
30        ByteBuffer headBuffer=ByteBuffer.allocate(16);
31        /** *//**
32         * 先组织报文头
33         */

34        headBuffer.put(response.getEncode());
35        headBuffer.put(response.getEncrypt());
36        headBuffer.put(response.getExtend1());
37        headBuffer.put(response.getExtend2());
38        headBuffer.putInt(response.getSessionid());
39        headBuffer.putInt(response.getResult());
40       
41        /** *//**
42         * 组织报文的数据部分
43         */

44        ChannelBuffer dataBuffer=ProtocolUtil.encode(response.getEncode(),response.getValues());
45        int length=dataBuffer.readableBytes();
46        headBuffer.putInt(length);
47        /** *//**
48         * 非常重要
49         * ByteBuffer需要手动flip(),ChannelBuffer不需要
50         */

51        headBuffer.flip();
52        ChannelBuffer totalBuffer=ChannelBuffers.dynamicBuffer();
53        totalBuffer.writeBytes(headBuffer);
54        logger.info("totalBuffer size="+totalBuffer.readableBytes());
55        totalBuffer.writeBytes(dataBuffer);
56        logger.info("totalBuffer size="+totalBuffer.readableBytes());
57        Channels.write(ctx, e.getFuture(), totalBuffer);
58    }

59
60}

61

 

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />-->1package org.jboss.netty.example.xlsvr.codec;
2
3import org.jboss.netty.buffer.ChannelBuffer;
4import org.jboss.netty.buffer.ChannelBuffers;
5import org.jboss.netty.channel.Channel;
6import org.jboss.netty.channel.ChannelHandlerContext;
7import org.jboss.netty.example.xlsvr.util.ProtocolUtil;
8import org.jboss.netty.example.xlsvr.vo.XLResponse;
9import org.jboss.netty.handler.codec.frame.FrameDecoder;
10
11/** *//**
12@author hankchen
13 *  2012-2-3 上午10:47:54
14 */

15
16/** *//**
17 * 客户端解码器
18 */

19public class XLClientDecoder extends FrameDecoder {
20
21    @Override
22    protected Object decode(ChannelHandlerContext context, Channel channel, ChannelBuffer buffer) throws Exception {
23        if (buffer.readableBytes()<16) {
24            return null;
25        }

26        buffer.markReaderIndex();
27        byte encode=buffer.readByte();
28        byte encrypt=buffer.readByte();
29        byte extend1=buffer.readByte();
30        byte extend2=buffer.readByte();
31        int sessionid=buffer.readInt();
32        int result=buffer.readInt();
33        int length=buffer.readInt(); // 数据包长
34        if (buffer.readableBytes()<length) {
35            buffer.resetReaderIndex();
36            return null;
37        }

38        ChannelBuffer dataBuffer=ChannelBuffers.buffer(length);
39        buffer.readBytes(dataBuffer, length);
40       
41        XLResponse response=new XLResponse();
42        response.setEncode(encode);
43        response.setEncrypt(encrypt);
44        response.setExtend1(extend1);
45        response.setExtend2(extend2);
46        response.setSessionid(sessionid);
47        response.setResult(result);
48        response.setLength(length);
49        response.setValues(ProtocolUtil.decode(encode, dataBuffer));
50        response.setIp(ProtocolUtil.getClientIp(channel));
51        return response;
52    }

53
54}

 

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />-->  1package org.jboss.netty.example.xlsvr.util;
  2
  3import java.net.SocketAddress;
  4import java.nio.charset.Charset;
  5import java.util.HashMap;
  6import java.util.Map;
  7import java.util.Map.Entry;
  8
  9import org.jboss.netty.buffer.ChannelBuffer;
10import org.jboss.netty.buffer.ChannelBuffers;
11import org.jboss.netty.channel.Channel;
12
13/** *//**
14@author hankchen
15 *  2012-2-4 下午01:57:33
16 */

17public class ProtocolUtil {
18   
19    /** *//**
20     * 编码报文的数据部分
21     * @param encode
22     * @param values
23     * @return
24     */

25    public static ChannelBuffer encode(int encode,Map<String,String> values){
26        ChannelBuffer totalBuffer=null;
27        if (values!=null && values.size()>0) {
28            totalBuffer=ChannelBuffers.dynamicBuffer();
29            int length=0,index=0;
30            ChannelBuffer [] channelBuffers=new ChannelBuffer[values.size()];
31            Charset charset=XLCharSetFactory.getCharset(encode);
32            for(Entry<String,String> entry:values.entrySet()){
33                String key=entry.getKey();
34                String value=entry.getValue();
35                ChannelBuffer buffer=ChannelBuffers.dynamicBuffer();
36                buffer.writeInt(key.length());
37                buffer.writeBytes(key.getBytes(charset));
38                buffer.writeInt(value.length());
39                buffer.writeBytes(value.getBytes(charset));
40                channelBuffers[index++]=buffer;
41                length+=buffer.readableBytes();
42            }

43           
44            for (int i = 0; i < channelBuffers.length; i++) {
45                totalBuffer.writeBytes(channelBuffers[i]);
46            }

47        }

48        return totalBuffer;
49    }

50   
51    /** *//**
52     * 解码报文的数据部分
53     * @param encode
54     * @param dataBuffer
55     * @return
56     */

57    public static Map<String,String> decode(int encode,ChannelBuffer dataBuffer){
58        Map<String,String> dataMap=new HashMap<String, String>();
59        if (dataBuffer!=null && dataBuffer.readableBytes()>0) {
60            int processIndex=0,length=dataBuffer.readableBytes();
61            Charset charset=XLCharSetFactory.getCharset(encode);
62            while(processIndex<length){
63                /** *//**
64                 * 获取Key
65                 */

66                int size=dataBuffer.readInt();
67                byte [] contents=new byte [size];
68                dataBuffer.readBytes(contents);
69                String key=new String(contents, charset);
70                processIndex=processIndex+size+4;
71                /** *//**
72                 * 获取Value
73                 */

74                size=dataBuffer.readInt();
75                contents=new byte [size];
76                dataBuffer.readBytes(contents);
77                String value=new String(contents, charset);
78                dataMap.put(key, value);
79                processIndex=processIndex+size+4;
80            }

81        }

82        return dataMap;
83    }

84   
85    /** *//**
86     * 获取客户端IP
87     * @param channel
88     * @return
89     */

90    public static String getClientIp(Channel channel){
91        /** *//**
92         * 获取客户端IP
93         */

94        SocketAddress address = channel.getRemoteAddress();
95        String ip = "";
96        if (address != null) {
97            ip = address.toString().trim();
98            int index = ip.lastIndexOf(':');
99            if (index < 1) {
100                index = ip.length();
101            }

102            ip = ip.substring(1, index);
103        }

104        if (ip.length() > 15) {
105            ip = ip.substring(Math.max(ip.indexOf("/") + 1, ip.length() - 15));
106        }

107        return ip;
108    }

109}

110

三、服务器端实现

服务器端提供的功能是:

1、接收客户端的请求(非关闭命令),返回XLResponse类型的数据。

2、如果客户端的请求是关闭命令:shutdown,则服务器端关闭自身进程。

为了展示多协议的运用,这里客户端的请求采用的是基于问本行(\n\r)的协议。

具体代码如下:

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />-->1package org.jboss.netty.example.xlsvr;
2
3import java.net.InetSocketAddress;
4import java.util.concurrent.Executors;
5
6import org.jboss.netty.bootstrap.ServerBootstrap;
7import org.jboss.netty.channel.Channel;
8import org.jboss.netty.channel.ChannelPipeline;
9import org.jboss.netty.channel.group.ChannelGroup;
10import org.jboss.netty.channel.group.ChannelGroupFuture;
11import org.jboss.netty.channel.group.DefaultChannelGroup;
12import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
13import org.jboss.netty.example.xlsvr.codec.XLServerEncoder;
14import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
15import org.jboss.netty.handler.codec.frame.Delimiters;
16import org.jboss.netty.handler.codec.string.StringDecoder;
17import org.jboss.netty.util.CharsetUtil;
18import org.slf4j.Logger;
19import org.slf4j.LoggerFactory;
20
21/** *//**
22@author hankchen
23 *  2012-1-30 下午03:21:38
24 */

25
26public class XLServer {
27    public static final int port =8080;
28    public static final Logger logger=LoggerFactory.getLogger(XLServer.class);
29    public static final ChannelGroup allChannels=new DefaultChannelGroup("XLServer");
30    private static final ServerBootstrap serverBootstrap=new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
31   
32    public static void main(String [] args){
33        try {
34            XLServer.startup();
35        }
 catch (Exception e) {
36            e.printStackTrace();
37        }

38    }

39   
40    public static boolean startup() throws Exception{
41        /** *//**
42         * 采用默认ChannelPipeline管道
43         * 这意味着同一个XLServerHandler实例将被多个Channel通道共享
44         * 这种方式对于XLServerHandler中无有状态的成员变量是可以的,并且可以提高性能!
45         */

46        ChannelPipeline pipeline=serverBootstrap.getPipeline();
47        /** *//**
48         * 解码器是基于文本行的协议,\r\n或者\n\r
49         */

50        pipeline.addLast("frameDecoder", new DelimiterBasedFrameDecoder(80, Delimiters.lineDelimiter()));
51        pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
52        pipeline.addLast("encoder", new XLServerEncoder());
53        pipeline.addLast("handler", new XLServerHandler());
54       
55        serverBootstrap.setOption("child.tcpNoDelay", true); //注意child前缀
56        serverBootstrap.setOption("child.keepAlive", true); //注意child前缀
57       
58        /** *//**
59         * ServerBootstrap对象的bind方法返回了一个绑定了本地地址的服务端Channel通道对象
60         */

61        Channel channel=serverBootstrap.bind(new InetSocketAddress(port));
62        allChannels.add(channel);
63        logger.info("server is started on port "+port);
64        return false;
65    }

66   
67    public static void shutdown() throws Exception{
68        try {
69            /** *//**
70             * 主动关闭服务器
71             */

72            ChannelGroupFuture future=allChannels.close();
73            future.awaitUninterruptibly();//阻塞,直到服务器关闭
74            //serverBootstrap.releaseExternalResources();
75        }
 catch (Exception e) {
76            e.printStackTrace();
77            logger.error(e.getMessage(),e);
78        }

79        finally{
80            logger.info("server is shutdown on port "+port);
81            System.exit(1);
82        }

83    }

84}

85

 

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />-->  1package org.jboss.netty.example.xlsvr;
  2
  3import java.util.Random;
  4
  5import org.jboss.netty.channel.Channel;
  6import org.jboss.netty.channel.ChannelFuture;
  7import org.jboss.netty.channel.ChannelHandlerContext;
  8import org.jboss.netty.channel.ChannelHandler.Sharable;
  9import org.jboss.netty.channel.ChannelStateEvent;
10import org.jboss.netty.channel.ExceptionEvent;
11import org.jboss.netty.channel.MessageEvent;
12import org.jboss.netty.channel.SimpleChannelHandler;
13import org.jboss.netty.example.xlsvr.vo.XLResponse;
14import org.slf4j.Logger;
15import org.slf4j.LoggerFactory;
16
17/** *//**
18@author hankchen
19 *  2012-1-30 下午03:22:24
20 */

21
22@Sharable
23public class XLServerHandler extends SimpleChannelHandler {
24    private static final Logger logger=LoggerFactory.getLogger(XLServerHandler.class);
25   
26    @Override
27    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
28        logger.info("messageReceived");
29        if (e.getMessage() instanceof String) {
30            String content=(String)e.getMessage();
31            logger.info("content is "+content);
32            if ("shutdown".equalsIgnoreCase(content)) {
33                //e.getChannel().close();
34                XLServer.shutdown();
35            }
else {
36                sendResponse(ctx);
37            }

38        }
else {
39            logger.error("message is not a String.");
40            e.getChannel().close();
41        }

42    }

43
44    @Override
45    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
46        logger.error(e.getCause().getMessage(),e.getCause());
47        e.getCause().printStackTrace();
48        e.getChannel().close();
49    }

50
51    @Override
52    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
53        logger.info("channelConnected");
54        sendResponse(ctx);
55    }

56
57    @Override
58    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
59        logger.info("channelClosed");
60        //删除通道
61        XLServer.allChannels.remove(e.getChannel());
62    }

63
64    @Override
65    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
66        logger.info("channelDisconnected");
67        super.channelDisconnected(ctx, e);
68    }

69
70    @Override
71    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
72        logger.info("channelOpen");
73        //增加通道
74        XLServer.allChannels.add(e.getChannel());
75    }

76
77    /** *//**
78     * 发送响应内容
79     * @param ctx
80     * @param e
81     * @return
82     */

83    private ChannelFuture sendResponse(ChannelHandlerContext ctx){
84        Channel channel=ctx.getChannel();
85        Random random=new Random();
86        XLResponse response=new XLResponse();
87        response.setEncode((byte)0);
88        response.setResult(1);
89        response.setValue("name","hankchen");
90        response.setValue("time", String.valueOf(System.currentTimeMillis()));
91        response.setValue("age",String.valueOf(random.nextInt()));
92        /** *//**
93         * 发送接收信息的时间戳到客户端
94         * 注意:Netty中所有的IO操作都是异步的!
95         */

96        ChannelFuture future=channel.write(response); //发送内容
97        return future;
98    }

99}

100

四、客户端实现

客户端的功能是连接服务器,发送10次请求,然后发送关闭服务器的命令,最后主动关闭客户端。

关键代码如下:

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />--> 1/** *//**
2 *  Copyright (C): 2012
3@author hankchen
4 *  2012-1-30 下午03:21:26
5 */

6
7/** *//**
8 * 服务器特征:
9 * 1、使用专用解码器解析服务器发过来的数据
10 * 2、客户端主动关闭连接
11 */

12public class XLClient {
13    public static final int port =XLServer.port;
14    public static final String host ="localhost";
15    private static final Logger logger=LoggerFactory.getLogger(XLClient.class);
16    private static final NioClientSocketChannelFactory clientSocketChannelFactory=new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());
17    private static final ClientBootstrap clientBootstrap=new ClientBootstrap(clientSocketChannelFactory);
18   
19    /** *//**
20     * @param args
21     * @throws Exception
22     */

23    public static void main(String[] args) throws Exception {
24        ChannelFuture future=XLClient.startup();
25        logger.info("future state is "+future.isSuccess());
26    }

27   
28    /** *//**
29     * 启动客户端
30     * @return
31     * @throws Exception
32     */

33    public static ChannelFuture startup() throws Exception {
34        /** *//**
35         * 注意:由于XLClientHandler中有状态的成员变量,因此不能采用默认共享ChannelPipeline的方式
36         * 例如,下面的代码形式是错误的:
37         * ChannelPipeline pipeline=clientBootstrap.getPipeline();
38         * pipeline.addLast("handler", new XLClientHandler());
39         */

40        clientBootstrap.setPipelineFactory(new XLClientPipelineFactory()); //只能这样设置
41        /** *//**
42         * 请注意,这里不存在使用“child.”前缀的配置项,客户端的SocketChannel实例不存在父级Channel对象
43         */

44        clientBootstrap.setOption("tcpNoDelay", true);
45        clientBootstrap.setOption("keepAlive", true);
46       
47        ChannelFuture future=clientBootstrap.connect(new InetSocketAddress(host, port));
48        /** *//**
49         * 阻塞式的等待,直到ChannelFuture对象返回这个连接操作的成功或失败状态
50         */

51        future.awaitUninterruptibly();
52        /** *//**
53         * 如果连接失败,我们将打印连接失败的原因。
54         * 如果连接操作没有成功或者被取消,ChannelFuture对象的getCause()方法将返回连接失败的原因。
55         */

56        if (!future.isSuccess()) {
57            future.getCause().printStackTrace();
58        }
else {
59            logger.info("client is connected to server "+host+":"+port);
60        }

61        return future;
62    }

63   
64    /** *//**
65     * 关闭客户端
66     * @param future
67     * @throws Exception
68     */

69    public static void shutdown(ChannelFuture future) throws Exception{
70        try {
71            /** *//**
72             * 主动关闭客户端连接,会阻塞等待直到通道关闭
73             */

74            future.getChannel().close().awaitUninterruptibly();
75            //future.getChannel().getCloseFuture().awaitUninterruptibly();
76            /** *//**
77             * 释放ChannelFactory通道工厂使用的资源。
78             * 这一步仅需要调用 releaseExternalResources()方法即可。
79             * 包括NIO Secector和线程池在内的所有资源将被自动的关闭和终止。
80             */

81            clientBootstrap.releaseExternalResources();
82        }
 catch (Exception e) {
83            e.printStackTrace();
84            logger.error(e.getMessage(),e);
85        }

86        finally{
87            System.exit(1);
88            logger.info("client is shutdown to server "+host+":"+port);
89        }

90    }

91}

 

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />--> 1public class XLClientPipelineFactory implements ChannelPipelineFactory{
2
3    @Override
4    public ChannelPipeline getPipeline() throws Exception {
5        ChannelPipeline pipeline=Channels.pipeline();
6        /** *//**
7         * 使用专用的解码器,解决数据分段的问题
8         * 从业务逻辑代码中分离协议处理部分总是一个很不错的想法。
9         */

10        pipeline.addLast("decoder", new XLClientDecoder());
11        /** *//**
12         * 有专门的编码解码器,这时处理器就不需要管数据分段和数据格式问题,只需要关注业务逻辑了!
13         */

14        pipeline.addLast("handler", new XLClientHandler());
15        return pipeline;
16    }

17
18}

 

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />--> 1/** *//**
2 *  Copyright (C): 2012
3@author hankchen
4 *  2012-1-30 下午03:21:52
5 */

6
7/** *//**
8 * 服务器特征:
9 * 1、使用专用的编码解码器,解决数据分段的问题
10 * 2、使用POJO替代ChannelBuffer传输
11 */

12public class XLClientHandler extends SimpleChannelHandler {
13    private static final Logger logger=LoggerFactory.getLogger(XLClientHandler.class);
14    private final AtomicInteger count=new AtomicInteger(0); //计数器
15   
16    @Override
17    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
18        processMethod1(ctx, e); //处理方式一
19    }

20   
21    /** *//**
22     * @param ctx
23     * @param e
24     * @throws Exception
25     */

26    public void processMethod1(ChannelHandlerContext ctx, MessageEvent e) throws Exception{
27        logger.info("processMethod1……,count="+count.addAndGet(1));
28        XLResponse serverTime=(XLResponse)e.getMessage();
29        logger.info("messageReceived,content:"+serverTime.toString());
30        Thread.sleep(1000);
31       
32        if (count.get()<10) {
33            //从新发送请求获取最新的服务器时间
34            ctx.getChannel().write(ChannelBuffers.wrappedBuffer("again\r\n".getBytes()));
35        }
else{
36            //从新发送请求关闭服务器
37            ctx.getChannel().write(ChannelBuffers.wrappedBuffer("shutdown\r\n".getBytes()));
38        }

39    }

40   
41    @Override
42    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
43        logger.info("exceptionCaught");
44        e.getCause().printStackTrace();
45        ctx.getChannel().close();
46    }

47
48    @Override
49    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
50        logger.info("channelClosed");
51        super.channelClosed(ctx, e);
52    }

53   
54   
55}

全文代码较多,写了很多注释,希望对读者有用,谢谢!

(友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen

分享到:
评论
5 楼 xuzhou530 2013-07-24  
为什么
int length = buffer.readInt(); // 数据包长

if (buffer.readableBytes() < length) {
buffer.resetReaderIndex();
return null;
}
一直都会进入这里?
4 楼 xuzhou530 2013-07-24  
我使用你上面的代码启动服务端没有问题,启动客户端出现了下面问题什么原因请指教:
INFO (  XLServer) server is started on port 8080
INFO (XLServerHandler) channelOpen
INFO (XLServerHandler) channelConnected
接受到信息writeRequested:XLResponse [encode=0, encrypt=0, extend=0, extend =0, sessionid=0, result=1, length=0, values={time=1374653003832, age=-182129588, name=hankchen}, ip=null]
ERROR(XLServerHandler) charsetName
java.lang.NullPointerException: charsetName
at com.factory.XLCharSetFactory.getCharset(XLCharSetFactory.java:25)
at com.util.ProtocolUtil.encode(ProtocolUtil.java:33)
at com.codec.XLServerEncoder.writeRequested(XLServerEncoder.java:46)
at org.jboss.netty.channel.Channels.write(Channels.java:611)
at org.jboss.netty.channel.Channels.write(Channels.java:578)
at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:251)
at com.handle.XLServerHandler.sendResponse(XLServerHandler.java:104)
at com.handle.XLServerHandler.channelConnected(XLServerHandler.java:58)
at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:66)
at org.jboss.netty.channel.Channels.fireChannelConnected(Channels.java:233)
at org.jboss.netty.channel.socket.nio.NioAcceptedSocketChannel.<init>(NioAcceptedSocketChannel.java:53)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink$Boss.registerAcceptedChannel(NioServerSocketPipelineSink.java:285)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink$Boss.run(NioServerSocketPipelineSink.java:249)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
3 楼 xuzhou530 2013-07-24  
哦哦,谢谢,了
2 楼 drager 2013-06-14  
xuzhou530 写道
XLCharSetFactory这个类没有呀,这个类干嘛的


我给你补充一下这个类吧


package org.jboss.netty.example.xlsvr.factory;

import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;

public class XLCharSetFactory {

static String[] charsetArray={"UTF-8","GBK","GB2312","ISO8859-1"};
/**
* :0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1
* @param encode
* @return
* @throws InstantiationException
* @throws IllegalAccessException
* @throws ClassNotFoundException
*/
public static Charset getCharset(int  encode)  {

Charset charset=null;

if (encode == 0)
    throw new NullPointerException("charsetName");

try {
switch(encode)
{
case 0:
charset = Charset.forName(charsetArray[0]);
            break;
case 1:
charset = Charset.forName(charsetArray[1]);
    break;
case 2:
charset = Charset.forName(charsetArray[2]);
   break;
case 3:
charset = Charset.forName(charsetArray[3]);
    break;
}

} catch (IllegalCharsetNameException e) {
e.printStackTrace();
        } catch (UnsupportedCharsetException e) {
        e.printStackTrace();
    }
return charset;
    }
}
1 楼 xuzhou530 2013-06-04  
XLCharSetFactory这个类没有呀,这个类干嘛的

相关推荐

    java开源包3

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包4

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包8

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包1

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包11

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包2

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包6

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包5

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包10

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包7

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包9

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包101

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    Java资源包01

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    JAVA上百实例源码以及开源项目源代码

    2个目标文件,FTP的目标是:(1)提高文件的共享性(计算机程序和/或数据),(2)鼓励间接地(通过程序)使用远程计算机,(3)保护用户因主机之间的文件存储系统导致的变化,(4)为了可靠和高效地传输,虽然用户...

    JAVA上百实例源码以及开源项目

    2个目标文件,FTP的目标是:(1)提高文件的共享性(计算机程序和/或数据),(2)鼓励间接地(通过程序)使用远程计算机,(3)保护用户因主机之间的文件存储系统导致的变化,(4)为了可靠和高效地传输,虽然用户...

Global site tag (gtag.js) - Google Analytics