libgo 源码剖析(2. libgo调度策略源码实现)

发布时间:2020-04-16 05:00:18 作者:暮回_zz
来源:网络 阅读:3802

本文将从源码实现上对 libgo 的调度策略进行分析,主要涉及到上一篇文章中的三个结构体的定义:

三者的关系如下图所示:

libgo 源码剖析(2. libgo调度策略源码实现)


本文会列出类内的主要成员和主要函数做以分析。


1. 协程调度器:class Scheduler

libgo/scheduler/scheduler.h

class Scheduler{
public:

    /*
    *  创建一个调度器,初始化 libgo
    *  创建主线程的执行器,如果后续 STart 的时候没有参数,默认只有一个执行器去做
    *  当仅使用一个线程进行协程调度时, 协程地执行会严格地遵循其创建顺序.
    * */
    static Scheduler* Create();

    /*
    * 创建一个协程 Task 对象,并添加到当前的执行器 processer 的任务队列中,
    * 调度器的任务数 taskCount_ +1
    * */
    void CreateTask(TaskF const& fn, TaskOpt const& opt);

    /* 启动调度器
    * @minThreadNumber : 最小调度线程数, 为0时, 设置为cpu核心数.
    * @maxThreadNumber : 最大调度线程数, 为0时, 设置为minThreadNumber.
    *          如果maxThreadNumber大于minThreadNumber, 则当协程产生长时间阻塞时,
    *          可以自动扩展调度线程数.
    *  唤醒定时器线程
    *  每个调度线程都会调用 Process 开始调度,最后开启 id 为 0 的调度线程
    * 如果 maxThreadNumber_ > 1 的话,会开启调度线程 DispatcherThread
    * */
    void Start(int minThreadNumber = 1, int maxThreadNumber = 0);

    /*
    *  停止调度,停止后无法恢复, 仅用于安全退出main函数
    *  如果某个调度线程被协程阻塞, 必须等待阻塞结束才能退出.
    * */
    void Stop();

private:
    /*
    *  调度线程,主要为平衡多个 processer 的负载将高负载或阻塞的 p 中的协程 steal 给低负载的 p
    *  如果全部阻塞但是还有协程待执行,会起新线程,线程数不超过
    maxThreadNumber_
    *  会将阻塞 P 中的协程分摊给负载较少的 P
    * */
    void DispatcherThread();

    /*
    *  创建一个新的 Processer,并添加到双端队列 processers_ 中
    * */
    void NewProcessThread();

private:
    atomic_t<uint32_t> taskCount_{0};   // 用来统计协程数量
    Deque<Processer*> processers_;    // DispatcherThread双端队列,用来存放所有的执行器,每个执行器都会单独开一个线程去执行,线程中回调 Process() 方法。
    LFLock started_;    // libgo 提供的自选锁

};

调度器负责管理 1~N 个调度线程,每个调度线程一个执行器 Processer。调度器仅负责均衡各个执行器的负载,防止全部卡住的情况,并不涉及协程的切换等工作。

使用

ligbo提供了默认的协程调度器 co_sched

#define g_Scheduler ::co::Scheduler::getInstance()
#define co_sched g_Scheduler

用户也可以创建自己的协程调度器

co::Scheduler* my_sched = co::Scheduler::Create();

启动调度

std::thread t([my_sched]{mysched->Start();});
t.detach();

调度器原理

  1. schedule 负责整个系统的协程调度,协程的运行依赖于执行器 Processer(简称 P),因此在调度器初始化的时候会选择创建 P 的数量(支持动态增长),所有的执行器会添加到双端队列中。主线程也作为一个执行器,在创建 Scheduler 对象的时候创建,位于双端队列下标为 0 的位置(注意:只是创建对象,并没有开始运行);

  2. 当调用了 Start() 函数后,会正式开始运行。在 Start 函数内部,会创建指定数量的执行器 P,具体数量取决于参数,默认会创建 minThreadNumber 个,当全部执行器都阻塞之后,会动态扩展,最多 maxThreadNumber 个执行器。每个执行器都会运行于一个单独的线程,执行器负责该线程内部协程的切换和执行;

  3. 当创建协程时,会将协程添加到某一个处于活跃状态的执行器,如果恰好都不活跃,也会添加到某一个 P 中,这并不影响执行器的正常工作,因为调度器的调度线程会去处理它;

  4. Start 函数内部,除了上述执行器所在线程,还会开启调度线程 DispatcherThread,调度线程会平衡各个 P 的协程数量和负载,进行 steal,如果所有 P 都阻塞,会根据 maxThreadNumber 动态增加 P 的数量,如果仅仅部分 P 阻塞,会将阻塞的 P 中的协程全部拿出(steal),均摊到负载最小的 P 中;

  5. Schedule 也会选择性开启协程的定时器线程;

  6. 开启 FastSteadyClock 线程。

关于定时器以及时钟的实现,会在之后的文章中讨论。


2. 协程执行器:class Processer

libgo/scheduler/processer.h

每个协程执行器对应一个线程,负责本线程的协程调度,但并非线程安全的,是协程调度的核心。

class Processer
{
public:
    // 协程挂起标识,用于后续进行唤醒和超时判断
    struct SuspendEntry {
             // ...
    };

    // 协程切出
    ALWAYS_INLINE static void StaticCoYield();

    // 挂起当前协程
    static SuspendEntry Suspend();

    // 挂起当前协程, 并在指定时间后自动唤醒
    static SuspendEntry Suspend(FastSteadyClock::duration dur);

    // 唤醒协程
    static bool Wakeup(SuspendEntry const& entry);

private:
    /*
    *  执行器对协程的调度,也是执行器所在现在的主处理逻辑
    * */
    void Process();

    /*
    * 从当前执行器中偷 n 个协程并返回
    * n 为0则全部偷出来,否则取出相应的个数
    * */
    SList<Task> Steal(std::size_t n);

    /*
    *  给当前执行器打标记,用于检测协程是否阻塞
    * */
    void Mark();

private:
    int id_;    // 线程 id,与 shcedule 中的 _processer 下标对应
    Scheduler * scheduler_;     // 该执行器依赖的调度器
    volatile bool active_ = true;   // 该执行器的活跃状态,活跃表明该执行器未被阻塞,由调度器的调度线程控制

    volatile int64_t markTick_ = 0;     // mark 的时间戳
    volatile uint64_t markSwitch_ = 0;  // mark 的时候处于第几次协程调度
    volatile uint64_t switchCount_ = 0; // 协程调度的次数

    // 当前正在运行的协程
    Task* runningTask_{nullptr};
    Task* nextTask_{nullptr};

    // 协程队列
    typedef TSQueue<Task, true> TaskQueue;
    TaskQueue runnableQueue_;   // 运行协程队列
    TaskQueue waitQueue_;   // 等待协程队列
    TSQueue<Task, false> gcQueue_;  // 待回收的协程队列,协程运行完毕之后,会被添加到该队列中,等待回收
    TaskQueue newQueue_;    // 新添加到该执行器中的协程,包括刚刚 steal 过来的协程,该队列中的协程暂不会执行,会由 Process() 函数将该队列中的协程不断添加到 runnableQueue_ 中

    volatile uint64_t switchCount_ = 0; // 协程调度的次数

    // 执行器等待的条件变量
    std::mutex cvMutex_;
    std::condition_variable cv_;
    std::atomic_bool waiting_{false};
};

执行器对协程的调度 Process()

执行器 Processer 维护了三个线程安全的协程队列:


void Processer::Process()
{
    GetCurrentProcesser() = this;

    bool & isStop = *stop_;

    while (!isStop)
    {
        runnableQueue_.front(runningTask_);

        // 获取一个可以运行对协程对象
        if (!runningTask_) {
            if (AddNewTasks())
                runnableQueue_.front(runningTask_);

            if (!runningTask_) {
                WaitCondition();    // 没有可以执行的协程,wait 条件变量
                AddNewTasks();
                continue;
            }
        }

        addNewQuota_ = 1;
        while (runningTask_ && !isStop) {
            runningTask_->state_ = TaskState::runnable;
            runningTask_->proc_ = this;

            ++switchCount_;
            runningTask_->SwapIn();
            switch (runningTask_->state_) {
                case TaskState::runnable:
                    {
                        std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef());
                        auto next = (Task*)runningTask_->next;
                        if (next) {
                            runningTask_ = next;
                            runningTask_->check_ = runnableQueue_.check_;
                            break;
                        }

                        if (addNewQuota_ < 1 || newQueue_.emptyUnsafe()) {
                            runningTask_ = nullptr;
                        } else {
                            lock.unlock();
                            if (AddNewTasks()) {
                                runnableQueue_.next(runningTask_, runningTask_);
                                -- addNewQuota_;
                            } else {
                                std::unique_lock<TaskQueue::lock_t> lock2(runnableQueue_.LockRef());
                                runningTask_ = nullptr;
                            }
                        }
                    }
                    break;

                case TaskState::block:
                    {
                        std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef());
                        runningTask_ = nextTask_;
                        nextTask_ = nullptr;
                    }
                    break;

                case TaskState::done:
                default:
                    {
                        runnableQueue_.next(runningTask_, nextTask_);
                        if (!nextTask_ && addNewQuota_ > 0) {
                            if (AddNewTasks()) {
                                runnableQueue_.next(runningTask_, nextTask_);
                                -- addNewQuota_;
                            }
                        }

                        DebugPrint(dbg_task, "task(%s) done.", runningTask_->DebugInfo());
                        runnableQueue_.erase(runningTask_);
                        if (gcQueue_.size() > 16)    // 执行完毕的协程,需要回收资源
                            GC();
                        gcQueue_.push(runningTask_);
                        if (runningTask_->eptr_) {
                            std::exception_ptr ep = runningTask_->eptr_;
                            std::rethrow_exception(ep);
                        }

                        std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef());
                        runningTask_ = nextTask_;
                        nextTask_ = nullptr;
                    }
                    break;
            }
        }
    }
}

在调度器 Schedule 执行 Stop() 函数之前,执行器 P 会一直处于调度协程阶段 Process()。在期间,执行器 P 会将运行队列 runnableQueue 中的第一个协程获取进行执行,如果可运行队列为空,执行器会尝试将处于 newQueue 中的协程添加到可运行队列中去,如果 newQueue_ 为空,说明此时该执行器处于无协程可调度状态,通过设置条件变量,将执行器设置为等待状态;

当获取到一个可执行协程之后,会执行该协程的任务。协程的执行流程是通过状态机来实现的。(协程有三个状态:运行中,阻塞,执行完毕)

条件变量

Processer 使用了 std::mutex,并且提供了条件变量用来唤醒。当调度器尝试获取下一个可运行的协程对象时,若此时无可用协程对象,就会主动去等待该条件变量,默认100毫秒的超时时间。

void Processer::WaitCondition()
{
    GC();
    std::unique_lock<std::mutex> lock(cvMutex_);
    waiting_ = true;
    cv_.wait_for(lock, std::chrono::milliseconds(100));
    waiting_ = false;
}

void Processer::NotifyCondition()
{
    cv_.notify_all();
}

当调度器向该执行器中增加了新的协程对象时,会唤醒该条件变量,继续执行 Process 流程。使用条件变量唤醒的效率,要远远高于不断去轮询。

为什么在使用了条件变量后还要设置超时时间,定时轮询,即使条件变量没有被唤醒也希望它返回呢?
因为我们不希望线程会在这里阻塞,只要没有新的协程加入,就一直在死等。我们希望线程在等待的同时,也可以定时跳出,执行一些其它的检测工作等。

从执行器中偷指定数量的协程出来 -> steal()

简单来说,从执行器中取协程出来,就是从执行器维护的双端队列中获取执行个数的结点。

为什么要取出来?前面提到过,要么该执行器负载过大,要么该执行器处于阻塞的状态。

SList<Task> Processer::Steal(std::size_t n)
{
    if (n > 0) {
        // steal 指定个数协程
        newQueue_.AssertLink();
        auto slist = newQueue_.pop_back(n);
        newQueue_.AssertLink();
        if (slist.size() >= n)
            return slist;

        std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef());
        bool pushRunningTask = false, pushNextTask = false;
        if (runningTask_)
            pushRunningTask = runnableQueue_.eraseWithoutLock(runningTask_, true) || slist.erase(runningTask_, newQueue_.check_);
        if (nextTask_)
            pushNextTask = runnableQueue_.eraseWithoutLock(nextTask_, true) || slist.erase(nextTask_, newQueue_.check_);
        auto slist2 = runnableQueue_.pop_backWithoutLock(n - slist.size());
        if (pushRunningTask)
            runnableQueue_.pushWithoutLock(runningTask_);
        if (pushNextTask)
            runnableQueue_.pushWithoutLock(nextTask_);
        lock.unlock();

        slist2.append(std::move(slist));
        if (!slist2.empty())
            DebugPrint(dbg_scheduler, "Proc(%d).Stealed = %d", id_, (int)slist2.size());
        return slist2;
    } else {
        // steal all
        newQueue_.AssertLink();
        auto slist = newQueue_.pop_all();
        newQueue_.AssertLink();

        std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef());
        bool pushRunningTask = false, pushNextTask = false;
        if (runningTask_)
            pushRunningTask = runnableQueue_.eraseWithoutLock(runningTask_, true) || slist.erase(runningTask_, newQueue_.check_);
        if (nextTask_)
            pushNextTask = runnableQueue_.eraseWithoutLock(nextTask_, true) || slist.erase(nextTask_, newQueue_.check_);
        auto slist2 = runnableQueue_.pop_allWithoutLock();
        if (pushRunningTask)
            runnableQueue_.pushWithoutLock(runningTask_);
        if (pushNextTask)
            runnableQueue_.pushWithoutLock(nextTask_);
        lock.unlock();

        slist2.append(std::move(slist));
        if (!slist2.empty())
            DebugPrint(dbg_scheduler, "Proc(%d).Stealed all = %d", id_, (int)slist2.size());
        return slist2;
    }
}

首先,会从 newQueue 队列中获取协程结点,因为 newQueue 中的结点还没有添加到运行队列中,因此可以直接取出;如果 newQueue 中协程数量不足,会从 runnableQueue 队列尾部中继续获取结点。由于 runnableQueue 队列中我们记录了正在执行的协程和下一次将执行的协程(runningTask & nextTask),需要特殊处理。在从 runnableQueue 偷协程之前,会将 runningTask & nextTask 从队列删除,待偷完结点之后再次添加到当前 runnableQueue_ 队列中。

简单说,偷协程的工作,不会从队列中获取到 runningTask & nextTask 标识的协程。

阻塞判断

void Processer::Mark()
{
    if (runningTask_ && markSwitch_ != switchCount_) {
        markSwitch_ = switchCount_;
        markTick_ = NowMicrosecond();
    }
}

uint32_t cycle_timeout_us = 10 * 1000; 

bool Processer::IsBlocking()
{
    if (!markSwitch_ || markSwitch_ != switchCount_) return false;
    return NowMicrosecond() > markTick_ + CoroutineOptions::getInstance().cycle_timeout_us;
}

Mark 函数会在调度器的调度函数中被调用,需要注意的是,只有执行器处于活跃状态时才会调用。Mark 顾名思义,是给该执行打标记,会记录mark的时间戳,并记录下是在第多少次协程调度的过程中做了标记,Mark 的作用是用来进行执行器的阻塞检测。

处于活跃状态的执行器,总是在执行着协程的切换,因此,会不断自增 switchCount_ 的值,根据 IsBlocking 函数得知,当我们此时标签记录的协程调度次数超过10ms没有发生改变,我们认为该执行器发生阻塞,Scheduler 会进行 Steal 操作。

协程挂起 Suspend

static SuspendEntry Suspend();

一种方式是直接挂起,会将该协程状态转换为 TaskState::block,然后将该协程从 runnableQueue 中删除,再添加到 waitQueue 中;

另外一种方式是挂起之后(第一种方式执行完毕之后),允许配置一个时间段之后去自动唤醒该协程。

wakeup

用于唤醒协程

唤醒协程要做的,就是讲待唤醒的协程从 waitQueue_ 中删除并重新添加到 newQueue_中去。

StaticCoYield

用于在一个执行器中切出当前协程

有两种可能,一种是协程被阻塞需要挂起;另外一种是协程执行完毕,主动切出。

具体实现是通过获取当前执行器正在执行的协程 Task,调用 SwapOut() 方法实现。

ALWAYS_INLINE void Processer::StaticCoYield()
{
    auto proc = GetCurrentProcesser();
    if (proc) proc->CoYield();
}

ALWAYS_INLINE void Processer::CoYield()
{
    Task *tk = GetCurrentTask();
    assert(tk);

    ++ tk->yieldCount_;

#if ENABLE_DEBUGGER
    DebugPrint(dbg_yield, "yield task(%s) state = %s", tk->DebugInfo(), GetTaskStateName(tk->state_));
    if (Listener::GetTaskListener())
        Listener::GetTaskListener()->onSwapOut(tk->id_);
#endif

    tk->SwapOut();
}

几个需要注意的问题

> 可能会切出协程上下文的几种情况:
  1. 协程被挂起;
  2. 协程执行完毕;
  3. 用户主动切出 co_yield。
    #define co_yield do { ::co::Processer::StaticCoYield(); } while (0)
> 协程被挂起的几种情况:
  1. 系统函数被 hook;
  2. libgo_poll (被 hook 的 io 操作函数会调用 libgo_poll 实现切换)
  3. select
  4. sleep、usleep、nanosleep
  5. 调用了协程锁 CoMutex(co_mutex),协程读写锁 CoRWMutex(co_rwmutex),或者使用了 channel。
> 切入协程上下文的几种情况:
  1. 执行器在调度(Process)期间;
  2. 唤醒挂起协程不会切入上下文,只是从等待队列中重新添加到 newQueue_。

3. 协程对象:struct Task

# 协程状态
enum class TaskState
{
    runnable,   // 可运行
    block,      // 阻塞
    done,       // 协程运行完毕
};

typedef std::function<void()> TaskF;    // c++11提供的函数模板

struct Task
{
    TaskState state_ = TaskState::runnable;
    uint64_t id_;       // 当前调度器下协程编号,从0开始
    TaskF fn_;          // 协程运行的函数
    uint64_t yieldCount_ = 0;   // 协程切出的次数
    Context ctx_;       // 上下文信息
    Processer* proc_ = nullptr;     // 归属于哪个执行器

    // 提供了协程切入、切出、切换到指定线程三个函数
    ALWAYS_INLINE void SwapIn();
    ALWAYS_INLINE void SwapTo(Task* other);
    ALWAYS_INLINE void SwapOut();

private:
    static void StaticRun(intptr_t vp);     // 参数为 Task*,函数会去执行该 Task 的 fn_(),执行完毕后,协程状态改为 TaskState::done,并在执行器 P 中切出
};

每个 Task 对象是一个协程,在使用过程中,创建一个协程实际就是创建了一个 Task 对象,再添加到对应的执行器 P 中。之前提到过,执行器进行协程调度是通过一个状态机来实现的,这里的 TaskState 就是协程状态,协程函数 fn_ 会在 StaticRun 静态方法中调用,该静态方法注册到了协程上下文 _ctx 中。

除此之外,Task 类内部,也提供了协程的切入切出方法,本质也是调用了上下文的切换。

StaticRun

控制协程的运行,内部调用了 Task::Run() 方法,会在协程函数 fn_ 执行完毕之后,将协程状态转换为 TaskState::done,并将协程切出。

void Task::Run()
{
    auto call_fn = [this]() {
        this->fn_();
        this->fn_ = TaskF(); //让协程function对象的析构也在协程中执行
    };

    \\ ...
        call_fn();
    \\ ...
    state_ = TaskState::done;
    Processer::StaticCoYield();
}

void Task::StaticRun(intptr_t vp)
{
    Task* tk = (Task*)vp;
    tk->Run();
}

这里就是对 libgo 调度相关实现的描述,本文跳过了对定时器和时钟部分的实现,这个会在之后单独叙述。本文涉及到的代码在源码目录下的

libgo-master/libgo/scheduler/processer.cpp   
libgo-master/libgo/scheduler/processer.h
libgo-master/libgo/scheduler/scheduler.cpp
libgo-master/libgo/scheduler/scheduler.h

有兴趣的读者可以对照源码学习,欢迎讨论学习

推荐阅读:
  1. BAT大牛 带你深度剖析Android 10大开源框架
  2. 怎么进行Spark Streaming 原理剖析

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

调度器 执行器 协程

上一篇:ASP.NET 简单的柱形图实现(附带示例)

下一篇:Web架构之单机时代

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》