#include <chrono>
#include <coroutine>
#include <functional>
#include <iostream>
#include <queue>
#include <string>
#include <thread>

using task = std::function<void(const std::string&)>;

class executor {
private:
    std::thread work_thread_;
    std::queue<task> tasks_;

public:
    void add_task(task t) { tasks_.emplace(std::move(t)); }

    void run() {
        work_thread_ = std::thread([this]() {
            uint64_t times = 1;
            std::cout << "executor thread: " << std::this_thread::get_id() << "\n";
            while (true) {
                auto& task = tasks_.front();
                task(std::to_string(times));
                times++;
                tasks_.pop();

                using namespace std::chrono_literals;
                std::this_thread::sleep_for(1000ms);
            }
            });

        if (work_thread_.joinable()) {
            work_thread_.join();
        }
    }
};

class coroutine_task {
public:
    struct promise_type {
        std::suspend_never initial_suspend() noexcept { return {}; }

        std::suspend_always final_suspend() noexcept { return {}; }

        void unhandled_exception() noexcept {}

        coroutine_task get_return_object() noexcept {
            auto tk = coroutine_task{
                std::coroutine_handle<promise_type>::from_promise(*this) };
            return tk;
        }

        /*void return_value(int v) noexcept {}*/
        void return_void() noexcept {}

        std::suspend_always yield_value(std::string&& from) noexcept {
            value_ = std::move(from);
            return {};
        }

        std::string value_;
    };

private:
    std::coroutine_handle<promise_type> coro_;

public:
    coroutine_task(std::coroutine_handle<promise_type> h) : coro_(h) {}

    ~coroutine_task() {
        if (coro_) {
            coro_.destroy();
        }
    }

    std::string value() { return coro_.promise().value_; }
};

template <typename R, typename Executor = executor>
struct awaitable {
private:
    R buf_;
    Executor& e_;
    std::coroutine_handle<> coro_handle_;

public:
    awaitable(Executor& e) : e_(e) {}

    bool await_ready() noexcept { return false; }

    R await_resume() noexcept { return buf_; }

    void await_suspend(std::coroutine_handle<> p) noexcept {
        coro_handle_ = p;
    }

    void async_start() {
        e_.add_task([this](const R& times) {
            std::cout << "async resume thread: " << std::this_thread::get_id() << "\n";
            buf_ = times;
            coro_handle_.resume();
            });
    }
};

auto async_read(executor& e) {
    awaitable<std::string> aw{ e };
    aw.async_start();
    return aw;
}

coroutine_task coro_read1(executor& e) {
    std::cout << "coro_read1 begin thread: " << std::this_thread::get_id() << "\n";
    for (;;) {
        auto value = co_await async_read(e);
        std::cout << "1、coro_read1: " << value << " thread: " << std::this_thread::get_id() << "\n";
        value = co_await async_read(e);
        std::cout << "2、coro_read1: " << value << " thread: " << std::this_thread::get_id() << "\n";
    }
}

coroutine_task coro_read2(executor& e) {
    std::cout << "coro_read2 begin thread: " << std::this_thread::get_id() << "\n";
    for (;;) {
        auto value = co_await async_read(e);
        std::cout << "coro_read2: " << value << " thread: " << std::this_thread::get_id() << "\n";
    }
}

int main() {
    executor e;

    auto cr1 = coro_read1(e);
    auto cr2 = coro_read2(e);

    e.run();
    return 0;
}
 

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐