Linux 下C语言简单实现线程池(3)

/**********************************
 * @author    wallwind@yeah.net
 * @date        2012/06/13
 * Last update: 2012/06/13
 * License:    LGPL
 *
 **********************************/
 #include "global.h"
 #include "Thread.h"
 #include <errno.h>
 
 static int thpool_keepalive = 1;
 
 /* 创建互斥量,并初始化 */
  pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* used to serialize queue access */
 
thpool_t*  thpool_init(int threadN)
{
 thpool_t* thpool;
 if(!threadN || threadN < 1)
  threadN = 1;
 ///分配线程池内存
 thpool = (thpool_t*) malloc(sizeof(thpool_t));
 if(thpool ==NULL)
 {
  printf("malloc thpool_t error");
  return NULL;
 }
 //分配线程数
 thpool->threadsN = threadN;
 thpool->threads =(pthread_t*) malloc(threadN*sizeof(pthread_t));
 if(thpool->threads == NULL)
 {
  printf("malloc thpool->threads error");
  return NULL;
 }
 if(thpool_jobqueue_init(thpool))
  return -1;

thpool->jobqueue->queueSem =(sem_t*)malloc(sizeof(sem_t));
 sem_init(thpool->jobqueue->queueSem,0,1);
 int t;
 for(t = 0;t< threadN ;t++)
 {
  pthread_create(&(thpool->threads[t]),NULL,(void *)thpool_thread_do,(void*)thpool);
 }
 
 return thpool;
}

void thpool_destroy(thpool_t* tp_p)
{
 int i ;
 thpool_keepalive = 0;
 
 for(i = 0;i < (tp_p->threadsN); i++)
 {
  if(sem_post(tp_p->jobqueue->queueSem))
  {
   fprintf(stderr, "thpool_destroy(): Could not bypass sem_wait()\n");
  }

}
 if(sem_post(tp_p->jobqueue->queueSem)!=0)
 {
  fprintf(stderr, "thpool_destroy(): Could not destroy semaphore\n");
 }
 for(i = 0;i < (tp_p->threadsN); i++)
 {
  pthread_join(tp_p->threads[i],NULL);
 }
 thpool_jobqueue_empty(tp_p);
 
 free(tp_p->threads);
 free(tp_p->jobqueue->queueSem);
 free(tp_p->jobqueue);
 free (tp_p);
 
}
////对双向队列初始化
/* Initialise queue */
int thpool_jobqueue_init(thpool_t* tp_p){
 tp_p->jobqueue=(thpool_jobqueue*)malloc(sizeof(thpool_jobqueue));      /* MALLOC job queue */
 if (tp_p->jobqueue==NULL) return -1;
 tp_p->jobqueue->tail=NULL;
 tp_p->jobqueue->head=NULL;
 tp_p->jobqueue->jobN=0;
 return 0;
}

////
void thpool_thread_do(thpool_t* tp_p)
{
 while(thpool_keepalive ==1)
 {
  if(sem_wait(tp_p->jobqueue->queueSem)) ///线程阻塞,等待通知 直到消息队列有数据
  {
   perror("thpool_thread_do(): Waiting for semaphore");
   exit(1);
  }
  if(thpool_keepalive)
  {
   //(void*)(*function)(void *arg);
   FUNC function;
   void* arg_buff;
   thpool_job_t*  job_p;
   
   pthread_mutex_lock(&mutex);
    job_p = thpool_jobqueue_peek(tp_p);
   function = job_p->function;
   arg_buff = job_p->arg;
   if(thpool_jobqueue_removelast(tp_p))
    return ;
   pthread_mutex_unlock(&mutex);
   function(arg_buff);  //运行 你的方法。
   free(job_p);        ////释放掉。
  }
  else
  {
   return ;
  }
   
 }
 return ;
}

//得到第一个队列的一个节点
thpool_job_t* thpool_jobqueue_peek(thpool_t* tp_p)
{
 return tp_p->jobqueue->tail;
}
/////删除队列的最后一个节点
int thpool_jobqueue_removelast(thpool_t* tp_p)
{
 if(tp_p ==NULL)
  return -1;
 thpool_job_t* theLastJob;
 theLastJob = tp_p->jobqueue->tail;
 switch(tp_p->jobqueue->jobN)
 {
  case 0:
   return -1;
  case 1:
   tp_p->jobqueue->head =NULL;
   tp_p->jobqueue->tail =NULL;
   break;
  default:
   theLastJob->prev->next = NULL;
   tp_p->jobqueue->tail = theLastJob->prev;
   
 }
 (tp_p->jobqueue->jobN)--;
 int reval;
 sem_getvalue(tp_p->jobqueue->queueSem,&reval);
 return 0; 
}

void thpool_jobqueue_add(thpool_t* tp_p, thpool_job_t* newjob_p)
{
 newjob_p->next = NULL;
 newjob_p->prev = NULL;
 thpool_job_t* oldFirstJob;
 oldFirstJob = tp_p->jobqueue->head;
 
 switch(tp_p->jobqueue->jobN)
 {
  case 0:
   tp_p->jobqueue->head = newjob_p;
   tp_p->jobqueue->tail = newjob_p;
   break;
  default:
   oldFirstJob->prev = newjob_p;
   newjob_p->next = oldFirstJob;
   tp_p->jobqueue->head = newjob_p;
 
 }
 (tp_p->jobqueue->jobN)++;
 sem_post(tp_p->jobqueue->queueSem);
 
 int reval;
 sem_getvalue(tp_p->jobqueue->queueSem,&reval);
 return;
}

/////将消息加入线程池
int thpool_add_work(thpool_t* tp_p, void* (*function_p)(void*), void* arg_p)
{
 thpool_job_t* newjob;
 newjob = (thpool_job_t*) malloc(sizeof(thpool_job_t));
 
 if(newjob ==NULL)
 {
  fprintf(stderr, "thpool_add_work(): Could not allocate memory for new job\n");
  exit(1);
 }
 newjob ->function = function_p;
 newjob ->arg      = arg_p;
 pthread_mutex_lock(&mutex);
 thpool_jobqueue_add(tp_p,newjob);
 pthread_mutex_unlock(&mutex);   
 return 0;
}

///清空队列
void thpool_jobqueue_empty(thpool_t* tp_p)
{
 thpool_job_t* curjob;
 curjob = tp_p->jobqueue->tail;
 
 while(tp_p->jobqueue->jobN)
 {
  tp_p->jobqueue->tail = curjob->prev;
  free (curjob);
  curjob = tp_p->jobqueue->tail;
  (tp_p->jobqueue->jobN)--;
 }
 tp_p->jobqueue->head = NULL;
 tp_p->jobqueue->tail = NULL;
}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/9f4d0d2a4e7fae1b880d046d0afc9af0.html