Key:消息的标识。 Value:消息内容。
为了便于追踪,请为消息设置一个唯一的Key。您可以通过Key追踪某消息,打印发送日志和消费日志,了解该消息的发送和消费情况。
如果消息发送量较大,建议不要设置Key,并使用黏性分区策略。黏性分区策略详情,请参见黏性分区策略。
分布式环境下,由于网络等原因偶尔发送失败是常见的。导致这种失败的原因可能是消息已经发送成功,但是ACK失败,也有可能是确实没发送成功。
您可以根据业务需求,设置以下重试参数:
retries:消息发送失败时的重试次数。
retry.backoff.ms,消息发送失败的重试间隔,建议设置为1000,单位:毫秒。
发送接口是异步的,如果您想接收发送的结果,可以调用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)。
Producer是线程安全的,且可以往任何Topic发送消息。通常情况下,一个应用对应一个Producer。
Acks的说明如下:
acks=0:无需服务端的Response、性能较高、丢数据风险较大。
acks=1:服务端主节点写成功即返回Response、性能中等、丢数据风险中等、主节点宕机可能导致数据丢失。
acks=all:服务端主节点写成功且备节点同步成功才返回Response、性能较差、数据较为安全、主节点和备节点都宕机才会导致数据丢失。
为了提升发送性能, 建议设置为acks=1。
一般情况下,一个Kafka Topic会有多个分区。Kafka Producer客户端在向服务端发送消息时,需要先确认往哪个Topic的哪个分区发送。我们给同一个分区发送多条消息时,Producer客户端将相关消息打包成一个Batch,批量发送到服务端。Producer客户端在处理Batch时,是有额外开销的。一般情况下,小Batch会导致Producer客户端产生大量请求,造成请求队列在客户端和服务端的排队,并造成相关机器的CPU升高,从而整体推高了消息发送和消费延迟。一个合适的Batch大小,可以减少发送消息时客户端向服务端发起的请求次数,在整体上提高消息发送的吞吐和延迟。
Batch机制,Kafka Producer端主要通过两个参数进行控制:
batch.size : 发往每个分区(Partition)的消息缓存量(消息内容的字节数之和,不是条数)。达到设置的数值时,就会触发一次网络请求,然后Producer客户端把消息批量发往服务器。如果batch.size设置过小,有可能影响发送性能和稳定性。建议保持默认值16384。单位:字节。
linger.ms : 每条消息在缓存中的最长时间。若超过这个时间,Producer客户端就会忽略batch.size的限制,立即把消息发往服务器。建议根据业务场景, 设置linger.ms在100~1000之间。单位:毫秒。
因此,Kafka Producer客户端什么时候把消息批量发送至服务器是由batch.size和linger.ms共同决定的。您可以根据具体业务需求进行调整。为了提升发送的性能,保障服务的稳定性, 建议您设置batch.size=16384和linger.ms=1000。
只有发送到相同分区的消息,才会被放到同一个Batch中,因此决定一个Batch如何形成的一个因素是Kafka Producer端设置的分区策略。Kafka Producer允许通过设置Partitioner的实现类来选择适合自己业务的分区。在消息指定Key的情况下,Kafka Producer的默认策略是对消息的Key进行哈希,然后根据哈希结果选择分区,保证相同Key的消息会发送到同一个分区。
在消息没有指定Key的情况下,Kafka 2.4版本之前的默认策略是循环使用主题的所有分区,将消息以轮询的方式发送到每一个分区上。但是,这种默认策略Batch的效果会比较差,在实际使用中,可能会产生大量的小Batch,从而使得实际的延迟增加。鉴于该默认策略对无Key消息的分区效率低问题,Kafka 在2.4版本引入了黏性分区策略(Sticky Partitioning Strategy)。即使用KDP提供的kafka默认即会开启黏性分区策略。
黏性分区策略主要解决无Key消息分散到不同分区,造成小Batch问题。其主要策略是如果一个分区的Batch完成后,就随机选择另一个分区,然后后续的消息尽可能地使用该分区。这种策略在短时间内看,会将消息发送到同一个分区,如果拉长整个运行时间,消息还是可以均匀地发布到各个分区上的。这样可以避免消息出现分区倾斜,同时还可以降低延迟,提升服务整体性能。
结合Kafka 的Batch设计思路,Kafka 会缓存消息并打包发送,如果缓存太多,则有可能造成OOM(Out of Memory)。
buffer.memory : 发送的内存池大小。如果内存池设置过小,则有可能导致申请内存耗时过长,从而影响发送性能,甚至导致发送超时。建议buffer.memory ≧ batch.size * 分区数 * 2。单位:字节。
buffer.memory的默认数值是32 MB,对于单个Producer而言,可以保证足够的性能。
重要 如果您在同一个JVM中启动多个Producer,那么每个Producer都有可能占用32 MB缓存空间,此时便有可能触发OOM。
在生产时,一般没有必要启动多个Producer;如有特殊情况需要,则需要考虑buffer.memory的大小,避免触发OOM。
单个分区(Partition)内,消息是按照发送顺序储存的,是基本有序的。
默认情况下,Kafka 为了提升可用性,并不保证单个分区内绝对有序,在升级或者宕机时,会发生少量消息乱序(某个分区挂掉后把消息Failover到其它分区)。
如果业务要求分区保证严格有序,请在创建Topic时选择使用Local存储。
分区个数主要影响的是消费者的并发数量。
对于同一个Group内的消费者来说,一个分区最多只能被一个消费者消费。因此,消费实例的个数不要大于分区的数量,否则会有消费实例分配不到任何分区而处于空跑状态。
控制台的默认分区个数是12,可以满足绝大部分场景的需求。您可以根据业务使用量进行增加。不建议分区数小于12,否则可能影响消费发送性能;也不建议超过100个,否则易引发消费端Rebalance。
Kafka消费者有两个相关参数:
enable.auto.commit:是否采用自动提交位点机制。默认值为true,表示默认采用自动提交机制。
auto.commit.interval.ms: 自动提交位点时间间隔。默认值为1000,即1s。
这两个参数组合的结果就是,每次poll数据前会先检查上次提交位点的时间,如果距离当前时间已经超过参数auto.commit.interval.ms规定的时长,则客户端会启动位点提交动作。
因此,如果将enable.auto.commit设置为true,则需要在每次poll数据时,确保前一次poll出来的数据已经消费完毕,否则可能导致位点跳跃。
如果想自己控制位点提交,请把enable.auto.commit设为false,并调用commit(offsets)函数自行控制位点提交。
以下两种情况,会发生消费位点重置:
当服务端不存在曾经提交过的位点时(例如客户端第一次上线)。
当从非法位点拉取消息时(例如某个分区最大位点是10,但客户端却从11开始拉取消息)。
Java客户端可以通过auto.offset.reset来配置重置策略,主要有三种策略:
latest:从最大位点开始消费。
earliest:从最小位点开始消费。
none:不做任何操作,即不重置。
说明 建议设置成latest,而不要设置成earliest,避免因位点非法时从头开始消费,从而造成大量重复。
如果是您自己管理位点,可以设置成none。
消费过程是由客户端主动去服务端拉取消息的,在拉取大消息时,需要注意控制拉取速度,注意修改配置:
max.poll.records:每次Poll获取的最大消息数量。如果单条消息超过1 MB,建议设置为1。
fetch.max.bytes:设置比单条消息的大小略大一点。
max.partition.fetch.bytes:设置比单条消息的大小略大一点。
拉取大消息的核心是逐条拉取的。
Kafka消费的语义是at least once, 也就是至少投递一次,保证消息不丢失,但是无法保证消息不重复。在出现网络问题、客户端重启时均有可能造成少量重复消息,此时应用消费端如果对消息重复比较敏感(例如订单交易类),则应该做消息幂等。
以数据库类应用为例,常用做法是:
发送消息时,传入key作为唯一流水号ID。
消费消息时,判断key是否已经消费过,如果已经被消费,则忽略,如果没消费过,则消费一次。
如果应用本身对少量消息重复不敏感,则不需要做此类幂等检查。
Kafka是按分区逐条消息顺序向前推进消费的,如果消费端拿到某条消息后执行消费逻辑失败,例如应用服务器出现了脏数据,导致某条消息处理失败,等待人工干预,那么有以下两种处理方式:
失败后一直尝试再次执行消费逻辑。这种方式有可能造成消费线程阻塞在当前消息,无法向前推进,造成消息堆积。
Kafka没有处理失败消息的设计,实践中通常会打印失败的消息或者存储到某个服务(例如创建一个Topic专门用来放失败的消息),然后定时检查失败消息的情况,分析失败原因,根据情况处理。
Kafka的消费机制是由客户端主动去服务端拉取消息进行消费的。因此,如果客户端能够及时消费,则不会产生较大延迟。如果产生了较大延迟,请先关注是否有堆积,并注意提高消费速度。
消费端最常见的问题就是消费堆积,最常造成堆积的原因是:
消费速度跟不上生产速度,此时应该提高消费速度,详情请参见提高消费速度。
消费端产生了阻塞。
消费端拿到消息后,执行消费逻辑,通常会执行一些远程调用,如果这个时候同步等待结果,则有可能造成一直等待,消费进程无法向前推进。
消费端应该竭力避免堵塞消费线程,如果存在等待调用结果的情况,建议设置等待的超时时间,超时后作为消费失败进行处理。
提高消费速度有以下两个办法:
1.增加Consumer实例个数。
可以在进程内直接增加(需要保证每个实例对应一个线程,否则没有太大意义),也可以部署多个消费实例进程;需要注意的是,实例个数超过分区数量后就不再能提高速度,将会有消费实例不工作。
2.增加消费线程。
增加Consumer实例本质上也是增加线程的方式来提升速度,因此更加重要的性能提升方式是增加消费线程,最基本的步骤如下:
Kafka自身没有消息过滤的语义。实践中可以采取以下两个办法:
如果过滤的种类不多,可以采取多个Topic的方式达到过滤的目的。
如果过滤的种类多,则最好在客户端业务层面自行过滤。
实践中请根据业务具体情况进行选择,也可以综合运用上面两种办法。