IPC进程通信之消息队列详解


IPC,interprocess communication,进程间通信。主要有管道,命名管道(也叫FIFO),消息队列,信号量,域套接字,共享内存等方式。在这里,我只介绍消息队列、

    Linux中,消息队列的操作,主要有msgget、msgctl、msgsnd,msgrcv等方法。具体的可以通过man 查看具体的细节。我这里说的,其实也是man介绍的内容。

msgget

作用:获取消息队列的标识符

方法原型:int msgget(key_t key, int msgflg);

描述: msgget系统调用,返回参数key相关联的消息队列标识符。当key的值是IPC_PRIVATE时,一个新的消息队列会被创建;或者当key的值不是IPC_PRIVATE,而给定的key值没有关联的消息队列,并且参数msgflg中指定了IPC_CREAT,一个新的消息队列也会被创建。

    如果msgflg参数同时指定了IPC_CREAT和IPC_EXCL,并且key关联的的消息队列已存在,则msgget失败,设置errno的值为EEXIST。(类似于open中O_CREAT|O_EXCL 的结合)

    在创建消息队列之后,参数msgflg的最低有效位定义了消息队列的权限。这些权限位有着相同的样式和语义,正如open中的mode参数。(执行权限未被使用)

    如果一个新的消息队列被创建,那么与之关联的msqid_ds数据结构按照如下形式初始化(msqid_ds结构稍后讲述):

  • msg_perm.cuid 和 msg_perm.uid  字段被设置为进程的有效user id
  • msg_perm.cgid 和 msg_perm.gid 字段被设置为进程的有效 group id
  •  msg_perm.mode的9位最低有效位被设置为msgflg的9位最低有效位
  • msg_qnum, msg_lspid, msg_lrpid, msg_stime 和 msg_rtime 初始化为0
  • msg_ctime 设置成当前时间
  • msg_qbytes 设置成系统设置的MSGMNB

    如果消息队列已经存在,则验证权限,并检查是否标记为销毁

返回值:如果成功,返回消息队列标识符(非负的整型),否则返回-1,errno说明错误原因。

错误码:

    失败时,errno会被设置成下面几个值:

  • EACCES - key关联的消息队列存在,但是进程没有进入队列的权限,并且也没有CAP_IPC_OWNER的能力。
  • EEXIST - key关联的消息队列存在,并且msgflg参数指定了IPC_CREAT 和 IPC_EXCL。
  • ENOENT - key关联的消息队列不存在,并且msgflg并未指定IPC_CREAT 。
  • ENOMEM - 消息队列应该被创建,但是系统没有足够内存。
  • ENOSPC - 消息队列应该被创建,但是系统设定的消息队列数目已达上限(MSGMNI)    

笔记

    IPC_PRIVATE不是一个标志字段,而是key_t的类型。如果key使用了这个特殊的值,系统调用会忽视任何参数除了msgflg的9位最低有效位并且创建一个新的消息队列。

    MSGMNI,系统最大消息队列数:值取决于策略。(在Linux上,可以通过读取和修改文件/proc/sys/kernel/msgmni管理MSGMNI

msgctl

作用:消息队列的控制操作

方法原型:int msgctl(int msqid, int cmd, struct msqid_ds *buf);

描述: msgctl执行参数cmd指定的控制操作,针对参数msgid指定的消息队列。

    结构体msqid_ds 定义在 <sys/msg.h> 文件中,结构如下:

struct msqid_ds {
       struct ipc_perm msg_perm;     /* 属主和权限*/
       time_t          msg_stime;    /* 最近一次调用msgsnd的时间 */
       time_t          msg_rtime;    /* 最近一次调用msgrcv的时间*/
       time_t          msg_ctime;     /* 最近一次修改的时间*/
       unsigned long   __msg_cbytes; /*队列中当前拥有的数据字节数*/
       msgqnum_t       msg_qnum;     /* 当前队列中的消息数目 */
       msglen_t        msg_qbytes;   /* 队列允许的最大字节数 */
       pid_t           msg_lspid;    /* 最近一次调用msgsnd的进程PID*/
       pid_t           msg_lrpid;    /*最近一次调用msgrcv的进程 PID*/
};
结构ipc_perm 定义在<sys/ipc.h> 中,结构如下:
struct ipc_perm {
    key_t          __key;       /* msgget提供的key*/
    uid_t          uid;         /* 属主的有效UID*/
    gid_t          gid;         /* 属主的有效GID */
    uid_t          cuid;        /*创建者的有效UID*/
    gid_t          cgid;        /* 创建者的有效GID*/
    unsigned short mode;        /* 权限 */
    unsigned short __seq;       /*序号 */
};
cmd的有效值如下:
  • IPC_STAT - 从msgid关联的内核数据结构复制到参数buf指向的msqid_ds结构体中。调用者必须有对消息队列的读权限。
  • IPC_SET - 从参数buf指向的msqid_ds结构中,写msqid_ds的一些成员值到这个消息队列相关的内核数据结构,也更新msg_ctime成员。下面的数据成员会被更新: msg_qbytes, msg_perm.uid, msg_perm.gid 和(9位最低有效位)msg_perm.mode。进程的有效UI必须匹配消息队列的属主(msg_perm.uid)或者消息队列的创建者(msg_perm.cuid),或者调用者被赋予特权。要设置msg_qbytes的值超过MSGMNB系统参数,需要合适的权限(Linux上要求CAP_IPC_RESOURCE能力)。
  • IPC_RMID -   立即删除消息队列,并且唤醒所有正在等待的读进程和写进程(伴有错误返回,并且errno设置成EIDRM)。进程必须有合适的权限或者它的有效UD是消息队列的属主或者创建者。
  • IPC_INFO - (只适合LINUX)返回参数buf指向的系统最大消息队列限制的相关信息。这个结构是msginfo类型,定义于<sys/msg.h>文静中,如果_GNU_SOURCE特性的测试宏被设置。结构如下:
struct msginfo {
    int msgpool; /* 缓冲池中已拥有的消息数据的字节数;内核未使用*/
    int msgmap;  /*消息map的最大项数; 内核未使用 */
    int msgmax;  /*  单个消息可以写入的最大字节数 */
    int msgmnb;  /*队列可以写入的最大字节数; 用于消息队列创建时字段msg_qbytes 的初始化*/
    int msgmni;  /* 消息队列的最大数目 */
    int msgssz;  /* 消息段的长度;内核未使用*/
    int msgtql;  /* 系统中所有消息队列的最大消息数;内核未使用 */
    unsigned short int msgseg;  /* 段的最大数目;内核未使用 */
};
  •     MSG_INFO(Linux专用)。返回msginfo结构,同IPC_INFO包含相同的信息,除了接下来的字段是返回消息队列消耗的系统资源信息:msgpool字段返回系统当前存在的消息队列数目;msgmap 字段返回系统中所有消息队列的总的消息数目;msgtql 字段返回系统中所有的消息队列中的所有消息的字节数.
  • MSG_STAT(Linux专用)。返回msqid_ds结构,如同IPC_STAT。但是,msqid不是消息队列的标识符,而是内核内部维持所有消息队列信息的数组索引。

返回值:

    如果成功,则IPC_STAT,  IPC_SET,  和 IPC_RMID 返回0。一个成功的IPC_INFO或MSG_INFO操作,返回在内核内部数组记录有关所有消息队列的最高使用条目的索引。(这个信息可以与重复的MSG_STAT操作一起使用,用以获得系统上所有队列的信息)。一个成功的MSG_STAT操作返回消息队列的标识符,它的索引是参数msqid。

    如果失败,返回-1.并且errno被设置

错误码:

    如果失败,错误码被设置成以下值:

  • EACCES - 参数cmd是IPC_STAT或MSG_STAT,但进程没有msqid对应的消息队列的读权限,并且也不具备CAP_IPC_OWNER的能力。
  • EFAULT - 参数cmd的值是IPC_SET或IPC_STAT,但buf指向的地址不可访问。
  • EIDRM - 消息队列被删除
  • EINVAL - 无效的cmd或者msqid值。
  • EPERM - 参数cmd的值是IPC_SET或IPC_RMID,但是进程的有效UID不是消息队列的创建者(msg_perm.cuid)或者消息队列的属主(msg_perm.uid),进程也没有特权。(对于Linux,就是没有CAP_SYS_ADMIN)

msgsnd和msgrcv

作用:消息操作。分别用于发送和接收消息

方法原型:int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

方法原型:ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,  int msgflg);

描述: msgsnd和msgrcv系统调用,分别用于发送消息到消息队列和从消息队列接收消息。发送消息时,进程必须拥有写权限,接收消息时,进程必须拥有读权限。

    参数msgp是一个调用者定义的结构体,一般的形式如下:

struct msgbuf {
    long mtype;       /* 消息类型, 必须> 0 */
    char mtext[1];    /* 消息数据 */
};
     mtext字段,是一个数组(或者其他结构),它的长度由参数msgsz指定,msgsz是非负的整型类型,长度为0的消息是允许的。mtype字段必须严格地是一个正整数,这个值被用于接收进程的消息选择。

msgsnd()

    msgsnd()调用,追加一个msgp指向的消息副本到参数msqid指向的标识符的消息队列中。如果队列的空间充足,msgsnd立即成功返回(队列的容量定义在msg_bytes字段)。如果空间不充足,msgsnd的默认行为是阻塞直到空间可获得。如果参数msgflg指定了IPC_NOWAIT,那么调用会立即返回失败,并且设置errno为EAGAIN。

    阻塞的msgsnd()调用也会返回失败,当遇到以下情况:

  • 队列被删除了,系统调用返回失败,errno设置为EIDRM
  • 一个信号被捕捉。在这种情况下,errno设置为EINTR。msgsnd()在被信号处理函数中断后,永远不会自动地重启,不管信号处理函数设置了SA_RESTART标志。

msgsnd()调用成功后,消息队列数据结构会进行如下更新:

    msg_lspid - 设置成进程的PID

    msg_qnum - 加1

    msg_stime - 设置成当前时间

msgrcv()

    msgrcv()系统调用,删除msqid对应的队列中的消息,并把消息放入到参数msgp指向的缓冲中。

    参数msgsz指定了参数msgp指向的结构中的mtext字段的最大字节数。如果消息的数据长度大于msgsz,那么调用的行为取决于MSG_NOERROR是否在参数msgflg中被指定。如果MSG_NOERROR被指定,那么消息文本会被截断(截断的部分丢弃);如果MSG_NOERROR没有被指定,那么消息不会从队列中删除,并且调用失败,返回-1,设置errno为E2BIG。

    参数msgtyp指定了如下的一些消息类型:

  • 如果msgtyp的值是0,那么读取队列的第一个消息
  • 如果msgtyp大于0,那么队列中第一个类型为msgtyp的消息被读取。除非MSG_EXCEPT在msgflg中被指定,在这种情况下,将读取类型不等于msgtyp的队列中的第一条消息。
  • 如果msgtyp小于0,那么会读取第一条类型小于等于msgtyp绝对值的消息。

参数msgflg由0构成的一个位掩码,或以下的一些标志:

  • IPC_NOWAIT -  如果队列中没有指定的类型,立即返回失败。errno设置成ENOMSG。
  • MSG_EXCEPT -  与大于0的msgtyp值一起使用,用于读取第一个类型不同于msgtyp的消息。
  • MSG_NOERROR - 如果消息长度大于参数msgsz,那么截断消息。

    如果队列中没有请求的类型,并且IPC_NOWAIT没有在msgflg参数中指定,那么进程会一直阻塞,直到以下情况发生:

  • 队列中有了预期的消息类型
  • 消息队列从系统中删除。这种情况下,调用失败。设置errno为EIDRM
  • 进程捕捉到了信号。这种情况下,调用返回失败,设置errno为EINTR。 msgrcv() 在被信号处理函数中断后,永远不会自动地重启,不管信号处理函数设置了SA_RESTART标志。 

消息成功完成之后,消息队列数据结构会进行如下更新:

    msg_lrpid - 设置成进程的PID

    msg_qnum - 减1

    msg_rtime - 设置成当前时间

返回值:

    调用失败时,返回-1,并且errno表示了错误的类型。反之,成功时,msgsnd返回0,msgrcv返回实际拷贝到mtext数组的字节数。

错误码:

    当msgsnd()失败时,errno会设置成以下的一些值

  • EACCES - 进程没有对消息队列的写权限,并且没有CAP_IPC_OWNER能力
  • EAGAIN - 消息因为队列的msg_qbytes限制而不能发送,并且IPC_NOWAIT在参数msgflg中被指定
  • EFAULT - msgp指针不可访问
  • EIDRM - 消息队列被删除
  • EINTR - 在满的消息队列中睡眠的情况下,进程捕捉到了信号
  • EINVAL - 无效的msqid值,mtype值非正数,或者无效的msgsz值(小于0或者大于系统值MSGMAX)
  • ENOMEM - 系统没有足够的内存去制造msgp指向的消息的副本。

当msgrcv()调用失败时,errno会被设置成以下一些值:

  • E2BIG - 消息的文本长度大于参数msgsz,并且msgflg参数没有指定MSG_NOERROR
  • EACCES - 进程对消息队列没有读权限,并且没有CAP_IPC_OWNER能力。
  • EAGAIN - 队列中没有消息可以获取,并且msgflg参数指定IPC_NOWAIT。
  • EFAULT - msgp指向的地址不可访问
  • EIDRM - 当进程阻塞于接收消息时,消息队列被删除
  • EINTR - 当进程阻塞于接收消息时,进程捕捉到一个信号
  • EINVAL - 参数msgqid无效,或者msgsz小于0
  • ENOMSG - msgflg参数指定了IPC_NOWAIT,并且在队列中没有指定的消息类型可以读取。

代码示例

    从上面的讲解中,我们了解了消息队列的四个方法。接下来,我们看下示例代码。

消息发送进程,msgdemo_send.cpp:

#include <iostream>
#include <sys/msg.h>
#include <cstdlib>
#include <cerrno>
#include <cstring>
#include <stdexcept>
#include <cstdio>

using namespace std;

void PrintError()
{
    cerr<<"id error. errno:"<<errno<<", reason="<<strerror(errno)<<endl;
}


void PrintMsg(msqid_ds & msg)
{
    cout<<"stime:"<<msg.msg_stime<<endl;   
    cout<<"rtime:"<<msg.msg_rtime<<endl;   
    cout<<"ctime:"<<msg.msg_ctime<<endl;   
    cout<<"__msg_cbytes:"<<msg.__msg_cbytes<<endl;   
    cout<<"msg_qnum:"<<msg.msg_qnum<<endl;   
    
    cout<<"msg_qbytes:"<<msg.msg_qbytes<<endl;   
    cout<<"msg_lspid:"<<msg.msg_lspid<<endl;   
    cout<<"msg_lrpid:"<<msg.msg_lrpid<<endl;   
}

int main(int argc, char **argv)
{
    int key = 212;
    int type = 1;

    if (argc >= 2)
    {
        key = atoi(argv[1]);    
        type = argc > 2 ? atoi(argv[2]): 1;
    }

    int msgid = msgget(key, IPC_CREAT);
 
    if (-1 == msgid)
    {
        PrintError();
        return 1;
    }

    cout<<"msgid:"<<msgid<<endl;

    msqid_ds dmsg;
    msgctl(msgid, IPC_STAT, &dmsg);
    PrintMsg(dmsg);

    //msg send 
    msgbuf buff;
    buff.mtype = type;
    string msg;

    while(1)
    {
        cout<<"ready to input data"<<endl;
        getline(cin, msg);
        
        sprintf(buff.mtext, "%s", msg.c_str());

        int ret= msgsnd(msgid, &buff, msg.size(), 0);

        if (ret == -1)
        {
            PrintError();
            return 2;
        }

        //after send
        cout<<"after send:"<<endl;
    
        msgctl(msgid, IPC_STAT, &dmsg);
        PrintMsg(dmsg);

        sleep(1);
    }


    return 0;
}
运行效果图如下:


    从图中,可以很明显的看出,随着消息的发送,队列中拥有的消息数目,以及队列中数据的字节数都在增加。

这时候,我们运行接收消息的进程。

接收消息的进程,msgdemo_recv.cpp

#include <iostream>
#include <sys/msg.h>
#include <cstdlib>
#include <cerrno>
#include <cstring>
#include <stdexcept>

using namespace std;

void PrintError()
{
    cerr<<"id error. errno:"<<errno<<", reason="<<strerror(errno)<<endl;
}

void PrintMsg(msqid_ds & msg)
{
    cout<<"stime:"<<msg.msg_stime<<endl;   
    cout<<"rtime:"<<msg.msg_rtime<<endl;   
    cout<<"ctime:"<<msg.msg_ctime<<endl;   
    cout<<"__msg_cbytes:"<<msg.__msg_cbytes<<endl;   
    cout<<"msg_qnum:"<<msg.msg_qnum<<endl;   
    
    cout<<"msg_qbytes:"<<msg.msg_qbytes<<endl;   
    cout<<"msg_lspid:"<<msg.msg_lspid<<endl;   
    cout<<"msg_lrpid:"<<msg.msg_lrpid<<endl;   
}

int main(int argc, char **argv)
{
    int key = 212;

    if (argc >= 2)
    {
        key = atoi(argv[1]);       
    }

    int msgid = msgget(key, IPC_CREAT);
    
    if (-1 == msgid)
    {
        PrintError();
        return 1;
    }

    cout<<"msgid:"<<msgid<<endl;

    msqid_ds dmsg;
    msgctl(msgid, IPC_STAT, &dmsg);
    PrintMsg(dmsg);

    //recv msg
    msgbuf buff;
    cout<<"ready to recv "<<endl;
    int ret = msgrcv(msgid, &buff, 1024, 2, 0);
    cout<<"read over, value:"<<buff.mtext<<endl;

    return 0;
}

接收消息的进程结果如下图:

    可以很明显的看到,随着接收消息的进程的多次执行,队列中的消息数目在减少,队列中的数据所占字节数在减小。最后一次运行时,因为队列里没有了要读取的消息,从而一直阻塞在那里。

    以上就是IPC消息队列的简单使用示例。


评论

发表评论