当前位置: 首页 > news >正文

SpringBoot集成Kafka详解

在Spring Boot项目中集成Kafka,首先需要确保Kafka服务器已经启动并正常运行。然后,通过引入 Kafka依赖,编写yml配置,创建Kafka生产者和消费者。最后通过接口调用,实现消息发送和消费。

一、pom依赖

<!--kafka-->

<!--注意版本冲突-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.0</version>
</dependency>

二、yml配置

spring:

  kafka:  

    bootstrap-servers: localhost:9092   # Kafka集群地址

    # 生产者配置

    producer:

      compression-type: gzip   # 默认不压缩,可选值:"gzip", "snappy", "lz4", "zstd"

      acks: all   # 确认机制

      retries: 3   # 发送失败后重试次数

      retry-backoff-ms: 500    # 每次重试之间的间隔时间(单位:毫秒)

      enable-idempotence: true   # 启用幂等性

      linger-ms: 5   # 延迟发送的时间,单位毫秒,默认 0ms(立即发送)

      batch-size: 32768   # 生产者每个批次发送的最大字节数,默认 16KB

      buffer-memory: 67108864   # 生产者缓冲区内存大小,单位字节,默认32MB

      key-serializer: org.apache.kafka.common.serialization.StringSerializer   # 键的序列化器

      value-serializer: org.apache.kafka.common.serialization.StringSerializer   # 值的序列化器

      transactional-id-prefix: tx-   # 启用事务时的前缀,用于事务标识

      # 生产者拦截器,用于记录日志和指标收集

      interceptors:

        - org.apache.kafka.clients.producer.internals.LogAndMetricsProducerInterceptor

    # 消费者配置

    consumer:

      group-id: my-consumer-group   # 消费者组ID

      concurrency: 10   # 设置消费者并发数

      enable-auto-commit: false   # 禁用自动提交,手动提交偏移量

      auto-offset-reset: earliest   # 如果偏移量无效则从最早的消息开始消费。可选"latest"

      session-timeout-ms: 10000   # session会话超时时间(与心跳有关)

      max-poll-records: 1000   # 每次poll的最大记录数

      max-poll-interval-ms: 300000   # 最大轮询间隔 ,确保消费者不会在空闲期间超时 

      fetch-max-wait-ms: 300   # 拉取消息时的最大等待时间

      fetch-max-bytes: 104857600   # 拉取的最大字节数,单位字节

      fetch-min-size: 5000   # 拉取消息时的最小字节数,单位字节

      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer   # 键的反序列化器

      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer   # 值的反序列化器

      isolation-level: read_committed   # 事务隔离级别,确保读取已提交的数据。可选 "read_uncommitted"

    # Admin配置,用于创建和管理Kafka的主题

    admin:  

      bootstrap-servers: localhost:9092   # Kafka集群地址

      default-topic:

        partitions: 5   # 默认主题的分区数

        replication-factor: 3   # 默认主题的副本数

    # 副本机制

    replication:   # 副本落后超过此时间(毫秒)会被移出 ISR 列表

      replica_lag_time_max_ms: 10000   # 默认为 10000 毫秒,即 10 秒

      # 最少需要多少个同步副本才能进行写入

      min_insync_replicas: 2   # 默认为 1,根据需求设置

      # 是否允许在没有足够 ISR 的情况下进行领导者选举

      unclean_leader_election_enable: false   # 默认为 false,防止数据丢失

    # 监听器的配置

    listener:  

      concurrency: 10   # 设置监听器的并发数,控制消费者的并发线程数

      ack-mode: manual   # 手动确认消息,默认是 "auto" 自动确认

      poll-timeout: 3000   # 拉取消息的超时时间,单位毫秒

      missing-topics-fatal: false   # 是否忽略主题缺失的错误,默认是false

      container-factory: kafkaListenerContainerFactory   # 默认监听器容器工厂

    # 错误处理配置

    error-handler:

      # 在消息消费出错时重试的配置

      retry:

        max-attempts: 3   # 最大重试次数

        initial-interval: 1000   # 初始重试间隔,单位毫秒

        multiplier: 1.5   # 重试间隔的增量倍数

    #日志段和日志保留配置

    log: 

      # 设置每个日志段的大小

      segment:

        bytes: 1073741824   # 每个日志段的大小,默认为 1GB # 设置日志保留时间(毫秒)

      retention:

        ms: 604800000   # 7天,单位是毫秒

      # 设置日志的最大保留大小

      retention_bytes: 10737418240   # 默认为 10GB

      # 设置日志清理策略(delete 或 compact)

      cleanup_policy: "delete"   # 可选:delete 或 compact

      # 设置日志索引间隔大小

      index_interval_bytes: 4096   # 默认为 4096

三、KafkaConfig(可选)

@Configuration

public class KafkaConfig {

    @Bean

    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {

        return new KafkaTemplate<>(producerFactory);

    }

    @Bean

    public ProducerFactory<String, String> producerFactory() {

        Map<String, Object> configProps = new HashMap<>();

        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(configProps);

    }

}

四、KafkaProducer

在Spring Boot中创建Kafka生产者,向Kafka主题发送消息。

1.KafkaTemplate<String, String> 

Spring Kafka提供的用于生产消息的模板类。

2.sendMessage()

向指定的Kafka主题(test_topic)发送消息。

@Service

public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {

        this.kafkaTemplate = kafkaTemplate;

    }

    public void sendMessage(String topic, String message) {

        kafkaTemplate.send(topic, message);

    }

}

五、KafkaConsumer

编写Kafka消费者来接收消息。

1.@KafkaListener

用于指定消费的主题及消费者组。

2.consumeMessage()

将会接收到来自test_topic主题的消息,并将其打印出来。

@Service

public class KafkaConsumer {

    // 监听test主题的消息

   @KafkaListener(topics = "test", groupId = "my-consumer-group")

    public void consumeMessage(String message) {

        System.out.println("Received Message in group my-group: " + message);

    }

}

六、自定义调用

在启动类中,使用CommandLineRunner来在应用启动时发送一条消息。

@SpringBootApplication

public class KafkaApplication {

    public static void main(String[] args) {

        SpringApplication.run(KafkaApplication.class, args);

    }

    @Bean

    public CommandLineRunner demo(KafkaProducer producer) {

        return (args) -> {

            producer.sendMessage("Hello, Kafka!");  

            // 启动时发送一条消息

        };

    }

}

七、注意事项

根据你的 Spring Boot 版本,选择兼容的 Kafka 依赖版本:

八、总结

在Spring Boot项目中集成Kafka,首先需要确保Kafka服务器已经启动并正常运行。然后,通过引入 Kafka依赖,编写yml配置,创建Kafka生产者和消费者。最后通过接口调用,实现消息发送和消费。

相关文章:

  • 【锂电池SOH估计】SVM支持向量机锂电池健康状态估计,锂电池SOH估计(Matlab完整源码和数据)
  • 零点、驻点、拐点、极值点、最值点的定义、几何意义、求解方法
  • 2025年4月19日-得物算法岗春招笔试题-第二题
  • 项目预期管理:超越甘特图,实现客户价值交付
  • The_Planets_Earth靶场笔记(VulnHub)
  • 996引擎-拓展变量:物品变量
  • python:循环语句 while循环,for遍历循环,break,continue,else,嵌套循环(打印矩形、三角形,九九乘法表)
  • AI与思维模型【68】——排列组合
  • ASP.NET 0~1学习
  • 物联网技术赋能:复杂环境下的能源数据零丢失
  • 文件管理详解(曼波脑图版)
  • 爆肝整理!Stable Diffusion的完全使用手册(二)
  • 相控阵列天线:原理、优势和类型
  • strings.ToLower 使用详解
  • 健身房管理系统设计与实现(springboot+ssm+vue+mysql)含万字详细文档
  • 【AI论文】CLIMB:基于聚类的迭代数据混合自举语言模型预训练
  • LACP协议解析
  • pandoc实战教程(一):安装latex工具
  • 使用 Visual Studio 2022 中的 .http 文件
  • AI与思维模型【67】——元认知
  • 李家超率团访问浙江
  • 人民日报聚焦外贸“重镇”福建晋江:多元化布局扩大“朋友圈”
  • 中印尼举行外长防长“2+2”对话机制首次部长级会议
  • 申花迎来中超三连胜,这一次终于零封对手了
  • 国常会:要持续稳定股市,持续推动房地产市场平稳健康发展
  • 为护航企业“出海”,“无问西东·中外商会”海上沙龙举行