Kafka学习笔记
基础知识
Kafka的数据单元称为消息,消息由字节数组组成。消息可以有一个可选的元数据,也就是键,键也是一个字节数组,键用来决定消息最后存储在主题的哪个分区中。
消息按照主题进行分类,一个主题可以细分为若干个分区,一个分区就是一个提交日志,消息以追加的方式写入到分区尾部,然后以先入先出的方式被读取。分区分布在不同的机器上,从而提供比单机更强的性能。
由于一个主题有多个分区,因此不能保证整个主题消息的顺序,但是可以保证单个分区中消息的顺序。
消费者是消费者群组的一部分。主题的分区会被分配给群组中的某个消费者,同一个分区仅被一个消费者所使用。如果群组中的某个消费者下线,它负责的分区会被重新分配给群组中的其它消费者。一个主题可以同时被多个消费者群组消费。
一个独立的kafka服务器被称为broker。broker提供读写能力,单个broker可以轻松处理数千个分区以及每秒百万级的消息量。
一个集群是由多个broker组成。其中一个broker作为控制器,负责管理工作,包括分配分区、监控broker。
一个分区会被分配以副本的形式分配到多台broker上,其中仅一个副本作为分区的master,所有写操作都会经由master分发给其它的副本。如果master失效,则会由其它副本接管。
一个主题下的分区数可以增加,但是不允许减少。并且如果分区数增加,是不会做数据迁移,因此老数据保留在老分区中,而新的数据会被重新散列到不同的分区中,这时候不能保证相同键的消息都处于同一个分区中。
生产者
ProducerRecord
对应一个消息,其中包含主题和发送的内容。我们还可以额外指定键和分区。
序列化器负责把其中的键和值对象进行序列化,方便在网络中传输。
接下来,分区器负责为消息选择分区。如果ProducerRecord
中我们指定了分区,则按照这个分区投递,否则按照其中的键进行分区。
之后记录被添加到一个记录批次中,批次中的所有消息都会被发送到相同的主题和分区上。有一个独立的线程负责将这些记录发送到相应的broker上。
服务器在收到消息后会返回一个响应,如果是成功写入,则返回一个RecordMetaData
对象,它包含主题和分区信息,以及记录在分区中的偏移量。如果发送失败,则会收到一个错误。根据错误类型可以分为两类,第一类是临时性错误,比如说连接错误,无主错误,这时候生产者在收到错误之后会尝试重新发送消息,如果几次重试都是失败,就返回错误信息。还有一类错误是非临时性错误,比如消息体太大了,对于这种错误,kafka不会进行重试。
生产者配置
bootstrap.servers
:指定broker地址,不需要指定全部的broker,生产者会自动从连接的broker得到集群中所有broker的信息。key.serializer
:键的序列化器类型value.serializer
:值的序列化器类型acks
:指定必须由多少个分区副本收到消息,生产者才会认为消息写入是成功的。其有如下选项。0
:生产者发送消息后不等待服务器响应1
:分区的master节点必须收到消息all
:所有分区同步副本收到消息
buffer.memory
:生产者内存缓存区大小,用于存储批次数据。如果应用程序发送消息的速度超过发送到服务器的速度,会导致缓存区空间不足。接下来的send
方法调用会阻塞等待。compression.type
:指定压缩算法。snappy
:不错的压缩比,以及很高的性能gzip
:更高的压缩比,性能较差lz4
retries
:对于临时性错误,最大重发次数。batch.size
:一个批次的大小。并非只有满批次才会被发送,一个只有一条消息的批次也有可能会被发送。linger.ms
:一个批次在创建后的超时时间,一个批次会在达到超时时间或占满了的情况下被发送。client.id
:消息来源。max.in.flight.requests.per.connection
:生产者在收到服务器响应之前可以发送多少个消息。(同时发送的消息不一定能保证顺序,因此如果强制要求有序,必须将这个值设为1,但是会严重影响性能)timeout.ms
:broker 等待同步副本返回消息确认的时间request.timeout.ms
:了生产者在发送数据时等待服务器返回响应的时间metadata.fetch.timeout.ms
:生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间max.block.ms
:当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms
时,生产者会抛出超时异常。max.request.size
:单个请求里所有消息总的大小,同时也是能发送的单个消息的最大值。receive.buffer.bytes
和send.buffer.bytes
:TCP socket 接收和发送数据包的缓冲区大小,-1表示使用操作系统的默认值。
消费者
Kafka中一个消费群组订阅同一批主题,主题下的分区会分配给群组中的消费者,每个分区都正好分配给一个消费者,一个消费者可能分配到多个分区(如果消费者数目多于分区数,则会有部分消费者闲置)。同一个主题下可以同时有多个消费者群组。
一个消费者群组可以通过正则表达式同时订阅多个主题,如果有新的主题被创建,会与正则表达式比较,如果匹配则也会被这个群组所订阅。
在消费者加入退出,或者主题加入新的分区的情况下,会发生分区重分配,这个过程称为再平衡。
消费者需要向群组协调器的broker(不同的群组可能有不同的协调器)发送心跳来维持它们和群组的从属关系,以及它们对分区的所有权。如果协调器长时间没有收到某个消费者发送的心跳,那么会认为这个消费者已经下线。
当消费者加入群组的时候,它会向群组协调器发送一个JoinGroup请求。第一个加入群组的消费者成为群主。群主从协调器那里获得群组中的活跃成员列表,并负责为每一个消费者分配分区。它使用一个实现了PartitionAssigner
接口的类来决定哪些分区属于哪些消费者。分配完成后,群主把分配列表发送给群组协调器,协调器再把分配信息分发给群组中的所有消费者。每个消费者只能看到自己的分配信息。这个过程在每次再平衡的时候发生。
消费者配置
fetch.min.bytes
:该属性指定了消费者从服务器获取记录的最小字节数。fetch.max.wait.ms
:指定broker在记录不足时的等待时间,默认是 500ms。max.partition.fetch.bytes
:服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB。max.partition.fetch.bytes
的值必须比broker能够接收的最大消息的字节数(通过max.message.size
属性配置)大,否则消费者可能无法读取这些消息。session.timeout.ms
:消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。heartbeat.interval.ms
:心跳的频率。auto.offset.reset
:消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。默认值是 latest,意思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是 earliest,意思是说,在偏移量无效的情况下,消费者将从起始位置读取分区的记录。enable.auto.commit
:消费者是否自动提交偏移量,默认值是true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false,由自己控制何时提交偏移量。如果把它设为 true,还可以通过配置auto.commit.interval.ms
属性来控制提交的频率。partition.assignment.strategy
:分配策略。Range
:该策略会把主题的若干个连续的分区分配给消费者。默认策略。RoundRobin
:把主题的所有分区通过轮询的方式分配给消费者(持有最多分区的消费者分区数比最少分区最多多1)。
client.id
:客户端标识符max.poll.records
:单次调用 call() 方法能够返回的记录数量。receive.buffer.bytes
和send.buffer.bytes
:socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为 -1,就使用操作系统的默认值。
偏移量
消费者通过分区偏移量区分分区中哪些消息已经被消费,哪些消息未被消费。在发生再平衡的时候一些消费者掌管之前不属于它的分区,这时候就可以通过偏移量从上一条未被消费的消息开始读取。
消费者需要负责提交偏移量。其实现原理是消费者向一个叫做_consumer_offset
的特殊主题发送消息,消息里面包含每个分区的偏移量。
如果在消息处理完后才提交偏移量,那么可能会导致消息被重复消费。而如果在消息处理前提交偏移量,那么可能会导致消息丢失。
如果启用了自动提交偏移量(enable.auto.commit
),那么每经过auto.commit.interval.ms
时间,消费者会把poll
方法接收到的最大偏移量提交上去。
一般我们都是接受消息被重复消费,而不接受消息丢失。因此会选择较大的auto.commit.interval.ms
或者手动通过commitSync
或commitAsync
方法提交偏移量,前者是同步版本,后者是异步的(如果使用重试的话,可能会出现较大偏移量先被提交,而较小偏移量后被提交,导致重复消费)。一般较好的方法是使用commitAsync
提交偏移量但是不重试,在消费者关闭之前使用commitSync
同步提交最终的偏移量并不断重试直到成功。
你也可以手动设置分区的偏移量,比如你手动将偏移量保存在了数据库(这样就能保证消息的偏移量和数据库事务同时被提交),那么可以使用seek
方法手动设置偏移量。
再均衡监听
消费者在关闭和再平衡之前,需要做一些清理操作。比如关闭必要的文件、连接,清理缓冲区,提交最新的偏移量等。
我们可以在通过subscribe
订阅主题的时候,提供一个ConsumerRebalanceListener
实例监听再均衡事件。
优雅关闭
要优雅关闭消费者,我们可以通过另外一个线程(比如JVM的关闭钩子线程)调用consumer.wakeup()
方法,它会打断当前进行中的poll
并抛出异常或在下一次poll
发生的时候抛出WakeupException
。
主循环应该负责提交偏移量,并调用consumer.close()
通知群组协调器自己要离开群组。
Kafka
集群
Kafka利用zookeeper维护集群成员的信息,每个broker都有一个唯一的标识符,这个标识符可以在配置文件中指定,也可以自动生成。
在broker启动的时候,它通过在注册路径/broker/ids
下创建临时节点把自己的标识符注册到zookeeper。kafka组件还会订阅zookeeper的注册路径,当有broker加入或退出集群时,这些组件就能获得通知。
控制器
控制器是一个特殊的broker,除了具备一般broker的功能外,还负责分区首领的选举。集群里的broker会通过创建/controller
的临时节点让自己成为控制器,只有一个broker能创建成功,这个broker会成为控制器,其余broker会通过watcher监控临时节点的状态。
如果控制器与zookeeper断开连接,那么/controller
节点会被自动删除,其余节点会尝试创建/controller
,当然只有一个broker成功。成为控制器的broker通过zookeeper的条件递增操作获取一个更大的controller epoch。所有控制器发出的命令都带上epoch,所有broker只会接受最新epoch的控制器的命令,而忽略较小的epoch(防止脑裂)。
当控制器发现一个broker已经离开了集群,需要为那些失去master的分区重新选主。控制器遍历这些分区,并确定谁应该成为新的master,让后向负责这些分区副本的所有broker广播任命消息。随后,新master负责处理来自生产者和消费者的请求,而其余副本broker需要从master复制消息。
如果一个broker加入集群,控制器会利用broker的标识符来检查它的分区副本信息,之后控制器把任命消息发送给新的broker。
复制
Kafka使用主题来组织数据,每个主题分成多个分区,每个分区有多个副本,这些副本保存在对应的broker上。
副本分成两类:
- master:每个分区有唯一的一个master,为了保证一致性,所有生产者和消费者的请求都会经过这个副本。
- follower:其余所有的副本都是follower,follower副本不处理来自客户端的请求,它们唯一的任务就是从master那里复制消息,保持与master一致的状态。如果master下线,那么控制器会选择一个follower,把他晋升为master。
master需要了解哪些副本和自己的状态一致。follower会在有新消息到达的时候尝试从master那里复制消息。但是有各种原因会导致同步失败,比如网络问题,broker崩溃等。
follower向master发送获取数据的请求,其中带上复制偏移量。而master需要将复制偏移量开始的数据返回给follower。通过查看follower请求数据时带上的赋值偏移量,master就能知道每个follower是否和自己完全同步。
只有完全同步的follower才能竞选master。
考虑到一个broker可能同时负责多个分区的master,而master会处理该分区的所有客户端请求。因此为了均衡broker的负载,除了master以外,每个分区还会有一个首选master,实际上是创建主题时选定的master。之所以叫做首选master,是因为在创建分区的时候,需要在broker之间均衡master。因此我们希望所有首选master同时是实际的master的时候,broker之间的负载能得到均衡。默认情况下,kafka的auto.leader.rebalance.enable
会被设置为true
,它会检查首选master是否是master,如果不是,但是首选master是同步副本,那么会触发master选举,优先让首选master成为新的master。
请求处理
master需要负责来自客户端和其余follower的复制请求。master通过请求到大的顺序来处理它们,这种顺序保证了存储的消息是有序的。
broker会在它监听的每个端口都运行一个Acceptor线程,这个线程负责与客户端建立连接,并交给Processor线程处理。Processor线程的数量是可以配置的,它负责从客户端连接获取请求消息,并加入请求队列,然后从响应队列获取响应消息,把它发送给客户端。
IO线程负责消费请求队列,并将结果放入响应队列。
所有请求必须发送给master,如果发送给了follower,那么客户端会收到“非分区master”的错误信息。kafka客户端需要自己负责把请求发送到master上。
而客户端如何得知master是哪个broker呢。客户端可以向任意一个broker请求主题的元数据,其中包含分区信息,以及每个分区副本的信息,分区的master信息。出于性能考虑,客户端需要把这些元数据缓存起来,并且在broker不能正常响应的时候重新请求元数据。
写请求
对于生产者请求,master收到请求后会对请求做校验:
- 发送数据的用户是否有主题写入权限
- 请求里的acks是否有效
- 如果acks=all,那么是否所有副本都在线
之后消息写入到本地磁盘,不保证刷盘时间,因此broker崩溃后可能会导致消息丢失。kafka通过复制机制来保证数据的持久性和高可用。
消息写入完成后,如果acks为0或1,那么master会立即返回响应。如果acks为all,那么请求会保存在一个叫做炼狱的缓冲区中,直到maste发现所有的follower都复制了消息,响应才会返回给客户端。
读请求
客户端发送请求,向broker请求某个分区从特定偏移量开始的消息。客户端还可以指定broker从一个分区最多能返回的消息总大小的上限,这个限制很重要,因为客户端需要为broker返回的数据分配足够的内存。如果没有这个限制,broker返回的数据可能会最终耗尽客户端的内存。
broker收到请求后,需要检查请求是否有效。比如指定的偏移量是否存在,如果数据已经被删除或偏移量不存在,broker需要返回一个错误。
如果偏移量存在,broker会按照客户端指定的大小上限从分区中读取消息,再把消息返回给客户端。这里kafka使用了零拷贝技术,数据拷贝的过程仅用到了DMA,而不涉及CPU运算。
客户端同时还能设置一次性读取消息大小的下限,broker会等待有足够消息的情况下才会返回,这样可以减少网络往返次数。当然为了避免消息处理过大的延迟,会设置一个最大等待时间。
比较特殊的是master只会返回那些已经被完全同步到所有副本的消息,而未被完全同步的消息是不会被返回的。
物理存储
分区是kafka中的基本存储单位,无法继续细分下去。因此分区的大小受限于单个挂载点的可用空间大小。
在配置kafka的时候,管理员需要指定log.dirs
,表示存储分区的目录的目录清单。
分配副本
在创建主题的时候,kafka会首先决定如何在broker之间分配分区。假设你有6个broker,打算创建一个包含10个分区的主题,复制系数为3,那么这个主题下总共有30个分区副本。kafka会按照下面原则分配分区:
- broker会均分分区副本,即每个broker会分到5个副本。
- 确保同个分区的副本处于不同的broker上。
- 如果为broker指定了机架信息,那么每个分区的副本会尽可能分配到不同机架的broker上。
分配算法分两种情况:
- 没有为broker指定机架信息:这时候会将所有分区副本排序,首选master副本排在前面。之后选择一个随机数$x$,编号为$i$的副本会被分配给编号为$i+x\pmod 6$的broker。这时候不仅能保证副本的均分,还能保证首选master副本的均分。
- 如果为broker配置了机架信息:这时候会先将所有分区副本排序,其中相同分区的副本排在一起。设共有$k$个机架,之后假设$c_i$表示编号为$i$的机架的broker数。那么编号为$i$的副本会被分配给编号为$i\pmod k$的机架上的编号为$\lfloor i/k \rfloor$的broker。这能保证在复制系数大于$1$的情况下,每个分区的副本都不会仅落在一个机架上。
分配目录
在为每个副本选择broker后,每个broker需要为这些副本分配存储目录。分配的规则就是总是选择包含最少副本的目录。(注意是按照副本数来选择,而不是副本大小)
片段
考虑到我们会淘汰分区中过期的数据,而在大文件中查找和删除消息比较费时,因此分区还会进一步切分为片段,片段中保存的分区某个特定时间区间中的所有数据。默认情况下,一个片段包含1GB或一周的数据。当向片段写入数据,且已达到片段上限,那么会关闭当前片段,并新建下一个片段。
当前正在写入数据的片段称为活跃片段,活跃片段不会被淘汰,过期时间是从片段边的非活跃(切换到新的活跃片段)开始计时的。
索引
消费者可以从任意偏移量开始读取消息。为了确定第$k$条消息在哪个片段的哪个位置,kafka会为每个分区维护一个索引。索引也会被分成片段。
Kafka不会维护索引的校验和,如果索引出现损坏,Kafka会重建索引。
清理
一般情况下,kafka会选择删除过期消息。不过如果你利用kafka存储多个应用的实时状态(比如群组的偏移量),那么你更加希望删除应用各自的旧数据(应用最新提交之前的所有数据)。这种情况下,Kafka支持compact策略来保留相同键的最新消息,而之前的消息会被清除。当然如果存在null键,就会导致清理失败。
生产运维
同步副本
分区master为同步副本,而对于follower,它需要满足下面所有条件才能被称为同步副本:
- 与Zookeeper之间有活跃会话
- 在过去一段时间(默认10s)从master获取过消息
- 在过去一段时间(默认10s)从master获取过最新消息
同步副本数的增加可以保证数据不容易丢失,但是单个较慢的同步副本可能会拖慢整个分区的速度。
配置
kafka的配置参数有两类:
- broker级别:应用于所有的主题
- 主题级别:应用于特定的主题
下面是一些配置参数:
replication.factor
:主题级别复制系数default.replication.factor
:broker级别复制系数,默认值为3broker.rack
:broker所在机架名称unclean.leader.election
:是否允许在所有存活副本都不同步的情况下,选择不同步的副本作为master,默认值为true(可能会导致消息丢失)。min.insync.replicas
:最少同步副本。如果向master副本写消息的时候发现当前同步副本数少于阈值,就会直接返回失败。默认值为1。
流式处理
基础定义
数据流是无边界数据的抽象表示。数据流具有下述性质:
- 数据流是有序的。
- 数据不允许修改。
- 数据流是可以重播的。
流式处理是指实时地处理一个或多个事件流。流式处理是一种编程范式,常见的编程范式有:
- 请求响应:延迟最小的模型,响应时间在毫秒级,一般阻塞式,应用程序向服务器发起请求然后等待响应。
- 批处理:高延迟高吞吐。处理系统定时执行任务,读取所有可用数据并处理。完成后等待下一次被执行。适合生成每日报表之类的。
- 流式处理:延迟和吞吐在请求响应和批处理之间。
时间
在流处理中时间有下面几种:
- 事件时间:事件的发生时间
- 日志追加时间:broker收到消息的时间。
- 处理时间:应用程序处理消息的时间
状态
在做聚合操作的时候,应用程序需要用变量来维护状态。但是应用程序重启会导致状态丢失。所以应用程序需要负责把状态进行持久化,并在重启后恢复。
状态有如下分类:
- 本地状态:这种状态只能被应用程序自身所访问,一般通过内嵌的数据库进行维护管理。本地状态的优点是速度,不足是受到内存大小的限制。所以可能需要把数据流拆分成多个子流,从而减少状态的大小。
- 外部状态:状态由外部存储服务维护,一般使用NoSQL系统。优点是没有大小限制,可以被多个应用程序所共享。缺点是使用外部存储服务会带来更大的延迟和复杂性。
状态类似于数据库中的记录,而数据流则代表会记录的变更,因此状态实际上就是数据流聚合后的结果。状态关注的是最终的结果,而数据流关注的是发生的变化。
时间窗口
大部分时候我们关注的是最近一段时间的数据,比如一周内的销量最高的商品,一月内的支出。
时间窗口有下面性质:
- 窗口的大小:考虑的时间段
- 窗口移动的频率:窗口每次移动的幅度
- 窗口的可更新时间:如果收到很早以前事件,是修改以前的状态还是就丢弃。一般方案是如果事件发生在可更新时间范围内,就修改,否则丢弃。
设计模式
map-filter模式
参考资料
- 《Kafka权威指南》