环境搭建

首先下载Kafka的release文件,在这里我使用的版本是kafka_2.13-2.4.0

在这里我们准备部署一个三个节点的集群,首先解压release文件,之后进入文件夹 kafka_2.13-2.4.0/config,复制文件 server.properties 得到 server-1.propertiesserver-2.properties,这样一来我们就拥有三份配置文件了,之后我们会使用这三份配置文件来启动三个进程构建Kafka集群。分别对三份配置文件中的相关属性进行修改:

server.properties

# broker的序号
broker.id=0
# 服务监听的端口号
listeners=PLAINTEXT://localhost:9092
# 数据文件的存放目录
log.dirs=~/kafka/logs/kafka-logs-0
# 复制因子
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
# ZK的地址
zookeeper.connect=127.0.0.1:2181

server-1.properties

# broker的序号
broker.id=1
# 服务监听的端口号
listeners=PLAINTEXT://localhost:9093
# 数据文件的存放目录
log.dirs=~/kafka/logs/kafka-logs-1
# 复制因子
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
# ZK的地址
zookeeper.connect=127.0.0.1:2181

server-2.properties

# broker的序号
broker.id=2
# 服务监听的端口号
listeners=PLAINTEXT://localhost:9094
# 数据文件的存放目录
log.dirs=~/kafka/logs/kafka-logs-2
# 复制因子
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
# ZK的地址
zookeeper.connect=127.0.0.1:2181

上面之所以要把复制因子修改为3(默认是1)是因为Kafka默认会使用 __consumer_offsets 这个Topic保存Consumer Group的消费状况,而这个topic会在集群启动时自动的创建。因此我们没有办法在集群启动后控制它的replicas数量,所以只能在配置文件中进行设置。如果一个__consumer_offsets的replicas只有一个,那么集群中任意一个broker挂掉都可能会导致消费出现异常。

修改好配置文件之后就可以根据配置文件使用如下命令启动三个Kafka进程了

# 进程1
JMX_PORT=9192 ./bin/kafka-server-start.sh config/server.properties
# 进程2
JMX_PORT=9193 ./bin/kafka-server-start.sh config/server-1.properties
# 进程3
JMX_PORT=9194 ./bin/kafka-server-start.sh config/server-2.properties

启动完毕之后此时Kafka集群就已经构建完毕了,我们还可以安装kafka-manager这个工具来方便对Kafka集群进行管理。kafka-manager的安装和使用不再赘述,直接参考相关文章即可。

简单使用

首先我们使用kafka-manager创建一个topic,并设置相关属性如下

properties value
Topic test
Partitions 3
Replication Factor 2

之后我们构建一个maven项目并添加Kafka Client的依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>

常量属性

1
2
3
4
5
final class Constants {
static final String GROUP = "group_test";
static final String TOPIC = "test";
static final String KAFKA_SERVER_ADDRESS = "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094";
}

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class TestProducer {
private static final String SERIALIZER = StringSerializer.class.getName();

public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", Constants.KAFKA_SERVER_ADDRESS);
props.put("retries", 0);
props.put("batch.size", 1024 * 16);
props.put("linger.ms", 1);
props.put("buffer.memory", 1024 * 1024 * 32);
props.put("key.serializer", SERIALIZER);
props.put("value.serializer", SERIALIZER);

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100000; i++) {
String msg = "Message_test_" + i;
System.out.println("produce : " + msg);
// send方法是异步的,当它被调用时,它会将消息记录添加到待发送缓冲区并立即返回
producer.send(new ProducerRecord<>(Constants.TOPIC, Integer.toString(i), msg));
Thread.sleep(100);
}
producer.close();
}
}

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class TestConsumer {
private static final String DESERIALIZER = StringDeserializer.class.getName();

public static void main(String[] args) {
String consumerGroup = Constants.GROUP;
if (args.length > 0) {
consumerGroup = args[0];
}

Properties props = new Properties();
props.put("bootstrap.servers", Constants.KAFKA_SERVER_ADDRESS);
// 同一个group会共享offset
props.put("group.id", consumerGroup);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", DESERIALIZER);
props.put("value.deserializer", DESERIALIZER);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(Constants.TOPIC));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
record.partition(), record.offset(), record.key(), record.value());
}
}
}
}

在上面我们创建了一个producer和一个consumer,之后我们可以启动一个producer和多个consumer,具体命令如下。

java TestProducer
java TestConsumer
java TestConsumer group_0
java TestConsumer group_0
java TestConsumer group_0

这样一来我们就有了一个producer和四个consumer,其中consumer分为两个group,groupId分别为group_0和group_test。不同的group会消费同一条消息,而同一个group内的consumer不会消费同一条消息。即:

  • 多个consumer group可以同时消费一个topic中的消息;(topic)
  • 同一个consumer group中的consumers会共享消费的offset;(queue)

一些原理

Kafka使用ZooKeeper的节点注册来实现Broker Controller的选举。

一个topic分为多个partition,每个partition又会有多个replicas,replicas由leader和follower组成,其中replicas的数量是包含了leader的。

producer负责push消息到partition,consumer负责从partition里pull消息。
具体发送到哪一个分区是由producer决定的,producer也可以使用自定义的分区器修改默认的分区配置。

consumer与partition之间的对应关系如下:

  • partition > consumer:一个消费者消费多个partition
  • partition = consumer:一个消费者消费一个partition
  • partition < consumer:一个消费者消费一个partition,多余的consumer会处于空闲状态

一个partition由多个segment组成,每个segment文件的名称为其第一条消息的索引值,文件分为日志文件和索引文件。

request.required.acks:

  • 1:producer发送消息到leader,leader写入本地日志成功返回(默认)
  • 0:~,leader立即返回(速度快但是有可能丢失消息)
  • -1:~,leader等待所有的follower同步完成才返回(速度慢但是强一致)

确定一个Consumer Group的GroupCoordinator的位置:

  1. Consumer Group
  2. GroupId
  3. abs(GroupId.hashCode) % NumPartition,NumPartition就是__consumer_offsets的分区数
  4. 计算结果表示了__consumer_offsets的一个partition
  5. 找到该partition的leader所在的broker
  6. GroupCoordinator就在这个broker上面

_consumer_offsets保存了消费者消费的offset信息。

后记

Kafka的内容太多了,而且感觉都比较琐碎,所以记录的东西也都是相当琐碎的。后面会继续学习,遇到相关的内容再进行补充。

参考

https://matt33.com/tags/kafka/