Java NIO 概述

NIO主要有三大核心部分: Channel(通道)Buffer(缓冲区)Selector(选择区),区别与传统IO基于字节流和字符流进行操作,而NIO基于ChannelBuffer进行操作,数据总是从通道读取到缓存区中,或者从缓冲区写入到通道中,Selector用与监听多个通道的事件.因此单线程可以监听多个数据通道.

Channel 和 Buffer

基本上,所有的 IO 在NIO 中都从一个Channel 开始。Channel 有点象流。 数据可以从Channel读到Buffer中,也可以从Buffer 写到Channel中。

image-20211128222910933

Channel的主要实现:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

这些实现涵盖了UDP和TCP及网络IO与文件IO.


Buffer的主要实现hexo

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer
  • MappedByteBuffer(用于内存映射)

这些Buffer覆盖了你能通过IO发送的基本数据类型:byte, short, int, long, float, double 和 char。

Selector

Selector允许单个线程处理多个Channel,如果你的应用打开了多个连接(通道),但每个流量都很低(低流量场景),使用Selector会方便.

image-20211128223911936

要使用Selector,得向Selector注册Channel,然后调用它的Select()方法.这个方法会一直阻塞到某个注册的通道有事件就绪,一旦这个方法返回,线程就可以处理这些事件.

Buffer

使用Buffer读写数据一般遵循四个步骤

  1. 写入数据到Buffer
  2. 调用flip()方法
  3. Buffer中读取数据
  4. 调用clear()方法或者compact()方法

向buffer写入数据时,buffer会记录写入了多少数据,一旦要读取数据,需要通过filp()方法将buffer切换到读模式,在读模式下,可以读取之前写入到buffer的所有数据.

一旦读完了所有数据,就需要清空缓冲区,让它可以被再次写入,有两种方法能清空缓冲区:

  1. claer()清空整个缓冲区
  2. compact()清除已读过的数据,未读的会被移动至缓冲区起始处,新数据接在后面

案例:

File file = new File("test.txt");
FileChannel fc = new RandomAccessFile(file, "rw").getChannel();

//创建容量为 3 字节的缓冲区
ByteBuffer buf = ByteBuffer.allocate(3);

int bytesRead;
//读入缓冲区
while ((bytesRead = fc.read(buf)) != -1) {
    System.out.println("Read-" + bytesRead);
    
    //使Buffer状态变为准备读
    buf.flip();

    while (buf.hasRemaining()){
        //一次读取 1 个字节
        System.out.println((char) buf.get());
    }
    
    //清空缓冲区,准备重新写入
    buf.clear();
}

Buffer(缓冲区)的本质是一块可以写入数据,然后可以从中读取数据的内存,这块内存被包装为NIO Buffer对象,提供了一组方法,用来方便访问该块内存.

为了理解Buffer的工作原理,需要熟悉它的三个属性:

  • capacity
  • position
  • limit

position和limit的含义取决于Buffer是处于读模式还是写模式.不管Buffer处于什么模式,capacity的含义是一样的.

image-20211128224235167

capacity

作为一个内存块,Buffer有一个固定的大小,值为capacity,你只能往里面写入capacity个byte、long、char等类型,一旦写满,需要将其清空(通过读数据或者清除数据)才能继续写入数据.

position

写模式:position表示当前位置,初始的position为0,当一个byte、long等数据写入Buffer后,position会向前移动到下一个可插入数据的Buffer单元,position最大可为 capacity-1

读模式:从某个特定的位置读,将当前Buffer从写模式切换到读模式,position会被重置为0,当从Buffer的position处读取数据时,position向前移动到下一个可读位置.

limit

写模式:Buffer的limit表示你最多能往Buffer里写入多少数据.写模式下,limit最大值等于capacity.

读模式:limit表示最多能读多少数据,因此,当切换Buffer到读模式时,limit会被设置为写模式下的position的值.换句话说,你能读到你之前写入的所有数据.(limit被设置成已写数据的数量,这个值在写模式下就是position)

Buffer的类型

Java NIO有一下Buffer类型

  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

不太Buffer对应不同数据类型,换句话说,就是可以通过char、short、int、long、float或double类型来操作缓冲区中的字节.

Buffer的分配

想要获得一个Buffer对象首先要进行分配,每个Buffer类都有一个allocate方法,下面是一个分配48字节capacity的ByteBuffer的例子.

ByteBuffer buf = ByteBuffer.allocate(48);
System.out.println(ByteBuffer.allocate(10).getClass());
System.out.println(ByteBuffer.allocateDirect(10).getClass());
/**
  * class java.nio.HeapByteBuffer    -使用Java堆内存,读写效率低,受到GC影响
  * class java.nio.DirectByteBuffer  -使用直接内存。读写效率高(少一次数据拷贝),不会受到GC影响,分配的效率低,使用不当会内存泄露
*/

Buffer写入数据

写入数据到Buffer有两种方法:

  1. Channel写入到Buffer

    int bytesRead = inChannel.read(buf); //read into buffer.
  2. 通过Bufferput()方法写到Buffer里

    buf.put((byte)127);

put()方法有很多版本,允许你以不同方法从Buffer中读取数据,例如,从指定position读取,或者从Buffer中读取数据到字节数组,更多Buffer实现的细节参考JavaDoc.


Buffer读取数据

读取数据到Buffer有两种方法:

  1. 调用channelwrite方法

    int writeBytes = channel.write(buf);
  2. 调用buffer自己的get方法

    byte b = buf.get();

get ()方法会让position读指针向后走,如果想重复读取数据,可以调用 rewind 方法将 position 重新置为 0,或者调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针.


rewind()方法

Buffer.rewind()position设回0,所以可以重读Buffer中的内容,limit保持不变.

clear()compact()方法

一旦读完Buffer中的数据,需要清空Buffer以备再次写入,可以通过clear()compact()方法来完成.

调用的是clear()方法,position将被设置为0,limit被设置为capacity值,意思是,Buffer被清空,但是数据未清除,只是标记我们可以从那个标志位开始向Buffer写入数据.

如果Buffer中有一些未读数据,调用clear()方法,数据将会被遗忘,意味着不会标记任何数据是读过的还是未读的.

如果希望保留未读的数据,且后续需要这些数据,但又要先写入一些数据,则需使用compact()方法

compact()方法将未读的数据拷贝到Buffer起始处,然后将position设到最后一个未读元素后面,limit属性依然和clear()一样,设置为capacity,现在Buffer准备好写入数据了,但不会覆盖未读数据.

mark()reset()方法

调用Buffer.make()方法,可以标记Buffer中的一个特定position,之后可以同过Buffer.reset()方法将position恢复到make位置.

buffer.mark();
// 调用 buffer.get() 几次,例如 在解析过程中。
buffer.reset(); //将位置设置回标记
/注意
rewind 和 flip 都会清除 mark 位置

equals()方法

满足以下条件时为true

  1. 相同的类型
  2. Buffer中剩余的byte、char等的个数相等
  3. Buffer中所有剩余的byte、char等都相同

equals只是比较Buffer的一部分,不是每一个在它里面的元素都比较.实际上,它只比较Buffer中的剩余元素.

comareTo()方法

compareTo()方法比较两个Buffer的剩余元素(byte、char等), 如果满足下列条件,则认为一个Buffer“小于”另一个Buffer:

  1. 第一个不相等的元素小于另一个Buffer中对应的元素 。
  2. 所有元素都相等,但第一个Buffer比另一个先耗尽(第一个Buffer的元素个数比另一个少)。

Buffer与字符串的转换

//1. 字符串转为ByteBuffer
ByteBuffer buffer1 = ByteBuffer.allocate(16);
buffer1.put("hello".getBytes());
ByteBufferUtil.debugAll(buffer1);

//2. Charset
ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("hello"); //会自动切换到读模式
ByteBufferUtil.debugAll(buffer2);

//3. wrap
ByteBuffer buffer3 = ByteBuffer.wrap("hello".getBytes()); //会自动切换到读模式
ByteBufferUtil.debugAll(buffer3);

/**--*--buffer转字符串--*--**/

CharBuffer str1 = StandardCharsets.UTF_8.decode(buffer2);
System.out.println(str1.toString());

//需要先切换到读模式
buffer1.flip();
CharBuffer str2 = StandardCharsets.UTF_8.decode(buffer1);
System.out.println(str2.toString());

Scatter/Gather

Scatter(分散)从Channel中读取是指在读操作时将读取的数据写入多个Buffer中。因此,Channel将从Channel中读取的数据“分散(Scatter)”到多个Buffer中。

Gather(集散)写入Channel是指在写操作时将多个Buffer的数据写入同一个Channel中,因此,Channel将多个Buffer中的数据“聚集(Gather)”后发送到Buffer中。

scatter / gather经常用于需要将传输的数据分开处理的场合,例如传输一个由消息头和消息体组成的消息,你可能会将消息体和消息头分散到不同的buffer中,这样你可以方便的处理消息头和消息体。

Scatterering Reads

Scattering Reads是指数据从一个channel读取到多个buffer中。如下图描述:

image-20211004161855199

代码示例:

try (FileChannel channel = new RandomAccessFile("words.txt", "r").getChannel()) {
    ByteBuffer b1 = ByteBuffer.allocate(3);
    ByteBuffer b2 = ByteBuffer.allocate(3);
    ByteBuffer b3 = ByteBuffer.allocate(5);
    channel.read(new ByteBuffer[]{b1, b2, b3});

    b1.flip();
    b2.flip();
    b3.flip();

    ByteBufferUtil.debugAll(b1);
    ByteBufferUtil.debugAll(b2);
    ByteBufferUtil.debugAll(b3);
} catch (IOException e) {
} ;

这里需要注意,Buffer数组作为read()的入参,会按顺序将将数据写入到Buffer中,当写满一个才会到下一个。

Scattering Reads在移动下一个buffer前,必须填满当前的buffer,意味着它不适合动态的消息,或者说大小不固定的消息,换句话说,如果存在消息头和消息体,消息头必须完成填充(例如 128byte),Scattering Reads才能正常工作。

Gathering Writes

Gathering Writes是指数据从多个buffer写入到同一个channel。如下图描述:

image-20211004162805846

代码示例:

ByteBuffer b1 = StandardCharsets.UTF_8.encode("hello");
ByteBuffer b2 = StandardCharsets.UTF_8.encode("world");
ByteBuffer b3 = StandardCharsets.UTF_8.encode("你好");

try (FileChannel channel = new RandomAccessFile("words2.txt", "rw").getChannel()) {
    channel.write(new ByteBuffer[]{b1, b2, b3});
} catch (IOException e) {
}

buffers数组是write()方法的入参,write()方法会按照buffer在数组中的顺序,将数据写入到channel,注意只有position和limit之间的数据才会被写入。因此,如果一个buffer的容量为128byte,但是仅仅包含58byte的数据,那么这58byte的数据将被写入到channel中。因此与Scattering Reads相反,Gathering Writes能较好的处理动态消息。

通道之间的数据传输

在NIO中,如果两个通道中有一个是FileChannel,那就可以之间将数据从一个Channel拷贝到另一个Channel。涉及到零拷贝,后面有写

transferTo()

transferTo()方法将数据从FileChannel传输到其他的channel中。下面是一个简单的例子:

try {
    FileChannel from = new FileInputStream("data.txt").getChannel();
    FileChannel to = new FileOutputStream("to.txt").getChannel();

    // 效率高 底层会利用操作系统的零拷贝优化 注意:一次最大传2G 下面解决方法
    from.transferTo(0, from.size(), to);
} catch (IOException e) {
    e.printStackTrace();
}

//解决方法
try {
    FileChannel from = new FileInputStream("data.txt").getChannel();
    FileChannel to = new FileOutputStream("to.txt").getChannel();

    // 效率高 底层会利用操作系统的零拷贝优化 一次最大传2G
    long size = from.size();
    //left代表还剩余多少数据没有传输
    for (long left = size; left > 0; ) {
        System.out.println("position: " + (size - left) + " left: " + left);
        left -= from.transferTo(size - left, left, to);
    }
} catch (IOException e) {
    e.printStackTrace();
}

transferFrom()

FileChannel的transferFrom()方法可以将数据从源通道传输到FileChannel中。

简单例子:

RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel      fromChannel = fromFile.getChannel();
RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChannel      toChannel = toFile.getChannel();
long position = 0;
long count = fromChannel.size();
toChannel.transferFrom(position, count, fromChannel);

方法的输入参数position表示从position处开始向目标文件写入数据,count表示最多传输的字节数。如果源通道的剩余空间小于 count 个字节,则所传输的字节数要小于请求的字节数。

此外要注意,在SoketChannel的实现中,SocketChannel只会传输此刻准备好的数据(可能不足count字节)。因此,SocketChannel可能不会将请求的所有数据(count个字节)全部传输到FileChannel中。

是不是发现这个例子和前面那个例子特别相似?除了调用方法的FileChannel对象不一样外,其他的都一样。

Selector

Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件,这样,一个单独的线程可以管理多个Channel,从而管理多个网络连接。

为什么使用Selector

仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。因此,使用的线程越少越好。

但是,需要记住,现代的操作系统和CPU在多任务方面表现的越来越好,所以多线程的开销随着时间的推移,变得越来越小了。实际上,如果一个CPU有多个内核,不使用多任务可能是在浪费CPU能力。不管怎么说,关于那种设计的讨论应该放在另一篇不同的文章中。在这里,只要知道使用Selector能够处理多个通道就足够了。

image-20211005224348776

Selector的创建

通过调用Selector.open()方法创建一个Selector,如下:

Selector selector = Selector.open();

绑定Channel事件

也称之为注册事件,为了Channel和Selector配合使用,必须将Channel注册到Selector上,通过SelectableChannel.register()方法实现,如下:

channel.configureBlocking(false);//设置非阻塞模式
//向Selector注册Channel时,`Channel.register()`方法会返回一个SelectionKey对象,这个对象代表注册到该Selector的通道。
SelectionKey key = channel.register(selector, 绑定事件);
  • channel必须工作在非阻塞模式下
  • FileChannel 没用非阻塞模式,因此不能配合 selector 一起使用

注意register()方法的第二个参数。这是一个“interest集合”,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件:

  1. Connect - 客户端连接建立后触发
  2. Accept - 会在有连接请求时触发
  3. Read - 可读事件
  4. Write - 可写事件

通道触发了一个事件意思是该事件已经就绪。所以,某个channel成功连接到另一个服务器称为“连接就绪”。一个server socket channel准备好接收新进入的连接称为“接收就绪”。一个有数据可读的通道可以说是“读就绪”。等待写数据的通道可以说是“写就绪”。

这四种事件用SelectionKey的四个常量来表示:

  1. SelectionKey.OP_CONNECT
  2. SelectionKey.OP_ACCEPT
  3. SelectionKey.OP_READ
  4. SelectionKey.OP_WRITE

如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如下:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

在下面还会继续提到interest集合。

SelectionKey

向Selector注册Channel时,register()方法会返回一个SelectionKey对象,这个对象包含了一些属性:

  • interest集合
  • ready集合
  • Channel
  • Selector
  • 附加对象(可选)

interest集合

就像向Selector注册通道一节中所描述的,interest集合是你所选择的感兴趣的事件集合。可以通过SelectionKey读写interest集合,像这样:

int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept  = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

可以看到,用“位与”操作insterest集合和SelectionKey常量,可以确定某个确定的事件是否在insterest集合中。

ready集合

ready集合是通道已经准备就绪的操作的集合,在一次选择(Selection)之后,你会首先访问这个ready set。Selection将在下一小节进行解释。可以这样访问ready集合:

int readySet = selectionKey.readyOps();

可以用像检测interest集合那样的方法,来检测channel中什么事件或操作已经就绪。但是,也可以使用以下四个方法,它们都会返回一个布尔类型:

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

Channel+Selector

从SelectionKey访问Channel和Selector很简单。如下:

Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();

附加对象

可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。例如,可以附加 与通道一起使用的Buffer,或是包含聚集数据的某个对象。使用方法如下:

selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

还可以在用register()方法向Selector注册Channel的时候附加对象。如:

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

监听Channel事件

一旦向Selector注册了一个或多个通道,就可用调用select()方法,这些方法会根据你所感兴趣的事件(如连接、接受、读或写)已经准备就绪的那些通道。

方法1,阻塞直到绑定事件发生

int count = selector.select();

方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)

int count = selector.select(long timeout);

方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件

int count = selector.selectNow();

select 何时不阻塞

  • 事件发生时
    • 客户端发起连接请求,会触发 accept 事件
    • 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
    • channel 可写,会触发 write 事件
    • 在 linux 下 nio bug 发生时
  • 调用 selector.wakeup()
  • 调用 selector.close()
  • selector 所在线程 interrupt

selectedKeys()

一旦select()方法执行完毕,就可以通过调用selector的selectedKeys()方法,访问就绪通道,如下:

Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 或
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();

可以通过遍历这个集合访问就绪的通道,如下:

Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
    SelectionKey key = iter.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
}

这个循环遍历了所有已就绪的Key,并检测对应的事件。

SelectionKey.channel()方法返回的通道需要转型成你要处理的类型,如ServerSocketChannel或SocketChannel等。

ServerSocketChannel channel = (ServerSocketChannel) key.channel();

完整的示例:

server端:

//1.创建selector对象 管理多个channel
Selector selector = Selector.open();

ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));

//2.建立selector和channel的联系/注册
//SelectionKey 就是将来事件发生后 通过它可以知道事件和哪个channel的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
//key 只关注 accpet 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
System.out.println("register: " + sscKey);

while (true) {
    //3. select方法
    //select在事件未处理时,它不会阻塞,事件发生后要么处理,要么取消,不能不管
    selector.select();//阻塞方法 没有事件发生时会阻塞 发生事件继续运行

    //4. 处理事件 selectedKeys()内部包含了所有发生的事件
    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
    while (iter.hasNext()) {
        SelectionKey key = iter.next();
        System.out.println("key: " + key);
        //处理事件
        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
        SocketChannel accept = channel.accept();
        System.out.println("" + accept);
        //取消事件
        //key.channel();
        // 处理完毕,必须将事件移除
        iter.remove();
    }
}

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发

因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

  • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
  • 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

client端:

SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",8080));
System.out.println("waiting...");

消息边界问题

在服务器端接收到的消息,长度是不定的,如图:

image-20211107221846884

为了解决这种情况,有如下三种方法:

  1. 固定消息长度,数据大小一样,服务器按照预定长度读取
  2. 分隔符拆分
  3. TLV格式(T:Type类型,L:Length长度,V:Value类型),类型和格式已知的情况下,去分配Buffer
    • HTTP1是TLV格式
    • HTTP2是LTV格式

image-20211114145030707

使用第二种方式的代码:

服务器端

public static void main(String[] args) throws IOException {
    //1.创建selector对象 管理多个channel
    Selector selector = Selector.open();

    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    ssc.bind(new InetSocketAddress(8080));

    //2.建立selector和channel的联系/注册
    //SelectionKey 就是将来事件发生后 通过它可以知道事件和哪个channel的事件
    SelectionKey sscKey = ssc.register(selector, 0, null);
    //key 只关注 accpet 事件
    sscKey.interestOps(SelectionKey.OP_ACCEPT);
    System.out.println("register: " + sscKey);

    while (true) {
        //3. select方法
        //select在事件未处理时,它不会阻塞,事件发生后要么处理,要么取消,不能不管
        selector.select();//阻塞方法 没有事件发生时会阻塞 发生事件继续运行

        //4. 处理事件 selectedKeys()内部包含了所有发生的事件
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
        while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove();//处理key后 必须从selectedKeys()中删除 否则下次还会处理
            System.out.println("key: " + key);
            //5.区分事件类型
            if (key.isAcceptable()) { //如果是accpet
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                SocketChannel sc = channel.accept();
                sc.configureBlocking(false);
                ByteBuffer buffer = ByteBuffer.allocate(16);
                //将一个ByteBuffer作为附件关联到selectionKey上
                SelectionKey scKey = sc.register(selector, 0, buffer);
                scKey.interestOps(SelectionKey.OP_READ);
                System.out.println("" + sc);
            } else if (key.isReadable()) {//如果是read
                try {
                    SocketChannel channel = (SocketChannel) key.channel();
                    //                        ByteBuffer buffer = ByteBuffer.allocate(16);
                    ByteBuffer buffer = (ByteBuffer) key.attachment();//从附件中获取
                    int read = channel.read(buffer);
                    if (read == -1) {//-1表示客户端主动断开连接
                        key.cancel();
                        System.out.println("一个客户端断开连接");
                    } else {
                        split(buffer);
                        if (buffer.position() == buffer.limit()) { //如果读到的消息没用结束符 则进行扩容
                            ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                            buffer.flip();//切换到读模式
                            newBuffer.put(buffer);
                            key.attach(newBuffer);//将新的Buffer放到附件中
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    //因为客户端断开了,因此需要将key取消(从selector的keys中真正删除key)
                    key.cancel();
                }
            }
        }
    }
}

private static void split(ByteBuffer source) {
    source.flip();

    for (int i = 0; i < source.limit(); i++) {
        //找到一条完整消息
        if (source.get(i) == '\n') {
            int length = i + 1 - source.position();
            //将完整消息存入新的Buffer
            ByteBuffer target = ByteBuffer.allocate(length);
            //从 source 读 , 向 target 写
            for (int j = 0; j < length; j++) {
                target.put(source.get());
            }
            ByteBufferUtil.debugAll(target);
        }
    }
    source.compact();
}

客户端

SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",8080));
sc.write(Charset.defaultCharset().encode("1234567890\n0987654321aaaa"));
sc.write(Charset.defaultCharset().encode("12345678900987654321\n"));
System.out.println("waiting...");
System.in.read();

NIO vs BIO

Stream vs Channel

  • Stream不会自动缓存数据,channel会利用系统提供的发送缓冲区、接收缓冲区
  • Stream仅支持阻塞API,Channel同时支持阻塞、非阻塞API,网络Channel可配合Selector实现多路复用
  • 二者均为全双工,即读写可以同时进行

IO模型

同步阻塞、同步非阻塞、同步多路复用、异步阻塞(不存在的)、异步非阻塞

  • 同步:线程自己去获取结果(一个线程)
  • 异步:线程不自己获取结果,而是由其他线程发送结果(至少两个线程)

零拷贝

传统IO问题

传统的IO将一个文件通过Socket写出

File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");

byte[] buf = new byte[(int)f.length()];
file.read(buf);

Socket socket = ...;
socket.getOutputStream().write(buf);

内部工作流程:

image-20211204111314653

  1. java本身不具备IO读写能力,因此Read方法调用后,要从java程序的用户态切换到内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统会使用DMA(Direct Memory Access)来实现文件读,期间也不会使用CPU

    DMA也可以理解为硬件单元,用来解放 cpu 完成文件 IO

  2. 内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即byte[] buf),这期间cpu会参与拷贝,无法利用cpu

  3. 调用write方法,这时将数据从用户缓冲区(即byte[] buf)写入socket缓冲区,cpu会参与拷贝

  4. 接下来要向网卡写入数据,这项能力java又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用DMA将socket缓冲区的数据写入网卡,又不会使用cpu

可以看到中间环节较多,java的IO实际不是物理设备级别的读写,而是缓存的复制,底层真正的读写是又操作系统来完成的

  • 用户态和内核态的切换发生了3次,这是重量级的
  • 数据拷贝了共4次

NIO

通过DirectByteBuff

  • ByteBuffer.allocate(10) HeapByteBuffer 使用的是 java 内存
  • ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存

image-20211204112340504

基本和优化前相同,唯有一点:java可以通过 DirectByteBuff 将堆外内存映射到jvm内存中来直接访问使用

  • 这块内存不受jvm垃圾回收的影响,因此内存地址固定,有助于IO读写
  • java中的DirectByteBuf对象仅维护了此内存的虚引用,内存回收分成两部
    • DirectByteBuf对象垃圾回收,将虚引用加入引用队列
    • 通过专门线程访问引用队列,根据虚引用是否堆外内存
  • 减少了一次数据拷贝,用户态和内核态的切换次数没用减少

进一步优化(底层使用了lunux2.1后提供的sendFile方法),java中对应着两个channel调用transferTotransferForm方法拷贝数据

image-20211204113142011

  1. java调用transferTo方法后,要从java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用
  2. 数据从内核缓冲区传输到socket缓冲区,cpu会参与拷贝
  3. 最后使用DMA将socket缓冲区的数据写入网卡,不会使用cpu

可以看到,只发生了一次用户态和内核态的切换,数据拷贝了3次


再进一步优化(linux2.4)

image-20211204113641189

  1. java调用transferTo方法后,要从java程序的用户态切换到内核态,使用DMA将数据读入内核缓冲区,不会使用cpu
  2. 只会将一些 offset 和 length 信息考入 socket缓冲区,几乎无消耗
  3. 使用DMA将内核缓冲区的数据写入网卡,不会使用cpu

整个过程同意只发生了一次用户态内核态的切换,数据拷贝了2次,所谓的零拷贝,并不是真正的无拷贝,而是不会拷贝重复数据到jvm内存中,零拷贝的的优点:

  • 更少的用户态与内核态的切换
  • 不利于cpu计算,减少cpu利用
  • 零拷贝适合小文件传输

AIO

AIO用于解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是由操作系统来通过回调方式由另外的线程获得结果

异步模型需要底层操作系统(Kernel)提供支持

  • Windows系统通过IOCP实现了真正的异步IO
  • Linux系统异步IO在2.6版本引入,但是底层实现还是多路复用模拟异步IO,性能没优势

案例:

public class AioFileChannel {
    public static void main(String[] args) throws IOException {
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ)) {
            //参数1:ByterBuffer
            //参数2:读取的起始位置
            //参数3:附件
            //参数4:回调对象
            ByteBuffer buffer = ByteBuffer.allocate(16);
            log.debug("read begin...");
            channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    log.debug("read completed...{}", result);
                    attachment.flip();
                    debugAll(attachment);
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    log.error(exc.getMessage());
                }
            });
            log.debug("read end...");
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.in.read();
    }
}

输出

13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - begin...
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - do other things...
13:44:56 [DEBUG] [Thread-5] c.i.aio.AioDemo1 - read completed...2
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 0d                                           |a.              |
+--------+-------------------------------------------------+----------------+
  • 相应文件读取成功的是另一个线程Thread-5
  • 主线程并没用IO操作

参考

文章1: https://blog.csdn.net/forezp/article/details/88414741/

文章2: https://ifeve.com/java-nio-all/

视频教程: https://www.bilibili.com/video/BV1py4y1E7oA