System V 消息队列

消息队列可以认为是一个消息列表,消息队列是随着内核持续的,本文主要介绍System V的消息队列相关概念和使用方法。

消息队列

  • 消息队列提供了一个从一个进程向另一个进程发送一块数据的方法
  • 每个数据块都被认为是有一个类型,接受者进程接受的数据块可以有不同的类型值
  • 消息队列也有管道一样的不足,解释每个消息的最大长度是有上限的(MSGMAX),每个消息队列总的字节数是有上限的(MSGMNB),系统上消息队列的总数也有一个上限(MSGMNI)

IPC对象数据结构

  • 内核为每个IPC对象维护一个数据结构
1
2
3
4
5
6
7
8
9
struct ipc_perm {
key_t _key; /*Key supplied to xxxget*/
uid_t uid; /* Effective UID of owner*/
uid_t gid; /* Effective GID of owner*/
uid_t cuid; /* Effective UID of creator*/
uid_t cgid; /* Effective GID of creator*/
unsigned short mode; /*Permissions*/
unsigned short _seq; /*sequence number*/
}

消息队列结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct msqid_ds {
struct ipc_perm msg_perm; /* Ownership and permissions */
time_t msg_stime; /* Time of last msgsnd(2) */
time_t msg_rtime; /* Time of last msgrcv(2) */
time_t msg_ctime; /* Time of last change */
unsigned long __msg_cbytes; /* Current number of bytes in
queue (nonstandard) */
msgqnum_t msg_qnum; /* Current number of messages
in queue */
msglen_t msg_qbytes; /* Maximum number of bytes
allowed in queue */
pid_t msg_lspid; /* PID of last msgsnd(2) */
pid_t msg_lrpid; /* PID of last msgrcv(2) */
};

消息队列在内核中的表示


消息队列函数

1
2
3
4
5
6
7
8
#include <sys/type.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflag);
int msgctl(int msgid, int cmd, struct msqid_ds *buf);
int msgcnd(int msgqid, const void *msgp, size_t msgsz, int msgflag);
ssize_t msgrcv(int msgqid, void *msgp, size_t msgsz, long msgtyp, int msgflag);

msgget函数

  • 功能:用来创建和访问一个消息队列
  • 原型:

    1
    int msgget(key_t key, int msgflag);
  • 参数:

    • key:某个消息队列的名字
    • msgflag:由九个权限表示构成,它们的用法和创建对象时使用的mode模式表示是一样的
  • 返回值:成功返回一个非负整数,即该消息队列的标识码;失败返回-1

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(0); \
} while (0)
int main() {
int msgid;
msgid = msgget(1234, 0666);
// msgid = msgget(1234, 0666 | IPC_CREAT);
// msgid = msgget(1234, 0666 | IPC_CREAT | IPC_EXCL);
// msgid = msgget(IPC_PRIVATE, 0666 | IPC_CREAT | IPC_EXCL);
// msgid = msgget(IPC_PRIVATE, 0666);
// msgid = msgget(1234, 0);
if (msgid == -1) {
ERR_EXIT("msgget");
}
printf("msgget succ\n");
return 0;
}

创建逻辑图


msgctl函数

  • 功能:消息队列的控制函数
  • 原型:

    1
    int msgctl(int msgqid, int cmd, struct msgqid_ds *buf);
  • 参数:

    • msqid:由msgget函数返回的消息队列标识码
    • cmd:是将要采用的动作(有三个值可取(
  • 返回值:成功返回0,失败返回-1

    cmd参数

命令 说明
IPC_STAT 把msqid_ds结构中的数据设置为消息队列的当前关联值
IPC_SET 在进程有足够权限的前提下,把消息队列的当前关联值设置为msqid_ds数据结构种给出的值
IPC_RMID 删除消息队列

获取消息队列参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(0); \
} while (0)
int main() {
int msgid;
msgid = msgget(1234, 0);
if (msgid == -1) {
ERR_EXIT("msgget");
}
printf("msgget succ\n");
printf("msgid = %d\n", msgid);
struct msqid_ds buf;
if (msgctl(msgid, IPC_STAT, &buf) == -1) {
ERR_EXIT("msgctl");
}
printf("mode = %o\n", buf.msg_perm.mode);
printf("bytes = %d\n", buf.__msg_cbytes);
printf("number = %d\n", buf.msg_qnum);
printf("msgmnb = %d\n", buf.msg_qbytes);
return 0;
}

设置消息队列参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(0); \
} while (0)
int main() {
int msgid;
msgid = msgget(1234, 0);
if (msgid == -1) {
ERR_EXIT("msgget");
}
printf("msgget succ\n");
printf("msgid = %d\n", msgid);
struct msqid_ds buf;
if (msgctl(msgid, IPC_STAT, &buf) == -1) {
ERR_EXIT("msgctl");
}
buf.msg_perm.mode = 0600;
if (msgctl(msgid, IPC_SET, &buf) == -1) {
ERR_EXIT("msgctl");
}
return 0;
}

删除消息队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(0); \
} while (0)
int main() {
int msgid;
msgid = msgget(1234, 0);
if (msgid == -1) {
ERR_EXIT("msgget");
}
printf("msgget succ\n");
printf("msgid = %d\n", msgid);
if (msgctl(msgid, IPC_RMID, NULL) == -1) {
ERR_EXIT("msgctl");
}
return 0;
}

msgsnd函数

  • 功能:把一条消息添加到消息队列中
  • 原型:

    1
    int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
  • 参数:

    • msgid:由msgget函数返回的消息队列标识码。
    • msgp:是一个指针,指针指向准备发送的消息。
    • msgsz:是msgp执行的消息长度,这个长度不含保存消息类型的那个long int长整型。
    • msgflag:控制着当前消息队列满或到达系统上限时将要发生的事情。
  • 返回值:成功返回0,失败返回-1。
    注意:
  • msgflag=IPC_NOWAIT表示队列满不等待,返回EAGAIN错误。
  • 消息结构在两方面收到制约。首先,它必须小于系统规定的上限值;其次,它必须以一个long int长整数开始,接受者函数将利用这个长整数确定消息的类型 。
  • 消息结构参考形式如下:
    1
    2
    3
    4
    struct msgbuf {
    long mtype;
    char mtext[1];
    }

向消息队列发送数据例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(0); \
} while (0)
struct msgbuf
{
long mtype;
char mtext[1];
};
int main(int argc, char *argv[]) {
if (argc != 3) {
fprintf(stderr, "Usage: %s <bytes> <type>\n", argv[0]);
exit(-1);
}
int len = atoi(argv[1]);
int type = atoi(argv[2]);
int msgid;
msgid = msgget(1234, 0);
if (msgid == -1) {
ERR_EXIT("msgget");
}
struct msgbuf *ptr;
ptr = (struct msgbuf*) malloc(sizeof(long) + len);
ptr->mtype = type;
if (msgsnd(msgid, ptr, len, 0) < 0) {
ERR_EXIT("msgsnd");
}
return 0;
}


msgrcv函数

  • 功能:是从一个消息队列接收消息
  • 原型:

    1
    ssize_t msgrcv(int msgid, void *msgp, size_t msgsz, long msgtype, int msgflg);
  • 参数:

    • msgid:由msgget函数返回的消息队列标识码。
    • msgp:是一个指针,指针执行准备接收的消息。
    • msgsz:是msgp指向的消息长度,这个长度不含保存消息类型的那个long int长整型
    • msgtype:它可以实现接受优先级的简单形式
    • msgflg:控制着队列中没有相应类型的消息可供接受时将要发生的事
  • 返回值:成功返回实际放到缓冲区里去的字符个数,失败返回-1
    注意:
  • msgtype = 0 返回队列第一条消息
  • msgtype > 0 返回队列第一条类型等于msgtype的消息
  • msgtype < 0 返回队列类型最小的小于等于msgtype绝对值的消息
  • msgflg = IPC_NOWAIT,队列没有可读消息不等待,返回ENOMSG
  • msgflag = MSG_NOERROR,消息大小超过msgsz时被截断
  • msgtype > 0 且 msgflag = MSG_EXCEPT,接受类型不等于msgtype的第一条消息

消息队列实现回射客户/服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/types.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#define MSGMAX 8192
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(0); \
} while (0)
struct msgbuf
{
long mtype;
char mtext[1];
};
int main(int argc, char *argv[]) {
int flag = 0;
int type = 0;
int opt;
while (1) {
opt = getopt(argc, argv, "nt:");
if (opt == '?') {
exit(-1);
}
if (opt == -1) {
break;
}
switch (opt) {
case 'n':
flag |= IPC_NOWAIT;
break;
case 't':
type = atoi(optarg);
break;
}
}
int msgid = msgget(1234, 0);
if (msgid == -1) {
ERR_EXIT("msgget");
}
struct msgbuf *ptr;
ptr = (struct msgbuf*) malloc(sizeof(long) + MSGMAX);
ptr->mtype = type;
int n = 0;
if ((n = msgrcv(msgid, ptr, MSGMAX, type, flag)) < 0) {
ERR_EXIT("msgrcv");
}
printf("read %d bytes type=%ld\n", n, type);
return 0;
}