JAVA NIO工作原理及代码实例分析(6)

select(long timeout),最多阻塞timeout毫秒,即使没有通道就绪也会返回,若超时返回,则当前线程中断标志位被设置。若阻塞时间内有通道就绪,就提前返回。

seletor.selectNow(),非阻塞方法。

一个seletor对象内部维护了三个集合。

1)已注册集合:表示了所有已注册通道的SelectionKey对象。

2)就绪集合:表示了所有已就绪通道的SelectionKey对象。

3)取消集合:表示了所有需要取消注册关系的通道的SelectionKey对象。

SelectionKey的cancel方法用于取消通道和选择器的注册关系,这个方法只是把表示当前通道的SelectionKey放入取消集合中,下次调用select方法时才会真正取消注册关系。

select方法每次会从已注册的通道集合中删除所有已取消的通道的SelectionKey,然后清空已取消的通道集合,最后从更新过的已注册通道集合中选出就绪的通道,放入已就绪的集合中。每次调用select方法,会向已就绪的集合中放入已就绪通道的SelectionKey对象,调用selectedKeys 方法就会返回这个已就绪通道集合的引用。当我们处理完一个已就绪通道,该通道对应的SelectionKey对象仍然位于已就绪的集合中,这就要求我们处理一个已就绪的通道后就必须手动从已就绪的集合中删除它,否则下次调用selectedKeys时,已处理过的通道还存在于这个集合中,导致线程空转。这里也是极易产生bug的。

4.3通道的写方法注意事项

1)写方法什么时候就绪?

写操作的就绪条件为socket底层写缓冲区有空闲空间,此时并不代表我们这时有(或者需要将)数据写入通道。而底层写缓冲区绝大部分时间都是有空闲空间的,所以当你注册写事件后,写操作基本一直是就绪的。这就导致只要有一个通道对写事件感兴趣,select方法几乎总是立刻返回的,但是实际上我们可能没有数据可写的,所以使得调用select方法的线程总是空转。对于客户端发送一些数据,客户端返回一些数据的模型,我们可以在读事件完成后,再设置通道对写事件感兴趣,写操作完成后再取消该通道对写事件的兴趣,这样就可以避免上述问题。

2)如何正确的发送数据

while(writeBuffer.hasRemaining()){
    channel.write(writeBuffer);
}

上面发送数据的通常用的代码,当网络状况良好的情况下,这段代码能正常工作。 现在我们考虑一种极端情况,服务器端写事件就绪,我们向底层的写缓冲区写入一些数据后,服务器端到客户端的链路出现问题,服务器端没能把数据发送出去,此时底层的写缓冲区一直处于满的状态,假设writeBuffer中仍然还有没发送完的数据就会导致while循环空转,浪费CPU资源,同时也妨碍这个selector管理的其它通道的读写。

为了解决个问题,我们应该使用下面的方法发送数据

int len = 0;
while(writeBuffer.hasRemaining()){
    len = sc.write(writeBuffer);
    /*说明底层的socket写缓冲已满*/
    if(len == 0){
        break;
    }
}

5. 代码示例
下面这个类,后面的代码都会用到,它只是两个缓冲区的包装

package nioDemo;
 
import java.nio.ByteBuffer;
 
/*自定义Buffer类中包含读缓冲区和写缓冲区,用于注册通道时的附加对象*/
public class Buffers {
 
    ByteBuffer readBuffer;
    ByteBuffer writeBuffer;
   
    public Buffers(int readCapacity, int writeCapacity){
        readBuffer = ByteBuffer.allocate(readCapacity);
        writeBuffer = ByteBuffer.allocate(writeCapacity);
    }
   
    public ByteBuffer getReadBuffer(){
        return readBuffer;
    }
   
    public ByteBuffer gerWriteBuffer(){
        return writeBuffer;
    }
}

5.1 TCP非阻塞示例

1)服务器端代码

package nioDemo;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
 
 
/*服务器端,:接收客户端发送过来的数据并显示,
 *服务器把上接收到的数据加上"echo from service:"再发送回去*/
public class ServiceSocketChannelDemo {
   
    public static class TCPEchoServer implements Runnable{
       
        /*服务器地址*/
        private InetSocketAddress localAddress;
       
        public TCPEchoServer(int port) throws IOException{
            this.localAddress = new InetSocketAddress(port);
        }
       
       
        @Override
        public void run(){
           
            Charset utf8 = Charset.forName("UTF-8");
           
            ServerSocketChannel ssc = null;
            Selector selector = null;
           
            Random rnd = new Random();
           
            try {
                /*创建选择器*/
                selector = Selector.open();
               
                /*创建服务器通道*/
                ssc = ServerSocketChannel.open();
                ssc.configureBlocking(false);
               
                /*设置监听服务器的端口,设置最大连接缓冲数为100*/
                ssc.bind(localAddress, 100);
               
                /*服务器通道只能对tcp链接事件感兴趣*/
                ssc.register(selector, SelectionKey.OP_ACCEPT);
               
            } catch (IOException e1) {
                System.out.println("server start failed");
                return;
            }
           
            System.out.println("server start with address : " + localAddress);
           
            /*服务器线程被中断后会退出*/
            try{
               
                while(!Thread.currentThread().isInterrupted()){
                   
                    int n = selector.select();
                    if(n == 0){
                        continue;
                    }
 
                    Set<SelectionKey> keySet = selector.selectedKeys();
                    Iterator<SelectionKey> it = keySet.iterator();
                    SelectionKey key = null;
                   
                    while(it.hasNext()){
                           
                        key = it.next();
                        /*防止下次select方法返回已处理过的通道*/
                        it.remove();
                       
                        /*若发现异常,说明客户端连接出现问题,但服务器要保持正常*/
                        try{
                            /*ssc通道只能对链接事件感兴趣*/
                            if(key.isAcceptable()){
                               
                                /*accept方法会返回一个普通通道,
                                    每个通道在内核中都对应一个socket缓冲区*/
                                SocketChannel sc = ssc.accept();
                                sc.configureBlocking(false);
                               
                                /*向选择器注册这个通道和普通通道感兴趣的事件,同时提供这个新通道相关的缓冲区*/
                                int interestSet = SelectionKey.OP_READ;                           
                                sc.register(selector, interestSet, new Buffers(256, 256));
                               
                                System.out.println("accept from " + sc.getRemoteAddress());
                            }
                           
                           
                            /*(普通)通道感兴趣读事件且有数据可读*/
                            if(key.isReadable()){
                               
                                /*通过SelectionKey获取通道对应的缓冲区*/
                                Buffers  buffers = (Buffers)key.attachment();
                                ByteBuffer readBuffer = buffers.getReadBuffer();
                                ByteBuffer writeBuffer = buffers.gerWriteBuffer();
                               
                                /*通过SelectionKey获取对应的通道*/
                                SocketChannel sc = (SocketChannel) key.channel();
                               
                                /*从底层socket读缓冲区中读入数据*/
                                sc.read(readBuffer);
                                readBuffer.flip();
                               
                                /*解码显示,客户端发送来的信息*/
                                CharBuffer cb = utf8.decode(readBuffer);
                                System.out.println(cb.array());
                   
                                readBuffer.rewind();
 
                               
                                /*准备好向客户端发送的信息*/
                                /*先写入"echo:",再写入收到的信息*/
                                writeBuffer.put("echo from service:".getBytes("UTF-8"));
                                writeBuffer.put(readBuffer);
                               
                                readBuffer.clear();
                               
                                /*设置通道写事件*/
                                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                                                               
                            }
                           
                            /*通道感兴趣写事件且底层缓冲区有空闲*/
                            if(key.isWritable()){
                               
                                Buffers  buffers = (Buffers)key.attachment();
                               
                                ByteBuffer writeBuffer = buffers.gerWriteBuffer();
                                writeBuffer.flip();
                               
                                SocketChannel sc = (SocketChannel) key.channel();
                               
                                int len = 0;
                                while(writeBuffer.hasRemaining()){
                                    len = sc.write(writeBuffer);
                                    /*说明底层的socket写缓冲已满*/
                                    if(len == 0){
                                        break;
                                    }
                                }
                               
                                writeBuffer.compact();
                               
                                /*说明数据全部写入到底层的socket写缓冲区*/
                                if(len != 0){
                                    /*取消通道的写事件*/
                                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
                                }
                               
                            }
                        }catch(IOException e){
                            System.out.println("service encounter client error");
                            /*若客户端连接出现异常,从Seletcor中移除这个key*/
                            key.cancel();
                            key.channel().close();
                        }
 
                    }
                       
                    Thread.sleep(rnd.nextInt(500));
                }
               
            }catch(InterruptedException e){
                System.out.println("serverThread is interrupted");
            } catch (IOException e1) {
                System.out.println("serverThread selecotr error");
            }finally{
                try{
                    selector.close();
                }catch(IOException e){
                    System.out.println("selector close failed");
                }finally{
                    System.out.println("server close");
                }
            }
 
        }
    }
   
    public static void main(String[] args) throws InterruptedException, IOException{
        Thread thread = new Thread(new TCPEchoServer(8080));
        thread.start();
        Thread.sleep(100000);
        /*结束服务器线程*/
        thread.interrupt();
    }
   
}

2)客户端程序

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/39980f0e5d16638c925b21f6b950f590.html