当前位置: 首页 > news >正文

kafka消息队列简单使用

下面是使用Spring Boot和Kafka实现消息队列的简单例子:

  1. 引入依赖

在pom.xml中添加以下依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.5</version>
</dependency>
  1. 配置Kafka

在application.properties中添加Kafka的相关配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  1. 发送消息

创建一个生产者类,使用KafkaTemplate发送消息:

@Service
public class KafkaProducerService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
  1. 接收消息

创建一个消费者类,使用@KafkaListener注解监听指定的主题,处理消息:

@Service
public class KafkaConsumerService {
    @KafkaListener(topics = "myTopic", groupId = "myGroup")
    public void onMessage(String message) {
        System.out.println("Received message: " + message);
    }
}
  1. 测试

在Controller中调用生产者发送消息,然后在控制台中可以看到消费者接收到的消息:

@RestController
public class KafkaController {
    @Autowired
    private KafkaProducerService kafkaProducerService;

    @GetMapping("/send")
    public String sendMessage() {
        kafkaProducerService.sendMessage("myTopic", "Hello, Kafka!");
        return "Message sent successfully";
    }
}

以上就是一个简单的使用Spring Boot和Kafka实现消息队列的例子

分区

  1. 编写Kafka生产者代码,使用KafkaTemplate发送消息,并指定分区号。如下所示:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String message, int partition) {
    kafkaTemplate.send("my-topic", partition, null, message);

2.编写Kafka消费者代码,使用@KafkaListener注解监听指定的主题,并在方法参数中获取分区号。如下所示:

@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(ConsumerRecord<String, String> record, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    System.out.println("Received message: " + record.value() + ", partition: " + partition);

相关文章:

  • C++:vector 定义,用法,作用,注意点
  • 【Lychee图床】本地电脑搭建私人图床,公网远程访问
  • C++核心编程--对象篇
  • 【Linux学习】05-1Linux上安装部署各类软件
  • 【Redis】Redis做为缓存,MySQL如何与Redis保持数据一致
  • 浮点型数字
  • 使用Visual Studio调试排查Windows系统程序audiodg.exe频繁弹出报错
  • ip地址可以精确定位吗
  • 洗衣行业在线预约小程序系统源码搭建 支持直播功能+在线预约下单+上门取件
  • go mod tidy 报错:x509: certificate signed by unknown authority 最佳实践
  • sentinel-dashboard-1.8.0.jar开机自启动脚本
  • 文件内容显示
  • 常用音频接口:TDM,PDM,I2S,PCM
  • .NET的键盘Hook管理类,用于禁用键盘输入和切换
  • 使用Python做一个微信机器人
  • Azure AD混合部署,通过 Intune 管理设备,实现条件访问
  • 8.6 枚举类型
  • Python15题day13
  • 爬虫为什么需要 HTTP 代理 IP?
  • 学习资源汇集
  • 深化应用型人才培养,这所高校聘任行业企业专家深度参与专业设置
  • 华夏幸福:累计未能如期偿还债务金额合计为227.91亿元
  • 《王牌对王牌》确认回归,“奔跑吧”将有主题乐园
  • 外交部:中方在乌克兰问题上一直积极致力于劝和促谈
  • 生病时不能吃“发物”?你可能忌口错了
  • “80后”李建强已任内蒙古镶黄旗委书记