Marl纤程调度原理深度解析实现高效协作式多任务处理【免费下载链接】marlA hybrid thread / fiber task scheduler written in C 11项目地址: https://gitcode.com/gh_mirrors/ma/marlMarl是一个用C 11编写的混合线程/纤程任务调度器它结合了纤程和线程的优势允许高效执行可能阻塞的任务同时保持固定数量的硬件线程。本文将深入解析Marl纤程调度的核心原理帮助开发者理解其如何实现高效协作式多任务处理。什么是纤程纤程是一种轻量级的协作式线程可以在显式的 yield 点暂停和恢复。由于目前没有标准的跨平台纤程或协程库Marl实现了marl::OSFiber类来支持各平台这些实现大多用汇编编写用于保存和恢复被调用者保存的寄存器并维护纤程栈的分配。marl::OSFiber是内部实现细节并未在公共API中公开。marl::Scheduler::Fiber是与marl::Scheduler紧密耦合的公共纤程接口它具有简单的std::condition_variable类似接口。每个marl::Scheduler::Fiber都永久关联到一个marl::Scheduler::Worker并且保证只能在用于暂停的同一线程上恢复。Marl任务调度基础任务定义与调度marl::Task是std::functionvoid()的别名即一个不接受参数也不返回值的函数。任务通过marl::schedule()进行调度通常实现为lambda表达式marl::schedule([] { printf(Hello world!\n); });虽然marl::Task签名不接受参数但通常会捕获变量作为任务的输入和输出。Marl的所有同步原语marl::ConditionVariable除外都持有内部状态的共享指针建议按值捕获这些原语以避免任务生命周期超过调用marl::schedule()的栈时导致内存损坏。工作线程Workers调度器包含多个marl::Scheduler::Worker每个工作线程包含work.tasks- 待启动的任务队列work.fibers- 已暂停但准备恢复的纤程队列work.waiting- 等待恢复或超时的暂停纤程队列work.num- 与work.tasks.size() work.fibers.size()同步的计数器work.numBlockedFibers- 记录当前在suspend()调用中阻塞的纤程数量idleFibers- 准备重用的空闲纤程集合当调用marl::schedule()调度任务时会选择一个工作线程并将任务放入其work.tasks队列。工作线程的选择规则如下如果调度器没有专用工作线程marl::Scheduler::config().workerThreads.count 0则任务被排队到当前执行线程的单线程工作线程Single-Threaded-Worker否则选择一个多线程工作线程Multi-Threaded-Worker。如果有工作线程进入spin-for-work状态则优先选择这些线程否则以轮询方式选择多线程工作线程纤程调度核心机制工作线程运行循环run()run()是工作纤程的主要处理函数由每个多线程工作线程的线程启动时调用也会在所有其他纤程阻塞时从Worker::suspend()生成新的工作纤程时调用。该函数调用runUntilShutdown()进入以下循环调用waitForWork()阻塞直到有新的工作要处理调用runUntilIdle()处理所有新任务和纤程。注意纤程可以在runUntilIdle()内部切换因此run()的执行可能在单个线程的纤程之间跳转此循环一直持续到工作线程完成所有工作并被告知关闭。一旦循环因工作线程被告知关闭而退出mainFiber将恢复处理其余的关闭逻辑。运行直到空闲runUntilIdle()顾名思义此函数执行工作直到没有更多工作或所有工作都被阻塞。其基本逻辑如下恢复所有未阻塞的任务纤程runUntilIdle()首先完成所有准备恢复不再阻塞的纤程。通过从work.fibers队列中获取一个纤程将当前纤程放入idleFibers队列此纤程被视为空闲因为它正在寻找工作并将上下文切换到获取的纤程。执行未阻塞的纤程优先于启动新任务这是因为新任务可能会产生更多纤程每个纤程都会消耗一定的内存通常用于栈。开始执行新任务一旦所有可恢复的纤程都已完成或重新阻塞就从work.tasks队列中获取新任务并执行。任务完成后控制权返回到runUntilIdle()主循环从步骤1重新开始。一旦没有更多纤程或任务可执行runUntilIdle()返回。等待工作waitForWork()当工作线程没有任务可启动且没有纤程可恢复时调用waitForWork()阻塞直到工作线程有事情可做。如果工作线程是多线程工作线程waitForWork()首先进入spinForWork()否则跳过此阶段。然后waitForWork()等待以下任一情况发生后返回纤程准备恢复被加入work.fibers队列任务被加入work.tasks队列work.waiting队列中的纤程超时工作线程被关闭返回前work.waiting队列中超时的纤程会自动移至work.fibers队列。自旋等待工作spinForWork()spinForWork()有两个作用尝试从其他工作线程窃取工作以保持工作负载均衡任务长度的持续时间可能有很大差异随着时间的推移一些工作线程可能会有大量的工作队列而其他工作线程则处于饥饿状态。spinForWork()仅在工作线程饥饿时调用并会尝试从随机选择的工作线程窃取任务。由于纤程只能在同一线程上执行因此只能窃取任务不能窃取纤程。尝试避免将线程让渡给操作系统通常会有一个任务提供者向调度器调度许多小的子任务这些子任务均匀地分配给工作线程消费者。这些消费者的数量通常超过提供者很容易出现提供者难以提供足够的工作来使消费者完全占用的情况。在这种情况下工作线程可能会进入一个循环给它们一个任务完成它然后等待一小段时间以获取更多工作。当等待另一个任务时允许工作线程让渡给操作系统例如使用std::condition_variable::wait()可能会导致性能损失。根据平台的不同线程可能需要一毫秒或更长时间才能被操作系统恢复。这种长度的停顿可能导致整个任务依赖图的显著停顿。spinForWork()包含一个运行短时间的循环循环体执行以下操作使用nops的紧凑循环使CPU保持忙碌同时定期检查work.num以查看是否有新工作可用。如果发现新工作spinForWork()立即返回。如果没有安排新工作则尝试从另一个随机工作线程窃取任务。如果窃取成功spinForWork()立即返回。如果窃取失败则调用std::this_thread::yield()以防止Marl使操作系统饥饿。暂停suspend()Marl允许任务阻塞同时保持线程忙碌。如果任务阻塞则调用Scheduler::Worker::suspend()。suspend()首先调用Scheduler::Worker::waitForWork()阻塞直到有可执行的任务或纤程。然后发生以下情况之一如果有任何未阻塞的纤程则从work.fibers队列中获取纤程并切换到该纤程。如果有任何空闲纤程则从idleFibers集合中获取一个并切换到该纤程。恢复后这个空闲纤程将继续执行任务的角色。如果以上情况都不发生则需要创建一个新的纤程来继续执行任务。创建此纤程以开始在marl::Scheduler::Worker::run()中执行并切换到该纤程。在所有情况下suspend()调用都会切换到另一个纤程。当被暂停的纤程恢复时suspend()返回给调用者。工作线程类型工作线程分为单线程工作线程Single-Threaded-Worker和多线程工作线程Multi-Threaded-Worker两种类型。两种模式的大部分逻辑是相同的。最显著的区别是多线程工作线程会生成一个专用的工作线程来调用marl::Scheduler::run()而单线程工作线程只会在所有其他纤程阻塞时在新的纤程上调用marl::Scheduler::run()。单线程工作线程每个通过调用marl::Scheduler::bind()绑定的线程都会创建一个单线程工作线程STW。如果调度器没有专用工作线程marl::Scheduler::config().workerThreads.count 0则调度的任务会排队到当前执行线程的STW。由于在此模式下没有工作线程STW上排队的任务不会自动在后台执行。相反只有在调用marl::Scheduler::Worker::suspend()时才会执行任务。suspend()的逻辑对于STW和MTW是通用的当所有其他纤程阻塞时会生成调用marl::Scheduler::Worker::run()的新纤程。void SingleThreadedWorkerExample() { marl::Scheduler::Config cfg; cfg.setWorkerThreadCount(0); // STW mode. marl::Scheduler scheduler(cfg); scheduler.bind(); defer(scheduler.unbind()); // Calling marl::schedule() enqueues the task on the STW, but does not // execute it until the thread is blocked. marl::Event done; marl::schedule([] { done.signal(); }); // This is a blocking call. // marl::Event::wait() (indirectly) calls marl::Scheduler::Worker::suspend(). // marl::Scheduler::Worker::suspend() creates and switches to a fiber which // calls marl::Scheduler::Worker::run() to run all enqueued tasks. Once the // main fiber becomes unblocked, marl::Scheduler::Worker::runUntilIdle() will // switch back to the main fiber to continue execution of the application. done.wait(); }多线程工作线程当marl::Scheduler使用正数量的工作线程marl::Scheduler::Config::workerThread::count 0构造时会创建多线程工作线程。每个MTW与一个新的std::thread配对该线程通过调用marl::Scheduler::Worker::run()开始。当工作线程被告知关闭且所有工作完成后marl::Scheduler::Worker::run()退出主处理循环并切换回主线程纤程从而结束std::thread。快速上手Marl要开始使用Marl首先需要克隆仓库git clone https://gitcode.com/gh_mirrors/ma/marl然后可以参考examples/目录中的示例程序如hello_task.cpp了解如何创建调度器并调度任务。基本步骤如下创建使用所有可用逻辑处理器的Marl调度器将调度器绑定到主线程使用marl::schedule()调度任务使用同步原语如marl::WaitGroup等待任务完成确保在返回前解绑调度器以下是一个简单的示例#include marl/scheduler.h #include marl/waitgroup.h int main() { // 创建使用所有逻辑处理器的Marl调度器 // 将此调度器绑定到主线程以便我们可以调用marl::schedule() marl::Scheduler scheduler(marl::Scheduler::Config::allCores()); scheduler.bind(); defer(scheduler.unbind()); // 在返回前自动解绑 marl::WaitGroup saidHello(4); marl::Event sayHello; // 调度一些异步运行的任务 for (int i 0; i 4; i) { // 每个任务将在4个工作线程之一上运行 marl::schedule([] { // 等待信号再开始工作 sayHello.wait(); // 任务完成时递减WaitGroup计数器 defer(saidHello.done()); // 在任务中阻塞 // 调度器会为这个线程找到其他事情做。 marl::Thread::sleep(marl::milliseconds(10)); printf(Hello from task %d!\n, i); }); } sayHello.signal(); // 解除所有任务的阻塞 saidHello.wait(); // 等待所有任务完成 printf(All tasks said hello.\n); // 所有任务都保证在调度器析构前完成。 return 0; }注意事项绑定调度器到外部创建的线程为了调用marl::schedule()调度器必须绑定到调用线程。在调用marl::schedule()之前未将调度器绑定到线程将导致未定义行为。marl::Scheduler可以同时绑定到任意数量的线程并且可以通过marl::Scheduler::get()从绑定的线程中检索调度器。从一个线程传递调度器到另一个线程的典型方式是// 在一个线程中: marl::Scheduler scheduler; some_thread.start([scheduler_ptr scheduler] { // 将调度器绑定到新线程 scheduler_ptr-bind(); defer(scheduler_ptr-unbind()); // ... });始终记住在终止线程之前解绑调度器。忘记解绑将导致marl::Scheduler析构函数无限期阻塞。不要在Marl任务中使用外部阻塞调用marl::Scheduler内部持有多个工作线程这些线程将执行调度的任务。如果Marl任务在Marl同步原语上阻塞Marl可以从阻塞的任务中让出并继续执行其他调度的任务。在Marl工作线程上调用非Marl阻塞函数将阻止该工作线程能够切换执行其他任务直到阻塞函数返回。这些非Marl阻塞函数的示例包括std::mutex::lock()、std::condition_variable::wait()、accept()。短时间的阻塞调用是可以接受的例如获取互斥锁以访问数据结构。但是要注意不要在持有std::mutex锁的情况下使用Marl阻塞调用——Marl任务可能会在持有锁的情况下让出并阻止其他任务重新获取互斥锁。这种情况可能会导致死锁。如果需要从Marl工作线程进行阻塞调用可能希望使用marl::blocking_call()它将生成一个新线程来执行调用允许Marl工作线程继续处理其他调度的任务。总结Marl通过结合纤程和线程的优势提供了一个高效的任务调度解决方案。其核心在于工作线程对任务和纤程的管理包括运行循环、任务窃取、纤程暂停和恢复等机制。通过理解这些内部工作原理开发者可以更好地利用Marl来构建高性能的并发应用程序。无论是单线程模式还是多线程模式Marl都能有效地管理任务执行确保线程利用率最大化同时避免不必要的阻塞和上下文切换开销。要深入了解更多细节可以参考项目中的官方文档docs/scheduler.md以及源代码中的关键实现文件如scheduler.cpp和fiber相关实现。【免费下载链接】marlA hybrid thread / fiber task scheduler written in C 11项目地址: https://gitcode.com/gh_mirrors/ma/marl创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考