Netty搭建TCP服务器实践(2)

childHandler:设置将被添加到已被接收的子Channel的ChannelPipeline中的ChannelHandler,其实就是让你在里面定义处理连接收发数据,需要哪些ChannelHandler按什么顺序去处理;

第二步接下来我们实现ServerChannelInitializer类,这个类继承实现自netty的ChannelInitializer抽象类,这个类的作用就是对channel(连接)的ChannelPipeline进行初始化工作,说白了就是你要把处理数据的方法添加到这个任务链中去,netty才知道每一步拿着socket连接和数据去做什么。

@ChannelHandler.Sharable
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    static final EventExecutorGroup group = new DefaultEventExecutorGroup(2);
   
    public ServerChannelInitializer() throws InterruptedException {
    }
   
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {   
        ChannelPipeline pipeline = socketChannel.pipeline();
        //IdleStateHandler心跳机制,如果超时触发Handle中userEventTrigger()方法
        pipeline.addLast("idleStateHandler",
                new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES));
        // netty基于分割符的自带解码器,根据提供的分隔符解析报文,这里是0x7e;1024表示单条消息的最大长度,解码器在查找分隔符的时候,达到该长度还没找到的话会抛异常
//        pipeline.addLast(
//                new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(new byte[] { 0x7e }),
//                        Unpooled.copiedBuffer(new byte[] { 0x7e })));
        //自定义编解码器
        pipeline.addLast(
                new MessagePacketDecoder(),
                new MessagePacketEncoder()
                );
        //自定义Hadler
        pipeline.addLast("handler",new TCPServerHandler());
        //自定义Hander,可用于处理耗时操作,不阻塞IO处理线程
        pipeline.addLast(group,"BussinessHandler",new BussinessHandler());
    }
}

这里我们注意下

pipeline.addLast(group,"BussinessHandler",new BussinessHandler());

在这里我们可以把一些比较耗时的操作(如存储、入库)等操作放在BussinessHandler中进行,因为我们为它单独分配了EventExecutorGroup 线程池执行,所以说即使这里发生阻塞,也不会影响TCPServerHandler中数据的接收。

最后就是各个部分的具体实现

解码器的实现:

public class MessagePacketDecoder extends ByteToMessageDecoder
{

public MessagePacketDecoder() throws Exception
    {
    }

@Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception
    {
        try {
            if (buffer.readableBytes() > 0) {
                // 待处理的消息包
                byte[] bytesReady = new byte[buffer.readableBytes()];
                buffer.readBytes(bytesReady);
                //这之间可以进行报文的解析处理
                out.add(bytesReady );
            }
        }finally {
           
        }
    }


}

编码器的实现

public class MessagePacketEncoder extends MessageToByteEncoder<Object>
{
    public MessagePacketEncoder()
    {
    }

@Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception
    {
        try {
            //在这之前可以实现编码工作。
            out.writeBytes((byte[])msg);
        }finally {
           
        } 
    }
}

TCPServerHandler的实现

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/419f533dc1b1d9c1e17743c4458fd603.html