加入收藏 | 设为首页 | 会员中心 | 我要投稿 财气旺网 - 财气网 (https://www.caiqiwang.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 服务器 > 搭建环境 > Linux > 正文

C++11实现的100行线程池

发布时间:2022-10-05 13:06:41 所属栏目:Linux 来源:
导读:  linux服务器开发相关视频解析:

  带你手写线程池,面试不惧手撕( 完整版)

  BAT面试必备:多线程、多进程、协程如何选择及线程池如何最高效

  C++线程池一直都是各位程序员们造轮子的首选项
  linux服务器开发相关视频解析:
 
  带你手写线程池,面试不惧手撕( 完整版)
 
  BAT面试必备:多线程、多进程、协程如何选择及线程池如何最高效
 
  C++线程池一直都是各位程序员们造轮子的首选项目之一。今天,小编带大家一起来看看这个轻量的线程池,本线程池是header-only的,并且整个文件只有100行,其中C++的高级用法有很多,很值得我们学习,一起来看看吧。
 
  线程池
 
  C++带有线程操作,异步操作,就是没有线程池,至于线程池的概念,我先搜一下别人的解释:
 
  一般而言,线程池有以下几个部分:
 
  1. 完成主要任务的一个或多个线程。
 
  2. 用于调度管理的管理线程。
 
  3. 要求执行的任务队列。
 
  我来讲讲人话:你的函数需要在多线程中运行,但是你又不能每来一个函数就开启一个线程,所以你就需要固定的N个线程来跑执行,但是有的线程还没有执行完,有的又在空闲,如何分配任务呢,你就需要封装一个线程池来完成这些操作,有了线程池这层封装,你就只需要告诉它开启几个线程,然后直接塞任务就行了,然后通过一定的机制获取执行结果。
 
  分析源代码 头文件
 
  #include
  #include
  #include
  #include
  #include
  #include
  #include
  #include
  #include
  vector,queue,momory 都没啥说的,thread线程相关,mutex 互斥量,解决资源抢占问题,condition_variable 条件量,用于唤醒线程和阻塞线程,future 从使用的角度出发,它是一个获取线程数据的函数。functional 函数子,可以理解为规范化的函数指针。stdexcept 就跟它的名字一样,标准异常。
 
  class ThreadPool {
  public:
      ThreadPool(size_t);
      template
      auto enqueue(F&& f, Args&&... args)
          -> std::future::type>;
      ~ThreadPool();
  private:
      // need to keep track of threads so we can join them
      std::vector< std::thread > workers;
      // the task queue
      std::queue< std::function > tasks;
      
      // synchronization
      std::mutex queue_mutex;
      std::condition_variable condition;
      bool stop;
  };
  线程池的声明,构造函数,一个enqueue模板函数 返回std::future, 然后这个type又利用了运行时检测(还是编译时检测?)推断出来的,非常的amazing啊。成功的使用一行代码反复套娃,这高阶的用法就是大佬的水平吗,i了i了。
 
  workers 是vector 俗称工作线程。
 
  std::queue> tasks 俗称任务队列。
 
  那么问题来了,这个任务队列的任务只能是void() 类型的吗?感觉没那么简单,还得接着看呐。
 
  mutex,condition_variable 没啥讲的,stop 控制线程池停止的。
 
  // the constructor just launches some amount of workers
  inline ThreadPool::ThreadPool(size_t threads)
      :   stop(false)
  {
      for(size_t i = 0;i task;
                      {
                          std::unique_lock lock(this->queue_mutex);
                          this->condition.wait(lock,
                              [this]{ return this->stop || !this->tasks.empty(); });
                          if(this->stop && this->tasks.empty())
                              return;
                          task = std::move(this->tasks.front());
                          this->tasks.pop();
                      }
                      task();
                  }
              }
          );
  }
  大佬写的注释就是这么朴实无华,说这个构造函数仅仅是把一定数量的线程塞进去,我是看了又看才悟出来这玩意是什么意思……虽然本质上的确是它说的只是把线程塞进去,但是这个线程也太绕了。
 
  workers.emplace_back 参数是一个lambda表达式,不会阻塞,也就是说最外层的是一个异步函数,每个线程里面的事情才是重点。
 
  labmda表达式中最外层是一个死循环,至于为什么是for(;;)而不是while(1) 这虽然不是重点,不过大佬的用法还是值得揣摩的,我估计是效率会更高?
 
  task 申明后,紧跟着一个大括号,这个{}里面的部分,是一个同步操作,至于为什么用this->lock 而不是直接使用[&]来捕获参数,想来也是处于内存考虑。精打细算的风格像极了抠门的地主,i了i了。
 
  【文章福利】需要C/C++ Linux服务器架构师学习资料加群812855908(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等)
 
  紧接着一个wait(lock,condtion)的操作,像极了千层饼的套路。
 
  第一层:这TM不是要锁死自己啊?这样不是构造都得卡死?
 
  第二层:我们看到它emplace_back了一个线程,不会阻塞,但是等开锁,锁不就在它自己的线程里面嘛?那不得锁死了啊?
 
  第三层:我们看到这个lock其实只是个包装,真正的锁是外层的mutex,所以从这里是不存在死锁的。但是你的wait的condition怎么可能不懂呢,必须要 stop 或者 !empty 才wait吗?
 
  第四层:我们查资料发现后面的condition是返回false才会wait,也就是说要!stop && empty才会wait,就是说这个线程池是 运行态,并且没有任务才才会执行等待操作!否则就不等了,直接冲!
 
  第五层:既然你判断了上面判断了stop和非空,为啥下面还要判断stop和空才退出呢?不显得冗余?
 
  第六层:要确定它的确是被置为stop了,且队列执行空了,它才能够光荣退休。有没有问题呢,有,最后所有线程都阻塞了,你stop置为true它们也不知道啊……
 
  我估计它的stop会有唤醒所有线程的操作,不过如果有的在执行,有的在等待,应该没办法都通知到位,但是在执行的在下一次判断的时候也能正常退出。
 
  因为有了疑惑,我们就想看stop相关的操作,结果发现放在了析构函数里面……
 
  // the destructor joins all threads
  inline ThreadPool::~ThreadPool()
  {
      {
          std::unique_lock lock(queue_mutex);
          stop = true;
      }
      condition.notify_all();
      for(std::thread &worker: workers)
          worker.join();
  }
  {}里面上锁进行了stop为true的操作,至于为什么不用原子操作,我也不知道,但是仔细想了下大概是因为本来就有一把锁了,再用原子就不是内味儿了。然后它果然通知了所有,并且还把工作线程join了。也就是等它们结束。
 
  结束了千层饼の解析之后,我们看看最重要的入队操作
 
  // add new work item to the pool
  template
  auto ThreadPool::enqueue(F&& f, Args&&... args)
      -> std::future::type>
  {
      using return_type = typename std::result_of::type;
      auto task = std::make_shared< std::packaged_task >(
              std::bind(std::forward(f), std::forward(args)...)
          );
          
      std::future res = task->get_future();
      {
          std::unique_lock lock(queue_mutex);
          // don't allow enqueueing after stopping the pool
          if(stop)
              throw std::runtime_error("enqueue on stopped ThreadPool");
          tasks.emplace([task](){ (*task)(); });
      }
      condition.notify_one();
      return res;
  }
  typename std::result_of::type中的typename 应该是为消除歧义的,或者因为嵌套依赖名字的关系,做为一个坚决不写模板的普通程序员,这段代码太难了……-> type 我倒是知道怎么回事,就是指明它的返回类型的一种方式result_of 应该是指明了F是一个函数,签名为Args...这个变参线程池linux,Args是啥它不关系,它关心的是返回值的参数类型 所以有个type。
 
  至于为什么函数入口是一个右值引用那就超出我的理解范围了。难道说functional 必须要右值引用?那它的销毁谁来管呢?这个线程来管吗?这些坑我以后慢慢填。
 
  前面我们说了tasks 只能接收void() 的函数类型,这里使用std::packaged_task完成对函数类型的推导,至于为什么不用 function ,因为这还不是最终放入tasks的对象,它要承接一个返回future的工作,而package_task就是来打包返回future的……
 
  然后就是加锁入队+通知工作线程+返回future的操作。本来是线程池最难理解的部分,反而显得平淡无奇了,因为前面那些花里胡哨的操作已经很好的打通了我们的理解能力。
 

(编辑:财气旺网 - 财气网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!