C/C++手撕線程池(線程池的封裝和實現(xiàn))
本文使用的源碼地址:
https://github.com/SCljh/thread_pool
線程池描述
?
池式結(jié)構(gòu)
??在計算機體系結(jié)構(gòu)中有許多池式結(jié)構(gòu):內(nèi)存池、數(shù)據(jù)庫連接池、請求池、消息隊列、
對象池等等。
??池式結(jié)構(gòu)解決的主要問題為緩沖問題,起到的是緩沖區(qū)的作用。
?
線程池
?通過使用線程池,我們可以有效降低多線程操作中任務(wù)申請和釋放產(chǎn)生的性能消耗。
特別是當(dāng)我們每個線程的任務(wù)處理比較快時,系統(tǒng)大部分性能消耗都花在了
pthread_create以及釋放線程的過程中。
那既然是這樣的話,何不在程序開始運行階段提前創(chuàng)建好一堆線程,
等我們需要用的時候只要去這一堆線程中領(lǐng)一個線程,用完了再放回去,
等程序運行結(jié)束時統(tǒng)一釋放這一堆線程呢?按照這個想法,線程池出現(xiàn)了。
?
線程池解決的問題
解決任務(wù)處理
阻塞IO
解決線程創(chuàng)建于銷毀的成本問題
管理線程
??線程池應(yīng)用之一:日志存儲
??在服務(wù)器保存日志至磁盤上時,性能往往壓在磁盤讀寫上,
而引入線程池利用異步解耦可以解決磁盤讀寫性能問題。
??線程池的主要作用:異步解耦
?
說了那么多線程池的優(yōu)點,那接下來要做的就是手撕這誘人的玩意了。
?
樸實無華但不枯燥的代碼(以c++為例)
本文主要講解的是c++線程池的實現(xiàn),C語言實現(xiàn)其實思想和c++是一致的,
具體的代碼可見文章開頭的鏈接。
線程池中比較關(guān)鍵的東西
??若想自己編寫一個線程池框架,那么可以先關(guān)注線程池中比較關(guān)鍵的東西:
工作隊列
任務(wù)隊列
線程池的池
pthread_create中的回調(diào)函數(shù)
為什么說這些東西比較關(guān)鍵?因為這“大四元”基本上支撐起了整個線程池的框架。
而線程池的框架如下所示:
————————————————
工作隊列
??worker隊列,首先要有worker才有隊列,我們首先定義worker結(jié)構(gòu)體:
??可想而知,worker中要有create_pthread函數(shù)的id參數(shù),
還需要有控制每一個worker live or die的標(biāo)志terminate,
我們最好再設(shè)置一個標(biāo)志表示這個worker是否在工作。
最后,我們要知道這個worker隸屬于那個線程池(至于為什么下文將介紹)
struct NWORKER{ pthread_t threadid; //線程id bool terminate; //是否需要結(jié)束該worker的標(biāo)志 int isWorking; //該worker是否在工作 ThreadPool *pool; //隸屬于的線程池 }任務(wù)隊列
struct NJOB{ void (*func)(void *arg); //任務(wù)函數(shù) void *user_data; //函數(shù)參數(shù) };
線程池本池
??對于一個線程池,任務(wù)隊列和工作隊列已經(jīng)是必不可少的東西了,
那線程池還有需要哪些東西輔助它以達(dá)到它該有的功能呢?
??一說到線程,那處理好線程同步就是一個繞不開的話題,
那在線程池中我們需要處理的臨界資源有哪些呢?
想想我們工作隊列中的每個worker都在等待一個任務(wù)隊列看其是否有任務(wù)到來,
所以很容易得出結(jié)論我們必須要在線程池中實現(xiàn)兩把鎖:
一把是用來控制對任務(wù)隊列操作的互斥鎖,
另一把是當(dāng)任務(wù)隊列有新任務(wù)時喚醒worker的條件鎖。
??有了這兩把鎖,線程池中再加點必要的一些數(shù)字以及對線程池操作的函數(shù),
那么這個類就寫完了。實現(xiàn)代碼如下:
class ThreadPool{ private: struct NWORKER{ pthread_t threadid; bool terminate; int isWorking; ThreadPool *pool; } *m_workers; struct NJOB{ void (*func)(void *arg); //任務(wù)函數(shù) void *user_data; }; public: //線程池初始化 //numWorkers:線程數(shù)量 ThreadPool(int numWorkers, int max_jobs); //銷毀線程池 ~ThreadPool(); //面向用戶的添加任務(wù) int pushJob(void (*func)(void *data), void *arg, int len); private: //向線程池中添加任務(wù) bool _addJob(NJOB* job); //回調(diào)函數(shù) static void* _run(void *arg); void _threadLoop(void *arg); private: std::list<NJOB*> m_jobs_list; int m_max_jobs; //任務(wù)隊列中的最大任務(wù)數(shù) int m_sum_thread; //worker總數(shù) int m_free_thread; //空閑worker數(shù) pthread_cond_t m_jobs_cond; //線程條件等待 pthread_mutex_t m_jobs_mutex; //為任務(wù)加鎖防止一個任務(wù) 被兩個線程執(zhí)行等其他情況 };
可以看到我們做了一些必要的封裝,只給用戶提供了構(gòu)造函數(shù)、
析構(gòu)函數(shù)以及添加任務(wù)的函數(shù)。這也是一個基本的線程池框架必要的接口。
回調(diào)函數(shù)
static?
??根據(jù)上方代碼可以看見,回調(diào)函數(shù)為static函數(shù)。
我可不想在我使用使用回調(diào)函數(shù)的時候自動給我加上*this參數(shù)。
??首先回調(diào)函數(shù)是每個線程創(chuàng)建之后就開始執(zhí)行的函數(shù),
該函數(shù)作為pthread_create的第三個參數(shù)傳入。我們來看看pthread_create的函數(shù)原型:
int pthread_create(pthread_t *tidp,const pthread_attr_t *attr, void *(*start_rtn)(void*),void *arg);
注意到,此處的我們傳入的回調(diào)函數(shù)必須是接受一個void*參數(shù),
且返回類型為void*的函數(shù)。如果我們將回調(diào)函數(shù)寫成線程池的普通成員函數(shù),
那么c++會在這個函數(shù)參數(shù)前默認(rèn)加上一個*this參數(shù),
**這也是為什么我們能在成員函數(shù)中使用當(dāng)前對象中的一些屬性。
**然而就是這個原因,若我們傳入的回調(diào)函數(shù)指針為類的成員函數(shù),
那c++編譯器會破壞我們的函數(shù)結(jié)構(gòu)(因為給我們加了一個形參),
導(dǎo)致pthread_create的第三個參數(shù)不符合要求而報錯:
看吧,編譯器不讓我們用non-static的成員函數(shù)作為回調(diào)函數(shù)傳入pthread_create中。
其實在c++中,大多數(shù)回調(diào)函數(shù)都有這個要求。
??那為什么static就可以呢?
這是因為static函數(shù)為類的靜態(tài)函數(shù),當(dāng)類的成員函數(shù)被static修飾后,
調(diào)用該函數(shù)將不會默認(rèn)傳遞*this指針,
這也是為什么static成員函數(shù)中不能使用對象的非static屬性:
你*this指針都沒傳我上哪去找你的對象?
函數(shù)本身
??在運行回調(diào)函數(shù)的時候,我們又想用對象里的東西(比如鎖),
編譯器又不讓我們用,那怎么辦?別忘了雖然static函數(shù)沒有*this指針,
但它卻可以有一個*void的參數(shù)啊。有了這個*void,我們還怕少一個*this指針?
我們可以先寫一個static函數(shù),將需要的對象指針通過形參傳到這個函數(shù)里,
再在這個函數(shù)中通過這個對象調(diào)用成員函數(shù)的方法,就能使用這個對象的成員屬性了。
??就像這樣:
//run為static函數(shù) void* ThreadPool::_run(void *arg) { NWORKER *worker = (NWORKER *)arg; worker->pool->_threadLoop(arg); } //threadLoop為普通成員函數(shù) void ThreadPool::_threadLoop(void *arg) { //在這里就能直接用當(dāng)前ThreadPool對象的東西了 }
至于threadLoop的實現(xiàn),由于線程是要一直存在的,
一個while(true)的循環(huán)肯定少不了了。這個循環(huán)中具體做什么呢:
不斷檢查任務(wù)隊列中是否有job:
如果有,則取出這個job,并將該job從任務(wù)隊列中刪除,且執(zhí)行job中的func函數(shù)。
如果沒有,調(diào)用pthread_cond_wait函數(shù)等待job到來時被喚醒。
若當(dāng)前worker的terminate為真,則退出循環(huán)結(jié)束線程。
注意在對job操作前別忘了加鎖,函數(shù)實現(xiàn)如下:
void ThreadPool::_threadLoop(void *arg) { NWORKER *worker = (NWORKER*)arg; while (1){ //線程只有兩個狀態(tài):執(zhí)行\(zhòng)等待 //查看任務(wù)隊列前先獲取鎖 pthread_mutex_lock(&m_jobs_mutex); //當(dāng)前沒有任務(wù) while (m_jobs_list.size() == 0) { //檢查worker是否需要結(jié)束生命 if (worker->terminate) break; //條件等待直到被喚醒 pthread_cond_wait(&m_jobs_cond,&m_jobs_mutex); } //檢查worker是否需要結(jié)束生命 if (worker->terminate){ pthread_mutex_unlock(&m_jobs_mutex); break; } //獲取到j(luò)ob后將該job從任務(wù)隊列移出,免得其他worker過來重復(fù)做這個任務(wù) struct NJOB *job = m_jobs_list.front(); m_jobs_list.pop_front(); //對任務(wù)隊列的操作結(jié)束,釋放鎖 pthread_mutex_unlock(&m_jobs_mutex); m_free_thread--; worker->isWorking = true; //執(zhí)行job中的func job->func(job->user_data); worker->isWorking = false; free(job->user_data); free(job); } free(worker); pthread_exit(NULL); }
原文鏈接:https://blog.csdn.net/ACMer_L/article/details/107578636
————————————————
原文鏈接:https://blog.csdn.net/ACMer_L/article/details/107578636
*博客內(nèi)容為網(wǎng)友個人發(fā)布,僅代表博主個人觀點,如有侵權(quán)請聯(lián)系工作人員刪除。