在这个文件中定义了线程池的结构,工作线程函数的原型以及线程池的基本操作.线面给出的tpool.c是线程池的实现.函数tpool_init完成了线程池的初始化操作,包括对内存的设置,对线程属性的设置和池中线程的预创建.
/* -------------------------------------------------------------------------
* tpool.c – 线程池的实现
* -------------------------------------------------------------------------
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "tpool.h"
#include "log.h"
/* 工作线程 */
void *tpool_thread(void *tpool);
/***************线程池初始化*****************************/
tpool_t *tpool_init(int num_worker_threads,\ /*线程池线程个数*/
int max_queue_size, \ /*最大任务数*/
int do_not_block_when_full) /*是否阻塞任务满的时候*/
{
int i, rtn;
tpool_t *pool;
lprintf(log, INFO, "init pool begin ...\n");
/* 创建线程池结构体 */
if((pool = (struct tpool *)malloc(sizeof(struct tpool))) == NULL)
{
lprintf(log, FATAL, "Unable to malloc() thread pool!\n");
return NULL;
}
/* 设置线程池架构体成员 */
pool->num_threads = num_worker_threads; /*工作线程个数*/
pool->max_queue_size = max_queue_size; /*任务链表最大长度*/
pool->do_not_block_when_full = do_not_block_when_full; /*任务链表满时是否等待*/
/* 生成线程池缓存 */
if((pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads)) == NULL)
{
lprintf(log, FATAL,"Unable to malloc() thread info array\n");
return NULL;
}
/* 初始化任务链表 */
pool->cur_queue_size = 0;
pool->queue_head = NULL;
pool->queue_tail = NULL;
pool->queue_closed = 0;
pool->shutdown = 0;
/* 初始化互斥变量,条件变量 用于线程之间的同步 */
if((rtn = pthread_mutex_init(&(pool->queue_lock),NULL)) != 0)
{
lprintf(log,FATAL,"pthread_mutex_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_not_empty),NULL)) != 0)
{
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_not_full),NULL)) != 0)
{
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_empty),NULL)) != 0)
{
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
/* 创建所有的线程 */
for(i = 0; i != num_worker_threads; i++)
{
if( (rtn=pthread_create(&(pool->threads[i]),NULL,tpool_thread,(void*)pool)) != 0)
{
lprintf(log,FATAL,"pthread_create %s\n",strerror(rtn));
return NULL;
}
lprintf(log, INFO, "init pthread %d!\n",i);
}
lprintf(log, INFO, "init pool end!\n");
return pool;
}
函数tpool_add_work为线程池添加了一个工作线程.因为预创建的线程是不能做任何工作的,只有分配了适当的任务后,才会使预创建的线程真正的工作起来.
int tpool_add_work(tpool_t *pool, \ /*线程池指针*/
void (*routine)(void *),\ /*工作线程函数指针*/
void *arg) /*工作线程函数参数*/
{
int rtn;
tpool_work_t *workp; /*当前工作线程*/
if((rtn = pthread_mutex_lock(&pool->queue_lock)) != 0)
{
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
/* 采取独占的形式访问任务链表 */
if((pool->cur_queue_size == pool->max_queue_size) && \
(pool->do_not_block_when_full))
{
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0)
{
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return -1;
}
/* 等待任务链表为新线程释放空间 */
while((pool->cur_queue_size == pool->max_queue_size) &&
(!(pool->shutdown || pool->queue_closed)))
{
if((rtn = pthread_cond_wait(&(pool->queue_not_full),
&(pool->queue_lock)) ) != 0)
{
lprintf(log,FATAL,"pthread cond wait failure\n");
return -1;
}
}
if(pool->shutdown || pool->queue_closed)
{
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0)
{
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return -1;
}
/* 分配工作线程结构体 */
if((workp = (tpool_work_t *)malloc(sizeof(tpool_work_t))) == NULL)
{
lprintf(log,FATAL,"unable to create work struct\n");
return -1;
}
workp->handler_routine = routine;
workp->arg = arg;
workp->next = NULL;
if(pool->cur_queue_size == 0)
{
pool->queue_tail = pool->queue_head = workp;
if((rtn = pthread_cond_broadcast(&(pool->queue_not_empty))) != 0)
{
lprintf(log,FATAL,"pthread broadcast error\n");
return -1;
}
}
else
{
pool->queue_tail->next = workp;
pool->queue_tail = workp;
}
pool->cur_queue_size++;
/* 释放对任务链表的独占 */
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0)
{
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return 0;
}