netty 入门 (5)

TimeClientHandler2还是保持简单。

package io.netty.example.time; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; public class TimeDecoder extends ByteToMessageDecoder {//1 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {//2 if (in.readableBytes()<4) return;//3 out.add(in.readBytes(4));//4 } }

ByteToMessageDecoder继承自ChannelInboundHandlerAdapter,实现了ChannelInboundHandler接口,所以这个Decoder对象也是一个ChannelHandler对象。他专门用来处理分片问题。

ByteToMessageDecoder会在有新的数据进来的时候调用decode方法,内部维护一个buffer。

ByteToMessageDecoder可以根据自己的业务逻辑来执行。

假设进来的字节数据大于4,那么他就会调用这个decode多次,每次处理4个字节。

使用POJO而不是ByteBuf

前面的例子读写数据的核心都是ByteBuf类,在ChannelHandler里面直接把object msg 转成ByteBuf,然后操作。如果我们能通过POJO来操作,那么,代码的可维护性明显会高一些。让我们来改造一下我们的代码。

第一步,定义一个UnixTime类,来表示我们要处理的对象。

package io.netty.example.time2; import java.util.Date; public class UnixTime { private final long value; public UnixTime() { this(System.currentTimeMillis() / 1000L + 2208988800L); } public UnixTime(long value) { this.value = value; } public long getValue() { return value; } @Override public String toString() { return "转换出来的时间是:"+ new Date((getValue() - 2208988800L) * 1000L).toString(); } }

第二步,我们改一下我们的TimeDecoder来产生一个UnixTime对象。

package io.netty.example.time2.client; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; public class TimeDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes()<4) return; out.add(new UnixTime(in.readUnsignedInt())); } }

第三步,在ChannelHandler里面我们直接按照UnixTime对象操作。

package io.netty.example.time2.client; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { UnixTime time = (UnixTime) msg; System.out.println(time); ctx.close(); } }

第四步,同理,server端也可以类似的修改。

package io.netty.example.time2.server; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ChannelFuture channelFuture = ctx.writeAndFlush(new UnixTime()); channelFuture.addListener(ChannelFutureListener.CLOSE); } }

相较于以前的分配空间的操作,明显简单了许多。

第五步,现在,还缺一个东西,一个encoder,用来把UnixTime转成ByteBuf,这个是逃不开的,哈哈。

package io.netty.example.time2.server; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; public class TimeEncoder extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { UnixTime m = (UnixTime) msg; ByteBuf buffer = ctx.alloc().buffer(4); buffer.writeInt((int)m.getValue()); ctx.write(buffer,promise);//1 } }

这一行里面有一些比较重要的事情。

这里有个ChannelPromise对象,来标记write成功与否。

我们没有手动调用flush方法,因为ChannelOutboundHandlerAdapter有个flush会自动调用。

其实这个用MessageToByteEncoder泛型还能更简单一点

package io.netty.example.time2.server; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class TimeEncoder2 extends MessageToByteEncoder<UnixTime> { @Override protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) throws Exception { out.writeInt((int) msg.getValue()); } }

MessageToByteEncoder是一个ChannelOutboundHandlerAdapter的实现抽象类,专门负责把POJO对象转成ByteBuf。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsxffs.html