ltask :Lua 的多任务库

云风 2021-02-07 17:22

ltask 是我前两周实现的一个 lua 的多任务库

这个项目复用了我之前的一个类似项目的名字。目的是一样的,但是我做了全新的设计。所以我干脆将以前的仓库移除,以同样的名字创建了新的仓库。

和之前的版本设计不同。在消息通讯机制上,这个更接近 skynet 的模型,但它是一个库而不是一个框架。调度模块是按前段时间的想法实现的,不过只做了一个初步的模型,细节上还有一些工作待完善。

它的 C 部分提供的库被分为四个部分:

  1. ltask
  2. ltask.bootstrap
  3. ltask.exclusive
  4. ltask.root

如果用官方的 Lua 解析器作为入口,那么入口代码只可以使用 ltask.bootstrap 这个子库。它提供的 api 可以完成一系列框架的搭建工作。可以视为把 skynet 设置和启动线程的部分,以库的形式提供了出来,方便用 lua 代码驱动。

所有设定工作完成后,ltask.bootstrap.run 这个 api 会启动调度器并阻塞住主线程,之后的一切任务都是由设定完成的工作线程驱动。

和 skynet 类似,一切任务( task )都被放在 service 中完成。每个 service 是一个独立的 lua 虚拟机。它们均配有一个和外界通讯的通信管道。我们用 32bit 来标识服务及所属的通信管道。和 skynet 不同,内核不负责管理服务的名字。

0 号 id 是保留的,可以用来表示无效的服务或系统服务。1 号服务是一个特殊的 root 服务,它会有一些特权。我计划保留 2-1023 号服务,用来做特定的系统任务。例如,名字解析,timer ,socket 等等。这样,一些必要的服务就可以直接用固定的数字 id 而不需要起字符串名字。

ltask.bootstrap 里有一系列 api 可以用来做上面的配置工作:启动服务但不运行。

和 skynet 不同,我将服务分为两种:shared 共享和 exclusive 独占。

skyent 的服务可以全部视为共享服务,它们共享 N 个工作线程。skynet 中也有独立的线程去完成 timer 和 socket monitor 等工作。但这些独占线程都是 C 代码实现的。这次,我希望由 lua 来驱动。所以新设立了 exclusive 服务的概念。它只能在 boostrap 过程创建,并绑定在独立的系统线程上。

exclusive 服务是为了方便实现一些会阻塞在系统调用上的业务,例如 socket 的 select (epoll) 上。如果以后需要使用一些第三方的自带网络处理的库,例如 MQ ,DB driver 等等,就不必担心它会占用工作线程太长时间了。

目前我只实现了一个 timer 服务,用于发送定时器消息。虽然 exclusive 服务也可以接收 ltask 内部的消息(它也有消息通信管道),但是,它有可能阻塞在系统调用上(对于 timer 服务来说,它会阻塞在 sleep 调用上)。所以没有使用内部消息来创建定时器。定时器还是和 skynet 的实现一样,直接用 spinlock 插入管理器。

前面提到,root 服务有一些特权。这个特权就是创建和销毁服务。这是因为调度器尽量避免零碎的锁,所以创建销毁服务 handle 都是在拿到调度器时完成的(避免对服务 handle 映射表加额外的锁)。这要求创建服务的业务编写起来要格外小心,所以我直接实现在 root 服务中,不把相关的 api 暴露出来。(实际上,是 root 发送特定消息给系统来完成的。而系统会拒绝非 root 服务发送来的相关消息)

exclusive 服务也有一些特权,比如它们可以批量发送消息。这样的 api 只在 ltask.exclusive 只模块中提供。而 shared 服务每次只可以发送一条消息,必须等待调度器将消息投递完成后,才可以发送下一条。

这里,需要谈谈 ltask 的消息模型和 skynet 最大的不同:每条消息发送后,系统都会将服务挂起,之后给出一个回执(receipt),业务层必须处理完这条回执才能跑后面的业务。

所以,在业务层面看,ltask 和 skynet 是一致的:send 消息后,业务逻辑不会被打断; call ( request )则可能在回应到来前被重新进入,有可能影响业务逻辑的上下文状态。但 ltask 无论是 send 还是 call 都可能挂起服务。

实际上,ltask 在底层已经不再区分是单项投递 (send) ,还是发起请求期待回应 (call)。所有消息都带有 session ,在底层都会有回应。只不过暴露给业务层时,send 这个 api 会忽略回应消息,call 这个 api 会挂起当前 coroutine 等待对方回应。

消息的回执单可能有三种状态:成功投递、目标服务不存在、目标服务的消息队列忙。

在目前的实现中,队列忙会抛出 error 。这可以极大的缓解系统繁忙时的雪崩效应。不过,以后可以进一步完善,让业务层可以做更细致的处理。例如,选择隔一段时间重新投递、写日志、等等。

这次我把绝大多数业务都放在了 lua 层实现,C 层只提供了必要的机制。对于大部分业务,都应该放在共享服务中。服务应该在处理完当前任务后挂起(在服务的主线程调用 coroutine.yield 即可),等待调度器再次将自己唤醒。ltask 的 api 提供了最少的对消息管道的处理:读取一条消息、发送一条消息、读取发送消息的回执。当一个服务的消息队列不为空或发送消息被处理完成后,调度器都会不断的唤醒服务。

对于 exclusive 服务,我预期是用 message based 模式编写的:即直接用一个消息循环去处理消息,并自行调用系统 api (例如 sleep ,socket select 等),每完成一部分工作后,就调用 corotoutine.yield 把 cpu 让出给调度器。而调度器会使用一个独立系统线程在调度工作完成后,立刻 resume 它。不用担心 exclusive 服务会饿死。

对于 shared 服务,我做了一些 lua 层面的封装工作。使用起来比较接近 skynet ,但做了不少简化。它和 skynet 一样,还是 request/response 模式的。服务可以在启动的时候注册一张表,每条 request 消息的第一个字段用来索引这张表,找到对应的处理函数。服务框架会为每个请求的处理函数创建一个独立的 coroutine ,在它对外发起请求时挂起,收到回应时延续。

最后谈谈新的调度器的实现。大体上和上一篇 blog 相同,但是在实现时,发现比预想的要复杂一点。

第一,消息投递:

每个服务有一个固定长度的消息队列,用于接收消息。这个服务是消息队列的唯一消费者。而调度器是所有消息队列的唯一生产者。

每个服务有一个单元的发送消息槽位和一个单元的回执单槽位。这个服务是它们的唯一生产者,调度器是唯一消费者。如果发送消息没有被调度器取走(消费),那么服务不可以生产下一条消息。

消息从服务产生,调度器取走并投递到目的服务中,并给源服务写入一个回执单。这样便完成了消息投递工作。这里全部是单一的生产者和单一的消费者,故而实现起来比较简单,也不需要任何锁。

第二,共享服务的调度:

每个工作线程都有三个槽位:预备做的任务(服务 id),正在做的任务,已经完成的任务。

预备做的任务由调度器安排进去(调度器是唯一生产者),由工作线程取出(消费者)。但这里有一个特殊的设定:调度器当没有任务可以分配时,它可以偷取已经分配到预备槽位的任务。所以这里的消费者并不唯一(调度器也可能是消费者),需要用 CAS 来操作。

正在做的任务完全是私有槽位,只有工作线程才可以读写。未来可能有监控机制可以读出来供管理员查看。

当工作线程把手头的工作做完后,它需要把已经完成的任务放在完成槽位,等待调度器收走。如果上一个任务迟迟没有收走,那么工作线程就不可以放入下一个完成任务。(这种情况非常罕见,但一旦发生,工作线程将竞争调度器的控制权,执行收走已完成的任务)

第三,调度器:

  1. 处理所有线程的发送消息,投递这些消息,生成回执。
  2. 收取所有工作线程已经完成的任务,放回调度队列中(若消息队列不为空)。
  3. 从调度队列中取出若干服务,分配给预备任务槽为空的工作线程。

调度器并不是一个独立线程,它是一个模块,可以被任意线程调用,但同时只能有一个线程有控制权。工作线程每次完成一个任务后,如果没有被分配下一个预备任务,它都会尝试竞争一次调度器的控制权。如果竞争失败,它会 sleep 。因为有偷取任务的设定,所以不用担心有边界状态导致一个任务被分配后,工作线程 sleep ,导致服务无法运行。

exclusive 服务每次让出后,关联的独立线程一定会抢占一次调度器模块,保证它发出的消息一定会被处理(exclusive 服务有一个专有的消息发送队列,而不仅仅是一个发送槽位。这可以让它批量发送消息)。

Todo :调度器应该根据服务最近被分配的工作线程,尽量保证分配到同一个工作线程处理。目前的实现中,虽然记录了服务最后一次所在的工作线程 id ,但暂时还没有用这个数据指导调度器的分配工作。

目前,ltask 还很初步。我想先把它用在我们的客户端引擎中。所以这次的实现尽量做成跨平台的(包括 windows 平台)。因为客户端对网络层要求不高,所以我暂时还没有实现。我想先可以用 luasocket lsocket 等现成的库来实现,而不必移植 skynet 的网络处理模块。

关于消息,目前复用的 skynet 中的序列化模块。但我想这次底层不再考虑跨进程通讯,未必需要把消息序列化成一个字符串。完全可以用一个自定义的数据结构来保存 lua 的值,用于服务间的数据交换。理论上可以比序列化字符串更高效。这个优化工作留到以后再做。

[返回] [原文链接]