Linux进程学习之:消息队列

以你之姓@ 2023-06-13 11:23 75阅读 0赞

消息队列是一个链式队列

创建消息队列需要用到msgget函数,函数声明在“sys/msg.h”头文件里
函数声明如下:

  1. int msgget(key_t key, int msgflg);

msgget 函数有两个参数,第一个是key,可以传入宏 IPC_PRIVATE或者有ftok函数生成的key值,如果传入的是IPC_PRIVATE宏,消息队列将用于具有亲缘关系的进程间通信,如果是ftok函数生成的key值,则可以用于非亲缘关系的进程间通信,第二个参数是生成的消息队列的读写权限,一般使用0777

下面贴一段生成消息队列的代码

  1. #include <iostream>
  2. #include <sys/msg.h>
  3. #include <stdlib.h>
  4. using namespace std;
  5. int main() {
  6. int msgid = msgget(IPC_PRIVATE, 0777);
  7. if ( msgid == -1) {
  8. cout << "Create msgid failed ..." << endl;
  9. return -1;
  10. }
  11. else
  12. cout << "Create msgid successfully ..." << endl;
  13. system("ipcs -q");
  14. return 0;
  15. }

最后的system(“ipcs -q”)的作用是将系统中所有的消息的队列信息打印出来,包括刚才创建的消息队列

删除消息队列用的msgctl函数,同样声明在“sys/msg.h”头文件中
声明如下:

  1. int msgctl(int msqid, int cmd, struct msqid_ds *buf);

第一个参数是告诉函数要操作的msg对象的id,第二个参数是告诉函数要怎么做,因为我们要删除创建的消息队列,所以传入的IPC_RMID宏,第三个参数直接传入空就可以

贴一个创建之后又删除消息队列的问题

  1. #include <iostream>
  2. #include <sys/msg.h>
  3. #include <stdlib.h>
  4. using namespace std;
  5. int main() {
  6. int msgid = msgget(IPC_PRIVATE, 0777);
  7. if ( msgid == -1) {
  8. cout << "Create msgid failed ..." << endl;
  9. return -1;
  10. }
  11. else
  12. cout << "Create msgid successfully, msgid = " << msgid <<endl;
  13. system("ipcs -q");
  14. // do something ...
  15. //delete msg
  16. int ret = msgctl(msgid, IPC_RMID, NULL);
  17. if (ret == -1) {
  18. cout << "delete msg failed ..." << endl;
  19. return -2;
  20. }
  21. else
  22. cout << "delete msg successfully ..." << endl;
  23. system("ipcs -q");
  24. return 0;
  25. }

接下来说消息队列的消息发送和接受,消息发送用的msgsnd函数,接受用的是msgrcv函数,两个函数声明分别如下:

  1. int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
  2. ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

1、msgsnd函数第一个参数是消息队列的id,第二个参数是一个指向用户自定义结构体的指针,不过这个结构体必须具有如下格式,第一个是参数类型,必须大于0,第二个是一个字符数组,大小可以自己定,不过也必须大于0:

  1. struct msgbuf {
  2. long mtype; /* message type, must be > 0 */
  3. char mtext[1]; /* message data */
  4. };

msgsnd函数的第三个参数是阻塞标志,0表示阻塞,还有其他的各种标志,可以自己去看

2、msgrcv函数和msgsnd函数的区别是只有第四个参数不一样,第四个参数是意思是当前队列消息的类型,需要明确才可以,如果输入的消息类型有误,程序就会被阻塞

接下来贴一个消息队列发送和接受消息的代码:

  1. #include <iostream>
  2. #include <sys/msg.h>
  3. #include <stdlib.h>
  4. #include <string.h>
  5. using namespace std;
  6. struct msgBuff {
  7. long type;
  8. char voltage[124];
  9. };
  10. int main() {
  11. int msgid = msgget(IPC_PRIVATE, 0777);
  12. if ( msgid == -1) {
  13. cout << "Create msgid failed ..." << endl;
  14. return -1;
  15. }
  16. else
  17. cout << "Create msgid successfully, msgid = " << msgid <<endl;
  18. system("ipcs -q");
  19. msgBuff sendBuf, rcvBuf;
  20. sendBuf.type = 100;
  21. cout << "please input message: ";
  22. cin >> sendBuf.voltage;
  23. msgsnd(msgid, (void *)&sendBuf, sizeof(sendBuf.voltage), 0);
  24. memset(rcvBuf.voltage, 0, sizeof(rcvBuf.voltage));
  25. int ret = msgrcv(msgid, (void *)&rcvBuf, sizeof(rcvBuf.voltage), 100, 0);
  26. if (ret == -1){
  27. cout << "read rcvBuf failed ..." << endl;
  28. }
  29. else {
  30. cout << "read rcvBuf successfully, rcvBuf = " << rcvBuf.voltage << endl;
  31. }
  32. //delete msg
  33. ret = msgctl(msgid, IPC_RMID, NULL);
  34. if (ret == -1) {
  35. cout << "delete msg failed ..." << endl;
  36. return -2;
  37. }
  38. else
  39. cout << "delete msg successfully ..." << endl;
  40. system("ipcs -q");
  41. return 0;
  42. }

运行以上代码,可以实现先往消息队列写入信息,接着从消息队列读取信息

接下来贴一个两个无亲缘关系的进程间通过消息队列通信的代码:

首先是写进程代码:

  1. #include <iostream>
  2. #include <sys/msg.h>
  3. #include <stdlib.h>
  4. #include <string.h>
  5. #include <sys/ipc.h>
  6. using namespace std;
  7. struct msgBuff {
  8. long type;
  9. char voltage[124];
  10. };
  11. int main() {
  12. int key = ftok("./a", 1);
  13. if (key < 0) {
  14. cout << "Create key failed" << endl;
  15. return -1;
  16. }
  17. else
  18. cout << "Create key successfully, key = " << key << endl;
  19. int msgid = msgget(key, IPC_CREAT | 0777);
  20. if ( msgid == -1) {
  21. cout << "Create msgid failed ..." << endl;
  22. return -1;
  23. }
  24. else
  25. cout << "Create msgid successfully, msgid = " << msgid <<endl;
  26. system("ipcs -q");
  27. msgBuff sendBuf;
  28. sendBuf.type = 100;
  29. while(1) {
  30. cout << "please input message: ";
  31. cin >> sendBuf.voltage;
  32. msgsnd(msgid, (void *)&sendBuf, sizeof(sendBuf.voltage), 0);
  33. }
  34. int ret = msgctl(msgid, IPC_RMID, NULL);
  35. if (ret == -1) {
  36. cout << "delete msg failed ..." << endl;
  37. return -2;
  38. }
  39. else
  40. cout << "delete msg successfully ..." << endl;
  41. system("ipcs -q");
  42. return 0;
  43. }

然后是读进程的代码

  1. #include <iostream>
  2. #include <sys/msg.h>
  3. #include <stdlib.h>
  4. #include <string.h>
  5. #include <sys/ipc.h>
  6. using namespace std;
  7. struct msgBuff {
  8. long type;
  9. char voltage[124];
  10. };
  11. int main() {
  12. int key = ftok("./a", 1);
  13. if (key < 0) {
  14. cout << "Create key failed" << endl;
  15. return -1;
  16. }
  17. else
  18. cout << "Create key successfully, key = " << key << endl;
  19. int msgid = msgget(key, IPC_CREAT | 0777);
  20. if ( msgid == -1) {
  21. cout << "Create msgid failed ..." << endl;
  22. return -1;
  23. }
  24. else
  25. cout << "Create msgid successfully, msgid = " << msgid <<endl;
  26. system("ipcs -q");
  27. msgBuff rcvBuf;
  28. while(1) {
  29. memset(rcvBuf.voltage, 0, sizeof(rcvBuf.voltage));
  30. msgrcv(msgid, (void *)&rcvBuf, sizeof(rcvBuf.voltage), 100, 0);
  31. cout << "read buff:" << rcvBuf.voltage << endl;
  32. }
  33. int ret = msgctl(msgid, IPC_RMID, NULL);
  34. if (ret == -1) {
  35. cout << "delete msg failed ..." << endl;
  36. return -2;
  37. }
  38. else
  39. cout << "delete msg successfully ..." << endl;
  40. system("ipcs -q");
  41. return 0;
  42. }

运行以上代码:可以实现在写进程写入消息,在读进程读取信息的功能

接下里贴一个可以在server进程和client进程双向通信的代码:

首先是server代码:

  1. #include <iostream>
  2. #include <sys/msg.h>
  3. #include <stdlib.h>
  4. #include <string.h>
  5. #include <sys/ipc.h>
  6. #include <unistd.h>
  7. using namespace std;
  8. struct msgBuff {
  9. long type;
  10. char voltage[124];
  11. };
  12. int main() {
  13. int key = ftok("./a", 1);
  14. if (key < 0) {
  15. cout << "Create key failed" << endl;
  16. return -1;
  17. }
  18. else
  19. cout << "Create key successfully, key = " << key << endl;
  20. int msgid = msgget(key, IPC_CREAT | 0777);
  21. if ( msgid == -1) {
  22. cout << "Create msgid failed ..." << endl;
  23. return -1;
  24. }
  25. else
  26. cout << "Create msgid successfully, msgid = " << msgid <<endl;
  27. system("ipcs -q");
  28. int pid = fork();
  29. if (pid > 0) {
  30. msgBuff sendBuf;
  31. sendBuf.type = 100;
  32. while(1) {
  33. cout << "please input message: ";
  34. cin >> sendBuf.voltage;
  35. msgsnd(msgid, (void *)&sendBuf, sizeof(sendBuf.voltage), 0);
  36. }
  37. }
  38. else if (pid == 0){
  39. msgBuff rcvBuf;
  40. while(1) {
  41. memset(rcvBuf.voltage, 0, sizeof(rcvBuf.voltage));
  42. msgrcv(msgid, (void *)&rcvBuf, sizeof(rcvBuf.voltage), 200, 0);
  43. cout << "server recive: " << rcvBuf.voltage << endl;
  44. }
  45. }
  46. int ret = msgctl(msgid, IPC_RMID, NULL);
  47. if (ret == -1) {
  48. cout << "delete msg failed ..." << endl;
  49. return -2;
  50. }
  51. else
  52. cout << "delete msg successfully ..." << endl;
  53. system("ipcs -q");
  54. return 0;
  55. }

然后是client代码:

  1. #include <iostream>
  2. #include <sys/msg.h>
  3. #include <stdlib.h>
  4. #include <string.h>
  5. #include <sys/ipc.h>
  6. #include <unistd.h>
  7. using namespace std;
  8. struct msgBuff {
  9. long type;
  10. char voltage[124];
  11. };
  12. int main() {
  13. int key = ftok("./a", 1);
  14. if (key < 0) {
  15. cout << "Create key failed" << endl;
  16. return -1;
  17. }
  18. else
  19. cout << "Create key successfully, key = " << key << endl;
  20. int msgid = msgget(key, IPC_CREAT | 0777);
  21. if ( msgid == -1) {
  22. cout << "Create msgid failed ..." << endl;
  23. return -1;
  24. }
  25. else
  26. cout << "Create msgid successfully, msgid = " << msgid <<endl;
  27. system("ipcs -q");
  28. int pid = fork();
  29. if (pid > 0) {
  30. msgBuff sendBuf;
  31. sendBuf.type = 200;
  32. while(1) {
  33. cout << "please input message: ";
  34. cin >> sendBuf.voltage;
  35. msgsnd(msgid, (void *)&sendBuf, sizeof(sendBuf.voltage), 0);
  36. }
  37. }
  38. else if (pid == 0){
  39. msgBuff rcvBuf;
  40. while(1) {
  41. memset(rcvBuf.voltage, 0, sizeof(rcvBuf.voltage));
  42. msgrcv(msgid, (void *)&rcvBuf, sizeof(rcvBuf.voltage), 100, 0);
  43. cout << "server recive: " << rcvBuf.voltage << endl;
  44. }
  45. }
  46. int ret = msgctl(msgid, IPC_RMID, NULL);
  47. if (ret == -1) {
  48. cout << "delete msg failed ..." << endl;
  49. return -2;
  50. }
  51. else
  52. cout << "delete msg successfully ..." << endl;
  53. system("ipcs -q");
  54. return 0;
  55. }

运行以上代码,可以实现两个非亲缘关系进程间的双向通信,其中server进程写入消息的类型是100,读取消息的类型是200,而client进程读取消息的类型是100,写入消息的类型是200,这样双方就可以正常通信了。

发表评论

表情:
评论列表 (有 0 条评论,75人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Linux进程间通信POSIX消息队列

    消息队列可认为是一个消息链表,它允许进程之间以消息的形式交换数据。有足够写权限的进程或线程可往队列中放置消息,有足够读权限的进程或线程可从队列中取走消息。每个消息都是一个记录,