dongyl
浅谈 Kafka
1. 数据生产流程
2. 生产者必要的参数配置
bootstrap.servers: 生产者客户端与broker集群建立初始连接需要的broker地址列表
key.serializer : 实现了接口 org.apache.kafka.common.serialization.Serializer的 key 序列化类。
value.serializer : 实现了接口 org.apache.kafka.common.serialization.Serializer的 value 序列化类。
acks: 0=> 生产者不等待broker的任何消息确认。只要将消息放到了 socket的缓冲区,就认为消息已发送。
1=> leader将记录写到它本地日志,就响应客户端确认消息, 而不等待follower副本的确认。
all => leader等待所有同步的副本确认该消息。保证了只要有 一个同步副本存在,消息就不会丢失。
compression.type: 生产者生成数据的压缩格式。默认是none(没有压缩)。允许的 值:none,gzip,snappy和lz4。
retries: 设置该属性为一个大于1的值,将在消息发送失败的时候重新发送消 息。
3. ISR 和 OSR
在所有副本中,只有leader 负责读写,follower 只负责从 leader 复制,保持同步。
ISR: in-sync
leader 会维护一个基本保持同步的副本 列表
OSR: out-sync
与leader同步滞后的副本
4. 消费者必要的参数配置
bootstrap.servers: 向Kafka集群建立初始连接用到的host/port列表
key.deserializer: key的反序列化类,该类需要实现 org.apache.kafka.common.serialization.Deserializer 接口。
value.deserializer: 实现了 org.apache.kafka.common .serialization.Deserializer 接口的反序列化器,用于对消息的value进行反序列化。
group.id: 用于唯一标志当前消费者所属的消费组的字符串。
auto.offset.reset: 当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被
删除了),该如何处理?
earliest:自动重置偏移量到最早的偏移量
latest:自动重置偏移量为最新的偏移量
none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常
anything:向消费者抛异常
enable.auto.commit: 如果设置为true,消费者会自动周期性地向服务器提交偏移量。
5. 偏移量
概念: 为每个分区记录消费的偏移量 _consumer_offsets
- Consumer需要向Kafka记录自己的位移数据,这个汇报过程称为 提交位移(Committing
Offsets) 在拉取前提交偏移量
- Consumer 需要为分配给它的每个分区提交各自的位移数据
- 位移提交的由Consumer端负责的,Kafka只负责保管。__consumer_offsets
- 位移提交分为自动提交和手动提交
- 位移提交分为同步提交和异步提交
6. leader 选举
Leader副本和Follower副本之间的关系并不是固定不变的,在Leader所在的broker发生故障的时候,就需要进行分区的Leader副本和Follower副本之间的切换,需要选举Leader副本。
动态维护 一个ISR 集合。
Kafka中Leader分区选举,通过维护一个动态变化的ISR集合来实现,一旦Leader分区丢掉,则从
ISR中随机挑选一个副本做新的Leader分区。
如果ISR中的副本都丢失了,则:
- 可以等待ISR中的副本任何一个恢复,接着对外提供服务,需要时间等待。
- 从OSR中选出一个副本做Leader副本,此时会造成数据丢失
7. 分区分配策略
PartitionAssignor 接口用于用户定义实现分区分配算法,以实现Consumer之间的分区分配。
分区数/消费者数 。余下的 给消费者id 最小的。 平均分配!
StickyAssignor : 在有消费者宕机时,将所属的宕机的分区重新分配给现有的消费者。
(根据上次分配的结果再分配、) 粘性分配!
RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽
量均衡的分配。 即 轮询分配!
如果消费组内,消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)。
如果订阅的Topic列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与一些Topic的分配。
8. 日志存储
Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响。
每个主题又可以分为一个或多个分区。
每个分区各自存在一个记录消息数据的日志文件。
- 分区日志文件中包含很多的 LogSegment
- Kafka 日志追加是顺序写入的
- LogSegment 可以减小日志文件的大小
- 进行日志删除的时候和数据查找的时候可以快速定位。
- ActiveLogSegment 是活跃的日志分段,拥有文件拥有写入权限,其余的 LogSegment 只有
只读的权限。
日志文件存在多种后缀文件,重点需要关注 .index、.timestamp、.log 三种类型。
偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。时间戳索引文件则根据时间戳查
找对应的偏移量。
index: 偏移量索引文件
timestamp: 时间戳索引文件
log : 日志文件 日志大小默认是 1G
切分文件
当满足如下几个条件中的其中之一,就会触发文件的切分:
- 当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 配置的值。
log.segment.bytes 参数的默认值为 1073741824,即 1GB。
- 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于 log.roll.ms 或
log.roll.hours 参数配置的值。如果同时配置了 log.roll.ms 和 log.roll.hours 参
数,那么 log.roll.ms 的优先级高。默认情况下,只配置了 log.roll.hours 参数,其值
为168,即 7 天。
- 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数 log.index.size.max.bytes
配置的值。 log.index.size.max.bytes 的默认值为 10485760,即 10MB。
- 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于 Integer.MAX_VALUE ,即要追
加的消息的偏移量不能转变为相对偏移量。
8.1 日志删除
基于时间
日志删除任务会根据 log.retention.hours/log.retention.minutes/log.retention.ms 设
定日志保留的时间节点。如果超过该设定值,就需要进行删除。默认是 7 天, log.retention.ms 优
先级最高。
基于日志大小
日志删除任务会检查当前日志的大小是否超过设定值。设定项为 log.retention.bytes ,单个日
志分段的大小由 log.segment.bytes 进行设定
基于偏移量
根据日志分段的下一个日志分段的起始偏移量是否大于等于日志文件的起始偏移量,若是,则可以
删除此日志分段。
注意:日志文件的起始偏移量并不一定等于第一个日志分段的基准偏移量,存在数据删除,可能与
之相等的那条数据已经被删除了。
9. 速度快? 磁盘存储?
Kafka速度快是因为:
- partition顺序读写,充分利用磁盘特性,这是基础;
- Producer生产的数据持久化到broker,采用mmap文件映射,实现顺序的快速写入;
Customer从broker读取数据,采用sendfile,将磁盘文件读到OS内核缓冲区后,直接转到
socket buffer进行网络发送。