简介
跟 Spring Data Redis、Spring Data MongoDB、Spring Data JPA 等项目类似,Spring Kafka 提供了在 Spring 应用中通过简单配置从而访问 Kafka 集群的途径。
本文主要介绍在 Spring 应用中消息生产者如何向 Kafka 集群发送消息、消息消费者如何消费消息、如何批量消费消息以及多消费者组同时消费消息等等。
使用 Spring Kafka 的最新特性,以下测试代码采用了 Spring Boot 2.0.0 构建
Spring Kafka 的基本用法
在 pom.xml 中添加依赖:
1 | <dependency> |
基本配置
springBoot properties
1 | #kafka,更多配置:org.springframework.boot.autoconfigure.kafka.KafkaProperties |
普通 Maven 构建项目,或者想要自定义更多配置,可以采用 JavaConfig 配置
1 | /** |
详细含义参考官方文档 : https://kafka.apache.org/documentation/#producerapi
第一个消息生产 / 消费的实例
SimpleProducer :1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class SimpleProducer {
private static final Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
private KafkaTemplate<Object, Object> kafkaTemplate;
/**
* 向topic中发送消息
*/
public void send (String topic, String msg) {
try {
kafkaTemplate.send(topic, msg);
logger.info("推送数据成功!");
} catch (Exception e) {
logger.error(MessageFormat.format("推送数据出错,topic:{},data:{}",topic,msg));
}
}
/**
* 向topic中发送消息
*/
public void send (String topic, List<String> msgs) {
msgs.forEach(msg -> kafkaTemplate.send(topic, msg));
}
}
SimpleConsumer :1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class SimpleConsumer {
private static final Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
private KafkaTemplate<String, String> kafkaTemplate;
"test", topics = {"topicName"}) (id =
public void listen(String data) {
System.out.println("SimpleConsumer收到消息:" + data);
logger.info(MessageFormat.format("SimpleConsumer收到消息:{}", data));
}
}
批量消费消息
如果生产者写入消息的速度比消费者读取的速度快的情况下,随着时间增长,消息堆积会越来越严重。
对于这种场景,我们需要增加多个消费者来进行水平扩展。
Kafka 消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。
假设有一个 T1 主题,该主题有 4 个分区;同时我们有一个消费组 G1,这个消费组只有一个消费者 C1。那么消费者 C1 将会收到这 4 个分区的消息,如下所示:
如果我们增加新的消费者 C2 到消费组 G1,那么每个消费者将会分别收到两个分区的消息,如下所示:
如果增加到 4 个消费者,那么每个消费者将会分别收到一个分区的消息,如下所示:
但如果我们继续增加消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息:
总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助
Kafka 一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。
换句话说,每个应用都可以读到全量的消息。
为了使得每个应用都能读到全量消息,应用需要有不同的消费组。
对于上面的例子,假如我们新增了一个新的消费组 G2,而这个消费组有两个消费者,那么会是这样的:
在这个场景中,消费组 G1 和消费组 G2 都能收到 T1 主题的全量消息,在逻辑意义上来说它们属于不同的应用。
最后,总结起来就是:
如果应用需要读取全量消息,那么请为该应用设置一个消费组;
如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者。
监听指定Topic的partition0,代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15"id0", topicPartitions = { (topic = TOPIC, partitions = {"0"})}) (id =
public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
log.info("Id0 records size " + records.size());
for (ConsumerRecord<?, ?> record : records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
log.info("Received: " + record);
if (kafkaMessage.isPresent()) {
Object message = record.value();
String topic = record.topic();
log.info("p0 Received message={},topic={}", message,topic);
}
}
}
Demo解析
地址: http://git.blz.netease.com/june.wang/springKafka-demo.git
如何创建消费者
读取 Kafka 消息只需要创建一个 kafkaConsumer,创建过程与 KafkaProducer 非常相像。
我们需要使用四个基本属性,bootstrap.servers、key.deserializer、value.deserializer 和 group.id。
其中,bootstrap.servers 与创建 KafkaProducer 的含义一样;
key.deserializer 和 value.deserializer 是用来做反序列化的,也就是将字节数组转换成对象;
group.id 不是严格必须的,但通常都会指定,这个参数是消费者的消费组。1
2
3
4
5
6Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
上面的例子中只设置了几个最基本的消费者参数,bootstrap.servers,group.id,key.deserializer 和 value.deserializer,其他的参数可以看Kafka文档 。
虽然我们很多情况下只是使用默认设置就行,但了解一些比较重要的参数还是很有帮助的。
一些比较重要的参数:
订阅主题
1 | consumer.subscribe(Collections.singletonList("topicName")); |
循环拉取
消费数据的 API 和处理方式很简单,我们只需要循环不断拉取消息即可。Kafka 对外暴露了一个非常简洁的 poll 方法,其内部实现了协作、分区重平衡、心跳、数据拉取等功能,但使用时这些细节都被隐藏了,我们也不需要关注这些。
需要提醒的是,消费者对象不是线程安全的,也就是不能够多个线程同时使用一个消费者对象;而且也不能够一个线程有多个消费者对象。简而言之,一个线程一个消费者,如果需要多个消费者那么请使用多线程来进行一一对应。
提交(commit)与位移(offset)
当我们调用 poll () 时,该方法会返回我们没有消费的消息。
当消息从 broker 返回消费者时,broker 并不跟踪这些消息是否被消费者接收到;
Kafka 让消费者自身来管理消费的位移(offset),并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit)。
在正常情况下,消费者会发送分区的提交信息到 Kafka,Kafka 进行记录。
当消费者宕机或者新消费者加入时,Kafka 会进行重平衡,这会导致消费者负责之前并不属于它的分区。
重平衡完成后,消费者会重新获取分区的位移,下面来看下两种有意思的情况。
假如一个消费者在重平衡前后都负责某个分区,如果提交位移比之前实际处理的消息位移要小,那么会导致消息重复消费。
假如在重平衡前某个消费者拉取分区消息,在进行消息处理前提交了位移,但还没完成处理宕机了,然后 Kafka 进行重平衡,新的消费者负责此分区并读取提交位移,此时会 “丢失”消息
因此,提交位移的方式会对应用有比较大的影响
自动确认offset :1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class AutoCommitConsumerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.106.203:9092");
props.put("group.id", "autoCommitConsumers_group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
"resource") (
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
自动提交 offset 的方式非常简单,但多数情况下,我们不会使用自动提交的方式。
因为不论从 Kafka 集群中拉取的数据是否被处理成功,offset 都会被更新,也就是如果处理过程中出现错误可能会出现数据丢失的情况。所以多数情况下我们会选择手动提交方式
手动提交offset:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43public class ManualCommitConsumerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.106.203:9092");
props.put("group.id", "manualCommitConsumers_group");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topic02"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
//do something
insertIntoDb(buffer);
//异步提交,防止阻塞进程。但是异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试(避免多个异步提交,导致位移覆盖)。
//consumer.commitSync();
//一般情况下对于异步提交,我们可能会通过回调的方式记录提交结果
consumer.commitAsync((offsets, e) -> {
if (e != null) {
log.error("Commit failed for offsets {}", offsets, e);
}
});
buffer.clear();
}
}
}
private static void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
System.out.println(buffer);
// Insert into db
}
}
KafkaConsumer从指定位移(offset)开始消费。:
1 |
|
另外注意的是,seek () 只是指定了 poll () 拉取的开始位移,这并不影响在 Kafka 中保存的提交位移(当然我们可以在 seek 和 poll 之后提交位移覆盖)。
优雅退出
在一般情况下,我们会在一个主线程中循环 poll 消息并进行处理。
当需要退出 poll 循环时,我们可以使用另一个线程调用 consumer.wakeup (),调用此方法会使得 poll () 抛出 WakeupException。如果调用 wakup 时,主线程正在处理消息,那么在下一次主线程调用 poll 时会抛出异常WakeUpException。
主线程在抛出 WakeUpException 后,需要调用 consumer.close (),此方法会提交位移,同时发送一个退出消费组的消息到 Kafka 的组协调者。组协调者收到消息后会立即进行重平衡(而无需等待此消费者会话过期)。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
System.out.println(System.currentTimeMillis() + "-- waiting for data...");
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());
}
for (TopicPartition tp: consumer.assignment())
System.out.println("Committing offset at position:" + consumer.position(tp));
consumer.commitSync();
}
} catch (WakeupException e) {
log.error("", e);
} finally {
consumer.close();
System.out.println("Consumer Closed");
}