Kafka学习
Kafka基础
快速了解Kafka
MQ的作用
MQ:MessageQueue,消息队列。是一种FIFIO先进先出的数据结构。一个典型的MQ系统,会将消息由生产者发送到MQ进行队列,然后根据一定的顺序交由消息的消费者进行处理。
MQ的作用主要有以下三个方面:
异步
异步能够提高系统的响应速度、吞吐量。
解耦
服务器之间进行解耦才可以减少服务之间的影响。提高系统整体的稳定性一级可扩展性。
解耦后可以实现数据分发。生产者发送一个消息后,可以由一个或者多个消费者进行消费,并且消费者的增加或者减少对生产者没有影响。
削峰
以稳定的系统资源应对突发的流量冲击。
Kafka产品介绍
Kafka是目前最具影响力的开源MQ产品,官网地址:https://kafka.apache.org/
Apache Kafka最初是由Linkedln开发并于2011年开源。它主要解决大规模数据的实时流式处理和数据管道问题。
Kafka是一个分布式的发布-订阅消息系统,可以快速地处理高吞吐量的数据流,并将数据实时地分发到多个消费者中。Kafka消息系统有多个broker(服务器)组成,这些borker可以在多个数据中心之间分布式部署,以提供高可用性和容错性。
Kafka使用高效的数据存储和管理技术,能够轻松地处理TB级别地数据量。其优点包括高吞吐量、低延迟、高扩展性、持久性和容错性等。
Kafka在企业级应用中被广泛应用,包括实时流处理、日志聚合、监控和数据分析等方面。同时,Kafka还可以与其他大数据工具集成,如Hadoop、Spark和Storm等,构建一个完整的数据生态系统。
Kafka的特点
Kafka最初诞生于Linkedln公司,其核心作用就是用来收集并处理庞大复杂的应用日志。

特点
- 数据吞吐量大:需要能够快速收集各个渠道的海量日志
- 集群容错性高:允许集群中少量节点崩溃
- 功能不需要太复杂:Kafka的设计目标就是高吞吐、低延迟和可扩展,主要关注消息传递而不是消息处理。所以,Kafka并没有支持死信队列、顺序消息等高级功能。
- 允许少量数据丢失:在海量的应用日志中,少量的日志丢失是不会影响结果的,所以Kafka的设计初衷是允许少量数据丢失的,当然Kafka本身也在不断优化数据安全问题。
Kafka快速上手
快速搭建单机服务
Kafka的运行环境很简单,只要有JVM虚拟机就可以运行。我们基于Ubuntu22.04安装JDK1.8环境进行搭建。
安装JDK环境
JDK下载地址:Java Downloads | Oracle
将下载好的安装包上传到服务器上。
解压安装包
1 | root@qh:/tools# tar -zxvf jdk-8u461-linux-x64.tar.gz |
将解压后的文件夹移动至/usr/local/目录下并重命名为java
1 | root@qh:/tools# mv jdk1.8.0_461 /usr/local/java |
配置系统环境变量
1 | root@qh:/usr/local/java# cat > /etc/environment << EOF |
验证java版本
1 | root@qh:~# java -version |
Kafka
Kafka下载地址:Apache Kafka
选择Kafka_2.13-3.8.0.tgz下载。下载速度可能有点慢,需要挂梯子进行下载。
也可以选择Index of /kafka进行下载。
Scala是一种运行于JVM虚拟机之上的语言。在运行时,只需要安装JDK就可以了,选哪个Scala版本没有区别。但是如果要调试源码就必须选择对应的Scala版本。因为Scala语言的版本并不是向后兼容的。
将安装包上传到服务器后进行解压。
1 | root@qh:/tools# tar -zvxf kafka_2.13-3.8.0.tgz |
将解压后的目录里的文件移动到/app/kafka方便管理。
1 | root@qh:/tools# mkdir -p /app/kafka |
试用一下。
1 | root@qh:/tools# cd /app/kafka/ |
查看当前进程。
1 | root@qh:/app/kafka# jps |
简单接收消息
Kafka的基础工作机制是消息发送者可以将消息发送到Kafka上指定的topic,而消费者可以从指定的topic上消费消息。

首先可以使用Kafka提供的客户端脚本创建Topic
1 | 创建test topic |
创建生产者
1 | root@qh:/app/kafka# bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test |
启用另一个窗口创建消费者,生产者方发消息,则消费者这方就会接收消息,加上--from-beginning即可从头开始消费,也可使用--partition 0 --offset 2指定从某个位置开始消费。
1 | root@qh:/app/kafka# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test |
理解消费者组
对于每个消费者,可以指定⼀个消费者组。kafka中的同⼀条消息,只能被同⼀个消费者组下的某⼀个消费者消费。⽽不属于同⼀个消费者组的其他消费者,也可以消费到这⼀条消息。在kafka-console-consumer.sh脚本中,可以通过–consumer-property group.id=testGroup来指定所属的消费者组。例如,可以启动三个消费者组,来验证⼀下分组消费机制。
1 | 两个消费者实例属于同⼀个消费者组 |
查看消费者组的偏移量
接下来,还可以使⽤kafka-consumer-groups.sh观测消费者组的情况。包括他们的消费进度。
1 | bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group |
理解Kafka的消息传递机制
从之前的实验可以看到, Kafka的消息发送者和消息消费者通过Topic这样⼀个逻辑概念来进⾏业务沟通。但是实际上,所有的消息是存在服务端的Partition这样⼀个数据结构当中的。

在Kafka的技术体系中,有以下⼀些概念需要先熟悉。
- 客户端Client:包括消息⽣产者和消息消费者。之前简单接触过。
- 消费者组:每个消费者可以指定⼀个所属的消费者组,相同消费者组的消费者共同构成⼀个逻辑消费者组。每⼀个消息会被多个感兴趣的消费者组消费,但是在每⼀个消费者组内部,⼀个消息只会被消费⼀次。
- 服务端Broker:⼀个Kafka服务器就是⼀个Broker。
- 话题Topic:这是⼀个逻辑概念,⼀个Topic被认为是业务含义相同的⼀组消息。客户端都通过绑定Topic来⽣产或者消费⾃⼰感兴趣的话题。
- 分区Partition:Topic只是⼀个逻辑概念,⽽Partition就是实际存储消息的组件。每个Partiton就是⼀个queue队列结构。所有消息以FIFO先进先出的顺序保存在这些Partition分区中。
理解Kafka的集群⼯作机制
对于Kafka这样⼀个追求消息吞吐量的产品来说,集群基本上是必备的。接下来,我们就动⼿搭建⼀个Kafka集群,并来理解⼀下Kafka集群的⼯作机制。
搭建Kafka集群
Kafka的集群架构⼤体如图。

单机服务下,Kafka已经具备了⾮常⾼的性能。TPS能够达到百万级别。但是,在实际⼯作中使⽤时,单机搭建的Kafka会有很⼤的局限性。
⼀⽅⾯:消息太多,需要分开保存。Kafka是⾯向海量消息设计的,⼀个Topic下的消息会⾮常多,单机服务很难存得下来。这些消息就需要分成不同的Partition,分布到多个不同的Broker上。这样每个Broker就只需要保存⼀部分数据。这些分区的个数就称为分区数。
另⼀⽅⾯:服务不稳定,数据容易丢失。单机服务下,如果服务崩溃,数据就丢失了。为了保证数据安全,就需要给每个Partition配置⼀个或多个备份,保证数据不丢失。Kafka的集群模式下,每个Partition都有⼀个或多个备份。Kafka会通过⼀个统⼀的Zookeeper集群作为选举中⼼,给每个Partition选举出⼀个主节点Leader,其他节点就是从节点Follower。主节点负责响应客户端的具体业务请求,并保存消息。⽽从节点则负责同步主节点的数据。当主节点发⽣故障时,Kafka会选举出⼀个从节点成为新的主节点。
最后,Kafka集群中的这些Broker信息,包括Partition的选举信息,都会保存在额外部署的Zookeeper集群当中,这样,kafka集群就不会因为某⼀些Broker服务崩溃⽽中断。
准备实验环境
准备三台同样的Ubuntu服务器,预先安装好了JDK,并关闭防⽕墙。
1 | systemctl stop ufw |
分别配置机器名worker1,worker2,worker3.
1 | cat > /etc/hosts << EOF |
接下来先来部署⼀个基于Zookeeper的Kafka集群。其中,选举中⼼部分,Zookeeper是⼀种多数同意的选举机制,允许集群中少数节点出现故障。因此,在搭建集群时,通常都是采⽤3,5,7这样的奇数节点,这样可以最⼤化集群的⾼可⽤特性。 在后续的实验过程中,我们会在三台服务器上都部署Zookeeper和Kafka。
部署ZooKeeper
ZooKeeper3.8.4稳定版下载地址:Apache Download Mirrors
Kafka的安装程序中自带了ZooKeeper,可以在Kafka的安装包的libs目录下查看到ZooKeeper的客户端jar包,但是通常情况下,为了让应用更好维护,我们会使用单独部署的ZooKeeper,而不是使用Kafka的自带的ZooKeeper。这里选择3.8.4稳定版下载。
Zookeeper是⼀种多数同意的选举机制,允许集群中少半数节点出现故障。因此,在搭建集群时,通常采⽤奇数节点,这样可以最⼤化集群的⾼可⽤特性。在后续的实现过程中,我们会在三台服务器上都部署Zookeeper。
将下载下来的Zookeeper解压到/app/zookeeper⽬录,然后进⼊conf⽬录,修改配置⽂件。在conf⽬录中,提供了⼀个zoo_sample.cfg⽂件,这是⼀个示例⽂件。我们只需要将这个⽂件复制⼀份zoo.cfg(cp zoo_sample.cfg zoo.cfg),修改下其中的关键配置就可以了。
1 | mkdir -p /app/zookeeper/data && cd /app/zookeeper/conf |
集群配置部分, server.x这个x就是节点在集群中的myid。后⾯的2888端⼝是集群内部数据传输使⽤的端⼝。3888是集群内部进⾏选举使⽤的端⼝。
接下来将整个Zookeeper的应⽤⽬录分发到另外两台机器上,其中myid里的内容根据节点的情况修改对应的内容。
1 | cd /app/zookeeper/data && echo 1 > myid |
这个myid⽂件的内容就是在zoo.cfg中配置的对应的server.id。
接下来可以在三台机器上都启动Zookeeper服务了。
1 | bin/zkServer.sh --config conf start |
启动完成后,使⽤jps指令可以看到⼀个QuorumPeerMain进程就表示服务启动成功。
三台机器都启动完成后,可以查看下集群状态。
1 | bin/zkServer.sh status |
部署Kafka集群
kafka服务并不需要进⾏选举,因此也没有奇数台服务的建议。
部署Kafka的⽅式跟部署Zookeeper差不多,就是解压、配置、启服务三板斧。
⾸先将Kafka解压到/app/kafka⽬录下,然后进⼊config⽬录,修改server.properties。这个配置⽂件⾥⾯的配置项⾮常多,但只需修改几个核心配置。
1 | broker 的全局唯⼀编号,不能重复,只能是数字。 |
broker.id需要每个服务器上不⼀样,分发到其他服务器上时,要注意修改⼀下。
多个Kafka服务注册到同⼀个zookeeper集群上的节点,会⾃动组成集群。
接下来就可以启动kafka服务了。启动服务时需要指定配置⽂件。
1 | bin/kafka-server-start.sh -daemon config/server.properties |
-daemon表示后台启动kafka服务,这样就不会占⽤当前命令窗⼝。
理解服务端的Topic、Partition和Broker
接下来可以对⽐⼀下之前的单机服务,快速理解Kafka的集群当中核⼼的Topic、Partition、Broker。
创建⼀个分布式的Topic
1 | ./kafka-topics.sh --bootstrap-server worker1:9092 --create --replication-factor 2 --partitions 4 --topic disTopic |
列出所有的Topic
1 | ./kafka-topics.sh --bootstrap-server worker1:9092 --list |
查看列表情况
1 | ./kafka-topics.sh --bootstrap-server worker1:9092 --describe --topic disTopic |
从这⾥可以看到,
–create创建集群,可以指定⼀些补充的参数。⼤部分的参数都可以在配置⽂件中指定默认值。
- partitons参数表示分区数,这个Topic下的消息会分别存⼊这些不同的分区中。示例中创建的disTopic,指定了四个分区,也就是说这个Topic下的消息会划分为四个部分。
- replication-factor表示每个分区有⼏个备份。示例中创建的disTopic,指定了每个partition有两个备份。
–describe查看Topic信息。
- partiton参数列出了四个partition,后⾯带有分区编号,⽤来标识这些分区。
- Leader表示这⼀组partiton中的Leader节点是哪⼀个。这个Leader节点就是负责响应客户端请求的主节点。从这⾥可以看到,Kafka中的每⼀个Partition都会分配Leader,也就是说每个Partition都有不同的节点来负责响应客户端的请求。这样就可以将客户端的请求做到尽量的分散。
- Replicas参数表示这个partition的多个备份是分配在哪些Broker上的。也称为AR。这⾥的0,1,2就对应配置集群时指定的broker.id。但是,Replicas列出的只是⼀个逻辑上的分配情况,并不关⼼数据实际是不是按照这个分配。甚⾄有些节点服务挂了之后,Replicas中也依然会列出节点的ID。
- ISR参数表示partition的实际分配情况。他是AR的⼀个⼦集,只列出那些当前还存活,能够正常同步数据的那些Broker节点。
还可以查看Topic下的Partition分布情况。在Broker上,与消息,联系最为紧密的,其实就是Partition了。之前在配置Kafka集群时,指定了⼀个log.dirs属性,指向了⼀个服务器上的⽇志⽬录。进⼊这个⽬录,就能看到每个Broker的实际数据承载情况。
Broker上的⼀个Partition对应了⽇志⽬录中的⼀个⽬录。⽽这个Partition上的所有消息,就保存在这个对应的⽬录当中。
从整个过程可以看到,Kafka当中,Topic是⼀个数据集合的逻辑单元。同⼀个Topic下的数据,实际上是存储在Partition分区中的,Partition就是数据存储的物理单元。⽽Broker是Partition的物理载体,这些Partition分区会尽量均匀的分配到不同的Broker机器上。⽽之前接触到的offset,就是每个消息在partition上的偏移量。

- Kafka设计需要⽀持海量的数据,⽽这样庞⼤的数据量,⼀个Broker是存不下的。那就拆分成多个Partition,每个Broker只存⼀部分数据。这样极⼤的扩展了集群的吞吐量。
- 每个Partition保留了⼀部分的消息副本,如果放到⼀个Broker上,就容易出现单点故障。所以就给每个Partition设计Follower节点,进⾏数据备份,从⽽保证数据安全。另外,多备份的Partition设计也提⾼了读取消息时的并发度。
- 在同⼀个Topic的多个Partition中,会产⽣⼀个Partition作为Leader。这个Leader Partition会负责响应客户端的请求,并将数据往其他Partition分发。
Kafka集群的消息流转模型
经过上⾯的实验,我们接触到了很多Kafka中的概念。将这些基础概念整合起来,就形成了Kafka集群的整体结构。

- Topic是⼀个逻辑概念,Producer和Consumer通过Topic进⾏业务沟通。
- Topic并不存储数据,Topic下的数据分为多组Partition,尽量平均的分散到各个Broker上。每组Partition包含Topic下⼀部分的消息。每组Partition包含⼀个Leader Partition以及若⼲个Follower Partition进⾏备份,每组Partition的个数称为备份因⼦ replica factor。
- Producer将消息发送到对应的Partition上,然后Consumer通过Partition上的Offset偏移量,记录⾃⼰所属消费者组Group在当Partition上消费消息的进度。
- Producer发送给⼀个Topic的消息,会由Kafka推送给所有订阅了这个Topic的消费者组进⾏处理。但是在每个消费者组内部,只会有⼀个消费者实例处理这⼀条消息。
- 最后,Kafka的Broker通过Zookeeper组成集群。然后在这些Broker中,需要选举产⽣⼀个担任Controller⻆⾊的Broker。这Controller的主要任务就是负责Topic的分配以及后续管理⼯作。在我们实验的集群中,这个Controller实际上是通过ZooKeeper产⽣的。


