当前位置: 首页 > news >正文

Kafka HA集群配置搭建与SpringBoot使用示例总结

Kafka HA集群配置搭建与SpringBoot使用示例总结

一、Kafka高可用(HA)集群搭建

1. 环境准备

  • 至少3台服务器(推荐奇数台,如3、5、7)
  • 已安装Java环境(JDK 1.8+)
  • 下载Kafka二进制包(如kafka_2.13-3.2.1.tgz)

2. 集群配置步骤

2.1 解压安装
tar -xzf kafka_2.13-3.2.1.tgz -C /opt/
cd /opt/kafka_2.13-3.2.1
2.2 配置Zookeeper(Kafka 3.0+可使用KRaft模式,无需Zookeeper)
# config/zookeeper.properties
dataDir=/data/zookeeper
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10
syncLimit=5
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888

在每个节点创建myid文件:

# node1
echo "1" > /data/zookeeper/myid
# node2
echo "2" > /data/zookeeper/myid
# node3
echo "3" > /data/zookeeper/myid
2.3 配置Kafka
# config/server.properties
broker.id=1 # 每个节点唯一
listeners=PLAINTEXT://node1:9092
advertised.listeners=PLAINTEXT://node1:9092
log.dirs=/data/kafka-logs
num.partitions=3
default.replication.factor=3
min.insync.replicas=2
zookeeper.connect=node1:2181,node2:2181,node3:2181
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
2.4 启动服务
# 启动Zookeeper(每个节点)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties# 启动Kafka(每个节点)
bin/kafka-server-start.sh -daemon config/server.properties
2.5 验证集群
# 创建topic测试
bin/kafka-topics.sh --create --bootstrap-server node1:9092,node2:9092,node3:9092 \
--replication-factor 3 --partitions 3 --topic test-ha# 查看topic详情
bin/kafka-topics.sh --describe --bootstrap-server node1:9092 --topic test-ha

二、SpringBoot集成Kafka示例

1. 添加依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.6</version>
</dependency>

2. 配置application.yml

spring:kafka:bootstrap-servers: node1:9092,node2:9092,node3:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializeracks: allretries: 3consumer:group-id: springboot-groupkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: earliestenable-auto-commit: falselistener:ack-mode: manual_immediate

3. 生产者示例

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message).addCallback(result -> System.out.println("Message sent: " + message),ex -> System.err.println("Failed to send message: " + ex.getMessage()));}// 发送带key的消息public void sendMessageWithKey(String topic, String key, String message) {kafkaTemplate.send(topic, key, message);}
}

4. 消费者示例

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumerService {@KafkaListener(topics = "test-ha", groupId = "springboot-group")public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {try {System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",record.key(), record.value(), record.partition(), record.offset());// 业务处理逻辑// 手动提交offsetack.acknowledge();} catch (Exception e) {// 处理异常,可选择不提交offset以便重试System.err.println("Error processing message: " + e.getMessage());}}// 批量消费@KafkaListener(topics = "batch-topic", groupId = "springboot-group", containerFactory = "batchFactory")public void consumeBatch(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {System.out.println("Received batch with " + records.size() + " messages");// 批量处理逻辑ack.acknowledge();}
}

5. 配置类(可选)

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConfig {// 批量消费工厂@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(ConsumerFactory<String, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.setBatchListener(true); // 开启批量消费factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}// 自定义生产者配置(可选)@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.ACKS_CONFIG, "all");configProps.put(ProducerConfig.RETRIES_CONFIG, 3);return new DefaultKafkaProducerFactory<>(configProps);}
}

三、最佳实践与注意事项

  1. 高可用配置建议

    • 生产环境至少3个broker节点
    • 重要topic设置replication.factor≥3
    • 设置min.insync.replicas=2确保数据安全
    • 监控Kafka集群健康状态
  2. 性能优化

    • 根据业务需求合理设置partition数量
    • 调整batch.size和linger.ms优化生产者吞吐量
    • 消费者可配置max.poll.records控制单次拉取数量
  3. 错误处理

    • 实现KafkaListenerErrorHandler处理消费异常
    • 配置死信队列(DLT)处理无法消费的消息
    • 生产者实现RetryTemplate进行重试
  4. 安全配置(生产环境建议):

    • 启用SASL/SSL认证
    • 配置ACL权限控制
    • 启用日志审计
  5. 监控与运维

    • 使用Kafka Manager或CMAK管理集群
    • 集成Prometheus+Grafana监控
    • 定期清理过期数据,配置log.retention.hours

通过以上配置和示例,可以搭建一个高可用的Kafka集群,并在SpringBoot应用中实现可靠的消息生产和消费。

相关文章:

  • 设计一个新能源汽车控制系统开发框架,并提供一个符合ISO 26262标准的模块化设计方案。
  • zynq7035的arm一秒钟最多可以支持触发多少次中断
  • Docker compose 部署微服务项目(从0-1出发纯享版无废话)
  • 汽车制造行业如何在数字化转型中抓住机遇?
  • IdeaVim 配置与使用指南
  • 算法效率的钥匙:从大O看复杂度计算 —— C语言数据结构第一讲
  • Linux红帽:RHCSA认证知识讲解(十 四)分区管理、交换分区,创建逻辑卷与调整逻辑卷的大小
  • 【网络原理】从零开始深入理解TCP的各项特性和机制.(二)
  • WPF常用技巧汇总 - Part 2
  • Java详解LeetCode 热题 100(01):LeetCode 1. 两数之和(Two Sum)详解
  • EDR 保护时间(EDR Guard Time)
  • DeepSeek智能时空数据分析(四):绘制行政区域并定制样式
  • Java后端开发day37--源码解析:TreeMap可变参数--集合工具类:Collections
  • PostgreSQL的扩展 credcheck
  • 犬鼻子检测数据集VOC+YOLO格式6808张1类别近距离拍摄
  • 云原生课程-Docker
  • NLP预处理:如何 处理表情符号
  • Linux操作系统从入门到实战(四)Linux基础指令(下)
  • Hyper-V安装Win10系统,报错“No operating system was loaded“
  • 初识数据结构——二叉树从基础概念到实践应用
  • 第五届全国医院人文管理路演在昆山举办:患者体验才是温度计
  • 马上评丨机械停车库成“僵尸库”,设计不能闭门造车
  • 人民日报:应对外贸行业风险挑战,稳企业就是稳就业
  • 榆林市委常委王华胜已任榆林市政协党组书记
  • 罗马教皇方济各葬礼在梵蒂冈举行
  • 价格周报|猪价继续回暖:二次育肥热度仍存,对猪价仍有一定支撑