Java NIO 概述
NIO主要有三大核心部分: Channel(通道)
、Buffer(缓冲区)
、Selector
(选择区),区别与传统IO
基于字节流和字符流进行操作,而NIO
基于Channel
和Buffer
进行操作,数据总是从通道读取到缓存区中,或者从缓冲区写入到通道中,Selector
用与监听多个通道的事件.因此单线程可以监听多个数据通道.
Channel 和 Buffer
基本上,所有的 IO 在NIO 中都从一个Channel 开始。Channel 有点象流。 数据可以从Channel读到Buffer中,也可以从Buffer 写到Channel中。
Channel的主要实现:
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
这些实现涵盖了UDP和TCP及网络IO与文件IO.
Buffer的主要实现hexo
- ByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
- MappedByteBuffer(用于内存映射)
这些Buffer覆盖了你能通过IO发送的基本数据类型:byte, short, int, long, float, double 和 char。
Selector
Selector允许单个线程处理多个Channel,如果你的应用打开了多个连接(通道),但每个流量都很低(低流量场景),使用Selector会方便.
要使用Selector,得向Selector注册Channel,然后调用它的Select()
方法.这个方法会一直阻塞到某个注册的通道有事件就绪,一旦这个方法返回,线程就可以处理这些事件.
Buffer
使用Buffer读写数据一般遵循四个步骤
- 写入数据到
Buffer
- 调用
flip()
方法 - 从
Buffer
中读取数据 - 调用
clear()
方法或者compact()
方法
向buffer写入数据时,buffer会记录写入了多少数据,一旦要读取数据,需要通过filp()
方法将buffer切换到读模式,在读模式下,可以读取之前写入到buffer的所有数据.
一旦读完了所有数据,就需要清空缓冲区,让它可以被再次写入,有两种方法能清空缓冲区:
claer()
清空整个缓冲区compact()
清除已读过的数据,未读的会被移动至缓冲区起始处,新数据接在后面
案例:
File file = new File("test.txt");
FileChannel fc = new RandomAccessFile(file, "rw").getChannel();
//创建容量为 3 字节的缓冲区
ByteBuffer buf = ByteBuffer.allocate(3);
int bytesRead;
//读入缓冲区
while ((bytesRead = fc.read(buf)) != -1) {
System.out.println("Read-" + bytesRead);
//使Buffer状态变为准备读
buf.flip();
while (buf.hasRemaining()){
//一次读取 1 个字节
System.out.println((char) buf.get());
}
//清空缓冲区,准备重新写入
buf.clear();
}
Buffer(缓冲区)的本质是一块可以写入数据,然后可以从中读取数据的内存,这块内存被包装为NIO Buffer对象,提供了一组方法,用来方便访问该块内存.
为了理解Buffer的工作原理,需要熟悉它的三个属性:
- capacity
- position
- limit
position和limit的含义取决于Buffer是处于读模式还是写模式.不管Buffer处于什么模式,capacity的含义是一样的.
capacity
作为一个内存块,Buffer有一个固定的大小,值为capacity,你只能往里面写入capacity个byte、long、char等类型,一旦写满,需要将其清空(通过读数据或者清除数据)才能继续写入数据.
position
写模式:position表示当前位置,初始的position为0,当一个byte、long等数据写入Buffer后,position会向前移动到下一个可插入数据的Buffer单元,position最大可为 capacity-1
读模式:从某个特定的位置读,将当前Buffer从写模式切换到读模式,position会被重置为0,当从Buffer的position处读取数据时,position向前移动到下一个可读位置.
limit
写模式:Buffer的limit表示你最多能往Buffer里写入多少数据.写模式下,limit最大值等于capacity.
读模式:limit表示最多能读多少数据,因此,当切换Buffer到读模式时,limit会被设置为写模式下的position的值.换句话说,你能读到你之前写入的所有数据.(limit被设置成已写数据的数量,这个值在写模式下就是position)
Buffer的类型
Java NIO有一下Buffer类型
- ByteBuffer
- MappedByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
不太Buffer对应不同数据类型,换句话说,就是可以通过char、short、int、long、float或double类型来操作缓冲区中的字节.
Buffer的分配
想要获得一个Buffer对象首先要进行分配,每个Buffer类都有一个allocate方法,下面是一个分配48字节capacity
的ByteBuffer的例子.
ByteBuffer buf = ByteBuffer.allocate(48);
System.out.println(ByteBuffer.allocate(10).getClass());
System.out.println(ByteBuffer.allocateDirect(10).getClass());
/**
* class java.nio.HeapByteBuffer -使用Java堆内存,读写效率低,受到GC影响
* class java.nio.DirectByteBuffer -使用直接内存。读写效率高(少一次数据拷贝),不会受到GC影响,分配的效率低,使用不当会内存泄露
*/
Buffer写入数据
写入数据到Buffer有两种方法:
从
Channel
写入到Buffer
int bytesRead = inChannel.read(buf); //read into buffer.
通过
Buffer
的put()
方法写到Buffer里buf.put((byte)127);
put()
方法有很多版本,允许你以不同方法从Buffer
中读取数据,例如,从指定position
读取,或者从Buffer
中读取数据到字节数组,更多Buffer实现的细节参考JavaDoc.
Buffer读取数据
读取数据到Buffer有两种方法:
调用
channel
的write
方法int writeBytes = channel.write(buf);
调用
buffer
自己的get
方法byte b = buf.get();
get ()
方法会让position
读指针向后走,如果想重复读取数据,可以调用 rewind
方法将 position
重新置为 0,或者调用 get(int i)
方法获取索引 i 的内容,它不会移动读指针.
rewind()
方法
Buffer.rewind()
将position
设回0,所以可以重读Buffer
中的内容,limit
保持不变.
clear()
和compact()
方法
一旦读完Buffer
中的数据,需要清空Buffer
以备再次写入,可以通过clear()
或compact()
方法来完成.
调用的是clear()
方法,position
将被设置为0,limit
被设置为capacity
值,意思是,Buffer
被清空,但是数据未清除,只是标记我们可以从那个标志位开始向Buffer
写入数据.
如果Buffer
中有一些未读数据,调用clear()
方法,数据将会被遗忘,意味着不会标记任何数据是读过的还是未读的.
如果希望保留未读的数据,且后续需要这些数据,但又要先写入一些数据,则需使用compact()
方法
compact()
方法将未读的数据拷贝到Buffer
起始处,然后将position
设到最后一个未读元素后面,limit
属性依然和clear()
一样,设置为capacity
,现在Buffer
准备好写入数据了,但不会覆盖未读数据.
mark()
与reset()
方法
调用Buffer.make()
方法,可以标记Buffer
中的一个特定position
,之后可以同过Buffer.reset()
方法将position
恢复到make
位置.
buffer.mark();
// 调用 buffer.get() 几次,例如 在解析过程中。
buffer.reset(); //将位置设置回标记
/注意
rewind 和 flip 都会清除 mark 位置
equals()
方法
满足以下条件时为true
- 相同的类型
Buffer
中剩余的byte、char等的个数相等- Buffer中所有剩余的byte、char等都相同
equals
只是比较Buffer
的一部分,不是每一个在它里面的元素都比较.实际上,它只比较Buffer中的剩余元素.
comareTo()
方法
compareTo()方法比较两个Buffer的剩余元素(byte、char等), 如果满足下列条件,则认为一个Buffer“小于”另一个Buffer:
- 第一个不相等的元素小于另一个Buffer中对应的元素 。
- 所有元素都相等,但第一个Buffer比另一个先耗尽(第一个Buffer的元素个数比另一个少)。
Buffer与字符串的转换
//1. 字符串转为ByteBuffer
ByteBuffer buffer1 = ByteBuffer.allocate(16);
buffer1.put("hello".getBytes());
ByteBufferUtil.debugAll(buffer1);
//2. Charset
ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("hello"); //会自动切换到读模式
ByteBufferUtil.debugAll(buffer2);
//3. wrap
ByteBuffer buffer3 = ByteBuffer.wrap("hello".getBytes()); //会自动切换到读模式
ByteBufferUtil.debugAll(buffer3);
/**--*--buffer转字符串--*--**/
CharBuffer str1 = StandardCharsets.UTF_8.decode(buffer2);
System.out.println(str1.toString());
//需要先切换到读模式
buffer1.flip();
CharBuffer str2 = StandardCharsets.UTF_8.decode(buffer1);
System.out.println(str2.toString());
Scatter/Gather
Scatter(分散)从Channel中读取是指在读操作时将读取的数据写入多个Buffer中。因此,Channel将从Channel中读取的数据“分散(Scatter)”到多个Buffer中。
Gather(集散)写入Channel是指在写操作时将多个Buffer的数据写入同一个Channel中,因此,Channel将多个Buffer中的数据“聚集(Gather)”后发送到Buffer中。
scatter / gather经常用于需要将传输的数据分开处理的场合,例如传输一个由消息头和消息体组成的消息,你可能会将消息体和消息头分散到不同的buffer中,这样你可以方便的处理消息头和消息体。
Scatterering Reads
Scattering Reads是指数据从一个channel读取到多个buffer中。如下图描述:
代码示例:
try (FileChannel channel = new RandomAccessFile("words.txt", "r").getChannel()) {
ByteBuffer b1 = ByteBuffer.allocate(3);
ByteBuffer b2 = ByteBuffer.allocate(3);
ByteBuffer b3 = ByteBuffer.allocate(5);
channel.read(new ByteBuffer[]{b1, b2, b3});
b1.flip();
b2.flip();
b3.flip();
ByteBufferUtil.debugAll(b1);
ByteBufferUtil.debugAll(b2);
ByteBufferUtil.debugAll(b3);
} catch (IOException e) {
} ;
这里需要注意,Buffer数组作为read()
的入参,会按顺序将将数据写入到Buffer中,当写满一个才会到下一个。
Scattering Reads在移动下一个buffer前,必须填满当前的buffer,意味着它不适合动态的消息,或者说大小不固定的消息,换句话说,如果存在消息头和消息体,消息头必须完成填充(例如 128byte),Scattering Reads才能正常工作。
Gathering Writes
Gathering Writes是指数据从多个buffer写入到同一个channel。如下图描述:
代码示例:
ByteBuffer b1 = StandardCharsets.UTF_8.encode("hello");
ByteBuffer b2 = StandardCharsets.UTF_8.encode("world");
ByteBuffer b3 = StandardCharsets.UTF_8.encode("你好");
try (FileChannel channel = new RandomAccessFile("words2.txt", "rw").getChannel()) {
channel.write(new ByteBuffer[]{b1, b2, b3});
} catch (IOException e) {
}
buffers数组是write()方法的入参,write()方法会按照buffer在数组中的顺序,将数据写入到channel,注意只有position和limit之间的数据才会被写入。因此,如果一个buffer的容量为128byte,但是仅仅包含58byte的数据,那么这58byte的数据将被写入到channel中。因此与Scattering Reads相反,Gathering Writes能较好的处理动态消息。
通道之间的数据传输
在NIO中,如果两个通道中有一个是FileChannel
,那就可以之间将数据从一个Channel
拷贝到另一个Channel
。涉及到零拷贝,后面有写
transferTo()
transferTo()方法将数据从FileChannel传输到其他的channel中。下面是一个简单的例子:
try {
FileChannel from = new FileInputStream("data.txt").getChannel();
FileChannel to = new FileOutputStream("to.txt").getChannel();
// 效率高 底层会利用操作系统的零拷贝优化 注意:一次最大传2G 下面解决方法
from.transferTo(0, from.size(), to);
} catch (IOException e) {
e.printStackTrace();
}
//解决方法
try {
FileChannel from = new FileInputStream("data.txt").getChannel();
FileChannel to = new FileOutputStream("to.txt").getChannel();
// 效率高 底层会利用操作系统的零拷贝优化 一次最大传2G
long size = from.size();
//left代表还剩余多少数据没有传输
for (long left = size; left > 0; ) {
System.out.println("position: " + (size - left) + " left: " + left);
left -= from.transferTo(size - left, left, to);
}
} catch (IOException e) {
e.printStackTrace();
}
transferFrom()
FileChannel的transferFrom()方法可以将数据从源通道传输到FileChannel中。
简单例子:
RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel fromChannel = fromFile.getChannel();
RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChannel toChannel = toFile.getChannel();
long position = 0;
long count = fromChannel.size();
toChannel.transferFrom(position, count, fromChannel);
方法的输入参数position表示从position处开始向目标文件写入数据,count表示最多传输的字节数。如果源通道的剩余空间小于 count 个字节,则所传输的字节数要小于请求的字节数。
此外要注意,在SoketChannel的实现中,SocketChannel只会传输此刻准备好的数据(可能不足count字节)。因此,SocketChannel可能不会将请求的所有数据(count个字节)全部传输到FileChannel中。
是不是发现这个例子和前面那个例子特别相似?除了调用方法的FileChannel对象不一样外,其他的都一样。
Selector
Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件,这样,一个单独的线程可以管理多个Channel,从而管理多个网络连接。
为什么使用Selector
仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。因此,使用的线程越少越好。
但是,需要记住,现代的操作系统和CPU在多任务方面表现的越来越好,所以多线程的开销随着时间的推移,变得越来越小了。实际上,如果一个CPU有多个内核,不使用多任务可能是在浪费CPU能力。不管怎么说,关于那种设计的讨论应该放在另一篇不同的文章中。在这里,只要知道使用Selector能够处理多个通道就足够了。
Selector的创建
通过调用Selector.open()
方法创建一个Selector,如下:
Selector selector = Selector.open();
绑定Channel事件
也称之为注册事件,为了Channel和Selector配合使用,必须将Channel注册到Selector上,通过SelectableChannel.register()
方法实现,如下:
channel.configureBlocking(false);//设置非阻塞模式
//向Selector注册Channel时,`Channel.register()`方法会返回一个SelectionKey对象,这个对象代表注册到该Selector的通道。
SelectionKey key = channel.register(selector, 绑定事件);
- channel必须工作在非阻塞模式下
- FileChannel 没用非阻塞模式,因此不能配合 selector 一起使用
注意register()方法的第二个参数。这是一个“interest集合”,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件:
- Connect - 客户端连接建立后触发
- Accept - 会在有连接请求时触发
- Read - 可读事件
- Write - 可写事件
通道触发了一个事件意思是该事件已经就绪。所以,某个channel成功连接到另一个服务器称为“连接就绪”。一个server socket channel准备好接收新进入的连接称为“接收就绪”。一个有数据可读的通道可以说是“读就绪”。等待写数据的通道可以说是“写就绪”。
这四种事件用SelectionKey的四个常量来表示:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如下:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
在下面还会继续提到interest集合。
SelectionKey
向Selector注册Channel时,register()
方法会返回一个SelectionKey对象,这个对象包含了一些属性:
- interest集合
- ready集合
- Channel
- Selector
- 附加对象(可选)
interest集合
就像向Selector注册通道一节中所描述的,interest集合是你所选择的感兴趣的事件集合。可以通过SelectionKey读写interest集合,像这样:
int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
可以看到,用“位与”操作insterest集合和SelectionKey常量,可以确定某个确定的事件是否在insterest集合中。
ready集合
ready集合是通道已经准备就绪的操作的集合,在一次选择(Selection)之后,你会首先访问这个ready set。Selection将在下一小节进行解释。可以这样访问ready集合:
int readySet = selectionKey.readyOps();
可以用像检测interest集合那样的方法,来检测channel中什么事件或操作已经就绪。但是,也可以使用以下四个方法,它们都会返回一个布尔类型:
selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();
Channel+Selector
从SelectionKey访问Channel和Selector很简单。如下:
Channel channel = selectionKey.channel(); Selector selector = selectionKey.selector();
附加对象
可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。例如,可以附加 与通道一起使用的Buffer,或是包含聚集数据的某个对象。使用方法如下:
selectionKey.attach(theObject); Object attachedObj = selectionKey.attachment();
还可以在用
register()
方法向Selector注册Channel的时候附加对象。如:SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
监听Channel事件
一旦向Selector注册了一个或多个通道,就可用调用select()
方法,这些方法会根据你所感兴趣的事件(如连接、接受、读或写)已经准备就绪的那些通道。
方法1,阻塞直到绑定事件发生
int count = selector.select();
方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)
int count = selector.select(long timeout);
方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件
int count = selector.selectNow();
select 何时不阻塞
- 事件发生时
- 客户端发起连接请求,会触发 accept 事件
- 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
- channel 可写,会触发 write 事件
- 在 linux 下 nio bug 发生时
- 调用 selector.wakeup()
- 调用 selector.close()
- selector 所在线程 interrupt
selectedKeys()
一旦select()
方法执行完毕,就可以通过调用selector的selectedKeys()
方法,访问就绪通道,如下:
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 或
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
可以通过遍历这个集合访问就绪的通道,如下:
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
}
这个循环遍历了所有已就绪的Key,并检测对应的事件。
SelectionKey.channel()方法返回的通道需要转型成你要处理的类型,如ServerSocketChannel或SocketChannel等。
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
完整的示例:
server端:
//1.创建selector对象 管理多个channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
//2.建立selector和channel的联系/注册
//SelectionKey 就是将来事件发生后 通过它可以知道事件和哪个channel的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
//key 只关注 accpet 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
System.out.println("register: " + sscKey);
while (true) {
//3. select方法
//select在事件未处理时,它不会阻塞,事件发生后要么处理,要么取消,不能不管
selector.select();//阻塞方法 没有事件发生时会阻塞 发生事件继续运行
//4. 处理事件 selectedKeys()内部包含了所有发生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
System.out.println("key: " + key);
//处理事件
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel accept = channel.accept();
System.out.println("" + accept);
//取消事件
//key.channel();
// 处理完毕,必须将事件移除
iter.remove();
}
}
事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发
因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如
- 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
- 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常
client端:
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",8080));
System.out.println("waiting...");
消息边界问题
在服务器端接收到的消息,长度是不定的,如图:
为了解决这种情况,有如下三种方法:
- 固定消息长度,数据大小一样,服务器按照预定长度读取
- 分隔符拆分
- TLV格式(T:Type类型,L:Length长度,V:Value类型),类型和格式已知的情况下,去分配Buffer
- HTTP1是TLV格式
- HTTP2是LTV格式
使用第二种方式的代码:
服务器端
public static void main(String[] args) throws IOException {
//1.创建selector对象 管理多个channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
//2.建立selector和channel的联系/注册
//SelectionKey 就是将来事件发生后 通过它可以知道事件和哪个channel的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
//key 只关注 accpet 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
System.out.println("register: " + sscKey);
while (true) {
//3. select方法
//select在事件未处理时,它不会阻塞,事件发生后要么处理,要么取消,不能不管
selector.select();//阻塞方法 没有事件发生时会阻塞 发生事件继续运行
//4. 处理事件 selectedKeys()内部包含了所有发生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();//处理key后 必须从selectedKeys()中删除 否则下次还会处理
System.out.println("key: " + key);
//5.区分事件类型
if (key.isAcceptable()) { //如果是accpet
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16);
//将一个ByteBuffer作为附件关联到selectionKey上
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
System.out.println("" + sc);
} else if (key.isReadable()) {//如果是read
try {
SocketChannel channel = (SocketChannel) key.channel();
// ByteBuffer buffer = ByteBuffer.allocate(16);
ByteBuffer buffer = (ByteBuffer) key.attachment();//从附件中获取
int read = channel.read(buffer);
if (read == -1) {//-1表示客户端主动断开连接
key.cancel();
System.out.println("一个客户端断开连接");
} else {
split(buffer);
if (buffer.position() == buffer.limit()) { //如果读到的消息没用结束符 则进行扩容
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();//切换到读模式
newBuffer.put(buffer);
key.attach(newBuffer);//将新的Buffer放到附件中
}
}
} catch (IOException e) {
e.printStackTrace();
//因为客户端断开了,因此需要将key取消(从selector的keys中真正删除key)
key.cancel();
}
}
}
}
}
private static void split(ByteBuffer source) {
source.flip();
for (int i = 0; i < source.limit(); i++) {
//找到一条完整消息
if (source.get(i) == '\n') {
int length = i + 1 - source.position();
//将完整消息存入新的Buffer
ByteBuffer target = ByteBuffer.allocate(length);
//从 source 读 , 向 target 写
for (int j = 0; j < length; j++) {
target.put(source.get());
}
ByteBufferUtil.debugAll(target);
}
}
source.compact();
}
客户端
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",8080));
sc.write(Charset.defaultCharset().encode("1234567890\n0987654321aaaa"));
sc.write(Charset.defaultCharset().encode("12345678900987654321\n"));
System.out.println("waiting...");
System.in.read();
NIO vs BIO
Stream vs Channel
- Stream不会自动缓存数据,channel会利用系统提供的发送缓冲区、接收缓冲区
- Stream仅支持阻塞API,Channel同时支持阻塞、非阻塞API,网络Channel可配合Selector实现多路复用
- 二者均为全双工,即读写可以同时进行
IO模型
同步阻塞、同步非阻塞、同步多路复用、异步阻塞(不存在的)、异步非阻塞
- 同步:线程自己去获取结果(一个线程)
- 异步:线程不自己获取结果,而是由其他线程发送结果(至少两个线程)
零拷贝
传统IO问题
传统的IO将一个文件通过Socket写出
File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");
byte[] buf = new byte[(int)f.length()];
file.read(buf);
Socket socket = ...;
socket.getOutputStream().write(buf);
内部工作流程:
java本身不具备IO读写能力,因此Read方法调用后,要从java程序的用户态切换到内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统会使用DMA(Direct Memory Access)来实现文件读,期间也不会使用CPU
DMA也可以理解为硬件单元,用来解放 cpu 完成文件 IO
从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即byte[] buf),这期间cpu会参与拷贝,无法利用cpu
调用write方法,这时将数据从用户缓冲区(即byte[] buf)写入socket缓冲区,cpu会参与拷贝
接下来要向网卡写入数据,这项能力java又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用DMA将socket缓冲区的数据写入网卡,又不会使用cpu
可以看到中间环节较多,java的IO实际不是物理设备级别的读写,而是缓存的复制,底层真正的读写是又操作系统来完成的
- 用户态和内核态的切换发生了3次,这是重量级的
- 数据拷贝了共4次
NIO
通过DirectByteBuff
ByteBuffer.allocate(10) HeapByteBuffer
使用的是 java 内存ByteBuffer.allocateDirect(10) DirectByteBuffer
使用的是操作系统内存
基本和优化前相同,唯有一点:java可以通过 DirectByteBuff
将堆外内存映射到jvm内存中来直接访问使用
- 这块内存不受jvm垃圾回收的影响,因此内存地址固定,有助于IO读写
- java中的DirectByteBuf对象仅维护了此内存的虚引用,内存回收分成两部
- DirectByteBuf对象垃圾回收,将虚引用加入引用队列
- 通过专门线程访问引用队列,根据虚引用是否堆外内存
- 减少了一次数据拷贝,用户态和内核态的切换次数没用减少
进一步优化(底层使用了lunux2.1后提供的sendFile方法),java中对应着两个channel调用transferTo
和transferForm
方法拷贝数据
- java调用transferTo方法后,要从java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用
- 数据从内核缓冲区传输到socket缓冲区,cpu会参与拷贝
- 最后使用DMA将socket缓冲区的数据写入网卡,不会使用cpu
可以看到,只发生了一次用户态和内核态的切换,数据拷贝了3次
再进一步优化(linux2.4)
- java调用transferTo方法后,要从java程序的用户态切换到内核态,使用DMA将数据读入内核缓冲区,不会使用cpu
- 只会将一些 offset 和 length 信息考入 socket缓冲区,几乎无消耗
- 使用DMA将内核缓冲区的数据写入网卡,不会使用cpu
整个过程同意只发生了一次用户态和内核态的切换,数据拷贝了2次,所谓的零拷贝,并不是真正的无拷贝,而是不会拷贝重复数据到jvm内存中,零拷贝的的优点:
- 更少的用户态与内核态的切换
- 不利于cpu计算,减少cpu利用
- 零拷贝适合小文件传输
AIO
AIO用于解决数据复制阶段的阻塞问题
- 同步意味着,在进行读写操作时,线程需要等待结果,相当于闲置
- 异步意味着,在进行读写操作时,线程不必等待结果,而是由操作系统来通过回调方式由另外的线程获得结果
异步模型需要底层操作系统(Kernel)提供支持
- Windows系统通过IOCP实现了真正的异步IO
- Linux系统异步IO在2.6版本引入,但是底层实现还是多路复用模拟异步IO,性能没优势
案例:
public class AioFileChannel {
public static void main(String[] args) throws IOException {
try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ)) {
//参数1:ByterBuffer
//参数2:读取的起始位置
//参数3:附件
//参数4:回调对象
ByteBuffer buffer = ByteBuffer.allocate(16);
log.debug("read begin...");
channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
log.debug("read completed...{}", result);
attachment.flip();
debugAll(attachment);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
log.error(exc.getMessage());
}
});
log.debug("read end...");
} catch (IOException e) {
e.printStackTrace();
}
System.in.read();
}
}
输出
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - begin...
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - do other things...
13:44:56 [DEBUG] [Thread-5] c.i.aio.AioDemo1 - read completed...2
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 0d |a. |
+--------+-------------------------------------------------+----------------+
- 相应文件读取成功的是另一个线程Thread-5
- 主线程并没用IO操作
参考
文章1: https://blog.csdn.net/forezp/article/details/88414741/