Java NIO 25 Mar 2014

Blocking IO

JDK早期提供了java.netSocket相关类以及java.io来实现网络通信相关的功能,具体实现如下:

public class EchoServer {
    public void serve(int port) throws IOException {
        final ServerSocket socket = new ServerSocket(port);   // 使用ServerSocket绑定到一个端口
        while (true) {
            final Socket clientSocket = socket.accept();   // 阻塞主线程直到一个新的连接被接受
            System.out.println("EchoServer: Accept a new connection from " + clientSocket);
            new Thread(new Runnable() {   // 新建一个线程来处理该客户端连接
                @Override
                public void run() {
                    try {
                        BufferedReader br = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                        PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);
                        while(true) {
                            writer.println(br.readLine());
                            writer.flush();
                        }                    // 服务端从客户端读取数据,并将其重新写回给客户端
                    } catch (IOException e) {
                        e.printStackTrace();
                        try {
                            clientSocket.close();
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                    }
                }
            }).start();
        }
    }
}

从上面代码可以看出,客户端连接数取决于JVM可以创建的线程数。当同时有几千的客户端同时连接到该服务端时,这会导致极严重的问题。

Java NIO(New IO or Non-blocking IO)

JDK7中引入了新的NIO实现,被称为NIO2,新的NIO相比于旧NIO在API与实现上都做了改动,但与旧NIO并不是完全不同,新旧NIO都使用了一个叫做ByteBuffer的抽象来作为数据容器。

ByteBuffer

ByteBuffer的基本使用包含以下几种:

  • ByteBuffer写入数据
  • 调用ByteBufferflip方法,从写模式切换到读模式
  • ByteBuffer中读取数据
  • 调用ByteBufferclear方法或者compact方法

当你向ByteBuffer中写入数据时,它会更新自身的position来记录写入数据的数量,当完成写入后,调用它的flip方法,从写模式切换到读模式,其内部将limit更新为当前写入的位置,再将position置为0,这样你就可以读取所有已写入的数据。需要再写入数据时,调用flip切换到写模式,然后调用clear方法或compact方法。

clear方法-清空整个Buffer compact方法-清空Buffer中已读取的部分,然后将未读部分移动到Buffer的开头

Direct vs. Indirect

一个ByteBuffer可以是direct的,也可以是非direct的。如果一个ByteBuffer是direct的,JVM会尽可能使用JNI来操作它,这样可以避免在传递数据时将buffer中的数据拷贝到一个中间临时Buffer的过程,故而可以在一定程度上提高ByteBuffer的效率。另外DirectByteBufferByteBuffer.allocateDirect方法创建,它的创建以及重新分配花销要比非direct ByteBuffer要高,其存储在JVM堆栈之外的一块独立内存区域中,当然,这也使得ByteBuffer不能被GC回收。Oracle在Java API中建议DirectByteBuffer主要用于长期存在的,占用空间较大的依赖于操作系统native IO的Buffer。由于DirectByteBuffer受限于JVM所运行的环境,所以在一般情况下,尽量使用非direct ByteBuffer,除非你的应用对性能要求特别高。

ByteBuffer使用示范

Channel inChannel = ....;
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = -1;
do {
  // 从channel中读取数据到ByteBuffer,返回值为从channel中读取的字节数,可能为0,如果为-1,则表示读取到channel的末端
  bytesRead = inChannel.read(buf);  
  if (bytesRead != -1) {
    buf.flip();  // 将ByteBuffer切换到读模式
    while(buf.hasRemaining()){
      System.out.print((char) buf.get());  // 从Buffer中读取字节流,每一次get方法调用,都从buffer中读取一个字节
    }
    buf.clear();   // 读取完所有内容后,清空ByteBuffer
  }
} while (bytesRead != -1);
inChannel.close();

NIO Selector

  1. 创建一个或多个SelectorSocketChannel可以注册到这些selector
  2. 在为channel注册selector时,可以指定只关注哪些事件。一共有4种事件可用
    • OP_ACCEPT: ServerSocket接受一个新连接
    • OP_CONNECT: ClientSocket连接到服务端
    • OP_READ:读操作
    • OP_WRITE:写操作
  3. 当channels被注册后,调用selector.select方法来阻塞当前线程,直到上述中的事件被选择
  4. 当select方法返回后,通过selector.selectedKeys方法获取所有SelectionKey(可以获取被注册的channel和对应的事件)实例,然后对其进行操作

一个channel代表一个connection,由于一个selector可以注册给多个channel,因此使用NIO Selector处理该类问题将不再像上面的阻塞实例中一样受线程数影响。

EchoServer based on NIO

public class NioEchoServer {
    public void serve(int port) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ServerSocket serverSocket = ssc.socket();
        serverSocket.bind(new InetSocketAddress(port)); 
        ssc.configureBlocking(false);    // 绑定serverChannel到端口,并设置为非阻塞方式
        Selector selector = Selector.open();
        // 新建一个selector,并将serverChannel注册给selector,并只关注是否有新连接被server接受
        ssc.register(selector, SelectionKey.OP_ACCEPT); 
        while (true) {
            selector.select();   // 调用selector.select方法,阻塞当前线程
            // 当accept事件发生时,select方法返回,调用selector.selectedKeys方法获取所有SelectionKey
            Set<SelectionKey> keySet = selector.selectedKeys(); 
            Iterator<SelectionKey> iter = keySet.iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();            // 迭代整个keySet,并将key从iterator中移除
                try {
                    if (key.isAcceptable()) {
                        SocketChannel client = ssc.accept();  // 接受客户端连接,获取客户端channel
                        System.out.println("NioEchoServer: Accept a new connection from " + client);
                        client.configureBlocking(false);
                        // 将客户端的channel注册到selector,并指定关注的事件为读写操作,
                        // 分配一个ByteBuffer空间作为SelectionKey的attachment
                        client.register(selector, SelectionKey.OP_READ 
                                                | SelectionKey.OP_WRITE, ByteBuffer.allocate(100));                     
                    }
                    if (key.isReadable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        client.read(buffer);    // 从client中读取数据到buffer
                    }
                    if (key.isWritable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        buffer.flip();
                        client.write(buffer);  // 将buffer切换为读模式,再将buffer中的数据写给client
                        buffer.compact();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
                }
            }
        }
    }
}

NIO2

在NIO2中,只需要发起一个IO操作,并提供一个回调CompletionHandler来处理IO操作完成后的相应逻辑,相比于旧的NIO,NIO2中不需要再时时监测IO状态的变化,一旦操作系统完成IO操作,就会调用我们提供的回调方法,而具体这一过程的细节对于开发者来说是隐藏的。

EchoServer based on NIO2

public class Nio2EchoServer {
    public void serve(int port) throws IOException {
        final AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open();
        assc.bind(new InetSocketAddress(port));      // 绑定Server到指定端口
        CountDownLatch latch = new CountDownLatch(1); 
        // Server开始接受新的Client连接,并指定一个CompletionHandler实例来处理这些连接 
        assc.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {     
            @Override
            public void completed(AsynchronousSocketChannel client, Object attachment) {
                assc.accept(null, this);           // 一个连接请求过来后,Server接受该请求
                ByteBuffer buffer = ByteBuffer.allocate(100);
                // 从client端读取数据到buffer,并将buffer附加到IO操作,传递给EchoCompletionHandler实例
                client.read(buffer, buffer, new EchoCompletionHandler(client));   
            }
            @Override
            public void failed(Throwable exc, Object attachment) {
                try {
                    assc.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    private class EchoCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
        private AsynchronousSocketChannel clientChannel;
        private EchoCompletionHandler(AsynchronousSocketChannel client) {
            this.clientChannel = client;
        }
        // result为从client读取到buffer的字节数,buffer为上面read操作传递过来的attachment
        @Override
        public void completed(Integer result, ByteBuffer buffer) {     
            buffer.flip();      // 完成从客户端的读操作后,切换buffer的模式为读模式
 
            // 将buffer的数据写给client,并将该buffer传递给CompletionHandler实例
            clientChannel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {    
                               
                // result为写给client的字节数,buffer为write操作的attachment(第二个参数)
                @Override
                public void completed(Integer result, ByteBuffer buffer) {
                    if (buffer.hasRemaining()) {
                        clientChannel.write(buffer, buffer, this);    // 如果buffer中还有数据,继续将数据写给client
                    } else {
                        buffer.compact();
                        // 如果buffer中没有数据则切换回写模式,并从client读取数据,并指定EchoCompletionHandler实体
                        clientChannel.read(buffer, buffer, EchoCompletionHandler.this); 
                    }
                }
                @Override
                public void failed(Throwable exc, ByteBuffer buffer) {
                    try {
                        clientChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            try {
                clientChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

可以看出,虽然NIO2的实现使用了更多代码,但是其逻辑确比旧的NIO的实现更容易理解与维护。