Java NIO 25 Mar 2014
Blocking IO
JDK早期提供了java.net
下Socket
相关类以及java.io
来实现网络通信相关的功能,具体实现如下:
public class EchoServer {
public void serve(int port) throws IOException {
final ServerSocket socket = new ServerSocket(port); // 使用ServerSocket绑定到一个端口
while (true) {
final Socket clientSocket = socket.accept(); // 阻塞主线程直到一个新的连接被接受
System.out.println("EchoServer: Accept a new connection from " + clientSocket);
new Thread(new Runnable() { // 新建一个线程来处理该客户端连接
@Override
public void run() {
try {
BufferedReader br = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);
while(true) {
writer.println(br.readLine());
writer.flush();
} // 服务端从客户端读取数据,并将其重新写回给客户端
} catch (IOException e) {
e.printStackTrace();
try {
clientSocket.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}).start();
}
}
}
从上面代码可以看出,客户端连接数取决于JVM可以创建的线程数。当同时有几千的客户端同时连接到该服务端时,这会导致极严重的问题。
Java NIO(New IO or Non-blocking IO)
JDK7中引入了新的NIO实现,被称为NIO2,新的NIO相比于旧NIO在API与实现上都做了改动,但与旧NIO并不是完全不同,新旧NIO都使用了一个叫做ByteBuffer的抽象来作为数据容器。
ByteBuffer
ByteBuffer
的基本使用包含以下几种:
- 向
ByteBuffer
写入数据 - 调用
ByteBuffer
的flip
方法,从写模式切换到读模式 - 从
ByteBuffer
中读取数据 - 调用
ByteBuffer
的clear
方法或者compact
方法
当你向ByteBuffer
中写入数据时,它会更新自身的position
来记录写入数据的数量,当完成写入后,调用它的flip
方法,从写模式切换到读模式,其内部将limit
更新为当前写入的位置,再将position
置为0,这样你就可以读取所有已写入的数据。需要再写入数据时,调用flip
切换到写模式,然后调用clear
方法或compact
方法。
clear
方法-清空整个Buffer
compact
方法-清空Buffer中已读取的部分,然后将未读部分移动到Buffer的开头
Direct vs. Indirect
一个ByteBuffer
可以是direct的,也可以是非direct的。如果一个ByteBuffer
是direct的,JVM会尽可能使用JNI来操作它,这样可以避免在传递数据时将buffer中的数据拷贝到一个中间临时Buffer的过程,故而可以在一定程度上提高ByteBuffer的效率。另外DirectByteBuffer
由ByteBuffer.allocateDirect
方法创建,它的创建以及重新分配花销要比非direct ByteBuffer要高,其存储在JVM堆栈之外的一块独立内存区域中,当然,这也使得ByteBuffer
不能被GC回收。Oracle在Java API中建议DirectByteBuffer
主要用于长期存在的,占用空间较大的依赖于操作系统native IO的Buffer。由于DirectByteBuffer
受限于JVM所运行的环境,所以在一般情况下,尽量使用非direct ByteBuffer,除非你的应用对性能要求特别高。
ByteBuffer使用示范
Channel inChannel = ....;
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = -1;
do {
// 从channel中读取数据到ByteBuffer,返回值为从channel中读取的字节数,可能为0,如果为-1,则表示读取到channel的末端
bytesRead = inChannel.read(buf);
if (bytesRead != -1) {
buf.flip(); // 将ByteBuffer切换到读模式
while(buf.hasRemaining()){
System.out.print((char) buf.get()); // 从Buffer中读取字节流,每一次get方法调用,都从buffer中读取一个字节
}
buf.clear(); // 读取完所有内容后,清空ByteBuffer
}
} while (bytesRead != -1);
inChannel.close();
NIO Selector
- 创建一个或多个
Selector
,SocketChannel
可以注册到这些selector - 在为channel注册selector时,可以指定只关注哪些事件。一共有4种事件可用
OP_ACCEPT
:ServerSocket
接受一个新连接OP_CONNECT
:ClientSocket
连接到服务端OP_READ
:读操作OP_WRITE
:写操作
- 当channels被注册后,调用
selector.select
方法来阻塞当前线程,直到上述中的事件被选择 - 当select方法返回后,通过
selector.selectedKeys
方法获取所有SelectionKey
(可以获取被注册的channel和对应的事件)实例,然后对其进行操作
一个channel代表一个connection,由于一个selector可以注册给多个channel,因此使用NIO Selector处理该类问题将不再像上面的阻塞实例中一样受线程数影响。
EchoServer based on NIO
public class NioEchoServer {
public void serve(int port) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ServerSocket serverSocket = ssc.socket();
serverSocket.bind(new InetSocketAddress(port));
ssc.configureBlocking(false); // 绑定serverChannel到端口,并设置为非阻塞方式
Selector selector = Selector.open();
// 新建一个selector,并将serverChannel注册给selector,并只关注是否有新连接被server接受
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select(); // 调用selector.select方法,阻塞当前线程
// 当accept事件发生时,select方法返回,调用selector.selectedKeys方法获取所有SelectionKey
Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> iter = keySet.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove(); // 迭代整个keySet,并将key从iterator中移除
try {
if (key.isAcceptable()) {
SocketChannel client = ssc.accept(); // 接受客户端连接,获取客户端channel
System.out.println("NioEchoServer: Accept a new connection from " + client);
client.configureBlocking(false);
// 将客户端的channel注册到selector,并指定关注的事件为读写操作,
// 分配一个ByteBuffer空间作为SelectionKey的attachment
client.register(selector, SelectionKey.OP_READ
| SelectionKey.OP_WRITE, ByteBuffer.allocate(100));
}
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
client.read(buffer); // 从client中读取数据到buffer
}
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
buffer.flip();
client.write(buffer); // 将buffer切换为读模式,再将buffer中的数据写给client
buffer.compact();
}
} catch (IOException e) {
e.printStackTrace();
key.cancel();
try {
key.channel().close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
}
}
NIO2
在NIO2中,只需要发起一个IO操作,并提供一个回调CompletionHandler来处理IO操作完成后的相应逻辑,相比于旧的NIO,NIO2中不需要再时时监测IO状态的变化,一旦操作系统完成IO操作,就会调用我们提供的回调方法,而具体这一过程的细节对于开发者来说是隐藏的。
EchoServer based on NIO2
public class Nio2EchoServer {
public void serve(int port) throws IOException {
final AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open();
assc.bind(new InetSocketAddress(port)); // 绑定Server到指定端口
CountDownLatch latch = new CountDownLatch(1);
// Server开始接受新的Client连接,并指定一个CompletionHandler实例来处理这些连接
assc.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel client, Object attachment) {
assc.accept(null, this); // 一个连接请求过来后,Server接受该请求
ByteBuffer buffer = ByteBuffer.allocate(100);
// 从client端读取数据到buffer,并将buffer附加到IO操作,传递给EchoCompletionHandler实例
client.read(buffer, buffer, new EchoCompletionHandler(client));
}
@Override
public void failed(Throwable exc, Object attachment) {
try {
assc.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private class EchoCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel clientChannel;
private EchoCompletionHandler(AsynchronousSocketChannel client) {
this.clientChannel = client;
}
// result为从client读取到buffer的字节数,buffer为上面read操作传递过来的attachment
@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip(); // 完成从客户端的读操作后,切换buffer的模式为读模式
// 将buffer的数据写给client,并将该buffer传递给CompletionHandler实例
clientChannel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
// result为写给client的字节数,buffer为write操作的attachment(第二个参数)
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
clientChannel.write(buffer, buffer, this); // 如果buffer中还有数据,继续将数据写给client
} else {
buffer.compact();
// 如果buffer中没有数据则切换回写模式,并从client读取数据,并指定EchoCompletionHandler实体
clientChannel.read(buffer, buffer, EchoCompletionHandler.this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
try {
clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
可以看出,虽然NIO2的实现使用了更多代码,但是其逻辑确比旧的NIO的实现更容易理解与维护。