SpringBoot-集成Kafka详解

SpringBoot集成Kafka

1、构建项目

1.1、引入依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.5.RELEASE</version>
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.28</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency>
</dependencies>
1.2、application.yml配置
spring:application:name: application-kafkakafka:bootstrap-servers: localhost:9092 #这个是kafka的地址,对应你server.properties中配置的producer:batch-size: 16384 #批量大小acks: -1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)retries: 10 # 消息发送重试次数#transaction-id-prefix: transactionbuffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:linger:ms: 2000 #提交延迟#partitioner: #指定分区器#class: pers.zhang.config.CustomerPartitionHandlerconsumer:group-id: testGroup #默认的消费组IDenable-auto-commit: true #是否自动提交offsetauto-commit-interval: 2000 #提交offset延时# 当kafka中没有初始offset或offset超出范围时将自动重置offset# earliest:重置为分区中最小的offset;# latest:重置为分区中最新的offset(消费分区中新产生的数据);# none:只要有一个分区不存在已提交的offset,就抛出异常;auto-offset-reset: latestmax-poll-records: 500 #单次拉取消息的最大条数key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session:timeout:ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)request:timeout:ms: 18000 # 消费请求的超时时间listener:missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错
#      type: batch
1.3、简单生产
@RestController
public class kafkaProducer {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/normal/{message}")public void sendNormalMessage(@PathVariable("message") String message) {kafkaTemplate.send("sb_topic", message);}
}
1.4、简单消费
@Component
public class KafkaConsumer {//监听消费@KafkaListener(topics = {"sb_topic"})public void onNormalMessage(ConsumerRecord<String, Object> record) {System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +record.value());}}
简单消费:sb_topic-0=111
简单消费:sb_topic-0=222
简单消费:sb_topic-0=333

2、生产者

2.1、带回调的生产者

kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法,

/*** 回调的第一种写法* @param message*/
@GetMapping("/kafka/callbackOne/{message}")
public void sendCallbackOneMessage(@PathVariable("message") String message) {kafkaTemplate.send("sb_topic", message).addCallback(new SuccessCallback<SendResult<String, Object>>() {//成功的回调@Overridepublic void onSuccess(SendResult<String, Object> success) {// 消息发送到的topicString topic = success.getRecordMetadata().topic();// 消息发送到的分区int partition = success.getRecordMetadata().partition();// 消息在分区内的offsetlong offset = success.getRecordMetadata().offset();System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset);}}, new FailureCallback() {//失败的回调@Overridepublic void onFailure(Throwable throwable) {System.out.println("发送消息失败1:" + throwable.getMessage());}});
}

 

发送消息成功1:sb_topic-0-3
简单消费:sb_topic-0=one
/*** 回调的第二种写法* @param message*/
@GetMapping("/kafka/callbackTwo/{message}")
public void sendCallbackTwoMessage(@PathVariable("message") String message) {kafkaTemplate.send("sb_topic", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {System.out.println("发送消息失败2:"+throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {System.out.println("发送消息成功2:" + result.getRecordMetadata().topic() + "-"+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());}});
}

发送消息成功2:sb_topic-0-4
简单消费:sb_topic-0=two
2.2、监听器

Kafka提供了ProducerListener 监听器来异步监听生产者消息是否发送成功,我们可以自定义一个kafkaTemplate添加ProducerListener,当消息发送失败我们可以拿到消息进行重试或者把失败消息记录到数据库定时重试。

@Configuration
public class KafkaConfig {@AutowiredProducerFactory producerFactory;@Beanpublic KafkaTemplate<String, Object> kafkaTemplate() {KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<String, Object>();kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {@Overridepublic void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {System.out.println("发送成功 " + producerRecord.toString());}@Overridepublic void onSuccess(String topic, Integer partition, String key, Object value, RecordMetadata recordMetadata) {System.out.println("发送成功 topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);}@Overridepublic void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {System.out.println("发送失败" + producerRecord.toString());System.out.println(exception.getMessage());}@Overridepublic void onError(String topic, Integer partition, String key, Object value, Exception exception) {System.out.println("发送失败" + "topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);System.out.println(exception.getMessage());}});return kafkaTemplate;}
}

注意:当我们发送一条消息,既会走 ListenableFutureCallback 回调,也会走ProducerListener回调。

2.3、自定义分区器

我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

1、若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;
2、若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;
3、patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;
我们来自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区
 

public class CustomizePartitioner implements Partitioner {@Overridepublic int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {//自定义分区规则,默认全部发送到0号分区return 0;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

在application.properties中配置自定义分区器,配置的值就是分区器类的全路径名

# 自定义分区器
spring.kafka.producer.properties.partitioner.class=pers.zhang.config.CustomizePartitioner
2.4、事务提交

如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务:

@GetMapping("/kafka/transaction/{message}")
public void sendTransactionMessage(@PathVariable("message") String message) {//声明事务:后面报错消息不会发出去kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, Object, Object>() {@Overridepublic Object doInOperations(KafkaOperations<String, Object> operations) {operations.send("sb_topic", message + " test executeInTransaction");throw new RuntimeException("fail");}});// //不声明事务:后面报错但前面消息已经发送成功了// kafkaTemplate.send("sb_topic", message + " test executeInNoTransaction");// throw new RuntimeException("fail");
}

注意:如果声明了事务,需要在application.yml中指定:

spring:kafka:producer:transaction-id-prefix: tx_ #事务id前缀

3、消费者

3.1、指定topic、partition、offset消费

前面我们在监听消费topic1的时候,监听的是topic1上所有的消息,如果我们想指定topic、指定partition、指定offset来消费呢?也很简单,@KafkaListener注解已全部为我们提供

spring:kafka:listener:type: batch #设置批量消费consumer:max-poll-records: 50 #每次最多消费多少条消息

属性解释:

  • id:消费者ID
  • groupId:消费组ID
  • topics:监听的topic,可监听多个
  • topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听,手动分区。
//批量消费
@KafkaListener(id = "consumer2", topics = {"sb_topic"}, groupId = "sb_group")
public void onBatchMessage(List<ConsumerRecord<String, Object>> records) {System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size());for (ConsumerRecord<String, Object> record : records) {System.out.println(record.value());}
}
>>> 批量消费一次,recoreds.size()=4
hello
hello
hello
hello
>>> 批量消费一次,recoreds.size()=2
hello
hello
3.2、异常处理

ConsumerAwareListenerErrorHandler 异常处理器,新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器。
 

//异常处理器
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {return new ConsumerAwareListenerErrorHandler() {@Overridepublic Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {System.out.println("消费异常:" + message.getPayload());return null;}};
}// 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
@KafkaListener(topics = {"sb_topic"},errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {throw new Exception("简单消费-模拟异常");
}// 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
@KafkaListener(topics = "sb_topic",errorHandler="consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {System.out.println("批量消费一次...");throw new Exception("批量消费-模拟异常");
}
批量消费一次...
消费异常:[ConsumerRecord(topic = sb_topic, partition = 0, leaderEpoch = 0, offset = 19, CreateTime = 1692604586558, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello), ConsumerRecord(topic = sb_topic, partition = 0, leaderEpoch = 0, offset = 20, CreateTime = 1692604587164, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello), ConsumerRecord(topic = sb_topic, partition = 0, leaderEpoch = 0, offset = 21, CreateTime = 1692604587790, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)]
3.3、消息过滤器

消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。

配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。
 

@Autowired
ConsumerFactory consumerFactory;//消息过滤器
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory);//被过滤的消息将被丢弃factory.setAckDiscarded(true);//消息过滤策略factory.setRecordFilterStrategy(new RecordFilterStrategy() {@Overridepublic boolean filter(ConsumerRecord consumerRecord) {if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {return false;}return true;}});return factory;
}//消息过滤监听
@KafkaListener(topics = {"sb_topic"},containerFactory = "filterContainerFactory")
public void onMessage6(ConsumerRecord<?, ?> record) {System.out.println(record.value());
}

上面实现了一个"过滤奇数、接收偶数"的过滤策略,我们向topic发送0-9总共10条消息,看一下监听器的消费情况,可以看到监听器只消费了偶数:

3.4、消息转发

在实际开发中,我们可能有这样的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。

在SpringBoot集成Kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容,如下:
 

//消息转发 从sb_topic转发到sb_topic2
@KafkaListener(topics = {"sb_topic"})
@SendTo("sb_topic2")
public String onMessage7(ConsumerRecord<?, ?> record) {return record.value()+"-forward message";
}@KafkaListener(topics = {"sb_topic2"})
public void onMessage8(ConsumerRecord<?, ?> record) {System.out.println("收到sb_topic转发过来的消息:" + record.value());
}
收到sb_topic转发过来的消息:hello-forward message
收到sb_topic转发过来的消息:hello-forward message
收到sb_topic转发过来的消息:hello-forward message
收到sb_topic转发过来的消息:hello-forward message
3.5、定时启动、停止

默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic的消息,那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,或者在我们指定的时间点停止工作,该怎么处理呢——使用KafkaListenerEndpointRegistry,下面我们就来实现:

1、禁止监听器自启动;
2、创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;
新建一个定时任务类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry 在Spring中已经被注册为Bean,直接注入,设置禁止KafkaListener自启动
 

@EnableScheduling
@Component
public class CronTimer {/*** @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,* 而是会被注册在KafkaListenerEndpointRegistry中,* 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean**/@Autowiredprivate KafkaListenerEndpointRegistry registry;@Autowiredprivate ConsumerFactory consumerFactory;
​// 监听器容器工厂(设置禁止KafkaListener自启动)@Beanpublic ConcurrentKafkaListenerContainerFactory delayContainerFactory() {ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();container.setConsumerFactory(consumerFactory);//禁止KafkaListener自启动container.setAutoStartup(false);return container;}// 监听器@KafkaListener(id="timingConsumer",topics = "sb_topic",containerFactory = "delayContainerFactory")public void onMessage1(ConsumerRecord<?, ?> record){System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());}// 定时启动监听器@Scheduled(cron = "0 42 11 * * ? ")public void startListener() {System.out.println("启动监听器...");// "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器if (!registry.getListenerContainer("timingConsumer").isRunning()) {registry.getListenerContainer("timingConsumer").start();}//registry.getListenerContainer("timingConsumer").resume();}
​// 定时停止监听器@Scheduled(cron = "0 45 11 * * ? ")public void shutDownListener() {System.out.println("关闭监听器...");registry.getListenerContainer("timingConsumer").pause();}
}

启动项目,触发生产者向topic1发送消息,可以看到consumer没有消费,因为这时监听器还没有开始工作,

11:42分监听器启动开始工作,消费消息

11:45分监听器停止工作:

3.6、手动确认消息

默认情况下Kafka的消费者是自动确认消息的,通常情况下我们需要在业务处理成功之后手动触发消息的签收,否则可能会出现:消息消费到一半消费者异常,消息并未消费成功但是消息已经自动被确认,也不会再投递给消费者,也就导致消息丢失了。

当 auto.commit.enable 设置为false时,表示kafak的offset由customer手动维护,spring-kafka提供了通过ackMode的值表示不同的手动提交方式;
 

public enum AckMode {// 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交RECORD,// 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交BATCH,// 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交TIME,// 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交COUNT,// TIME | COUNT 有一个条件满足时提交COUNT_TIME,// 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交MANUAL,// 手动调用Acknowledgment.acknowledge()后立即提交MANUAL_IMMEDIATE,
}

如果设置AckMode模式为MANUAL或者MANUAL_IMMEDIATE,则需要对监听消息的方法中,引入Acknowledgment对象参数,并调用acknowledge()方法进行手动提交;

第一步:添加kafka配置,把 spring.kafka.listener.ack-mode = manual 设置为手动
 

spring:kafka:listener:ack-mode: manual consumer:enable-auto-commit: false

第二步;消费消息的时候,给方法添加Acknowledgment参数签收消息:

@KafkaListener(topics = {"sb_topic"})
public void onMessage9(ConsumerRecord<String, Object> record, Acknowledgment ack) {System.out.println("收到消息:" + record.value());//确认消息ack.acknowledge();
}

4、配置详解

4.1、生产者yml方式
server:port: 8081
spring:kafka:producer:# Kafka服务器bootstrap-servers: 175.24.228.202:9092# 开启事务,必须在开启了事务的方法中发送,否则报错transaction-id-prefix: kafkaTx-# 发生错误后,消息重发的次数,开启事务必须设置大于0。retries: 3# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。# 开启事务时,必须设置为allacks: all# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 生产者内存缓冲区的大小。buffer-memory: 1024000# 键的序列化方式key-serializer: org.springframework.kafka.support.serializer.JsonSerializer# 值的序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
4.2、生产者Config方式
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.transaction.KafkaTransactionManager;import java.util.HashMap;
import java.util.Map;/*** kafka配置,也可以写在yml,这个文件会覆盖yml*/
@SpringBootConfiguration
public class KafkaProviderConfig {@Value("${spring.kafka.producer.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.transaction-id-prefix}")private String transactionIdPrefix;@Value("${spring.kafka.producer.acks}")private String acks;@Value("${spring.kafka.producer.retries}")private String retries;@Value("${spring.kafka.producer.batch-size}")private String batchSize;@Value("${spring.kafka.producer.buffer-memory}")private String bufferMemory;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>(16);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);//acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。//acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。//acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。//开启事务必须设为allprops.put(ProducerConfig.ACKS_CONFIG, acks);//发生错误后,消息重发的次数,开启事务必须大于0props.put(ProducerConfig.RETRIES_CONFIG, retries);//当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送//批次的大小可以通过batch.size 参数设置.默认是16KB//较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。//比如说,kafka里的消息5秒钟Batch才凑满了16KB,才能发送出去。那这些消息的延迟就是5秒钟//实测batchSize这个参数没有用props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);//有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,//即使数据没达到16KB,也将这个批次发送出去props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");//生产者内存缓冲区的大小props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);//反序列化,和生产者的序列化方式对应props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);return props;}@Beanpublic ProducerFactory<Object, Object> producerFactory() {DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());//开启事务,会导致 LINGER_MS_CONFIG 配置失效factory.setTransactionIdPrefix(transactionIdPrefix);return factory;}@Beanpublic KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {return new KafkaTransactionManager<>(producerFactory);}@Beanpublic KafkaTemplate<Object, Object> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}
4.3、消费者yml方式
server:port: 8082
spring:kafka:consumer:# Kafka服务器bootstrap-servers: 175.24.228.202:9092group-id: firstGroup# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D#auto-commit-interval: 2s# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)# none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式#key-deserializer: org.apache.kafka.common.serialization.StringDeserializerkey-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer# 值的反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer# 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要properties:spring:json:trusted:packages: "*"# 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。# 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,# 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,# 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。# 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数# 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况max-poll-records: 3properties:# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancemax:poll:interval:ms: 600000# 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10ssession:timeout:ms: 10000listener:# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数concurrency: 4# 自动提交关闭,需要设置手动消息确认ack-mode: manual_immediate# 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误missing-topics-fatal: false# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancepoll-timeout: 600000
4.4、消费者Config方式
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;
import java.util.Map;/*** kafka配置,也可以写在yml,这个文件会覆盖yml*/
@SpringBootConfiguration
public class KafkaConsumerConfig {@Value("${spring.kafka.consumer.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Value("${spring.kafka.properties.session.timeout.ms}")private String sessionTimeout;@Value("${spring.kafka.properties.max.poll.interval.ms}")private String maxPollIntervalTime;@Value("${spring.kafka.consumer.max-poll-records}")private String maxPollRecords;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${spring.kafka.listener.concurrency}")private Integer concurrency;@Value("${spring.kafka.listener.missing-topics-fatal}")private boolean missingTopicsFatal;@Value("${spring.kafka.listener.poll-timeout}")private long pollTimeout;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>(16);propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);//自动提交的时间间隔,自动提交开启时生效propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");//该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理://earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录//latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)//none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);//两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancepropsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);//这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。//这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,//如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,//然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。//要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数//注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10spropsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);//序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类)propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);return propsMap;}@Beanpublic ConsumerFactory<Object, Object> consumerFactory() {//配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要try(JsonDeserializer<Object> deserializer = new JsonDeserializer<>()) {deserializer.trustedPackages("*");return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer);}}@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//在侦听器容器中运行的线程数,一般设置为 机器数*分区数factory.setConcurrency(concurrency);//消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误factory.setMissingTopicsFatal(missingTopicsFatal);//自动提交关闭,需要设置手动消息确认factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);factory.getContainerProperties().setPollTimeout(pollTimeout);//设置为批量监听,需要用List接收//factory.setBatchListener(true);return factory;}
}

5、注解消费示例

5.1、简单消费
    /*** 指定一个消费者组,一个主题主题。* @param record*/@KafkaListener(topics = IPHONE_TOPIC,groupId = APPLE_GROUP)public void simpleConsumer(ConsumerRecord<String, String> record) {System.out.println("进入simpleConsumer方法");System.out.printf("分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}
5.2、监听多个主题
    /*** 指定多个主题。** @param record*/@KafkaListener(topics = {IPHONE_TOPIC,IPAD_TOPIC},groupId = APPLE_GROUP)public void topics(ConsumerRecord<String, String> record) {System.out.println("进入topics方法");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}
5.3、监听一个主题,指定分区消费
    /*** 监听一个主题,且指定消费主题的哪些分区。* 参数详解:消费者组=apple_group;监听主题=iphoneTopic;只消费的分区=1,2;消费者数量=2* @param record*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"})},concurrency = "2")public void consumeByPattern(ConsumerRecord<String, String> record) {System.out.println("consumeByPattern");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}
5.4、指定多个分区,指定起始偏移量,多线程消费
    /*** 指定多个分区从哪个偏移量开始消费。* 10个线程,也就是10个消费者*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPAD_TOPIC,partitions = {"0","1"},partitionOffsets = {@PartitionOffset(partition = "2", initialOffset = "10"),@PartitionOffset(partition = "3", initialOffset = "0"),})},concurrency = "10")public void consumeByPartitionOffsets(ConsumerRecord<String, String> record) {System.out.println("consumeByPartitionOffsets");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}
5.5、监听多个主题,指定多个分区,指定起始偏移量
    /*** 指定多个主题。参数详解如上面的方法。* @param record*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),@TopicPartition(topic = IPAD_TOPIC, partitions = "1",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))},concurrency = "4")public void topics2(ConsumerRecord<String, String> record) {System.out.println("topics2");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}
5.6、指定多个监听器
    /*** 指定多个消费者组。参数详解如上面的方法。** @param record*/@KafkaListeners({@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),@TopicPartition(topic = IPAD_TOPIC, partitions = "1",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))},concurrency = "3"),@KafkaListener(groupId = XM_GROUP,topicPartitions = {@TopicPartition(topic = XMPHONE_TOPIC, partitions = {"1", "2"}),@TopicPartition(topic = XMPAD_TOPIC, partitions = "1",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))},concurrency = "3")})public void groupIds(ConsumerRecord<String, String> record) {System.out.println("groupIds");System.out.println("内容:" + record.value());System.out.println("分区:" + record.partition());System.out.println("偏移量:" + record.offset());System.out.println("创建消息的时间戳:" + record.timestamp());}
5.7、手动提交偏移量
    /*** 设置手动提交偏移量** @param record*/@KafkaListener(topics = IPHONE_TOPIC,groupId = APPLE_GROUP,//3个消费者concurrency = "3")public void setCommitType(ConsumerRecord<String, String> record, Acknowledgment ack) {System.out.println("setCommitType");System.out.println("内容:" + record.value());System.out.println("分区:" + record.partition());System.out.println("偏移量:" + record.offset());System.out.println("创建消息的时间戳:" + record.timestamp());ack.acknowledge();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/198431.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

毅速丨嫁接打印在模具制造中应用广泛

在模具行业中&#xff0c;3D打印随形水路已经被广泛认可&#xff0c;它可以提高冷却效率&#xff0c;从而提高产品良率。然而&#xff0c;全打印模具制造的成本相对较高&#xff0c;因为需要使用金属3D打印机和专用材料。为了节省打印成本&#xff0c;同时利用3D打印的优势&…

【PyQt小知识 - 4】:QGroupBox分组框控件 - 边框和标题设置

QGroupBox QGroupBox 是 PyQt 中的一个小部件&#xff0c;用于创建一个带有标题的组框。 可以使用 QGroupBox 将相关控件分组并添加一个标题。 以下是一个使用 QGroupBox 的示例代码&#xff08;示例一&#xff09;&#xff1a; from PyQt5.QtWidgets import * import sysa…

Centos(Linux)服务器安装Dotnet8 及 常见问题解决

1. 下载dotnet8 sdk 下载 .NET 8.0 SDK (v8.0.100) - Linux x64 Binaries 拿到 dotnet-sdk-8.0.100-linux-x64.tar.gz 文件 2. 把文件上传到 /usr/local/software 目录 mkdir -p /usr/local/software/dotnet8 把文件拷贝过去 mv dotnet-sdk-8.0.100-linux-x64.tar.gz /usr/loc…

【LeetCode】每日一题 2023_11_20 最大子数组和(dp)

文章目录 刷题前唠嗑题目&#xff1a;最大子数组和题目描述代码与解题思路 刷题前唠嗑 LeetCode? 启动&#xff01;&#xff01;&#xff01; 今天是一道 LeetCode 的经典题目&#xff0c;如果是 LeetCode 老手&#xff0c;估计都刷过&#xff0c;话是这么说&#xff0c;但咱…

探索亚马逊大语言模型:开启人工智能时代的语言创作新篇章

文章目录 前言一、大语言模型是什么&#xff1f;应用范围 二、Amazon Bedrock总结 前言 想必大家在ChatGPT的突然兴起&#xff0c;大家多多少少都会有各种各样的问题&#xff0c;比如&#xff1a;大语言模型和生成式AI有什么关系呢&#xff1f;大语言模型为什么这么火&#xf…

人机交互——自然语言生成

自然语言生成是让计算机自动或半自动地生成自然语言的文本。这个领域涉及到自然语言处理、语言学、计算机科学等多个领域的知识。 1.简介 自然语言生成系统可以分为基于规则的方法和基于统计的方法两大类。基于规则的方法主要依靠专家知识库和语言学规则来生成文本&#xff0…

拼图小游戏

package li;import ui.tu; //启动类 public class 主 {public static void main(String[] args) {new tu(); //创建登陆界面} }package ui;import javax.swing.*; import javax.swing.border.BevelBorder; import java.awt.event.ActionEvent; import java.awt.event.ActionLi…

腾讯微服务平台TSF学习笔记(一)--如何使用TSF的Sidecar过滤器实现mesh应用的故障注入

Mesh应用的故障注入 故障注入前世今生Envoy设置故障注入-延迟类型设置故障注入-延迟类型并带有自定义状态码总结 故障注入前世今生 故障注入是一种系统测试方法&#xff0c;通过引入故障来找到系统的bug&#xff0c;验证系统的稳健性。istio支持延迟故障注入和异常故障注入。 …

MySQL的执行器是怎么工作的

作为优化器后的真正执行语句的层&#xff0c;执行器有三种方式和存储引擎&#xff08;一般是innoDB&#xff09;交互 主键索引查询 查询的条件用到了主键&#xff0c;这个是全表唯一的&#xff0c;优化器会选择const类型来查询&#xff0c;然后while循环去根据主键索引的B树结…

Zabbix Proxy分布式监控

目录 Zabbix Proxy简介 实验环境 proxy端配置 1.安装仓库 2.安装zabbix-proxy 3.创建初始数据库 4.导入初始架构和数据&#xff0c;系统将提示您输入新创建的密码 5.编辑配置文件 /etc/zabbix/zabbix_proxy.conf&#xff0c;配置完成后要重启。 agent客户端配置 zabbix…

day07_数组初识

数组的概述 数组就是用于存储数据的长度固定的容器&#xff0c;保证多个数据的数据类型要一致。 数组适合做一批同种类型数据的存储 数组是属于引用数据类型&#xff0c; 数组变量名中存储的数组在内存中的地址信息。 数组中的元素可以是基本数据类型&#xff0c;也可以是引用…

java智慧校园信息管理系统源码带微信小程序

一、智慧校园的定义 智慧校园指的是以云计算和物联网为基础的智慧化的校园工作、学习和生活一体化环境。以各种应用服务系统为载体&#xff0c;将教学、科研、管理和校园生活进行充分融合&#xff0c;让校园实现无处不在的网络学习、融合创新的网络科研、透明高效的校务治理、…

Kafka学习笔记(二)

目录 第3章 Kafka架构深入3.3 Kafka消费者3.3.1 消费方式3.3.2 分区分配策略3.3.3 offset的维护 3.4 Kafka高效读写数据3.5 Zookeeper在Kafka中的作用3.6 Kafka事务3.6.1 Producer事务3.6.2 Consumer事务&#xff08;精准一次性消费&#xff09; 第4章 Kafka API4.1 Producer A…

电子学会2023年6月青少年软件编程(图形化)等级考试试卷(四级)真题,含答案解析

青少年软件编程(图形化)等级考试试卷(四级) 一、单选题(共10题,共30分) 1. 下列积木运行后的结果是?( )(说明:逗号后面无空格) A.

Canal+Kafka实现MySQL与Redis数据同步(二)

CanalKafka实现MySQL与Redis数据同步&#xff08;二&#xff09; 创建MQ消费者进行同步 在application.yml配置文件加上kafka的配置信息&#xff1a; spring:kafka:# Kafka服务地址bootstrap-servers: 127.0.0.1:9092consumer:# 指定一个默认的组名group-id: consumer-group…

08.智慧商城——购物车布局、全选反选、功能实现

01. 购物车 - 静态布局 基本结构 <template><div class"cart"><van-nav-bar title"购物车" fixed /><!-- 购物车开头 --><div class"cart-title"><span class"all">共<i>4</i>件商品…

STM32存储左右互搏 SPI总线FATS文件读写FLASH W25QXX

STM32存储左右互搏 SPI总线FATS文件读写FLASH W25QXX FLASH是常用的一种非易失存储单元&#xff0c;W25QXX系列Flash有不同容量的型号&#xff0c;如W25Q64的容量为64Mbit&#xff0c;也就是8MByte。这里介绍STM32CUBEIDE开发平台HAL库实现FATS文件操作W25Q各型号FLASH的例程。…

好莱坞罢工事件!再次警醒人类重视AI监管,人工智能矛盾一触即发!

原创 | 文 BFT机器人 关注国外新闻的应该都知道&#xff0c;最近焦点新闻是好莱坞史上最大规模的一场罢工运动。这场维持118天的罢工运动&#xff0c;终于在11月9号早上12点在好莱坞宣布结束。这场罢工运动虽是演员工会和代表资方的影视制片人联盟的茅盾&#xff0c;但直接引发…

求二叉树的高度(可运行)

输入二叉树为&#xff1a;ABD##E##C##。 运行环境&#xff1a;main.cpp 运行结果&#xff1a;3 #include "bits/stdc.h" using namespace std; typedef struct BiTNode{char data;struct BiTNode *lchild,*rchild;int tag; }BiTNode,*BiTree;void createTree(BiTre…

【数据结构】详解链表结构

目录 引言一、链表的介绍二、链表的几种分类三、不带头单链表的一些常用接口3.1 动态申请一个节点3.2 尾插数据3.3 头插数据3.4 尾删数据3.5 头删数据3.6 查找数据3.7 pos位置后插入数据3.8 删除pos位置数据3.9 释放空间 四、带头双向链表的常见接口4.1创建头节点&#xff08;初…