Kafka API操作

文章目录

  • 1、 Kafka 基础API
    • 1_Topic基本操作 DML管理
    • 2_生产者
    • 3_消费者 sub/assign
    • 4_自定义分区策略
    • 5_序列化
    • 6_拦截器
  • 2、Kafka API高级特性
    • 1_Offset自动控制
    • 2_Acks & Retries
    • 3_幂等性
    • 4_事务控制
      • 1、生产者事务Only
      • 2、消费者&生产者事务
      • 3、测试需要的三个消费者案例属性
  • 3、Spring Boot集成Kafka
  • 4、总结

1、 Kafka 基础API

了解基础API的使用,通过代码使用Kafka。

创建maven测试项目,引入如下依赖——最主要的是Kafka的客户端依赖:

<dependencies><!-- 与使用的kafka版本一致 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.8.0</version></dependency><!-- https://mvnrepository.com/artifact/log4j/log4j --><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version></dependency><!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.9</version></dependency>
</dependencies>

resource目录下创建日志配置文件log4j.properties

log4j.rootLogger = info,consolelog4j.appender.console = org.apache.log4j.ConsoleAppender
log4j.appender.console.Target = System.out
log4j.appender.console.layout = org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern =  %p %d{yyyy-MM-dd HH:mm:ss} %c - %m%n

1_Topic基本操作 DML管理

bootstrap.servers:Kafka的服务器地址。

package org.example.dml;import org.apache.kafka.clients.admin.*;import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;/*** @author shenyang* @version 1.0* @info TestKAFKA 3.7.0有bug*/public class KafkaTopicDML {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建KafkaAdminClientProperties props = new Properties();//配置连接设置props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"SHENYANG:9092");//运行时注意必须配置当前主机的`hosts`文件IP映射。KafkaAdminClient adminClient = (KafkaAdminClient) KafkaAdminClient.create(props);//删除Topic
//        removeTopic(adminClient);//创建Topic信息createTopicSync(adminClient);//查看topic列表findListTopic(adminClient);//查看topic的详细信息findTopicInfo(adminClient);//关闭adminClient.close();}private static void removeTopic(KafkaAdminClient adminClient) throws InterruptedException, ExecutionException {DeleteTopicsResult result = adminClient.deleteTopics(Arrays.asList("topic01"));result.all().get();}private static void findTopicInfo(KafkaAdminClient adminClient) throws InterruptedException, ExecutionException {System.out.println("=========================");DescribeTopicsResult dtr = adminClient.describeTopics(Arrays.asList("topic01"));Map<String, TopicDescription> topicDescriptionMap = dtr.all().get();for (Map.Entry<String, TopicDescription> entry : topicDescriptionMap.entrySet()) {System.out.println(entry.getKey() + "\t" + entry.getValue());}System.out.println("===================");System.out.println(dtr.allTopicNames() + "\t" + dtr.topicIdValues());}private static void findListTopic(KafkaAdminClient adminClient) throws InterruptedException, ExecutionException {ListTopicsResult listTopicsResult = adminClient.listTopics();Set<String> names = listTopicsResult.names().get();for (String name : names) {System.out.println(name);}}private static void createTopicSync(KafkaAdminClient adminClient) throws InterruptedException, ExecutionException {CreateTopicsResult result = adminClient.createTopics(Arrays.asList(new NewTopic("topic01", 3, (short) 1)));result.all().get();//同步}
}

2_生产者

  1. 创建连接
    • bootstrap.servers:Kafka的服务器地址
    • acks:表示当生产者生产数据到Kafka中,Kafka中会以什么样的策略返回
    • key.serializer:Kafka中的消息是以key、value键值对存储的,而且生产者生产的消息是需要在网络上传到的,这里指定的是StringSerializer方式,就是以字符串方式发送(将来还可以使用其他的一些序列化框架:Google ProtoBuf、Avro)
    • value.serializer:同上
  2. 创建一个生产者对象KafkaProducer
  3. 调用send方法发送消息(ProducerRecor,封装是key-value键值对)
  4. 调用Future.get表示等带服务端的响应
  5. 关闭生产者
package org.example.quickstart;import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @author shenyang* @version 1.0* @info TestKAFKA* @since 2024/9/2 21:33*/
public class KafkaProducerQuickStart {public static void main(String[] args) {//1.创建KafkaProducerProperties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"SHENYANG:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record =//指定key策略是hash 不指定就是轮询
//                    new ProducerRecord<>("topic01", "key" + i, "value" + i);new ProducerRecord<>("topic01", "value" + i);//发送消息给服务器producer.send(record);}producer.close();}
}

3_消费者 sub/assign

  • group.id:消费者组的概念,可以在一个消费组中包含多个消费者。如果若干个消费者的group.id是一样的,表示它们就在一个组中,一个组中的消费者是共同消费Kafka中topic的数据。
  • Kafka是一种拉消息模式的消息队列,在消费者中会有一个offset,表示从哪条消息开始拉取数据
  • kafkaConsumer.poll:Kafka的消费者API是一批一批数据的拉取

将下列消费者开启多份实例,生产者不指定key时默认将采取轮询的方式发送数据——新版本可能不同,不再使用默认分区策略,或者说老版本默认的分区策略在较新的版本中被废弃。

在这里插入图片描述

查看在同一消费者组下的多份实例之间是如何分配分区的——以及将某个消费实例下线,看看是不是该实例的分区会交给其他实例负责。

package org.example.quickstart;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;/*** @author shenyang* @version 1.0* @info TestKAFKA*/
public class KafkaConsumerQuickStart {public static void main(String[] args) {//1.创建KafkaProducerProperties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "SHENYANG:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");//此时必须指定消费组KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//订阅相关的Topic
//        consumer.subscribe(Pattern.compile("^topic.*"));consumer.subscribe(Arrays.asList("topic01"));//遍历消息队列while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));if (!consumerRecords.isEmpty()) {//从队列中取出数据Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()) {//获取一个消费消息ConsumerRecord<String, String> record = recordIterator.next();String topic = record.topic();int partition = record.partition();long offset = record.offset();String key = record.key();String value = record.value();long timestamp = record.timestamp();System.out.println(topic + "\t" +partition + "\t" +offset + "\t" +key + "\t" +value + "\t" +timestamp);}}}}
}

手动指定消费分区和偏移量(去掉消费者组):

package org.example.quickstart;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;/*** @author shenyang* @version 1.0* @info TestKAFKA*/
public class KafkaConsumerQuickStart_1 {public static void main(String[] args) {//1.创建KafkaProducerProperties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "SHENYANG:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//订阅相关的Topic 手动指定消费分区,失去组管理的特性List<TopicPartition> partitions = Arrays.asList(new TopicPartition("topic01", 0));consumer.assign(partitions);//指定消费分区位置 从0开始消费
//        consumer.seekToBeginning(partitions);//指定偏移量消费位置consumer.seek(new TopicPartition("topic01", 0),0);//遍历消息队列while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));if (!consumerRecords.isEmpty()) {//从队列中取出数据Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()) {//获取一个消费消息ConsumerRecord<String, String> record = recordIterator.next();String topic = record.topic();int partition = record.partition();long offset = record.offset();String key = record.key();String value = record.value();long timestamp = record.timestamp();System.out.println(topic + "\t" +partition + "\t" +offset + "\t" +key + "\t" +value + "\t" +timestamp);}}if (false) {break;}}consumer.close();}
}

4_自定义分区策略

Determines which partition to send a record to when records are produced. Available options are:

  • If not set, the default partitioning logic is used. This strategy send records to a partition until at least BATCH_SIZE_CONFIG bytes is produced to the partition. It works with the strategy:
    1. If no partition is specified but a key is present, choose a partition based on a hash of the key.
    2. If no partition or key is present, choose the sticky partition that changes when at least BATCH_SIZE_CONFIG bytes are produced to the partition.
  • org.apache.kafka.clients.producer.RoundRobinPartitioner: A partitioning strategy where each record in a series of consecutive records is sent to a different partition, regardless of whether the ‘key’ is provided or not, until partitions run out and the process starts over again. Note: There’s a known issue that will cause uneven distribution when a new batch is created. See KAFKA-9965 for more detail.

Implementing the org.apache.kafka.clients.producer.Partitioner interface allows you to plug in a custom partitioner.

原文的意思就是,如果没有特别指定分区策略,Kafka 生产者将使用默认的分区逻辑。这一策略的工作方式如下:

  • 按批量大小分区:生产者将记录发送到某个分区,直到该分区中产生至少 BATCH_SIZE_CONFIG 字节的数据。这样可以提高数据的写入效率,减少网络往返时间——达到了这个大小会主动 flush 数据。
  • 按键进行分区:如果记录包含键(key),则生产者根据键的哈希值来选择分区。这确保了相同键的记录总是被发送到同一个分区,便于顺序消费。
  • 粘性分区(Sticky Partitioning):如果没有指定分区且没有提供键,生产者会选择一个粘性分区,只有在分区中的数据达到 BATCH_SIZE_CONFIG 字节时才会更换分区。这种方式可以在一定程度上保证记录的顺序性。

RoundRobinPartitioner:这是一个Kafka实现的自定义的分区策略,其中每条记录依次被发送到不同的分区,不论记录是否有键(key)。

  • 轮询分区:记录会依次发送到每个分区,直到所有分区都被使用一次,然后重新开始循环。这有助于在负载均衡时尽可能均匀地分配数据到各个分区。
  • 已知问题:在某些情况下,当新的批次(batch)创建时,这种策略可能会导致数据分布不均。

Kafka 允许你实现 org.apache.kafka.clients.producer.Partitioner 接口来创建自己的分区策略。

The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.

No attempt will be made to batch records larger than this size.

Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.

A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.

Note: This setting gives the upper bound of the batch size to be sent. If we have fewer than this many bytes accumulated for this partition, we will ‘linger’ for the linger.ms time waiting for more records to show up. This linger.ms setting defaults to 0, which means we’ll immediately send out a record even the accumulated batch size is under this batch.size setting.

注意 batch.size参数防止因为批处理导致数据发送到同一分区影响测试,之前的测试不加此参数可能会受影响。

props.put(ProducerConfig.BATCH_SIZE_CONFIG,1);

我们将仿照 RoundRobinPartitioner实现 Partitioner 接口编写自己的分区策略,如下:

package org.example.partitioner;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;import javax.rmi.CORBA.Util;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;/*** @author shenyang* @version 1.0* @info TestKAFKA*/
public class UserDefinePartition implements Partitioner {private  AtomicInteger counter = null;/*** @return 分区号*/@Overridepublic int partition(String topic,Object key,byte[] keyBytes,//key的字节数组Object value,byte[] valueBytes,Cluster cluster) {//获取所有分区List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);int numPartitions = partitionInfos.size();//如果没有提供key 自行计数采用轮询的方式if (keyBytes == null || keyBytes.length == 0) {int andIncrement = counter.getAndIncrement();//与int最大值进行与运算,直接转为正数return (andIncrement & Integer.MAX_VALUE) % numPartitions;} else {//先murmur2对其hash 再进行toPositive转成正数对分区数取余return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}@Overridepublic void close() {System.out.println("===close==");}@Overridepublic void configure(Map<String, ?> map) {counter = new AtomicInteger(1);}
}

使用时给生产者添加如下配置:

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,UserDefinePartition.class.getName());

5_序列化

无论是哪种序列化方式,在网络中都是以字节的形式传输的。我们之前使用的一直是针对 String 类型的序列化和反序列化,接下来我们通过 Apache lang3 工具包对自定义对象进行序列化测试。

自定义对象,实现序列化接口:

package org.example.serializer;import org.apache.kafka.common.serialization.Serializer;import java.io.Serializable;/*** @author shenyang* @version 1.0* @info TestKAFKA*/
public class User implements Serializable {private Integer id;private String name;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getName() {return name;}@Overridepublic String toString() {return "User{" +"id=" + id +", name=" + name +'}';}public void setName(String name) {this.name = name;}public User() {}public User(Integer id, String name) {this.id = id;this.name = name;}
}

Serializer class for value | key that implements the org.apache.kafka.common.serialization.Serializer interface.

Deserializer class for value | key that implements the org.apache.kafka.common.serialization.Deserializer interface.

序列化需要实现Serializer接口,而反序列化需要实现Deserializer接口。

自定义序列化、反序列化器:

package org.example.serializer;import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Serializer;import java.io.Serializable;
import java.util.Map;/*** @author shenyang* @version 1.0* @info TestKAFKA*/
public class ObjectDefineSerializer implements Serializer<Object> {@Overridepublic byte[] serialize(String s, Object data) {return SerializationUtils.serialize((Serializable) data);}@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {System.out.println("configure");}@Overridepublic void close() {System.out.println("close");}
}package org.example.serializer;import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Deserializer;import java.util.Map;/*** @author shenyang* @version 1.0* @info TestKAFKA*/
public class ObjectDefineDeserializer implements Deserializer<Object> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {System.out.println("configure");}@Overridepublic Object deserialize(String topic, byte[] data) {return SerializationUtils.deserialize(data);}@Overridepublic void close() {System.out.println("close");}
}

在生产者中指定序列化方式:

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//使用自定义的序列化器
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectDefineSerializer.class.getName());

在消费者中指定反序列化方式:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//使用自定义的反序列化器
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectDefineDeserializer.class.getName());

6_拦截器

A list of classes to use as interceptors Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.

自定义拦截器需要实现 ProducerInterceptor 接口。

拦截器主要是对生产者发出的数据拦截后进行修改操作,如下自定义拦截器对生产者发送的record value进行了添加额外后缀的操作:

package org.example.intercepters;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** @author shenyang* @version 1.0* @info TestKAFKA*/
public class UserDefineProducerInterceptor implements ProducerInterceptor {@Overridepublic ProducerRecord onSend(ProducerRecord record) {return new ProducerRecord(record.topic(),record.key(),record.value()+"----shenyang");}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("metadata:"+metadata+",exception"+exception);}@Overridepublic void close() {System.out.println("close()");}@Overridepublic void configure(Map<String, ?> configs) {System.out.println("configure");}
}

在kafka生产者中添加如下属性进行配置:

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, UserDefineProducerInterceptor.class.getName());

2、Kafka API高级特性

了解高级API的使用,对kafka更一步的控制。

1_Offset自动控制

Kafka消费者默认对于未订阅的topic的offset的时候,也就是系统并没有存储该消费者的消费分区的记录信息,默认Kafka消费者的默认首次消费策略:latest

auto.offset.reset=latest
  • earliest - 自动将偏移量重置为最早的偏移量。
  • latest - 自动将偏移量重置为最新的偏移量。
  • none - 如果未找到消费者组的先前偏移量,则向消费者抛出异常。

Kafka消费者在消费数据的时候默认会定期的提交消费的偏移量,这样就可以保证所有的消息至少可以被消费者消费1次,用户可以通过以下两个参数配置:

enable.auto.commit = true # 默认
auto.commit.interval.ms = 5000 #默认

如果用户需要自己管理offset的自动提交,可以关闭offset的自动提交,手动管理offset提交的偏移量,注意用户提交的offset偏移量永远都要比本次消费的偏移量+1,因为提交的offset是kafka消费者下一次抓取数据的位置。

消费策略——可自行测试,无论哪种策略关闭消费者再启动后,生产者发送的数据消费者都会进行续读,AUTO_OFFSET_RESET_CONFIG仅仅是决定没有偏移量时的首次动作

与自动提交——可以在消费到数据后立马关闭消费者,再次启动看看下次消费位置,验证是否10s提交 offset:

package org.example.offset;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;/*** @author shenyang* @version 1.0* @info TestKAFKA*/
public class KafkaConsumerOffset01 {public static void main(String[] args) {//1.创建KafkaProducerProperties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "SHENYANG:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//latest 如果系统(消费者组)没有消费者得分偏移量,系统会读取最后的偏移量。如果有会续读props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");// earliest 如果系统(消费者组)没有消费者得分偏移量,系统会读取该分区最早的偏移量。//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//配置offset自动提交 10s自动提交 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,10000);//默认自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");//props.put(ConsumerConfig.GROUP_ID_CONFIG, "g4");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//订阅相关的Topic
//        consumer.subscribe(Pattern.compile("^topic.*"));consumer.subscribe(Arrays.asList("topic01"));//遍历消息队列while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));if (!consumerRecords.isEmpty()) {//从队列中取出数据Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()) {//获取一个消费消息ConsumerRecord<String, String> record = recordIterator.next();String topic = record.topic();int partition = record.partition();long offset = record.offset();String key = record.key();String value = record.value();long timestamp = record.timestamp();System.out.println(topic + "\t" +partition + "\t" +offset + "\t" +key + "\t" +value + "\t" +timestamp);}}}}
}

手动提交偏移量——关闭自动提交,并且在消费完数据后调用consumer.commitAsync方法提交偏移量:

package org.example.offset;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;/*** @author shenyang* @version 1.0* @info TestKAFKA*/
public class KafkaConsumerOffset04 {public static void main(String[] args) {//1.创建KafkaProducerProperties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "SHENYANG:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// earliest 如果系统(消费者组)没有消费者得分偏移量,系统会读取该分区最早的偏移量。props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//默认自动提交 配置手动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);props.put(ConsumerConfig.GROUP_ID_CONFIG, "g4");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//订阅相关的Topicconsumer.subscribe(Arrays.asList("topic01"));//遍历消息队列while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));if (!consumerRecords.isEmpty()) {//从队列中取出数据Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();//记录分区的消费元数据信息Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();while (recordIterator.hasNext()) {//获取一个消费消息ConsumerRecord<String, String> record = recordIterator.next();String topic = record.topic();int partition = record.partition();long offset = record.offset();String key = record.key();String value = record.value();long timestamp = record.timestamp();//记录消费分区偏移量的元素offsets.put(new TopicPartition(topic,partition),new OffsetAndMetadata(offset));//offset+1//提交消费者偏移量 因为提交的是本次读到的数据,每个分区会一直有一条数据读不完,应该提交下一次开始读取的偏移量consumer.commitAsync(offsets, (offsets1, exception) -> System.out.println("offsets:"+ offsets1 +" exception:"+exception));System.out.println(topic + "\t" +partition + "\t" +offset + "\t" +key + "\t" +value + "\t" +timestamp);}}}}
}

2_Acks & Retries

Kafka生产者在发送完一个的消息之后,要求Broker在规定的额时间Ack应答答,如果没有在规定时间内应答,Kafka生产者会尝试n次重新发送消息。

在这里插入图片描述

acks=1 默认

  • acks=1: Leader会将Record写到其本地日志中,但会在不等待所有Follower的完全确认的情况下做出响应。在这种情况下,如果Leader在确认记录后立即失败,但在Follower复制记录之前失败,则记录将丢失。
  • acks=0: 生产者根本不会等待服务器的任何确认。该记录将立即添加到套接字缓冲区中并视为已发送。在这种情况下,不能保证服务器已收到记录。
  • acks=all: 这意味着Leader将等待全套同步副本确认记录。这保证了只要至少一个同步副本仍处于活动状态,记录就不会丢失。这是最有力的保证。这等效于acks = -1设置。

如果生产者在规定的时间内,并没有得到Kafka的Leader的Ack应答,Kafka可以开启reties机制。

request.timeout.ms = 30000  默认
retries = 2147483647 默认

发送一条记录,我们查看消费者将收到几条消息:

package org.example.acks;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @author shenyang* @version 1.0* @info TestKAFKA*/
public class KafkaProducerAcks {public static void main(String[] args) {//1.创建KafkaProducerProperties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "SHENYANG:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//关闭幂等 默认开启props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,false);// 设置kafka ACK以及retriesprops.put(ProducerConfig.ACKS_CONFIG,"all");// 不包含第一次发送,如果尝试发送3次失败则系统放弃发送props.put(ProducerConfig.RETRIES_CONFIG,4);//将检测超时的时间设置为1毫秒(网络IO都不止1ms肯定会触发重试)props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,1);//不要移动位置在配置put前,不然不生效KafkaProducer<String, String> producer = new KafkaProducer<>(props);ProducerRecord<String, String> record =//指定key策略是hash 不指定就是轮询
//                    new ProducerRecord<>("topic01", "key" + i, "value" + i);new ProducerRecord<>("topic02","ack", "value" + "ack");//发送消息给服务器producer.send(record);producer.flush();producer.close();}
}

3_幂等性

当生产者发送的消息没有正确地被ack,使用重试机制发送消息时,必然会产生消息重复,为了防止数据被重复处理,消息的幂等性就需要得到保障。

HTTP/1.1中对幂等性的定义是:一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外)。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

Methods can also have the property of “idempotence” in that (aside from error or expiration issues) the side-effects of N > 0 identical requests is the same as for a single request.

Kafka在0.11.0.0版本支持增加了对幂等的支持。幂等是针对生产者角度的特性。幂等可以保证上生产者发送的消息,不会丢失,而且不会重复。实现幂等的关键点就是服务端可以区分请求是否重复,过滤掉重复的请求。要区分请求是否重复的有两点:

唯一标识:要想区分请求是否重复,请求中就得有唯一标识。例如支付请求中,订单号就是唯一标识

记录下已处理过的请求标识:光有唯一标识还不够,还需要记录下那些请求是已经处理过的,这样当收到新的请求时,用新请求中的标识和处理记录进行比较,如果处理记录中有相同的标识,说明是重复记录,拒绝掉。

幂等又称为exactly once。要停止多次处理消息,必须仅将其持久化到Kafka Topic中仅仅一次。在初始化期间,kafka会给生产者生成一个唯一的ID称为Producer ID或PID

在这里插入图片描述

PID和sequence number(针对消息的一个递增序列)序列号与消息捆绑在一起,然后发送给Broker。由于序列号从零开始并且单调递增,因此,仅当消息的序列号比该PID / TopicPartition对中最后提交的消息正好大1时,Broker才会接受该消息。如果不是这种情况,则Broker认定是生产者重新发送该消息。

enable.idempotence= false #老版本默认

注意:在使用幂等性的时候,要求必须开启retries=true和acks=all

package org.example.idempotence;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @author shenyang* @version 1.0* @info TestKAFKA*/
public class KafkaProducerIdempotence {public static void main(String[] args) {//1.创建KafkaProducerProperties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "SHENYANG:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//幂等 默认开启props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);//如果有多余1个记录没有应答阻塞客户端 默认5可能乱序props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,1);// 设置kafka ACK以及retriesprops.put(ProducerConfig.ACKS_CONFIG,"all");// 不包含第一次发送,如果尝试发送3次失败则系统放弃发送props.put(ProducerConfig.RETRIES_CONFIG,4);//将检测超时的时间设置为1毫秒(网络IO都不止1ms肯定会触发重试)props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,1);//不要移动位置在配置put前,不然不生效KafkaProducer<String, String> producer = new KafkaProducer<>(props);ProducerRecord<String, String> record =//指定key策略是hash 不指定就是轮询
//                    new ProducerRecord<>("topic01", "key" + i, "value" + i);new ProducerRecord<>("topic02","ack", "value" + "ack");//发送消息给服务器producer.send(record);//kafka在发送数据之前会在本地做一次缓冲,flush防止缓冲producer.flush();producer.close();}
}

4_事务控制

Kafka的幂等性,只能保证一条记录的在分区发送的原子性,但是如果要保证多条记录(多分区)之间的完整性,这个时候就需要开启kafk的事务操作。

在Kafka0.11.0.0除了引入的幂等性的概念,同时也引入了事务的概念。通常Kafka的事务分为 生产者事务Only消费者&生产者事务。一般来说默认消费者消费的消息的级别是read_uncommited数据,这有可能读取到事务失败的数据,所有在开启生产者事务之后,需要用户设置消费者的事务隔离级别。

isolation.level	=  read_uncommitted 默认

该选项有两个值read_committed | read_uncommitted,如果开始事务控制,消费端必须将事务的隔离级别设置为read_committed

开启的生产者事务的时候,只需要指定transactional.id属性即可,一旦开启了事务,默认生产者就已经开启了幂等性。但是要求"transactional.id"的取值必须是唯一的,同一时刻只能有一个"transactional.id"存在,其他的将会被关闭。

  • 生产者

    // 开启事务必须要配置事务的ID
    props.put("transactional.id", "dwd_user");
    
  • 消费者

    // 配置事务的隔离级别
    props.put("isolation.level","read_committed");
    // 关闭自动提交,一会我们需要手动来提交offset,通过事务来维护offset
    props.setProperty("enable.auto.commit", "false");
    
  • 生产者

    • 初始化事务
    • 开启事务
    • 需要使用producer来将消费者的offset提交到事务中
    • 提交事务
    • 如果出现异常回滚事务

如果使用了事务,不要使用异步发送

1、生产者事务Only

仅仅控制生产者的一些特性,假设有一个生产者同时发送了三条消息到三个分区,一条消息发送失败后 0,另外两个分区 1 2 也会进行回滚:

package org.example.transactions;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.UUID;/*** @author shenyang* @version 1.0* @info TestKAFKA*/
public class KafkaProducerTransactionsProducerOnly {public static void main(String[] args) {KafkaProducer<String, String> producer = buildKafkaProducer();//初始化事务producer.initTransactions();try {//开启事务producer.beginTransaction();for (int i = 0; i < 10; i++) {if(i == 8){int j = 10/0;}ProducerRecord<String, String> record =//指定key策略是hash 不指定就是轮询//                    new ProducerRecord<>("topic01", "transaction" + i, "error data" + i);new ProducerRecord<>("topic01", "right data" + i);//发送消息给服务器producer.send(record);producer.flush();}//事务提交producer.commitTransaction();} catch (Exception e) {System.err.println("出现错误~ " + e.getMessage());//终止事务producer.abortTransaction();} finally {producer.close();}}public static KafkaProducer<String, String> buildKafkaProducer() {//1.创建KafkaProducerProperties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "SHENYANG:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//必须配置事务idprops.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id" + UUID.randomUUID().toString());//配置kafka批处理大小 默认在缓存池中 大小,时间props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024);//等待5ms如果batch数据不足1024props.put(ProducerConfig.LINGER_MS_CONFIG, 5);//配置Kafka的重试机制和幂等性props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 20000);return new KafkaProducer<>(props);}
}

2、消费者&生产者事务

既是消费者也是生产者,比如:消费topic01中的数据,业务处理完成后又将数据发送到topic02当中去。假设向topic02中发送数据失败了,不会成功消费掉topic01中的数据:

package org.example.transactions;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;/*** @author shenyang* @version 1.0* @info TestKAFKA*/
public class KafkaProducerTransactionsProducerAndConsumer {public static void main(String[] args) {KafkaProducer<String, String> producer = buildKafkaProducer();KafkaConsumer<String, String> consumer = buildKafkaConsumer("g1");//初始化事务producer.initTransactions();//消费数据consumer.subscribe(Arrays.asList("topic01"));while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));if (!consumerRecords.isEmpty()) {Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();Iterator<ConsumerRecord<String, String>> consumerRecordIterator = consumerRecords.iterator();//开启事务处理producer.beginTransaction();try {//迭代数据业务处理while(consumerRecordIterator.hasNext()){ConsumerRecord<String, String> record = consumerRecordIterator.next();//存储元数据offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset()+1));ProducerRecord<String,String> pRecord = new ProducerRecord<String, String>("topic02",record.key(),record.value()+"shenyang online");producer.send(pRecord);}//事务提交producer.sendOffsetsToTransaction(offsets, "g1");//提交消费者得偏移量producer.commitTransaction();} catch (Exception e) {System.out.println("出现错误~ " + e.getMessage());//终止事务producer.abortTransaction();}}}}public static KafkaProducer<String, String> buildKafkaProducer() {//1.创建KafkaProducerProperties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "SHENYANG:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//必须配置事务idprops.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id" + UUID.randomUUID().toString());//配置kafka批处理大小 默认在缓存池中 大小,时间props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024);//等待5ms如果batch数据不足1024props.put(ProducerConfig.LINGER_MS_CONFIG, 5);//配置Kafka的重试机制和幂等性props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 20000);return new KafkaProducer<>(props);}public static KafkaConsumer<String, String> buildKafkaConsumer(String groupId) {//1.创建KafkaProducerProperties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "SHENYANG:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//设置消费者事务的隔离级别 read_un commitprops.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");//事务 必须关闭消费者端的自动提交(业务完成后提交)props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);return consumer;}
}

3、测试需要的三个消费者案例属性

1:

package org.example.transactions;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;/*** @author shenyang* @version 1.0* @info TestKAFKA*/
public class KafkaConsumerTransactionReadCommitted01 {public static void main(String[] args) {//1.创建KafkaProducerProperties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "SHENYANG:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");//设置消费者事务的隔离级别 read_un commitprops.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//订阅相关的Topic
//        consumer.subscribe(Pattern.compile("^topic.*"));consumer.subscribe(Arrays.asList("topic01"));//遍历消息队列while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));if (!consumerRecords.isEmpty()) {//从队列中取出数据Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()) {//获取一个消费消息ConsumerRecord<String, String> record = recordIterator.next();String topic = record.topic();int partition = record.partition();long offset = record.offset();String key = record.key();String value = record.value();long timestamp = record.timestamp();System.out.println(topic + "\t" +partition + "\t" +offset + "\t" +key + "\t" +value + "\t" +timestamp);}}}}
}

2:

package org.example.transactions;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;/*** @author shenyang* @version 1.0* @info TestKAFKA*/
public class KafkaConsumerTransactionReadCommitted02 {public static void main(String[] args) {//1.创建KafkaProducerProperties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "SHENYANG:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");//设置消费者事务的隔离级别 read_un commitprops.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//订阅相关的Topic
//        consumer.subscribe(Pattern.compile("^topic.*"));consumer.subscribe(Arrays.asList("topic02"));//遍历消息队列while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));if (!consumerRecords.isEmpty()) {//从队列中取出数据Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()) {//获取一个消费消息ConsumerRecord<String, String> record = recordIterator.next();String topic = record.topic();int partition = record.partition();long offset = record.offset();String key = record.key();String value = record.value();long timestamp = record.timestamp();System.out.println(topic + "\t" +partition + "\t" +offset + "\t" +key + "\t" +value + "\t" +timestamp);}}}}
}

读取未提交数据的测试消费者:

package org.example.transactions;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;/*** @author shenyang* @version 1.0* @info TestKAFKA*/
public class KafkaConsumerTransactionReadUnCommitted {public static void main(String[] args) {//1.创建KafkaProducerProperties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "SHENYANG:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, "g2");//设置消费者事务的隔离级别 read_un commitprops.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_uncommitted");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//订阅相关的Topic
//        consumer.subscribe(Pattern.compile("^topic.*"));consumer.subscribe(Arrays.asList("topic01"));//遍历消息队列while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));if (!consumerRecords.isEmpty()) {//从队列中取出数据Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()) {//获取一个消费消息ConsumerRecord<String, String> record = recordIterator.next();String topic = record.topic();int partition = record.partition();long offset = record.offset();String key = record.key();String value = record.value();long timestamp = record.timestamp();System.out.println(topic + "\t" +partition + "\t" +offset + "\t" +key + "\t" +value + "\t" +timestamp);}}}}
}

3、Spring Boot集成Kafka

相关文档:https://docs.spring.io/spring-boot/reference/messaging/kafka.html#messaging.kafka,https://spring.io/projects/spring-kafka——kafka client对应版本。

引入kafka依赖:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.2.0</version> <!-- 使用你需要的版本 -->
</dependency>

spring 的配置文件:

spring:kafka:bootstrap-servers: CentOSA:9092,CentOSB:9092,CentOSC:9092 #kafka服务节点consumer:auto-commit-interval: 100 # offset 自动提交间隔auto-offset-reset: earliest # 首次加载record策略enable-auto-commit: true # 启动offset自动提交group-id: group1 #组idkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer # key的序列化器properties:isolation:level: read_committed #消费者读取数据的隔离级别value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # value的序列化器producer:acks: all # 生产者确认机制batch-size: 16384 # 批处理大小buffer-memory: 33554432 # 缓冲区大小key-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器properties:enable:idempotence: true #是否开启幂等 retries: 5 #重试次数transaction-id-prefix: transaction-id- #事务idvalue-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器

使用注解的形式监听并转发消息:

import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.KafkaListeners;
import org.springframework.messaging.handler.annotation.SendTo;import java.io.IOException;
import java.util.Arrays;
import java.util.stream.Collectors;@SpringBootApplication
@EnableKafkaStreams
@EnableKafka
public class KafkaSpringBootApplication {public static void main(String[] args) throws IOException {SpringApplication.run(KafkaSpringBootApplication.class,args);System.in.read();}/*** 从topic01中消费数据后加上后缀发送到topic02中去* @param cr 消费记录* @return record*/@KafkaListeners(value = {@KafkaListener(topics = {"topic01"})})@SendTo(value = {"topic02"})public String listener(ConsumerRecord<?, ?> cr) {return cr.value()+" shenyang test";}@Beanpublic KStream<String, String> kStream(StreamsBuilder kStreamBuilder) {KStream<String, String> stream = kStreamBuilder.stream("topic02",Consumed.with(Serdes.String(),Serdes.String()));stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String s) {return Arrays.stream(s.split(" ")).collect(Collectors.toList());}}).selectKey((k,v)->v).groupByKey(Serialized.with(Serdes.String(),Serdes.String())).count(Materialized.<String,Long, KeyValueStore<Bytes, byte[]>>as("wordcount")).toStream().print(Printed.toSysOut());return stream;}}

使用 KafkaTemplate 发送消息:

import com.example.KafkaSpringBootApplication;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes = {KafkaSpringBootApplication.class})
@RunWith(SpringRunner.class)
public class KafkaTemplateTests {@Autowiredprivate KafkaTemplate kafkaTemplate;@Test//非事务下执行public void testOrderService(){//开启事务后只能在事务环境中执行了,再使用send将会报错!kafkaTemplate.send(new ProducerRecord("topic04","001","shenyang lin"));}@Test//事务下执行public void testKafkaTemplate(){kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {@Overridepublic Object doInOperations(KafkaOperations kafkaOperations) {return kafkaOperations.send(new ProducerRecord("topic01","002","this is a demo"));}});}}

在开启事务时,还可以通过spring的事务去执行send方法,可以调用MessageServicesendMessage方法去发送消息:

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Transactional
@Service
public class MessageService {@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String id,Object message) {kafkaTemplate.send(new ProducerRecord("topic04",id,message));}
}

4、总结

了解API的使用帮助我们更好的在开发中使用kafka。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/422768.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

常用环境部署(二十)——docker部署OpenProject

一、安装Docker及Docker-compose https://blog.csdn.net/wd520521/article/details/112609796 二、docker拉取OpenProject镜像 1、拉取镜像 docker pull openproject/openproject:14 注意&#xff1a; 拉取镜像的时候会有超时的现象出现&#xff0c;大家重新拉取几次就行…

JavaWeb开发中为什么Controller里面的方法是@RequestMapping?

在Java Web开发中&#xff0c;尤其是在使用Spring MVC框架时&#xff0c;RequestMapping注解被广泛应用于Controller层的方法上&#xff0c;这是因为RequestMapping是Spring MVC提供的一个核心注解&#xff0c;用于将HTTP请求映射到相应的处理器类或处理器方法上。通过这种方式…

AWTK HTML View 控件更新

AWTK HTML View 控件基于 Lite HTML 实现&#xff0c;从最初的版本开始&#xff0c;3 年多过去了&#xff0c;Lite HTML 做了大量的更新&#xff0c;最近抽空将 AWTK HTML View 控件适配到最新版本的 Lite HTML&#xff0c;欢迎大家使用。 AWTK HTML View 控件。HTML View 控件…

【数据结构(初阶)】——二叉树

【数据结构】——二叉树 文章目录 【数据结构】——二叉树前言1. 树的概念及结构1.1 树的概念1.2 树的结构 2. 二叉树的概念及结构2.1 二叉树的概念2.2 二叉树的结构2.3 二叉树的性质 3. 二叉树顺序结构及概念3.1 二叉树的顺序结构3.2 堆的概念及结构3.3 堆的实现3.3.1 堆的基本…

OpenAI 的 o1 大模型在数学和编码方面有了几乎 10 倍的能力提升!

你有没有想过,有一天人工智能可以在数学和编程这两个领域里,真正成为人类的“得力助手”,甚至是超越我们?最近,OpenAI 发布的 o1大模型在这方面取得了几乎 10 倍的能力提升。10 倍!你没有看错。这样的进步让人不禁怀疑:AI 真的能做到“秒懂”数学和编程吗?今天,我们就…

远程访问NAS速度慢??那是因为你没用对。。。

虽然局域网&#xff08;内网&#xff09;、公网&#xff08;外网&#xff09;经常被提到&#xff0c;但很多人依旧搞不懂分不清楚。。。 其实&#xff0c;简单的方法就是把局域网IP比喻成公司的内部通讯&#xff0c;公网IP看作公共通讯平台。 这样拥有公网IP能被直接远程访问&…

redis内存清理和linux系统清理缓存以及redis启动

1清空所有数据库 redis-cli FLUSHALL 2清空所有数据库redis-cli FLUSHDB 3. 删除指定的缓存键 redis-cli DEL <key>4. 设置键过期 redis-cli EXPIRE <key> <seconds>例如&#xff1a; redis-cli EXPIRE mykey 605.启动redis 这个启动命令要在/usr/loca…

【Canvas与密铺】90年代马赛克密铺效果 1920x1080

【成图】 【代码】 <!DOCTYPE html> <html lang"utf-8"> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"/> <head><title>20世纪90年代马赛克瓷砖效果1920x1080</title><style type&…

MySQL:bin log

redo log 它是物理日志&#xff0c;记录内容是“在某个数据页上做了什么修改”&#xff0c;属于 InnoDB 存储引擎。 而 binlog 是逻辑日志&#xff0c;记录内容是语句的原始逻辑&#xff0c;类似于“给 ID2 这一行的 c 字段加 1”&#xff0c;属于MySQL Server 层。 不管用什…

如何处理DDOS攻击问题

随着信息技术的飞速发展&#xff0c;网络已成为现代社会不可或缺的一部分&#xff0c;极大地便利了个人社交和商业活动。然而&#xff0c;网络空间在创造无限机遇的同时&#xff0c;也潜藏着诸多威胁&#xff0c;其中分布式拒绝服务攻击&#xff08;DDoS&#xff0c;Distribute…

全球工业经济系统极端降水暴露数据集(2010年、2016-2035年和2046-2065年)

全球工业经济系统极端降水暴露数据集 数据介绍 1. 数据的时间覆盖范围&#xff1a; 数据收集时期为2010年、2016-2035年和2046-2065年。 2. 空间覆盖和投影&#xff1a; 空间覆盖范围&#xff1a;全球 经度&#xff1a;-180 - 180 纬度&#xff1a;-90 - 90 投影&#x…

qemu和libvirt的配置对比

libvirt的很多配置选项其实是调用了qemu的接口&#xff0c;但也有增加和优化的地方&#xff0c;本文主要总结这些配置选项&#xff0c;当个手册来查询。 按照centos停服前最后一版centos-8.5.2111提供的rpm查看http://mirrors.aliyun.com/centos/8.5.2111/AppStream/aarch64/o…

【JUC】16-Java对象内存布局和对象头

1. 对象的内存布局 在HotSpot虚拟机里&#xff0c;对象在堆内存中的存储布局可以分为三个部分&#xff1a;对象头、实例数据和对齐填充。 对象头&#xff1a;由对象标记和类型指针。

[数据集][目标检测]烟叶病害检测数据集VOC+YOLO格式612张3类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;612 标注数量(xml文件个数)&#xff1a;612 标注数量(txt文件个数)&#xff1a;612 标注类别…

精心整理|算法备案大模型备案最新数据汇总

根据《互联网信息服务算法推荐管理规定》第二条 在中华人民共和国境内应用算法推荐技术提供互联网信息服务&#xff08;以下简称算法推荐服务&#xff09;&#xff0c;适用本规定。法律、行政法规另有规定的&#xff0c;依照其规定。前款所称应用算法推荐技术&#xff0c;是指利…

Excel数据转置|Excel数据旋转90°

Excel数据转置|Excel数据旋转90 将需要转置的数据复制在旁边空格处点击鼠标右键&#xff0c;选择图中转置按钮&#xff0c;即可完成数据的转置。&#xff01;&#xff01;&#xff01;&#xff01;非常有用啊啊啊&#xff01;&#xff01;&#xff01;

【数据结构-一维差分】力扣1854. 人口最多的年份

给你一个二维整数数组 logs &#xff0c;其中每个 logs[i] [birthi, deathi] 表示第 i 个人的出生和死亡年份。 年份 x 的 人口 定义为这一年期间活着的人的数目。第 i 个人被计入年份 x 的人口需要满足&#xff1a;x 在闭区间 [birthi, deathi - 1] 内。注意&#xff0c;人不…

【C++登堂入室】类和对象(中)——类的6个默认成员函数

目录 一、类的6个默认成员函数 ​编辑二、构造函数 2.1 概念 2.2 特性 三、析构函数 3.1 概念 3.2 特性 四、拷贝构造函数 4.1 概念 4.2 特征 五、赋值运算符重载 5.1 运算符重载 5.2 赋值运算符重载 5.3 前置和后置重载 六、日期类的实现 七、const成员 八、…

解锁企业潜能,Vatee万腾平台引领智能新纪元

在数字化转型的浪潮中&#xff0c;企业正站在一个前所未有的十字路口&#xff0c;面对着前所未有的机遇与挑战。解锁企业内在潜能&#xff0c;实现跨越式发展&#xff0c;已成为众多企业的共同追求。而Vatee万腾平台&#xff0c;作为智能科技的先锋&#xff0c;正以其强大的智能…

随机分类,保持均衡水平Python

1、目的&#xff1a; 10000个样本有4个指标&#xff0c;按照逾期金额分10类&#xff0c;确保每类别逾期金额均衡。 2、数据&#xff1a; 3、思路&#xff1a; 将10000个样本按照逾期金额排序&#xff0c; 等距分箱为2500个类别 增加一列随机数 根据类别和随机数升序排列 增加…