抱歉,您的浏览器无法访问本站

本页面需要浏览器支持(启用)JavaScript


了解详情 >

协程

概念

协程是比线程, 进程更轻量的执行调度单位。

协程正在成为后台开发领域的标准技术栈。

语言层面 : Golang, Python3, C++20, Kotlin

框架层面 : Tornado(Python), LIBCO(C++), Swoole(PHP), Quasar(JavaScript)

用户态和内核态

管理计算机资源风险是很高的. 资源包括: 处理器, IO设备, 存储器, 文件等。

内核态: 运行内核相关的程序

用户态: 用户自己编写的程序

Linux设计哲学

  • 对不同的操作赋予不同的执行等级
  • 与系统相关的一些特别关键的操作必须由最高特权的程序来完成

四级特权, 内核态是0级, 用户态是3级.

内核态

  • 内核空间: 存放的是内核代码和数据
  • 进程执行操作系统内核的代码
  • CPU可以访问内存所有数据, 包括外围设备

用户态

  • 用户空间: 存放的是用户程序的代码和数据
  • 进程在执行用户自己的代码(非系统调用之类的函数)
  • CPU只可以访问有限的内存, 不允许访问外设

用户态切换到内核态 有三种方式

  • 系统调用.
  • 异常中断. 执行用户态程序时, 发生不可预估的异常事件.
  • 外围设备中断. 磁盘(当DMA数据准备好了, 就会先CPU发出中断信号), 网卡等外围设备的中断.

计算密集型和IO密集型

sysstat工具

# 安装
sudo apt install sysstat
# 验证是否安装
sar -h
# 基本使用
# 每5s统计一次, 统计10次
sar -u 5 10
# %user 用户空间cpu占用比
# %nice 表示的是优先级
# %system 内核空间占用的cpu百分比
# %iowait  cpu等待io完成的时间占比
# %steal 虚拟机相关的cpu状态
# %idle cpu空闲的时间占比

sar -b 5 10
# tps io传输总量. 等于 rtps + wtps
# rtps 读
# wtps 写
# dtps
# bread/s 从物理设备读入的数据量. 单位是 block, 块
# bwrtn/s 向物理设备写入的数据量
# bdscd/s

计算密集型

  • 也称为ieCPU密集型(CPU bound)
  • 完成一项任务的时间取决于CPU的速度
  • CPU利用率高, 其他事情处理慢
  • 游戏画面渲染, AI模型训练, 视频解码, 图片卷积处理

IO密集型

  • 频繁读写网络, 磁盘等任务都属于IO密集型任务
  • 完成一项任务的时间取决于IO设备的速度
  • CPU利用率低, 大部分时间在等待设备完成
  • Web应用(网络读写), 下载工具, 复制粘贴

线程与进程

线程的实现方式

实现方式的关键是管理线程发生的位置.

  • 内核支持线程
  • 用户级线程
  • 组合方式线程

内核支持线程(KST)

内核态线程和用户态线程是一对一的关系.

  • 无论是线程的创建, 撤销, 状态的切换都由内核态完成的. 操作系统本身也使用多线程.
  • 调度灵活, 因为和操作系统使用的线程是一致的
  • 内核线程切换成本高(平时更多逻辑集中到用户态)

用户级线程

  • 线程的创建, 销毁, 线程间通信都发生在用户态, 不需要内核态的支持

操作系统有专门的区域管理进程和线程. 相关结构就是进程控制块(PCB)和线程控制块(TCB).

因为用户态线程没有在内核态创建TCB, 所以内核态是感知不到用户级线程存在的.

所以, 从这种模型角度去看的话, 内核态线程与用户态线程是多对一的关系.

  • 由用户自行调度, 内核无法干涉
  • 内核线程阻塞, 所有线程无法运行
    • 例如文件的读写, 要经过系统调用, 这时候逻辑就会从用户态切换到内核态. 而磁盘的速度要比CPU慢很多, 所以会阻塞, 内核线程无法感知到用户线程的存在, 所以即使阻塞了, 也无法调度.

组合方式线程

多对多关系, 解决了用户级线程和内核支持线程的问题.

常见编程语言的线程模型

C/C++, 一对一的关系

  • 操作系统由C/C++实现
  • 使用原生thread

Java

  • Thread,Runnable对象
  • JVM 封装了操作系统的Thread

JavaScript

  • JavaScript是单线程的
  • 通过async,yield等关键字也实现了用户级线程

Golang

  • 典型的组合方式。
  • 高效的G-P-M 模型。G-协程, 用户级线程; M-内核级线程; P-调度G和M的关系。
  • 性能强大,并发效率高

Python

  • 组合方式,提供Thread线程对象
  • yield,await,async等关键字
  • asyncio等协程库

进程与线程的上下文切换

进程控制块:

  1. 标识符
  2. 状态
  3. 优先级
  4. 程序计数器
  5. 内存指针
  6. 上下文数据
  7. IO状态信息
  8. 记账信息

上下文类型:

  • 寄存器级上下文
  • 用户级上下文
  • 系统级上下文

进程切换大致流程:

  1. 准备就绪进程运行数据
  2. 保存当前进程运行状态
  3. 迁出当前进程数据
  4. 迁入就绪进程数据
  5. 恢复就绪进程上一次运行状态
  6. 就绪进程开始运行

上下文切换成本

sar -w 1 10

据统计,上下文切换2us ~ 5us左右。比如:8核的机器,如果每秒钟40000次切换,那么 5us * 40000 / 1000 / 8 = 25ms。

也就是说,每一核每秒钟有25ms是浪费在上下文切换的.

进程的线程共享进程资源。

  • 同一进程的多线程直接切换
  • 不同进程的线程之间切换(进程切换)

协程本质

协程: Coroutine

  • 微线程
  • 纤程
  • 协作式线程

特点

  • 比线程更小的粒度
  • 运行效率更高
  • 可以支持更高的并发

协程叫做协作式线程的原因

  • 由用户自行调度, 内核无法干涉
  • 两个用户级线程,协作式运行,并且相互让步

所以协程的本质就是 用户级线程

区分: 管理方式的位置。

一般提到线程都是指 内核级线程。

优缺点

  • 调度, 切换, 管理更加轻量
  • 内核无法感知协程的存在

解释型语言的多线程

解释型语言: Python, Ruby, Php, Lua, JavaScript, Perl, Shell

  • Ruby 解释器拥有和Python类似的GIL, 伪多线程
  • Php 默认不支持多线程; 安装额外C扩展以支持多线程
  • Lua 只支持单线程; 可以使用多进程
  • Perl 多线程在 Linux 是通过多个 Process 实现的
  • Shell 没有线程的概念
  • JavaScript 是单线程的; 异步接口很成熟

Java中的协程

目前Java还没正式支持协程的,JEP已经提出支持了。现在已经有EA版本了,下载地址:http://jdk.java.net/loom/

下面测试下线程和协程的性能,测试代码如下:

package org.lgq;

import java.io.*;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 词频统计
 */
public class Main {

    public static void main(String[] args) {
        Main main = new Main();
        // 单线程
//        main.singleThread();
        float totalTime = 0f;
        for (int i = 0; i < 10; i++) {
            // 多线程
            System.out.println();
            float costTime = main.commonWordCount((fileName, start, end, result) -> Thread.ofPlatform().start(new WordCountTask(fileName
                    , start, end, result)));
            // 协程
//            System.out.println();
//            float costTime =
//                    main.commonWordCount((fileName, start, end, result) -> Thread.startVirtualThread(new WordCountTask(fileName,
//                            start, end, result)));
            totalTime += costTime;
        }
        System.out.println("平均时间:" + totalTime / 10f);
    }

    record WordCountTask(String fileName, long start, long end, List<Map<String, Integer>> result) implements Runnable {
        @Override
        public void run() {
            wordCount(fileName, start, end, result);
        }
    }

    private static void wordCount(String fileName, long start, long end, List<Map<String, Integer>> result) {
        try (var channel = new RandomAccessFile(fileName, "rw").getChannel()) {
            var mBuf = channel.map(
                    FileChannel.MapMode.READ_ONLY,
                    start,
                    end - start
            );
            var str = StandardCharsets.US_ASCII.decode(mBuf).toString();
            result.add(countByString(str));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @FunctionalInterface
    interface TaskFun {
        Thread apply(String fileName, long start, long end, List<Map<String, Integer>> result);
    }

    /**
     * 统计词频公共逻辑
     */
    public float commonWordCount(TaskFun taskFun) {
        var fileName = "word";
        var chunkSize = 1024 * 1024 * 2;
        var file = new File(fileName);
        var fileSize = file.length();
        long position = 0;
        var startTime = System.currentTimeMillis();
        var totalMap = new ConcurrentHashMap<String, Integer>();
        List<Thread> allThread = new ArrayList<>();
        List<Map<String, Integer>> result = Collections.synchronizedList(new ArrayList<>());
        while (position < fileSize) {
            var next = Math.min(position + chunkSize, fileSize);
            final long start = position;
            allThread.add(taskFun.apply(fileName, start, next, result));
            position = next;
        }
        allThread.forEach(fiber -> {
            try {
                fiber.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        });
        System.out.format("split to %d tasks\n", result.size());
        result.forEach(map -> map.forEach((k, v) -> incKey(k, totalMap, v)));
        float costTime = (System.currentTimeMillis() - startTime) / 1000f;
        System.out.println("cost time: " + costTime + "s");
        System.out.println("ababb: " + totalMap.get("ababb"));
        System.out.println("total: " + totalMap.size());
        return costTime;
    }

    private static Map<String, Integer> countByString(String str) {
        var map = new ConcurrentHashMap<String, Integer>();
        StringTokenizer tokenizer = new StringTokenizer(str);
        while (tokenizer.hasMoreTokens()) {
            var word = tokenizer.nextToken();
            incKey(word, map, 1);
        }
        return map;
    }

    private static void incKey(String key, ConcurrentHashMap<String, Integer> map, Integer n) {
        if (map.containsKey(key)) {
            map.put(key, map.get(key) + n);
        } else {
            map.put(key, n);
        }
    }

    /**
     * 单线程
     */
    public void singleThread() {
        System.out.println("单线程处理开始");
        var fileName = "word";
        var chunkSize = 1024 * 1024 * 2;
        var file = new File(fileName);
        var fileSize = file.length();
        long position = 0;
        var startTime = System.currentTimeMillis();
        var totalMap = new ConcurrentHashMap<String, Integer>();
        List<Map<String, Integer>> result = Collections.synchronizedList(new ArrayList<>());
        while (position < fileSize) {
            var next = Math.min(position + chunkSize, fileSize);
            final long start = position;
            wordCount(fileName, start, next, result);
            position = next;
        }
        System.out.format("split to %d tasks%n", result.size());
        result.forEach(map -> map.forEach((k, v) -> incKey(k, totalMap, v)));
        System.out.println("cost time: " + (System.currentTimeMillis() - startTime) / 1000f + "s");
        System.out.println("ababb: " + totalMap.get("ababb"));
        System.out.println("total: " + totalMap.size());
    }

    /**
     * 生成测试数据
     *
     * @throws IOException
     */
    public void gen() throws IOException {
        var fileName = "word";
        var bufferSize = 4 * 1024;
        try (var fileOut = new BufferedOutputStream(new FileOutputStream(fileName), bufferSize)) {
            Random rand = SecureRandom.getInstanceStrong();
            for (int i = 0; i < 1_000_000_000; i++) {
                for (int j = 0; j < 5; j++) {
                    fileOut.write(97 + rand.nextInt(5));
                }
                fileOut.write(' ');
            }
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        }
    }

}

编译运行

# 编译
D:\SDK\JDK\jdk-19\bin\javac.exe --enable-preview --release 19 org/lgq/Main.java
# 运行
D:\SDK\JDK\jdk-19\bin\java.exe --enable-preview -Xmx30720m -Xms30720m org.lgq.Main

多线程测试结果

多协程测试结果

可以看到,协程的性能更优,内存占用更低。

评论