Linux C 实现一个简单的线程池

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

什么时候需要创建线程池呢?

简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了。如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了。

实现程序(未完成相关资源的释放操作)
/********************

Pthread Pool
    23/02/19 10:06
   
********************/

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <pthread.h>

/* 任务节点结构 */
typedef struct node {
    void            (*func)(void *arg);
    unsigned int    *work_id;
    struct node    *next;
}TaskNode;

/* 线程池结构 */
typedef struct {
    pthread_cond_t  ready;
    pthread_mutex_t lock;

struct node    *task_head;
    pthread_t  *thread_id;
    unsigned int    cur_queue_size;
}PthreadPool;

static PthreadPool *pool = NULL;

/* 定义线程 */
void *thread_routine (void *arg)
{
    while(1)
    {
        pthread_mutex_lock(&pool->lock);

if(!pool->cur_queue_size){
            printf("Thread is waiting .... \n");
            pthread_cond_wait(&pool->ready, &pool->lock);
        }
       
        pool->cur_queue_size--;
        TaskNode *worker = pool->task_head;
        pool->task_head= worker->next;

pthread_mutex_unlock (&(pool->lock));
       
        /* 调用函数,执行任务 */
        worker->func(worker->work_id);
       
        free (worker);
        worker = NULL;
    }
}

static void system_init ()
{
    int i = 0;
   
    pool = (PthreadPool*) malloc(sizeof(PthreadPool));

/* 初始化锁与条件变量 */
    pthread_mutex_init (&pool->lock, NULL);
    pthread_cond_init (&pool->ready, NULL);

/* 在池中加入 3 个线程 */
    pool->thread_id = (pthread_t*) malloc(3 * sizeof(PthreadPool));
    pool->cur_queue_size = 0;
    pool->task_head= NULL;

for(; i<3; i++)
        pthread_create(&pool->thread_id[i], NULL, thread_routine, NULL);
}

/* 添加任务 */
int pool_add_work (void (*process)(void *arg), unsigned int *id)
{
    TaskNode *new_work = (TaskNode*) malloc(sizeof(TaskNode));

new_work->func      = process;
    new_work->work_id  = id;
    new_work->next      = NULL;

pool->cur_queue_size++;

TaskNode *temp = pool->task_head;

if(!temp)
        pool->task_head = new_work;
    else{
        while(temp->next)
            temp = temp->next;
        temp->next = new_work;
    }

pthread_cond_signal(&pool->ready);
}

/* 定义用户任务 */
void my_process(void *arg)
{
    printf ("threadid is 0x%x, working on task %d \n", pthread_self(), *(int *)arg);
    sleep (1);
}

int main()
{
    unsigned int work_id[10];
    int i = 0;
   
    system_init();

/* 等待线程运行 */
    sleep(1);

for(; i<10; i++){
        work_id[i] = i;
        pool_add_work(my_process, &work_id[i]);
    }

/* 等待任务执行完毕 */
    sleep(5);

return 0;
}

Linux 上编译后,运行结果:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/8a45ac20ebd0841df3b1b06441ef8724.html