本文章大部分内容均摘自朱忠华老师的《深入理解Kafka:核心设计与实践原理》。以下是重构后的内容:

一、概述

1. 简介

Kafka起初是由LinkedIn公司采用Scala语言开发的一个多分区、多副本且基于Zookeeper协调的分布式消息系统,现已被捐献给Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性被广泛使用。目前越来越多的开源式分布处理系统如:Storm、Spark、Flink等都支持与Kafka集成。

Kafka之所以受到越来越多的青睐,与它所“扮演”的三大角色是分不开的:

- 消息系统:Kafka和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此***Kafka还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。

- 存储系统:Kafka把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失地风险。也正是得益于Kafka的消息持久化功能和多副本机制,我们可以把Kafka作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。

- 流式处理平台:Kafka不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。

2. 使用场景

日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如Hadoop、Hbase、Solr等。

消息系统:解耦生产者和消费者、缓存消息等。

用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。

运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

流式处理:比如Spark Streaming和Storm。

事件源是一种应用程序设计风格,其中状态的改变作为事件序列被记录下来。Kafka对非常大的存储日志数据提供支持,使其成为以此风格构建的应用程序的一种优秀后端。峰值处理可以使关键应用能够顶住访问峰值,不会因超出负荷崩溃。一个典型的Kafka体系架构包括若干Producer、若干broker、若干Consumer以及一个Zookeeper集群。

1. broker是服务代理节点。Kafka集群由多个Kafka实例组成,每个实例(server)称为broker,在集群中每个broker都有一个唯一的brokerid,不能重复。

2. Producer是生产者,也就是发送消息的一方。生产者负责创建消息,然后将其发送到Kafka中。

3. Consumer和Consumer Group(CG)是消费者,也就是接收消息的一方。消费者连接Kafka并接收消息,进而进行相应的业务逻辑处理。consumer group是Kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者,它们共享一个公共的id,即group id。组内的所有消费者协调在一起来消费订阅主题的所有分区。当然,每个分区只能由同一个消费组内的一个消费者来消费。个人认为,理解consumer group记住下面这三个特性就好了:consumer group下可以有一个或多个consumer instance;consumer instance可以是一个进程,也可以是一个线程;group.id是一个字符串,唯一标识一个consumer group;consumer group订阅的topic下的每个分区只能分配给某个group下的一个consumer消费。当然该分区还可以被分配给其他consumer group。

4. Zookeeper负责Kafka集群元数据的管理、控制器的选举等操作。在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。

Kafka 的控制器信息存储在 Zookeeper 的 /controller 节点上,三个业务场景会触发 Controller 选举:集群从零启动、/controller 节点消失、/controller 节点数据发生变化。成功竞选为 Controller 的 broker 会在 ZooKeeper 中创建 /controller 临时节点,执行 get 命令可查看该临时节点的内容:其中 version 在目前版本中固定为1,brokerid 表示称为控制器的 broker 的 id 编号,timestamp 表示竞选称为控制器时的时间戳。

在Kafka中,副本之间的关系遵循“一主多从”的原则,其中leader副本负责处理读写请求,而follower副本仅负责与leader副本同步消息。这些副本分布在不同的broker上,当leader副本出现故障时,可以从follower副本中重新选举出新的leader副本来提供服务。

以一个包含4个broker的Kafka集群为例,某个主题有3个分区,且副本因子(副本数量)也为3。这样,每个分区都有1个leader副本和2个follower副本。生产者和消费者只与leader副本进行交互,而follower副本只负责消息的同步。因此,很多时候follower副本中的消息相对于leader副本来说有一定的滞后。

Kafka中有一些关键参数来描述副本之间的同步状态:

8. AR、ISR、OSR

- AR(Assigned Replicas):是Kafka所有副本的集合。

- ISR(In-Sync Replicas):是指与leader副本保持一定程度同步的所有副本(包括leader副本在内)的集合。消息会先发送到leader副本,然后follower副本才能从leader副本中拉取消息进行同步。在同步期间内,follower副本相对于leader副本而言有一定程度的滞后性,这个滞后的范围可以通过参数来配置。在这个参数范围内的副本为ISR。

- OSR(Out-of-Sync Replicas):是指超出这个参数范围的,即与leader副本同步滞后过多的follower副本组成的集合。

由此可见,AR = ISR + OSR。在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即AR = ISR,OSR集合为空。leader副本负责维护和跟踪ISR集合中所有follower副本的滞后状态。当follower副本落后太多或失效时,leader副本会将其从ISR集合中移除。如果OSR集合中有follower副本“追上”了leader副本,那么leader副本会将其从OSR集合转移至ISR集合。

默认情况下,当leader副本坏掉时,只有ISR集合中的副本才有资格被选举为新的leader,而在OSR集合中的副本则没有任何机会(不过这个原则也可以通过修改相应的参数配置来改变)。

9. HW、LEO

ISR与HW和LEO也有紧密的关系。

HW(High Watermark):俗称高水位。它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。如下图所示,它代表一个日志文件,这个日志文件中有9条消息,第一条消息的offset(Log Start Offset)为0,最后一条消息的offset为8,offset为9的消息用虚线框来表示,代表下一条待写入的消息。日志文件的HW为6,表示消费者只能拉取到offset从0到5之间的消息(不包括HW),而offset为6的消息(HW)对消费者而言是不可见的。

LEO(Log End Offset):标识当前日志文件中下一条待写入消息的offset,如上图offset为9的位置即为当前日志文件的LEO,LEO的大小相当于当前日志分区中最后一条消息的offset值加1。分区ISR集合中的每个副本都会维护自身的LEO。ISR集合中最小的LEO即为分区的HW,对消费者而言只能消费HW之前的消息。

10. HW截断机制

如果leader副本宕机,选出了新的leader副本,而新的leader并不能保证已经完全同步了之前leader的所有数据,只能保证HW之前的数据是同步过的,此时所有的follower副本都要将数据截断到HW的位置,再和新的leader同步数据,来保证数据一致。

当宕机的leader恢复,发现新的leader中的数据和自己持有的数据不一致,此时宕机的leader会将自己的数据截断到宕机之前的HW位置,然后同步新leader的数据。宕机的leader活过来也像follower一样同步数据,来保证数据的一致性。

三、生产者、消费者示例

1. 创建主题

afka 提供了许多实用的脚本工具,存放在 Kafka 源码的 bin 目录下。其中与主题有关的脚本是 `kafka-topics.sh`,我们可以使用该脚本创建一个分区数为 4,副本数为 3 的主题 test。示例如下:

```bash

bin/kafka-topics.sh --create --zookeeper --replication-factor <副本个数> --partitions <分区个数> --topic <主题名称>

```

其中,`--create` 是创建主题的命令,`--zookeeper` 指定了 Kafka 所连接的 Zookeeper 地址,`--replication-factor` 指定了分区副本的个数,`--partitions` 指定了分区个数,`--topic` 指定了所要创建主题的名称。

主题创建好之后,我们可以查看具体的主题存储目录。主题存储目录有参数 `log.dirs` 指定。如下图所示:

Kafka 的存储目录为 `/kafka-logs`,test-0 ~ test-3 为主题 test 的 4 个分区。分区文件夹的名字是主题名加上分区编号,编号从 0 开始。主题的数据就存储在分区文件夹下的 `.log` 文件内。

2. 查看主题的分区和副本情况

使用以下命令查看主题的详细信息:

```bash

bin/kafka-topics.sh --describe --zookeeper --topic <主题名称>

```

结果输出的第一行是对 Topic 信息的汇总:Topic 名称,分区个数以及副本个数。Configs 后面的输出代表该 Topic 每个分区副本在 broker 的分布情况。例如:

```plaintext

Topic: test PartitionCount: 4 ReplicationFactor: 3 Configs: segment.bytes=1073741824

replica.selector.class=org.apache.kafka.common.replica.ReplicaSelector Leader: 1003 Replicas: 1003,1001,1002

Isr: 1003,1001,1002

```

就第一条而言,代表的意思为:编号为 0 的 Partition,leader 副本在 brokerid = 1003 这个节点上;该分区所有的副本分布在 brokerid 为 1003、1001、1002 这三个节点;Isr 为 Replicas 的子集,子集内的所有副本均分布在 brokerid 为 1003、1001、1002 这三个节点上,并与所属 Partition 的 leader 副本保持一定程度的同步。

3.

以下是重构后的内容:

Kafka 消费者示例:

1. 连接 Kafka 集群地址和订阅主题

使用以下命令连接到 Kafka 集群并订阅指定的主题:

```bash

kafka-console-consumer.sh --bootstrap-server --topic [--group ]

```

其中,`` 是连接 Kafka 集群的地址,`` 是消费者订阅的主题。如果不加 `--group` 参数,系统会自动创建一个消费者组。目前主题 `test` 尚未有任何消息存入,所以此脚本还不能消费任何信息。

2. 发送消息到主题

在打开一个 shell 终端,然后使用 `kafka-console-producer.sh` 脚本发送一条消息 “This is a message” 到主题 `test`,示例如下:

输入完 “This is a message” 之后,按下回车,返回 consumer 的 shell 终端,可以接收到刚刚键入的消息 “This is a message”。

3. 查看主题偏移量

对于消费者来说,我们可以执行增加一些参数来消费指定的数据,比如:

- 增加 `--partition` 选项:从指定的分区消费消息;

- 增加 `--offset` 选项:从指定的偏移位置消费消息。

关于更多参数可以直接执行消费者脚本查看参数说明。看下面这个消费者示例:

```bash

kafka-console-consumer.sh --bootstrap-server --topic --partition --offset [--group ]

```

其中,`` 是连接 Kafka 集群的地址,`` 是消费者订阅的主题,`` 是指定的分区,`` 是指定的偏移位置。如果不加 `--group` 参数,系统会自动创建一个消费者组。HW(High Watermark)是指高水位线,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset 之前的消息。所以上述命令只消费了三条信息。

4. 查看消费者组

可以使用以下命令查看消费者组的信息:

```bash

kafka-consumer-groups.sh --bootstrap-server --list [--describe] [--group ] [--topology] [--execute-command ] [--execute-script