#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <new>
#include "multi.h"
// Builds an empty queue: no head/tail node, a size of zero and the shutdown
// flag cleared. The mutex and the "ready" condition variable are both created
// with default attributes (NULL attribute argument).
worker_queue::worker_queue()
    : head(NULL),
      tail(NULL),
      cur_size(0),
      destroy(false)
{
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&ready, NULL);
}
// Shuts the queue down: raises the shutdown flag, wakes every thread blocked
// in out_queue(), frees the nodes that are still queued and finally destroys
// the mutex and the condition variable.
//
// NOTE(review): nothing here waits for woken consumers to leave out_queue()
// before the mutex is destroyed. The owner should make sure no other thread
// is still using the queue when it is deleted.
worker_queue::~worker_queue() {
    // The flag must change while the mutex is held. out_queue() tests it
    // under this same mutex right before blocking; an unlocked write followed
    // by the broadcast could land between that test and pthread_cond_wait(),
    // and the consumer would then sleep through the only wakeup it gets.
    pthread_mutex_lock(&mutex);
    destroy = true;
    pthread_mutex_unlock(&mutex);
    pthread_cond_broadcast(&ready);

    // Release every node that was never dequeued.
    clean();

    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&ready);
}
int worker_queue::clean(){
worker_node* temp;
pthread_mutex_lock(&mutex);
while(NULL!=head){
temp = head ;
head = head->next ;
delete temp ;
}
tail=head=NULL;
cur_size = 0;
pthread_mutex_unlock(&mutex);
return 0;
}
int worker_queue::in_queue(const worker_node *node){
worker_node *temp;
temp = new worker_node ;
if(NULL == temp) return -1 ; //new err ;
*temp = *node ;
pthread_mutex_lock(&mutex);
if(NULL==head){
head=tail=temp;
}else{
tail->next=temp ;
tail = temp ;
tail->next = NULL;
}
cur_size++;
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&ready);
return 0 ;
}
// Removes and returns the node at the head of the queue, blocking while the
// queue is empty and the shutdown flag is not set.
//
// Returns the detached node (allocated by in_queue(); the caller is
// responsible for deleting it), or NULL once the shutdown flag is set.
worker_node *worker_queue::out_queue() {
    pthread_mutex_lock(&mutex);

    // Re-test the predicate after every wakeup: pthread_cond_wait() may
    // return spuriously, and another consumer may have taken the node that
    // triggered the signal. A single `if` would fall through to head->next
    // with head == NULL.
    while ((NULL == head) && (false == destroy)) {
        pthread_cond_wait(&ready, &mutex);
    }

    if (destroy) {
        pthread_mutex_unlock(&mutex);
        return NULL;
    }

    worker_node *front = head;
    head = front->next;
    if (NULL == head)
        tail = NULL;       // queue is empty again; do not keep a stale tail
    front->next = NULL;    // hand the node out fully detached from the list
    cur_size--;

    pthread_mutex_unlock(&mutex);
    return front;
}
// Creates a pool object in its "not initialised" state: no queue, no thread
// array, zero threads, and both the init and destroy flags cleared.
pmulti::pmulti()
    : is_init(false),
      thread_num(0),
      w_queue(NULL),
      threads(NULL),
      is_destroy(false)
{
}
// Empty destructor: nothing is released here. destroy() is the routine that
// joins the worker threads and frees the queue and the thread array.
pmulti::~pmulti() {}
int pmulti::init(unsigned int thr_num){
if(is_init)
return -1;//alread init
int i;
//pool = new thread_pool;
//if(NULL == pool) return -2;//new error
//bzero(pool,sizeof(thread_pool));
w_queue = new worker_queue ;
if(NULL == w_queue) return -2;//new error
thread_num = thr_num;
threads = new pthread_t[thr_num] ;
for(i=0;i<thr_num;i++){
if(0!=pthread_create((threads+i),NULL,thread_fun,(void*)this ))
printf("pthread_create error\n");
}
is_destroy = false ;
is_init = true;
return 0;
}
int pmulti::add_worker(const worker_node * node){
return w_queue->in_queue(node);
}
int pmulti::destroy(){
int i=0;
char* res;
is_destroy = true ;
if(NULL != w_queue)
delete w_queue;
for(i=0;i<thread_num;i++){
pthread_join(threads[i],(void**)&res);
printf("joined :%lld end;\n",res);
}
if(thread_num > 0)
delete[] threads;
return 0;
}
void* pmulti::thread_fun(void*arg){
printf("thread num:%lld\n",pthread_self());
pmulti *pm =(pmulti*)arg;
worker_node * node ;
while(1){
if(pm->is_destroy)
pthread_exit(NULL);
node = pm->w_queue->out_queue();
if((NULL == node)|| (pm->is_destroy))
pthread_exit(NULL);
node->process(node->arg);
printf("thread_fun:%lld run\n",pthread_self());
}
return (void*)pthread_self();
}
// C++实现线程池的经典模型(2)
// 内容版权声明:除非注明,否则皆为本站原创文章。
// 转载注明出处:http://www.heiqu.com/7780b336fff65da1cf6b41fc590fad6f.html