抱歉,您的浏览器无法访问本站

本页面需要浏览器支持(启用)JavaScript


了解详情 >

网络IO模型

IO相关概念

按照 Unix网络编程 分类,可以分为以下几类:

  • 阻塞式IO
  • 非阻塞IO
  • IO多路复用
  • 信号驱动IO
  • 异步IO

对于一个IO的操作,分两个步骤:

  1. 发起 IO请求(数据准备)
  2. 实际 IO读写(内核态和用户态的数据拷贝)

阻塞与非阻塞,异步与非异步:

- 阻塞与非阻塞(blocking/non-blocking):在第一个步骤,在进行阻塞操作时,当前线程会处于阻塞状态,无法从事其他任务,只有当条件就绪才能继续,比如 ServerSocket 新连接建立完毕, 或数据读取、写入操作完成;而非阻塞则是不管 IO 操作是否结束,直接返回,相应操作在后台继续处理。

- 同步或异步(synchronous/asynchronous):在第二个步骤,实际的 IO 读写调用者是否需要进程参与,如果需要就是同步 IO,否则就不是(先开线程就已经脱离IO读写的范围了)。简单来说,同步是一种可靠的有序运行机制,当进行同步操作时,后续的任务是等待当前调用返回,才会进行下一步;而异步则相反,其他任务不需要等待当前调用返回,通常依靠事件、回调等机制来实现任务间次序关系。

而我们一般所说的异步

@Async
public void create() {
    // TODO
}

public void build() {
    executor.execute(() -> build());
} 

不管是用 @Async 注解,还是往线程池里提交任务,他们最终都是同一个结果,就是把要执行的任务,交给另外一个线程来执行。 这个时候,可以大致的认为,所谓的 “异步”,就是多线程,执行任务,也就是程序结果的差异,同步是有序的,而异步是顺序的。

Linux 文件描述符FD

Linux一切皆是文件。

文件类型

  • 套接字
  • 普通文件
  • 目录文件
  • 符号链接(软硬链)
  • 设备文件
  • FIFO

进程打开一个文件时就会创建一个文件描述符(fd), 这个fd是为了内核高效管理已经被打开文件创建的索引, 通过fd可以快速地指向被打开的文件,所有的IO操作,都是通过FD进行。

  • 进程级的文件描述符表, 每一个进程都会维护一个自己打开的文件列表, 这样就可以快速找打自己打开的文件
  • 系统级的文件描述符, 例如键盘, 鼠标…
  • 文件系统的i-Node表

阻塞式IO

用户态切换到内核态. DMA去拷贝数据, 当DMA准备好数据后, CPU会把数据从 内核态 拷贝到 用户态. 拷贝完成后, 返回成功。对应Java中的就是ServerSocket,HttpURLConnection等IO类库。

非阻塞IO

进程反复询问是否已经完成. 与阻塞IO最明显的特点是把IO操作的第一个步骤转变成了非阻塞, 把原来大块不能用的阻塞时间分成了很多小阻塞, 有点像自旋. 第二阶段是阻塞的.

IO多路复用

单个线程处监听多个Socket。节约了大量的线程上下文切换。

一旦某个FD就绪,内核就能够通知线程进行相应的读写操作; 没有文件句柄就绪会阻塞线程,让出CPU。

FD : File Descriptor, 文件描述符。一个打开的文件通过唯一的描述符进行引用, 该描述符是打开文件的元数据到文件本身的映射。

用户空间拿文件描述符找操作系统要文件(读到内存中)。

线程通过系统调用(select/epoll)获取FD列表,遍历有事件的FD进行accept/recv/send,使其能支持更多并发连接。

核心问题: 内核分发消息

同时注册多个套接字, 使用文件描述符(FD)注册到OS, 通过 select 系统调用来等待数据, 等待的是多个套接字返回, 而不仅仅是一个. 当多个套接字任意一个可读了, 就会进行返回, 告诉进程说已经有至少一个套接字可以进行数据的读取了。

然后通过 recvfrom 读取数据, 处理数据。

最明显的特点是同时注册多个文件套接字,而前面的两个IO模型都是只关注一个FD。

可以同时监听(阻塞)多个 套接字; 其中一个文件可以读取数据就马上返回。

这样就可以在一个线程里面, 去处理多个套接字.

多路复用的实现

  • Select: 线性扫描所有监听的文件描述符FD,底层基于数组。
  • poll: 同select,性能有所优化,底层基于链表
  • epoll: 使用红黑树管理数据结构,性能好。

select模型

线程维护一个 Socket FD 的列表. Selector 能够同时监控多个FD(文件描述符)的可读可写的情况, 当其中的某些FD可读或可写时, Selector 方法就会返回可读以及可写的FD,而程序就可以做其它事情而不被阻塞了。

周期性检查(监听)是否有FD变动. 通过循环遍历

  • 每次调用select,都需要把FD集合从用户态拷贝到内核态
  • 对socket扫描是线性的,采用轮询的方式。

epoll模型

内核维护了一个基于高效的二叉搜索树(红黑树)的事件表和一个就绪链表以及一个epollwait等待列表。全部工作都在内核态。通过红黑树,可以高效管理连接。在数据到来的时候,不断将数据Ready的socket放在socket的就绪链表中。

这样应用层和内核协作就很容易了。线程只要去就绪列表中查看有没有 Ready 需要被处理的Socket。有就拿走处理。只要足够多需要处理的socket,epoll_wait根本不会让线程阻塞。

epoll把用户关心的文件描述符上的事件放在内核的一个事件表中。不需要像select,poll那样每次都要重复传入文件描述符集。

一般由硬件产生的信号需要CPU立马做出回应,不然数据可能丢失,所以它的优先级很高。CPU应该中断正在执行的程序,去做出响应,在响应完之后再重新执行用户程序。中断和函数调用差不多,只不过函数调用的位置是固定的,而中断的位置则由”信号”决定。

当网卡把数据写入到内存后,网卡向cpu发出一个中断信号,操作系统便能得知由新数据到来,再通过网卡中断程序去处理数据。

OS会分时执行各个运行状态的进程(工作队列)。

阻塞指进程在等待某事件发生之前的等待状态,recv,select和epoll都是阻塞方法。

当进程创建socket语句时,OS会创建一个由文件系统管理的socket对象。socket对象包含发送缓冲区,接收缓冲区,等待队列等成员。等待队列指向要等待该socket事件的进程。

当程序执行到recv时,OS会将进程从工作队列移动到socket的等待队列中。因为进程从工作队列移除了,CPU只会执行在工作队列的进程,所以进程就阻塞了,不会占用CPU的资源。

PS:OS添加等待队列只是对这个等待的进程引用,以便在接收到数据时获取进程对象,将其唤醒,而非直接将进程纳入自己的管理之下。

Events:消息(事件),ThreadID: 线程ID。

线程注册要关注的事件, 当某个FD发生变化的时候, 可以快速通知处理该事件的线程。

注意: 通常来讲, 性能提升归根结底是算法数据结构

epoll整体流程。

select, poll, epoll 的区别

支持一个进程所能打开的最大连接数

IO多路复用 一个进程所能打开的最大连接数
select 单个进程所能打开的最大连接数由 FD_SETSIZE 宏定义, 其大小是32个整数的大小(在32位的机器上, 大小是32*32, 64位机器上是32*64, 可以对其进行修改, 然后重新编译内核, 但是性能无法保证, 需要做进一步测试), 底层是基于数组的.
poll 本质上与 select 没有区别, 但是它没有最大连接数的限制, 原因是它是基于链表来存储的.
epoll 虽然连接数上有限制, 但是很大, 1G内存的机器上可以打开10w左右的连接.

FD 剧增后带来 I/O 效率问题

IO多路复用 IO效率问题
select 因为每次调用时都会对连接进行线性遍历, 所以随着 FD 的增加会造成遍历速度的 “线性下降” 的性能问题.
poll 同上.
epoll 由于 epoll 是根据每个fd上的callback函数来实现的, 只有活跃的 socket 才能主动调用 callback, 所以在活跃 socket 较少的情况下, 使用 epoll 不会有”线性下降”的性能问题, 但是所有 socket 都很活跃的情况下, 可能会有性能问题

消息传递

IO多路复用 消息传递
select 内核需要将消息传递到用户空间, 需要内核的拷贝动作
poll 同上
epoll 通过内核和用户空间共享一块内存来实现, 性能较高

信号驱动式IO

建立信号处理程序后马上返回. 进程可以继续执行别的逻辑. 等准备好之后, OS发出信号, 告诉进程说数据准备好了. 之后就会阻塞等待数据完成. 通过回调实现的。

异步IO

aio_read调用后直接返回,到数据复制完成再返回成功给调用方。

IO模型对比

Java IO

BIO

Blocking IO,同步阻塞IO。

InputStream 和 OutputStream(字节流), Reader 和 Writer(字符流)。

  • API设计: 操作会阻塞线程
  • 原理: 利用CPU中断
    • 阻塞的线程进入休眠, 将执行权限交给其他线程.
    • 优点: 阻塞时不会占用系统资源; 程序好理解.
    • 缺点: I/O执行的两个阶段都被阻塞住了, 效率和扩展性存在明显的瓶颈, 高并发场景需要较高的线程数量(资源占用); 线程切换有成本.

适用场景: 一般的高并发场景也可以考虑(并行量特别大的场景, 如聊天除外).

注意: 性能是一个千层饼, 是一个综合而复杂的问题, 在没有压测前不要形成任何定论(多用Jmeter)

NIO

No-Blocking IO,非阻塞IO。

构建多路复用的, 同步非阻塞的IO操作. jdk1.4引入.

NIOBIO 最明显的不同是, 把I/O操作的第一个步骤变成非阻塞的, 是反复检查数据是否已经准备好了, 把原来大块不能用的阻塞时间分成了许多小阻塞, 有点类似自旋. 所以线程会不断地有机会执行, 检查数据是否准备好了, 有点类似轮询. 第二个阶段是同步的.

  • API设计: 操作不会阻塞线程, 读不到就返回null
  1. I/O变更发生后, 记录在某个特定的数据结构上, 比如epoll的红黑树.
  2. 线程可以在 数据结构注册自己关注点
    1. 文件描述符(FD)
    2. 消息类型(读, 写…)
  3. 通常给线程提供一个方法可以一次性获得自己关注的事件

jdk 中的 nio 看是 selector, 本质上会判断 linux 内核的版本, 如果大于 2.6 就使用 epoll 模型, 如果不是则使用 select 模型。

高并发可以考虑, 但是会增加写程序的成本.

NIO核心

  • Channels
  • Buffers
  • Selectors

NIO-Buffers

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer
  • MappedByteBuffer

NIO-Channels

  • FileChannel
    • transferTo : 把 FileChannel 中的数据拷贝到另外一个 Channel
    • transferFrom : 把另外一个 Channel 中的数据拷贝到 FileChannel
    • 避免了两次用户态内核态的上下文切换, 即”零拷贝”, 效率较高
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

NIO-Selector

上图是一个 Selector 处理 3 个 Channel. 要向 Selector 注册 Channel. 这个方法会一直阻塞, 直到某个 Channel 有事件返回.

// selector 的提供源码
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        // 这个最终是调用 JDK 底层的, 根据操作系统的不同而不同
                        // 具体可以看 JDK 源码中的 各个系统的 DefaultSelectorProvider 类, src/java.base/${ostype}
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}

在 Linux 下会使用 I/O多路复用 机制, 调用系统级别的 select/poll/epoll.

AIO

Asynchronous IO,异步IO,下面会具体说下AIO的相关问题。

  • API设计: 异步编程(基于Future)

本质是 异步 转 同步. 基于事件回调机制, jdk7引入.

future.get()是阻塞的, 是同步的 API.

原理: 利用线程池技术, 协程技术, 调度所有 Future 的计算.

通常结合epolldirectmemory技术.

epoll模型比较适合Non-Blocking(事件驱动)

三者对比

属性/模型 阻塞BIO 非阻塞IO 异步IO
blocking 阻塞并同步 非阻塞但同步 非阻塞且异步
线程数(server:client) 1:1 1:N 0:N
复杂度 简单 较复杂 复杂
吞吐量

同步与异步产生的理解偏差

对于Java的 BIO 和 NIO 是异步还是同步的,取决于使用者,给它个多线程,它就是异步的。

但实际上,是理解上出现了偏差。

那就是参考系的问题,以前学物理时,公交车上的乘客是运动还是静止,需要有参考系前提,如果以地面为参考,他是运动的,以公交车为参考,他是静止的。

Java IO 也是一样,需要有个参考系,才能定义它是同步异步,既然我们讨论的是 IO 是哪一种模式,那就是要针对 IO 读写操作这件事来理解,而其他的启动另外一个线程去处理数据,已经是脱离 IO 读写的范围了,不应该把他们扯进来。

所以以 IO 读写操作这事件作为参照,我们先尝试的这样定义,就是发起 IO 读写的线程 (调用 read 和 write 的线程),和实际操作 IO 读写的线程,如果是同一个线程,就称之为同步,否则是异步

  • 显然 BIO 只能是同步,调用 in.read () 当前线程阻塞,有数据返回的时候,接收到数据的还是原来的线程。

  • 而 NIO 也称之为同步,原因也是如此,调用 channel.read () 时,线程虽然不会阻塞,但读到数据的还是当前线程。

按照这个思路,AIO 应该是发起 IO 读写的线程,和实际收到数据的线程,可能不是同一个线程是不是这样呢,现在开始上 Java AIO 的代码。

AIO程序示例

Server端

public class AIOServer {
    public static void main(String[] args) {
        System.out.println(Thread.currentThread().getName() + " AioServer start");
        CompletionHandler<AsynchronousSocketChannel, Void> clientChannel =
                new CompletionHandler<AsynchronousSocketChannel, Void>() {
                    @Override
                    public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {
                        System.out.println(Thread.currentThread().getName() + " client is connected");
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        clientChannel.read(buffer, buffer, new ClientHandler());
                    }

                    @Override
                    public void failed(Throwable exc, Void attachment) {
                        System.out.println("accept fail");
                    }
                };
        try (AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open()
                .bind(new InetSocketAddress("127.0.0.1", 8080))) {
            while (true) {
                serverChannel.accept(null, clientChannel);
                System.in.read();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class ClientHandler implements CompletionHandler<Integer, ByteBuffer> {
        @Override
        public void completed(Integer result, ByteBuffer buffer) {
            buffer.flip();
            byte[] data = new byte[buffer.remaining()];
            buffer.get(data);
            System.out.println(Thread.currentThread().getName() + " received:" +
                    new String(data, StandardCharsets.UTF_8));

        }

        @Override
        public void failed(Throwable exc, ByteBuffer buffer) {
            System.out.println("handle fail...");
        }
    }
}

Client端

public class AIOClient {
    public static void main(String[] args) throws InterruptedException, IOException {
        try(AsynchronousSocketChannel channel = AsynchronousSocketChannel.open()) {
            channel.connect(new InetSocketAddress("127.0.0.1", 8080));
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            buffer.put("Java AIO".getBytes(StandardCharsets.UTF_8));
            buffer.flip();
            Thread.sleep(1000L);
            channel.write(buffer);
        }
    }
}

运行结果

main AioServer start
Thread-25 client is connected
Thread-24 received:Java AIO 1

查看下线程栈

jstack pid

Windows下的截图:

Linux下的截图:

从Linux角度分析线程栈,可以发现,程序启动了多个线程

  1. 线程 Thread-2 阻塞在 EPoll.wait() 方法上。
  2. 线程 Thread-0,Thread-1。。。Thread-n 从阻塞队列 take() 任务,阻塞等待有任务返回。

在Linux下,AIO 服务端程序启动之后,就开始创建了这些线程,且线程都处于阻塞等待状态。

AIO源码分析

以注册监听 read 为例 clientChannel.read (…),它主要的核心流程是:

1、注册事件 -> 2、监听事件 -> 3、处理事件

注册事件

UnixAsynchronousSocketChannelImpl.java

    // register events for outstanding I/O operations, caller already owns updateLock
    private void updateEvents() {
        assert Thread.holdsLock(updateLock);
        int events = 0;
        if (readPending)
            events |= Net.POLLIN;
        if (connectPending || writePending)
            events |= Net.POLLOUT;
        if (events != 0)
            port.startPoll(fdVal, events);
    }

EPollPort.java

@Override
void startPoll(int fd, int events) {
    // 往 EPoll 添加事件监听
    // EPoll.ctl 是 native 方法
    // update events (or add to epoll on first usage)
    int err = EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
    if (err == ENOENT)
        err = EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
    if (err != 0)
        throw new AssertionError();     // should not happen
}

注册事件调用 EPoll.ctl (…) 函数,这个函数在最后的参数用于指定是一次性的,还是永久性。上面代码 events | EPOLLONSHOT 字面意思看来,是一次性的。

监听事件

EPollPort.java

private class EventHandlerTask implements Runnable {
        private Event poll() throws IOException {
            try {
                for (;;) {
                    int n;
                    do {
                        // 轮询等待事件
                        n = EPoll.wait(epfd, address, MAX_EPOLL_EVENTS, -1);
                    } while (n == IOStatus.INTERRUPTED);

                    /**
                     * 'n' events have been read. Here we map them to their
                     * corresponding channel in batch and queue n-1 so that
                     * they can be handled by other handler threads. The last
                     * event is handled by this thread (and so is not queued).
                     */
                    fdToChannelLock.readLock().lock();
                    try {
                        while (n-- > 0) {
                            // 获取事件
                            long eventAddress = EPoll.getEvent(address, n);
                            int fd = EPoll.getDescriptor(eventAddress);

                            // wakeup
                            if (fd == sp[0]) {
                                if (wakeupCount.decrementAndGet() == 0) {
                                    // consume one wakeup byte, never more as this
                                    // would interfere with shutdown when there is
                                    // a wakeup byte queued to wake each thread
                                    int nread;
                                    do {
                                        nread = IOUtil.drain1(sp[0]);
                                    } while (nread == IOStatus.INTERRUPTED);
                                }

                                // queue special event if there are more events
                                // to handle.
                                if (n > 0) {
                                    queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
                                    continue;
                                }
                                return EXECUTE_TASK_OR_SHUTDOWN;
                            }
                          // 找到对应的 channel
                            PollableChannel channel = fdToChannel.get(fd);
                            if (channel != null) {
                                int events = EPoll.getEvents(eventAddress);
                                Event ev = new Event(channel, events);

                                // n-1 events are queued; This thread handles
                                // the last one except for the wakeup
                                if (n > 0) {
                                    // 往任务队列提交任务
                                    queue.offer(ev);
                                } else {
                                    return ev;
                                }
                            }
                        }
                    } finally {
                        fdToChannelLock.readLock().unlock();
                    }
                }
            } finally {
                // to ensure that some thread will poll when all events have
                // been consumed
                queue.offer(NEED_TO_POLL);
            }
        }

        public void run() {
            // ...处理事件,代码在下面
        }
    }

处理事件

sun.nio.ch.EPollPort.EventHandlerTask#run

public void run() {
    Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
        Invoker.getGroupAndInvokeCount();
    final boolean isPooledThread = (myGroupAndInvokeCount != null);
    boolean replaceMe = false;
    Event ev;
    try {
        for (;;) {
            // reset invoke count
            if (isPooledThread)
                myGroupAndInvokeCount.resetInvokeCount();

            try {
                replaceMe = false;
                // 从队列获取任务
                ev = queue.take();

                // no events and this thread has been "selected" to
                // poll for more.
                if (ev == NEED_TO_POLL) {
                    try {
                        ev = poll();
                    } catch (IOException x) {
                        x.printStackTrace();
                        return;
                    }
                }
            } catch (InterruptedException x) {
                continue;
            }

            // handle wakeup to execute task or shutdown
            if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
                Runnable task = pollTask();
                if (task == null) {
                    // shutdown request
                    return;
                }
                // run task (may throw error/exception)
                replaceMe = true;
                task.run();
                continue;
            }

            // process event
            try {
                // 处理事件
                ev.channel().onEvent(ev.events(), isPooledThread);
            } catch (Error x) {
                replaceMe = true; throw x;
            } catch (RuntimeException x) {
                replaceMe = true; throw x;
            }
        }
    } finally {
        // last handler to exit when shutdown releases resources
        int remaining = threadExit(this, replaceMe);
        if (remaining == 0 && isShutdown()) {
            implClose();
        }
    }
}

sun.nio.ch.UnixAsynchronousSocketChannelImpl#implRead

@Override
    @SuppressWarnings("unchecked")
    <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
                                            ByteBuffer dst,
                                            ByteBuffer[] dsts,
                                            long timeout,
                                            TimeUnit unit,
                                            A attachment,
                                            CompletionHandler<V,? super A> handler)
    {
        // A synchronous read is not attempted if disallowed by system property
        // or, we are using a fixed thread pool and the completion handler may
        // not be invoked directly (because the thread is not a pooled thread or
        // there are too many handlers on the stack).
        Invoker.GroupAndInvokeCount myGroupAndInvokeCount = null;
        boolean invokeDirect = false;
        boolean attemptRead = false;
        if (!disableSynchronousRead) {
            if (handler == null) {
                attemptRead = true;
            } else {
                myGroupAndInvokeCount = Invoker.getGroupAndInvokeCount();
                invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
                // okay to attempt read with user thread pool
                attemptRead = invokeDirect || !port.isFixedThreadPool();
            }
        }

        int n = IOStatus.UNAVAILABLE;
        Throwable exc = null;
        boolean pending = false;

        try {
            begin();

            if (attemptRead) {
                if (isScatteringRead) {
                    n = (int)IOUtil.read(fd, dsts, true, nd);
                } else {
                    // 把数据读到 ByteBuffer 中
                    n = IOUtil.read(fd, dst, -1, true, nd);
                }
            }

            if (n == IOStatus.UNAVAILABLE) {
                PendingFuture<V,A> result = null;
                synchronized (updateLock) {
                    this.isScatteringRead = isScatteringRead;
                    this.readScopeHandleReleasers = IOUtil.acquireScopes(dst, dsts);
                    this.readBuffer = dst;
                    this.readBuffers = dsts;
                    if (handler == null) {
                        this.readHandler = null;
                        result = new PendingFuture<V,A>(this, OpType.READ);
                        this.readFuture = (PendingFuture<Number,Object>)result;
                        this.readAttachment = null;
                    } else {
                        this.readHandler = (CompletionHandler<Number,Object>)handler;
                        this.readAttachment = attachment;
                        this.readFuture = null;
                    }
                    if (timeout > 0L) {
                        this.readTimer = port.schedule(readTimeoutTask, timeout, unit);
                    }
                    this.readPending = true;
                    updateEvents();
                }
                pending = true;
                return result;
            }
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            if (x instanceof ConnectionResetException)
                x = new IOException(x.getMessage());
            exc = x;
        } finally {
            if (!pending)
                enableReading();
            end();
        }

        Number result = (exc != null) ? null : (isScatteringRead) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // read completed immediately
        if (handler != null) {
            if (invokeDirect) {
                // 回调
                Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
            }
            return null;
        } else {
            return CompletedFuture.withResult((V)result, exc);
        }
    }

sun.nio.ch.Invoker#invokeDirect

    static <V,A> void invokeDirect(GroupAndInvokeCount myGroupAndInvokeCount,
                                   CompletionHandler<V,? super A> handler,
                                   A attachment,
                                   V result,
                                   Throwable exc)
    {
        myGroupAndInvokeCount.incrementInvokeCount();
        Invoker.invokeUnchecked(handler, attachment, result, exc);
    }
        
    static <V,A> void invokeUnchecked(CompletionHandler<V,? super A> handler,
                                      A attachment,
                                      V value,
                                      Throwable exc)
    {
        // 执行回调方法
        if (exc == null) {
           
            handler.completed(value, attachment);
        } else {
            handler.failed(exc, attachment);
        }

        // clear interrupt
        Thread.interrupted();
        // ....
    }

核心流程总结

在分析完上面的代码流程后会发现,每一次 IO 读写都要经历的这三个事件是一次性的,也就是在处理事件完,本次流程就结束了,如果想继续下一次的 IO 读写,就得从头开始再来一遍。这样就会存在所谓的死亡回调(回调方法里再添加下一个回调方法),这对于编程的复杂度大大提高了。

监听回调的本质

先说一下结论,所谓监听回调的本质,就是用户态线程,调用内核态的函数(准确的说是 API,例如 read,write,epollWait),该函数还没有返回时,用户线程被阻塞了。当函数返回时,会唤醒阻塞的线程,执行所谓回调函数

系统调用与函数调用

函数调用

找到某个函数,并执行函数里的相关命令,

系统调用

操作系统对用户应用程序提供了编程接口,所谓 API。

系统调用执行过程:

  1. 传递系统调用参数。

  2. 执行陷入指令,用用户态切换到核心态,这是因为系统调用一般都需要再核心态下执行。

  3. 执行系统调用程序。

  4. 返回用户态。

用户态和内核态之间的通信

用户态 -> 内核态,通过系统调用方式即可。

内核态 -> 用户态,内核态根本不知道用户态程序有什么函数,参数是啥,地址在哪里。所以内核是不可能去调用用户态的函数,只能通过发送信号,比如 kill 命令关闭程序就是通过发信号让用户程序优雅退出的。

既然内核态是不可能主动去调用用户态的函数,为什么还会有回调呢,只能说这个所谓回调其实就是用户态的自导自演。它既做了监听,又做了执行回调函数。

IntelliJ IDEA 它是如何监听鼠标、键盘事件和处理事件的?

先打印一下线程栈,会发现鼠标、键盘等事件的监听是由 “AWT-XAWT” 线程负责的,处理事件则是 “AWT-EventQueue” 线程负责。

sun.awt.X11.XToolkit#run(boolean)

    public void run(boolean loop)
    {
        XEvent ev = new XEvent();
        while(true) {
            // Fix for 6829923: we should gracefully handle toolkit thread interruption
            if (Thread.currentThread().isInterrupted()) {
                // We expect interruption from the AppContext.dispose() method only.
                // If the thread is interrupted from another place, let's skip it
                // for compatibility reasons. Probably some time later we'll remove
                // the check for AppContext.isDisposed() and will unconditionally
                // break the loop here.
                if (AppContext.getAppContext().isDisposed()) {
                    break;
                }
            }
            awtLock();
            try {
                if (loop == SECONDARY_LOOP) {
                    // In the secondary loop we may have already acquired awt_lock
                    // several times, so waitForEvents() might be unable to release
                    // the awt_lock and this causes lock up.
                    // For now, we just avoid waitForEvents in the secondary loop.
                    if (!XlibWrapper.XNextSecondaryLoopEvent(getDisplay(),ev.pData)) {
                        break;
                    }
                } else {
                    callTimeoutTasks();
                    // If no events are queued, waitForEvents() causes calls to
                    // awtUnlock(), awtJNI_ThreadYield, poll, awtLock(),
                    // so it spends most of its time in poll, without holding the lock.
                    while ((XlibWrapper.XEventsQueued(getDisplay(), XConstants.QueuedAfterReading) == 0) &&
                           (XlibWrapper.XEventsQueued(getDisplay(), XConstants.QueuedAfterFlush) == 0)) {
                        callTimeoutTasks();
                        // 监听事件,线程阻塞
                        waitForEvents(getNextTaskTime());
                    }
                    XlibWrapper.XNextEvent(getDisplay(),ev.pData);
                }

                if (ev.get_type() != XConstants.NoExpose) {
                    eventNumber++;
                }
                if (awt_UseXKB_Calls && ev.get_type() ==  awt_XKBBaseEventCode) {
                    processXkbChanges(ev);
                }

                if (XDropTargetEventProcessor.processEvent(ev) ||
                    XDragSourceContextPeer.processEvent(ev)) {
                    continue;
                }

                if (eventLog.isLoggable(PlatformLogger.Level.FINER)) {
                    eventLog.finer("{0}", ev);
                }

                // Check if input method consumes the event
                long w = 0;
                if (windowToXWindow(ev.get_xany().get_window()) != null) {
                    Component owner =
                        XKeyboardFocusManagerPeer.getInstance().getCurrentFocusOwner();
                    if (owner != null) {
                        XWindow ownerWindow = AWTAccessor.getComponentAccessor().getPeer(owner);
                        if (ownerWindow != null) {
                            w = ownerWindow.getContentWindow();
                        }
                    }
                }
                if (keyEventLog.isLoggable(PlatformLogger.Level.FINE) && (
                        ev.get_type() == XConstants.KeyPress
                                || ev.get_type() == XConstants.KeyRelease)) {
                    keyEventLog.fine("before XFilterEvent:" + ev);
                }
                if (XlibWrapper.XFilterEvent(ev.getPData(), w)) {
                    continue;
                }
                if (keyEventLog.isLoggable(PlatformLogger.Level.FINE) && (
                        ev.get_type() == XConstants.KeyPress
                                || ev.get_type() == XConstants.KeyRelease)) {
                    keyEventLog.fine(
                            "after XFilterEvent:" + ev); // IS THIS CORRECT?
                }

                dispatchEvent(ev);
            } catch (ThreadDeath td) {
                XBaseWindow.ungrabInput();
                return;
            } catch (Throwable thr) {
                XBaseWindow.ungrabInput();
                processException(thr);
            } finally {
                awtUnlock();
            }
        }
    }

Java AIO本质

1、由于内核态无法直接调用用户态函数,Java AIO 的本质,就是只在用户态实现异步。并没有达到理想意义上的异步。

理想中的异步。

何谓理想意义上的异步?这里举个网购的例子

两个角色,消费者 A,快递员 B

  • A 在网上购物时,填好家庭地址付款提交订单,这个相当于注册监听事件

  • 商家发货,B 把东西送到 A 家门口,这个相当于回调。

A 在网上下完单,后续的发货流程就不用他来操心了,可以继续做其他事。B 送货也不关心 A 在不在家,反正就把货扔到家门口就行了,两个人互不依赖,互不相干扰

假设 A 购物是用户态来做,B 送快递是内核态来做,这种程序运行方式过于理想了,实际中实现不了。

现实中的异步

A 住的是高档小区,不能随意进去,快递只能送到小区门口。

A 买了一件比较重的商品,比如一台电视,因为 A 要上班不在家里,所以找了一个好友 C 帮忙把电视搬到他家。
A 出门上班前,跟门口的保安 D 打声招呼,说今天有一台电视送过来,送到小区门口时,请电话联系 C,让他过来拿。

  • 此时,A 下单并跟 D 打招呼,相当于注册事件。在 AIO 中就是 EPoll.ctl (…) 注册事件。

  • 保安在门口蹲着相当于监听事件,在 AIO 中就是 Thread-0 线程,做 EPoll.wait (…)

  • 快递员把电视送到门口,相当于有 IO 事件到达。

  • 保安通知 C 电视到了,C 过来搬电视,相当于处理事件。

在 AIO 中就是 Thread-0 往任务队列提交任务,

Thread-1 ~n 去取数据,并执行回调方法。

整个过程中,保安 D 必须一直蹲着,寸步不能离开,否则电视送到门口,就被人偷了。

好友 C 也必须在 A 家待着,受人委托,东西到了,人却不在现场,这有点失信于人。

所以实际的异步和理想中的异步,在互不依赖,互不干扰,这两点相违背了。保安的作用最大,这是他人生的高光时刻。

异步过程中的注册事件、监听事件、处理事件,还有开启多线程,这些过程的发起者全是用户态一手操办,所以说 Java AIO 只在用户态实现了异步,这个和 BIO、NIO 先阻塞,阻塞唤醒后开启异步线程处理的本质一致。

2、Java AIO 跟 NIO 一样,在各个平台的底层实现方式也不同,在 Linux 是用 EPoll,Windows 是 IOCP,Mac OS 是 KQueue。原理是大同小异,都是需要一个用户线程阻塞等待 IO 事件,一个线程池从队列里处理事件。

3、 Netty 之所以移除掉 AIO,很大的原因是在性能上 AIO 并没有比 NIO 高。Linux 虽然也有一套原生的 AIO 实现(类似 Windows 上的 IOCP),但 Java AIO 在 Linux 并没有采用,而是用 EPoll 来实现。

4、 Java AIO 不支持 UDP。

5、 AIO 编程方式略显复杂,比如 “死亡回调”

事件驱动编程实现

事件驱动框架是一种设计应用软件架构和模型,可以最大程度减少耦合度,通常可以说是观察者设计模式。在Java中,对于事件驱动编程,一般都是GUI的开发,比如Swing和Android的EventBus。因为GUI应用程序不得不面对大量的输入输出,引入了基于消息循环的事件驱动机制。

系统中断也是一种事件驱动。网络层基于系统底层的网络中断,因此在等待的同时,可以将CPU交给其他不需要阻塞的线程去使用。

事件驱动是一种映射方式。本质上和线程一样,都是调度代码执行的方式。事件驱动比较擅长处理的问题是异步问题,也叫异步事件,就是事件的发生和不受执行流程的影响;而线程适合处理事务之间明显存在因果关系或者相关性很强的场合。

线程,准确来说,是分时调度或抢占式调度机制通过模拟一个个独立个体来表现并发性;而事件驱动则是和个体以外的其他个体交互表现出来的。

相比其线程而言,更适合处理大量的随机事件,避免了引入大量线程可能导致的额外开销,以及同步的问题。但是相对于线程这种顺序的处理方式,事件驱动需要维护额外的状态来对不确定的事件进行处理,相对与线程增加了复杂性,特别是事件处理过程之间本来是具有先后关系的情况下,这在现实问题中可能是常见的情景。

  • 套接字文件描述符FD
  • 向操作OS注册关注事件(文件的可读可写是两个不同的事件)
  • 读/写事件就绪逻辑

Apache 使用的是多线程模型

  • 多线程支持并发(上下文切换成本)
  • 事件驱动支持并发

浏览器的事件机制

如何获取数据实时变化,使用轮询。

绝大部分的JS代码是运行在浏览器的JS主线程中,这个主线程的核心就是事件循环。Event Loop的实现,可以简单地认为也是while true,不断从宏任务队列和微任务队列中取出要执行的函数,取到了就压入栈中执行。宏任务包括DOM事件,网络IO,定时器等;微任务包括Promise,MutationObserver等。

前端要知道用户输入数据或者DOM的实时变化,通过浏览器提供的API注册EventListener即可,Vue,React这类MVVM框架提供了更完善的数据驱动机制,但其实框架的实现也需要注册EventListener。事件回调机制看似不是在轮询,变化直接调用回调函数,但是底层是依赖浏览器的Event Loop轮询任务队列实现的。

前端需要后端的实时数据变化,在没有WebSocket的年代,会使用长轮询(Long-Polling)的机制,服务端在变化发生时,用当前保持的HTTP连接返回变化的数据,浏览器收到网络IO事件放入宏任务队列,下次轮询触发回调;在WebSocket之后,长轮询用于一直保持HTTP连接的无限循环被干掉,但是WebSocket的IO事件由Event Loop接管,区别只是少了一个循环,节约了一些无所谓的HTTP请求响应的开销。

评论