当前位置: 首页 > news >正文

Spring 与 ActiveMQ 的深度集成实践(二)

三、Spring 与 ActiveMQ 的集成步骤

3.1 配置 ActiveMQ 连接

在 Spring Boot 项目中,我们可以在application.properties或application.yml配置文件中设置 ActiveMQ Broker 的连接信息 。以application.properties为例,添加以下配置:

 

# ActiveMQ Broker的URL,tcp协议,本地地址,默认端口61616

spring.activemq.broker-url=tcp://localhost:61616

# 连接ActiveMQ的用户名,默认为admin

spring.activemq.user=admin

# 连接ActiveMQ的密码,默认为admin

spring.activemq.password=admin

# 是否使用内存模式,开发测试时可设为true,生产环境一般设为false

spring.activemq.in-memory=false

# 连接池相关配置,是否启用连接池,设为false表示不启用

spring.activemq.pool.enabled=false

在这些配置中,spring.activemq.broker-url指定了 ActiveMQ 服务器的地址和端口,就像我们要去一个地方,需要知道它的具体位置一样,这里的 URL 就是 ActiveMQ 的 “位置” 。spring.activemq.user和spring.activemq.password用于身份验证,确保只有授权的客户端才能连接到 ActiveMQ 。spring.activemq.in-memory设置是否使用内存模式,内存模式下消息存储在内存中,性能较高,但在服务器重启时消息会丢失,适用于开发和测试环境;而在生产环境中,为了保证消息的可靠性,一般不使用内存模式 。spring.activemq.pool.enabled则决定是否启用连接池,连接池可以复用连接,提高系统性能和资源利用率,如果不启用,每次与 ActiveMQ 建立连接都需要重新创建,会消耗较多的资源和时间 。

3.2 自定义配置类

为了更灵活地配置 Spring 与 ActiveMQ 的集成,我们可以创建一个 Java 配置类,扩展默认配置 。在src/main/java目录下创建一个配置类,例如ActiveMQConfig.java:

 

import org.apache.activemq.ActiveMQConnectionFactory;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.jms.annotation.EnableJms;

import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

import org.springframework.jms.core.JmsTemplate;

import javax.jms.ConnectionFactory;

@Configuration

@EnableJms // 启用JMS功能

public class ActiveMQConfig {

@Bean

public ConnectionFactory connectionFactory() {

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

connectionFactory.setBrokerURL("tcp://localhost:61616");

connectionFactory.setUserName("admin");

connectionFactory.setPassword("admin");

return connectionFactory;

}

@Bean

public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {

JmsTemplate jmsTemplate = new JmsTemplate();

jmsTemplate.setConnectionFactory(connectionFactory);

// 设置消息发送失败时是否重试,这里设为true表示重试

jmsTemplate.setDeliveryPersistent(true);

// 设置重试次数,这里设为3次

jmsTemplate.setExplicitQosEnabled(true);

jmsTemplate.setDeliveryMode(2);

jmsTemplate.setPriority(4);

jmsTemplate.setTimeToLive(10000);

return jmsTemplate;

}

@Bean

public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {

DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

factory.setConnectionFactory(connectionFactory);

// 设置并发消费者数量,这里设为1 - 10个

factory.setConcurrency("1-10");

// 设置消息确认模式,这里设为自动确认

factory.setSessionAcknowledgeMode(1);

return factory;

}

}

在这个配置类中,connectionFactory方法创建了一个ActiveMQConnectionFactory实例,用于创建与 ActiveMQ 的连接 。它通过设置brokerURL、userName和password来指定连接信息,与在application.properties中配置的作用相同,只是这里是通过代码的方式进行配置,更加灵活,方便在不同环境下进行切换 。

jmsTemplate方法创建了一个JmsTemplate实例,它是 Spring 提供的用于发送和接收 JMS 消息的工具类 。通过设置connectionFactory,将其与前面创建的连接工厂关联起来 。setDeliveryPersistent(true)表示设置消息为持久化,即消息会被存储到磁盘上,即使 ActiveMQ 服务器重启,消息也不会丢失,这在生产环境中非常重要,能够保证消息的可靠性 。setExplicitQosEnabled(true)开启了显式的服务质量(QoS)设置,允许我们进一步设置消息的投递模式、优先级和存活时间等 。setDeliveryMode(2)设置消息的投递模式为持久化模式(值为 2),setPriority(4)设置消息的优先级为 4(优先级范围是 0 - 9,数值越大优先级越高),setTimeToLive(10000)设置消息的存活时间为 10000 毫秒,即如果在 10 秒内消息没有被消费,就会被自动删除 。

jmsListenerContainerFactory方法创建了一个DefaultJmsListenerContainerFactory实例,用于创建消息监听器容器 。它同样设置了connectionFactory,以关联到 ActiveMQ 的连接 。setConcurrency("1-10")设置了并发消费者的数量范围为 1 到 10 个,这样在处理消息时,可以根据实际的负载情况动态调整消费者的数量,提高系统的处理能力 。setSessionAcknowledgeMode(1)设置了消息确认模式为自动确认,即当消费者成功接收消息后,会自动向 ActiveMQ 服务器发送确认消息,告知服务器该消息已被处理,若设为其他值,如 2 表示手动确认,则需要在代码中显式地调用确认方法 。

3.3 创建消息生产者

通过注入JmsTemplate,我们可以创建消息生产者类 。在src/main/java目录下创建一个生产者类,例如MessageProducer.java:

 

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.jms.core.JmsTemplate;

import org.springframework.stereotype.Component;

import javax.jms.Queue;

@Component

public class MessageProducer {

@Autowired

private JmsTemplate jmsTemplate;

@Autowired

private Queue queue;

public void send(String message) {

jmsTemplate.convertAndSend(queue, message);

System.out.println("发送消息: " + message);

}

}

在这个生产者类中,首先通过@Autowired注解注入了JmsTemplate和Queue 。JmsTemplate是 Spring 提供的用于发送 JMS 消息的核心类,它封装了与 ActiveMQ 交互的细节,使得发送消息变得非常简单 。Queue表示消息队列,它定义了消息发送的目的地 。

send方法实现了发送消息的逻辑,通过调用jmsTemplate.convertAndSend(queue, message),将消息message发送到指定的队列queue中 。convertAndSend方法是JmsTemplate中最常用的发送消息方法,它会自动将消息转换为合适的格式,并发送到指定的目的地 。在发送消息后,打印出 “发送消息:” 加上具体的消息内容,方便我们在控制台查看消息的发送情况 。

3.4 创建消息消费者

使用@JmsListener注解可以轻松创建消息消费者类 。在src/main/java目录下创建一个消费者类,例如MessageConsumer.java:

 

import org.springframework.jms.annotation.JmsListener;

import org.springframework.stereotype.Component;

@Component

public class MessageConsumer {

@JmsListener(destination = "yourQueueName")

public void receive(String message) {

System.out.println("接收消息: " + message);

// 这里可以编写具体的消息处理逻辑,比如调用业务方法进行数据处理

}

}

在这个消费者类中,@JmsListener注解标识了receive方法为一个消息监听器,它监听名为yourQueueName的队列(需要将yourQueueName替换为实际的队列名称) 。当有消息到达该队列时,receive方法会被自动调用,并且消息内容会作为参数传递给该方法 。在receive方法中,首先打印出 “接收消息:” 加上接收到的消息内容,以便在控制台查看消息的接收情况 。然后,可以在方法内部编写具体的消息处理逻辑,例如调用业务层的方法对消息进行处理,可能是更新数据库、调用其他服务接口等操作 。

四、高级配置与优化

4.1 自定义消息转换器

在实际的项目开发中,ActiveMQ 自带的消息转换器可能无法满足复杂的业务需求,这时就需要我们创建自定义的消息转换器,以实现消息在发送和接收时的格式转换。

以将 Java 对象转换为 JSON 格式的消息为例,我们可以按照以下步骤创建自定义消息转换器。首先,定义一个实现了MessageConverter接口的类,例如JsonMessageConverter:

 

import com.fasterxml.jackson.databind.ObjectMapper;

import javax.jms.*;

import org.springframework.jms.support.converter.MessageConversionException;

import org.springframework.jms.support.converter.MessageConverter;

public class JsonMessageConverter implements MessageConverter {

private final ObjectMapper objectMapper = new ObjectMapper();

@Override

public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {

try {

String json = objectMapper.writeValueAsString(object);

return session.createTextMessage(json);

} catch (Exception e) {

throw new JMSException("Failed to convert object to JSON message: " + e.getMessage());

}

}

@Override

public Object fromMessage(Message message) throws JMSException, MessageConversionException {

if (message instanceof TextMessage) {

TextMessage textMessage = (TextMessage) message;

try {

return objectMapper.readValue(textMessage.getText(), Object.class);

} catch (Exception e) {

throw new JMSException("Failed to convert JSON message to object: " + e.getMessage());

}

}

throw new JMSException("Unsupported message type for JSON conversion");

}

}

在这个类中,toMessage方法负责将 Java 对象转换为 JSON 格式的TextMessage。它首先使用ObjectMapper将对象转换为 JSON 字符串,然后通过session.createTextMessage创建消息 。如果转换过程中出现异常,会抛出JMSException并附带错误信息 。

fromMessage方法则是将接收到的TextMessage中的 JSON 内容转换回 Java 对象 。它先判断接收到的消息是否为TextMessage,如果是,则读取消息内容并使用ObjectMapper将其转换为 Java 对象 。同样,如果转换失败,也会抛出JMSException 。

然后,在配置类中注册这个自定义的消息转换器。在之前创建的ActiveMQConfig.java类中,修改jmsListenerContainerFactory方法,添加消息转换器的设置:

 

@Bean

public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {

DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

factory.setConnectionFactory(connectionFactory);

factory.setConcurrency("1-10");

factory.setSessionAcknowledgeMode(1);

// 设置自定义的消息转换器

factory.setMessageConverter(new JsonMessageConverter());

return factory;

}

通过factory.setMessageConverter(new JsonMessageConverter()),将自定义的JsonMessageConverter设置为消息监听器容器的消息转换器 。这样,在消息接收时,就会使用这个自定义的转换器将消息从 JSON 格式转换为 Java 对象 。

在消息发送时,也需要使用这个自定义的消息转换器。可以在MessageProducer类中进行设置,修改send方法:

 

public class MessageProducer {

@Autowired

private JmsTemplate jmsTemplate;

@Autowired

private Queue queue;

public void send(Object message) {

jmsTemplate.setMessageConverter(new JsonMessageConverter());

jmsTemplate.convertAndSend(queue, message);

System.out.println("发送消息: " + message);

}

}

在send方法中,通过jmsTemplate.setMessageConverter(new JsonMessageConverter())设置了自定义的消息转换器,然后再调用convertAndSend方法发送消息 。这样,发送的 Java 对象会先被转换为 JSON 格式的消息,再发送到 ActiveMQ 中 。

4.2 多线程消费者

在高并发的业务场景下,单线程的消息消费者可能无法满足消息处理的速度需求,这时配置多线程消费者就显得尤为重要。通过配置多线程消费者,可以提高消息处理的并发能力,加快消息的处理速度。

在 Spring 与 ActiveMQ 的集成中,我们可以通过配置DefaultJmsListenerContainerFactory来实现多线程消费者 。在ActiveMQConfig.java配置类中,jmsListenerContainerFactory方法已经设置了并发消费者数量的范围为 1 到 10 个:

 

@Bean

public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {

DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

factory.setConnectionFactory(connectionFactory);

// 设置并发消费者数量范围为1到10个

factory.setConcurrency("1-10");

factory.setSessionAcknowledgeMode(1);

return factory;

}

这里的setConcurrency("1-10")表示最小并发消费者数量为 1,最大并发消费者数量为 10 。在实际运行时,Spring 会根据消息队列中的消息数量和系统的负载情况,动态地调整消费者线程的数量,在 1 到 10 个线程之间进行伸缩 。

当消息队列中积压的消息较多时,系统负载较低,Spring 会创建更多的消费者线程(最多到 10 个)来处理消息,以加快消息的处理速度;而当消息队列中的消息较少时,系统负载较高,Spring 会减少消费者线程的数量(最少到 1 个),以避免资源的浪费 。

在配置多线程消费者时,还需要注意一些事项。首先,要合理设置并发消费者的数量,不能盲目地设置过大 。如果并发消费者数量过多,可能会导致系统资源耗尽,如 CPU、内存等,反而降低系统的性能 。一般来说,可以根据系统的硬件配置、消息处理的复杂程度以及预估的并发量来综合确定并发消费者的数量 。例如,如果系统的 CPU 核心数为 8,消息处理相对简单,可以将并发消费者数量设置为 16 左右,充分利用 CPU 资源;如果消息处理复杂,占用较多的 CPU 和内存资源,则需要适当减少并发消费者数量 。

其次,要考虑线程安全问题 。由于多个线程同时处理消息,可能会出现线程安全问题,如多个线程同时访问和修改共享资源 。在编写消息处理逻辑时,要确保代码是线程安全的 。可以使用线程安全的集合类,如ConcurrentHashMap、CopyOnWriteArrayList等;对于共享资源的访问,要使用同步机制,如synchronized关键字、Lock接口等 。例如,在消息处理方法中,如果需要更新一个共享的计数器,就需要使用同步机制来保证计数器的更新是线程安全的:

 

@Component

public class MessageConsumer {

private int count = 0;

@JmsListener(destination = "yourQueueName")

public void receive(String message) {

synchronized (this) {

count++;

System.out.println("接收消息: " + message + ", 当前计数: " + count);

}

// 其他消息处理逻辑

}

}

在这个例子中,使用synchronized (this)对count的更新进行了同步,确保在多线程环境下count的更新是正确的 。

4.3 事务处理

在消息发送和接收过程中,事务处理是保证消息一致性和可靠性的关键。通过配置事务,我们可以确保一组消息操作要么全部成功,要么全部失败,避免出现部分操作成功、部分操作失败的情况,从而保证数据的完整性和一致性。

在 Spring 与 ActiveMQ 的集成中,配置事务主要涉及到配置事务管理器和设置事务的传播行为和隔离级别 。首先,配置事务管理器。在ActiveMQConfig.java配置类中添加事务管理器的配置:

 

import org.springframework.jms.connection.JmsTransactionManager;

import org.springframework.transaction.PlatformTransactionManager;

@Bean

public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {

return new JmsTransactionManager(connectionFactory);

}

这里创建了一个JmsTransactionManager实例,并将ConnectionFactory作为参数传入 。JmsTransactionManager是 Spring 提供的用于管理 JMS 事务的事务管理器,它负责协调消息的发送和接收操作,确保它们在同一个事务中进行 。

接下来,设置事务的传播行为和隔离级别 。事务传播行为定义了一个事务方法在被另一个事务方法调用时的行为 。Spring 定义了七种事务传播行为,常用的有以下几种:

  • PROPAGATION_REQUIRED:如果当前存在事务,则加入该事务;如果当前没有事务,则创建一个新的事务 。这是 Spring 的默认事务传播行为,适用于大多数业务场景 。例如,在一个订单处理系统中,订单的创建和库存的扣减操作可能需要在同一个事务中进行,以保证数据的一致性 。如果订单创建成功但库存扣减失败,整个事务会回滚,避免出现订单已创建但库存未扣减的情况 。
  • PROPAGATION_REQUIRES_NEW:总是创建一个新的事务 。如果当前存在事务,则将当前事务挂起,直到新事务完成 。这种传播行为适用于一些需要独立事务的场景,如记录日志、发送通知等操作,即使主事务回滚,这些操作也不会受到影响 。例如,在一个电商系统中,当用户下单后,除了处理订单相关的业务逻辑外,还需要发送一封邮件通知用户订单已提交 。发送邮件的操作可以使用PROPAGATION_REQUIRES_NEW传播行为,即使订单处理事务回滚,邮件也会正常发送 。
  • PROPAGATION_SUPPORTS:如果当前存在事务,则加入该事务;如果当前没有事务,则以非事务方式执行 。这种传播行为适用于一些对事务要求不严格的操作,如查询操作,它可以在有事务的环境中执行,也可以在没有事务的环境中执行 。

事务隔离级别定义了一个事务对其他事务的可见性和影响程度 。Spring 定义了五种事务隔离级别,常用的有以下几种:

  • ISOLATION_DEFAULT:使用数据库默认的事务隔离级别 。不同的数据库有不同的默认隔离级别,如 MySQL 的默认隔离级别是REPEATABLE_READ,Oracle 的默认隔离级别是READ_COMMITTED 。
  • ISOLATION_READ_COMMITTED:一个事务只能读取已经提交的数据 。这种隔离级别可以避免脏读问题,但可能会出现不可重复读和幻读问题 。例如,在一个银行转账系统中,当一个事务读取账户余额后,另一个事务修改并提交了账户余额,第一个事务再次读取时,可能会得到不同的余额值,这就是不可重复读问题 。
  • ISOLATION_REPEATABLE_READ:在同一个事务中,多次读取相同的数据结果是一致的 。这种隔离级别可以避免脏读和不可重复读问题,但可能会出现幻读问题 。例如,在一个电商系统中,当一个事务查询商品列表时,另一个事务插入了一条新的商品记录,第一个事务再次查询时,可能会看到新插入的商品,这就是幻读问题 。
  • ISOLATION_SERIALIZABLE:最高的事务隔离级别,它通过锁表的方式,保证所有事务串行执行,避免了脏读、不可重复读和幻读问题 。但这种隔离级别会严重影响系统的并发性能,因为所有事务都需要排队执行,适用于对数据一致性要求极高的场景,如金融交易系统 。

在实际应用中,我们可以根据业务需求选择合适的事务传播行为和隔离级别 。例如,在消息发送方法上设置事务传播行为和隔离级别:

 

import org.springframework.transaction.annotation.Transactional;

@Component

public class MessageProducer {

@Autowired

private JmsTemplate jmsTemplate;

@Autowired

private Queue queue;

@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED)

public void send(String message) {

jmsTemplate.convertAndSend(queue, message);

System.out.println("发送消息: " + message);

}

}

在这个例子中,@Transactional注解用于标记send方法需要事务支持 。propagation = Propagation.REQUIRED表示如果当前存在事务,则加入该事务;如果当前没有事务,则创建一个新的事务 。isolation = Isolation.READ_COMMITTED表示使用READ_COMMITTED事务隔离级别,即一个事务只能读取已经提交的数据 。这样,在发送消息时,如果出现异常,整个事务会回滚,保证了消息发送的可靠性 。

相关文章:

  • OpenCv高阶(九)——背景建模
  • playwright的简单使用
  • Linux实验课
  • SQL进阶知识:九、高级数据类型
  • 【Pandas】pandas DataFrame rsub
  • 在构造函数内部和外部定义的方法区别
  • 【时时三省】(C语言基础)循环程序举例
  • AI 场景落地:API 接口服务 VS 本地部署,哪种更适合?
  • 管家婆财贸ERP BB105.销售按结算单位价格跟踪
  • PySide与Qt工具链的深度整合
  • C语言里位操作的应用
  • 【Git】连接github时的疑难杂症(DNS解析失败)
  • 【LeetCode 热题 100】滑动窗口最大值 / 最小覆盖子串 / 轮转数组 / 缺失的第一个正数
  • 筛法求约数个数
  • Jira、PingCode、Redmine等18款缺陷管理工具对比评测
  • 数据加密技术:从对称加密到量子密码的原理与实战
  • C++[类和对象][3]
  • git 命令集
  • 设计模式-- 原型模式详解
  • mybatis-plus里的com.baomidou.mybatisplus.core.override.MybatisMapperProxy 类的详细解析
  • 文旅部:推动离境退税购物便利化有利于更多国内优质商品走出去
  • 摩根士丹利基金雷志勇:AI带来的产业演进仍在继续,看好三大景气领域
  • 中青报:“猿辅导员工猝死”事件上热搜,是对健康职场环境的共同关切
  • 济南市莱芜区委书记焦卫星任济南市副市长
  • 上海:全面建设重复使用火箭创新高地、低成本商业卫星规模制造高地
  • 5月1日起,涉外婚姻登记将在上海市16区全面铺开