消息队列(mqueue)深入理解(2)

int main(int argc, char* argv[])
{
    int flag = O_RDWR;
    int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
    mqd_t mqid = mq_open("/mq_test",flag,mode,NULL);
    if (mqid == -1)
    {
        printf("open mqueue failed!\n");
        return 1;
    }

struct mq_attr attr;
    mq_getattr(mqid,&attr);
    char buf[256] = {0};
    int priority = 0;
    mq_receive(mqid,buf,attr.mq_msgsize,&priority);
    printf("%s\n",buf);
    mq_close(mqid);   
    return 0;
}

运行结果如下:

消息队列(mqueue)深入理解

首先我们运行三次send,然后运行四次recv,可见recv的前三次是可以收到消息队列里的三个消息的,当运行第四次的时,系统消息队列里为空,recv就会阻塞;关于非阻塞式mqueue见下文。

5、mq_notify函数

如前文介绍,poxis消息队列运行异步通知,以告知何时有一个消息放置到某个空消息队列中,这种通知有两种方式可以选择:

(1)产生一个信号

(2)创建一个线程来执行一个指定的函数

这种通知通过mq_notify() 函数建立。该函数为指定的消息消息队列建立或删除异步事件通知,

#include <mqueue.h> 

int mq_notify(mqd_t mqdes, const struct sigevent* notification);

(1)如果notification参数为非空,那么当前进程希望在有一个消息到达所指定的先前为空的对列时得到通知。

(2)如果notification参数为空,而且当前进程被注册为接收指定队列的通知,那么已存在的注册将被撤销。

(3)任意时刻只有一个进程可以被注册为接收某个给定队列的通知。

(4)当有一个消息到达先前为空的消息队列,而且已有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的mq_receive调用中的前提下,通知才会发出。即说明,在mq_receive调用中的阻塞比任何通知的注册都优先。

(5)当前通知被发送给它的注册进程时,其注册即被撤销。该进程必须再次调用mq_notify以重新注册。

sigevent结构如下:

union sigval{ 
int    sival_int;          /*integer value*/ 
void    *sival_ptr;        /*pointer value*/ 
}; 
 
struct sigevent{ 
int    sigev_notify;      /*SIGEV_{NONE, SIGNAL, THREAD}*/ 
int    sigev_signo;        /*signal number if SIGEV_SIGNAL*/ 
 
union sigval    sigev_value; 
 
void    (*sigev_notify_function)(union sigval); 
pthread_attr_t  *sigev_notify_attributes; 
}; 

5.1 mq_notify() 使用信号处理程序

一个正确的使用非阻塞mq_receive的信号通知的例子:

#include <unistd.h>
#include <stdio.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <signal.h>
#include <stdlib.h>
#include <errno.h>


void sig_usr1(int );
volatile sig_atomic_t mqflag;

int main(int argc, char* argv[])
{
    mqd_t mqid = 0;
    void *buff;
    struct mq_attr attr;
    struct sigevent sigev;
    sigset_t zeromask,newmask,oldmask;

int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
    mqid = mq_open("/mq_test",O_RDONLY | O_NONBLOCK,mode,NULL);  // 非阻塞式打开mqueue
    mq_getattr(mqid,&attr);
    buff = malloc(attr.mq_msgsize);

sigemptyset(&zeromask);
    sigemptyset(&newmask);
    sigemptyset(&oldmask);
    sigaddset(&newmask,SIGUSR1);    // 初始化信号集
    signal(SIGUSR1,sig_usr1);      // 信号处理程序
    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = SIGUSR1;
    int n = mq_notify(mqid,&sigev);  // 启用通知

for (;;)
    {
        sigprocmask(SIG_BLOCK,&newmask,&oldmask);
        while(mqflag == 0)
            sigsuspend(&zeromask);

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/12866.html