概念知识
消息中间件/系统
中间件
中间件是一类连接软件组件和应用的计算机软件,它包括一组服务。以便于运行在一台或多台机器上的多个软件通过网络进行交互。
该技术所提供的互操作性,推动了一致分布式体系架构的演进,该架构通常用于支持并简化那些复杂的分布式应用程序,它包括 web服务器、事务监控器和消息队列软件。
消息队列
在计算机科学中,消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。
消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。
消息队列常常保存在链表结构中。拥有权限的进程可以向消息队列中写入或读取消息。
让我们用快递员送快递的例子来理解。
最开始送上门,你有时候不在家,又只有等第二天,现在多了个菜鸟驿站,快递小哥只需要往里面放,你有快递自己去驿站拿就行了。
快递小哥也不用直接对接这么多的客户,有包裹往驿站丢就行,然后我们去驿站拿,就算有时候我们不空,也可以让包裹在驿站多放几天,这也就是它的堆积能力。
为什么我们要用消息队列
异步处理
假设你有一个系统调用的链路,系统A 调用系统B 耗时 50ms,系统B 调用系统C 又需要200ms ,系统C 调用系统 D ,需要做比较超时的操作,需要 2s,如下图所示:
现在上面最大的问题在于:一个用户请求过来,整个链路的调用时间是 50ms + 200ms + 2000ms = 2250ms,也就是2秒多。
而事实上,调用链路中,系统A 调用系统 B,系统B 调用系统 C 总共加起来也才 250ms,但是系统C调用系统D 却用了 2S。
正是加入系统C调用系统D 这个链路,导致系统响应时间 从 250ms 增加到了 2250 ms,足足慢了 10倍。
如果说,我们把系统D 从链路中抽离出去,让 C 系统异步调用D,那么在 B系统调用 C,C 处理完成自己的逻辑,发送一个异步的请求去调用D系统,不用阻赛等到 D系统响应了再返回。这是不是好很多了呢?
举一个例子,就以我们平常点外卖为例:
我们平常点完餐,付完款,系统然后平给账户扣款、创建订单、通知商家准备菜品。
接着,是不是需要找个骑手给你送餐?那这个找骑手的过程,是需要一套复杂算法来实现调度的,比较耗时。
那么我们是不是就可以把找骑手给你送餐的这个步骤从链路中抽离出去,做成异步化的,哪怕延迟个几十秒,但是只要在一定时间范围内给你找到一个骑手去送餐就可以了。
这样是不是就可以让你下订单点外卖的速度变得超快?支付成功之后,直接创建好订单、账户扣款、通知商家立马给你准备做菜就ok了,这个过程可能就几百毫秒。然后后台异步化的耗费可能几十秒通过调度算法给你找到一个骑手去送餐,但是这个步骤不影响我们快速下订单。
所以上面的链路也是同理,如果业务流程支持异步化的话,是不是就可以考虑把系统C对系统D的调用抽离出去做成异步化的,不要放在链路中同步依次调用。
整个过程如下:
消息解耦
假如你现在系统A,这个系统会产出一个核心数据,下游系统 B和 C 都需要这个数据。
那么我们平常做的就是直接调用系统 B 和系统 C,发送数据过去。
过程如下:
过几天,其他的业务系统D、E 也需要这个数据,然后成了这样
如果后续还有系统要呢?
这种情况系统耦合非常的严重,如果你发送一个系统的调用失败了怎么整?
针对上面的问题,我们可以使用消息队列来实现系统解藕。
系统A 把数据发送到消息队列中,其他的系统,谁需要,自己去 消息队列 取就完了。
流量消峰&日志收集
假设你有一个系统,平时正常的时候每秒可能就几百个请求,系统部署在8核16G的机器的上,正常处理都是ok的,每秒几百请求是可以轻松抗住的。
但是如下图所示,在高峰期一下子来了每秒钟几千请求,瞬时出现了流量高峰,此时你的选择是要搞10台机器,抗住每秒几千请求的瞬时高峰吗?
那如果瞬时高峰每天就那么半个小时,接着直接就降低为了每秒就几百请求,如果你线上部署了很多台机器,那么每台机器就处理每秒几十个请求就可以了,这不是有点浪费机器资源吗?
大部分时候,每秒几百请求,一台机器就足够了,但是为了抗那每天瞬时的高峰,硬是部署了10台机器,每天就那半个小时有用,别的时候都是浪费资源的。
此时我们就可以使用消息队列来帮忙了,进行消峰。所有机器前面部署一层MQ,平时每秒几百请求大家都可以轻松接收消息。
一旦到了瞬时高峰期,一下涌入每秒几千的请求,就可以积压在MQ里面,然后那一台机器慢慢的处理和消费。
等高峰期过了,再消费一段时间,MQ里积压的数据就消费完毕了。
如下图:
消息队列有哪些
四种常用的消息队列 ActiveMQ、RabbitMQ、RocketMQ、Kafka
消息订阅模型
目前最常见的消息引擎模型有两种:点对点模型和发布订阅模型
1.点对点模型(一对一,消费者主动拉取数据,消息收到后消息清除)
消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。 消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。 Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
2.发布-订阅模型(一对多,消费者消费数据之后不会清除消息)
消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
Kafka是什么
Apache Kafka 是一个分布式高吞吐量的流消息系统,使用Scala语言编写。Kafka 建立在 ZooKeeper 同步服务之上。它与 Apache Storm 和 Spark 完美集成,用于实时流数据分析,与其他消息传递系统相比,Kafka具有更好的吞吐量,内置分区,数据副本和高度容错功能,因此非常适合大型消息处理应用场景。
Kafka的架构图
Kafka的核心概念
-
Broker: 消息中间件处理节点,一个 Kafka 节点就是一个 Broker,一个或者多个 Broker 可以组成一个 Kafka 集群。
-
Topic: Kafka 的消息通过 Topic 主题来分类,Topic类似于关系型数据库中的表,每个 Topic 包含一个或多(Partition)分区。
-
Partition: 为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
如图: topic 是由多个 partition 组成的, 而 Kafka 的 partition 是不可修改的有序消 息序列, 也可以说是 有序的消息日志。 每个 partition 有自己专属的 partition 号, 通常是从 0 开始的。 用户对 partition 唯一能做的操作就是 在消息序列的尾部追 加写入消息。 partition 上的每条消息都会被分配一个唯一 的序列号
该序列号被称为位移( offset ) 是从 0 开始顺序递增 的整数。 位移信息可以 唯一定位到某 partition 下的一条消息 。 -
Replica:副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
-
Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
-
Follower: 每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的follower。
-
LogSegment: 每个分区又被划分为多个日志分段 LogSegment 组成,日志段是 Kafka 日志对象分片的最小单位。LogSegment 算是一个逻辑概念,对应一个具体的日志文件(”.log” 的数据文件)和两个索引文件(”.index” 和 “.timeindex”,分别表示偏移量索引文件和消息时间戳索引文件)组成。
-
Offset: 每个分区中都由一系列有序的、不可变的消息组成,这些消息被顺序地追加到 Partition 中,每个消息都有一个连续的序列号称之为 Offset 偏移量,用于在 Partition 内唯一标识消息。
-
Message: 消息是 Kafka 中存储的最小最基本的单位,即为一个 commit log,由一个固定长度的消息头和一个可变长度的消息体组成。
-
Producer: 消息的生产者,负责发布消息到 Kafka Broker,生产者在默认情况下把消息均衡地分布到主题的所有分区上,用户也可以自定义分区器来实现消息的分区路由。
-
Consumer: 消息的消费者,从 Kafka Broker 读取消息的客户端,消费者把每个分区最后读取的消息的 Offset 偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失。
-
Consumer Group: 每个 Consumer 属于一个特定的 Consumer Group(若不指定 Group Name则属于默认的 group),一个或多个 Consumer 组成的群组可以共同消费一个 Topic 中的消息,但每个分区只能被群组中的一个消费者操作。上面提到的消费引擎模型中,Kafka默认是发布订阅模式,但Kafka通过消费者组可以同时实现了传统消息引 擎系统的两大模型:如果所有实例都属于同一个 Group, 那么它实现的就是点对点模型;如果所有实例分别属于不 同的 Group,那么它实现的就是发布 / 订阅模型。
-
Rebalance: 重平衡 Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。
Consumer Group 进行 Rebalance 呢?Rebalance 的触发条件有 3 个。- 组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。
- 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile("t.*c")) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
- 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
用两张图来展示下上面提到的这些概念以及运行流程
Kafka的使用场景
- 日志收集: 可以用 kafka 收集各种服务的日志,通过kafka以统一接口服务的方式开放给各种消费者,如 hadoop,Hbase,Solr 等。
- 消息系统: 解耦生产者和消费者、缓存消息等。
- 用户活动跟踪: Kafka 经常被用来记录web用户或者app用户的各种活动,如浏览网页,搜索,点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。
- 运营指标: Kafka也经常用来记录运营监控数据,包括收集各种分布式应用的数据,比如报警和报告等。
- 流式处理: 比如 spark streaming 和 storm。
下面是一个日志方面的典型应用场景
Kafka快的原因
顺序读写
Kafka的消息是不断追加到文件中的,这个特性使kafka可以充分利用磁盘的顺序读写性能
顺序读写不需要硬盘磁头的寻道时间,只需很少的扇区旋转时间,所以速度远快于随机读写
零拷贝
服务器先将文件从复制到内核空间,再复制到用户空间,最后再复制到内核空间并通过网卡发送出去,而零拷贝则是直接从内核到内核再到网卡,省去了用户空间的复制
Zero copy对应的是Linux中sendfile函数,这个函数会接受一个offsize来确定从哪里开始读取。现实中,不可能将整个文件全部发给消费者,他通过消费者传递过来的偏移量来使用零拷贝读取指定内容的数据返回给消费者
分区
Kafka中的topic中的内容可以被分为多分partition存在,每个partition又分为多个段segment,所以每次操作都是针对一小部分做操作,很轻便,并且增加并行操作的能力
批量发送
Kafka允许进行批量发送消息,producter发送消息的时候,可以将消息缓存在本地,等到了固定条件发送到Kafka
1.等消息条数到固定条数
2.一段时间发送一次
数据压缩
Kafka还支持对消息集合进行压缩,Producer可以通过GZIP或Snappy格式对消息集合进行压缩。
压缩的好处就是减少传输的数据量,减轻对网络传输的压力。
Producer压缩之后,在Consumer需进行解压,虽然增加了CPU的工作,但在对大数据处理上,瓶颈在网络上而不是CPU,所以这个成本很值得
基本使用
Java客户端访问Kafka
引入maven依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
消息发送端代码
public class MsgProducer {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.60:9092,192.168.0.60:9093,192.168.0.60:9094");
/*
发出消息持久化机制参数
(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
(2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader
又挂掉,则消息会丢失。
(3)acks=-1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。
这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
*/
props.put(ProducerConfig.ACKS_CONFIG, "1");
//发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//重试间隔设置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
//设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
//kafka本地线程会从缓冲区取数据,批量发送到broker,
//设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
//默认值是0,意思就是消息必须立即被发送,但这样会影响性能
//一般设置100毫秒左右,就是说这个消息发送完后会进入本地的一个batch,如果100毫秒内,这个batch满了16kb就会随batch一起被发送出去
//如果100毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
//把发送的key从字符串序列化为字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<>(props);
int msgNum = 5;
for (int i = 1; i <= msgNum; i++) {
Order order = new Order(i, 100 + i, 1, 1000.00);
//指定发送分区
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("order-topic"
, 0, order.getOrderId().toString(), JSON.toJSONString(order));
//未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum
/*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("my-replicated-topic"
, order.getOrderId().toString(), JSON.toJSONString(order));*/
//等待消息发送成功的同步阻塞方法
/*RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());*/
//异步方式发送消息
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("发送消息失败:" + exception.getStackTrace());
}
if (metadata != null) {
System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
}
});
}
producer.close();
}
}
消息接收端代码
public class MsgConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.60:9092,192.168.0.60:9093,192.168.0.60:9094");
// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
// 是否自动提交offset
//props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG , "1000");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
/*
心跳时间,服务端broker通过心跳确认consumer是否故障,如果发现故障,就会通过心跳下发
rebalance的指令给其他的consumer通知他们进行rebalance操作,这个时间可以稍微短一点
*/
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
//服务端broker多久感知不到一个consumer心跳就认为他故障了,默认是10秒
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
/*
如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,
会将其踢出消费组,将分区分配给别的consumer消费
*/
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消费主题
String topicName = "order-topic";
//consumer.subscribe(Arrays.asList(topicName));
// 消费指定分区
//consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));
//消息回溯消费
consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));
//指定offset消费
//consumer.seek(new TopicPartition(topicName, 0), 10);
while (true) {
/*
* poll() API 是拉取消息的长轮询,主要是判断consumer是否还活着,只要我们持续调用poll(),
* 消费者就会存活在自己所在的group中,并且持续的消费指定partition的消息。
* 底层是这么做的:消费者向server持续发送心跳,如果一个时间段(session.
* timeout.ms)consumer挂掉或是不能发送心跳,这个消费者会被认为是挂掉了,
* 这个Partition也会被重新分配给其他consumer
*/
ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:offset = %d, key = %s, value = %s%n", record.offset(), record.key(),
record.value());
}
if (records.count() > 0) {
// 提交offset
consumer.commitSync();
}
}
}
}
Spring Boot整合Kafka示例Demo
引入Spring Boot Kafka的依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application.yml配置文件如下
kafka:
listener:
missing-topics-fatal: false
bootstrap-servers: 172.16.1.254:9092
producer:
# 每次批量发送消息的数量
batch-size: 128
#发送失败重试次数
retries: 3
#缓存容量-32MB
buffer-memory: 33554432
#key序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
#value序列化方式
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
#acks:消息的确认机制,默认值是0
acks: all
linger-ms-config: 1
consumer:
#制定消费者
group-id: consumer
#自动提交offset
enable-auto-commit: false
#提交offset的时间,单位ms 即1s提交一次
auto-commit-interval: 1000
#消息读取策略
auto-offset-reset: earliest
#key反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#value反序列方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
生产者示例代码
@RestController
public class KafkaController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
public void send() {
kafkaTemplate.send("mytopic", 0, "key", "this is a msg");
}
}
消费者示例代码
@Component
public class MyConsumer {
/**
* @KafkaListener(groupId = "testGroup", topicPartitions = {
* @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
* @TopicPartition(topic = "topic2", partitions = "0",
* partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
* },concurrency = "6")
* //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数
* @param record
*/
@KafkaListener(topics = "mytopic",groupId = "testGroup")
public void listen(ConsumerRecord<String, String> record) {
String value = record.value();
System.out.println(value);
System.out.println(record);
}
}
生产者Producer配置
生产者 ACKS 机制
ACKS 参数指定了必须要有多少个分区副本接收到消息,生产者才会认为消息写入是发送消息成功的,这个参数对消息丢失的可能性会产生重要影响,主参数有如下选项:
acks=0: 把消息发送到kafka就认为发送成功。
acks=1: 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功。
acks=all: 把消息发送到 Kafka Leader 分区,并且 Leader 分区的副本 Follower 对消息进行了同步就认为发送成功。
Producer配置类
创建 Producer 配置类,对 Kafka 生产者进行配置,在配置中需要设置三个 Bean 分别为:
- kafkaTemplate:kafka template 实例,用于 Spring 中的其它对象引入该 Bean,通过其向 Kafka 发送消息。
- producerFactory:producer 工厂,用于对 kafka producer 进行配置。
- producerConfigs:对 kafka producer 参数进行配置。
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
// 设置@Configuration、@EnableKafka两个注解,声明Config并且打开KafkaTemplate能力。
@Configuration
@EnableKafka
public class KafkaProducerConfig {
/**
* Producer Template 配置
*/
@Bean(name="kafkaTemplate")
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
* Producer 工厂配置
*/
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/**
* Producer 参数配置
*/
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// 指定多个kafka集群多个地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
// 重试次数,0为不启用重试机制
props.put(ProducerConfig.RETRIES_CONFIG, 0);
// acks=0 把消息发送到kafka就认为发送成功
// acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
// acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
props.put(ProducerConfig.ACKS_CONFIG,"1");
// 生产者空间不足时,send()被阻塞的时间,默认60s
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
// 控制批处理大小,单位为字节
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
// 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
// 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
// 键的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 值的序列化方式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
// 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
return props;
}
}
Producer Service 向 kafka 发送数据
创建 Producer Service 引入 KafkaTemplate 对象,再创建 sendMessageSync、sendMessageAsync 两个方法,分别利用“同步/异步”两种方法向 kafka 发送消息。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* producer 同步方式发送数据
* @param topic topic名称
* @param message producer发送的数据
*/
public void sendMessageSync(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
}
/**
* producer 异步方式发送数据
* @param topic topic名称
* @param message producer发送的数据
*/
public void sendMessageAsync(String topic, String message) {
ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
System.out.println("success");
}
@Override
public void onFailure(Throwable ex) {
System.out.println("failure");
}
});
}
}
消费者Consumer配置
Consumer配置类
创建 Consumer 配置类,对 Kafka 消费者进行配置,在配置中需要设置三个 Bean 分别为:
- kafkaListenerContainerFactory:kafka container 工厂,负责创 建container,当使用@KafkaListener时需要提供。
- consumerFactory:consumer 工厂,用于对 kafka consumer 进行配置。
- consumerConfigs:对 kafka consumer 参数进行配置。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
// 设置消费者工厂
factory.setConsumerFactory(consumerFactory());
// 消费者组中线程数量
factory.setConcurrency(3);
// 拉取超时时间
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
// Kafka地址
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
// 是否自动提交offset偏移量(默认true)
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 自动提交的频率(ms)
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
// Session超时设置
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
// 键的反序列化方式
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 值的反序列化方式
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// offset偏移量规则设置:
// (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
// (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
// (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return propsMap;
}
}
Consumer Service监听Kafka数据
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = {"test"},groupId = "group1", containerFactory="kafkaListenerContainerFactory")
public void kafkaListener(String message){
System.out.println(message);
}
}
Spring Boot操作Kafka详解
Producer Template发送消息的几种方法
KafkaTemplate 类提供了非常方便的方法将数据发送到 kafka 的 Topic,以下清单显示了该类的提供的相关方法,详情可以查看 KafkaTemplate 类方法文档
// 设定data,向kafka发送消息
ListenableFuture<SendResult<K, V>> sendDefault(V data);
// 设定key、data,向kafka发送消息
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
// 设定partition、key、data,向kafka发送消息
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
// 设定partition、timestamp、key、data,向kafka发送消息
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
// 设定topic、data,向kafka发送消息
ListenableFuture<SendResult<K, V>> send(String topic, V data);
// 设定topic、key、data,向kafka发送消息
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
// 设定topic、partition、key、data,向kafka发送消息
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
// 设定topic、partition、timestamp、 key、data,向kafka发送消息
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
// 创建ProducerRecord对象,在ProducerRecord中设置好topic、partion、key、value等信息,然后向kafka发送消息
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
// 创建Spring的Message对象,然后向kafka发送消息
ListenableFuture<SendResult<K, V>> send(Message<?> message);
// 获取指标信息
Map<MetricName, ? extends Metric> metrics();
// 显示Topic分区信息
List<PartitionInfo> partitionsFor(String topic);
//在生产者上执行一些任意操作并返回结果。
<T> T execute(ProducerCallback<K, V, T> callback);
// 生产者刷新消息
void flush();
// 用于执行生产者方法后异步回调
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
下面将写个使用示例,这里改下上面向 kafka service 发送数据的例子,通过不同的方法向 kafka 发送消息,具体代码如下:
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Service
public class ProducerService {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* producer 同步方式发送数据
*
* @param topic topic名称
* @param message producer发送的数据
*/
public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
//------- 方法:send(String topic, @Nullable V data)
kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
//------- 方法:send(String topic, K key, @Nullable V data)
kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);
//------- 方法:send(String topic, K key, @Nullable V data)
kafkaTemplate.send(topic, 0, message).get(10, TimeUnit.SECONDS);
//------- 方法:send(String topic, Integer partition, K key, @Nullable V data)
kafkaTemplate.send(topic, 0, key, message).get(10, TimeUnit.SECONDS);
//------- 方法:send(String topic, Integer partition, Long timestamp, K key, @Nullable V data)
kafkaTemplate.send(topic, 0, new Date().getTime(),key, message).get(10, TimeUnit.SECONDS);
//------- 方法:send(Message<?> message)
Message msg = MessageBuilder.withPayload("Send Message(payload,headers) Test")
.setHeader(KafkaHeaders.MESSAGE_KEY, key)
.setHeader(KafkaHeaders.TOPIC, topic)
.setHeader(KafkaHeaders.PREFIX,"kafka_")
.build();
kafkaTemplate.send(msg).get(10, TimeUnit.SECONDS);
//------- 方法:send(ProducerRecord<K, V> record)
ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>("test", "Send ProducerRecord(topic,value) Test");
ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>("test", "", "Send ProducerRecord(topic,key,value) Test");
kafkaTemplate.send(producerRecord1).get(10, TimeUnit.SECONDS);
kafkaTemplate.send(producerRecord2).get(10, TimeUnit.SECONDS);
}
/**
* producer 异步方式发送数据
*
* @param topic topic名称
* @param message producer发送的数据
*/
public void sendMessageAsync(String topic, String key, String message) {
//------- 方法:send(String topic, @Nullable V data)
ListenableFuture<SendResult<Integer, String>> future1 = kafkaTemplate.send(topic, message);
//------- 方法:send(String topic, K key, @Nullable V data)
ListenableFuture<SendResult<Integer, String>> future2 = kafkaTemplate.send(topic, key, message);
//------- 方法:send(String topic, K key, @Nullable V data)
ListenableFuture<SendResult<Integer, String>> future3 = kafkaTemplate.send(topic, 0, message);
//------- 方法:send(String topic, Integer partition, K key, @Nullable V data)
ListenableFuture<SendResult<Integer, String>> future4 = kafkaTemplate.send(topic, 0, key, message);
//------- 方法:send(String topic, Integer partition, Long timestamp, K key, @Nullable V data)
ListenableFuture<SendResult<Integer, String>> future5 = kafkaTemplate.send(topic, 0, new Date().getTime(),key, message);
//------- 方法:send(Message<?> message)
Message msg = MessageBuilder.withPayload("Send Message(payload,headers) Test")
.setHeader(KafkaHeaders.MESSAGE_KEY, key)
.setHeader(KafkaHeaders.TOPIC, topic)
.setHeader(KafkaHeaders.PREFIX,"kafka_")
.build();
ListenableFuture<SendResult<Integer, String>> future6 = kafkaTemplate.send(msg);
//------- 方法:send(ProducerRecord<K, V> record)
ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>("test", "Send ProducerRecord(topic,value) Test");
ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>("test", "", "Send ProducerRecord(topic,key,value) Test");
ListenableFuture<SendResult<Integer, String>> future7 = kafkaTemplate.send(producerRecord1);
ListenableFuture<SendResult<Integer, String>> future8 = kafkaTemplate.send(producerRecord2);
// 设置异步发送消息获取发送结果后执行的动作
ListenableFutureCallback listenableFutureCallback = new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
System.out.println("success");
}
@Override
public void onFailure(Throwable ex) {
System.out.println("failure");
}
};
// 将listenableFutureCallback与异步发送消息对象绑定
future1.addCallback(listenableFutureCallback);
future2.addCallback(listenableFutureCallback);
future3.addCallback(listenableFutureCallback);
future4.addCallback(listenableFutureCallback);
future5.addCallback(listenableFutureCallback);
future6.addCallback(listenableFutureCallback);
future7.addCallback(listenableFutureCallback);
future8.addCallback(listenableFutureCallback);
}
}
Kafka Consumer监听Kafka消息
当我们需要接收 kafka 中的消息时需要使用消息监听器,Spring For Kafka 提供了八种消息监听器接口,接口如下:
/**
* 当使用"自动提交"或"ontainer-managed"中一个提交方法提交offset偏移量时,
* 使用此接口处理Kafka consumer poll()操作接收到的各个ConsumerRecord实例。
*/
public interface MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data);
}
/**
* 当使用手动提交offset偏移量时,使用此接口处理从Kafka consumer poll()操作接收到的各个ConsumerRecord实例。
*/
public interface AcknowledgingMessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
/**
* 当使用"自动提交"或"ontainer-managed"中一个提交方法提交offset偏移量时,
* 使用此接口处理Kafka consumer poll()操作接收到的各个ConsumerRecord
* 实例。并提供可访问的consumer对象。
*/
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
/**
* 当使用手动提交offset偏移量时,使用此接口处理从Kafka consumer poll()操作
* 接收到的各个ConsumerRecord实例。并提供可访问的consumer对象。
*/
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
/**
* 当使用"自动提交"或"ontainer-managed"中一个提交方法提交offset偏移量时,
* 使用此接口处理从Kafka consumer poll()操作接收到的所有ConsumerRecord实例。
*
* 注意:使用此接口时不支持ACK的AckMode.RECORD模式,因为监听器已获得完整的批处理。
*/
public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
}
/**
* 当使用手动提交offset偏移量时,使用此接口处理从Kafka consumer poll()操作接收到的所有ConsumerRecord实例。
*/
public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
/**
* 当使用"自动提交"或"ontainer-managed"中一个提交方法提交offset偏移量时,
* 使用此接口处理从Kafka consumer poll()操作接收到的所有ConsumerRecord实例。
* 并提供可访问的consumer对象。
*
* 注意:使用此接口时不支持ACK的AckMode.RECORD模式,因为监听器已获得完整的批处理。
*/
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
/**
* 当使用手动提交offset偏移量时,使用此接口处理从Kafka consumer poll()操作接收到的
* 所有ConsumerRecord实例。并提供可访问的consumer对象。
*/
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
上面接口中的方法归总就是:
Spring For Kafka 提供了消息监听器接口的两种实现类,分别是:
- KafkaMessageListenerContainer
- ConcurrentMessageListenerContainer
KafkaMessageListenerContainer 利用单个线程来接收全部主题中全部分区上的所有消息。
ConcurrentMessageListenerContainer 代理的一个或多个 KafkaMessageListenerContainer 实例,来实现多个线程消费。
下面将创建一个 KafkaMessageListenerContainer 实例来监听 Kafka 消息:
@Configuration
@EnableKafka
public class ConsumerConfigDemo {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return propsMap;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
/**
* 创建 KafkaMessageListenerContainer 实例监听 kafka 消息
*/
@Bean
public KafkaMessageListenerContainer demoListenerContainer() {
// 创建container配置参数,并指定要监听的 topic 名称
ContainerProperties properties = new ContainerProperties("test");
// 设置消费者组名称
properties.setGroupId("group2");
// 设置监听器监听 kafka 消息
properties.setMessageListener(new MessageListener<Integer,String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> record) {
System.out.println("消息:" + record);
}
});
return new KafkaMessageListenerContainer(consumerFactory(), properties);
}
}
上面示例启动后将监听 topic 名称为 “test” 的 kafka 消息,不过这样启动只是单线程消费,如果想多线程消费就得创建多个实例来监控该 topic 不同的分区。但是这样操作来完成消费者多线程消费比较麻烦,所以一般使用 Spring For Kafka 组件时都会创建 KafkaListenerContainerFactory Bean 来代理多个 KafkaMessageListenerContainer 完成消费者多线程消费。
使用@KafkaListener注解监听Kafka消息
为了使创建 kafka 监听器更加简单,Spring For Kafka 提供了 @KafkaListener 注解,该 @KafkaListener 注解配置方法上,凡是带上此注解的方法就会被标记为是 Kafka 消息监听器,所以可以用 @KafkaListener 注解快速创建消息监听器。
下面写几个例子来简单描述下使用方法:
1.监听单个 Topic 示例
这里先写一个简单使用 @KafkaListener 完成消息监听的示例。
@Configuration
@EnableKafka
public class ConsumerConfigDemo {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return propsMap;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 创建3个线程并发消费
factory.setConcurrency(3);
// 设置拉取数据超时时间
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
/**
* ---使用@KafkaListener注解来标记此方法为kafka消息监听器,创建消费组group1监听test topic
*/
@KafkaListener(topics = {"test"},groupId = "group1")
public void kafkaListener(String message){
System.out.println("消息:"+message);
}
}
2.监听多个 Topic 示例
使用 @KafkaListener 也可以监控多个 topic 的消息,示例如下:
@KafkaListener(topics = {"test1", "test2"}, groupId = "group1")
public void kafkaListener(String message){
System.out.println("消息:"+message);
}
3.监听某个 Topic 的某个分区示例
单独监听某个分区息,示例如下:
@KafkaListener(id = "id0", groupId = "group1", topicPartitions = { @TopicPartition(topic = "test", partitions = { "0" }) })
public void kafkaListener1(String message) {
System.out.println("消息:"+message);
}
@KafkaListener(id = "id1", groupId = "group1", topicPartitions = { @TopicPartition(topic = "test", partitions = { "1", "2" }) })
public void kafkaListener2(String message) {
System.out.println("消息:"+message);
}
4.监听多个 Topic 的分区示例
同时监听多个 topic 的分区,示例如下:
@KafkaListener(id = "test", group = "group1", topicPartitions = {
@TopicPartition(topic = "test1", partitions = {"0"}),
@TopicPartition(topic = "test2", partitions = {"0", "1"})
})
public void kafkaListener(String message) {
System.out.print(message);
}
5.获取监听的 topic 消息头中的元数据
可以从消息头中获取有关消息的元数据,例如:
@KafkaListener(topics = "test", groupId = "group1")
public void kafkaListener(@Payload String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println("主题:" + topic);
System.out.println("键key:" + key);
System.out.println("消息:" + message);
}
6.监听 topic 进行批量消费
如果参数配置中设置为批量消费,则 @KafkaListener 注解的方法的参数要使用 List 来接收,例如:
@KafkaListener(topics = "test", groupId = "group1")
public void kafkaListener(List<String> messages) {
for(String msg:messages){
System.out.println(msg);
}
}
7.监听 topic 并手动提交 Offset 偏移量
如果设置为手动提交 Offset 偏移量,并且设置 Ack 模式为 MANUAL 或 MANUAL_IMMEDIATE,则需要在方法参数中引入 Acknowledgment 对象,并执行它的 acknowledge() 方法来提交偏移量。
@KafkaListener(topics = "test",groupId = "group5")
public void kafkaListener(List<String> messages, Acknowledgment acknowledgment) {
for(String msg:messages){
System.out.println(msg);
}
// 触发提交offset偏移量
acknowledgment.acknowledge();
}
使用@KafkaListener 模糊匹配多个 Topic
使用 @KafkaListener 注解时,可以添加参数 topicPattern ,输入通配符来对多个 topic 进行监听,例如这里使用 “test.*” 将监听所有以 test 开头的 topic 的消息。
@KafkaListener(topicPattern = "test.*",groupId = "group6")
public void annoListener2(String messages) {
System.err.println(messages);
}
使用@SendTo 注解转发消息
在平时处理业务逻辑时候,经常需要接收 kafka 中某个 topic 的消息,进行一系列处理来完成业务逻辑,然后再进行转发到一个新的 topic 中,由于这种业务需求,Spring For Kafka 提供了 @SendTo 注解,只要在 @KafkaListener 与 @SendTo 注解在同一个方法上,并且该方法存在返回值,那么就能将监听的数据在方法内进行处理后 return,然后转发到 @SendTo 注解内设置的 topic 中。
完成上面操作需要几个步骤:
- 配置 Producer 参数,并创建 kafkaTemplate Bean。
- 配置KafkaListenerContainerFactory的ReplyTemplate,将 kafkaTemplate 对象添加到其中。
- 创建消息监听器方法,设置该方法拥有返回值,并添加 @KafkaListener 与 @SendTo 两个注解,并在 @SendTo 注解中输入消息转发的 Topic。
1.配置 Producer 参数,并创建 kafkaTemplate Bean
@Configuration
@EnableKafka
public class KafkaProducerConfig {
/**
* kafkaTemplate Bean
*/
@Bean(name="kafkaTemplate")
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
}
2.配置KafkaListenerContainerFactory的ReplyTemplate,将 kafkaTemplate 对象添加到其中
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Autowired
private KafkaTemplate kafkaTemplate;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
// ---设置ReplyTemplate参数,将kafkaTemplate对象加入
factory.setReplyTemplate(kafkaTemplate);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return propsMap;
}
3.创建消息监听器方法,设置该方法拥有返回值,并添加 @KafkaListener 与 @SendTo 两个注解,并在 @SendTo 注解中输入消息转发的 Topic。
@Service
public class KafkaConsumerMessage {
/**
* 监听test1 topic,设置返回值为string类型,并添加@SendTo注解,将消息转发到 test2
*/
@KafkaListener(topics = "test1",groupId = "group1")
@SendTo("test2")
public String kafkaListener1(String messages) {
System.out.println(messages);
String newMsg = messages + "消息转发测试";
// 将处理后的消息返回
return newMsg;
}
/**
* 监听 test2 topic
*/
@KafkaListener(topics = "test2",groupId = "group2")
public void kafkaListener2(String messages) {
System.err.println(messages);
}
}
Kafka Consumer并发批量消费信息
1.设置并发数与开启批量
- kafkaListenerContainerFactory 设置 factory.setConcurrency(3) 设置并发,这个值不能超过topic分区数目
- kafkaListenerContainerFactory 设置 factory.setBatchListener(true) 开启批量
- consumerConfigs 配置 ConsumerConfig.MAX_POLL_RECORDS_CONFIG 值,来设置批量消费每次最多消费多少条消息记录
@Configuration
@EnableKafka
public class ConsumerConfigDemo1 {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 消费者组中线程数量,例如topic有3个分区,为了加快消费将并发设置为3
factory.setConcurrency(3);
// 拉取超时时间
factory.getContainerProperties().setPollTimeout(3000);
// 当使用批量监听器时需要设置为true
factory.setBatchListener(true);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
// Kafka地址
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 是否自动提交
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 自动提交的频率
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
// Session超时设置
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
// 键的反序列化方式
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 值的反序列化方式
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 批量消费每次最多消费多少条消息记录
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
return propsMap;
}
}
2.设置分区消费
有多个分区的 Topic,可以设置多个注解单独监听 Topic 各个分区以提高效率。
@Component
public class ConsumerMessage {
@KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = "test2", partitions = { "0" }) })
public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
System.out.println("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
System.out.println("Id0 records size " + records.size());
for (ConsumerRecord<?, ?> record : records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
System.out.println("Received: " + record);
if (kafkaMessage.isPresent()) {
Object message = record.value();
String topic = record.topic();
System.out.printf(topic + " p0 Received message=" + message);
}
}
}
@KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = "test2", partitions = { "1" }) })
public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
System.out.println("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
System.out.println("Id1 records size " + records.size());
for (ConsumerRecord<?, ?> record : records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
System.out.println("Received: " + record);
if (kafkaMessage.isPresent()) {
Object message = record.value();
String topic = record.topic();
System.out.printf(topic + " p1 Received message=" + message);
}
}
}
@KafkaListener(id = "id2", topicPartitions = { @TopicPartition(topic = "test2", partitions = { "2" }) })
public void listenPartition2(List<ConsumerRecord<?, ?>> records) {
System.out.println("Id2 Listener, Thread ID: " + Thread.currentThread().getId());
System.out.println("Id2 records size " + records.size());
for (ConsumerRecord<?, ?> record : records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
System.out.println("Received: " + record);
if (kafkaMessage.isPresent()) {
Object message = record.value();
String topic = record.topic();
System.out.printf(topic + " p2 Received message=" + message);
}
}
}
}
暂停和恢复Listener Containers
Spring For Kafka 提供 start()、pause() 和 resume() 方法来操作监听容器的启动、暂停和恢复。
- start():启动监听容器。
- pause():暂停监听容器。
- resume():恢复监听容器。
这些方法一般可以灵活操作 kafka 的消费,例如进行服务进行升级,暂停消费者进行消费;例如在白天高峰期不进行服务消费,等到晚上再进行,这时候可以设置定时任务,白天关闭消费者消费到晚上开启;考虑到这些情况,利用 start()、pause()、resume() 这些方法能很好控制消费者进行消费。这里写一个简单例子,通过 cotroller 操作暂停、恢复消费者监听容器。
@RestController
public class KafkaController {
@Autowired
private KafkaListenerEndpointRegistry registry;
/**
* 暂停监听容器
*/
@GetMapping("/pause")
public void pause(){
registry.getListenerContainer("pause.resume").pause();
}
/**
* 恢复监听容器
*/
@GetMapping("/resume")
public void resume(){
//判断监听容器是否启动,未启动则将其启动,否则进行恢复监听容器
if (!registry.getListenerContainer("pause.resume").isRunning()) {
registry.getListenerContainer("pause.resume").start();
}
registry.getListenerContainer("pause.resume").resume();
}
在上面例子中,调用 /pause 接口可以暂停消费者监听容器,调用 /resume 接口可以恢复消费者监听容器。
过滤监听器中的消息
在接收消息时候可以创建一个过滤器来过滤接收的消息,这样方便我们不必处理全部消息,只接收我们需要的消息进行处理。
在 kafkaListenerContainerFactory 中配置一个过滤器 RecordFilterStrategy 对象过滤消息,这里演示下如何操作:
@Configuration
@EnableKafka
public class ConsumerConfigDemo {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return propsMap;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
// 设置过滤器,只接收消息内容中包含 "test" 的消息
RecordFilterStrategy recordFilterStrategy = new RecordFilterStrategy() {
@Override
public boolean filter(ConsumerRecord consumerRecord) {
String value = consumerRecord.value().toString();
if (value !=null && value.contains("test")) {
System.err.println(consumerRecord.value());
// 返回 false 则接收消息
return false;
}
// 返回 true 则抛弃消息
return true;
}
};
// 将过滤器添添加到参数中
factory.setRecordFilterStrategy(recordFilterStrategy);
return factory;
}
/**
* 监听消息,接收过滤器过滤后的消息
*/
@KafkaListener(topics = {"test"},groupId = "group1")
public void kafkaListener(String message){
System.out.println("消息:"+message);
}
}
监听器异常处理
1.单消息消费异常处理器
@Service
public class ConsumerService {
/**
* 消息监听器
*/
@KafkaListener( topics = {"test"},groupId = "group1",errorHandler = "listenErrorHandler")
public void listen(String message) {
System.out.println(message);
// 创建异常,触发异常处理器
throw new NullPointerException("测试错误处理器");
}
/**
* 异常处理器
*/
@Bean
public ConsumerAwareListenerErrorHandler listenErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message,
ListenerExecutionFailedException e,
Consumer<?, ?> consumer) {
System.out.println("message:" + message.getPayload());
System.out.println("exception:" + e.getMessage());
consumer.seek(new TopicPartition(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class),
message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
message.getHeaders().get(KafkaHeaders.OFFSET, Long.class));
return null;
}
};
}
}
2.批量消费异常处理器
批量消费代码也是差不多的,只不过传递过来的数据都是List集合方式。
@Service
public class ConsumerService {
/**
* 消息监听器
*/
@KafkaListener( topics = {"test"},groupId = "group1",errorHandler = "listenErrorHandler")
public void listen(List<String> messages) {
for(String msg:messages){
System.out.println(msg);
}
// 创建异常,触发异常处理器
throw new NullPointerException("测试错误处理器");
}
/**
* 异常处理器
*/
@Bean
public ConsumerAwareListenerErrorHandler listenErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message,
ListenerExecutionFailedException e,
Consumer<?, ?> consumer) {
System.out.println("message:" + message.getPayload());
System.out.println("exception:" + e.getMessage());
consumer.seek(new TopicPartition(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class),
message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
message.getHeaders().get(KafkaHeaders.OFFSET, Long.class));
return null;
}
};
}
}
3.全局异常处理
将异常处理器添加到 kafkaListenerContainerFactory 中来设置全局异常处理。
@Configuration
@EnableKafka
public class ConsumerConfigDemo {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return propsMap;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
// 将单条消息异常处理器添加到参数中
factory.setErrorHandler(errorHandler);
// 将批量消息异常处理器添加到参数中
//factory.setErrorHandler(errorHandler);
return factory;
}
/**
* 单消息消费异常处理器
*/
@Bean
public ConsumerAwareListenerErrorHandler listenErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message,
ListenerExecutionFailedException e,
Consumer<?, ?> consumer) {
System.out.println("message:" + message.getPayload());
System.out.println("exception:" + e.getMessage());
consumer.seek(new TopicPartition(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class),
message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
message.getHeaders().get(KafkaHeaders.OFFSET, Long.class));
return null;
}
};
}
/**
* 批量息消费异常处理器
*/
@Bean
public ConsumerAwareListenerErrorHandler listenErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message,
ListenerExecutionFailedException e,
Consumer<?, ?> consumer) {
System.out.println("message:" + message.getPayload());
System.out.println("exception:" + e.getMessage());
consumer.seek(new TopicPartition(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class),
message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
message.getHeaders().get(KafkaHeaders.OFFSET, Long.class));
return null;
}
};
}
/**
* 监听消息,接收过滤器过滤后的消息
*/
@KafkaListener(topics = {"test"},groupId = "group1")
public void kafkaListener(String message){
System.out.println("消息:"+message);
}
}
Kafka Consumer手动/自动提交Offset
在kafka的消费者中有一个非常关键的机制,那就是 offset 机制。它使得 Kafka 在消费的过程中即使挂了或者引发再均衡问题重新分配 Partation,当下次重新恢复消费时仍然可以知道从哪里开始消费。
Kafka中偏移量的自动提交是由参数 enable_auto_commit 和 auto_commit_interval_ms 控制的,当 enable_auto_commit=true 时,Kafka在消费的过程中会以频率为 auto_commit_interval_ms 向 Kafka 自带的 topic(__consumer_offsets) 进行偏移量提交,具体提交到哪个 Partation 是以算法:”partation=hash(group_id)%50” 来计算的。
在 Spring 中对 Kafka 设置手动或者自动提交Offset如下:
1.自动提交
自动提交需要配置下面两个参数:
- auto.commit.enable=true:是否将offset维护交给kafka自动提交到zookeeper中维护,设置为true。
- auto.commit.interval.ms=10000:自动提交时间间隔。
配置示例如下:
@Configuration
@EnableKafka
public class ConsumerConfigDemo {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
// ---设置自动提交Offset为true
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return propsMap;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 消费者线程数
factory.setConcurrency(3);
// 拉取超时时间
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
/**
* -------------接收消息-------------
*/
@KafkaListener(topics = {"test"}, groupId = "group1")
public void kafkaListener(String message){
System.out.println("消息:"+message);
}
2.手动提交
手动提交需要配置下面一个参数:
- auto.commit.enable=false:是否将offset维护交给kafka自动提交到zookeeper中维护,设置为false。
然后需要在程序中设置ack模式,从而进行手动提交维护offset。
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
设置ACK模式(手动提交模式,这里有七种)
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
return factory;
}
在 kafkaListenerContainerFactory 配置中设置 AckMode,它有七种模式分别为:
- RECORD: 每处理完一条记录后提交。
- BATCH(默认): 每次poll一批数据后提交一次,频率取决于每次poll的调用频率。
- TIME: 每次间隔ackTime的时间提交。
- COUNT: 处理完poll的一批数据后并且距离上次提交处理的记录数超过了设置的ackCount就提交。
- COUNT_TIME: TIME和COUNT中任意一条满足即提交。
- MANUAL: 手动调用Acknowledgment.acknowledge()后,并且处理完poll的这批数据后提交。
- MANUAL_IMMEDIATE: 手动调用Acknowledgment.acknowledge()后立即提交。
注意:如果设置 AckMode 模式为 MANUAL 或者 MANUAL_IMMEDIATE,则需要对监听消息的方法中,引入 Acknowledgment 对象参数,并调用 acknowledge() 方法进行手动提交
手动提交下这里将列出七种ACK模式示例,如下:
- ACK 模式: RECORD
- 描述: 每处理完一条记录后提交。
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
// 设置ACK模式为RECORD
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
return factory;
}
- ACK 模式: BATCH
- 描述: 每次poll一批数据后提交一次,频率取决于每次poll的调用频率。
@Configuration
@EnableKafka
public class ConsumerConfigDemo {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
// ---设置自动提交Offset为false
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 设置每次批量消费数目,例如生产者生成10条数据,设置此值为4,那么需要三次批消费(三次中每次消费数目为:4,4,2)才能完成
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4");
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return propsMap;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
// 开启批量消费监听器
factory.setBatchListener(true);
// 设置ACK模式为BATCH
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
return factory;
}
/**
* -------------接收消息-------------
* 批量消费时,设置参数为List来接收数据
*/
@KafkaListener(topics = {"test"}, groupId = "group1")
public void kafkaListener(List<String> message){
System.out.println("消息:"+message);
}
- ACK 模式: COUNT
- 描述: 处理完poll的一批数据后并且距离上次提交处理的记录数超过了设置的ackCount值就提交
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
// 设置ACK模式为COUNT
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.COUNT);
// 设置AckCount数目,每接收AckCount条记录数就提交Offset偏移量
factory.getContainerProperties().setAckCount(10);
return factory;
}
- ACK 模式: TIME
- 描述: 每次间隔ackTime的时间提交。
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
// 设置ACK模式为TIME
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.TIME);
// 设置提交Ack的时间间隔,单位(ms)
factory.getContainerProperties().setAckTime(1000);
return factory;
}
- ACK 模式: COUNT_TIME。
- 描述: 每次间隔ackTime的时间或处理完poll的一批数据后并且距离上次提交处理的记录数超过了设置的ackCount值,任意一条满足即提交。
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
// 设置ACK模式为COUNT_TIME
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.COUNT_TIME);
// 设置提交Ack的时间间隔,单位(ms)
factory.getContainerProperties().setAckTime(1000);
// 设置AckCount数目,每接收AckCount条记录数就提交Offset偏移量
factory.getContainerProperties().setAckCount(10);
return factory;
}
- ACK 模式: MANUAL
- 描述: 手动调用Acknowledgment.acknowledge()后,并且处理完poll的这批数据后提交。
@Configuration
@EnableKafka
public class ConsumerConfigDemo {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
// ---设置自动提交Offset为false
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 设置每次批量消费数目,例如生产者生成10条数据,设置此值为4,那么需要三次批消费(三次中每次消费数目为:4,4,2)才能完成
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4");
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return propsMap;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
// 开启批量消费监听器
factory.setBatchListener(true);
// 设置ACK模式为MANUAL
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
/**
* -------------接收消息-------------
* 批量消费时,设置参数为List来接收数据,并且因为ack模式为MANUAL,所以需要手动调用acknowledge()方法提交
*/
@KafkaListener(topics = {"test"}, groupId = "group1")
public void kafkaListener(List<String> message, Acknowledgment acknowledgment){
System.out.println("消息:"+message);
// 手动执行acknowledge()提交offset偏移量
acknowledgment.acknowledge();
}
- ACK 模式: MANUAL_IMMEDIATE
- 描述: 手动调用Acknowledgment.acknowledge()后立即提交
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
// 开启批量消费监听器
factory.setBatchListener(true);
// 设置ACK模式为MANUAL_IMMEDIATE
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
Kafka防止消息丢失的一些实践配置
-
不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
-
设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
-
设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
-
设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
-
设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
-
设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
-
确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
-
确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
参考资料: