Linux 下C++线程池的简单实现(2)

// Producer side of the work queue: stores workerThread at the top index of
// workerQueue and signals availableWork so that a thread blocked in
// fetchWork() can pick it up. Blocks on availableThreads until fetchWork()
// has released a slot. Always returns true.
bool ThreadPool::assignWork(WorkerThread *workerThread)
{
    // Count the job as outstanding before it is queued; threadExecute()
    // decrements the counter under the same mutex.
    pthread_mutex_lock(&mutexWorkCompletion);
    incompleteWork++;
    pthread_mutex_unlock(&mutexWorkCompletion);

    // Wait for a free slot (posted by fetchWork()).
    sem_wait(&availableThreads);

    pthread_mutex_lock(&mutexSync);
    workerQueue[topIndex] = workerThread;
    // Advance around the whole ring. The previous modulus (queueSize-1)
    // skipped the last slot, so the producer could wrap onto an entry that
    // had not been fetched yet whenever queueSize jobs were pending.
    // NOTE(review): assumes workerQueue holds at least queueSize slots --
    // confirm against the constructor.
    if (queueSize > 1)
        topIndex = (topIndex + 1) % queueSize;
    sem_post(&availableWork);
    pthread_mutex_unlock(&mutexSync);
    return true;
}

// Consumer side of the work queue: blocks until assignWork() has signalled
// availableWork, then removes the job at the bottom index and hands it back
// through *workerArg. Always returns true.
bool ThreadPool::fetchWork(WorkerThread **workerArg)
{
    sem_wait(&availableWork);

    pthread_mutex_lock(&mutexSync);
    WorkerThread *workerThread = workerQueue[bottomIndex];
    workerQueue[bottomIndex] = NULL;   // the slot no longer refers to the job
    *workerArg = workerThread;
    // Must use the same modulus as assignWork() so that both indices walk
    // the same ring.
    if (queueSize > 1)
        bottomIndex = (bottomIndex + 1) % queueSize;
    // Release the slot for the next assignWork() call.
    sem_post(&availableThreads);
    pthread_mutex_unlock(&mutexSync);
    return true;
}

//每个线程运行的静态函数实体,executeThis 方法将会被继承累从写,之后实现具体线程的工作。
void *ThreadPool::threadExecute(void *param)
{
 WorkerThread *worker = NULL;
 while(((ThreadPool *)param)->fetchWork(&worker))
 {
  if(worker)
        {
   worker->executeThis();
            //cout << "worker[" << worker->id << "]\tdelete address: [" << worker << "]" << endl;
            delete worker;
            worker = NULL;
        }

pthread_mutex_lock( &(((ThreadPool *)param)->mutexWorkCompletion) );
        //cout << "Thread " << pthread_self() << " has completed a Job !" << endl;
   ((ThreadPool *)param)->incompleteWork--;
  pthread_mutex_unlock( &(((ThreadPool *)param)->mutexWorkCompletion) );
 }
 return 0;
}

#include <iostream>
#include "threadpool.h"

using namespace std;


// Number of SampleWorkerThread jobs that main() hands to the pool.
#define ITERATIONS 20

// Demo job for the thread pool: prints a message and sleeps for two seconds
// in place of real work.
class SampleWorkerThread : public WorkerThread
{
public:
    int id;   // copy of the id passed to the constructor

    // Forwards id to the WorkerThread base and keeps a local copy.
    SampleWorkerThread(int id) : WorkerThread(id), id(id)
    {
    }

    ~SampleWorkerThread()
    {
    }

    // The job body. sleep() stands in for time-consuming work; a pool only
    // pays off when each job is long-running (at least a second or two).
    virtual unsigned int executeThis()
    {
        std::cout << "This is SampleWorkerThread sleep 2s" << std::endl;
        sleep(2);
        return 0;
    }
};


int main(int argc, char **argv)
{
 
 cout<<"Thread pool"<<endl;
 ThreadPool* myPool = new ThreadPool(25);
 //pthread_create()执行,开始等待任务分配
 myPool->initializeThreads();

//用来计算时间间隔。
    time_t t1=time(NULL);

//分配具体工作到线程池
 for(unsigned int i=0;i<ITERATIONS;i++){
  SampleWorkerThread* myThreathreadExecuted = new SampleWorkerThread(i);
  myPool->assignWork(myThreathreadExecuted);
 }
 
 //销毁钱等待所有线程结束,等待间隔为2秒。
    myPool->destroyPool(2);

time_t t2=time(NULL);
    cout << t2-t1 << " seconds elapsed\n" << endl;
 delete myPool;
 
    return 0;
}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/90c629465358d24709ccc19f91b4d81f.html