netty 入门 (3)

我们写一个Echo服务,客户端输入什么,我们就回复什么。

public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.write(msg);//1 ctx.flush();//2 } }

通过ChannelHandlerContext对象,我们可以触发一些IO事件或者执行一些操作。这里我们不需要手动release msg,因为当我们执行了wirte方法,Netty会帮我们释放。

ctx.write(Object)会把内容写到缓冲区,在调用flush后再输出出去。可以用writeAndFlush方法代替。

测试一下,发送3个字节,收到3个字节的回复。

pic

写一个TIME Server

这个例子用来实现一个Time协议。通过实现这个协议,我们可以了解Netty如何构造发送数据。根据RFC868协议,Time协议有这么几步

服务器监听37端口

客户端连接

服务端返回一个4字节的int时间数据

客户端接收到这个数据

客户端关闭连接

服务端关闭连接。

这里服务端忽略收到的任何客户端数据,而是当客户端一建立连接就返回数据,所以这里不使用channelRead方法,而是channelActive方法。

package io.netty.example.time; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception {//1 final ByteBuf timeBuf = ctx.alloc().buffer(4);//2 timeBuf.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture channelFuture = ctx.writeAndFlush(timeBuf);//3 channelFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { assert channelFuture == future; ctx.close(); } });//4 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

重写的是channelActive方法,这个方法会在连接进来的时候调用。

因为要返回一个int值,所以需要4个字节,通过ChannelHandlerContext分配,然后writeAndFlush方法写入并发送。

把数据发送给非阻塞IO流的时候不需要调用java.nio.ByteBuffer.flip()方法,Netty的ByteBuf没有提供这个方法,因为他不需要。ByteBuf内部有两个指针,一个用于读,一个用于写。write的时候读指针移动,写指针不动,反之同理。在使用ByteBuffer的时候如果没有flip,数据就会乱。

Netty里面所有的IO操作都是异步的,这样可能会导致write没有开始(或者没有完成)之前就连接就close掉了。比如下面的代码:

Channel ch = ...; ch.writeAndFlush(message); ch.close();//这也不是立马关闭,也是一个ChannelFuture对象

write(writeAndFlush)返回的是一个ChannelFuture对象,来大致看下这个对象的解释。

pic

继承自Future,表示一个Channel的IO操作的结果,不过他还没完成,只是表示已经创建。【详细的以后再讲。】

如何能知道这个IO操作的结果呢?我们可以给这个ChannelFuture增加一个ChannelFutureListener的实例(接口),然后实现它的operationComplete方法。这里面的方法比较简单,就是close掉这个ChannelHandlerContext,所以,可以使用定义好的ChannelFutureListener.CLOSE方法。像下面这样

channelFuture.addListener(ChannelFutureListener.CLOSE);

用rdate 测试一下。测试通过。

pic

写一个TIME Client

写完server之后就要写client了。client程序和server程序最大的不同在于选择的Bootstrap和Channel的实现类的差异。

package io.netty.example.time; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class TimeClient { public static void main(String[] args) throws InterruptedException { int port = 37; String host = "192.168.1.181"; EventLoopGroup workerGroup = new NioEventLoopGroup(); try { final Bootstrap bootstrap = new Bootstrap();//1 bootstrap.group(workerGroup);//2 bootstrap.channel(NioSocketChannel.class);//3 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);//4 bootstrap.handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler());//6 } }); final ChannelFuture connectFuture = bootstrap.connect(host, port).sync();//5 connectFuture.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }

Bootstrap和前面的ServerBootstrap类似,不过这个不是给non-server非服务器用,而是给客户端或者connectionless非连接的用。

客户端就不需要boss EventLoopGroup了。其实前面的Server中group能用同一个。

serverBootstrap.group(workGroup, workGroup);//同一个group

channel也需要换成NioSocketChannel,而不是NioServerSocketChannel。

这里直接用option方法,而不是childOption和option,因为对应client,没有childOption的概念。

client需要去connect,而不是bind来监听。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsxffs.html