Kafka 配置参数性能调优建议
文章目录
- 1、生产者调优
- batch.size(重要)
- linger.ms
- compression.type
- acks(重要)
- buffer.memory
- max.in.flight.requests.per.connection(重要)
- message.max.bytes(重要)
- 2、消费者调优
- fetch.min.bytes(重要)
- fetch.max.wait.ms(重要)
- max.poll.records(重要)
- auto.offset.reset(重要)
- session.timeout.ms
- heartbeat.interval.ms
- 3、Broker 调优
- num.network.threads
- num.io.threads
- log.flush.interval.messages(重要)
- log.flush.interval.ms(重要)
- num.replica.fetchers
- auto.create.topics.enable
- unclean.leader.election.enable
- log.retention.hours
1、生产者调优
batch.size(重要)
建议:64KB ~ 1MB
batch.size
指的是生产者在将消息发送到 Broker 之前,会将多条消息收集到一个批次进行发送。
增大batch.size
可以显著减少网络请求次数,因为每次发送更多的消息,从而提高了网络传输的效率,有助于提升整体的吞吐量。
但是batch.size
调整过大,也会带来一定的延迟,因为生产者需要等待更多的消息填满批次,如果批次一直无法填满,消息就会在生产者端停留更长时间,直到达到其他触发发送的条件。在高吞吐量且对延迟要求不是特别苛刻的场景,可以适当增大该值。
linger.ms
建议:10 ~ 100ms
linger.ms
是生产者等待batch.size
填满的时间。
当设置该值后,即使没有填满,只要到了等待时间 linger.ms
值,生产者也会将当前批次的消息发送出去。
增大 linger.ms
可以让生产者有更多的时间去收集消息,从而形成更大的批次,提升吞吐量。这样消息在生产者端等待的时间会变长,会增加消息的延迟。在一些允许一定延迟的批量数据处理场景中,可以适当增大该值来提高吞吐量。
compression.type
建议:snappy 或 lz4
用于减少生产者发送到 Broker 的数据量。使用compression.type
可以将消息进行压缩后再传输,这样可以有效节省网络带宽,提高网络传输效率。
snappy
压缩算法的特点是压缩和解压缩速度快,对 CPU 资源的消耗相对较低,适用于对压缩和解压缩速度要求较高的场景;lz4
同样具有较高的压缩和解压缩速度,并且在压缩比上可能比 snappy
稍好一些。使用压缩算法本质上是用 CPU 换取带宽,在网络带宽有限的情况下非常有用。
acks(重要)
建议:1 或 all
acks
参数控制了生产者在发送消息后需要收到多少确认才认为消息发送成功。
当 acks = 1
时,只要 Leader 副本
确认接收到消息,生产者就会收到响应,这种方式在保证一定可靠性的同时,具有较高的吞吐量。
当 acks = all
时,生产者需要等待所有的 ISR(In-Sync Replicas)
副本都确认接收到消息才会收到响应,这提供了最高的可靠性,但会略微降低吞吐量,因为需要等待更多的确认信息。在对消息可靠性要求极高的场景下,如金融交易数据的传输,建议使用 acks = all。
buffer.memory
默认:32MB
建议:32 ~ 64MB
buffer.memory
用于暂存生产者未发送到 Broker 的消息。
当生产者发送消息的速度超过了 Broker 接收的速度时,消息会被存储在buffer.memory
中。
如果缓冲区设置过小,可能会导致缓冲区很快被填满,从而使生产者阻塞,影响消息的发送效率。适当增大缓冲区大小可以避免因缓冲区满导致的阻塞问题,但也不能设置过大,否则会占用过多的系统内存资源。
max.in.flight.requests.per.connection(重要)
建议:1(性能差、顺序强一致)、n(高性能、但可能乱序)
max.in.flight.requests.per.connection
限制了单个连接上允许的最大未确认请求数。生产者可以在一个连接上同时发送多个请求而无需等待每个请求的响应。
如果将该值设置得过高,可能会导致消息的顺序被破坏。例如:当一个请求失败并重试时,后续已经发送的请求可能会先到达 Broker,从而导致消息的顺序不一致。需要根据实际情况合理设置该值,以平衡吞吐量和消息顺序性。
message.max.bytes(重要)
建议:小于128KB
生产者能够发送的最大消息大小。
设置合适的 message.max.bytes
可以确保生产者发送的消息不会超出 Broker 和消费者的处理能力。如果设置得过大,可能会导致 Broker 和消费者的内存压力增大,处理效率降低;如果设置得过小,可能无法满足某些业务场景下大消息的传输需求。需要根据实际业务中的消息大小分布来合理调整该值。
2、消费者调优
fetch.min.bytes(重要)
建议:1MB - 10MB
消费者在单次拉取数据时,从 Broker 获得的最小数据量。
通过设置一个较大的值,可以减少消费者向 Broker 发送拉取请求的频率,因为只有当 Broker 上有足够的数据满足 fetch.min.bytes
的要求时,才会将数据返回给消费者。这样可以降低网络开销,提高整体的性能。但如果设置得过大,可能会导致消费者等待时间过长,影响实时性。
fetch.max.wait.ms(重要)
建议:500ms
消费者等待 fetch.min.bytes
数据量的超时时间。
当消费者向 Broker 发送拉取请求后,如果在 fetch.max.wait.ms
时间内,Broker 上的数据量仍然没有达到 fetch.min.bytes
的要求,Broker 也会将当前可用的数据返回给消费者。
该参数用于平衡延迟和吞吐量。设置得较短,消费者可能会频繁地拉取少量数据,增加网络开销;设置得较长,可能会导致消息处理的延迟增加。
max.poll.records(重要)
建议:500 - 5000
该参数限制了消费者单次拉取的最大消息数。合理设置该值可以避免消费者处理消息时出现阻塞的情况。
设置得过大,消费者可能会一次性拉取过多的消息,导致处理这些消息的时间过长,从而影响后续消息的拉取和处理;设置得过小,消费者需要频繁地向 Broker 发送拉取请求,增加了网络开销。需要根据消费者的处理能力和 Broker 的负载情况来调整该值。
auto.offset.reset(重要)
建议:earliest
该参数决定了在消费者没有有效的偏移量时(例如,新的消费者组或者偏移量已经过期),从何处开始消费消息。当设置为 latest
时,消费者将从最新的消息开始消费,即只消费后续产生的新消息;
当设置为 earliest
时,消费者将从分区的最早消息开始消费。在需要处理历史数据的场景中,可以选择 earliest
;而在只关注最新数据的场景下,选择 latest
更为合适(会丢数据)。
session.timeout.ms
建议:10s - 30s
消费者会定期向协调者发送心跳信息,以表明自己处于活跃状态。如果在 session.timeout.ms
时间内,协调者没有收到消费者的心跳信息,就会认为该消费者已经失效,并将其从消费者组中剔除。
如果该值设置得过短,可能会因为网络抖动等原因导致消费者被误剔除;如果设置得过长,当消费者真正出现故障时,协调者不能及时发现并重新分配分区,会影响整个消费者组的性能。
heartbeat.interval.ms
建议:1/3 * session.timeout.ms
指定消费者向协调者发送心跳的时间间隔。
心跳机制用于维持消费者与协调者之间的连接,确保协调者知道消费者处于活跃状态。通常建议将其设置为 session.timeout.ms
的三分之一,这样可以在保证及时检测到消费者故障的同时,避免过于频繁的心跳请求增加网络开销。
3、Broker 调优
num.network.threads
建议:CPU 核心数 × 3
指定 Broker 用于处理网络请求的线程数。
在高并发场景下,大量的生产者和消费者会同时向 Broker 发送网络请求,如果处理网络请求的线程数不足,会导致请求处理不及时,影响系统的性能。
通过将该值设置为 CPU 核心数的 3 倍可以充分利用 CPU 资源,提高网络请求的处理能力。但具体的值还需要根据实际的并发情况和系统资源进行调整。
num.io.threads
建议:CPU 核心数 × 8
用于处理磁盘 I/O 的线程数。
Kafka 的消息存储依赖于磁盘 I/O,因此磁盘 I/O 的性能对系统的整体性能有很大影响。增加处理磁盘 I/O 的线程数可以提高磁盘 I/O 的并发能力,加快消息的读写速度。
但需要注意的是,该值需要根据磁盘的性能进行调整,如果磁盘的性能较差,过多的线程可能会导致磁盘竞争加剧,反而降低性能。
log.flush.interval.messages(重要)
建议:1(立即刷盘,性能差)、n (批量刷盘,性能好,会丢数据)
指定将消息刷写到磁盘之前,需要累积的消息数。
增大该值可以减少磁盘 I/O 次数,因为每次刷盘会将更多的消息一次性写入磁盘,从而提高磁盘的写入效率。但这也会牺牲部分持久性,因为如果在消息累积过程中 Broker 发生故障,可能会导致部分未刷盘的消息丢失。需要根据业务对数据持久性的要求来合理设置该值。
log.flush.interval.ms(重要)
建议:1000 - 5000ms
强制刷盘的时间间隔,与 log.flush.interval.messages
配合使用。
即使消息数量没有达到 log.flush.interval.messages
的要求,只要时间达到 log.flush.interval.ms
,也会将当前的消息刷写到磁盘。通过设置合理的刷盘间隔,可以在保证一定磁盘 I/O 效率的同时,提高数据的持久性。
num.replica.fetchers
建议:2 - 4
指定 Broker 用于副本同步的线程数。
副本同步是 Kafka 保证数据可靠性的重要机制,增加副本同步线程数可以提升副本同步的速度,减少副本之间的延迟。但过多的线程可能会占用过多的系统资源,因此需要根据实际情况进行调整。
auto.create.topics.enable
建议:false
在生产环境中,建议禁用自动创建 Topic 的功能。
如果启用该功能,当生产者或消费者尝试访问一个不存在的 Topic 时,Kafka 会自动创建该 Topic。这可能会导致生产环境中产生大量不必要的 Topic,增加系统的管理成本和资源消耗。
为了避免这种情况的发生,应该手动创建所需的 Topic,并将 auto.create.topics.enable
设置为 false
。
unclean.leader.election.enable
建议:false
禁止非 ISR 副本成为 Leader 可以确保数据的一致性。
当 Leader 副本失效时,如果启用了 unclean.leader.election.enable
,非 ISR 副本可能会被选举为新的 Leader。由于非 ISR 副本可能落后于其他副本,成为 Leader 后可能会导致数据丢失或不一致。为了保证数据的一致性,建议将该参数设置为 false
。
log.retention.hours
建议:72小时
该参数指定了 Kafka 消息在磁盘上的保留时间。
根据存储容量和业务需求,可以调整该值。如果存储容量充足,且业务需要长期保留数据,可以适当增大该值;如果存储容量有限,或者业务只需要短期的数据,可以减小该值。