Kafka HA Cluster Setup and Spring Boot Usage Examples: A Summary
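I. Building a Highly Available (HA) Kafka Cluster
1. Prerequisites
- At least 3 servers (an odd number such as 3, 5, or 7 is recommended)
- A Java runtime installed (JDK 1.8+)
- The Kafka binary package downloaded (for example, kafka_2.13-3.2.1.tgz)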
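The odd-number recommendation follows from majority quorums: a ZooKeeper ensemble stays available only while more than half of its nodes are up, so a fourth node adds cost without adding fault tolerance. The sketch below works through that arithmetic; the class and method names are illustrative only and not part of Kafka or ZooKeeper.

```java
// Majority-quorum arithmetic for an ensemble of n voting nodes.
// QuorumMath is a hypothetical helper written for this article.
final class QuorumMath {

    /** Smallest number of nodes that forms a strict majority. */
    static int majority(int nodes) {
        return nodes / 2 + 1;
    }

    /** Number of nodes that can fail while a majority still survives. */
    static int toleratedFailures(int nodes) {
        return nodes - majority(nodes);
    }

    public static void main(String[] args) {
        // Compare odd and even ensemble sizes side by side.
        for (int n = 3; n <= 7; n++) {
            System.out.printf("nodes=%d majority=%d toleratedFailures=%d%n",
                    n, majority(n), toleratedFailures(n));
        }
    }
}
```

Running it shows that 3 and 4 nodes both tolerate a single failure, while 5 nodes tolerate two, which is why even sizes are usually avoided.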
2. Cluster configuration steps
2.1 Extract and install
tar -xzf kafka_2.13-3.2.1.tgz -C /opt/
cd /opt/kafka_2.13-3.2.1
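2.2 Configure ZooKeeper (Kafka 3.3+ supports KRaft mode for production use, which removes the need for ZooKeeper; in 3.0 to 3.2 KRaft is a preview feature)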
# config/zookeeper.properties
dataDir=/data/zookeeper
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10
syncLimit=5
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
Create a myid file on each node; its value must match that node's server.N entry above:
# node1
echo "1" > /data/zookeeper/myid
# node2
echo "2" > /data/zookeeper/myid
# node3
echo "3" > /data/zookeeper/myid
2.3 Configure Kafka
# config/server.properties
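# Must be unique on each node (1, 2, 3). Keep comments on their own line:
# a trailing "# ..." after a value is read as part of the value.
broker.id=1
# Use each node's own hostname here (node1, node2, node3)
listeners=PLAINTEXT://node1:9092
advertised.listeners=PLAINTEXT://node1:9092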
log.dirs=/data/kafka-logs
num.partitions=3
default.replication.factor=3
min.insync.replicas=2
zookeeper.connect=node1:2181,node2:2181,node3:2181
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
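The combination of default.replication.factor=3 and min.insync.replicas=2 determines how many brokers can be lost before producers using acks=all start receiving errors. A minimal sketch of that relationship follows; IsrMath and its methods are hypothetical names used only for illustration.

```java
// How replication factor and min.insync.replicas interact for acks=all writes.
// IsrMath is a hypothetical helper, not a Kafka class.
final class IsrMath {

    /** Replicas that may drop out of the ISR while acks=all writes are still accepted. */
    static int writeTolerance(int replicationFactor, int minInsyncReplicas) {
        if (minInsyncReplicas < 1 || minInsyncReplicas > replicationFactor) {
            throw new IllegalArgumentException(
                    "min.insync.replicas must be between 1 and the replication factor");
        }
        return replicationFactor - minInsyncReplicas;
    }

    /** True if an acks=all produce request can succeed with this many in-sync replicas. */
    static boolean canWrite(int inSyncReplicas, int minInsyncReplicas) {
        return inSyncReplicas >= minInsyncReplicas;
    }

    public static void main(String[] args) {
        int rf = 3;
        int minIsr = 2;
        System.out.println("replicas that may fail: " + writeTolerance(rf, minIsr));
        // Walk the ISR size down from the full replica set to a single replica.
        for (int isr = rf; isr >= 1; isr--) {
            System.out.printf("isr=%d canWrite=%b%n", isr, canWrite(isr, minIsr));
        }
    }
}
```

With the values from server.properties above, one replica can be lost without interrupting writes; with only one in-sync replica left, acks=all producers are rejected rather than risking an under-replicated write.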
2.4 Start the services
# Start ZooKeeper (on every node)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# Start Kafka (on every node, once the ZooKeeper ensemble is up)
bin/kafka-server-start.sh -daemon config/server.properties
2.5 Verify the cluster
# Create a test topic
bin/kafka-topics.sh --create --bootstrap-server node1:9092,node2:9092,node3:9092 \
  --replication-factor 3 --partitions 3 --topic test-ha
# Describe the topic; each partition should list 3 replicas and an Isr containing all 3 brokers
bin/kafka-topics.sh --describe --bootstrap-server node1:9092 --topic test-ha
II. Integrating Kafka with Spring Boot
1. Add the dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.6</version>
</dependency>
2. Configure application.yml
spring:
  kafka:
    bootstrap-servers: node1:9092,node2:9092,node3:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
    consumer:
      group-id: springboot-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
    listener:
      ack-mode: manual_immediate
3. Producer example
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Sends a message without a key and logs the outcome asynchronously.
    // In spring-kafka 2.x, send() returns a ListenableFuture, so addCallback is available;
    // in 3.x it returns a CompletableFuture and whenComplete would be used instead.
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message).addCallback(
                result -> System.out.println("Message sent: " + message),
                ex -> System.err.println("Failed to send message: " + ex.getMessage()));
    }

    // Sends a message with a key; records with the same key go to the same partition
    public void sendMessageWithKey(String topic, String key, String message) {
        kafkaTemplate.send(topic, key, message);
    }
}
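Sending with a key matters because the partition is derived from the key, so all records for one key stay in order on one partition. The sketch below illustrates the hash-modulo idea only. Kafka's default partitioner hashes the serialized key with murmur2, whereas this example uses Arrays.hashCode as a stand-in, so the partition numbers it prints will not match a real cluster. KeyPartitionSketch is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified illustration of key-based partition selection.
// Assumption: Arrays.hashCode replaces Kafka's murmur2 hash, so results differ from a real broker.
final class KeyPartitionSketch {

    /** Maps a key to a partition index in the range [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        // Mask the sign bit so the result of the modulo is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3; // matches num.partitions in server.properties
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            System.out.printf("key=%s -> partition %d%n", key, partitionFor(key, partitions));
        }
    }
}
```

The repeated key in the loop always lands on the same partition. The mapping changes if the partition count changes, which is one reason to choose the partition count carefully up front.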
4. Consumer example
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "test-ha", groupId = "springboot-group")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",
                    record.key(), record.value(), record.partition(), record.offset());
            // Business logic goes here
            // Commit the offset manually
            ack.acknowledge();
        } catch (Exception e) {
            // The offset is not committed here, but swallowing the exception means the
            // record is skipped once a later offset is committed. Rethrow instead if the
            // container's error handler should retry it.
            System.err.println("Error processing message: " + e.getMessage());
        }
    }

    // Batch consumption (uses the batchFactory bean from the configuration class)
    @KafkaListener(topics = "batch-topic", groupId = "springboot-group", containerFactory = "batchFactory")
    public void consumeBatch(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        System.out.println("Received batch with " + records.size() + " messages");
        // Batch processing logic goes here
        ack.acknowledge();
    }
}
5. Configuration class (optional)
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    // Container factory for batch consumption
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true); // enable batch consumption
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    // Custom producer configuration (optional). Defining this bean replaces the
    // ProducerFactory that Spring Boot would otherwise build from application.yml.
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
}
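III. Best Practices and Caveats
High-availability recommendations:
- Run at least 3 broker nodes in production
- Set replication.factor ≥ 3 for important topics
- Set min.insync.replicas=2 and produce with acks=all, so that every acknowledged write exists on at least two replicas
- Monitor the health of the Kafka cluster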
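Performance tuning:
- Choose the number of partitions according to the workload
- Tune batch.size and linger.ms to improve producer throughput
- Use max.poll.records on the consumer to cap the number of records returned per poll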
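The tuning properties named above can be collected as plain client properties. The following is a minimal sketch using only java.util.Properties; the specific values (64 KiB batches, 10 ms linger, 200 records per poll) are illustrative assumptions rather than recommendations, and ClientTuningSketch is a hypothetical class name.

```java
import java.util.Properties;

// Illustrative tuning values only; suitable settings depend on message size and latency goals.
final class ClientTuningSketch {

    /** Producer settings that trade a little latency for larger batches. */
    static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("batch.size", "65536"); // bytes per partition batch (client default: 16384)
        p.setProperty("linger.ms", "10");     // wait up to 10 ms for a batch to fill (3.x default: 0)
        return p;
    }

    /** Consumer settings that limit how many records a single poll returns. */
    static Properties consumerProps() {
        Properties p = new Properties();
        p.setProperty("max.poll.records", "200"); // client default: 500
        return p;
    }

    public static void main(String[] args) {
        System.out.println("producer: " + producerProps());
        System.out.println("consumer: " + consumerProps());
    }
}
```

In a Spring Boot application the same keys could instead be placed under spring.kafka.producer.properties and spring.kafka.consumer.properties in application.yml.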
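Error handling:
- Implement KafkaListenerErrorHandler to handle exceptions thrown by listeners
- Configure a dead-letter topic (DLT) for messages that cannot be processed
- On the producer side, rely on the client's built-in retries first, and wrap sends in a Spring Retry RetryTemplate only when application-level retry is needed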
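The retry-then-dead-letter pattern behind these points can be shown without a broker. The sketch below is a plain-Java simulation, not spring-kafka code: an in-memory list stands in for the dead-letter topic, and RetryThenDeadLetter is a hypothetical class. In spring-kafka 2.8+ this role is typically played by a DefaultErrorHandler combined with a DeadLetterPublishingRecoverer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// In-memory simulation of bounded retries followed by dead-lettering.
// Assumption: a List stands in for the dead-letter topic; no Kafka client is involved.
final class RetryThenDeadLetter {

    private final int maxAttempts;
    private final List<String> deadLetters = new ArrayList<>();

    RetryThenDeadLetter(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Runs the handler up to maxAttempts times; returns false if the message was dead-lettered. */
    boolean process(String message, Consumer<String> handler) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return true; // handled successfully, stop retrying
            } catch (RuntimeException e) {
                System.err.printf("attempt %d/%d failed for '%s': %s%n",
                        attempt, maxAttempts, message, e.getMessage());
            }
        }
        deadLetters.add(message); // retries exhausted: park the message for later inspection
        return false;
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    public static void main(String[] args) {
        RetryThenDeadLetter processor = new RetryThenDeadLetter(3);
        processor.process("ok", m -> { });
        processor.process("poison", m -> { throw new IllegalStateException("cannot parse"); });
        System.out.println("dead letters: " + processor.deadLetters());
    }
}
```

Bounding the attempts keeps one unprocessable record from blocking its partition, while the dead-letter store preserves it for diagnosis or replay.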
Security (recommended for production):
- Enable SASL/SSL authentication
- Configure ACLs for access control
- Enable audit logging
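On the client side, enabling SASL over SSL comes down to a handful of properties. The sketch below assembles them with java.util.Properties. The choice of SCRAM-SHA-512 as the mechanism, the method parameters, and the class name SaslSslClientSketch are assumptions made for illustration; the brokers would need a matching SASL_SSL listener, which is not part of the configuration shown earlier.

```java
import java.util.Properties;

// Client-side properties for SASL authentication over TLS.
// Assumptions: SCRAM-SHA-512 as the mechanism; credentials and paths are supplied by the caller.
final class SaslSslClientSketch {

    static Properties clientProps(String username, String password,
                                  String truststorePath, String truststorePassword) {
        Properties p = new Properties();
        p.setProperty("security.protocol", "SASL_SSL");
        p.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        // JAAS entry telling the client which login module and credentials to use.
        p.setProperty("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"%s\" password=\"%s\";", username, password));
        // Truststore holding the CA certificate that signed the broker certificates.
        p.setProperty("ssl.truststore.location", truststorePath);
        p.setProperty("ssl.truststore.password", truststorePassword);
        return p;
    }

    public static void main(String[] args) {
        // Placeholder values for demonstration; real secrets should come from a secret store.
        Properties p = clientProps("app-user", "change-me", "/path/to/truststore.jks", "change-me");
        // Print only the non-secret settings.
        System.out.println("security.protocol=" + p.getProperty("security.protocol"));
        System.out.println("sasl.mechanism=" + p.getProperty("sasl.mechanism"));
    }
}
```

These properties can be merged into the producer and consumer configuration maps, or set under spring.kafka.properties in application.yml.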
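Monitoring and operations:
- Use CMAK (formerly Kafka Manager) to manage the cluster
- Integrate Prometheus and Grafana for monitoring
- Set log.retention.hours (default 168) so that Kafka deletes expired log segments automatically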
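With the configuration and examples above, you can build a highly available Kafka cluster and implement reliable message production and consumption in a Spring Boot application.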