本文共 2433 字,大约阅读时间需要 8 分钟。
Tars底层实现了一个线程池库,主要源代码位于tc_thread_poo.(cpp,h)文件中。线程池队列涉及的核心是工作线程和任务队列的设计。本文基于tars中的实现进行介绍。
线程池的工作线程类(ThreadWorker)继承自TC_Thread,它是tars实现的线程类,实现原理是基于C++的pthread_create函数。外部用户通过调用TC_Thread的start函数来启动线程,新线程的入口为threadEntry函数,该函数会调用TC_Thread的run函数。run函数是具体的线程执行逻辑,它是一个虚函数
virtual void run() = 0;
因此子类可以通过实现这个函数来控制线程的运行逻辑。我们下面我们根据ThreadWorker实现的run函数来看线程池中的线程的执行逻辑。
void TC_ThreadPool::ThreadWorker::run(){ //调用初始化部分 TC_FunctorWrapperInterface *pst = _tpool->get(); if(pst) { try { (*pst)(); } catch ( ... ) { } delete pst; pst = NULL; } //调用处理部分 while (!_bTerminate) { TC_FunctorWrapperInterface *pfw = _tpool->get(this); if(pfw != NULL) { auto_ptrapfw(pfw); try { (*pfw)(); } catch ( ... ) { } _tpool->idle(this); } } //结束 _tpool->exit();}
可以看到,run函数的执行流可以简单分为两个部分:
1. 在线程启动时,对线程进行初始化 2. while主循环,不断尝试从任务队列中提取任务并执行。这里的任务是以仿函数对象的形式存在的,这样就可以使得线程不必关心任务的具体实现逻辑,只需要直接调用仿函数即可。下面我们介绍一下线程池的实现,以及对外提供的接口。
TC_ThreadPool主要维护以下几个成员数据:
/** * 任务队列 */ TC_ThreadQueue_jobqueue; /** * 启动任务 */ TC_ThreadQueue _startqueue; /** * 工作线程 */ std::vector _jobthread; /** * 繁忙线程 */ std::set _busthread; /** * 任务队列的锁 */ TC_ThreadLock _tmutex; /** * 是否所有任务都执行完毕 */ bool _bAllDone;
_jobqueue是任务队列,用户通过调用线程池的exec接口向任务队列中添加任务
void exec(const TC_FunctorWrapper&tf)
_startqueue是中放的是线程启动初始化逻辑。带参start函数负责设置这个队列:
void start(const TC_FunctorWrapper&tf)
从这个函数的实现可以看到,每个线程的初始化逻辑是一样的。
_jobthread和_busthread是放置线程对象的集合,其中_busthread中存放正在执行任务的线程。
最后,_tmutex是任务锁,_bAllDone是一个状态变量。
线程池中的线程在操作TC_ThreadQueue时并没有加锁,这是因为它是一个线程安全的队列。它底层数据存储结构是deque。
TC_ThreadQueue的线程安全性得益于它继承了TC_ThreadLock类:
typedef TC_MonitorTC_ThreadLock;
该类提供了互斥锁和条件变量来保证多并发操作的安全。通过在TC_ThreadQueue涉及并发竞争问题的操作函数中调用TC_ThreadLock提供的相关操作接口,就可以保证TC_ThreadQueue相应操作函数的线程安全性。
本文以线程池的实现为入口,分别介绍了tars中实现的线程类,线程锁,以及基于线程锁的线程安全对象。所有需要线程安全的对象都可以通过继承TC_ThreadLock基类,然后运用TC_ThreadLock提供的操作函数达到对象的成员数据访问操作的并发安全(当然,这是由于TC_ThreadLock中内置了互斥锁和条件变量)。
转载地址:http://tpqxi.baihongyu.com/