当前位置: 首页 > news >正文

Spring Cloud Stream喂饭级教程【搜集全网资料整理】

文章较长,建议收藏+关注,随时查看

Spring Cloud Stream 简介

Spring Cloud Stream 是 Spring 提供的一个框架,用于构建与共享消息系统相连接的高度可伸缩的事件驱动微服务,它建立在 Spring 已有的成熟组件和最佳实践之上,提供了灵活的编程模型,并支持持久化的发布/订阅、消费组以及消息分区等核心消息机制。简单来说,Spring Cloud Stream 屏蔽了不同消息中间件的差异,让应用开发者无需关心底层使用的是 RabbitMQ 还是 Kafka——这类似于 Spring Data 对持久层的抽象,使开发者不必关心底层使用何种数据库。
在这里插入图片描述

Spring Cloud Stream 通过 Binder(绑定器) 抽象来适配具体的消息代理,中间件的实现细节对应用代码是透明的。应用程序只需定义输入通道输出通道,并与 Spring Cloud Stream 提供的 Binder 对象交互;Binder 会负责将这些通道绑定到实际的消息中间件上。开发者因此可以专注于消息处理的业务逻辑,而无需编写针对特定消息系统的集成代码。

核心概念: Spring Cloud Stream 引入了一些关键概念来实现上述能力:

  • Binder(绑定器): 框架提供的消息适配器组件,用于连接具体的消息中间件并屏蔽不同中间件的差异 。Binder 作为中间层将应用程序的通道与消息代理绑定在一起,实现应用与消息系统细节的解耦。常见的 Binder 实现有 Kafka Binder、RabbitMQ Binder 等,对应用而言切换 Binder 如同切换数据库驱动一样方便。

  • Channel(通道): 通道是消息队列或主题的一种抽象。在消息通信系统中,通道作为消息传递的管道,负责消息的暂存和转发。应用程序通过 Channel 来发送或接收消息,而具体的通道会在 Binder 的配置下对应到消息中间件上的某个队列或话题。

  • Binding(绑定): Binding 表示通道与消息代理中实际主题/队列之间的绑定关系。通过在配置中定义 binding,Spring Cloud Stream 会将应用定义的通道连接到外部消息系统的目的地上。可以简单理解为:Binder 创建了特定通道和外部MQ的绑定 (Binding),从而将生产者消费者两端连接起来。

  • Producer(生产者): 发送消息的一方应用或组件。在 Spring Cloud Stream 中,生产者应用通常会定义一个输出通道,将消息发布到 Binder,从而发送到目标消息队列/主题上。底层 Binder 会负责将消息路由到外部中间件的对应目的地。 输出通道(Output) 即对应一个生产者。

  • Consumer(消费者): 接收并处理消息的一方应用或组件。消费者应用定义输入通道,从 Binder 获取来自消息中间件的消息。Binder 负责将外部队列/主题的消息推送到应用的输入通道上。输入通道(Input)即对应一个消费者。多个消费者还可以通过消费组共享同一主题实现负载均衡(每条消息只被同组一个实例消费),但这是深入特性,在此不展开。

上述抽象使得我们可以使用统一的方式编写消息收发逻辑,而让 Spring Cloud Stream 框架去处理与 Kafka、RabbitMQ 等消息系统的集成细节。例如,一个生产者应用可以不变地发送消息,Binder 可以在不同环境下将其绑定到 Kafka 主题或 RabbitMQ Exchange 上 ;同样地,消费者只需声明绑定即可从相应主题/队列接收消息,而不用关心使用哪个消息系统。

物联网场景下的整体架构

物联网(IoT)系统通常由众多设备产生海量数据,使用消息中间件解耦数据接入和后端处理是常见方案。下面我们以 RabbitMQ 和 Kafka 作为消息中间件,绘制了一个物联网场景下的数据流动和系统分层架构示意图。如图1所示,架构大致分为设备层、消息接入层、数据处理层以及存储和分析层:
在这里插入图片描述

图1:物联网场景下基于 Spring Cloud Stream 的消息流架构示意。各层次包括:设备数据采集层(传感器等IoT设备),消息接入层(消息中间件,如 RabbitMQ/Kafka),数据处理层(微服务集群),以及存储与分析层(数据库、大数据分析)。箭头表示数据的流动方向。

  • 设备数据采集层:这一层由各种物联网设备、传感器组成,它们负责实时采集设备状态和环境数据(如温度传感、设备运行状态等)。设备通常通过轻量级协议将数据发送到边缘网关或直接发送到消息系统的接入端点。例如,传感器可以通过 HTTP、MQTT 等协议将数据上报到服务器或网关。

  • 消息接入层(消息中间件):该层由消息队列/流处理中间件组成,如 RabbitMQ Exchange 或 Kafka Topic 等。设备发送的数据先进入消息中间件进行缓冲和分发。消息中间件充当数据接入层的核心,在物联网场景下接收海量并发的设备数据并可靠地转发。在Spring Cloud Stream架构中,Binder 会将设备数据上报的入口(比如 HTTP 接口或 MQTT 网关)与中间件的主题/队列相连接,例如将传感器上报的数据统一写入名为 raw-sensor-data 的主题 。这一层确保了数据传输的解耦和弹性缓冲,当后端处理服务暂时不可用时,消息也能暂存于中间件中等待处理。

  • 数据处理层(微服务群):后端有多个微服务实例共同构成数据处理层,它们通过 Spring Cloud Stream 从消息中间件订阅消息进行异步处理。每个微服务关注不同的业务逻辑:例如有的服务计算传感数据的滑动窗口平均值,有的服务对原始数据做清洗和入库。由于采用发布-订阅模型,多个服务可以独立消费同一数据流而互不影响——比如一个微服务计算温度平均值,另一个微服务将原始传感数据写入时序数据库/HDFS 分布式存储。借助 Spring Cloud Stream,开发者只需声明绑定(Binding)将微服务的输入通道指向相应主题即可让多个应用并行地收到同一批设备数据。例如,我们可以新添一个微服务订阅平均值流来计算最高温度排名(TopN),或另一个微服务订阅平均值流以进行故障检测和报警 。

  • 存储与分析层:数据处理后的结果以及部分原始数据会进入存储与分析层,包括数据库和大数据平台。在物联网场景中,这一层可能包含关系型数据库、时序数据库(如 InfluxDB)、分布式文件系统/数据湖(如 HDFS)以及实时分析平台等。比如,上述数据处理层中的“Ingest HDFS”服务将原始传感数据写入了 HDFS 集群进行离线存储;同时,计算出的统计结果(如平均值、TopN结果)可能被存储到时序数据库用于监控展示,或进入流式计算引擎做进一步分析。在存储与分析层,运维人员和业务系统可以对历史数据进行查询,可视化监控设备状态,或者基于大数据分析结果优化设备运营。

通过上述分层架构,物联网系统实现了从设备到云端的完整数据通路:设备产生的数据经由消息中间件汇聚,后端微服务通过 Spring Cloud Stream 高效、解耦地处理数据,再将有价值的信息存储和呈现出来。各层之间松耦合、易扩展,方便根据需要横向扩展处理节点或添加新的业务功能。

Spring Cloud Stream通信方式

Stream中的消息通信方式遵循了发布-订阅模式。

Binder:很方便的连接中间件,屏蔽差异
Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

在这里插入图片描述

基于架构的典型物联网应用场景

有了上述基础架构,我们可以灵活构建各种物联网应用功能。以下是几个典型的应用场景:

  • 设备状态监控:实时采集设备运行状态参数(如温度、电压、位置等),通过消息流传输到后端监控服务进行分析。一旦检测到异常指标,系统可以及时标记并通知相关人员。借助 Spring Cloud Stream,监控服务可以方便地订阅设备数据流,对不同类型设备的数据进行统一处理和聚合。

  • 远程控制:后端系统下发控制指令给前端设备,实现对设备的远程操控。例如运维人员在平台上触发设备重启或参数修改命令,命令通过消息中间件路由到目标设备的消息通道,设备上的客户端收到指令后执行相应操作。这种命令下发也可通过 Spring Cloud Stream 将指令作为消息发布到特定主题,由设备订阅执行,实现可靠的异步控制。

  • 报警通知:当设备数据触发预定义的告警条件时(如温度超过阈值、设备离线等),后端分析服务通过消息流产生一条告警消息。告警消息可被通知服务消费,进一步触发短信、邮件或手机推送通知相关负责人。整个过程通过消息解耦,实现了告警检测和通知渠道的独立扩展。例如故障检测微服务发现异常后,发布一条告警消息到alarm主题,由通知微服务订阅发送告警,不会影响其他数据处理流程。

  • 实时数据采集与分析:海量传感器数据经由消息中间件汇聚后,实时分析服务对数据流进行计算,如统计当前系统各设备的指标均值、最大值,或者执行复杂事件处理(CEP)检测特定模式。一系列流处理结果可进一步通过消息通道传播到实时仪表盘进行展示,或者存入数据库供后续查询。通过 Spring Cloud Stream 的分区和消费组特性,能方便地将分析负载分散到多个实例上,提升吞吐量,并确保消息在多个实例间不重复消费。

以上场景只是冰山一角,实际物联网项目中还可能包括设备日志收集、固件升级广播、边缘计算协调等更多应用。但无论哪种场景,使用消息驱动的架构都能增强系统的解耦和弹性,而 Spring Cloud Stream 正是让我们更容易地实现这样的架构利器。

手把手教学:如何使用 Spring Cloud Stream

下面我们通过一个简单示例,演示如何基于 Spring Boot 2.x 使用 Spring Cloud Stream 进行消息驱动开发。我们将以 RabbitMQ 和 Kafka 为消息中间件,分别展示项目创建、依赖引入、配置以及编写消息生产者和消费者的基本步骤。

  1. 创建 Spring Boot 项目:使用熟悉的方式创建一个 Spring Boot 2.x 项目。通过 Spring Initializr 初始化项目,勾选需要的依赖(后续我们会添加 Spring Cloud Stream 相关依赖),选择合适的 GroupId/ArtifactId。对于演示目的,可以建立两个独立的微服务项目:一个作为消息生产者,另一个作为消息消费者(当然也可以在同一项目内演示收发,但实际场景下一般会分开部署不同的服务)。

  2. 添加 Spring Cloud Stream 依赖:在新建的 Spring Boot 项目中,引入 Spring Cloud Stream 的起步依赖和所需的 Binder 实现。以 Maven 为例,在 pom.xml 中添加 RabbitMQ 和 Kafka 的 binder 依赖(根据需要可以只添加其一):

    <!-- Spring Cloud Stream 核心依赖(会通过下面的binder启动器间接引入) -->
    <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    
  3. 配置 RabbitMQ 与 Kafka:在应用的配置文件中进行消息中间件的基本连接配置,以及 Spring Cloud Stream 的绑定设置。下面分别给出 RabbitMQ 和 Kafka 的示例配置:

    RabbitMQ 配置:在 application.yml 中配置 RabbitMQ 的连接信息和 Stream Bindings,例如:

    spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestcloud:stream:binders:rabbit:             # 定义名为 "rabbit" 的 Binder,对应 RabbitMQtype: rabbitenvironment:spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings:output:             # 输出通道绑定配置(生产者用)destination: myTopic    # 目标 Exchange/Topic 名称binder: rabbitinput:              # 输入通道绑定配置(消费者用)destination: myTopic    # 同样的目标名称,用于接收上面发送的消息binder: rabbitgroup: myGroup          # 消费组名,确保多个消费者实例共享消息
    

    上述配置指定应用使用 RabbitMQ 作为消息代理(Binder 名称为 rabbit),目标主题/Exchange 为 myTopic。生产者的输出通道和消费者的输入通道都绑定到了该目标。其中 group: myGroup 表示消费者属于名为 “myGroup” 的消费组,用于实现竞争消费(同组内多实例不会重复消费消息)。如果只有一个消费者实例或不需要消费分组,也可以不配置 group 属性。

    Kafka 配置:如果切换使用 Kafka,只需要改用 Kafka Binder 并配置 Kafka 集群地址,例如:

    spring:kafka:bootstrap-servers: localhost:9092cloud:stream:binders:kafka:              # 定义名为 "kafka" 的 Binder,对应 Kafkatype: kafkaenvironment:spring:kafka:bootstrap-servers: localhost:9092bindings:output:destination: myTopic      # 目标 Kafka topic 名称binder: kafkainput:destination: myTopicbinder: kafkagroup: myGroup
    

    可以看到,Kafka 配置主要区别在于使用 spring.kafka.bootstrap-servers 指定 Kafka 集群地址,并将 Binder 类型设为 kafka。其余的 bindings 配置含义与 RabbitMQ 类似。借助 Spring Cloud Stream,我们的应用代码无需改变,只通过配置就能切换底层消息系统。

    提示:实际项目中,可将生产者和消费者的配置拆分到不同的应用里——例如生产者应用只配置 output 部分,消费者应用只配置 input 部分,各自引用正确的 Binder。这种按角色拆分有助于避免不必要的Binder依赖。为简单起见,上述配置同时展示了输入和输出绑定。

  4. 编写消息生产者:在生产者应用中,创建一个服务用于发送消息。Spring Cloud Stream 提供了@EnableBinding注解来绑定接口或内置通道。例如,我们可以使用框架内置的 Source 接口(其定义了一个名为 output 的通道)作为绑定,将该通道连接到前面配置的 myTopic。编写代码如下:

    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;@EnableBinding(Source.class)  // 绑定输出通道
    @Service
    public class MyProducer {@Autowiredprivate MessageChannel output;  // 注入输出消息通道(对应 Source.OUTPUT)public void sendMessage(String data) {// 构建并发送消息output.send(MessageBuilder.withPayload(data).build());System.out.println("发送消息: " + data);}
    }
    

    在上面的代码中,我们通过 @EnableBinding(Source.class) 激活了 Spring Cloud Stream 的输出通道绑定。框架会根据配置将 output 通道连接到消息中间件上名为 myTopic 的目的地。我们使用 MessageChannel output 来发送消息,MessageBuilder.withPayload(data).build() 用于创建消息载荷。在实际应用中,可以通过 Controller 层调用 sendMessage 方法来触发发送(例如提供一个/send的HTTP接口,接受请求后调用发送逻辑)。发送时,Binder 会确保消息进入 RabbitMQ Exchange 或 Kafka Topic 中。

  5. 编写消息消费者:在消费者应用中,编写对应的代码来接收来自 myTopic 的消息。可以使用 Spring Cloud Stream 内置的 Sink 接口(其定义了一个名为 input 的通道)来绑定输入通道。通过 @StreamListener 注解监听该通道的消息并处理:

    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;@EnableBinding(Sink.class)  // 绑定输入通道
    @Component
    public class MyConsumer {@StreamListener(Sink.INPUT)public void handleMessage(String message) {System.out.println("收到消息: " + message);// 在这里可以添加对消息的业务处理逻辑}
    }
    

    上述代码通过 @EnableBinding(Sink.class) 激活了输入通道绑定。框架会将 input 通道订阅到配置的 myTopic 队列/主题上。当有消息到达时,带有 @StreamListener(Sink.INPUT) 注解的方法会被触发,收到消息内容并进行处理。在我们的示例中,仅简单地打印出了消息。实际使用中可以将消息反序列化为业务对象并执行进一步逻辑。

    值得一提的是,如果多个消费者应用实例使用相同的 group 名(如前面配置了 group=myGroup),则它们构成一个消费组,同一条消息只会被组内一个实例处理。这对于横向扩展消费者以处理高吞吐消息非常有用。而不同消费组的应用对同一主题是发布订阅关系,每组都会各自收到完整的消息流。

  6. 启动并测试消息流转:准备好 RabbitMQ 或 Kafka 服务并确保应用能够连接后,我们就可以启动生产者和消费者应用进行测试。先启动消费者应用(确保其application.yml中含有正确的输入绑定配置),再启动生产者应用。触发生产者发送消息(例如调用之前提到的/send接口,或者在 CommandLineRunner 中调用 sendMessage 方法发送测试数据)。

    如果一切配置正确,生产者应用会将消息发布到 RabbitMQ/Kafka,而消费者应用随即能收到该消息。我们可以在消费者的控制台看到打印的日志。例如,假设发送了一条内容为 Hello IoT 的消息,消费者应用的输出可能类似:

    发送消息: Hello IoT   # 生产者控制台打印
    收到消息: Hello IoT   # 消费者控制台打印
    

    其中,第一行是在生产者应用中打印的发送确认,第二行是在消费者应用中打印的接收结果。通过这种方式,我们验证了使用 Spring Cloud Stream 实现的消息从生产者到中间件再到消费者的完整流转过程。

恭喜,你已经使用 Spring Cloud Stream 打通了一个简单消息驱动案例!在真实场景中,你可以根据业务需要扩展这个模型:例如,更改 destination 名称来划分不同类型消息的通道;引入自定义的消息序列化/反序列化;利用 Spring Cloud Stream 提供的其他高级特性(如动态绑定路由、分区消费等)来应对复杂需求。 由于 Spring Cloud Stream 对 RabbitMQ、Kafka 等的良好抽象,你的应用代码几乎无需因为更换消息中间件而做改动,大大提高了开发效率和系统弹性。

总结

本文介绍了 Spring Cloud Stream 在物联网分布式消息系统中的应用。从框架的核心概念(Binder、Channel、Binding、Producer、Consumer)入手,我们阐述了它如何简化消息驱动微服务的开发 。通过架构图,我们了解了在 IoT 场景下设备数据经由消息中间件到微服务处理再到存储分析的全过程。在实际应用中,Spring Cloud Stream 为典型的物联网需求(监控、控制、告警、实时分析)提供了简洁高效的解决方案。最后的教程部分则手把手演示了如何基于 Spring Boot 快速构建一个使用 RabbitMQ/Kafka 的消息收发应用。

对于有一定 Java 基础的开发者而言,Spring Cloud Stream 上手相对容易,却能大幅降低处理异步消息的样板代码,提升系统的可扩展性和健壮性。当你的微服务需要与消息中间件打交道时,不妨尝试引入 Spring Cloud Stream,用更少的代码拥抱更灵活的架构设计!

参考资料: Spring Cloud Stream 官方文档 、CSDN 博客等。

相关文章:

  • 【Fifty Project - D18】
  • 【Flutter】Unity 三端封装方案:Android / iOS / Web
  • NGINX `ngx_http_core_module` 深度解读与实战指南
  • 晶晨S905L/LB芯片_安卓11.0_已适配移动遥控_支持外置网卡_支持IPV6_通刷线刷包
  • 通过ThreadLocal存储登录用户信息
  • rt-linux下的D状态的堆栈抓取及TASK_RTLOCK_WAIT状态
  • 使用 OpenCV 和 dlib 进行人脸检测
  • ElasticSearch从入门到精通-覆盖DSL操作和Java实战
  • Flutter 学习之旅 之 flutter 有时候部分手机【TextField】无法唤起【输入法软键盘】的一些简单整理
  • 【玩转 JS 函数式编程_016】DIY 实战:巧用延续传递风格(CPS)重构倒计时特效逻辑
  • 【HarmonyOS 5】鸿蒙检测系统完整性
  • 解决 Elasticsearch 启动错误:failed to obtain node locks
  • OpenSPG/KAG v0.7.1 发布, 针对新版若干优化和BUGFIX
  • DeepSeek智能时空数据分析(五):基于区域人口数量绘制地图散点-大模型搜集数据NL2SQL加工数据
  • 新能源汽车运动控制器核心芯片选型与优化:MCU、DCDC与CANFD协同设计
  • STM32 定时器TIM
  • bitset
  • risc-V学习日记(4):RV32I指令集
  • spark 课程总结
  • ubuntu安装git及使用(本地git)
  • 春暖花开,为何皮肤却闹起了小情绪?
  • 央行副行长:研究建立民营中小企业增信制度,破解民营中小企业信用不足等融资制约
  • 戴昕谈隐私、数据、声誉与法律现实主义
  • 地下管道密布成难题,道路修整如何破局?
  • 日均新开三家“首店”,上海的“首发经济”密码是什么?
  • 从“高阶智驾”到“辅助驾驶”,上海车展上的“智驾”宣发变调