SystemV-消息队列与责任链模式
一、SystemV 消息队列
1. 消息队列API
Ftok
- 函数定义:
key_t ftok(const char *pathname, int proj_id);
- 函数作用:
获取唯一的key
值标识符,用于标识系统V消息队列。 - 参数解释:
pathname
:有效的文件路径(需存在)。proj_id
:小于255的整数数字(通常为非零值)。- 返回值:唯一的
key
值,失败时返回-1
。
Msgget
- 函数定义:
int msgget(key_t key, int msgflg);
- 函数作用:
创建或获取内核消息队列,返回消息队列ID(文件描述符)。 - 参数解释:
key
:由ftok
函数返回的有效key
值。msgflg
:标志位(常用宏组合):IPC_CREATE
:若队列不存在则创建,否则获取已有队列ID。IPC_EXCL
:需与IPC_CREATE
联合使用,若队列存在则返回错误(避免重复创建)。- 权限模式(如
0666
):设置队列的访问权限。
相关指令
- 查看内核消息队列:
ipcs -q
- 删除消息队列:
ipcrm -q <队列ID>
- 特性:消息队列是全双工的,支持双向通信。
Msgsnd(发送消息)
- 函数定义:
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
- 自定义结构体:
struct msgbuf { long mtype; // 消息类型(正整数) char mtext[N]; // 消息内容(自定义大小) };
- 细节:
msgp
指向msgbuf
结构体地址。msgsz
必须为sizeof(msgbuf.mtext)
(仅计算内容部分,不含mtype
)。
Msgrcv(接收消息)
- 函数定义:
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long mtype, int msgflg);
- 细节:参数与
msgsnd
类似,通过mtype
指定接收的消息类型(如0
表示接收任意类型)。
Msgctl(控制消息队列)
- 函数定义:
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
- 常用
cmd
参数:IPC_STAT
:获取消息队列属性(存入buf
)。IPC_RMID
:删除消息队列(立即标记为删除,后续无法访问)。
2. System-V 基本通信代码(C-S架构)
#pragma once
#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <stdlib.h>
#include <string.h>
#include <sys/msg.h>
#include "Log.hpp" #define PATHNAME "./Queue"
#define PROID 123
#define CREATEMSGQUEUE (IPC_CREAT | IPC_EXCL | 0666)
#define GETMSGQUEUE (IPC_CREAT) enum { SERVER_MSG_TYPE = 1, CLIENT_MSG_TYPE
} TYPE; using namespace ns_log; const int default_fd = -1;
const int default_size = 1024; // 1k class MsgQueue {
private: int _msgfd; struct msgbuf { long mtype; char mtext[default_size]; }; public: MsgQueue() : _msgfd(default_fd) {} // 创建消息队列 void Create(int flag) { key_t key = ::ftok(PATHNAME, PROID); if (key == -1) { LOG(FATAL, "ftok错误,error : %s\n", strerror(errno)); abort(); } _msgfd = msgget(key, flag); if (_msgfd == -1) { LOG(FATAL, "创建消息队列失败\n"); abort(); } LOG(DEBUG, "创建消息队列完毕\n"); } // 发送消息 void Send(int type, const std::string &in) { struct msgbuf buf; memset(&buf, 0, sizeof(buf)); buf.mtype = type; memcpy(buf.mtext, in.c_str(), in.size()); int n = msgsnd(_msgfd, &buf, sizeof(buf.mtext), 0); if (n == -1) { LOG(ERROR, "发送消息失败\n"); return; } LOG(DEBUG, "发送消息完毕\n"); } // 读取数据 void Recv(int type, std::string *out) { struct msgbuf buf; int n = msgrcv(_msgfd, &buf, sizeof(buf.mtext), type, 0); if (n == -1) { LOG(ERROR, "接收消息错误\n"); return; } *out = std::string(buf.mtext, n); }
};
二、责任链模式
1. 模式介绍
- 定义:责任链模式是一种行为型设计模式,通过将请求处理对象连成一条链,使每个对象可以选择处理请求或传递给下一个对象,从而解耦请求的发送者与接收者。
- 核心思想:请求沿链传递,直到被处理或链结束,每个节点可动态决定是否处理请求或转发。
2. 业务需求
- 格式化消息:为接收的信息添加时间戳和进程PID。
- 持久化保存:将消息保存到文件。
- 文件分片:当文件过大时,按规则分片并保存到指定目录(Linux下重命名为原子操作)。
3. 编码实现
责任链基类
#pragma once
#include <iostream>
#include <string>
#include <memory>
#include <sstream>
#include <fstream>
#include <filesystem>
#include <ctime>
#include <unistd.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h> const std::string default_filepath = "./log/";
const std::string default_filename = "test.log";
static std::string delfile; // 责任链基类
class Handler {
public: virtual ~Handler() {} virtual void Execute(const std::string &text) = 0; void SetNext(std::shared_ptr<Handler> next) { _next = next; } void Enable() { _isEnable = true; } void Disable() { _isEnable = false; } protected: std::shared_ptr<Handler> _next; bool _isEnable = true; // 获取当前时间(带PID) virtual std::string GetCurrTime() { time_t now_time = time(nullptr); struct tm *curr_time = localtime(&now_time); char buf[128]; snprintf(buf, sizeof(buf), "[%d-%02d-%02d %02d:%02d:%02d][%d]", curr_time->tm_year + 1900, curr_time->tm_mon + 1, curr_time->tm_mday, curr_time->tm_hour, curr_time->tm_min, curr_time->tm_sec, getpid()); return buf; }
};
格式化处理节点
// 格式化处理节点
class HandlerFormatText : public Handler {
public: void Execute(const std::string &text) override { std::string formattext = text; if (_isEnable) { std::stringstream ss; ss << GetCurrTime() << text << "\n"; formattext = ss.str(); LOG(INFO, "格式化完毕,结果: %s\n", formattext.c_str()); } if (_next) _next->Execute(formattext); else LOG(DEBUG, "到达责任链结尾,本次任务处理完毕\n"); }
};
保存文件节点
// 保存文件节点
class HandleSaveFile : public Handler {
private: std::string _filepath; std::string _filename; // 创建目录(若不存在) void CreateFilepath() { try { std::filesystem::create_directories(_filepath); } catch (std::filesystem::filesystem_error &error) { LOG(ERROR, "创建⽬录%s失败\n", error.what()); } } public: HandleSaveFile(const std::string &filepath = default_filepath, const std::string &filename = default_filename) : _filepath(filepath), _filename(filename) { if (!std::filesystem::exists(_filepath)) CreateFilepath(); } void Execute(const std::string &text) override { std::string file = _filepath + _filename; if (_isEnable) { std::ofstream ofs(file, std::ios::app); if (!ofs.is_open()) { LOG(ERROR, "%s打开失败\n", file.c_str()); return; } ofs << text; ofs.close(); LOG(INFO, "保存到⽂件完毕\n"); } if (_next) _next->Execute(file); else LOG(DEBUG, "到达责任链结尾,本次任务处理完毕\n"); }
};
文件分片处理节点
const int default_maxline = 5; // 可配置的最大行数 // 检测文件大小并分片
class HandleBackUp : public Handler {
private: std::string _filepath; std::string _filename; int _maxline; // 生成带时间戳的备份后缀 virtual std::string GetCurrTime() override { time_t now_time = time(nullptr); struct tm *curr_time = localtime(&now_time); char buf[128]; snprintf(buf, sizeof(buf), ".%d_%02d_%02d_%02d_%02d_%02d_%d", curr_time->tm_year + 1900, curr_time->tm_mon + 1, curr_time->tm_mday, curr_time->tm_hour, curr_time->tm_min, curr_time->tm_sec, getpid()); return buf; } // 检查文件是否超过分片阈值 bool IsOutRange(const std::string &file) { std::ifstream ifs(file); if (!ifs.is_open()) return false; int lines = 0; std::string line; while (std::getline(ifs, line)) ++lines; ifs.close(); return lines > _maxline; } // 执行分片备份(子进程处理) void BackUp(const std::string &file) { std::string suffix = GetCurrTime(); std::string backfilename = file + suffix; std::string tgzfilesrc = _filename + suffix; std::string tgzfiledst = tgzfilesrc + ".tgz"; pid_t pid = fork(); if (pid == 0) { std::filesystem::rename(file, backfilename); std::filesystem::current_path(_filepath); execlp("tar", "tar", "-czf", tgzfiledst.c_str(), tgzfilesrc.c_str(), nullptr); exit(1); } } public: HandleBackUp(const std::string &filepath = default_filepath, const std::string &filename = default_filename, int maxline = default_maxline) : _filepath(filepath), _filename(filename), _maxline(maxline) {} void Execute(const std::string &text) override { if (_isEnable) { std::string file = _filepath + _filename; if (IsOutRange(file)) BackUp(file); LOG(INFO, "分⽚检测处理完毕\n"); } if (_next) _next->Execute(text); else LOG(DEBUG, "到达责任链结尾,本次任务处理完毕\n"); }
};
责任链入口
// 责任链入口类
class HandlerEntry {
private: std::shared_ptr<HandlerFormatText> _format; std::shared_ptr<HandleSaveFile> _save; std::shared_ptr<HandleBackUp> _backup; public: HandlerEntry() { // 初始化节点 _format = std::make_shared<HandlerFormatText>(); _save = std::make_shared<HandleSaveFile>(); _backup = std::make_shared<HandleBackUp>(); // 连接责任链 _format->SetNext(_save); _save->SetNext(_backup); // 处理子进程僵尸状态 signal(SIGCHLD, [](int signum) { int status = 0; pid_t id = 0; while ((id = waitpid(-1, &status, WNOHANG)) > 0) { if (WIFEXITED(status)) { LOG(DEBUG, "删除临时备份文件: %s\n", delfile.c_str()); std::filesystem::remove(delfile); } } }); } // 执行责任链处理 void Run(std::string &text) { _format->Execute(text); }
};
4. 模式特点
- 解耦性:发送者与接收者解耦,请求处理逻辑可动态组合。
- 扩展性:新增处理节点无需修改原有代码,符合开闭原则。
- 灵活性:可通过启用/禁用节点动态调整处理流程(如
Enable/Disable
方法)。
通过结合SystemV消息队列与责任链模式,可实现高效的进程间通信及灵活的消息处理流水线,满足消息格式化、持久化及分片等复杂业务需求。