Kafka学习笔记

Published: by Creative Commons Licence

  • Tags:

基础知识

Kafka的数据单元称为消息,消息由字节数组组成。消息可以有一个可选的元数据,也就是键,键也是一个字节数组,键用来决定消息最后存储在主题的哪个分区中。

消息按照主题进行分类,一个主题可以细分为若干个分区,一个分区就是一个提交日志,消息以追加的方式写入到分区尾部,然后以先入先出的方式被读取。分区分布在不同的机器上,从而提供比单机更强的性能。

由于一个主题有多个分区,因此不能保证整个主题消息的顺序,但是可以保证单个分区中消息的顺序。

消费者是消费者群组的一部分。主题的分区会被分配给群组中的某个消费者,同一个分区仅被一个消费者所使用。如果群组中的某个消费者下线,它负责的分区会被重新分配给群组中的其它消费者。一个主题可以同时被多个消费者群组消费。

一个独立的kafka服务器被称为broker。broker提供读写能力,单个broker可以轻松处理数千个分区以及每秒百万级的消息量。

一个集群是由多个broker组成。其中一个broker作为控制器,负责管理工作,包括分配分区、监控broker。

一个分区会被分配以副本的形式分配到多台broker上,其中仅一个副本作为分区的master,所有写操作都会经由master分发给其它的副本。如果master失效,则会由其它副本接管。

一个主题下的分区数可以增加,但是不允许减少。并且如果分区数增加,是不会做数据迁移,因此老数据保留在老分区中,而新的数据会被重新散列到不同的分区中,这时候不能保证相同键的消息都处于同一个分区中。

生产者

https://raw.githubusercontent.com/taodaling/taodaling.github.io/master/assets/images/2021-07-01-kafka/produce_message.PNG

ProducerRecord对应一个消息,其中包含主题和发送的内容。我们还可以额外指定键和分区。

序列化器负责把其中的键和值对象进行序列化,方便在网络中传输。

接下来,分区器负责为消息选择分区。如果ProducerRecord中我们指定了分区,则按照这个分区投递,否则按照其中的键进行分区。

之后记录被添加到一个记录批次中,批次中的所有消息都会被发送到相同的主题和分区上。有一个独立的线程负责将这些记录发送到相应的broker上。

服务器在收到消息后会返回一个响应,如果是成功写入,则返回一个RecordMetaData对象,它包含主题和分区信息,以及记录在分区中的偏移量。如果发送失败,则会收到一个错误。根据错误类型可以分为两类,第一类是临时性错误,比如说连接错误,无主错误,这时候生产者在收到错误之后会尝试重新发送消息,如果几次重试都是失败,就返回错误信息。还有一类错误是非临时性错误,比如消息体太大了,对于这种错误,kafka不会进行重试。

生产者配置

  • bootstrap.servers:指定broker地址,不需要指定全部的broker,生产者会自动从连接的broker得到集群中所有broker的信息。
  • key.serializer:键的序列化器类型
  • value.serializer:值的序列化器类型
  • acks:指定必须由多少个分区副本收到消息,生产者才会认为消息写入是成功的。其有如下选项。
    • 0:生产者发送消息后不等待服务器响应
    • 1:分区的master节点必须收到消息
    • all:所有分区同步副本收到消息
  • buffer.memory:生产者内存缓存区大小,用于存储批次数据。如果应用程序发送消息的速度超过发送到服务器的速度,会导致缓存区空间不足。接下来的send方法调用会阻塞等待。
  • compression.type:指定压缩算法。
    • snappy:不错的压缩比,以及很高的性能
    • gzip:更高的压缩比,性能较差
    • lz4
  • retries:对于临时性错误,最大重发次数。
  • batch.size:一个批次的大小。并非只有满批次才会被发送,一个只有一条消息的批次也有可能会被发送。
  • linger.ms:一个批次在创建后的超时时间,一个批次会在达到超时时间或占满了的情况下被发送。
  • client.id:消息来源。
  • max.in.flight.requests.per.connection:生产者在收到服务器响应之前可以发送多少个消息。(同时发送的消息不一定能保证顺序,因此如果强制要求有序,必须将这个值设为1,但是会严重影响性能)
  • timeout.ms:broker 等待同步副本返回消息确认的时间
  • request.timeout.ms:了生产者在发送数据时等待服务器返回响应的时间
  • metadata.fetch.timeout.ms:生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间
  • max.block.ms:当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms时,生产者会抛出超时异常。
  • max.request.size:单个请求里所有消息总的大小,同时也是能发送的单个消息的最大值。
  • receive.buffer.bytessend.buffer.bytes:TCP socket 接收和发送数据包的缓冲区大小,-1表示使用操作系统的默认值。

消费者

Kafka中一个消费群组订阅同一批主题,主题下的分区会分配给群组中的消费者,每个分区都正好分配给一个消费者,一个消费者可能分配到多个分区(如果消费者数目多于分区数,则会有部分消费者闲置)。同一个主题下可以同时有多个消费者群组。

一个消费者群组可以通过正则表达式同时订阅多个主题,如果有新的主题被创建,会与正则表达式比较,如果匹配则也会被这个群组所订阅。

在消费者加入退出,或者主题加入新的分区的情况下,会发生分区重分配,这个过程称为再平衡。

消费者需要向群组协调器的broker(不同的群组可能有不同的协调器)发送心跳来维持它们和群组的从属关系,以及它们对分区的所有权。如果协调器长时间没有收到某个消费者发送的心跳,那么会认为这个消费者已经下线。

当消费者加入群组的时候,它会向群组协调器发送一个JoinGroup请求。第一个加入群组的消费者成为群主。群主从协调器那里获得群组中的活跃成员列表,并负责为每一个消费者分配分区。它使用一个实现了PartitionAssigner接口的类来决定哪些分区属于哪些消费者。分配完成后,群主把分配列表发送给群组协调器,协调器再把分配信息分发给群组中的所有消费者。每个消费者只能看到自己的分配信息。这个过程在每次再平衡的时候发生。

消费者配置

  • fetch.min.bytes:该属性指定了消费者从服务器获取记录的最小字节数。
  • fetch.max.wait.ms:指定broker在记录不足时的等待时间,默认是 500ms。
  • max.partition.fetch.bytes:服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB。max.partition.fetch.bytes的值必须比broker能够接收的最大消息的字节数(通过max.message.size属性配置)大,否则消费者可能无法读取这些消息。
  • session.timeout.ms:消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。
  • heartbeat.interval.ms:心跳的频率。
  • auto.offset.reset:消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。默认值是 latest,意思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是 earliest,意思是说,在偏移量无效的情况下,消费者将从起始位置读取分区的记录。
  • enable.auto.commit:消费者是否自动提交偏移量,默认值是true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false,由自己控制何时提交偏移量。如果把它设为 true,还可以通过配置auto.commit.interval.ms属性来控制提交的频率。
  • partition.assignment.strategy:分配策略。
    • Range:该策略会把主题的若干个连续的分区分配给消费者。默认策略。
    • RoundRobin:把主题的所有分区通过轮询的方式分配给消费者(持有最多分区的消费者分区数比最少分区最多多1)。
  • client.id:客户端标识符
  • max.poll.records:单次调用 call() 方法能够返回的记录数量。
  • receive.buffer.bytessend.buffer.bytes:socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为 -1,就使用操作系统的默认值。

偏移量

消费者通过分区偏移量区分分区中哪些消息已经被消费,哪些消息未被消费。在发生再平衡的时候一些消费者掌管之前不属于它的分区,这时候就可以通过偏移量从上一条未被消费的消息开始读取。

消费者需要负责提交偏移量。其实现原理是消费者向一个叫做_consumer_offset的特殊主题发送消息,消息里面包含每个分区的偏移量。

如果在消息处理完后才提交偏移量,那么可能会导致消息被重复消费。而如果在消息处理前提交偏移量,那么可能会导致消息丢失。

如果启用了自动提交偏移量(enable.auto.commit),那么每经过auto.commit.interval.ms时间,消费者会把poll方法接收到的最大偏移量提交上去。

一般我们都是接受消息被重复消费,而不接受消息丢失。因此会选择较大的auto.commit.interval.ms或者手动通过commitSynccommitAsync方法提交偏移量,前者是同步版本,后者是异步的(如果使用重试的话,可能会出现较大偏移量先被提交,而较小偏移量后被提交,导致重复消费)。一般较好的方法是使用commitAsync提交偏移量但是不重试,在消费者关闭之前使用commitSync同步提交最终的偏移量并不断重试直到成功。

你也可以手动设置分区的偏移量,比如你手动将偏移量保存在了数据库(这样就能保证消息的偏移量和数据库事务同时被提交),那么可以使用seek方法手动设置偏移量。

再均衡监听

消费者在关闭和再平衡之前,需要做一些清理操作。比如关闭必要的文件、连接,清理缓冲区,提交最新的偏移量等。

我们可以在通过subscribe订阅主题的时候,提供一个ConsumerRebalanceListener实例监听再均衡事件。

优雅关闭

要优雅关闭消费者,我们可以通过另外一个线程(比如JVM的关闭钩子线程)调用consumer.wakeup()方法,它会打断当前进行中的poll并抛出异常或在下一次poll发生的时候抛出WakeupException

主循环应该负责提交偏移量,并调用consumer.close()通知群组协调器自己要离开群组。

Kafka

集群

Kafka利用zookeeper维护集群成员的信息,每个broker都有一个唯一的标识符,这个标识符可以在配置文件中指定,也可以自动生成。

在broker启动的时候,它通过在注册路径/broker/ids下创建临时节点把自己的标识符注册到zookeeper。kafka组件还会订阅zookeeper的注册路径,当有broker加入或退出集群时,这些组件就能获得通知。

控制器

控制器是一个特殊的broker,除了具备一般broker的功能外,还负责分区首领的选举。集群里的broker会通过创建/controller的临时节点让自己成为控制器,只有一个broker能创建成功,这个broker会成为控制器,其余broker会通过watcher监控临时节点的状态。

如果控制器与zookeeper断开连接,那么/controller节点会被自动删除,其余节点会尝试创建/controller,当然只有一个broker成功。成为控制器的broker通过zookeeper的条件递增操作获取一个更大的controller epoch。所有控制器发出的命令都带上epoch,所有broker只会接受最新epoch的控制器的命令,而忽略较小的epoch(防止脑裂)。

当控制器发现一个broker已经离开了集群,需要为那些失去master的分区重新选主。控制器遍历这些分区,并确定谁应该成为新的master,让后向负责这些分区副本的所有broker广播任命消息。随后,新master负责处理来自生产者和消费者的请求,而其余副本broker需要从master复制消息。

如果一个broker加入集群,控制器会利用broker的标识符来检查它的分区副本信息,之后控制器把任命消息发送给新的broker。

复制

Kafka使用主题来组织数据,每个主题分成多个分区,每个分区有多个副本,这些副本保存在对应的broker上。

副本分成两类:

  • master:每个分区有唯一的一个master,为了保证一致性,所有生产者和消费者的请求都会经过这个副本。
  • follower:其余所有的副本都是follower,follower副本不处理来自客户端的请求,它们唯一的任务就是从master那里复制消息,保持与master一致的状态。如果master下线,那么控制器会选择一个follower,把他晋升为master。

master需要了解哪些副本和自己的状态一致。follower会在有新消息到达的时候尝试从master那里复制消息。但是有各种原因会导致同步失败,比如网络问题,broker崩溃等。

follower向master发送获取数据的请求,其中带上复制偏移量。而master需要将复制偏移量开始的数据返回给follower。通过查看follower请求数据时带上的赋值偏移量,master就能知道每个follower是否和自己完全同步。

只有完全同步的follower才能竞选master。

考虑到一个broker可能同时负责多个分区的master,而master会处理该分区的所有客户端请求。因此为了均衡broker的负载,除了master以外,每个分区还会有一个首选master,实际上是创建主题时选定的master。之所以叫做首选master,是因为在创建分区的时候,需要在broker之间均衡master。因此我们希望所有首选master同时是实际的master的时候,broker之间的负载能得到均衡。默认情况下,kafka的auto.leader.rebalance.enable会被设置为true,它会检查首选master是否是master,如果不是,但是首选master是同步副本,那么会触发master选举,优先让首选master成为新的master。

请求处理

master需要负责来自客户端和其余follower的复制请求。master通过请求到大的顺序来处理它们,这种顺序保证了存储的消息是有序的。

broker会在它监听的每个端口都运行一个Acceptor线程,这个线程负责与客户端建立连接,并交给Processor线程处理。Processor线程的数量是可以配置的,它负责从客户端连接获取请求消息,并加入请求队列,然后从响应队列获取响应消息,把它发送给客户端。

IO线程负责消费请求队列,并将结果放入响应队列。

所有请求必须发送给master,如果发送给了follower,那么客户端会收到“非分区master”的错误信息。kafka客户端需要自己负责把请求发送到master上。

而客户端如何得知master是哪个broker呢。客户端可以向任意一个broker请求主题的元数据,其中包含分区信息,以及每个分区副本的信息,分区的master信息。出于性能考虑,客户端需要把这些元数据缓存起来,并且在broker不能正常响应的时候重新请求元数据。

写请求

对于生产者请求,master收到请求后会对请求做校验:

  • 发送数据的用户是否有主题写入权限
  • 请求里的acks是否有效
  • 如果acks=all,那么是否所有副本都在线

之后消息写入到本地磁盘,不保证刷盘时间,因此broker崩溃后可能会导致消息丢失。kafka通过复制机制来保证数据的持久性和高可用。

消息写入完成后,如果acks为0或1,那么master会立即返回响应。如果acks为all,那么请求会保存在一个叫做炼狱的缓冲区中,直到maste发现所有的follower都复制了消息,响应才会返回给客户端。

读请求

客户端发送请求,向broker请求某个分区从特定偏移量开始的消息。客户端还可以指定broker从一个分区最多能返回的消息总大小的上限,这个限制很重要,因为客户端需要为broker返回的数据分配足够的内存。如果没有这个限制,broker返回的数据可能会最终耗尽客户端的内存。

broker收到请求后,需要检查请求是否有效。比如指定的偏移量是否存在,如果数据已经被删除或偏移量不存在,broker需要返回一个错误。

如果偏移量存在,broker会按照客户端指定的大小上限从分区中读取消息,再把消息返回给客户端。这里kafka使用了零拷贝技术,数据拷贝的过程仅用到了DMA,而不涉及CPU运算。

客户端同时还能设置一次性读取消息大小的下限,broker会等待有足够消息的情况下才会返回,这样可以减少网络往返次数。当然为了避免消息处理过大的延迟,会设置一个最大等待时间。

比较特殊的是master只会返回那些已经被完全同步到所有副本的消息,而未被完全同步的消息是不会被返回的。

物理存储

分区是kafka中的基本存储单位,无法继续细分下去。因此分区的大小受限于单个挂载点的可用空间大小。

在配置kafka的时候,管理员需要指定log.dirs,表示存储分区的目录的目录清单。

分配副本

在创建主题的时候,kafka会首先决定如何在broker之间分配分区。假设你有6个broker,打算创建一个包含10个分区的主题,复制系数为3,那么这个主题下总共有30个分区副本。kafka会按照下面原则分配分区:

  • broker会均分分区副本,即每个broker会分到5个副本。
  • 确保同个分区的副本处于不同的broker上。
  • 如果为broker指定了机架信息,那么每个分区的副本会尽可能分配到不同机架的broker上。

分配算法分两种情况:

  • 没有为broker指定机架信息:这时候会将所有分区副本排序,首选master副本排在前面。之后选择一个随机数$x$,编号为$i$的副本会被分配给编号为$i+x\pmod 6$的broker。这时候不仅能保证副本的均分,还能保证首选master副本的均分。
  • 如果为broker配置了机架信息:这时候会先将所有分区副本排序,其中相同分区的副本排在一起。设共有$k$个机架,之后假设$c_i$表示编号为$i$的机架的broker数。那么编号为$i$的副本会被分配给编号为$i\pmod k$的机架上的编号为$\lfloor i/k \rfloor$的broker。这能保证在复制系数大于$1$的情况下,每个分区的副本都不会仅落在一个机架上。

分配目录

在为每个副本选择broker后,每个broker需要为这些副本分配存储目录。分配的规则就是总是选择包含最少副本的目录。(注意是按照副本数来选择,而不是副本大小)

片段

考虑到我们会淘汰分区中过期的数据,而在大文件中查找和删除消息比较费时,因此分区还会进一步切分为片段,片段中保存的分区某个特定时间区间中的所有数据。默认情况下,一个片段包含1GB或一周的数据。当向片段写入数据,且已达到片段上限,那么会关闭当前片段,并新建下一个片段。

当前正在写入数据的片段称为活跃片段,活跃片段不会被淘汰,过期时间是从片段边的非活跃(切换到新的活跃片段)开始计时的。

索引

消费者可以从任意偏移量开始读取消息。为了确定第$k$条消息在哪个片段的哪个位置,kafka会为每个分区维护一个索引。索引也会被分成片段。

Kafka不会维护索引的校验和,如果索引出现损坏,Kafka会重建索引。

清理

一般情况下,kafka会选择删除过期消息。不过如果你利用kafka存储多个应用的实时状态(比如群组的偏移量),那么你更加希望删除应用各自的旧数据(应用最新提交之前的所有数据)。这种情况下,Kafka支持compact策略来保留相同键的最新消息,而之前的消息会被清除。当然如果存在null键,就会导致清理失败。

生产运维

同步副本

分区master为同步副本,而对于follower,它需要满足下面所有条件才能被称为同步副本:

  • 与Zookeeper之间有活跃会话
  • 在过去一段时间(默认10s)从master获取过消息
  • 在过去一段时间(默认10s)从master获取过最新消息

同步副本数的增加可以保证数据不容易丢失,但是单个较慢的同步副本可能会拖慢整个分区的速度。

配置

kafka的配置参数有两类:

  • broker级别:应用于所有的主题
  • 主题级别:应用于特定的主题

下面是一些配置参数:

  • replication.factor:主题级别复制系数
  • default.replication.factor:broker级别复制系数,默认值为3
  • broker.rack:broker所在机架名称
  • unclean.leader.election:是否允许在所有存活副本都不同步的情况下,选择不同步的副本作为master,默认值为true(可能会导致消息丢失)。
  • min.insync.replicas:最少同步副本。如果向master副本写消息的时候发现当前同步副本数少于阈值,就会直接返回失败。默认值为1。

流式处理

基础定义

数据流是无边界数据的抽象表示。数据流具有下述性质:

  • 数据流是有序的。
  • 数据不允许修改。
  • 数据流是可以重播的。

流式处理是指实时地处理一个或多个事件流。流式处理是一种编程范式,常见的编程范式有:

  • 请求响应:延迟最小的模型,响应时间在毫秒级,一般阻塞式,应用程序向服务器发起请求然后等待响应。
  • 批处理:高延迟高吞吐。处理系统定时执行任务,读取所有可用数据并处理。完成后等待下一次被执行。适合生成每日报表之类的。
  • 流式处理:延迟和吞吐在请求响应和批处理之间。

时间

在流处理中时间有下面几种:

  • 事件时间:事件的发生时间
  • 日志追加时间:broker收到消息的时间。
  • 处理时间:应用程序处理消息的时间

状态

在做聚合操作的时候,应用程序需要用变量来维护状态。但是应用程序重启会导致状态丢失。所以应用程序需要负责把状态进行持久化,并在重启后恢复。

状态有如下分类:

  • 本地状态:这种状态只能被应用程序自身所访问,一般通过内嵌的数据库进行维护管理。本地状态的优点是速度,不足是受到内存大小的限制。所以可能需要把数据流拆分成多个子流,从而减少状态的大小。
  • 外部状态:状态由外部存储服务维护,一般使用NoSQL系统。优点是没有大小限制,可以被多个应用程序所共享。缺点是使用外部存储服务会带来更大的延迟和复杂性。

状态类似于数据库中的记录,而数据流则代表会记录的变更,因此状态实际上就是数据流聚合后的结果。状态关注的是最终的结果,而数据流关注的是发生的变化。

时间窗口

大部分时候我们关注的是最近一段时间的数据,比如一周内的销量最高的商品,一月内的支出。

时间窗口有下面性质:

  • 窗口的大小:考虑的时间段
  • 窗口移动的频率:窗口每次移动的幅度
  • 窗口的可更新时间:如果收到很早以前事件,是修改以前的状态还是就丢弃。一般方案是如果事件发生在可更新时间范围内,就修改,否则丢弃。

设计模式

map-filter模式

参考资料

  • 《Kafka权威指南》