Kafka学习笔记
环境搭建
首先下载Kafka的release文件,在这里我使用的版本是kafka_2.13-2.4.0。
在这里我们准备部署一个三个节点的集群,首先解压release文件,之后进入文件夹 kafka_2.13-2.4.0/config
,复制文件 server.properties
得到 server-1.properties
和 server-2.properties
,这样一来我们就拥有三份配置文件了,之后我们会使用这三份配置文件来启动三个进程构建Kafka集群。分别对三份配置文件中的相关属性进行修改:
server.properties
# broker的序号
broker.id=0
# 服务监听的端口号
listeners=PLAINTEXT://localhost:9092
# 数据文件的存放目录
log.dirs=~/kafka/logs/kafka-logs-0
# 复制因子
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
# ZK的地址
zookeeper.connect=127.0.0.1:2181
server-1.properties
# broker的序号
broker.id=1
# 服务监听的端口号
listeners=PLAINTEXT://localhost:9093
# 数据文件的存放目录
log.dirs=~/kafka/logs/kafka-logs-1
# 复制因子
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
# ZK的地址
zookeeper.connect=127.0.0.1:2181
server-2.properties
# broker的序号
broker.id=2
# 服务监听的端口号
listeners=PLAINTEXT://localhost:9094
# 数据文件的存放目录
log.dirs=~/kafka/logs/kafka-logs-2
# 复制因子
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
# ZK的地址
zookeeper.connect=127.0.0.1:2181
上面之所以要把复制因子修改为3(默认是1)是因为Kafka默认会使用
__consumer_offsets
这个Topic保存Consumer Group的消费状况,而这个topic会在集群启动时自动的创建。因此我们没有办法在集群启动后控制它的replicas数量,所以只能在配置文件中进行设置。如果一个__consumer_offsets的replicas只有一个,那么集群中任意一个broker挂掉都可能会导致消费出现异常。
修改好配置文件之后就可以根据配置文件使用如下命令启动三个Kafka进程了
# 进程1
JMX_PORT=9192 ./bin/kafka-server-start.sh config/server.properties
# 进程2
JMX_PORT=9193 ./bin/kafka-server-start.sh config/server-1.properties
# 进程3
JMX_PORT=9194 ./bin/kafka-server-start.sh config/server-2.properties
启动完毕之后此时Kafka集群就已经构建完毕了,我们还可以安装kafka-manager这个工具来方便对Kafka集群进行管理。kafka-manager的安装和使用不再赘述,直接参考相关文章即可。
简单使用
首先我们使用kafka-manager创建一个topic,并设置相关属性如下
properties | value |
---|---|
Topic | test |
Partitions | 3 |
Replication Factor | 2 |
之后我们构建一个maven项目并添加Kafka Client的依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
常量属性
1 | final class Constants { |
Producer
1 | public class TestProducer { |
Consumer
1 | public class TestConsumer { |
在上面我们创建了一个producer和一个consumer,之后我们可以启动一个producer和多个consumer,具体命令如下。
java TestProducer
java TestConsumer
java TestConsumer group_0
java TestConsumer group_0
java TestConsumer group_0
这样一来我们就有了一个producer和四个consumer,其中consumer分为两个group,groupId分别为group_0和group_test。不同的group会消费同一条消息,而同一个group内的consumer不会消费同一条消息。即:
- 多个consumer group可以同时消费一个topic中的消息;(topic)
- 同一个consumer group中的consumers会共享消费的offset;(queue)
一些原理
Kafka使用ZooKeeper的节点注册来实现Broker Controller的选举。
一个topic分为多个partition,每个partition又会有多个replicas,replicas由leader和follower组成,其中replicas的数量是包含了leader的。
producer负责push消息到partition,consumer负责从partition里pull消息。
具体发送到哪一个分区是由producer决定的,producer也可以使用自定义的分区器修改默认的分区配置。
consumer与partition之间的对应关系如下:
- partition > consumer:一个消费者消费多个partition
- partition = consumer:一个消费者消费一个partition
- partition < consumer:一个消费者消费一个partition,多余的consumer会处于空闲状态
一个partition由多个segment组成,每个segment文件的名称为其第一条消息的索引值,文件分为日志文件和索引文件。
request.required.acks:
- 1:producer发送消息到leader,leader写入本地日志成功返回(默认)
- 0:~,leader立即返回(速度快但是有可能丢失消息)
- -1:~,leader等待所有的follower同步完成才返回(速度慢但是强一致)
确定一个Consumer Group的GroupCoordinator的位置:
- Consumer Group
- GroupId
- abs(GroupId.hashCode) % NumPartition,NumPartition就是__consumer_offsets的分区数
- 计算结果表示了__consumer_offsets的一个partition
- 找到该partition的leader所在的broker
- GroupCoordinator就在这个broker上面
_consumer_offsets保存了消费者消费的offset信息。
后记
Kafka的内容太多了,而且感觉都比较琐碎,所以记录的东西也都是相当琐碎的。后面会继续学习,遇到相关的内容再进行补充。