从 0 手写一个工业级线程池(支持 future + 拒绝策略 + 优雅关闭)—— C++ 多线程与并发系统取向(实战篇)
一、为什么要自己实现线程池在 C 多线程学习中我们通常掌握了std::threadstd::mutexstd::condition_variablestd::atomic但这些只是“工具”。真正的工程能力是✅ 把这些能力组合成一个可复用的并发组件这就是线程池的本质控制线程数量避免频繁创建销毁解耦任务提交与执行提供统一调度能力支持返回值 / 异常 / 生命周期管理二、Java 对比理解更快在 Java 开发里常见做法是不用自己手写线程池实现也不太建议直接用Executors.newFixedThreadPool()这种快捷工厂而是自己new ThreadPoolExecutor(...)来创建线程池这里只是演示。Java 中线程池ExecutorService pool Executors.newFixedThreadPool(4); FutureInteger f pool.submit(() - 1 2); int result f.get(); pool.shutdown();核心能力能力JavaC提交任务submitsubmit返回结果Futurefuture异常传播get() 抛异常get() 抛异常优雅关闭shutdownshutdown强制停止shutdownNowshutdown_now C 要自己实现这一整套机制。三、线程池系统设计核心思维1️⃣ 线程模型固定数量 worker 线程循环取任务执行2️⃣ 共享资源std::queuestd::functionvoid() tasks_; 多线程共享 → 必须加锁3️⃣ 资源保护std::mutex std::unique_lock4️⃣ 线程协作std::condition_variable 队列空 → 等待 有任务 → 唤醒5️⃣ 状态控制std::atomicState6️⃣ 生命周期管理join() shutdown()一句话总结atomic 管状态mutex 管资源cv 管等待join 管生命周期四、future / packaged_task 机制重点目标auto f pool.submit([] { return 42; }); int result f.get();并且throw exception → future.get() 抛出实现方式std::packaged_task std::future五、基础线程池实现最小闭环这一部分是简化版逻辑帮助理解结构最终版在后面六、为什么必须有界队列如果使用无界队列请求过多 → 队列无限增长 → 内存爆炸OOM所以必须max_queue_size 控制系统压力七、拒绝策略设计工程核心当队列满时必须处理策略行为Abort抛异常CallerRuns当前线程执行Discard丢弃Block阻塞等待 本质系统背压Backpressure机制八、最终工程版线程池完整代码整合future shutdown 有界队列 拒绝策略#include atomic #include condition_variable #include functional #include future #include mutex #include queue #include thread #include vector #include stdexcept class ThreadPool { public: enum class State { Running, Stopping, Stopped }; enum class RejectPolicy { Abort, CallerRuns, Discard, Block }; ThreadPool(size_t thread_count, size_t max_queue_size, RejectPolicy policy RejectPolicy::Abort) : state_(State::Running), max_queue_size_(max_queue_size), reject_policy_(policy) { for (size_t i 0; i thread_count; i) { workers_.emplace_back([this] { worker_loop(); }); } } ~ThreadPool() { shutdown(); } templateclass F, class... Args auto submit(F f, Args... args) - std::futurestd::invoke_result_tF, Args... { using R std::invoke_result_tF, Args...; auto task std::make_sharedstd::packaged_taskR()( std::bind(std::forwardF(f), std::forwardArgs(args)...) ); std::futureR future task-get_future(); std::unique_lockstd::mutex lock(mtx_); if (state_ ! State::Running) { throw std::runtime_error(ThreadPool stopped); } // 队列满处理 if (tasks_.size() max_queue_size_) { switch (reject_policy_) { case RejectPolicy::Abort: throw std::runtime_error(Queue full); case RejectPolicy::Discard: return future; case RejectPolicy::CallerRuns: lock.unlock(); (*task)(); return future; case RejectPolicy::Block: not_full_cv_.wait(lock, [this] { return tasks_.size() max_queue_size_; }); break; } } tasks_.emplace([task]() { (*task)(); }); lock.unlock(); cv_.notify_one(); return future; } void shutdown() { state_ State::Stopping; cv_.notify_all(); for (auto t : workers_) { if (t.joinable()) t.join(); } } private: void worker_loop() { while (true) { std::functionvoid() task; { std::unique_lockstd::mutex lock(mtx_); cv_.wait(lock, [this] { return !tasks_.empty() || state_ ! State::Running; }); if (tasks_.empty() state_ ! State::Running) { return; } task std::move(tasks_.front()); tasks_.pop(); // 通知生产者队列有空间 not_full_cv_.notify_one(); } task(); // 锁外执行 } } private: std::vectorstd::thread workers_; std::queuestd::functionvoid() tasks_; std::mutex mtx_; std::condition_variable cv_; std::condition_variable not_full_cv_; std::atomicState state_; size_t max_queue_size_; RejectPolicy reject_policy_; };九、关键工程点总结1️⃣ 为什么任务必须锁外执行否则所有线程被串行化2️⃣ 为什么必须 predicate wait防止假唤醒丢通知3️⃣ 为什么 atomic 不能替代 mutexatomic 只能保证单变量安全无法保护复杂结构如 queue4️⃣ shutdown 为什么不能强杀线程因为C / Java 都只能协作退出十、系统取向总结线程池本质不是“线程集合”而是任务调度系统 并发控制系统 背压系统十一、最终口诀建议记住线程私有栈共享堆队列用 mutex等待用 cv状态用 atomic执行在锁外队列必须有界十二、你现在的水平做到这里你已经✔ 能写并发组件✔ 能讲线程池设计✔ 能对标 Java 并发模型✔ 能处理工程问题背压 / 生命周期 已经达到C 并发工程中高级水平下一篇线程池 网络服务骨架