Linux的多任务编程(2)

在这个文件中定义了线程池的结构,工作线程函数的原型以及线程池的基本操作.线面给出的tpool.c是线程池的实现.函数tpool_init完成了线程池的初始化操作,包括对内存的设置,对线程属性的设置和池中线程的预创建.

/* -------------------------------------------------------------------------
 * tpool.c – 线程池的实现
 * -------------------------------------------------------------------------
 */
#include  <stdio.h>
#include  <stdlib.h>
#include  <string.h>
#include  <pthread.h>
#include "tpool.h"
#include "log.h"
/* 工作线程 */
void *tpool_thread(void *tpool);
/***************线程池初始化*****************************/
tpool_t *tpool_init(int num_worker_threads,\ /*线程池线程个数*/
            int max_queue_size, \  /*最大任务数*/
      int do_not_block_when_full) /*是否阻塞任务满的时候*/
{
    int i, rtn;
    tpool_t *pool;
    lprintf(log, INFO, "init pool begin ...\n");
    /* 创建线程池结构体 */
    if((pool = (struct tpool *)malloc(sizeof(struct tpool))) == NULL)
    {
        lprintf(log, FATAL, "Unable to malloc() thread pool!\n");
        return NULL;
    }
    /* 设置线程池架构体成员 */
    pool->num_threads = num_worker_threads;                      /*工作线程个数*/
    pool->max_queue_size = max_queue_size;                      /*任务链表最大长度*/
    pool->do_not_block_when_full = do_not_block_when_full;      /*任务链表满时是否等待*/
    /* 生成线程池缓存 */
    if((pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads)) == NULL)
    {
        lprintf(log, FATAL,"Unable to malloc() thread info array\n");
        return NULL;
    }
    /* 初始化任务链表 */
    pool->cur_queue_size = 0;
    pool->queue_head = NULL;
    pool->queue_tail = NULL;
    pool->queue_closed = 0;
    pool->shutdown = 0;
    /* 初始化互斥变量,条件变量 用于线程之间的同步 */
    if((rtn = pthread_mutex_init(&(pool->queue_lock),NULL)) != 0)
    {
        lprintf(log,FATAL,"pthread_mutex_init %s",strerror(rtn));
        return NULL;
    }
    if((rtn = pthread_cond_init(&(pool->queue_not_empty),NULL)) != 0)
    {
        lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
        return NULL;
    }
    if((rtn = pthread_cond_init(&(pool->queue_not_full),NULL)) != 0)
    {
        lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
        return NULL;
    }
    if((rtn = pthread_cond_init(&(pool->queue_empty),NULL)) != 0)
    {
        lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
        return NULL;
    }
    /* 创建所有的线程 */
    for(i = 0; i != num_worker_threads; i++)
    {
        if( (rtn=pthread_create(&(pool->threads[i]),NULL,tpool_thread,(void*)pool)) != 0)
        {
            lprintf(log,FATAL,"pthread_create %s\n",strerror(rtn));
            return NULL;
        }
        lprintf(log, INFO, "init pthread  %d!\n",i);
    }
    lprintf(log, INFO, "init pool end!\n");
    return pool;
}

函数tpool_add_work为线程池添加了一个工作线程.因为预创建的线程是不能做任何工作的,只有分配了适当的任务后,才会使预创建的线程真正的工作起来.

int tpool_add_work(tpool_t *pool,  \          /*线程池指针*/
                    void (*routine)(void *),\  /*工作线程函数指针*/
                    void *arg)                  /*工作线程函数参数*/
{
    int rtn;
    tpool_work_t *workp; /*当前工作线程*/
    if((rtn = pthread_mutex_lock(&pool->queue_lock)) != 0)
    {
        lprintf(log,FATAL,"pthread mutex lock failure\n");
        return -1;
    }
    /* 采取独占的形式访问任务链表 */
    if((pool->cur_queue_size == pool->max_queue_size) && \
            (pool->do_not_block_when_full))
    {
        if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0)
        {
            lprintf(log,FATAL,"pthread mutex lock failure\n");
            return -1;
        }
        return -1;
    }
    /* 等待任务链表为新线程释放空间 */
    while((pool->cur_queue_size == pool->max_queue_size) &&
            (!(pool->shutdown || pool->queue_closed)))
  {
        if((rtn = pthread_cond_wait(&(pool->queue_not_full),
                        &(pool->queue_lock)) ) != 0)
        {
            lprintf(log,FATAL,"pthread cond wait failure\n");
            return -1;
        }
    }
    if(pool->shutdown || pool->queue_closed)
    {
        if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0) 
        {
            lprintf(log,FATAL,"pthread mutex lock failure\n");
            return -1;
        }
        return -1;
    }
    /* 分配工作线程结构体 */
    if((workp = (tpool_work_t *)malloc(sizeof(tpool_work_t))) == NULL)
    {
        lprintf(log,FATAL,"unable to create work struct\n");
        return -1;
    }
    workp->handler_routine = routine;
    workp->arg = arg;
    workp->next = NULL;
    if(pool->cur_queue_size == 0)
    {
        pool->queue_tail = pool->queue_head = workp;
        if((rtn = pthread_cond_broadcast(&(pool->queue_not_empty))) != 0)
        {
            lprintf(log,FATAL,"pthread broadcast error\n");
            return -1;
        }
    }
    else
    {
        pool->queue_tail->next = workp;
        pool->queue_tail = workp;
    }
    pool->cur_queue_size++;
    /* 释放对任务链表的独占 */
    if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0)
    {
        lprintf(log,FATAL,"pthread mutex lock failure\n");
        return -1;
    }
    return 0;
}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/2171544ed330f750996edb69a5ca4e43.html