Linux编程之自定义消息队列

我这里要讲的并不是IPC中的消息队列,我要讲的是在进程内部实现自定义的消息队列,让各个线程的消息来推动整个进程的运动。进程间的消息队列用于进程与进程之间的通信,而我将要实现的进程内的消息队列是用于有序妥当处理来自于各个线程请求,避免一窝蜂的请求而导致消息的异常丢失。想想socket编程里的listen函数吧,里面要设置一个队列长度的参数,其实来自网络的请求已经排成一个请求队列了,只是这个队列是系统帮我们做好了,我们看不到而已。如果系统不帮我们做这个等待队列的话,那就需要我们程序员在应用层实现了。

进程内的消息队列实现并不难,总的来说有以下几点:

自定义消息结构,并构造队列

一个线程负责依次从消息队列中取出消息,并处理该消息

多个线程产生事件,并将消息放进消息队列,等待处理

长话短说,我们开始动手吧!

 

一、定义消息结构

先贴代码再解释:

typedef struct Msg_Hdr_s { uint32 msg_type; uint32 msg_len; uint32 msg_src; uint32 msg_dst; }Msg_Hdr_t; typedef struct Msg_s { Msg_Hdr_t hdr; uint8 data[100]; } Msg_t;

下面是我设计的消息格式内容的解释:

msg_type:标记消息类型,当消息接收者看到该msg_type后就知道他要干什么事了

msg_len:消息长度,待扩展,暂时没用到(以后会扩展为变长消息)

msg_src:消息的源地址,即消息的发起者

msg_dst:消息的目的地,即消息的接受者

data[100]:消息除去消息头外可以携带的信息量,定义为100字节

由该消息数据结构可以知道,这个消息是定长的,当然也可以实现为变长消息,但现在暂不实现,今天先把定长消息实现了,以后再完善变长消息。

 

 

二、构造循环队列

队列可以由链表实现,也可以由数组实现,这里就使用数组实现的循环链表作为我们消息队列的队列模型。

typedef struct Queue_s { int head; int rear; sem_t sem; Msg_t data[QUEUE_SIZE]; }Queue_t; int MsgQueueInit(Queue_t* Q) { if(!Q) { printf("Invalid Queue!\n"); return -1; } Q->rear = 0; Q->head = 0; sem_init(&Q->sem, 0, 1); return 0; } int MsgDeQueue(Queue_t* Q, Msg_t* msg) { if(!Q) { printf("Invalid Queue!\n"); return -1; } if(Q->rear == Q->head) //only one consumer,no need to lock head { printf("Empty Queue!\n"); return -1; } memcpy(msg, &(Q->data[Q->head]), sizeof(Msg_t)); Q->head = (Q->head+1)%QUEUE_SIZE; return 0; } int MsgEnQueue(Queue_t* Q, Msg_t* msg) { if(Q->head == (Q->rear+1)%QUEUE_SIZE) { printf("Full Queue!\n"); return -1; } sem_wait(&Q->sem); memcpy(&(Q->data[Q->rear]), msg, sizeof(Msg_t)); Q->rear = (Q->rear+1)%QUEUE_SIZE; sem_post(&Q->sem); return 0; }

 

循环队列的实现想必大家都比较熟悉,但这里需要提示的几点是:

队列中应加入信号量或锁来保证进队时的互斥访问,因为多个消息可能同时进队,互相覆盖其队列节点

这里的信号量仅用于进队而没用于出队,理由是消息处理者只有一个,不存在互斥的情形

三、构造消息处理者

if(pthread_create(&handler_thread_id, NULL, (void*)msg_handler, NULL)) { printf("create handler thread fail!\n"); return -1; } void msg_printer(Msg_t* msg) { if(!msg) { return; } printf("%s: I have recieved a message!\n", __FUNCTION__); printf("%s: msgtype:%d msg_src:%d dst:%d\n\n",__FUNCTION__,msg->hdr.msg_type,msg->hdr.msg_src,msg->hdr.msg_dst); } void msg_handler() { sleep(5); //let's wait 5s when starts while(1) { Msg_t msg; memset(&msg, 0 ,sizeof(Msg_t)); int res = MsgDeQueue((Queue_t*)&MsgQueue, &msg); if(res != 0) { sleep(10); continue; } msg_printer(&msg); sleep(1); } }

我在进程里create了一个线程作为消息处理者(handler)来处理消息队列的消息,甘进入该线程时先等个5秒钟来让生产者往队列里丢些消息,然后再开始消息处理。当队列没消息可取时,就休息十秒,再去取消息。

 

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/29f144372d28fb5b69cfa4f8aaeb65b0.html