[TOC]

线程池(thread pool)是一种在Linux并发场景中经常用到的工具。线程池的功能:

  • 提前创建若干个线程,将其设置为休眠
  • 当需要线程执行任务时,向线程池中添加一个任务,该线程池自动挑选一个空闲线程来执行任务。

一、线程池的相关API

1、线程操作

​ C++11之前,C++语言层面上是不支持多线程的,只能利用操作系统的系统调用。C++11支持标准的线程库std::thread,其基础操作如下:

#include<thread> 
using namespace std;
// 创建并且执行线程对象
thread t1;// 创建一个空的线程对象;
thread t2(f1,n+1);// 创建一个线程对象,传入返回值为void的f1函数,f1参数为n+1;
thread t3([]{});//创建一个线程对象,传入lambda表达式
// 线程的阻塞设置
t1.join;// 阻塞主线程,执行完后,主线程继续执行,一般用在主线程最后,防止子线程没有执行完毕就退出
t1.detach();// 不阻塞主线程,同时主线程继续执行,子线程变成守护线程,被C++运行时库接管。
//一个线程只能调用一次join()或detach(),调用detach()后不能再调用join()
// 线程的操作
joinable()// 检查线程是否可被join或detach()
std::this_thread::getid()// 获取当前线程的ID
std::thie_thread::yield()// 当前线程放弃执行
sleep_until()// 线程休眠至某个指定时刻才被重新唤醒
sleep_for()// 线程休眠某个指定的时间段才被重新唤醒

2、互斥锁操作

​ C++11通过#include <mutex>提供了互斥量机制,该文件提供4种锁变量:

std::mutex,最基本的 Mutex 类。
std::recursive_mutex,递归 Mutex 类。
std::time_mutex,定时 Mutex 类。
std::recursive_timed_mutex,定时递归 Mutex 类。

​ 上述这4种锁变量,其中基本都有lock()、unlock()和try_lock()等操作,这些操作都需要程序员自行调用。

​ 为了更加方便地对互斥量进行上锁和解锁,该文件又提供了2种更加方便的上锁的类:

std::lock_guard,与 Mutex RAII 相关,方便线程对互斥量上锁,但是上锁后只有在声明区结束时才能解锁
std::unique_lock,与 Mutex RAII 相关,方便线程对互斥量上锁,同时在声明区结束前可以再次解锁或者上锁。

unique_lock效率不如lock_guard,但是unique_lock的颗粒度更细,可以在声明区解锁和上锁。

​ 使用unique_lock的用法如下:

#include <mutex>
std::mutex mtx;// 互斥锁
{
    std::unique_lock<std::mutex> locker(mtx);// 对声明区上锁
    // 执行临界区的代码(保证同一时刻只能有一个线程在执行这段代码)
    locker.unlock();// 临时解锁
    // 执行非临界区的代码(同一时刻可能有多个线程在执行这段代码)
    locker.lock();// 再次加锁
}

3、条件变量操作

条件变量来实现。

条件变量是利用线程间共享的全局变量进行同步的线程同步机制

在条件变量上等待的线程以睡眠的方式等待条件变量的满足;

  • 一个线程等待”条件变量的条件成立”挂起
  • 另一个线程使”条件成立”

条件变量的使用总是和一个互斥锁结合在一起;

​ 现在我们考虑使用条件变量,继续在类模板中添加头文件:#include <condition_variable>// 条件变量

,然后在类模板中添加2个私有成员作为条件变量:

#include <condition_variable>// 条件变量
std::mutex mtx;// 创建一个互斥锁
std::condition_variable cond;//创建一个条件变量
{
    std::unique_lock<std::mutex> locker(mtx);// 对声明区上锁
    cond.wait(locker);// 阻塞当前线程,释放锁
    cnd.wait(locker, []{ return ready;}); //如果条件满足,继续执行;否则阻塞当前线程并释放锁
    cnd.wait_for(locker, time_duration, []{ return ready;}); // 如果条件满足或者超时则执行,否则当前线程并释放锁
    cnd.notify_one();//唤醒等待队列中的第一个阻塞线程;不存在锁争用,能够立即获得锁,其余的线程不会被唤醒
    cnd.notify_all();//唤醒等待队列中的所有阻塞线程,存在锁争用,只有一个线程能够获得锁,其余未获取锁的线程继续尝试获得锁
}// 自动解锁

二、线程池的实现

​ 线程池(thread pool)是一种在Linux并发场景中经常用到的工具,它的作用是提前创建好若干个线程,避免在业务繁忙的高并发场景中临时频繁创建和销毁线程带来的性能损耗。线程池的基本需求是:

提前创建若干个线程,将其设置为休眠

当需要线程执行任务时,向线程池中添加一个任务,该线程池自动挑选一个空闲线程来执行任务。

​ 实现线程池的难点在于解决并发问题:将多个任务同时分配给多个线程,需要避免多个线程同时竞争同一个任务。解决思路是使用一个任务队列,向队列添加任务时需要加锁,确保同一时刻只能被一个线程添加;从队列取出任务时需要加锁,确保线程池中的多个线程在同一时刻只能有一个线程取出任务,并且该线程在执行任务时需要解锁,最大程度保证并发性,这中间需要涉及多线程、互斥锁、条件变量。

threadpool.h

/*
 * @Author: MRL Liu
 * @Date: 2022-04-07 23:32:44
 * @Description: threadpool类的实现
 * @LastEditTime: 2022-05-19 16:25:30
 * @FilePath: \C++\pool\threadpool.h
 */

#ifndef MRL_THREADPOOL_H
#define  MRL_THREADPOOL_H

#include <mutex>// 互斥锁
#include <condition_variable>// 条件变量
#include <queue>
#include <thread>
#include<atomic>// 原子性
#include <functional>// 调用对象
#include<assert.h>


// 线程池
class ThreadPool {
public:
    using Task= std::function<void()>;// C++11允许使用using为模板定义别名,typedef可以给普通变量定义别名
    // 构造函数,explicit表明必须显示调用
    explicit ThreadPool(size_t  threadCount):
    // 初始化列表
    poolPtr(std::make_shared<Pool>()) {
        this->poolPtr->isClosed=false;
        this->AddThreads( threadCount);
    }
    // 析构函数
    ~ThreadPool() {
        // 判断该智能指针是否存在
        if(static_cast<bool>(this->poolPtr)) {
            {
                std::lock_guard<std::mutex> locker(this->poolPtr->queue_mutex);// 声明区加锁
                this->poolPtr->isClosed = true;// 设置线程池状态为关闭
            }
            this->poolPtr->condition.notify_all();//唤醒所有阻塞线程
            for(std::thread& t: this->poolPtr->threadsArr){
                if(t.joinable()){
                    t.join();// 如果有线程没有执行完,等待执行完毕
                }
            }
        }
    }
    // 禁用如下默认函数
    ThreadPool() = delete;// 默认构造函数
    ThreadPool(ThreadPool&&) = delete; // 移动构造函数
    ThreadPool(const ThreadPool&)=delete;// 复制构造函数
    ThreadPool & operator=(const ThreadPool& other)=delete;//=运算符重载函数
public:
    // 添加若干个线程
    void AddThreads(size_t  threadCount){
        assert(threadCount > 0);
        //创建threadCount个线程对象
        for(size_t i = 0; i < threadCount; i++) {
            // 创建一个线程对象,传入lambda表达式
            this->poolPtr->threadsArr.emplace_back(
                std::thread(
                        // lambda表达式
                        [pool = this->poolPtr] {
                            std::unique_lock<std::mutex> locker(pool->queue_mutex);//对以下声明块加锁
                            while(true) {
                                // 如果任务队列不为空
                                if(!pool->taskQue.empty()) {
                                    // 出队,获取一个任务,使用了移动语义
                                    auto task = std::move(pool->taskQue.front());
                                    pool->taskQue.pop();
                                    // 临时解锁,让其他线程去竞争锁
                                    locker.unlock();
                                    // 执行任务
                                    task();
                                    // 继续加锁
                                    locker.lock();
                                } 
                                else if(pool->isClosed) break;// 如果线程池关闭,跳出循环,结束线程
                                // 此时任务队列为空,且线程池没有关闭,
                                else pool->condition.wait(locker);//先阻塞当前线程,然后解锁locker,等待唤醒
                            }// 自动解锁
                       }
                )
            );
            
        }
    }
    // 函数模板,添加一个任务
    template<class F>
    void AddTask(F&& task) {
        if(!this->poolPtr->isClosed){
            std::lock_guard<std::mutex> locker(poolPtr->queue_mutex);// 声明区加锁
            this->poolPtr->taskQue.emplace(std::forward<F>(task));// 向任务队列中添加一个任务
            this->poolPtr->condition.notify_one();// 唤醒等待队列中的一个阻塞线程
        }
        else{
            throw std::runtime_error("theadpool is closed but have task adding");
        }
    }
private:
    struct Pool {
        // 数据结构
        std::queue<Task> taskQue;//任务队列,每个元素为函数对象类型
        std::vector<std::thread> threadsArr;//工作线程数组,每个元素为线程对象类型
        // 多线程同步
        std::mutex queue_mutex;// 互斥锁,   保证每一刻只有一个线程可以从任务队列中取出任务
        std::condition_variable condition;// 条件变量,控制阻塞线程的唤醒与阻塞
        // 标志变量
        std::atomic_bool  isClosed;// 原子布尔变量,是否关闭,可以像正常bool一样使用,但是该变量具有原子性
    };
    std::shared_ptr<Pool>  poolPtr;// 共享智能指针
};

// void test_pool(){
//     ThreadPool* pool=new  ThreadPool(8);
//     pool->AddTask([]{
//             //std::cout<<"hello,world!!"<<endl;
//     });
//     getchar();
// }
#endif // MRL_THREADPOOL_H