当前位置: 首页 > news >正文

使用OpenAMP多核框架RPMsg实现高效控制和通信设计

随着嵌入式系统的不断发展,多核设计逐渐成为主流。不同的硬件和软件架构也在不断演进,以满足日益复杂的计算需求。本文将介绍如何使用OpenAMP多核框架中的RPMsg机制,实现高效的控制和通信协议设计。

多核处理架构选择

多核处理器为嵌入式系统提供了强大的计算能力。在软件层面,大致有两种架构选择:对称多处理(SMP)和非对称多处理(AMP)。SMP适用于同构多核平台,即所有内核配置相同,操作系统的一个实例跨所有内核运行,任务自动分布于各核心之间。然而,SMP系统受限于其需要特定支持SMP操作的变体,且可能无法满足特定应用程序的需求,如将任务锁定到特定内核等。

相比之下,AMP提供了更大的灵活性,适用于异构多核平台。在AMP系统中,每个内核可以独立运行不同的操作系统(甚至不运行操作系统),各操作系统可以独立选择,无需特殊的多核支持版本。这种架构能够更好地适应不同应用的需求,比如将实时性要求高的任务分配到高性能核心,将低功耗需求的任务分配到低功耗核心,或者将资源密集型任务分配到专门的硬件加速器。

AMP系统中的通信机制

在多核设计中,内核间通信是一个关键问题。在SMP系统中,操作系统负责任务调度和通信,任务间通信通过标准的API即可完成,无需复杂的通信机制。而在AMP系统中,由于每个内核上的操作系统是独立的,因此需要专门的机制来管理内核间的通信。Remoteproc和RPMsg便是针对多核架构设计的机制。Remoteproc负责管理核心的生命周期,RPMsg则负责内核间的双向通信。

RPMsg(Remote Processor Messaging,远程处理器消息传递)是一种基于共享内存的多核间通信机制,特别适用于嵌入式系统,尤其是那些包含多个不同类型的处理器(如CPU和MCU)的系统。通过共享内存实现高效的双向通信链路,而不需要依赖硬件中断或低级通信机制。

Linux RPMsg框架是在virtio框架顶层上实现的消息传送框架,其用于主机和远程处理器进行通信。它基于virtio vring,可通过共享内存向远程CPU发送消息或从远程CPU接收消息。

这些vring是单向的,一个vring专用于发送到远程处理器的消息,另一个vring用于从远程处理器接收的消息。此外,共享缓冲区需要在两个处理器都可见的内存空间中创建。

当新消息在共享缓冲区中等待时,会使用到另一个框架 Linux Mailbox framework ,该框架将用于通知对应的Core。

依靠这些框架,RPMsg框架实现了基于不同通道的通信。通道可被文本名称标识,并有一个本地(“源”)的RPMsg地址和一个远程(“目的”)的RPMsg地址。

Linux内核源码目录给出的rpmsg client的示例代码位置如下:

samples/rpmsg/rpmsg_client_sample.c

rpmsg框架Linux内核驱动源码位于:drivers/rpmsg

RPMsg简介

RPMsg(Remote Processor Messaging),是一种专为异构多核处理系统设计的通信协议。它允许不同处理器核心之间通过共享内存高效地交换信息,为主核心和从核心之间提供了一种标准化的消息传递机制,使得这些不同架构的核心能够协同工作,最大限度地发挥它们的性能。
RPMsg的主要特点包括:

(1)基于VirtIO管理共享内存,实现了高效的数据传输;

(2)避免额外的拷贝开销,优化了内存使用;

(3)配备同步与互斥机制,确保数据交换的高效与安全;
在这里插入图片描述

RPMsg通信原理

OpenAMP的RPMsg(远程处理器消息)通信的实现原理基于VirtIO和共享内存技术,旨在为异构多核系统提供高效的核间通信机制。以下是其核心实现原理的详细分析:

  1. 架构分层与组件
    RPMsg的实现分为三层:
  • 传输层(RPMsg层):负责消息的封装与路由,定义了消息头(包含源地址、目标地址、负载长度等字段)和端点管理机制。这一层的作用是确保消息能够在系统中的不同核之间正确地传递和接收。

  • 媒体访问层(VirtIO层):通过VirtIO的虚拟队列(vring)管理共享内存的环形缓冲区,实现无锁的单写者单读者通信模型,避免内核间同步需求。VirtIO提供了一种标准化的接口,使得虚拟机中的设备能够高效地与主机或其他虚拟机进行通信。

  • 物理层:依赖共享内存和中断机制(或轮询)完成数据传输,硬件要求包括共享内存区域和可选的中断线。共享内存作为不同核之间通信的数据交换区域,而中断或轮询则用于通知对方核有新的数据待处理。

  1. 核心数据结构:vring
    vring是VirtIO的核心组件,由三部分构成:
  • 描述符池(Descriptor Pool):存储缓冲区的元数据,包括地址、长度和标志位。这些元数据描述了缓冲区的具体位置、大小以及当前的状态。

  • 可用环形缓冲区(Avail Ring):记录待发送的缓冲区索引,由发送方更新。当一个核需要向另一个核发送数据时,它会将数据放置在描述符池中的某个缓冲区,并将该缓冲区的索引放入Avail Ring。

  • 已用环形缓冲区(Used Ring):记录已处理的缓冲区索引,由接收方更新。接收方处理完数据后,会将已使用的缓冲区索引放入Used Ring,通知发送方该缓冲区可以被重新使用。

每个vring通过单写者单读者模式工作,确保无需同步原语(如信号量)即可实现高效传输。例如,发送方将消息写入Avail Ring后触发中断(或由接收方轮询),接收方处理完成后将缓冲区归还至Used Ring。

  1. 消息传递流程
    发送流程:主核从Used Ring分配缓冲区,写入RPMsg头(含源/目标地址)和有效负载,将缓冲区索引推入Avail Ring,并触发中断通知远程核。发送过程首先从Used Ring获取一个可用的缓冲区,然后在这个缓冲区中写入消息头(包含消息来源和目标地址等信息)和消息的有效负载,接着将这个缓冲区的索引写入Avail Ring,最后通过中断或轮询的方式通知远程核有新的数据可以接收。

    接收流程:远程核从Avail Ring读取缓冲区,解析消息头并路由至对应端点(Endpoint),处理完成后将缓冲区索引归还至Used Ring,通知主核释放资源。接收核从Avail Ring中读取待处理的缓冲区索引,解析缓冲区中的消息头以确定目标端点,然后将消息路由到该端点进行处理。处理完成后,接收核会将缓冲区的索引放入Used Ring,通知发送核可以重新使用这个缓冲区。

  2. 端点(Endpoint)与通道管理
    端点是RPMsg实现多路复用通信的基础。每个端点由唯一地址和回调函数标识,支持在同一通道上绑定多个接收逻辑。通道通过名称和地址对进行标识,例如在ARM与RISC-V核间通信案例中,通道名称如c906_rproc用于匹配服务。这种多路复用机制使得不同的应用或服务可以通过不同的端点在系统中进行通信。

  3. 中断与轮询机制
    默认使用中断通知对方核心处理新消息,但可通过配置vring的标志位(如F_NOTIFY)禁用中断,改为轮询模式。中断模式提供了更快的响应时间,而轮询模式则可以在不需要中断的情况下进行通信,减少中断处理的开销。虽然轮询模式减少了中断开销,但可能会增加延迟,因为接收核需要定期检查Avail Ring是否有新的数据需要处理。

  4. 与Remoteproc的协同
    Remoteproc负责远程处理器的生命周期管理(如启动、固件加载),而RPMsg依赖其初始化共享内存和vring资源。例如,在STM32MP157平台中,Remoteproc初始化virtio设备后,RPMsg才能建立通信通道。Remoteproc和RPMsg协同工作,确保远程处理器的正确初始化和通信通道的稳定建立。

OpenAMP框架下的通信设计

本文将介绍如何使用OpenAMP框架下的RPMsg来设计嵌入式系统中的通信协议。OpenAMP定义了多核框架的标准架构,许多供应商都提供了其实现。在OpenAMP框架下,每个核心运行一个独立的RPMsg实例,Remoteproc负责管理核心的启动顺序,其中一个核心被指定为“主”核心。

在具体的通信设计中,Linux核与MCU核之间的通信采用基于共享内存的RPMSG机制。为了解决RPMsg一次报文交互不能超过512-16字节的限制,我们采用nanopb(Google的protobuf序列化库),提高数据吞吐量。nanopb是一个轻量级的Protocol Buffers库,适用于嵌入式系统。它支持编译时生成C代码,确保代码的高效性和小型化。

为了简化协议设计,我们仿照RPC(远程过程调用)机制定义了RPMsg的通信报文。通信报文整体包含固定长度的RpcHeader部分和载体Payload部分。其中Payload部分使用nanopb对传输报文进行序列化和反序列化,以提高通信效率。

MCU核上的RPMsg实现

在嵌入式系统中,MCU核通常负责实时性和低功耗的任务。为了实现与Linux核之间的高效通信,我们可以使用RPMsg机制。本文将展示一个简单的MCU核应用代码示例,展示如何使用RPMsg接收和发送消息。以下仅为伪代码示例

硬件和环境初始化

首先,我们需要初始化硬件平台和RPMsg环境。假设MCU核使用裸机编程(不运行操作系统),我们将展示如何通过RPMsg与Linux核进行通信。

#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <stdbool.h>
#include "rpmsg_lite.h"
#include "rpmsg_env.h"
#include "rpmsg_env_specific.h"#define RPMSG_SERVICE_NAME "echo_demo"
#define RPMSG_EPT_ADDR 0x1000
#define RPMSG_BUFFER_SIZE 1024// RPMsg消息头定义 固定头 12个字节
#pragma pack(push, 1)
typedef struct {uint16_t head;          // 固定值0x5250 ("RP"),用于协议识别uint8_t version;        // 协议版本uint8_t type;           // 消息类型(0x01=请求,0x02=响应)uint8_t service_id;     // 服务标识符uint8_t method_id;      // 方法标识符uint16_t id;            // 唯一消息IDuint16_t status;        // 响应的状态码uint16_t payload_len;   // 载体内容长度
} RPMsgHeader;
#pragma pack(pop)// 消息类型定义
#define RPMSG_TYPE_REQUEST 0x01
#define RPMSG_TYPE_RESPONSE 0x02// 服务ID定义
#define RPMSG_SERVICE_CONFIG 0x01
#define RPMSG_SERVICE_IO 0x02// 方法ID定义
#define RPMSG_METHOD_SET_CONFIG 0x01
#define RPMSG_METHOD_GET_CONFIG 0x02
#define RPMSG_METHOD_GET_IO 0x01
#define RPMSG_METHOD_SET_IO 0x02// 固定的MagicHead值
#define MagicHead 0x5250// RPMsg消息结构体
typedef struct {RPMsgHeader header;uint8_t payload[RPMSG_BUFFER_SIZE - sizeof(RPMsgHeader)];
} RpmsgMessage;static rpmsg_lite_instance *rpmsg_inst;
static rpmsg_lite_endpoint *rpmsg_ept;// 处理接收到的消息
static void rpmsg_endpoint_cb(void *payload, uint32_t payload_len, uint32_t src, void *priv) {RpmsgMessage *msg = (RpmsgMessage *)payload;// 检查消息头if (msg->header.head != MagicHead) {printf("Received invalid RPMsg message header head\n");return;}if (msg->header.type != RPMSG_TYPE_REQUEST) {printf("Received invalid RPMsg message header type\n");return;}// 打印接收到的消息printf("Received RPMsg request: id=%d, type=%d, service_id=%d, method_id=%d, payload_len=%d\n",msg->header.id, msg->header.type, msg->header.service_id, msg->header.method_id, msg->header.payload_len);// 处理载荷数据(这里简单地复制载荷数据)memcpy(msg->payload, payload + sizeof(RPMsgHeader), msg->header.payload_len);// 修改消息头为响应msg->header.type = RPMSG_TYPE_RESPONSE;msg->header.status = 0; // 成功状态码// 发送响应消息rpmsg_lite_send(rpmsg_inst, rpmsg_ept->ept_addr, src, (char *)msg, sizeof(RPMsgHeader) + msg->header.payload_len, RL_BLOCK);
}int main(void) {// 初始化硬件平台rpmsg_env_init(NULL);// 初始化RPMsg Lite实例rpmsg_inst = rpmsg_lite_remote_init(rpmsg_env_get_memory_desc(RPMSG_LITE_LINK_ID, RL_BLOCK));if (!rpmsg_inst) {printf("Failed to initialize RPMsg Lite instance\n");return -1;}// 创建RPMsg Lite端点rpmsg_ept = rpmsg_lite_create_ept(rpmsg_inst, RPMSG_EPT_ADDR, rpmsg_endpoint_cb, NULL, RL_BLOCK);if (!rpmsg_ept) {printf("Failed to create RPMsg endpoint\n");return -1;}printf("RPMsg echo demo started\n");// 主循环等待消息while (1) {// 等待中断表示有消息到达rpmsg_env_sleep_ms(100);}// 清理RPMsg Lite实例rpmsg_lite_destroy_ept(rpmsg_inst, rpmsg_ept);rpmsg_lite_release(rpmsg_inst);rpmsg_env_deinit();return 0;
}
代码解释
  1. 硬件初始化rpmsg_env_init(NULL) 初始化RPMsg的环境,包括共享内存的管理。
  2. RPMsg实例初始化rpmsg_lite_remote_init 初始化RPMsg Lite实例。RPMSG_LITE_LINK_ID 是一个标识符,用于区分不同的RPMsg实例。
  3. 创建端点rpmsg_lite_create_ept 创建一个RPMsg端点。RPMSG_EPT_ADDR 是端点的地址,rpmsg_endpoint_cb 是消息接收回调函数。
  4. 消息回调函数rpmsg_endpoint_cb 处理接收到的消息。首先检查消息头的有效性,然后根据消息类型进行处理。在这个示例中,我们将接收到的请求载荷数据复制到响应载荷中,并将消息类型修改为响应类型。
  5. 发送响应:使用rpmsg_lite_send函数将响应消息发送回Linux核。
  6. 主循环:程序进入一个无限循环,等待消息到达。在这个示例中,我们使用rpmsg_env_sleep_ms(100) 来模拟等待中断,实际应用中应根据具体的硬件中断机制来实现。
注意事项
  • 共享内存管理:MCU核和Linux核需要共享同一块内存区域,RPMsg Lite会自动管理这部分内存。
  • 中断处理:实际应用中,MCU核应使用中断机制来通知Linux核有消息到达,而不是简单的睡眠。
  • 错误处理:在实际应用中,应增加更多的错误处理逻辑,以确保系统的稳定性和可靠性。

Linux核上的RPMsg实现

在Linux核上,我们可以使用OpenAMP提供的库来实现RPMsg客户端。以下是一个简单的Linux核RPMsg客户端代码示例:

#include "rpmsg_client.h"
#include <fcntl.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <iostream>
#include <cstring>
#include <algorithm>
#include "librpmsg.h"
#include "logger.h"// RPMsg消息头定义 固定头 12个字节
#pragma pack(push, 1)
struct RPMsgHeader {uint16_t head;          // 固定值0x5250 ("RP"),用于协议识别uint8_t version;        // 协议版本uint8_t type;           // 消息类型(0x01=请求,0x02=响应)uint8_t service_id;     // 服务标识符uint8_t method_id;      // 方法标识符uint16_t id;            // 唯一消息IDuint16_t status;        // 响应的状态码uint16_t payload_len;   // 载体内容长度
};
#pragma pack(pop)// 消息类型定义
enum RPMsgType {RPMSG_TYPE_REQUEST = 0x01,RPMSG_TYPE_RESPONSE = 0x02
};// 服务ID定义
enum RPMsgServiceId {RPMSG_SERVICE_CONFIG = 0x01,RPMSG_SERVICE_IO = 0x02
};// 方法ID定义
enum RPMsgMethodId {RPMSG_METHOD_SET_CONFIG = 0x01,RPMSG_METHOD_GET_CONFIG = 0x02,RPMSG_METHOD_GET_IO = 0x01,RPMSG_METHOD_SET_IO = 0x02
};struct RpmsgMessage {RPMsgHeader header; // 消息头std::vector<uint8_t> payload; // 消息载荷
};class RpmsgClient {
public:RpmsgClient();~RpmsgClient();static std::shared_ptr<RpmsgClient> getInstance();bool initialize(const std::string& device, int timeout_ms = 1000, int retry_count = 3);bool sendMessage(const RpmsgMessage& message);bool sendAndReceive(const RpmsgMessage& message, RpmsgMessage& response, int timeout_ms = 1000);private:int m_fd;std::string m_device;int m_timeout_ms;int m_retry_count;std::atomic<bool> m_running;std::thread m_receiveThread;std::mutex m_sendMutex;std::mutex m_callbackMutex;std::mutex m_pendingMutex;std::vector<CallbackInfo> m_callbacks;std::unordered_map<uint16_t, std::shared_ptr<PendingResponse>> m_pendingResponses;std::atomic<uint16_t> m_nextMessageId;uint16_t m_nextCallbackId;void receiveLoop();void handleMessage(const RpmsgMessage& message);void notifyCallbacks(const RpmsgMessage& message);
};RpmsgClient::RpmsgClient(): m_fd(-1), m_timeout_ms(1000), m_retry_count(3), m_running(false),m_nextMessageId(1), m_nextCallbackId(0) {
}RpmsgClient::~RpmsgClient() {close();
}std::shared_ptr<RpmsgClient> RpmsgClient::getInstance() {static std::shared_ptr<RpmsgClient> instance = std::make_shared<RpmsgClient>();return instance;
}bool RpmsgClient::initialize(const std::string& device, int timeout_ms, int retry_count) {int ret = 0;int ept_id = 0;char ept_file_path[64];const char *ctrl_name = device.c_str();const char *ept_name = "echo_demo";// Store parametersm_device = device;m_timeout_ms = timeout_ms;m_retry_count = retry_count;// Open device/* Create ept generate the device /dev/rpmsgx */ret = rpmsg_alloc_ept(ctrl_name, ept_name);if (ret < 0) {printf("rpmsg_alloc_ept for ept %s (%s) failed\n", ept_name, ctrl_name);return false;}/* Open ept */ept_id = ret;snprintf(ept_file_path, sizeof(ept_file_path), "/dev/rpmsg%d", ept_id);m_fd = open(ept_file_path, O_RDWR);if (m_fd < 0) {printf("open %s failed\n", ept_file_path);std::cerr << "Failed to open RPMsg device: " << ept_file_path << " - " << strerror(errno) << std::endl;return false;}printf("RPMsg %s init ok\n", ept_file_path);// Set non-blocking modeint flags = fcntl(m_fd, F_GETFL, 0);fcntl(m_fd, F_SETFL, flags | O_NONBLOCK);// Start receive threadm_running = true;m_receiveThread = std::thread(&RpmsgClient::receiveLoop, this);return true;
}bool RpmsgClient::sendMessage(const RpmsgMessage& message) {if (m_fd < 0) {std::cerr << "RPMsg device not initialized" << std::endl;return false;}// Prepare message bufferstd::vector<uint8_t> buffer;// Add header (id, type, flags)buffer.resize(12); // msg header size// 复制消息头memcpy(buffer.data(), &message.header, sizeof(message.header));// Add payloadbuffer.insert(buffer.end(), message.payload.begin(), message.payload.end());// Send messagestd::lock_guard<std::mutex> lock(m_sendMutex);ssize_t sent = 0;int retries = 0;while (sent < static_cast<ssize_t>(buffer.size()) && retries < m_retry_count) {ssize_t result = ::write(m_fd, buffer.data() + sent, buffer.size() - sent);if (result < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK) {// Resource temporarily unavailable, retryretries++;std::this_thread::sleep_for(std::chrono::milliseconds(10));continue;}std::cerr << "Failed to send RPMsg message: " << strerror(errno) << std::endl;return false;}sent += result;}if (sent < static_cast<ssize_t>(buffer.size())) {std::cerr << "Failed to send complete RPMsg message after " << m_retry_count << " retries" << std::endl;return false;}return true;
}bool RpmsgClient::sendAndReceive(const RpmsgMessage& message, RpmsgMessage& response, int timeout_ms) {if (m_fd < 0) {std::cerr << "RPMsg device not initialized" << std::endl;return false;}LOG(LOG_INFO) << "sendAndReceive:";// Create a unique message ID if not provideduint16_t messageId = message.header.id;if (messageId == 0) {messageId = m_nextMessageId.fetch_add(1);}// Create a pending responseauto pendingResponse = std::make_shared<PendingResponse>();pendingResponse->id = messageId;pendingResponse->completed = false;// Register the pending response{std::lock_guard<std::mutex> lock(m_pendingMutex);m_pendingResponses[messageId] = pendingResponse;}// Create a copy of the message with the assigned IDRpmsgMessage outgoingMessage = message;outgoingMessage.header.id = messageId;// Send the messageif (!sendMessage(outgoingMessage)) {// Remove the pending responsestd::lock_guard<std::mutex> lock(m_pendingMutex);m_pendingResponses.erase(messageId);return false;}// Wait for the response{std::unique_lock<std::mutex> lock(m_pendingMutex);if (!pendingResponse->cv.wait_for(lock, std::chrono::milliseconds(timeout_ms),[pendingResponse]() { return pendingResponse->completed; })) {// Timeoutm_pendingResponses.erase(messageId);LOG(LOG_WARN) << "Timeout waiting for RPMsg response" << std::endl;return false;}// Copy the responseresponse = pendingResponse->response;// Remove the pending responsem_pendingResponses.erase(messageId);}return true;
}int RpmsgClient::registerCallback(uint8_t type, MessageCallback callback) {std::lock_guard<std::mutex> lock(m_callbackMutex);uint16_t callbackId = m_nextCallbackId++;CallbackInfo info;info.id = callbackId;info.type = type;info.callback = callback;m_callbacks.push_back(info);return callbackId;
}void RpmsgClient::unregisterCallback(uint16_t callbackId) {std::lock_guard<std::mutex> lock(m_callbackMutex);auto it = std::find_if(m_callbacks.begin(), m_callbacks.end(),[callbackId](const CallbackInfo& info) { return info.id == callbackId; });if (it != m_callbacks.end()) {m_callbacks.erase(it);}
}void RpmsgClient::close() {// Stop receive threadm_running = false;if (m_receiveThread.joinable()) {m_receiveThread.join();}// Close deviceif (m_fd >= 0) {::close(m_fd);m_fd = -1;}
}void RpmsgClient::receiveLoop() {const size_t bufferSize = 1024;std::vector<uint8_t> buffer(bufferSize);while (m_running) {// Read from devicessize_t bytesRead = ::read(m_fd, buffer.data(), buffer.size());if (bytesRead < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK) {// No data available, sleep for a whilestd::this_thread::sleep_for(std::chrono::milliseconds(10));continue;}std::cerr << "Failed to read from RPMsg device: " << strerror(errno) << std::endl;break;}if (bytesRead == 0) {// End of filestd::cerr << "RPMsg device closed" << std::endl;break;}// Parse messageif (bytesRead < 12) {std::cerr << "Received incomplete RPMsg message header" << std::endl;continue;}RpmsgMessage message;// Parse message ID (little-endian)memcpy(&message.header, buffer.data(), 12);if (message.header.head != MagicHead) {std::cerr << "Received invalid RPMsg message header head" << std::endl;continue;}if (message.header.type != RPMSG_TYPE_RESPONSE) {std::cerr << "Received invalid RPMsg message header type" << std::endl;continue;}// Parse message payloadmessage.payload.assign(buffer.begin() + 12, buffer.begin() + bytesRead - 12);// Handle messagehandleMessage(message);}
}void RpmsgClient::handleMessage(const RpmsgMessage& message) {// Check if this is a response to a pending requestLOG(LOG_INFO) << "handleMessage,receive:" << message.header.id << ":" << message.header.type << ":" << message.payload.size() << std::endl;{std::lock_guard<std::mutex> lock(m_pendingMutex);auto it = m_pendingResponses.find(message.header.id);if (it != m_pendingResponses.end()) {// This is a response to a pending requestauto pendingResponse = it->second;// Set the responsependingResponse->response = message;pendingResponse->completed = true;// Notify the waiting threadpendingResponse->cv.notify_one();return;}}// Notify callbacksnotifyCallbacks(message);
}void RpmsgClient::notifyCallbacks(const RpmsgMessage& message) {// Copy callbacks to avoid holding the mutex during callback executionstd::vector<CallbackInfo> callbacks;{std::lock_guard<std::mutex> lock(m_callbackMutex);callbacks = m_callbacks;}// Call all registered callbacks for this message typefor (const auto& info : callbacks) {if (info.type == message.header.type) {info.callback(message);}}
}bool RpmsgClient::sendWithHeader(uint8_t service_id, uint8_t method_id,const void* data, size_t len,uint8_t msg_type) {if (m_fd < 0) {std::cerr << "RPMsg device not initialized" << std::endl;return false;}// 准备消息头RPMsgHeader header;header.head = MagicHead;  // "RP"header.version = 1;header.type = msg_type;header.id = m_nextMessageId.fetch_add(1);header.service_id = service_id;header.method_id = method_id;header.payload_len = len;// 准备发送缓冲区std::vector<uint8_t> buffer(sizeof(RPMsgHeader) + len);// 复制消息头memcpy(buffer.data(), &header, sizeof(RPMsgHeader));// 复制数据if (data != nullptr && len > 0) {memcpy(buffer.data() + sizeof(RPMsgHeader), data, len);}// 发送消息std::lock_guard<std::mutex> lock(m_sendMutex);ssize_t sent = 0;int retries = 0;while (sent < static_cast<ssize_t>(buffer.size()) && retries < m_retry_count) {ssize_t result = ::write(m_fd, buffer.data() + sent, buffer.size() - sent);if (result < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK) {// Resource temporarily unavailable, retryretries++;std::this_thread::sleep_for(std::chrono::milliseconds(10));continue;}std::cerr << "Failed to send RPMsg message: " << strerror(errno) << std::endl;return false;}sent += result;}if (sent < static_cast<ssize_t>(buffer.size())) {std::cerr << "Failed to send complete RPMsg message after " << m_retry_count << " retries" << std::endl;return false;}return true;
}bool RpmsgClient::sendWithResponse(uint8_t service_id, uint8_t method_id,uint8_t *data, size_t len,uint8_t msg_type) {if (m_fd < 0) {std::cerr << "RPMsg device not initialized" << std::endl;return false;}// 准备消息头RPMsgHeader header;header.head = MagicHead;  // "RP"header.version = 1;header.type = msg_type;header.id = m_nextMessageId.fetch_add(1);header.service_id = service_id;header.method_id = method_id;header.payload_len = len;RpmsgMessage sendMsg, rcvMsg;sendMsg.header = header;// 复制数据if (data != nullptr && len > 0) {sendMsg.payload.assign(data, data + len);}bool ret = sendAndReceive(sendMsg, rcvMsg, 200);if (rcvMsg.header.id != header.id) {std::cerr << "error,receive id:" << rcvMsg.header.id << std::endl;return false;}if (rcvMsg.header.service_id != header.service_id || rcvMsg.header.method_id != header.method_id) {std::cerr << "error,service_id:" << (int)rcvMsg.header.service_id << ",method_id:" << (int)rcvMsg.header.method_id << std::endl;return false;}if (rcvMsg.header.status != 0) {std::cerr << "error,status:" << (int)rcvMsg.header.status << std::endl;return false;}// 处理返回的载荷数据if (data != nullptr && len > 0) {memcpy(data, rcvMsg.payload.data(), rcvMsg.payload.size());}return ret;
}void RpmsgClient::setReceiveCallback(ReceiveCallback callback) {// 创建一个包装函数,将原始数据转换为RpmsgMessage并调用回调auto wrappedCallback = [callback](const RpmsgMessage& message) {if (callback) {callback(message.payload.data(), message.payload.size());}};// 注册回调registerCallback(0, wrappedCallback);  // 0表示接收所有类型的消息
}
代码解释
  1. 硬件和环境初始化rpmsg_env_init(NULL) 初始化RPMsg的环境。
  2. RPMsg实例初始化rpmsg_lite_remote_init 初始化RPMsg Lite实例。
  3. 创建端点rpmsg_lite_create_ept 创建一个RPMsg端点,并指定消息接收回调函数rpmsg_endpoint_cb
  4. 消息回调函数rpmsg_endpoint_cb 处理接收到的消息。首先检查消息头的有效性,然后根据消息类型进行处理。在这个示例中,我们将接收到的请求载荷数据复制到响应载荷中,并将消息类型修改为响应类型。
  5. 发送响应:使用rpmsg_lite_send函数将响应消息发送回Linux核。
  6. 主循环:程序进入一个无限循环,等待消息到达。在这个示例中,我们使用rpmsg_env_sleep_ms(100) 来模拟等待中断,实际应用中应根据具体的硬件中断机制来实现。
  7. Linux核上的RPMsg客户端:在Linux核上,我们使用C++编写一个RPMsg客户端,通过rpmsg_client.hlibrpmsg.h等库实现消息的发送和接收。

使用nanopb库扩展RPMsg

虽然RPMsg的基于共享内存的机制是高效的,但受限于一次报文交互不能超过512-16字节的限制。为了增大吞吐量,可以采用Google的nanopb库,该库是一个轻量级的Protocol Buffers库,适用于嵌入式系统。Protocol Buffers是一种语言中立、平台中立、可扩展的数据序列化格式,最初由Google开发。nanopb旨在为嵌入式设备提供高效的、小型化的protobuf实现,支持编译时生成C代码,确保代码的高效性和小型化。

示例:使用nanopb进行序列化和反序列化

假设我们有一个简单的服务需要通过RPMsg进行通信,服务ID为RPMSG_SERVICE_CONFIG,方法ID为RPMSG_METHOD_SET_CONFIG。我们将使用nanopb来序列化和反序列化消息载荷。

  1. 定义消息结构:首先定义消息结构体,并使用nanopb生成相应的序列化和反序列化代码。
  2. 序列化和反序列化:在MCU核和Linux核上分别进行序列化和反序列化操作。
定义消息结构体

.proto文件中定义消息结构体:

syntax = "proto2";package hub;message ConfigMessage {required uint8_t service_id = 1;required uint8_t method_id = 2;required uint32_t config_value = 3;
}

使用nanopb生成C代码:

nanopb_generator config.proto -I .
MCU核上的序列化和反序列化示例
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <stdbool.h>
#include "rpmsg_lite.h"
#include "rpmsg_env.h"
#include "rpmsg_env_specific.h"
#include "config.pb.h" // 由nanopb生成的头文件#define RPMSG_SERVICE_NAME "echo_demo"
#define RPMSG_EPT_ADDR 0x1000
#define RPMSG_BUFFER_SIZE 1024static rpmsg_lite_instance *rpmsg_inst;
static rpmsg_lite_endpoint *rpmsg_ept;// 处理接收到的消息
static void rpmsg_endpoint_cb(void *payload, uint32_t payload_len, uint32_t src, void *priv) {RpmsgMessage *msg = (RpmsgMessage *)payload;// 检查消息头if (msg->header.head != MagicHead) {printf("Received invalid RPMsg message header head\n");return;}if (msg->header.type != RPMSG_TYPE_REQUEST) {printf("Received invalid RPMsg message header type\n");return;}// 打印接收到的消息printf("Received RPMsg request: id=%d, type=%d, service_id=%d, method_id=%d, payload_len=%d\n",msg->header.id, msg->header.type, msg->header.service_id, msg->header.method_id, msg->header.payload_len);// 反序列化载荷数据ConfigMessage config_msg;bool status = config_message_decode(&config_msg, msg->payload, msg->header.payload_len);if (!status) {printf("Failed to decode ConfigMessage\n");return;}// 处理载荷数据(这里简单地打印数据)printf("ConfigMessage: service_id=%d, method_id=%d, config_value=%d\n",config_msg.service_id, config_msg.method_id, config_msg.config_value);// 构造响应消息ConfigMessage response_msg;response_msg.service_id = config_msg.service_id;response_msg.method_id = config_msg.method_id;response_msg.config_value = config_msg.config_value + 1; // 示例处理// 序列化响应消息uint8_t response_buffer[RPMSG_BUFFER_SIZE - sizeof(RPMsgHeader)];size_t response_len = config_message_encode(&response_msg, response_buffer, sizeof(response_buffer));if (response_len == 0) {printf("Failed to encode ConfigMessage response\n");return;}// 修改消息头为响应msg->header.type = RPMSG_TYPE_RESPONSE;msg->header.status = 0; // 成功状态码msg->header.payload_len = response_len;// 复制响应载荷数据memcpy(msg->payload, response_buffer, response_len);// 发送响应消息rpmsg_lite_send(rpmsg_inst, rpmsg_ept->ept_addr, src, (char *)msg, sizeof(RPMsgHeader) + response_len, RL_BLOCK);
}int main(void) {// 初始化硬件平台rpmsg_env_init(NULL);// 初始化RPMsg Lite实例rpmsg_inst = rpmsg_lite_remote_init(rpmsg_env_get_memory_desc(RPMSG_LITE_LINK_ID, RL_BLOCK));if (!rpmsg_inst) {printf("Failed to initialize RPMsg Lite instance\n");return -1;}// 创建RPMsg Lite端点rpmsg_ept = rpmsg_lite_create_ept(rpmsg_inst, RPMSG_EPT_ADDR, rpmsg_endpoint_cb, NULL, RL_BLOCK);if (!rpmsg_ept) {printf("Failed to create RPMsg endpoint\n");return -1;}printf("RPMsg echo demo started\n");// 主循环等待消息while (1) {// 等待中断表示有消息到达rpmsg_env_sleep_ms(100);}// 清理RPMsg Lite实例rpmsg_lite_destroy_ept(rpmsg_inst, rpmsg_ept);rpmsg_lite_release(rpmsg_inst);rpmsg_env_deinit();return 0;
}
结语

通过使用OpenAMP框架中的RPMsg机制,可以实现高效的多核间通信,为嵌入式系统提供强大的支持。RPMsg利用共享内存直接进行数据传输,减少了数据复制和硬件中断的开销,提高了通信效率。本设计仿RPC通信机制,设计交互协议,并结合nanopb库进行序列化和反序列化,可以简化交互协议,处理更复杂的消息结构,进一步提高系统的灵活性和可靠性。

本文展示了如何在Linux核和MCU核上使用RPMsg进行通信,并提供了详细的代码示例。希望这些内容能够帮助开发者更好地理解和实现高效的多核间通信机制。希望本文能够为嵌入式系统开发者提供有益的参考。

其他资源

https://doc.embedfire.com/linux/stm32mp1/driver/zh/latest/linux_driver/framework_ipcc.html
http://x.pinpaidadao.com/m/?arnoldlu/p/18296424
https://tronlong.com/Article/show/353.html
https://blog.csdn.net/gitblog_00203/article/details/141210851

相关文章:

  • 二极管钳位电路——Multisim电路仿真
  • 《Windows系统Java环境安装指南:从JDK17下载到环境变量配置》
  • leetcode 143. 重排链表
  • 解答UnityShader学习过程中的一些疑惑(持续更新中)
  • 在 Spring Boot 中实现异常处理的全面指南
  • Callable Future 实现多线程按照顺序上传文件
  • 知识付费平台推荐及对比介绍
  • 更新日期自动填充
  • ESG跨境电商怎么样?esg跨境电商有哪些功用?
  • 【动手学大模型开发】使用 LLM API:ChatGPT
  • 使用Curl进行本地MinIO的操作
  • 30天通过软考高项-第六天
  • MTKAndroid12-13-开机应用自启功能实现
  • Vue 对话框出现时,为什么滚动鼠标还是能滚动底层元素
  • Spring系列四:AOP切面编程第三部分
  • 软件工程(一):黑盒测试与白盒测试
  • 如何在WordPress网站中设置双重验证,提升安全性
  • 打火机检测数据集VOC+YOLO格式925张1类别
  • 案例篇:如何用tcpdump和Wireshark识别潜在威胁
  • Finish技术生态计划: FinishRpc
  • 直播电商行业代表呼吁:携手并肩伸出援手助力外贸企业攻坚克难
  • 暗蓝评《性别打结》丨拆解性别之结需要几步?
  • 葡萄牙总理:未来几小时内将全面恢复供电
  • AI观察|算力饥渴与泡沫
  • 央行回应美债波动:单一市场、单一资产变动对我国外储影响总体有限
  • 从 “沪惠保” 到 “沪骑保”看普惠保险的 “上海样式”