POSIX 消息队列

POSIX 消息队列与System V消息队列主要区别在于:Posix消息队列的读总是返回最高优先级的消息,往空队列放置消息时,Posix消息队列允许产生一个信号或启动一个线程。


mq_open函数

  • 功能:用来创建和访问一个消息队列
  • 原型:

    1
    2
    mqd_t mq_open(const char *name, int oflag);
    mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
  • 参数:

    • name:某个消息队列的名字
    • oflag:与open函数类似,可以是O_RDONLY、O_WRONLU、O_RDWR,还可以按位或上O_CREAT、O_EXCEL、O_NONBLOCK等。
    • mode:如果flag指定了O_CREAT,需要设置mode
  • 返回值:成功返回消息队列文件描述符;失败返回-1

查看消息队列方式:

1
2
mkdir /dev/mqueue
mount -t mqueue nont /dev/mqueue

POSIX IPC名字限定:

  • 必须以/打头,并且后续不能有其他/,形如/somename
  • 长度不能超过NAME_MAX

mq_close函数

  • 功能:关闭一个消息队列
  • 原型:

    1
    mqd_t mq_close(mqd_t mqdes);
  • 参数:

    • mqdes:消息队列描述符
  • 返回值:成功返回0;失败返回-1

mq_unlink函数

  • 功能:删除消息队列
  • 原型:

    1
    mqd_t mq_unlink(const char *name);
  • 参数:

    • name:消息队列的名字
  • 返回值:成功返回0;失败返回-1

mq_getattr/mq_setattr函数

  • 功能:获取/设置消息队列属性
  • 原型:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);
    mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
    struct mq_attr {
    long mq_flags; /* Flags: 0 or O_NONBLOCK */
    long mq_maxmsg; /* Max. # of messages on queue */
    long mq_msgsize; /* Max. message size (bytes) */
    long mq_curmsgs; /* # of messages currently in queue */
    };
  • 返回值:成功返回0;失败返回-1


mq_send函数

  • 功能:发送消息
  • 原型:

    1
    mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
  • 参数:

    • mqdes:消息队列描述符
    • msg_ptr:指向消息的指针
    • msg_len:消息长度
    • msg_prio:消息优先级
  • 返回值:成功返回0;失败返回-1

实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/fcntl.h>
#include <string.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <mqueue.h>
#define ERR_EXIT(m) \
do { \
perror(m); \
exit(EXIT_FAILURE); \
} while (0)
typedef struct stu
{
char name[32];
int age;
} STU;
int main(int argc, char *argv[]) {
if (argc != 2) {
fprintf(stderr, "Usage: %s <prio>\n", argv[0]);
exit(EXIT_FAILURE);
}
mqd_t mqid;
mqid = mq_open("/abc", O_WRONLY);
if (mqid == (mqd_t)-1) {
ERR_EXIT("mq_open");
}
STU stu;
strcpy(stu.name, "test");
stu.age = 20;
mq_send(mqid, (const char*)&stu, sizeof(stu), atoi(argv[1]));
return 0;
}


mq_receive函数

  • 功能:接受消息
  • 原型:

    1
    ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msq_len, unsigned *msg_prio);
  • 参数:

    • mqdes:消息队列描述符
    • msg_ptr:可接受消息的指针
    • msg_len:消息长度
    • msg_prop:返回接收到的消息优先级
  • 返回值:成功返回接收到的消息字节数;失败返回-1
  • 注意:返回指定消息队列中最高优先级的最早消息

实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/fcntl.h>
#include <string.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <mqueue.h>
#define ERR_EXIT(m) \
do { \
perror(m); \
exit(EXIT_FAILURE); \
} while (0)
typedef struct stu
{
char name[32];
int age;
} STU;
int main(int argc, char *argv[]) {
mqd_t mqid;
mqid = mq_open("/abc", O_RDONLY);
if (mqid == (mqd_t)-1) {
ERR_EXIT("mq_open");
}
STU stu;
unsigned int prio;
struct mq_attr attr;
mq_getattr(mqid, &attr);
if (mq_receive(mqid, (char*)&stu, attr.mq_msgsize, &prio) == -1) {
ERR_EXIT("mq_receive");
}
printf("name=%s age=%d prio=%d\n", stu.name, stu.age, prio);
return 0;
}


mq_notify函数

  • 功能:建立或者删除消息到达通知事件
  • 原型:

    1
    mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification);
  • 参数:

    • mqdes:消息队列描述符
    • notification:
      • 非空表示当前消息到达且消息队列先前为空,那么将得到通知;
      • NULL表示撤销已注册的通知
  • 返回值:成功返回0;失败返回-1
  • 通知方式:
    • 产生一个信号
    • 创建一个线程执行一个指定的函数

结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
union sigval { /* Data passed with notification */
int sival_int; /* Integer value */
void *sival_ptr; /* Pointer value */
};
struct sigevent {
int sigev_notify; /* Notification method */
int sigev_signo; /* Notification signal */
union sigval sigev_value; /* Data passed with
notification */
void (*sigev_notify_function) (union sigval);
/* Function used for thread
notification (SIGEV_THREAD) */
void *sigev_notify_attributes;
/* Attributes for notification thread
(SIGEV_THREAD) */
pid_t sigev_notify_thread_id;
/* ID of thread to signal (SIGEV_THREAD_ID) */
};

注意:

  • 任何时刻只能有一个进程可以被注册接受某个给定队列的通知
  • 当有一个消息到达某个先前为空的队列,而且已有一个进程被注册为接受该队列的通知时,只有没有任何线程阻塞在该队列的mq_receive调用的前提下,通知才会发出
  • 当通知被发送给他的注册进程时,其注册被撤销。进程必须再次调用mq_notify以重新注册(如果需要的话),重新注册要放在消息队列读出消息之前而不是之后

实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/fcntl.h>
#include <string.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <mqueue.h>
#include <signal.h>
#define ERR_EXIT(m) \
do { \
perror(m); \
exit(EXIT_FAILURE); \
} while (0)
typedef struct stu
{
char name[32];
int age;
} STU;
mqd_t mqid;
int size;
struct sigevent sigev;
void handle_sigusr1(int sig) {
mq_notify(mqid, &sigev);
unsigned int prio;
STU stu;
if (mq_receive(mqid, (char*)&stu, size, &prio) == -1) {
ERR_EXIT("mq_receive");
}
printf("name=%s age=%d prio=%d\n", stu.name, stu.age, prio);
}
int main(int argc, char *argv[]) {
mqid = mq_open("/abc", O_RDONLY);
if (mqid == (mqd_t)-1) {
ERR_EXIT("mq_open");
}
struct mq_attr attr;
mq_getattr(mqid, &attr);
size = attr.mq_msgsize;
signal(SIGUSR1, handle_sigusr1);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
mq_notify(mqid, &sigev);
for (;;) {
pause();
}
return 0;
}