什么是并发 Part 3:协程与事件循环
目录
本文中使用了 Lua 作为概念展示所使用的语言。
如果对文中使用的 Lua 语法有不理解的地方,可以在Lua 速查手册一文中查找对应的解释。
常见的并发模型
多进程是最简单的并发模型。
操作系统想要提供丰富的功能就不得不采用多进程进行并发。设备的 CPU 以很快频率在 不同的进程之间切换,相对均匀地推进不同进程的执行,来达到好像很多进程在同时向用户 提供功能的效果。
多线程,则是相对于多进程更加轻量的模式。在进程休眠时,操作系统需要对当前的进程的 内存状态、寄存器内容等进行保存;进程恢复执行时又需要将保存下来的内容重新加载出来。 这一过程本身耗时,还会因为重置了 CPU 缓存的内存内容,导致刚完成切换时进程执行 效率降低。
线程是由进程创建的,属于进程。一个进程下属的所有线程共享进程的内存, 光是在线程之间切换时不需要重新架设内存环境,就已经足够使其作为并发机制在效率上 高过进程了。
线程通常被分为内核级线程、用户级线程两种。
内核级线程的创建、维护、调度是由操作系统内核进行的,进程自身是不具备将任务派遣到某 一个 CPU 核心上的能力的,因此想要真正利用上多核心的优势,就一定会需要使用到内核级 线程。
用户级的线程的操作则完全由用户侧的线程库来实现,通常使用的是 M:N 模型,即 M 条用 户空间的线程使用 N 条内核线程执行。用户库创建内核级线程后,负责将不同的任务派遣 到这些线程上,而操作系统负责将这些内核级线程派遣到不同的 CPU 核心上。这一过程的 关键在于任务被重新派遣时,是不需要进行系统调用的。系统调用触发中断后,进程将执行 权限返还给操作系统,操作系统未必会立即进行系统调用的响应,也不能保证何时会回到 被中断的任务的执行中去。因此,对高并发量有要求的进程,通常都会使用用户级线程来 规避系统调用的发生。
随着网络技术的发展,人们对从服务器到大量客户端的高并发数据流动的要求越来越高。 协程这一概念,成为了越来越多的语言中标配的内容。
协程,虽然并不是一个硬性要求,但是大多数时候都是使用 M:N 用户线程来进行实现的。 大家通常会说协程是“可以暂停执行的函数”,非常适合用于进行合作式多任务处理的实现。 什么叫合作式?在使用协程进行多任务处理时,常常会听到大家警告“不要使用阻塞调用”, 因为协程任务是运行在用户线程上的(甚至有时整个应用是单线程),协程上一旦出现阻塞 式的函数调用,其所在的线程就会被阻塞,正确的处理方式是在感知到阻塞将会发生时,就 把当前线程的使用权让度给其它任务。这种每个任务单独对自身的行为进行规范,主动避免 阻塞的模式,就是合作式的并发。
本文将会以 Lua 中的协程为例,介绍在协程并发实现中常用的事件循环模型。
Lua 语言中的协程
Lua 语言提供了一种称为 thread 的变量类型,这种变量可以看做是 Lua 解释器虚拟机 上的线程。thread 变量可以主动暂停执行、可以从之前暂停的位置继续其执行、不主动放弃 执行权限就会一直占据 CPU……这些特征都说明了它是协程,是 Lua 虚拟机这个单核设备上的 协程。
创建一个协程很简单:
local function task(name)
print("hello " .. name)
end
local co = coroutine.create(task)
print(coroutine.status(co))
coroutine.resume(co, "coroutine")
print(coroutine.status(co))
创建协程时只需要一个函数作为参数,这个函数就是新创建的协程的目标任务。协程一共有 4 种可能的状态:
running,该协程占据执行权,正在执行suspended,该协程放弃了执行权,将执行权传递给了上级协程,处于休眠状态normal,当前协程将执行权传递给了下级协程,正在等待下级协程返还执行权dead,协程已经完成了任务函数的执行,或者在执行过程中出现了错误无法继续
在本文中,当一个协程将自己的执行权传递给另一个协程时,交出执行权的协程会被称作是 上级协程,接受执行权的协程会称作下级协程。
Lua 脚本在开始执行时,就会自动创建并开始主协程,当主协程结束时不论还有没有其它协程 尚未结束任务,脚本的执行都会结束。
执行权与数据的流动
所有的协程在被创建时,都是处于 suspended 状态,在协程之间传递执行权的函数是:
coroutine.resume(co, ...),将执行权传给协程对象co,并向其传递指定的参数。coroutine.yield(...),将执行权返还给当前协程的上级,并向其传递指定的参数。
这两个函数的参数与返回值都很有意思,假设有两协程 A、B,下面的时序图表示的是
coroutine.resume 与 coroutine.yield 的参数、返回值的流动方式:
sequenceDiagram
A->>A: coroutine.resume(B, arg_a1, arg_a2)
A->>B: 传递执行权与参数 arg_a1、arg_a2,B 首次开始执行
B->>B: 将 arg_a1、arg_a2 作为 B 的任务函数的调用参数启动执行
B->>B: coroutine.yield(arg_b1)
B->>A: 返还执行权,并传递参数 arg_b1
A->>A: 从 coroutine.resume 调用中返回<br/>返回值是 B 执行是否成功的标记与 arg_b1
A->>A: coroutine.resume(B, arg_a3)
A->>B: 传递执行权与参数 arg_a3
B->>B: 从 coroutine.yield 调用中返回<br/>返回值是 arg_a3
使用协程可以实现一个可以无穷执行的非波那契数列生成器:
local function fib_sequence()
local cur, next = 0, 1
while true do
cur, next = next, cur + next
coroutine.yield(cur)
end
end
local gen = coroutine.create(fib_sequence)
for _ = 1, 10 do
local _, value = coroutine.resume(gen, 1)
print(value)
end
协程 I/O
local function print_file(filename)
local file = io.open(filename, "r")
if not file then return end
local data = file:read("a")
file:close()
print(data)
end
local function main()
local files = { "data_1.txt", "data_2.txt", "data_3.txt" }
for _, file in ipairs(files) do
local co = coroutine.create(print_file)
coroutine.resume(co, file)
end
end
main()
上述代码中,print_file 所在的协程会因为打开文件、读取文件内容时 CPU 对硬盘的等待
阻塞代码的执行。如果在等待对一个文件的操作时,可以挂起被阻塞的协程,切换到对另一
个文件的处理上,就能提高硬件性能的利用率。
设想现在我们有一个新的 magic_file 类型,对它发起打开、读取等操作后,用户可以选择
时机对操作的完成度进行检查,magic_file 能在发起检查时返回自身状态,而不是硬性
要求用户等待操作完成。这样一来,用户就可以根据不同任务的优先程度,自行决定如何利用
等待的时间。
比如:
local magic_file = magic_open(filename)
-- ...
-- 只在数据立即得到返回时进行输出,否则放弃文件访问
magic_file:read("a")
if magic_file:finished() do
print(magic_file:content())
else
magic_file:close()
end
使用这样的机制,我们就可以实现一个非阻塞的协程并发:
local function print_file()
local magic_file = magic_open(filename)
magic_file:open()
while not magic_file:finished() do
-- 操作还在等待,将执行权返还给上级协程
coroutine.yield()
end
magic_file:read("a")
while not magic_file:finished() do
-- 操作还在等待,将执行权返还给上级协程
coroutine.yield()
end
print(magic_file:content())
end
local function main()
local files = { "data_1.txt", "data_2.txt", "data_3.txt" }
local co_tbl = {}
for _, file in ipairs(files) do
-- 为所有文件创建读取任务的协程
local co = coroutine.create(print_file)
co_tbl[co] = file
end
-- 循环到所有任务都完成
local is_not_empty = true
while is_not_empty do
is_not_empty = false
-- 遍历任务,寻找当前不需要等待 I/O 操作协程
for co, file in pairs(co_tbl) do
is_not_empty = true
coroutine.resume(co, file)
if coroutine.status(co) == "dead" then
co_tbl[co] = nil
end
end
end
end
main()
在新的 print_file 实现中,如果发现文件的操作请求还没有完成,print_file 就会
主动将执行权返还给上级协程,也就是返还给 main 函数所在的主协程。
这样我们就实现了一个非阻塞的 I/O 模型。
在 main 函数中,我们为每个目标文件都创建了一个协程,这些协程就是我们的 main
函数中需要进行的所有任务。在此处我们要做的事有两件:
- 让主协程在执行完所有这些任务之前,都不要进入
dead状态。 - 让执行权在所有尚未完成的任务之间流动,一旦有任务返还了执行权就立即将执行权传递 给下一个任务。
这就是事件循环。
关于 magic_file
那么 magic_file 应该怎么实现呢?这通常需要将 Lua 与外部提供的 C 语言库协同使用,
比如 libuv 就有 Lua 绑定,这个库也正是 NodeJS
用来实现它的事件循环的库。
这些库通常都会提供一个线程池,我们的 magic_file 需要进行操作时,就将 I/O 任务
发送到外部库的线程池上。线程池完成操作后便更改 magic_file 里的操作完成标记,
Lua 脚本就可以在不阻塞自身执行的情况下,感知到 I/O 任务的完成了。
实例
COPAS 是一个小巧的 Lua 库,它提供了相对完整的异步 I/O 架构,提供的了异步的 ftp、 http、smtp 支持。读者可以在这里 找到其源代码。
其基本的使用方式如下:
local copas = require "copas"
copas.addthread(function()
print("hello copas")
end)
copas.loop()
我们主要看 COPAS 的以下几个方面:
- 任务管理是如何进行的
- 执行权是如何流动的
- 非阻塞 I/O 是如何进行的
COPAS 提供 copas.addnamedthread 与 copas.addthread 函数用于创建任务。每一个任务都是
一个协程,两个函数创建的任务除了一个可以用名称识别,一个不能之外,没有任何区别。
所有的任务被分成等待读取、等待写入、等待唤醒、立即可执行几个任务队列。
任务在将执行权归还给上级协程时,可以通过 coroutine.yield 调用的参数指定接下来
自身希望去到什么队列中进行等待。
COPAS 中的几个任务队列在模块外是不可访问的,为了实现任务状态的变换,模块对外提供
了用于进行特定状态变化操作的函数,比如将函数移动到等待唤醒队列的 copas.pause
函数的实现如下:
-- yields the current coroutine and wakes it after 'sleeptime' seconds.
-- if sleeptime < 0 then it sleeps 0 seconds.
function copas.pause(sleeptime)
if sleeptime and sleeptime > 0 then
coroutine.yield(sleeptime, _sleeping)
else
coroutine.yield(0, _sleeping)
end
end
而 copas.addnamedthread 函数实现中,创建协程的部分如下:
function copas.addnamedthread(name, handler, ...)
-- ...
local thread = coroutine.create(function(_, ...)
copas.pause()
return handler(...)
end)
_doTick(thread, nil, ...)
return thread
end
这里的 handler 就是用户提供的任务函数。可以看到 COPAS 在创建协程时,并不是直接
将用户的任务函数直接分配给协程,每个协程实际获得的任务函数的第一步一定是 copas.pause。
COPAS 就是这样将用户创建的任务纳入到模块所提供的调度系统中(尽管这个调度系统很简单)。
上面的片段里的 _doTick 函数是对任务协程的一次 resume 调用。_doTick 最重要的
功能是根据任务通过 yield 返回的值将任务放入到不同的等待队列中,实现中的相关
部分如下:
local function _doTick(co, skt, ...)
-- ...
local ok, res, new_q = coroutine.resume(co, skt, ...)
if new_q == _reading or new_q == _writing or new_q == _sleeping then
-- we're yielding to a new queue
new_q:insert(res)
new_q:push(res, co)
return
end
-- ...
end
COPAS 任务创建的调用流程如下图所示:
sequenceDiagram
participant 用户
participant addthread
participant _doTick
participant 等待唤醒队列
用户->>addthread: 提供任务函数,创建新任务
addthread->>addthread: 为任务函数添加带有 copas.pause 的包裹<br/>创建协程
addthread->>_doTick: 对协程进行一次 coroutine.resume
_doTick->>_doTick: 协程因为 copas.pause 中断<br/>收到等待唤醒队列作为协程去处
_doTick->>等待唤醒队列: 放入任务协程
_doTick->>addthread: 返回
addthread->>用户: 返回新创建的协程对象
copas.loop() 是 COPAS 的事件循环函数,作用与上一节的样例中提供的 main 函数功能
一致,该函数会不断循环遍历当前活跃中的所有任务:
function copas.loop(initializer, timeout)
-- ...
while true do
copas.step(timeout)
if copas.finished() then
if copas.exiting() then
break
end
copas.exit()
end
end
-- ...
end
copas.step 函数会遍历前述的 4 种不同任务队列,可立即执行的队列、等待读取、等待写入
队列中的任务,都是已经达到执行条件的,都会立即进行 _doTick 调用;。
而等待唤醒队列,是一个二项堆队列,队列中的各个任务分别记录了自己需要被唤醒
的时间。每当 copas.step 被调用时,所有已经达到可唤醒时间的任务就会被移到可立即
执行队列,等待在下一次 copas.step 调用时被推进。
等待读取、等待写入队列是用于处理网络 I/O 的, 这两个队列会在 copas.step 每次调用
时通过 _select 函数来填充内容:
function copas.step(timeout)
if not _resumable:done() then
timeout = 0
else
timeout = math.min(_sleeping:getnext(), timeout or math.huge)
end
local err = _select(timeout)
-- ...
end
_select 的作用是等待指定的超时时长,观察给定的一组 socket 的可读、可写状态是否
能在这段时间内发生变化,并将已经变为可读或可写状态的 socket 放入到等待读取、等待
写入队列中。
如果当前有需要立即执行的队列,_select 的等待时间就是 0,这意味着只分类当下已经是
可读、可写状态的 socket 而不进行状态变化的等待;如果当前没有需要立即执行的任务,
_select 就等待到下一个等待唤醒的任务应该被唤醒的时间。
_select 所监控的 socket 列表来自于用户通过 COPAS 主动进行的网络消息传递,
在每次网络消息从应用层传递到传输层后,COPAS 就会将 socket 放入到临近列表中,不会
等待 socket 的响应。交由后续的 _select 调用间歇进行 socket 的状态进行检查。
这样一来,COPAS 就可以同时使用大量通信 socket 而不会出现 I/O 阻塞了。
尾声
协程,是单个进程内的协作式多任务模型。每一个任务的推进过程中都需要注意不对其所在 的线程造成阻塞。一旦当任务进行到需要等待其它资源响应的位置时,就主动放弃执行权, 将线程让给其它无需等待的任务。
在这一过程中,需要有一个上级的管理协程不断在所有的任务之间进行切换,检查各个任务 所等待的资源是否已经得到了响应,并对已经集齐所需资源的任务进行推进。 这样的操作被称做是轮询式异步。
很多语言都提供了这样的异步组件,大家处理的问题基本上都是:
- 如何使用数据表示一个进行到一半就被暂停的任务
- 如何判断一个任务是否已经拿到了继续执行所需要的所有资源
- 在结束一个任务的执行后,选择哪一个任务作为下一个任务
在不同的平台上,这几个决策可能根据 CPU 核心的多少、可用内存的大小,采取不同的方案。