[TOC]
阻塞队列(block queue)是一种支持阻塞的插入和移除的队列,用于多线程或多进程同步的情景。阻塞队列使用的是生产者-消费者模式,生产者指的是向队列中添加元素的线程,消费者是从队列中获取元素的线程。
- 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
- 支持阻塞的移除方法:意思是队列为空时,获取元素(同时移除元素)的线程会被阻塞,等到队列变为非空。
和阻塞队列相关的知识点有:互斥锁+条件变量+生产者-消费者模型
一、阻塞队列的实现
1、定义类模板
现在我们考虑实现基于deque的一个有界双向阻塞队列的类模板,我们基于C++的deque容器来进行封装,需要说明的是,阻塞队列的元素存储的容器类型有很多种,不一定非得是deque。首先定义如下:
#include <deque> // 包含队列容器
template<class T>
class BlockQueue{
public:
BlockQueue(size_t MaxCapacity = 1000);//构造函数
~BlockQueue();//析构函数
private:
size_t capacity_;//队列容量
std::deque<T> deq_;//队列容器
}
// 构造函数的实现
template<class T>
BlockDeque<T>::BlockDeque(size_t MaxCapacity) {
capacity_=MaxCapacity;
assert(MaxCapacity > 0);
}
上述队列是一个封装了deque容器的名为BlockQueue的类模板,因为我们定义的阻塞队列计划是有界的,其设置了一个最大容量变量capacity_。
2、基于互斥锁实现简单函数
由于阻塞队列是用于并发编程的,所以现在我们继续添加并发设置。在Linux的并发编程中,可能存在多个线程调用BlockQueue的函数,所以我们要使用互斥锁。首先导入\#include <mutex> // 互斥锁
,然后在类模板中添加一个私有成员变量:std::mutex mtx_;
。
接着我们就可以在每一个BlockQueue的函数实现中使用mtx_来加锁,这样就可以保证在同一时刻,只能有一个线程执行其函数,我们这里简单一些函数实现,下面的函数非常简单,都是直接调用deque的API,唯一的区别在于每个函数开始时都加有一行lock_guard<mutex> locker(mtx_);
,这行代码在自己的声明空间(即当前函数体)内进行了加锁,在执行该行代码时加锁,执行完该函数返回时解锁。
// 获取队列的元素个数
template<class T>
size_t BlockDeque<T>::size() {
lock_guard<mutex> locker(mtx_);// 在声明空间内加锁
return deq_.size();
}
// 获取队列的容量
template<class T>
size_t BlockDeque<T>::capacity() {
lock_guard<mutex> locker(mtx_);
return capacity_;
}
// 获取队列的队首元素
template<class T>
T BlockDeque<T>::front() {
lock_guard<mutex> locker(mtx_);
return deq_.front();
}
// 获取队列的队尾元素
template<class T>
T BlockDeque<T>::back() {
lock_guard<mutex> locker(mtx_);
return deq_.back();
}
// 清空队列
template<class T>
void BlockDeque<T>::clear() {
lock_guard<mutex> locker(mtx_);
deq_.clear();
}
// 判断队列是否为空
template<class T>
bool BlockDeque<T>::empty() {
lock_guard<mutex> locker(mtx_);
return deq_.empty();
}
// 判断队列是否已满
template<class T>
bool BlockDeque<T>::full(){
lock_guard<mutex> locker(mtx_);
return deq_.size() >= capacity_;
}
3、基于条件变量的实现push()和pop()
基于互斥锁可以简单实现阻塞队列的简单函数,但是对于它最重要的push和pop函数却还不够。我们先看互斥锁的特点
互斥锁是一种互斥机制,互斥量(mutex)从本质上说是一把锁,在访问共享资源前对互斥量进行加锁,在访问完成后释放互斥量上的锁。在互斥量进行加锁以后,任何其它试图再次对互斥量加锁的线程将会阻塞直到当前线程释放该互斥锁。如果释放互斥锁时有多个线程阻塞,所有在该互斥锁上的阻塞线程都会变成可运行状态,第一个变为可运行状态的线程可以对互斥锁加锁,其它线程将会看到互斥锁依然被锁住,只能回去再次等待它重新变为可用。
简单来讲,互斥锁只保证了同一时刻只有一个线程访问共享资源(调用一个函数),但是一旦该锁被释放,所以在等待它的线程就都会被唤醒去竞争该函数的调用权,先到先得。这种机制在阻塞队列的push和pop中是低效的。
我们先将所有等待调用push()的线程叫做生产者线程,将所有等待调用pop()的线程叫做消费者线程。那么存在如下规则:
生产者线程必须在容器没有满的时候才适合调用,这样生产者才能成功执行push()。
消费者线程必须在容器不为空的时候才适合调用,这样消费者才能成功执行pop()。
也就是说,生产者线程被调用的条件是容器没有满且互斥锁空闲;消费者线程被调用的条件是容器不为空且互斥锁空闲。互斥锁只能保证互斥锁空闲,其无法保证容器是否满或者是否为空的条件,这个时候我们可以借助条件变量来实现。
条件变量是利用线程间共享的全局变量进行同步的线程同步机制;
在条件变量上等待的线程以睡眠的方式等待条件变量的满足;
- 一个线程等待”条件变量的条件成立”挂起
- 另一个线程使”条件成立”
条件变量的使用总是和一个互斥锁结合在一起;
现在我们考虑使用条件变量,继续在类模板中添加头文件:#include <condition_variable>// 条件变量
,然后在类模板中添加2个私有成员作为条件变量:
// 条件变量
condition_variable condProducer_;//生产者
condition_variable condConsumer_;//消费者
现在我们实现阻塞队列的push_back()方法:
// 阻塞队列的push()方法
template<class T>
void BlockDeque<T>::push_back(const T &item) {
unique_lock<mutex> locker(mtx_);
// 当队列容器已满时,阻塞生产者队列
while(deq_.size() >= capacity_) {
condProducer_.wait(locker);//该函数将线程设置为阻塞状态,直至条件变量被解锁
}
// 此时队列不满,执行入队
deq_.push_back(item);
// 队列容器一定不为空,设置消费者变量解锁
condConsumer_.notify_one();
}
可以看到上述代码中存在2个条件变量的API,wait()和notify_one(),前者让当前执行push()的线程又被上了一个条件变量的”锁”,直至”锁”被打开才会返回;后者是打开一个消费者条件变量的”锁”。
相比我们现在已经能猜到pop()方法的实现:
void BlockDeque<T>::pop_back(T &item) {
unique_lock<mutex> locker(mtx_);
// 当队列容器为空时,阻塞消费者者队列
while(deq_.empty()_) {
condProducer_.wait(locker);//该函数将线程设置为阻塞状态,直至条件变量被解锁
}
// 此时队列不满,执行入队
item = deq_.front();
deq_.pop_front();
// 队列容器一定不满,设置生产者变量解锁
condProducer_.notify_one();
}
在上述代码中,可以看到,线程在执行阻塞队列的push()和pop()时,先判断一下当前条件是否满足,不满足就用条件变量上锁wait(locker),此时线程就会阻塞在这里不能继续执行。消费者线程的条件变量由生产者线程成功执行一次后解”锁”,生产者线程的条件变量由消费者线程成功执行一次后解”锁”。两个线程互相掌握着对方的”钥匙”,这会不会形成死锁呢?不会,因为队列容器一开始是空的,生产者线程一定可以执行成功一次,然后消费者线程就可以成功执行了。
此时我们就已经实现了阻塞队列的重要方法了。
实际上,如果一个消费者线程也可以设置等待条件变量解“锁”的时间,设置最大时限,如下:
// 阻塞队列的pop()方法
template<class T>
bool BlockDeque<T>::pop(T &item, int timeout) {
unique_lock<std::mutex> locker(mtx_);
// 当队列容器为空时,阻塞消费者者队列
while(deq_.empty()){
if(condConsumer_.wait_for(locker, chrono::seconds(timeout)) == cv_status::timeout)return false;
if(isClose_)return false;
}
// 此时队列为空,执行出队
item = deq_.front();
deq_.pop_front();
// 队列容器一定不满,设置生产者变量解锁
condProducer_.notify_one();
return true;
}
二、阻塞队列的类型介绍
前文我们实现了一个基于deque的阻塞队列,实际上的阻塞队列远不止deque。具体来讲如下:
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列
- DelayQueue:一个使用优先级队列实现的无界阻塞队列
- SynchronousQueue:一个不存储元素的阻塞队列
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列
如果是无界阻塞队列,队列则不会出现满的情况。
1.ArrayBlockingQueue
此队列按照先进先出(FIFO)的原则对元素进行排序
默认情况下不保证线程公平地访问队列(所谓公平是指当队列可用时,先被阻塞的线程先访问队列)
为了保证公平性通常会降低吞吐量。
公平锁是利用了可重入锁的公平锁来实现。
3.PriorityBlockingQueue
默认情况下元素采取自然顺序升序排列
可以自定义Comparator
或者自定义类实现compareTo()
方法来指定排序规则
不支持同优先级元素排序
4.DelayQueue
队列使用PriorityQueue
来实现,队列中的元素必须实现Delayed
接口
只有在延时期满才能从队列中提取元素。Java 核心技术知识点我都整理成面试题和答案了,关注公众号Java技术栈,回复:面试,可以免费获取。
三、阻塞队列不可用的处理方式
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里获取元素的线程。第一节中我们实现的队列介绍了阻塞队列不可用的两种处理方式,一种是一直阻塞,一种是超时退出。
当阻塞队列不可用时,常见的可以有如下4种相应的处理方式:
处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入操作 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除操作 | remove() | poll() | take() | poll(time, unit) |
获取操作 | element() | peek() | 不可用 | 不可用 |
- 返回特殊值:插入元素时,会返回是否插入成功,成功返回true。如果是移除方法,则是从队列中取出一个元素,没有则返回null。
- 一直阻塞:当阻塞队列满时,如果生产者线程往队列里面put元素,则生产者线程会被阻塞,知道队列不满或者响应中断退出。当队列为空时,如果消费者线程从队列里take元素。
- 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定时间,生产者线程就会退出。
四、本文所述代码
#include <deque> // 包含队列容器
#include <mutex> // 互斥锁
#include <condition_variable>// 条件变量
using namespace std;
template<class T>
class BlockQueue{
public:
explicit BlockQueue(size_t MaxCapacity = 1000);//构造函数,explicit限制其被隐式调用
~BlockQueue();//析构函数
public:
void push_back(const T &item);
void push_back(const T &item, int timeout);
void push_front(const T &item);
void push_front(const T &item, int timeout);
bool pop_front(T &item);
bool pop_front(T &item, int timeout);
bool pop_back(T &item);
bool pop_back(T &item, int timeout);
public:
void flush();
size_t size();
size_t capacity();
void clear();
bool empty();
bool full();
void Close();
T front();
T back();
private:
size_t capacity_;//队列容量
deque<T> deq_;//队列容器
bool isClose_;
// 互斥锁
mutex mtx_;
// 条件变量
condition_variable condProducer_;//生产者
condition_variable condConsumer_;//消费者
}
// 构造函数的实现
template<class T>
BlockDeque<T>::BlockDeque(size_t MaxCapacity) {
capacity_=MaxCapacity;
assert(MaxCapacity > 0);
}
// 析构函数的实现
template<class T>
BlockDeque<T>::~BlockDeque() {
Close();
};
// 关闭阻塞队列
template<class T>
void BlockDeque<T>::Close() {
{
lock_guard<mutex> locker(mtx_);
deq_.clear();//清空容器
isClose_ = true;
}
condProducer_.notify_all();
condConsumer_.notify_all();
};
// 阻塞队列的push()方法
template<class T>
void BlockDeque<T>::push_back(const T &item) {
unique_lock<mutex> locker(mtx_);
// 当队列容器已满时,阻塞生产者队列
while(deq_.size() >= capacity_) {
condProducer_.wait(locker);//该函数将线程设置为阻塞状态,直至条件变量被解锁
}
// 此时队列不满,执行入队
deq_.push_back(item);
// 队列容器一定不为空,设置消费者变量解锁
condConsumer_.notify_one();
}
void BlockDeque<T>::push_back(const T &item, int timeout) {
unique_lock<mutex> locker(mtx_);
// 当队列容器已满时,阻塞生产者队列
while(deq_.size() >= capacity_) {
if(condConsumer_.wait_for(locker, chrono::seconds(timeout)) == cv_status::timeout) return;
}
// 此时队列不满,执行入队
deq_.push_back(item);
// 队列容器一定不为空,设置消费者变量解锁
condConsumer_.notify_one();
}
template<class T>
void BlockDeque<T>::push_front(const T &item) {
unique_lock<mutex> locker(mtx_);
// 当队列容器已满时,阻塞生产者队列
while(deq_.size() >= capacity_) {
condProducer_.wait(locker);//该函数将线程设置为阻塞状态,直至条件变量被解锁
}
// 此时队列不满,执行入队
deq_.push_front(item);
// 队列容器一定不为空,设置消费者变量解锁
condConsumer_.notify_one();
}
void BlockDeque<T>::push_front(const T &item, int timeout) {
unique_lock<mutex> locker(mtx_);
// 当队列容器已满时,阻塞生产者队列
while(deq_.size() >= capacity_) {
if(condConsumer_.wait_for(locker, chrono::seconds(timeout)) == cv_status::timeout) return;
}
// 此时队列不满,执行入队
deq_.push_front(item);
// 队列容器一定不为空,设置消费者变量解锁
condConsumer_.notify_one();
}
// 阻塞队列的pop()方法
template<class T>
bool BlockDeque<T>::pop_front(T &item, int timeout) {
unique_lock<std::mutex> locker(mtx_);
// 当队列容器为空时,阻塞消费者者队列
while(deq_.empty()){
if(condConsumer_.wait_for(locker, chrono::seconds(timeout)) == cv_status::timeout)return false;
if(isClose_)return false;
}
// 此时队列为空,执行出队
item = deq_.front();
deq_.pop_front();
// 队列容器一定不满,设置生产者变量解锁
condProducer_.notify_one();
return true;
}
template<class T>
bool BlockDeque<T>::pop_front(T &item) {
unique_lock<mutex> locker(mtx_);
while(deq_.empty()){
condConsumer_.wait(locker);
if(isClose_)return false;
}
item = deq_.front();
deq_.pop_front();
condProducer_.notify_one();
return true;
}
bool BlockDeque<T>::pop_back(T &item) {
unique_lock<mutex> locker(mtx_);
// 当队列容器为空时,阻塞消费者者队列
while(deq_.empty()_) {
condProducer_.wait(locker);//该函数将线程设置为阻塞状态,直至条件变量被解锁
if(isClose_)return false;
}
// 此时队列不满,执行入队
item = deq_.front();
deq_.pop_front();
// 队列容器一定不满,设置生产者变量解锁
condProducer_.notify_one();
return true;
}
template<class T>
bool BlockDeque<T>::pop_front(T &item, int timeout) {
unique_lock<std::mutex> locker(mtx_);
// 当队列容器为空时,阻塞消费者者队列
while(deq_.empty()){
if(condConsumer_.wait_for(locker, chrono::seconds(timeout)) == cv_status::timeout)return false;
if(isClose_)return false;
}
// 此时队列为空,执行出队
item = deq_.front();
deq_.pop_front();
// 队列容器一定不满,设置生产者变量解锁
condProducer_.notify_one();
return true;
}
// 刷新
template<class T>
void BlockDeque<T>::flush() {
condConsumer_.notify_one();//
};
// 获取队列的元素个数
template<class T>
size_t BlockDeque<T>::size() {
lock_guard<mutex> locker(mtx_);// 加锁
return deq_.size();
}
// 获取队列的容量
template<class T>
size_t BlockDeque<T>::capacity() {
lock_guard<mutex> locker(mtx_);
return capacity_;
}
// 获取队列的队首元素
template<class T>
T BlockDeque<T>::front() {
lock_guard<mutex> locker(mtx_);
return deq_.front();
}
// 获取队列的队尾元素
template<class T>
T BlockDeque<T>::back() {
lock_guard<mutex> locker(mtx_);
return deq_.back();
}
// 清空队列
template<class T>
void BlockDeque<T>::clear() {
lock_guard<mutex> locker(mtx_);
deq_.clear();
}
// 判断队列是否为空
template<class T>
bool BlockDeque<T>::empty() {
lock_guard<mutex> locker(mtx_);
return deq_.empty();
}
// 判断队列是否已满
template<class T>
bool BlockDeque<T>::full(){
lock_guard<mutex> locker(mtx_);
return deq_.size() >= capacity_;
}