Netty作为高性能的底层通讯框架,被很多开发框架应用在底层。今天来说说常用的序列化工具之一:JBoss的Marshalling。
直接上代码,首先是Marshalling的工厂类:
package com.netty.serialize.marshalling;
import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
/**
 * Factory for the Netty channel handlers that serialize and deserialize
 * messages with JBoss Marshalling.
 *
 * <p>Created by sdc on 2017/8/28.
 */
public class MarshallingCodeCFactory {

    /** Name passed to the marshaller-factory lookup; "serial" selects the Java-serialization factory. */
    private static final String MARSHALLER_NAME = "serial";

    /** Version set on every {@link MarshallingConfiguration} created by this class. */
    private static final int MARSHALLING_VERSION = 5;

    /** Maximum size of a single serialized message used by {@link #buildMarshallingDecoder()}. */
    private static final int DEFAULT_MAX_OBJECT_SIZE = 1024;

    /**
     * Builds a decoder with the default maximum object size (1024).
     *
     * @return a new {@link MarshallingDecoder}
     * @throws IllegalStateException if the "serial" marshaller factory cannot be found
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        return buildMarshallingDecoder(DEFAULT_MAX_OBJECT_SIZE);
    }

    /**
     * Builds a decoder with a caller-chosen size limit.
     *
     * @param maxObjectSize maximum length of a single serialized message, passed
     *                      straight to the {@link MarshallingDecoder} constructor
     * @return a new {@link MarshallingDecoder}
     * @throws IllegalStateException if the "serial" marshaller factory cannot be found
     */
    public static MarshallingDecoder buildMarshallingDecoder(int maxObjectSize) {
        // The provider is built from the marshaller factory and its configuration.
        UnmarshallerProvider provider =
                new DefaultUnmarshallerProvider(lookupMarshallerFactory(), newConfiguration());
        // The two arguments are the provider and the maximum serialized message length.
        return new MarshallingDecoder(provider, maxObjectSize);
    }

    /**
     * Builds an encoder that serializes outgoing objects.
     *
     * @return a new {@link MarshallingEncoder}
     * @throws IllegalStateException if the "serial" marshaller factory cannot be found
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerProvider provider =
                new DefaultMarshallerProvider(lookupMarshallerFactory(), newConfiguration());
        return new MarshallingEncoder(provider);
    }

    /** Looks up the "serial" marshaller factory and fails fast when it is not available. */
    private static MarshallerFactory lookupMarshallerFactory() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory(MARSHALLER_NAME);
        if (factory == null) {
            // Without this check the missing factory only surfaces later, inside the pipeline.
            throw new IllegalStateException(
                    "No JBoss Marshalling factory named \"" + MARSHALLER_NAME
                            + "\" found; is jboss-marshalling-serial on the classpath?");
        }
        return factory;
    }

    /** Creates the marshalling configuration shared by the encoder and the decoder. */
    private static MarshallingConfiguration newConfiguration() {
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(MARSHALLING_VERSION);
        return configuration;
    }
}
以上就是Marshalling的序列化方式。Netty提供的MarshallingEncoder和MarshallingDecoder自带基于长度字段的编解码,所以不用担心传输过程中半包、粘包的问题。
服务端的Server实现:
package com.netty.serialize.server;
import com.netty.serialize.coder.MsgDecoder;
import com.netty.serialize.coder.MsgEncoder;
import com.netty.serialize.handler.ServerHandler;
import com.netty.serialize.marshalling.MarshallingCodeCFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Netty server that exchanges {@code Message} objects serialized with JBoss Marshalling.
 *
 * <p>Created by sdc on 2017/8/26.
 */
public class MsgServer {

    /**
     * Binds the server to {@code port} and blocks until the server channel is closed.
     * Both event loop groups are shut down before this method returns.
     *
     * @param port TCP port to listen on
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // One shared initializer sets up the pipeline of every accepted channel.
                    .childHandler(new ChildChannelHandler())
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture cf = sb.bind(port).sync();
            System.out.println("服务端已启动");
            // Wait here until the listening channel is closed.
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Installs the marshalling decoder, the marshalling encoder and the
     * {@link ServerHandler} on each new channel, in that order.
     */
    public static class ChildChannelHandler extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            channel.pipeline().addLast(new ServerHandler());
        }
    }

    /**
     * Starts the server on port 8080.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try {
            new MsgServer().bind(8080);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.netty.serialize.handler;
import com.netty.serialize.message.Message;
import com.netty.serialize.message.MsgHeader;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Test handler for the server side: prints each received {@link Message}
 * and answers with a fixed notification message.
 *
 * <p>Created by sdc on 2017/8/29.
 */
public class ServerHandler extends ChannelHandlerAdapter {

    /** Delegates to the parent implementation; kept as a hook for connection-time logic. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /** Prints the stack trace and then lets the parent implementation handle the exception. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        super.exceptionCaught(ctx, cause);
    }

    /**
     * Prints the incoming message and writes a reply back on the same channel.
     *
     * @param ctx context of this handler
     * @param msg the decoded inbound object; cast to {@link Message}
     * @throws ClassCastException if {@code msg} is not a {@link Message}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Message newMsg = (Message) msg;
        System.out.println("获取客户端里的内容:" + newMsg);

        String msgStr = "客户端接受到通知";
        MsgHeader header = new MsgHeader();
        // A plain byte literal replaces new Byte("0"), a wrapper constructor deprecated for removal.
        header.setStartTag((byte) 0);
        header.setCmdCode("1234".getBytes());
        // Note: this is the number of chars in the body, not its encoded byte length.
        header.setLength(msgStr.length());
        header.setVersion("11".getBytes());

        Message message = new Message();
        message.setBody(msgStr);
        message.setHeader(header);
        ctx.writeAndFlush(message);
    }
}
客户端的实现:
package com.netty.serialize.client;
import com.netty.serialize.coder.MsgDecoder;
import com.netty.serialize.coder.MsgEncoder;
import com.netty.serialize.handler.ClientHandler;
import com.netty.serialize.handler.ServerHandler;
import com.netty.serialize.marshalling.MarshallingCodeCFactory;
import com.netty.serialize.message.Message;
import com.netty.serialize.message.MsgHeader;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* Created by sdc on 2017/8/26.
*/
public class MsgClient {
public void connect(String ip, int port) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
// Message message = new Message();
// String msgStr = "我想发送一条消息";
// MsgHeader header = new MsgHeader();
// header.setStartTag(new Byte("0"));
// header.setCmdCode("1234".getBytes());
// header.setLength(msgStr.length());
// header.setVersion("11".getBytes());
//
// message.setBody(msgStr);
// message.setHeader(header);
try {
Bootstrap bs = new Bootstrap();
bs.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)//
.handler(new ChildChannelHandler());
ChannelFuture f = bs.connect(ip,port).sync();
//写入消息
// f.channel().writeAndFlush(message).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
public static class ChildChannelHandler extends ChannelInitializer {
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
channel.pipeline().addLast(new ClientHandler());
}
}
public static void main(String[] args){
try {
new MsgClient().connect("127.0.0.1", 8080);
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.netty.serialize.handler;
import com.netty.serialize.message.Message;
import com.netty.serialize.message.MsgHeader;
import io.netty.channel.*;
import io.netty.util.ReferenceCountUtil;
/**
 * Client-side handler: sends one {@link Message} when the channel becomes
 * active and prints every message received from the server.
 *
 * <p>Created by sdc on 2017/8/29.
 */
public class ClientHandler extends ChannelHandlerAdapter {

    /**
     * Builds a message and writes it to the server, reporting the outcome of
     * the write on standard output.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String msgStr = "我想发送一条消息";
        MsgHeader header = new MsgHeader();
        // A plain byte literal replaces new Byte("0"), a wrapper constructor deprecated for removal.
        header.setStartTag((byte) 0);
        header.setCmdCode("1234".getBytes());
        // Note: this is the number of chars in the body, not its encoded byte length.
        header.setLength(msgStr.length());
        header.setVersion("11".getBytes());

        Message message = new Message();
        message.setBody(msgStr);
        message.setHeader(header);

        ctx.writeAndFlush(message).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    System.out.println("成功发送到服务端消息");
                } else {
                    System.out.println("发送到服务端消息失败");
                    // Show why the write failed; cause() may be null, e.g. on cancellation.
                    Throwable cause = channelFuture.cause();
                    if (cause != null) {
                        cause.printStackTrace();
                    }
                }
            }
        });
    }

    /** Hands the exception to the parent implementation. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }

    /**
     * Prints the message received from the server and then releases it.
     *
     * @param ctx context of this handler
     * @param msg the decoded inbound object; cast to {@link Message}
     * @throws ClassCastException if {@code msg} is not a {@link Message}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Message newMsg = (Message) msg;
            System.out.println("收到服务端的内容" + newMsg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
}
下面是传输用的POJO类,即自定义封装好的消息(消息和消息头都实现了Serializable接口):
package com.netty.serialize.message;
import java.io.Serializable;
/**
 * Serializable envelope sent over the wire: a {@link MsgHeader} plus an
 * arbitrary body object.
 *
 * <p>Created by sdc on 2017/8/26.
 */
public class Message implements Serializable {

    private static final long serialVersionUID = 4923081103118853877L;

    /** Header describing this message. */
    private MsgHeader header;

    /** Payload of this message. */
    private Object body;

    /** @return the payload, possibly {@code null} */
    public Object getBody() {
        return this.body;
    }

    /** @param body the payload to carry */
    public void setBody(Object body) {
        this.body = body;
    }

    /** @return the header, possibly {@code null} */
    public MsgHeader getHeader() {
        return this.header;
    }

    /** @param header the header to attach */
    public void setHeader(MsgHeader header) {
        this.header = header;
    }

    /** @return a text form of the shape {@code Message{header=..., body=...}} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Message{");
        text.append("header=").append(header);
        text.append(", body=").append(body);
        text.append('}');
        return text.toString();
    }
}
package com.netty.serialize.message;
import java.io.Serializable;
import java.util.Arrays;
/**
 * Serializable header of a message: start tag, command code, version and length.
 *
 * <p>Created by sdc on 2017/8/26.
 */
public class MsgHeader implements Serializable {

    private static final long serialVersionUID = 4923081103118853877L;

    /** Fixed start tag. */
    private byte startTag;

    /** Command code, 4 bytes. */
    private byte[] cmdCode;

    /** Version, 2 bytes. */
    private byte[] version;

    /** Length value set by the sender. */
    private int length;

    /** @return the start tag */
    public byte getStartTag() {
        return this.startTag;
    }

    /** @param startTag the start tag to store */
    public void setStartTag(byte startTag) {
        this.startTag = startTag;
    }

    /** @return the stored command code array (not a copy), possibly {@code null} */
    public byte[] getCmdCode() {
        return this.cmdCode;
    }

    /** @param cmdCode the command code array to store (kept by reference) */
    public void setCmdCode(byte[] cmdCode) {
        this.cmdCode = cmdCode;
    }

    /** @return the stored version array (not a copy), possibly {@code null} */
    public byte[] getVersion() {
        return this.version;
    }

    /** @param version the version array to store (kept by reference) */
    public void setVersion(byte[] version) {
        this.version = version;
    }

    /** @return the length value */
    public int getLength() {
        return this.length;
    }

    /** @param length the length value to store */
    public void setLength(int length) {
        this.length = length;
    }

    /** @return a text form listing all four fields, with arrays rendered by {@link Arrays#toString(byte[])} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("MsgHeader{");
        text.append("startTag=").append(startTag);
        text.append(", cmdCode=").append(Arrays.toString(cmdCode));
        text.append(", version=").append(Arrays.toString(version));
        text.append(", length=").append(length);
        text.append('}');
        return text.toString();
    }
}
到此就完事了。下面是本文使用的netty和marshalling的版本,换成其他版本我不清楚会不会有什么错误。
<!-- JUnit 4.11, declared with test scope -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- Netty (netty-all artifact), pinned to 5.0.0.Alpha2 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
<!--jboss-marshalling -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>2.0.0.Beta2</version>