服务端
/** * * @param args * @throws IOException */ public static void main(String[] args) throws IOException { int port = 8080; if(args != null &&args.length >0){ try{ port = Integer.valueOf(args[0]); }catch (NumberFormatException ex){ //采用默认值 } } AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port); new Thread(timeServer,"AIO-AsyncTimeServerHandler-001").start(); } import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.AsynchronousServerSocketChannel; import java.util.concurrent.CountDownLatch; public class AsyncTimeServerHandler implements Runnable { private int port; CountDownLatch latch; AsynchronousServerSocketChannel asynchronousServerSocketChannel; public AsyncTimeServerHandler(int port){ this.port = port; try{ //创建一个异步的服务端通道 asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(); //绑定端口 asynchronousServerSocketChannel.bind(new InetSocketAddress(port)); System.out.println("The time server is start in port:"+ port); }catch (IOException e){ e.printStackTrace(); } } public void run(){ latch = new CountDownLatch(1); doAccept(); try{ latch.await();//允许当前线程阻塞,防止服务端执行完退出 }catch (InterruptedException e){ e.printStackTrace(); } } public void doAccept(){ //传递一个CompletionHandler实例来接收通知 asynchronousServerSocketChannel.accept(this,new AcceptCompletionHandler()); } } import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> { @Override public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) { //继续接收 attachment.asynchronousServerSocketChannel.accept(attachment, this); ByteBuffer buffer = ByteBuffer.allocate(1024); result.read(buffer, buffer, new ReadCompletionHandler(result)); } @Override public void failed(Throwable exc, AsyncTimeServerHandler attachment) { exc.printStackTrace(); attachment.latch.countDown(); } } import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel channel; public ReadCompletionHandler(AsynchronousSocketChannel channel){ if(this.channel == null){ this.channel = channel; } } @Override public void completed(Integer result,ByteBuffer attachment){ attachment.flip(); byte[] body = new byte[attachment.remaining()]; attachment.get(body); try{ String req = new String(body,"UTF-8"); System.out.println("The time server receive order:"+req); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req)? new java.util.Date(System.currentTimeMillis()).toString():"BAD ORDER"; doWrite(currentTime); }catch (UnsupportedEncodingException e){ e.printStackTrace(); } } private void doWrite(String currentTime){ if(currentTime !=null && currentTime.trim().length()>0){ byte[] bytes = (currentTime).getBytes(); final ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //如果没有发送完成,继续发送 if(buffer.hasRemaining()) channel.write(buffer,buffer,this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try{ channel.close(); }catch (IOException e){ //ingnore on close } } }); } } public void failed(Throwable exc,ByteBuffer attachment){ try{ this.channel.close(); }catch (IOException e){ e.printStackTrace(); } } }客户端
/** * * @param args * @throws IOException */ public static void main(String[] args) throws IOException { int port = 8080; if(args != null &&args.length >0){ try{ port = Integer.valueOf(args[0]); }catch (NumberFormatException ex){ //采用默认值 } } new Thread(new AsyncTimeClientHandler("127.0.0.1",port),"AIO-AsyncTimeClientHandler-001").start(); } import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncTimeClientHandler implements CompletionHandler<Void,AsyncTimeClientHandler>,Runnable { private AsynchronousSocketChannel client; private String host; private int port; private CountDownLatch latch; public AsyncTimeClientHandler(String host,int port){ this.host = host; this.port = port; try{ client = AsynchronousSocketChannel.open(); }catch (IOException e){ e.printStackTrace(); } } @Override public void run(){ latch = new CountDownLatch(1); client.connect(new InetSocketAddress(host,port),this,this); try{ latch.await(); }catch (InterruptedException el){ el.printStackTrace(); } try{ client.close(); }catch (IOException e){ e.printStackTrace(); } } @Override public void completed(Void result,AsyncTimeClientHandler attachment){ byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, final ByteBuffer buffer) { if(buffer.hasRemaining()){ client.write(buffer,buffer,this); }else{ ByteBuffer readBuffer = ByteBuffer.allocate(1024); client.read( readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { attachment.flip(); byte[] bytes = new byte[attachment.remaining()]; attachment.get(bytes); String body; try{ body = new String(bytes,"UTF-8"); System.out.println("Now is:"+body); latch.countDown(); }catch (UnsupportedEncodingException e){ e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try{ client.close(); latch.countDown(); }catch (IOException e){ //ingnore on close } } } ); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try{ client.close(); latch.countDown(); }catch (IOException e){ //ingnore on close } } }); } @Override public void failed(Throwable exc,AsyncTimeClientHandler attachment){ exc.printStackTrace(); try{ client.close(); latch.countDown(); }catch (IOException e){ e.printStackTrace(); } } } GitHub地址Java-DEMO/nettys/