Linux编程之自定义消息队列(2)

这里的消息处理很简单,我只是简单地将收到的消息打印一下,证明收到的消息正是其他线程发给我的。当然,你也可以在这里扩展功能,根据收到的消息类型进一步决定该做什么事。比如:

enum MSG_TYPE { GO_HOME, GO_TO_BED, GO_TO_LUNCH, GO_TO_CINAMA, GO_TO_SCHOOL, GO_DATEING, GO_TO_WORK,//6 }; void handler() { switch(msgtype) { case GO_HOME: go_home(); break; case GO_TO_BED: go_to_bed(); break; ....... } }

这里的handler就是一个简单的状态机了,根据给定的消息类型(事件)去做特定的事,推动状态机的转动。

四、构造消息生产者

if(pthread_create(&thread1_id, NULL, (void*)msg_sender1, NULL)) { printf("create thread1 fail!\n"); return -1; } if(pthread_create(&thread2_id, NULL, (void*)msg_sender2, NULL)) { printf("create thread2 fail!\n"); return -1; } if(pthread_create(&thread3_id, NULL, (void*)msg_sender3, NULL)) { printf("create thread3 fail!\n"); return -1; } void msg_sender1() { int i = 0; while(1) { if(i > 10) { i = 0; } Msg_t msg; msg.hdr.msg_type = i++; msg.hdr.msg_src = THREAD1; msg.hdr.msg_dst = HANDLER; MsgEnQueue((Queue_t*)&MsgQueue, &msg); printf("%s: Thread1 send a message!\n",__FUNCTION__); sleep(1); } } void msg_sender2() { int i = 0; while(1) { if(i > 10) { i = 0; } Msg_t msg; msg.hdr.msg_type = i++; msg.hdr.msg_src = THREAD2; msg.hdr.msg_dst = HANDLER; MsgEnQueue((Queue_t*)&MsgQueue, &msg); printf("%s: Thread2 send a message!\n",__FUNCTION__); sleep(1); } } void msg_sender3() { int i = 0; while(1) { if(i > 10) { i = 0; } Msg_t msg; msg.hdr.msg_type = i++; msg.hdr.msg_src = THREAD3; msg.hdr.msg_dst = HANDLER; MsgEnQueue((Queue_t*)&MsgQueue, &msg); printf("%s: Thread3 send a message!\n",__FUNCTION__); sleep(1); } }

这里我create了三个线程来模拟消息生产者,每个生产者每隔1秒往消息队列里写消息。

五、跑起来看看

先贴完整的代码:

msg_queue.c:

 

1 #include <stdio.h> 2 #include <pthread.h> 3 #include <semaphore.h> 4 #include <unistd.h> 5 #include <string.h> 6 #include "msg_def.h" 7 8 Queue_t MsgQueue; 9 10 int main(int argc, char* argv[]) 11 { 12 int ret; 13 pthread_t thread1_id; 14 pthread_t thread2_id; 15 pthread_t thread3_id; 16 pthread_t handler_thread_id; 17 18 ret = MsgQueueInit((Queue_t*)&MsgQueue); 19 if(ret != 0) 20 { 21 return -1; 22 } 23 24 if(pthread_create(&handler_thread_id, NULL, (void*)msg_handler, NULL)) 25 { 26 printf("create handler thread fail!\n"); 27 return -1; 28 } 29 30 31 if(pthread_create(&thread1_id, NULL, (void*)msg_sender1, NULL)) 32 { 33 printf("create thread1 fail!\n"); 34 return -1; 35 } 36 37 if(pthread_create(&thread2_id, NULL, (void*)msg_sender2, NULL)) 38 { 39 printf("create thread2 fail!\n"); 40 return -1; 41 } 42 43 if(pthread_create(&thread3_id, NULL, (void*)msg_sender3, NULL)) 44 { 45 printf("create thread3 fail!\n"); 46 return -1; 47 } 48 49 50 while(1) 51 { 52 sleep(1); 53 } 54 55 return 0; 56 } 57 58 59 60 61 int MsgQueueInit(Queue_t* Q) 62 { 63 if(!Q) 64 { 65 printf("Invalid Queue!\n"); 66 return -1; 67 } 68 Q->rear = 0; 69 Q->head = 0; 70 sem_init(&Q->sem, 0, 1); 71 return 0; 72 } 73 74 int MsgDeQueue(Queue_t* Q, Msg_t* msg) 75 { 76 if(!Q) 77 { 78 printf("Invalid Queue!\n"); 79 return -1; 80 } 81 if(Q->rear == Q->head) //only one cosumer,no need to lock head 82 { 83 printf("Empty Queue!\n"); 84 return -1; 85 } 86 memcpy(msg, &(Q->data[Q->head]), sizeof(Msg_t)); 87 Q->head = (Q->head+1)%QUEUE_SIZE; 88 return 0; 89 90 } 91 92 int MsgEnQueue(Queue_t* Q, Msg_t* msg) 93 { 94 if(Q->head == (Q->rear+1)%QUEUE_SIZE) 95 { 96 printf("Full Queue!\n"); 97 return -1; 98 } 99 sem_wait(&Q->sem); 100 memcpy(&(Q->data[Q->rear]), msg, sizeof(Msg_t)); 101 Q->rear = (Q->rear+1)%QUEUE_SIZE; 102 sem_post(&Q->sem); 103 return 0; 104 } 105 106 void msg_printer(Msg_t* msg) 107 { 108 if(!msg) 109 { 110 return; 111 } 112 printf("%s: I have recieved a message!\n", __FUNCTION__); 113 
printf("%s: msgtype:%d msg_src:%d dst:%d\n\n",__FUNCTION__,msg->hdr.msg_type,msg->hdr.msg_src,msg->hdr.msg_dst); 114 115 } 116 117 int msg_send() 118 { 119 120 Msg_t msg; 121 msg.hdr.msg_type = GO_HOME; 122 msg.hdr.msg_src = THREAD1; 123 msg.hdr.msg_dst = HANDLER; 124 return MsgEnQueue((Queue_t*)&MsgQueue, &msg); 125 126 } 127 128 void msg_handler() 129 { 130 sleep(5); //let's wait 5s when starts 131 while(1) 132 { 133 Msg_t msg; 134 memset(&msg, 0 ,sizeof(Msg_t)); 135 int res = MsgDeQueue((Queue_t*)&MsgQueue, &msg); 136 if(res != 0) 137 { 138 sleep(10); 139 continue; 140 } 141 msg_printer(&msg); 142 sleep(1); 143 } 144 } 145 146 147 void msg_sender1() 148 { 149 int i = 0; 150 while(1) 151 { 152 if(i > 10) 153 { 154 i = 0; 155 } 156 Msg_t msg; 157 msg.hdr.msg_type = i++; 158 msg.hdr.msg_src = THREAD1; 159 msg.hdr.msg_dst = HANDLER; 160 MsgEnQueue((Queue_t*)&MsgQueue, &msg); 161 printf("%s: Thread1 send a message!\n",__FUNCTION__); 162 sleep(1); 163 } 164 } 165 166 void msg_sender2() 167 { 168 int i = 0; 169 while(1) 170 { 171 if(i > 10) 172 { 173 i = 0; 174 } 175 Msg_t msg; 176 msg.hdr.msg_type = i++; 177 msg.hdr.msg_src = THREAD2; 178 msg.hdr.msg_dst = HANDLER; 179 MsgEnQueue((Queue_t*)&MsgQueue, &msg); 180 printf("%s: Thread2 send a message!\n",__FUNCTION__); 181 sleep(1); 182 } 183 } 184 185 void msg_sender3() 186 { 187 int i = 0; 188 while(1) 189 { 190 if(i > 10) 191 { 192 i = 0; 193 } 194 Msg_t msg; 195 msg.hdr.msg_type = i++; 196 msg.hdr.msg_src = THREAD3; 197 msg.hdr.msg_dst = HANDLER; 198 MsgEnQueue((Queue_t*)&MsgQueue, &msg); 199 printf("%s: Thread3 send a message!\n",__FUNCTION__); 200 sleep(1); 201 } 202 } 

msg_def.h:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/29f144372d28fb5b69cfa4f8aaeb65b0.html