【Linux】一篇文章搞定 线程池
线程池
- 线程池的概念
- Linux下线程池的CPP模拟实现
1. 线程池的概念
- 线程池的本质:一个
线程安全
的队列
+ 一堆线程
- 线程安全队列中的元素类型: 类型 =
数据
+该数据处理方式
图解
2. Linux下线程池的CPP模拟实现
- 将
数据
和处理该数据的方法
封装成一个数据类
,并提供一个run方法
,调用数据处理方法
处理数据
主线程
负责向线程池
的安全队列
中插入
数据线程池
中的工作线程
负责从安全队列
中拿出
数据,并用数据类提供的run方法,让数据自己处理自己include
include
include
include
include
include
using namespace std;
//函数指针的宏定义
typedef void (*Handler)(int);template
class QueueData{public:
//传入数据和处理数据的函数
QueueData(T data_,Handler handler_)
{
data = data_;
handler = handler_;
}
~QueueData(){ }
//调用run函数来使用传入的函数处理传入的数据
void run()
{
handler(data);
}
private:
T data; //数据
Handler handler; //处理数据的方法
};
template
class ThreadPool{public:
//初始化线程池 并按照传入 线程数量 创建 相应个数的线程
ThreadPool(int Capacity,int Thread_count)
{
//初始化 队列容量+线程数量+互斥锁+条件变量+退出标志
capacity = Capacity;
thread_count = Thread_count;
lock = PTHREAD_MUTEX_INITIALIZER;
consumer_cond = PTHREAD_COND_INITIALIZER;
flag = false;
//创建线程
pthread_t ptid;
for(int i = 0; i < thread_count; i++)
{
int ret = pthread_create(&ptid,NULL,PoolStart,(void*)this);
if(ret < 0)
perror("pthread_create");
}
}
~ThreadPool()
{
// 销毁 条件变量+互斥锁
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&consumer_cond);
}
//主线程调用 向队列中插入数据
void Push(QueueData<T>* qd)
{
pthread_mutex_lock(&lock);
if(flag) //
{
pthread_mutex_unlock(&lock);
return;
}
safe_queue.push(qd);
pthread_mutex_unlock(&lock);
pthread_cond_signal(&consumer_cond); //提醒PCB等待队列中的线程可以来取数据啦
}
//调用该函数表示数据处理完了 可以退出所有线程了
void ThreadExit()
{
pthread_mutex_lock(&lock);
flag = true;
pthread_mutex_unlock(&lock);
pthread_cond_broadcast(&consumer_cond);
}
private:
queue<QueueData<T>* > safe_queue; //队列
size_t capacity; //安全队列中最大容量
pthread_mutex_t lock; //保证主线程与消费线程之间互斥
pthread_cond_t consumer_cond; //保证消费线程之间同步
size_t thread_count; //线程池中的线程数量
bool flag; //线程是否可以退出的标志
//工作线程调用
void Pop(QueueData<T>** qd)
{
(*qd) = safe_queue.front(); //将安全队列中的数据放入参数中
safe_queue.pop(); //将队首元素弹出
}
//线程入口函数中调用Pop函数,将this指针传入进来
static void* PoolStart(void* arg)
{
//线程分离,线程退出后OS自动回收其资源
pthread_detach(pthread_self());
//将void*类型的参数强转成ThreadPool*
ThreadPool* p = (ThreadPool*)arg;
//工作线程的任务就是取出数据并处理数据
while(1)
{
//为了保证线程能够独占式访问共享资源,须加锁
pthread_mutex_lock(&p->lock);
while(p->safe_queue.empty())
{
if(p->flag) //当flag为true时表示数据处理完了 该线程可以退出了
{
(p->thread_count)--;
pthread_mutex_unlock(&p->lock);
pthread_exit(NULL);
}
pthread_cond_wait(&p->consumer_cond,&p->lock); //当队列空就等待并释放锁
}
QueueData<T>* qd;
p->Pop(&qd); //从安全队列中拿出来数据
//先释放锁 再 处理数据,反之,会导致处理完数据再释放锁,释放锁时间过长
pthread_mutex_unlock(&p->lock);
qd->run();
}
}
};
//处理数据的函数
void DealData(int data)
{printf("%d\n",data);
}
int main()
{//创建线程池,设置安全队列中最多可以保存4个元素,线程数量规定为4个
ThreadPool<int>* tp = new ThreadPool<int>(4,4);
if(!tp) return -1;
//假设需要处理 100 个数据
for(int i = 0; i < 100; i++)
{
QueueData<int>* qd = new QueueData<int>(i,DealData);
if(!qd) continue;
tp->Push(qd);
}
sleep(3);
//退出线程池
tp->ThreadExit();
return 0;
}
还没有评论,来说两句吧...