学到羊之Kafka-创新互联
Kafka 是一款开源的消息引擎系统,用来实现解耦的异步式数据传递。即系统 A 发消息给到 消息引擎系统,系统 B 通过消息引擎系统读取 A 发送的消息,在大数据场景下,能达到削峰填谷的效果。
2 Kafka 术语Kafka 中的分区机制指的是将每个主题(Topic)划分成多个分区(Partition),每个分区是一组有序的消息日志。生产者生产的每条消息只会被发送到一个分区中,也就是说如果向一个双分区的主题发送一条消息,这条消息要么在分区 0 中,要么在分区 1 中。Kafka 的分区编号是从 0 开始的,如果 Topic 有 100 个分区,那么它们的分区号就是从 0 到 99。每个分区下可以配置若干个副本,其中只能有 1 个领导者副本和 N-1 个追随者副本。
Kafka 的三层消息架构:
1)主题层,每个主题可以配置 M 个分区,而每个分区又可以配置 N 个副本。
2)分区层,每个分区的 N 个副本中只能有一个充当领导者角色,对外提供服务;其他 N-1 个副本是追随者副本,只是提供数据冗余之用。
3)消息层,分区中包含若干条消息,每条消息的位移从 0 开始,依次递增。最后,客户端程序只能与分区的领导者副本进行交互。
Broker 如何持久化数据?
Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机 I/O 操作,改为性能较好的顺序 I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段。如果不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此 Kafka 必然要定期地删除消息以回收磁盘。怎么删除呢?简单来说就是通过日志段(Log Segment)机制。在 Kafka 底层,一个日志又进一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。
3 生产者 3.1 消息发送Producer创建时,会创建一个Sender线程并设置为守护线程;
生产消息时,内部是异步流程。生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建);
批次发送的条件为:缓冲区数据大小达到 batch.size 或者 linger.ms 达到上限,哪个先达到就算哪个;
批次发送后,发往指定分区,然后落盘到broker;如果生产者配置了 retrires 参数大于 0 并且失败原因允许重试,那么客户端内部会对该消息进行重试;
落盘到broker成功,返回生产元数据给生产者;
元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回。
主题是承载真实数据的逻辑容器,而在主题之下还分为若干个分区,主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。
为什么使用分区的概念而不是直接使用多个主题呢?
对数据进行分区的主要原因是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,还可以通过添加新的节点机器来增加整体系统的吞吐量。
3.3.1 分区策略所谓分区策略是决定生产者将消息发送到哪个分区的算法。
轮询策略
顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0。
轮询策略有非常优秀的负载均衡表现,它总是能保证消息大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是最常用的分区策略之一。
随机策略
随意地将消息放置到任意一个分区上
Key-ordering 策略
Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等。一旦消息被定义了 Key,就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的。
假设有一个服务需要监听某个公众号用户关注取关的事件,发送的消息必须要保证有序性,不然会导致结果混乱。如果给 Kafka 主题只设置 1 个分区,这样所有的消息都只在这一个分区内读写,因此保证了全局的顺序性。
这样做虽然实现了因果关系的顺序性,但也丧失了 Kafka 多分区带来的高吞吐量和负载均衡的优势。
可以在消息体中封装了固定的标志位,并对此标志位设定专门的分区策略,保证同一标志位的所有消息都发送到同一分区,这样既可以保证分区内的消息顺序,也可以享受到多分区带来的性能红利。
4 消费者 4.1 消费组(Consumer Group)消费组是 kafka 提供的可扩展且具有容错性的消费者机制,是 Kafka 实现单播和广播两种消息模型的手段。
多个从同一个主题消费的消费者可以加入到一个消费组中,消费组中的消费者共享 Group Id。组内的所有消费者协调在一起来消费订阅主题的所有分区,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。
4.2 消费消息Consumer 采用 pull 模式从 broker 中读取数据,可以自主控制消费方式,逐条消费或批量消费。
4.2.1 位移提交Consumer 需要向 Kafka 记录自己的位移数据,这个汇报过程称为 提交位移(Committing Offsets)。这个过程非常灵活,可以提交任何位移值,但也会由此产生系列不好的结果。假设 Consumer 消费了 10 条消息,提交的位移值却是 20,那么位移介于 11~19 之间的消息是有可能丢失的;相反地,如果提交的位移值是 5,那么位移介于 5~9 之间的消息就有可能被重复消费。
自动提交
1)开启自动提交: enable.auto.commit=true,默认为 true
2)配置自动提交间隔: auto.commit.interval.ms ,默认 5s
自动提交会导致消息被重复消费
- Consumer 每 5s 提交 offset
- 假设提交 offset 后的 3s 发生了 Rebalance
- Rebalance 之后的所有 Consumer 从上一次提交的 offset 处继续消费
- 因此 Rebalance 发生前 3s 的消息会被重复消费
虽然能通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。
手动同步提交
使用 KafkaConsumer#commitSync(),会提交 KafkaConsumer#poll() 返回的最新 offset。
该方法为同步操作,等待直到 offset 被成功提交才返回。
while (true) {
ConsumerRecordsrecords =
consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息
try {
// Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果
consumer.commitSync();
} catch (CommitFailedException e) {
// 处理提交失败异常
handle(e);
}
}
手动异步提交
while (true) {
ConsumerRecordsrecords =
consumer.poll(Duration.ofSeconds(1));
// 处理消息
process(records);
// 会立即返回结果,不会阻塞
consumer.commitAsync((offsets, exception) ->{
if (exception != null)
handle(exception);
});
}
但 commitAsync 不能替代 commitSync,因为出现问题时它不会自动重试。由于是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。
手动同步提交与异步提交结合
try {
while (true) {
ConsumerRecordsrecords =
consumer.poll(Duration.ofSeconds(1));
// 处理消息
process(records);
// 使用异步提交规避阻塞
commitAysnc();
}
} catch (Exception e) {
// 处理异常
handle(e);
} finally {
try {
// Consumer 要关闭前使用同步阻塞式提交,以确保 Consumer 关闭前能够保存正确的位移数据
consumer.commitSync();
} finally {
consumer.close();
}
}
4.2.2 位移管理Kafka默认定期自动提交位移( enable.auto.commit = true ),也手动提交位移。另外kafka会定期把group消费情况保存起来,做成一个offset map
位移管理机制将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 主题中。
5 异常处理如何保证消息不被重复消费?
如何保证消息消费的幂等性?
如何防止消息丢失?
如何处理消息积压?
消费慢了怎么处理?
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
当前文章:学到羊之Kafka-创新互联
文章地址:http://cdiso.cn/article/dscego.html