当前位置: 首页 > news >正文

SystemV-消息队列与责任链模式

一、SystemV 消息队列

1. 消息队列API

Ftok
  • 函数定义
    key_t ftok(const char *pathname, int proj_id);
  • 函数作用
    获取唯一的key值标识符,用于标识系统V消息队列。
  • 参数解释
    • pathname:有效的文件路径(需存在)。
    • proj_id:小于255的整数数字(通常为非零值)。
    • 返回值:唯一的key值,失败时返回-1
Msgget
  • 函数定义
    int msgget(key_t key, int msgflg);
  • 函数作用
    创建或获取内核消息队列,返回消息队列ID(文件描述符)。
  • 参数解释
    • key:由ftok函数返回的有效key值。
    • msgflg:标志位(常用宏组合):
      • IPC_CREATE:若队列不存在则创建,否则获取已有队列ID。
      • IPC_EXCL:需与IPC_CREATE联合使用,若队列存在则返回错误(避免重复创建)。
      • 权限模式(如0666):设置队列的访问权限。
相关指令
  • 查看内核消息队列
    ipcs -q  
    
  • 删除消息队列
    ipcrm -q <队列ID>  
    
  • 特性:消息队列是全双工的,支持双向通信。
Msgsnd(发送消息)
  • 函数定义
    int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
  • 自定义结构体
    struct msgbuf {  long mtype;       // 消息类型(正整数)  char mtext[N];   // 消息内容(自定义大小)  
    };  
    
  • 细节
    • msgp指向msgbuf结构体地址。
    • msgsz必须为sizeof(msgbuf.mtext)(仅计算内容部分,不含mtype)。
Msgrcv(接收消息)
  • 函数定义
    ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long mtype, int msgflg);
  • 细节:参数与msgsnd类似,通过mtype指定接收的消息类型(如0表示接收任意类型)。
Msgctl(控制消息队列)
  • 函数定义
    int msgctl(int msqid, int cmd, struct msqid_ds *buf);
  • 常用cmd参数
    • IPC_STAT:获取消息队列属性(存入buf)。
    • IPC_RMID:删除消息队列(立即标记为删除,后续无法访问)。

2. System-V 基本通信代码(C-S架构)

#pragma once  
#include <iostream>  
#include <unistd.h>  
#include <sys/types.h>  
#include <sys/ipc.h>  
#include <stdlib.h>  
#include <string.h>  
#include <sys/msg.h>  
#include "Log.hpp"  #define PATHNAME "./Queue"  
#define PROID 123  
#define CREATEMSGQUEUE (IPC_CREAT | IPC_EXCL | 0666)  
#define GETMSGQUEUE (IPC_CREAT)  enum {  SERVER_MSG_TYPE = 1,  CLIENT_MSG_TYPE  
} TYPE;  using namespace ns_log;  const int default_fd = -1;  
const int default_size = 1024; // 1k  class MsgQueue {  
private:  int _msgfd;  struct msgbuf {  long mtype;  char mtext[default_size];  };  public:  MsgQueue() : _msgfd(default_fd) {}  // 创建消息队列  void Create(int flag) {  key_t key = ::ftok(PATHNAME, PROID);  if (key == -1) {  LOG(FATAL, "ftok错误,error : %s\n", strerror(errno));  abort();  }  _msgfd = msgget(key, flag);  if (_msgfd == -1) {  LOG(FATAL, "创建消息队列失败\n");  abort();  }  LOG(DEBUG, "创建消息队列完毕\n");  }  // 发送消息  void Send(int type, const std::string &in) {  struct msgbuf buf;  memset(&buf, 0, sizeof(buf));  buf.mtype = type;  memcpy(buf.mtext, in.c_str(), in.size());  int n = msgsnd(_msgfd, &buf, sizeof(buf.mtext), 0);  if (n == -1) {  LOG(ERROR, "发送消息失败\n");  return;  }  LOG(DEBUG, "发送消息完毕\n");  }  // 读取数据  void Recv(int type, std::string *out) {  struct msgbuf buf;  int n = msgrcv(_msgfd, &buf, sizeof(buf.mtext), type, 0);  if (n == -1) {  LOG(ERROR, "接收消息错误\n");  return;  }  *out = std::string(buf.mtext, n);  }  
};  

二、责任链模式

1. 模式介绍

  • 定义:责任链模式是一种行为型设计模式,通过将请求处理对象连成一条链,使每个对象可以选择处理请求或传递给下一个对象,从而解耦请求的发送者与接收者。
  • 核心思想:请求沿链传递,直到被处理或链结束,每个节点可动态决定是否处理请求或转发。

2. 业务需求

  • 格式化消息:为接收的信息添加时间戳和进程PID。
  • 持久化保存:将消息保存到文件。
  • 文件分片:当文件过大时,按规则分片并保存到指定目录(Linux下重命名为原子操作)。

3. 编码实现

责任链基类
#pragma once  
#include <iostream>  
#include <string>  
#include <memory>  
#include <sstream>  
#include <fstream>  
#include <filesystem>  
#include <ctime>  
#include <unistd.h>  
#include <signal.h>  
#include <sys/types.h>  
#include <sys/wait.h>  const std::string default_filepath = "./log/";  
const std::string default_filename = "test.log";  
static std::string delfile;  // 责任链基类  
class Handler {  
public:  virtual ~Handler() {}  virtual void Execute(const std::string &text) = 0;  void SetNext(std::shared_ptr<Handler> next) { _next = next; }  void Enable() { _isEnable = true; }  void Disable() { _isEnable = false; }  protected:  std::shared_ptr<Handler> _next;  bool _isEnable = true;  // 获取当前时间(带PID)  virtual std::string GetCurrTime() {  time_t now_time = time(nullptr);  struct tm *curr_time = localtime(&now_time);  char buf[128];  snprintf(buf, sizeof(buf), "[%d-%02d-%02d %02d:%02d:%02d][%d]",  curr_time->tm_year + 1900, curr_time->tm_mon + 1,  curr_time->tm_mday, curr_time->tm_hour,  curr_time->tm_min, curr_time->tm_sec, getpid());  return buf;  }  
};  
格式化处理节点
// 格式化处理节点  
class HandlerFormatText : public Handler {  
public:  void Execute(const std::string &text) override {  std::string formattext = text;  if (_isEnable) {  std::stringstream ss;  ss << GetCurrTime() << text << "\n";  formattext = ss.str();  LOG(INFO, "格式化完毕,结果: %s\n", formattext.c_str());  }  if (_next) _next->Execute(formattext);  else LOG(DEBUG, "到达责任链结尾,本次任务处理完毕\n");  }  
};  
保存文件节点
// 保存文件节点  
class HandleSaveFile : public Handler {  
private:  std::string _filepath;  std::string _filename;  // 创建目录(若不存在)  void CreateFilepath() {  try {  std::filesystem::create_directories(_filepath);  } catch (std::filesystem::filesystem_error &error) {  LOG(ERROR, "创建⽬录%s失败\n", error.what());  }  }  public:  HandleSaveFile(const std::string &filepath = default_filepath,  const std::string &filename = default_filename)  : _filepath(filepath), _filename(filename) {  if (!std::filesystem::exists(_filepath)) CreateFilepath();  }  void Execute(const std::string &text) override {  std::string file = _filepath + _filename;  if (_isEnable) {  std::ofstream ofs(file, std::ios::app);  if (!ofs.is_open()) {  LOG(ERROR, "%s打开失败\n", file.c_str());  return;  }  ofs << text;  ofs.close();  LOG(INFO, "保存到⽂件完毕\n");  }  if (_next) _next->Execute(file);  else LOG(DEBUG, "到达责任链结尾,本次任务处理完毕\n");  }  
};  
文件分片处理节点
const int default_maxline = 5; // 可配置的最大行数  // 检测文件大小并分片  
class HandleBackUp : public Handler {  
private:  std::string _filepath;  std::string _filename;  int _maxline;  // 生成带时间戳的备份后缀  virtual std::string GetCurrTime() override {  time_t now_time = time(nullptr);  struct tm *curr_time = localtime(&now_time);  char buf[128];  snprintf(buf, sizeof(buf), ".%d_%02d_%02d_%02d_%02d_%02d_%d",  curr_time->tm_year + 1900, curr_time->tm_mon + 1,  curr_time->tm_mday, curr_time->tm_hour,  curr_time->tm_min, curr_time->tm_sec, getpid());  return buf;  }  // 检查文件是否超过分片阈值  bool IsOutRange(const std::string &file) {  std::ifstream ifs(file);  if (!ifs.is_open()) return false;  int lines = 0;  std::string line;  while (std::getline(ifs, line)) ++lines;  ifs.close();  return lines > _maxline;  }  // 执行分片备份(子进程处理)  void BackUp(const std::string &file) {  std::string suffix = GetCurrTime();  std::string backfilename = file + suffix;  std::string tgzfilesrc = _filename + suffix;  std::string tgzfiledst = tgzfilesrc + ".tgz";  pid_t pid = fork();  if (pid == 0) {  std::filesystem::rename(file, backfilename);  std::filesystem::current_path(_filepath);  execlp("tar", "tar", "-czf", tgzfiledst.c_str(),  tgzfilesrc.c_str(), nullptr);  exit(1);  }  }  public:  HandleBackUp(const std::string &filepath = default_filepath,  const std::string &filename = default_filename,  int maxline = default_maxline)  : _filepath(filepath), _filename(filename), _maxline(maxline) {}  void Execute(const std::string &text) override {  if (_isEnable) {  std::string file = _filepath + _filename;  if (IsOutRange(file)) BackUp(file);  LOG(INFO, "分⽚检测处理完毕\n");  }  if (_next) _next->Execute(text);  else LOG(DEBUG, "到达责任链结尾,本次任务处理完毕\n");  }  
};  
责任链入口
// 责任链入口类  
class HandlerEntry {  
private:  std::shared_ptr<HandlerFormatText> _format;  std::shared_ptr<HandleSaveFile> _save;  std::shared_ptr<HandleBackUp> _backup;  public:  HandlerEntry() {  // 初始化节点  _format = std::make_shared<HandlerFormatText>();  _save = std::make_shared<HandleSaveFile>();  _backup = std::make_shared<HandleBackUp>();  // 连接责任链  _format->SetNext(_save);  _save->SetNext(_backup);  // 处理子进程僵尸状态  signal(SIGCHLD, [](int signum) {  int status = 0;  pid_t id = 0;  while ((id = waitpid(-1, &status, WNOHANG)) > 0) {  if (WIFEXITED(status)) {  LOG(DEBUG, "删除临时备份文件: %s\n", delfile.c_str());  std::filesystem::remove(delfile);  }  }  });  }  // 执行责任链处理  void Run(std::string &text) {  _format->Execute(text);  }  
};  

4. 模式特点

  • 解耦性:发送者与接收者解耦,请求处理逻辑可动态组合。
  • 扩展性:新增处理节点无需修改原有代码,符合开闭原则。
  • 灵活性:可通过启用/禁用节点动态调整处理流程(如Enable/Disable方法)。

通过结合SystemV消息队列与责任链模式,可实现高效的进程间通信及灵活的消息处理流水线,满足消息格式化、持久化及分片等复杂业务需求。

相关文章:

  • Discuz!+DeepSeek:传统论坛的智能化蜕变之路
  • 鸿蒙移动应用开发--渲染控制实验
  • C++异步操作 - future async package_task promise
  • Vue项目依赖注入的使用
  • 深度学习模型搭建的基础原理详细介绍
  • C++学习笔记(三十六)——STL之排序算法
  • Java中内部类
  • 电脑硬盘丢失怎么找回?解决硬盘数据恢复的2种方法
  • win10中打开python的交互模式
  • c++STL——stack、queue、priority_queue的模拟实现
  • 【Linux】46.网络基础(3.3)
  • linux下使用wireshark捕捉snmp报文
  • GAEA的技术优势:分层加密与去中心化数据治理
  • 若依SpringCloud项目-定制微服务模块
  • macOS安全隐私最佳实践分析
  • Mujoco xml < sensor>
  • SEO长尾关键词优化核心策略
  • 【Vue】状态管理(Vuex、Pinia)
  • 深度学习训练中的显存溢出问题分析与优化:以UNet图像去噪为例
  • yaml里的挪威问题是啥
  • 门票在“缩水”,古镇怎么办
  • 一季度提高两只医药基金股票仓位,中欧基金葛兰加仓科伦药业、百利天恒
  • 海南一季度GDP为1904.17亿元,同比增长4.0%
  • 建行深圳市分行原副行长李华峰一审被判15年
  • 电商平台全面取消“仅退款”:电商反内卷一大步,行业回归良性竞争
  • 护航民营企业出海,上海设37家维权工作站、建立近百人专家团队