一文上手Kafka【中】

一、发送消息细节

在发送消息的特别注意: 在版本 3.0 中,以前返回 ListenableFuture 的方法已更改为返回 CompletableFuture。为了便于迁移,2.9 版本添加了一个方法 usingCompletableFuture(),该方法为 CompletableFuture 返回类型提供了相同的方法;此方法不再可用。

1.1 ProducerConfig

在spring kafka项目当中,提供了Kafka 生产者的相关配置.在类ProducerConfig当中,其值分别定义在不同的常量类当中. 结合上篇当中发送消息的时候控制台输出的日志,具体字段含义如下所示:

通用配置:

  • acks = 1:生产者要求领导者在确认消息写入之前收到的最少同步副本数量。设置为 1
    表示领导者成功写入消息后即确认,不等待副本同步完成。
  • auto.include.jmx.reporter = true:自动包含用于Java Management Extensions(JMX)的报告器,以便通过 JMX 监控生产者的指标。
  • batch.size =16384:当多个消息发送到同一分区时,生产者将尝试在单个请求中发送消息的批量大小(以字节为单位)。
  • bootstrap.servers =[ip:9092]:Kafka 集群的服务器地址列表,用于建立初始连接。
  • buffer.memory =33554432:生产者可以用来缓冲等待发送到服务器的记录的总内存大小(以字节为单位)。
  • client.id =rj-spring-kafka-demo-producer-1:生产者的客户端 ID,用于在 Kafka 服务器端标识此生产者。
  • compression.type = none:消息的压缩类型,可以是 none、gzip、snappy 等。
  • connections.max.idle.ms = 540000:在关闭不活动的连接之前,连接可以保持空闲的最长时间(以毫秒为单位)。
  • delivery.timeout.ms = 120000:消息发送的超时时间,包括所有可能的重试。
  • enable.idempotence =false:是否启用生产者的幂等性,确保在重试时不会产生重复的消息。 >
  • enable.metrics.push =true:是否启用推送生产者的指标数据到外部系统。
  • interceptor.classes =[]:生产者拦截器的类列表,用于在发送消息之前或之后执行自定义逻辑。
  • key.serializer = class.org.apache.kafka.common.serialization.StringSerializer:用于序列化消息键的类。
  • linger.ms = 0:生产者在发送批次之前等待的额外时间(以毫秒为单位),以允许更多消息积累在批次中。
  • max.block.ms =60000:生产者在发送消息或获取元数据等操作中阻塞的最长时间。
  • max.in.flight.requests.per.connection = 5:在单个连接上允许的最大未确认请求数量。
  • max.request.size = 1048576:生产者请求的最大大小(以字节为单位)。
  • metadata.max.age.ms = 300000:元数据(如主题分区信息)的过期时间(以毫秒为单位),之后将强制刷新元数据。
  • metadata.max.idle.ms = 300000:元数据在没有任何更新的情况下保持有效的最长时间。
  • metric.reporters = []:自定义的指标报告器类列表。 metrics.num.samples =2:用于计算指标的样本数量。
  • metrics.recording.level = INFO:指标记录的级别,例如 INFO、DEBUG 等。
  • metrics.sample.window.ms = 30000:用于计算指标的时间窗口大小(以毫秒为单位)。
  • partitioner.adaptive.partitioning.enable =true:是否启用自适应分区功能,根据负载动态调整分区分配。
  • partitioner.availability.timeout.ms =0:分区器在确定分区不可用时等待的时间(以毫秒为单位)。 >
  • partitioner.class =null:自定义分区器的类,如果未设置则使用默认分区器。
  • partitioner.ignore.keys = false:是否忽略消息的键,不基于键进行分区。
  • receive.buffer.bytes = 32768:套接字接收缓冲区的大小(以字节为单位)。
  • reconnect.backoff.max.ms =1000:重新连接的最大退避时间(以毫秒为单位)。
  • reconnect.backoff.ms =50:重新连接的初始退避时间(以毫秒为单位)。
  • request.timeout.ms =30000:生产者请求的超时时间,包括发送请求和接收响应的时间。
  • retries = 3:生产者在发送消息失败时的重试次数。
  • retry.backoff.max.ms = 1000:重试之间的最大退避时间(以毫秒为单位)。
  • retry.backoff.ms =100:重试之间的初始退避时间(以毫秒为单位)。

SASL 相关配置(用于安全认证):

  • sasl.client.callback.handler.class = null:SASL 客户端回调处理程序的类。
  • sasl.jaas.config = null:Java Authentication and Authorization Service(JAAS)配置,用于 SASL 认证。
  • sasl.kerberos.kinit.cmd = /usr/bin/kinit:Kerberos 的 kinit 命令路径。
  • sasl.kerberos.min.time.before.relogin = 60000:Kerberos 重新登录之前的最小时间(以毫秒为单位)。
  • sasl.kerberos.service.name = null:Kerberos 服务名称。
  • sasl.kerberos.ticket.renew.jitter = 0.05:Kerberos 票证更新的抖动因子。
  • sasl.kerberos.ticket.renew.window.factor = 0.8:Kerberos 票证更新的窗口因子。
  • sasl.login.callback.handler.class = null:SASL 登录回调处理程序的类。
  • sasl.login.class = null:SASL 登录机制的类。
  • sasl.login.connect.timeout.ms = null:SASL 登录连接超时时间(以毫秒为单位)。
  • sasl.login.read.timeout.ms = null:SASL 登录读取超时时间(以毫秒为单位)。
  • sasl.login.refresh.buffer.seconds = 300:SASL 登录刷新缓冲区时间(以秒为单位)。
  • sasl.login.refresh.min.period.seconds = 60:SASL 登录刷新的最小周期(以秒为单位)。
  • sasl.login.refresh.window.factor = 0.8:SASL 登录刷新的窗口因子。
  • sasl.login.refresh.window.jitter = 0.05:SASL 登录刷新的抖动因子。
  • sasl.login.retry.backoff.max.ms = 10000:SASL 登录重试的最大退避时间(以毫秒为单位)。
  • sasl.login.retry.backoff.ms = 100:SASL 登录重试的初始退避时间(以毫秒为单位)。
  • sasl.mechanism = GSSAPI:SASL 认证机制,如 GSSAPI、PLAIN 等。
  • sasl.oauthbearer.clock.skew.seconds = 30:OAuth Bearer 令牌的时钟偏差时间(以秒为单位)。
  • sasl.oauthbearer.expected.audience = null:预期的 OAuth Bearer 令牌受众。
  • sasl.oauthbearer.expected.issuer = null:预期的 OAuth Bearer 令牌发行者。
  • sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000:OAuth Bearer JWKS 端点的刷新时间(以毫秒为单位)。
  • sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000:OAuth Bearer JWKS 端点重试的最大退避时间(以毫秒为单位)。
  • sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100:OAuth Bearer JWKS 端点重试的初始退避时间(以毫秒为单位)。
  • sasl.oauthbearer.jwks.endpoint.url = null:OAuth Bearer JWKS 端点的 URL。
  • sasl.oauthbearer.scope.claim.name = scope:OAuth Bearer 令牌范围声明的名称。
  • sasl.oauthbearer.sub.claim.name = sub:OAuth Bearer 令牌主题声明的名称。
  • sasl.oauthbearer.token.endpoint.url = null:OAuth Bearer 令牌端点的 URL。

安全协议相关配置:

  • security.protocol = PLAINTEXT:生产者使用的安全协议,如 PLAINTEXT、SSL、SASL_PLAINTEXT 等。
  • security.providers = null:安全提供程序的类列表。

网络相关配置:

  • send.buffer.bytes = 131072:套接字发送缓冲区的大小(以字节为单位)。
  • socket.connection.setup.timeout.max.ms = 30000:套接字连接设置的最大超时时间(以毫秒为单位)。
  • socket.connection.setup.timeout.ms = 10000:套接字连接设置的初始超时时间(以毫秒为单位)。

SSL 相关配置(用于加密连接):

  • ssl.cipher.suites = null:SSL 加密套件列表。
  • ssl.enabled.protocols = [TLSv1.2, TLSv1.3]:启用的 SSL 协议版本列表。
  • ssl.endpoint.identification.algorithm = https:SSL 端点标识算法。
  • ssl.engine.factory.class = null:SSL 引擎工厂的类。
  • ssl.key.password = null:SSL 密钥密码。
  • ssl.keymanager.algorithm = SunX509:SSL 密钥管理器算法。
  • ssl.keystore.certificate.chain = null:SSL 密钥库证书链。
  • ssl.keystore.key = null:SSL 密钥库的密钥。
  • ssl.keystore.location = null:SSL 密钥库的位置。
  • ssl.keystore.password = null:SSL 密钥库的密码。
  • ssl.keystore.type = JKS:SSL 密钥库的类型。
  • ssl.protocol = TLSv1.3:SSL 协议版本。
  • ssl.provider = null:SSL 提供程序。
  • ssl.secure.random.implementation = null:SSL 安全随机数生成器的实现。
  • ssl.trustmanager.algorithm = PKIX:SSL 信任管理器算法。
  • ssl.truststore.certificates = null:SSL 信任库证书。
  • ssl.truststore.location = null:SSL 信任库的位置。
  • ssl.truststore.password = null:SSL 信任库的密码。
  • ssl.truststore.type = JKS:SSL 信任库的类型。

事务相关配置:

  • transaction.timeout.ms = 60000:事务的超时时间(以毫秒为单位)。
  • transactional.id = null:事务 ID,用于标识一个事务性生产者。

序列化相关配置:

  • value.serializer = class org.apache.kafka.common.serialization.StringSerializer:用于序列化消息值的类。

1.2 sendDefault

CompletableFuture<SendResult<K, V>> sendDefault(V data);

该api要求向模板提供的默认主题发送消息.要使用该模板,您可以配置生产者工厂并在模板的构造函数中提供它.

@Configuration
public class KafkaConfig {@Beanpublic Map<String, Object> producerConfig(){Map<String, Object> map = new HashMap<>();map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9092");map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);map.put(ProducerConfig.RETRIES_CONFIG, "3");return map;}@Beanpublic ProducerFactory<Integer, Object> producerFactory(){return new DefaultKafkaProducerFactory<>(producerConfig());}@Beanpublic KafkaTemplate<Integer,Object> kafkaTemplate(){KafkaTemplate<Integer, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());// 设置默认主题kafkaTemplate.setDefaultTopic("rj-default-topic");return kafkaTemplate;}
}

此时发送的时候,可以不必指定主题了,而直接将消息发送到我们自己定义的默认的主题当中了

    @GetMapping("/default")public String sendDefaultMsg(String msg) throws ExecutionException, InterruptedException {CompletableFuture<SendResult<Integer, Object>> completableFuture = kafkaTemplate.sendDefault(msg);SendResult<Integer, Object> sendResult = completableFuture.get();log.info("sendResult:{}", sendResult);return "向默认主题发送消息";}

从版本 2.5 开始,您现在可以覆盖工厂的 ProducerConfig 属性,以从同一工厂创建具有不同生产者配置的模板。

    @Beanpublic KafkaTemplate<Integer,Object> kafkaTemplate(){KafkaTemplate<Integer, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());// 设置默认主题kafkaTemplate.setDefaultTopic("rj-default-topic");return kafkaTemplate;}/*** 从同一个工厂创建具有不同生产者的配置的模块* @param producerFactory* @return*/@Beanpublic KafkaTemplate<String, String> stringKafkaTemplate(ProducerFactory<String, String> producerFactory){return new KafkaTemplate<>(producerFactory, Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));}// 如果要重用ProducerFactory,则必须修改一下ProducerFactory的初始的泛型,修改为如下的格式@Beanpublic ProducerFactory<?, ?> producerFactory(){return new DefaultKafkaProducerFactory<>(producerConfig());}

当然以上的ProducerFactory相关配置的属性,也可以在application.yml配置文件当中进行配置.

1.3 Message接口

在使用KafkaTemplate发送数据的时候,可以直接发送一个Message.方法定义如下所示:

	@Overridepublic CompletableFuture<SendResult<K, V>> send(Message<?> message) {ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);if (!producerRecord.headers().iterator().hasNext()) { // possibly no Jacksonbyte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);if (correlationId != null) {producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);}}return observeSend((ProducerRecord<K, V>) producerRecord);}

这里需要注意, Message是在org.springframework.messaging包当中定义的,定义接口如下所示:

// 带有 headers 和 body 的通用消息表示形式。
public interface Message<T> {/*** Return the message payload.* 消息体*/T getPayload();/*** Return message headers for the message (never {@code null} but may be empty).* 消息头,可以在发送的时候指定*/MessageHeaders getHeaders();}

继承关系
这里我们使用实现类GenericMessage即可:

// 推荐使用这个构造方法,简单方便
public GenericMessage(T payload, Map<String, Object> headers) {this(payload, new MessageHeaders(headers));
}public GenericMessage(T payload, MessageHeaders headers) {Assert.notNull(payload, "Payload must not be null");Assert.notNull(headers, "MessageHeaders must not be null");this.payload = payload;this.headers = headers;
}

测试代码:

 @GetMapping("/message")public String sendMessage(){Map<String, Object> map = new HashMap<>();// 向 Kafka 发送数据时包含主题的headermap.put(KafkaHeaders.TOPIC, Constants.Kafka.TOPIC_NAME);map.put(KafkaHeaders.KEY, "rj");// 包含从中接收消息的主题的header。map.put(KafkaHeaders.RECEIVED_TOPIC, Constants.Kafka.TOPIC_NAME);// 创建MessageHeaders对象MessageHeaders messageHeaders = new MessageHeaders(map);// 构建Message对象Message<String> message = new GenericMessage<>("hello, message!", messageHeaders);// 将Message发送到指定的topicCompletableFuture<SendResult<Integer, Object>> completableFuture = kafkaTemplate.send(message);completableFuture.whenComplete((result, ex) -> {if (ex == null) {System.out.println("发送成功");} else {System.out.println("发送失败");}});return "发送成功!";}

上述Map集合的key直接使用定义好的即可: KafkaHeaders, 用的时候,需要啥就添加啥.
header定义
注意事项:

  • 使用的KafkaTemplate发送消息的时候,要注意泛型匹配的问题.这里步及到key、value的序列化与反序列化操作.
  • 如果重用了ProducerFactory则需要注意使用的泛型和发送消息的类型是否能匹配得上.

如上所示: 我们使用的的是private final KafkaTemplate<String, String> kafkaTemplate,我们通过配置文件,注入到容器的类型是:

  @Bean
public KafkaTemplate<String, String> stringKafkaTemplate(ProducerFactory<String, String> producerFactory){return new KafkaTemplate<>(producerFactory,// 【注意】: 这里重新设置了value的序列化,而对于key的序列化是在构建ProducerFactory的时候,传入的. Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
}

而注入到容器当中ProducerFactory对象,在构建的时候,分别设置了key、value的序列化规则.
序列化
以上的泛型必须得能匹配上,或者可以直接转换,否则会抛出异常.

1.4 ProducerListener

使用ProducerListener配置KafkaTemplate,以获取带有发送结果(成功或失败)的异步回调,而不是等待Future完成。下面的清单显示了ProducerListener接口的定义:

public interface ProducerListener<K, V> {// 发送成功void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);// 发送失败void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,Exception exception);
}

默认情况下,模板配置了 LoggingProducerListener,它会记录错误,并且在发送成功时不执行任何操作。

@Slf4j
public class CustomProducerListener implements ProducerListener<String, String> {@Overridepublic void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {String topic = producerRecord.topic();String key = producerRecord.key();String value = producerRecord.value();Long timestamp = producerRecord.timestamp();int partition = recordMetadata.partition();long offset = recordMetadata.offset();log.info("消息发送成功,topic:{},key:{},value:{},timestamp:{},partition:{},offset:{}", topic, key, value, timestamp, partition, offset);}@Overridepublic void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {String topic = producerRecord.topic();String key = producerRecord.key();String value = producerRecord.value();Long timestamp = producerRecord.timestamp();int partition = recordMetadata.partition();long offset = recordMetadata.offset();log.error("消息发送失败,topic:{},key:{},value:{},timestamp:{},partition:{},offset:{}, exception message: {}", topic, key, value, timestamp, partition, offset, exception.getMessage());}
}

将自定义ProducerListener的对象,配置到KafkaTemplate当中

/*** 从同一个工厂创建具有不同生产者的配置的模块* @param producerFactory* @return*/
@Bean
public KafkaTemplate<String, String> stringKafkaTemplate(ProducerFactory<String, String> producerFactory){KafkaTemplate<String, String> stringStringKafkaTemplate = new KafkaTemplate<>(producerFactory, Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));// 配置ProducerListenerstringStringKafkaTemplate.setProducerListener(new CustomProducerListener());// 配置默认主题stringStringKafkaTemplate.setDefaultTopic("rj-string-topic");return stringStringKafkaTemplate;
}

发送消息:

@GetMapping("/listener")
public String sendMsgProducerListener(String msg) {CompletableFuture<SendResult<String, String>> completableFuture = stringKafkaTemplate.sendDefault(msg);completableFuture.whenComplete((result, ex) -> {if (ex == null) {System.out.println("发送成功");} else {System.out.println("发送失败");}});return "发送成功!";
}

控制台日志输出如下所示:

2024-09-24T19:20:07.061+08:00  INFO 16436 --- [rj-spring-kafka-demo] [demo-producer-1] c.r.k.listener.CustomProducerListener    : 消息发送成功,topic:rj-string-topic,key:null,value:listener,timestamp:null,partition:0,offset:0

1.5 发送结果监听CompletableFuture

发送消息的send 方法返回 CompletableFuture<SendResult>。您可以向侦听器注册回调,以异步接收发送的结果.

CompletableFuture<SendResult<Integer, String>> future = template.send("topic-name", "msg data");
future.whenComplete((result, ex) -> {...
});

如果你希望阻塞发送线程等待结果,你可以调用 future 的 get() 方法;建议使用带有 timeout 的方法。如果你已经设置了 linger.ms,你可能希望在等待之前调用 flush(),或者为了方便起见,模板有一个带有 autoFlush 参数的构造函数,该参数会导致模板在每次发送时 flush()

  • 在使用 Kafka 生产者发送消息时,通常会得到一个表示发送任务的Future对象。如果调用future.get()方法,发送线程会被阻塞,直到发送结果返回。这意味着发送线程会暂停执行,等待消息成功发送到 Kafka 集群并获取结果。然而,直接使用get()方法可能会导致线程长时间阻塞,在实际应用中可能不太理想,所以建议使用带有超时参数的get(long timeout, TimeUnit unit)方法,这样可以在一定时间后如果还未获取到结果就不再等待,避免无限期阻塞。
  • linger.ms和flush
    • linger.ms属性
      • 当设置了linger.ms生产者属性时,生产者会在发送消息时等待一段时间,让更多的消息积累在一个批次中,以提高发送效率。如果在这段时间内积累了足够多的消息,生产者会将这些消息作为一个批次发送出去。
    • flush()方法
      • 如果希望立即发送部分批处理的消息而不是等待linger.ms指定的时间,可以调用flush()方法。这个方法会强制生产者立即发送当前缓冲区内的消息,而不管是否满足批次大小或等待时间的条件。
    • 带有autoFlush参数的构造函数
      • 为了方便起见,KafkaTemplate有一个带有autoFlush参数的构造函数。当设置autoFlushtrue时,每次发送消息后,模板会自动调用flush()方法,确保消息立即发送出去。这在需要立即确认消息发送的场景中非常有用,但可能会降低发送效率,因为每次发送都不会等待批次积累。

在使用 Kafka 生产者时,需要根据实际需求合理选择是否阻塞发送线程等待结果,以及是否使用flush()方法或带有autoFlush参数的构造函数来控制消息的发送时机。如果设置了linger.ms属性,并且需要在特定情况下立即发送部分批处理的消息,可以考虑调用flush()方法或使用带有autoFlush参数的构造函数。

linger.ms属性可以在配置文件当中进行配置.

@Bean
public Map<String, Object> producerConfig(){// ...// 配置linger.msmap.put(ProducerConfig.LINGER_MS_CONFIG, "500");return map;
}

SendResult 有两个属性:ProducerRecordRecordMetadata

public class SendResult<K, V> {// ProducerRecord是生产者发送消息时使用的数据结构private final ProducerRecord<K, V> producerRecord;// 当生产者成功发送消息后,会返回一个RecordMetadata对象private final RecordMetadata recordMetadata;public SendResult(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {this.producerRecord = producerRecord;this.recordMetadata = recordMetadata;}public ProducerRecord<K, V> getProducerRecord() {return this.producerRecord;}public RecordMetadata getRecordMetadata() {return this.recordMetadata;}@Overridepublic String toString() {return "SendResult [producerRecord=" + this.producerRecord + ", recordMetadata=" + this.recordMetadata + "]";}}

1.5.1 ProducerRecord

  • 主题(Topic)

    • 确定消息的归属主题。Kafka 中的不同主题用于区分不同类型的消息流。例如,一个电商系统可能有 “订单主题”、“用户行为主题” 等。

    • 生产者通过指定主题将消息发送到 Kafka 集群中的相应主题

  • 分区(Partition)

    • 分区的目的是为了实现可扩展性和并行处理。Kafka 将一个主题的数据分布在多个分区上,不同的分区可以由不同的消费者或消费者组同时消费。

    • 可以手动指定消息发送到特定分区,通常根据消息的键或者特定的业务规则来决定分区。如果不指定,Kafka 会根据默认的分区策略来分配分区。

  • 键(Key)

    • 键在消息处理中有多种用途。一方面,它可以用于确定消息的分区。例如,基于键的哈希值来决定消息发送到哪个分区,这样可以确保具有相同键的消息被发送到同一个分区,方便后续的有序处理。

    • 键也可以在消费者端用于消息的分组和聚合。例如,在处理订单数据时,可以根据订单 ID 作为键,将同一订单的不同状态更新消息发送到同一个分区,方便消费者对同一订单的消息进行有序处理。

  • 值(Value)

    • 这是消息的实际内容,可以是任何可序列化的对象。例如,在一个日志系统中,值可以是一条日志记录;在电商系统中,值可以是一个订单对象或者用户行为事件。
  • 消息头部(Headers)

    • 消息头部提供了一种在消息中添加额外元数据的方式。这些元数据可以用于传递特定的业务信息或者用于消息的路由和处理。

    • 例如,可以在头部添加消息的来源系统、消息的类型、处理优先级等信息

1.5.2 RecordMetadata

  • 主题(Topic)

    • 确认消息最终被发送到的主题,与ProducerRecord中的主题相对应。这可以用于验证消息是否被发送到了正确的主题。
  • 分区(Partition)

    • 指示消息存储在哪个分区。在消费者端,可以根据分区信息来确定从哪个分区读取消息。

    • 对于需要对特定分区进行监控或管理的场景,分区信息非常重要。

  • 偏移量(Offset)

    • 偏移量是消息在分区中的唯一标识,它代表了消息在分区中的位置顺序。每个分区中的消息都有一个连续的偏移量。

    • 消费者通过偏移量来确定已经消费到了哪个位置,以便在下次消费时从正确的位置继续读取消息。

    • 偏移量也可以用于数据恢复和重新处理消息的场景。例如,如果消费者出现故障,在恢复时可以根据存储的偏移量重新开始消费。

  • 时间戳(Timestamp)

    • 时间戳可以由生产者在发送消息时指定,也可以由 Kafka 自动生成。时间戳可以用于基于时间的消息处理和查询。

    • 例如,可以根据时间戳来查询特定时间段内的消息,或者对消息进行时间序列分析

二、接收消息细节

本小节,我们着重介绍一下@KafkaListener这个注解的使用.

2.1 字段解释

@KafkaListener是 Spring Kafka 提供的用于监听 Kafka 主题消息的注解。

字段名称类型说明
idString为监听器指定一个唯一的标识符。如果不指定,会自动生成一个 ID。这个 ID 可以用于在代码中通过KafkaListenerEndpointRegistry来获取特定的监听器容器。此外,如果设置了这个值,并且idIsGroup()false或者同时设置了groupId(),那么这个 ID 将覆盖消费者工厂配置中的groupId属性。支持 SpEL(Spring Expression Language)表达式#{...}和属性占位符${...}
containerFactoryString指定用于创建消息监听器容器的KafkaListenerContainerFactory的 bean 名称。如果不指定,则使用默认的容器工厂(如果存在)。SpEL 表达式可以评估为容器工厂实例或 bean 名称。
topicsString[]指定监听器要监听的一个或多个 Kafka 主题名称。可以是主题名称、属性占位符键或 SpEL 表达式,表达式必须解析为主题名称。使用组管理,Kafka 会将分区分配给组内成员。与topicPattern()topicPartitions()互斥。
topicPatternString使用正则表达式指定要监听的 Kafka 主题模式。可以是主题模式、属性占位符键或 SpEL 表达式,表达式必须解析为主题模式(支持字符串或java.util.regex.Pattern结果类型)。使用组管理,Kafka 会将分区分配给组内成员。与topics()topicPartitions()互斥。
topicPartitionsTopicPartition[]当使用手动主题/分区分配时,指定监听器要监听的主题和分区。与topicPattern()topics()互斥。
containerGroupString如果提供了这个值,监听器容器将被添加到一个以这个值为名称、类型为Collection<MessageListenerContainer>的 bean 中。这允许例如遍历这个集合来启动/停止一部分容器。从版本 2.7.3 开始,这种集合 beans 已被弃用,在 2.8 版本中将被移除。取而代之的是,应该使用名称为containerGroup + ".group"且类型为org.springframework.kafka.listener.ContainerGroup的 bean。支持 SpEL 表达式和属性占位符。
errorHandlerString设置一个KafkaListenerErrorHandler bean 的名称,在监听器方法抛出异常时调用。如果是 SpEL 表达式,可以评估为KafkaListenerErrorHandler实例或 bean 名称。
groupIdString覆盖消费者工厂的group.id属性,仅针对这个监听器。支持 SpEL 表达式和属性占位符。
idIsGroupboolean如果groupId()未提供,当这个值为true时,使用id()(如果提供了)作为消费者的group.id属性;当这个值为false时,使用消费者工厂中的group.id属性。
clientIdPrefixString如果提供了这个值,将覆盖消费者工厂配置中的客户端 ID 属性。对于每个容器实例,会添加一个后缀(‘-n’)以确保在使用并发时的唯一性。支持 SpEL 表达式和属性占位符。
beanRefString一个伪 bean 名称,在这个注解中的 SpEL 表达式中用于引用定义这个监听器的当前 bean。这允许访问封闭 bean 中的属性和方法。默认值为’__listener’。
concurrencyString覆盖容器工厂的concurrency设置,针对这个监听器。可以是属性占位符或 SpEL 表达式,评估为一个Number,然后使用Number#intValue()获取值。
autoStartupString设置为truefalse,以覆盖容器工厂中的默认自动启动设置。可以是属性占位符或 SpEL 表达式,评估为BooleanString,然后使用Boolean#parseBoolean(String)获取值。
propertiesString[]Kafka 消费者属性,它们将覆盖消费者工厂中具有相同名称的任何属性(如果消费者工厂支持属性覆盖)。支持的语法与 JavaProperties文件中的键值对语法相同。group.idclient.id将被忽略。支持 SpEL 表达式和属性占位符。SpEL 表达式必须解析为StringString[]Collection<String>,其中数组或集合的每个成员是一个属性名+值,格式与上述语法相同。
splitIterablesboolean当为false且返回类型是Iterable时,将结果作为单个回复记录的值返回,而不是为每个元素创建单独的记录。默认值为true。如果回复类型是Iterable<Message<?>>,则忽略此设置。
contentTypeConverterString设置一个SmartMessageConverter(如CompositeMessageConverter)的 bean 名称,结合org.springframework.messaging.MessageHeaders#CONTENT_TYPE头来执行转换到所需类型。如果是 SpEL 表达式,可以评估为SmartMessageConverter实例或 bean 名称。
batchString覆盖容器工厂的batchListener属性。监听器方法签名应该接收一个List<?>。如果不设置,将使用容器工厂的设置。不支持 SpEL 和属性占位符,因为监听器类型不能是可变的。
filterString设置一个RecordFilterStrategy bean 名称,以覆盖在容器工厂上配置的策略。如果是 SpEL 表达式,可以评估为RecordFilterStrategy实例或 bean 名称。
infoString静态信息将作为一个头添加,键为org.springframework.kafka.support.KafkaHeaders#LISTENER_INFO。例如,可以在RecordInterceptorRecordFilterStrategy或监听器本身中用于任何目的。支持 SpEL 表达式和属性占位符,但必须解析为Stringbyte[]。如果使用输入记录的头创建出站记录,这个头将被剥离。
containerPostProcessorString设置一个ContainerPostProcessor的 bean 名称,允许在创建和配置监听器容器后对其进行自定义。这个后处理器仅应用于当前监听器容器,与ContainerCustomizer不同,后者应用于所有监听器容器。这个后处理器在容器自定义器(如果存在)之后应用。

2.2 获取Header的值

在监听消息的时候,可以通过Header(headers)的方式读取发送消息设置的headers的值.

 @KafkaListener(groupId = "rj", topics = Constants.Kafka.TOPIC_NAME)public void listen2(String msg, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){log.info("receive msg: {}, topic: {}", msg, topic);}

日志输出:

2024-09-24T20:20:57.501+08:00  INFO 1692 --- [rj-spring-kafka-demo] [ntainer#0-0-C-1] com.rj.kafka.listener.UserListener       : receive msg: hello, message!, topic: rj-topic

2.3 获取ConsumerRecord

ConsumerRecord是消费者从 Kafka 主题中读取消息时所使用的数据结构, 具体包含如下细节:

主题和分区信息

  • topic():
    • 返回消息所属的 Kafka 主题名称。
    • 主题是消息的逻辑分类,用于区分不同类型的消息流。例如,一个电商系统可能有 “订单主题”、“用户行为主题” 等。
    • 通过这个方法可以确定消息来自哪个主题,方便消费者根据不同的主题进行不同的处理逻辑。
  • partition()
    • 返回消息所在的分区编号。
    • Kafka 将一个主题的数据分布在多个分区上,不同的分区可以由不同的消费者或消费者组同时消费,以实现可扩展性和并行处理。
    • 了解消息所在的分区可以用于一些特定的场景,例如在需要对特定分区进行监控或管理时,或者在一些需要根据分区进行数据处理的情况下。

偏移量信息

  • offset()
    • 返回消息在所属分区中的偏移量。
    • 偏移量是消息在分区中的唯一标识,代表消息在分区中的位置顺序。随着新消息的不断写入,偏移量会不断递增。
    • 消费者通过偏移量来确定已经消费到了哪个位置,以便在下次消费时从正确的位置继续读取消息。偏移量也可以用于数据恢复和重新处理消息的场景。例如,如果消费者出现故障,在恢复时可以根据存储的偏移量重新开始消费。

键和值信息

  • key():
    • 返回消息的键。
    • 消息的键可以用于分区策略以及在一些场景下可以帮助消费者进行消息的有序处理。例如,基于键的哈希值来决定消息发送到哪个分区,这样可以确保具有相同键的消息被发送到同一个分区,方便后续的有序处理。
    • 键的类型通常是可序列化的对象,可以根据具体的业务需求来设置和使用。
  • value():
    • 返回消息的实际内容。
    • 这是消费者真正关心的消息数据,可以是任何可序列化的对象。例如,在一个日志系统中,值可以是一条日志记录;在电商系统中,值可以是一个订单对象或者用户行为事件。

时间戳信息

  • timestamp()
    • 返回消息的时间戳。
    • 时间戳可以由生产者在发送消息时指定,也可以由 Kafka 自动生成。时间戳可以用于基于时间的消息处理和查询。
    • 例如,可以根据时间戳来查询特定时间段内的消息,或者对消息进行时间序列分析。

headers

  • headers()
    • 返回一个Headers对象,其中包含了消息的头部信息。
    • 消息头部提供了一种在消息中添加额外元数据的方式。这些元数据可以用于传递特定的业务信息或者用于消息的路由和处理。
    • 例如,可以在头部添加消息的来源系统、消息的类型、处理优先级等信息。消费者可以通过读取头部信息来进行更加灵活的消息处理。

同样的操作,也可以在获取方法的参数内传入ConsumerRecord,从中获取需要的信息.操作如下所示:

   @KafkaListener(groupId = "rj", topics = Constants.Kafka.TOPIC_NAME)public void listen2(ConsumerRecord<String, String> record, String msg, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){log.info("receive record: {}", record);log.info("receive msg: {}, topic: {}", msg, topic);}

日志输出:

2024-09-24T20:34:47.759+08:00  INFO 11296 --- [rj-spring-kafka-demo] [ntainer#0-0-C-1] com.rj.kafka.listener.UserListener       : receive record: ConsumerRecord(topic = rj-topic, partition = 0, leaderEpoch = 0, offset = 10, CreateTime = 1727181287120, serialized key size = 2, serialized value size = 15, headers = RecordHeaders(headers = [], isReadOnly = false), key = rj, value = hello, message!)
2024-09-24T20:34:47.759+08:00  INFO 11296 --- [rj-spring-kafka-demo] [ntainer#0-0-C-1] com.rj.kafka.listener.UserListener       : receive msg: hello, message!, topic: rj-topic

2.4 Acknowledgment

在 Kafka 中,Acknowledgment(确认机制)主要用于确保消息被正确处理和持久化,以保证数据的可靠性。
在 Kafka 生产者发送消息和消费者处理消息的过程中,确认机制起着关键作用。它确保消息在不同阶段的成功处理,从而提高系统的可靠性和数据的完整性。如果没有适当的确认机制,可能会出现消息丢失或重复处理的情况,这在许多关键业务场景中是不可接受的。

生产者端的确认机制

  • acks
    • acks是生产者的一个重要配置参数,用于控制生产者在发送消息后等待多少个副本的确认。
      • 设置acks=0, 表示 生产者在将消息发送到 Kafka 服务器后,不会等待任何确认。这种设置提供了最低的延迟,但也意味着如果在消息发送后但在 Kafka 服务器成功存储之前发生故障,消息可能会丢失。适用于对延迟要求极高且可以容忍消息丢失的场景,例如实时日志收集,其中丢失少量日志不会对系统产生重大影响。
      • 设置acks=1表示只要领导者副本成功接收到消息并写入磁盘,生产者就会认为消息发送成功。这种设置提供了较低的延迟,但如果领导者副本在确认后出现故障,可能会导致消息丢失。如果领导者副本在确认后但在其他副本同步之前出现故障,可能会导致消息丢失。适用于对延迟有一定要求,但也需要一定程度可靠性的场景,例如一些实时数据分析系统,其中少量数据丢失可以通过后续的数据处理进行补偿。
      • 设置acks=all(或acks=-1)表示生产者需要等待所有同步副本都成功接收到消息并写入磁盘后才认为消息发送成功。这提供了最高的可靠性,但会增加延迟,因为生产者需要等待多个副本的确认。确保了即使领导者副本出现故障,消息也不会丢失,因为其他同步副本中仍然存在该消息。适用于对数据可靠性要求极高的场景,例如金融交易系统或关键业务数据的处理。

消费者端的确认机制

  • 自动提交和手动提交:
    • Kafka 消费者可以选择自动提交偏移量或手动提交偏移量。
    • 自动提交:消费者会定期自动提交已经消费的消息的偏移量。这种方式比较方便,但可能会导致在处理消息过程中出现故障时,已经消费的消息被重复处理。例如,如果消费者在自动提交偏移量后但在完成消息处理之前出现故障,重新启动后会从上次提交的偏移量开始消费,导致已经处理过的消息被再次处理。
    • 手动提交:消费者可以在处理完一批消息后手动提交偏移量。这提供了更精细的控制,可以确保在消息被正确处理后才提交偏移量,避免重复处理。手动提交可以在代码中通过调用commitSync()(同步提交)或commitAsync()(异步提交)方法来实现。
  • 提交偏移量的时机:
    • 在手动提交时,需要谨慎选择提交偏移量的时机。一般来说,可以在确认消息已经被成功处理并持久化到外部系统(如果需要)后再提交偏移量。
    • 例如,在一个数据处理管道中,消费者从 Kafka 读取消息,进行数据转换和存储到数据库中。只有在数据库存储成功后,才提交偏移量,以确保消息不会被重复处理。

确认机制与性能的权衡

  • 延迟和吞吐量
    • 较高的确认级别(如acks=all)和手动提交偏移量通常会增加延迟,因为生产者和消费者需要等待更多的确认。这可能会影响系统的整体性能和吞吐量。
    • 在设计系统时,需要根据具体的业务需求和性能要求来权衡可靠性和性能。如果业务对数据的可靠性要求非常高,可以选择更严格的确认机制,但可能需要接受较低的吞吐量和较高的延迟。如果性能是关键因素,可以适当降低确认级别或使用自动提交偏移量,但需要注意可能出现的数据丢失和重复处理的风险。

在接口 Acknowledgment当中的acknowledge()十分重要, 这个方法用于消费者确认已经成功处理了一条消息。当消费者调用这个方法时,Kafka 会记录该消费者对特定消息的处理确认,并且可以根据配置决定是否更新消费者的偏移量。
使用场景: 在手动提交偏移量的情况下,消费者通常在确认消息已经被成功处理后调用这个方法。例如,在一个数据处理管道中,消费者从 Kafka 读取消息,进行数据转换和存储到数据库中。只有在数据库存储成功后,才调用acknowledge()方法来确认消息的处理。正确使用这个方法可以确保消息不会被重复处理,同时也可以保证在出现故障时能够正确地恢复处理进度。如果消费者在处理消息后没有正确地确认,可能会导致消息被重复处理,或者在消费者出现故障后无法正确地恢复处理进度。

  @KafkaListener(groupId = "rj", topics = Constants.Kafka.TOPIC_NAME)public void listen2(ConsumerRecord<String, String> record, String msg, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Acknowledgment acknowledgment){log.info("receive record: {}", record);log.info("receive msg: {}, topic: {}", msg, topic);// 处理各种各样的业务逻辑之后,再进行消息确认.acknowledgment.acknowledge();}

三、总结

本文对Kafka的消息发送、接收当中常用的功能做了一些较为详细的分析.这些在实际开发当中较为常用. 下一篇着重介绍一下kafka当中有关集群的知识.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/434572.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【韩顺平Java笔记】第2章:Java概述

按视频的标号来对应小标题&#xff0c;自用学习笔记 文章目录 5. 内容梳理6. 程序举例6.1 什么是程序 7. Java故事7.1 Java诞生小故事7.2 Java技术体系平台 8. Java特性8.1 Java重要特点 9. sublime10. jdk介绍10.1 Java运行机制及运行过程10.1.1 Java虚拟机&#xff08;JVM&a…

【Python】多个dataframe存入excel的不同的sheet表里,而不会被覆盖的方法

我发现&#xff0c;我原来用的多个工作簿存入的方法&#xff0c;发现不太可行&#xff0c;如果我用原来的方法&#xff0c;然后for循环&#xff0c;新的dataframe会把原来的覆盖掉&#xff0c;然后只剩下一个工作薄。原先的代码&#xff1a; with pd.ExcelWriter(file_name ) …

【题解】2022ICPC杭州-K

翻译 原题链接   简述一下就是每次询问重新定义一个字母排序表&#xff0c;问在这个顺序下n个字符串的序列的逆序数是多少。 字典树计算逆序数 先考虑初始状况下&#xff0c;即 a < b < . . . < z a<b<...<z a<b<...<z的情况下&#xff0c;逆序…

[SAP ABAP] 锁对象

在SAP中使用锁对象&#xff0c;用于避免在数据库中插入或更改数据时出现不一致的情况 1.创建锁对象 数据准备 学校表(ZDBT_SCH_437) 使用事务码SE11创建锁对象 点击"锁对象"单选按钮&#xff0c;输入以E开头的锁定对象的名称&#xff0c;然后点击创建按钮 锁对象名…

看480p、720p、1080p、2k、4k、视频一般需要多大带宽呢?

看视频都喜欢看高清&#xff0c;那么一般来说看电影不卡顿需要多大带宽呢&#xff1f; 以4K为例&#xff0c;这里引用一位网友的回答&#xff1a;“视频分辨率4092*2160&#xff0c;每个像素用红蓝绿三个256色(8bit)的数据表示&#xff0c;视频帧数为60fps&#xff0c;那么一秒…

基于VUE的在线茶叶购物网站的设计与实现后端SpringBoot数据库MySQL

目录 1. 项目结构规划 2. 技术选型与工具链 3. 关键功能模块设计 4. 数据库设计 5. 安全性考虑 6. 性能优化建议 在开发一个在线茶叶购物网站之前&#xff0c;了解相关的研究背景和技术发展趋势是非常重要的。以下是一些关键点&#xff0c;可以帮助理解该项目的开发背景和…

召回07 双塔模型——正负样本

正样本&#xff1a; 二八法则&#xff0c;少部分物品占据了大多数点击&#xff0c;会导致正样本大多是热门物品。以一定的概率抛弃一些热门物品&#xff0c;抛弃的概率与样本的点击次数正相关。 负样本&#xff1a; 简单负样本 上述简单负样本是从全体样本中抽样。其中&#…

Python编码系列—Python备忘录模式:掌握对象状态保存与恢复技术

&#x1f31f;&#x1f31f; 欢迎来到我的技术小筑&#xff0c;一个专为技术探索者打造的交流空间。在这里&#xff0c;我们不仅分享代码的智慧&#xff0c;还探讨技术的深度与广度。无论您是资深开发者还是技术新手&#xff0c;这里都有一片属于您的天空。让我们在知识的海洋中…

[Redis][Zset]详细讲解

目录 0.前言1.常见命令1.ZADD2.ZCARD3.ZCOUNT4.ZRANGE5.ZREVRANGE6.ZRANGEBYSCORE7.ZPOPMAX8.BZPOPMAX9.ZPOPMIN10.BZPOPMIN11.ZRANK12.ZREVRANK13.ZSCORE14.ZREM15.ZREMRANGEBYRANK16.ZREMRANGEBYSCORE17.ZINCRBY 2.集合间操作1.有序集合的交集操作2.ZINTERSTORE3.有序集合的并…

H5响应式的文化传媒娱乐公司HTML网站模板源码

源码名称&#xff1a;响应式的文化传媒娱乐公司HTML网站模板源码 源码介绍&#xff1a;一款自适应H5文化传媒娱乐公司官网源码&#xff0c;源码带有6个H5页面&#xff0c;可用于文化传媒和娱乐公司官网。 需求环境&#xff1a;H5 下载地址&#xff1a; https://www.51888w.c…

Netty系列-5 Netty启动流程

背景 Netty程序有固定的模板格式&#xff0c;以ServerBootstrap为例: public class NettyServer {public void start(int port) {ServerBootstrap serverBootstrap new ServerBootstrap();EventLoopGroup boosGroup new NioEventLoopGroup(1);EventLoopGroup workGroup ne…

Kubernetes配置管理(kubernetes)

实验环境&#xff1a; 在所有节点上拉取镜像&#xff1b;然后把资源清单拉取到第一个master节点上&#xff1b; 同步会话&#xff0c;导入镜像&#xff1a; configmap/secret 配置文件的映射 变量&#xff1a; 基于valuefrom的方式 cm--》pod 特点&#xff1a;变量的名称可…

[JavaEE] IP协议

目录 一、 IP协议 1.1 基本概念 1.2 协议头格式 1.3 特殊IP 二、 地址管理 2.1 网段划分 2.2 CIDR(Classless Interdomain Routing) 2.3 私有IP地址和公网IP地址 2.4 NAT(Network Address Translation)-网络地址转换 2.5 路由选择 三、数据链路层 3.1 认识以太网 3…

什么是AQS

目录 AQS 介绍 原理 以可重入的互斥锁 ReentrantLock 为例 以倒计时器 CountDownLatch 以例 AQS 资源共享方式 实现自定义同步器 示例 性能优化 AQS 介绍 AQS &#xff08;AbstractQueuedSynchronizer &#xff09;&#xff0c;抽象队列同步器。AQS 是一个功能强大且…

cmd命令大全详解

CMD是Windows操作系统中的命令行解释器&#xff0c;它允许用户通过键入命令来执行各种操作。以下是一些常用的CMD命令及其简要说明&#xff1a; dir - 显示目录中的文件和子目录。 cmddir cd - 更改当前目录。 cmdcd [目录路径] mkdir - 创建新目录。 cmdmkdir [目录名] rmd…

Vue.js 与 Flask/Django 后端配合开发实战

Vue.js 与 Flask/Django 后端配合开发实战 在现代的 Web 开发中&#xff0c;前后端分离已成为一种主流架构&#xff0c;其中前端使用 Vue.js 等流行的框架&#xff0c;后端采用 Flask 或 Django 提供 API 接口。在这种开发模式下&#xff0c;前端负责页面的交互和动态效果&…

将Mixamo的模型和动画导入UE5

首先进入Mixamo的官网 , 点击 Character 选择一个模型 (当然你也可以自己上传模型/绑定动画) 然后点击下载 , 这个作为带骨骼的模型 选择FBX格式 , T Pose 直接下载 点击 Animations 选择动画 , 搜索 idle 默认站立动画 点击下载 , 格式选择 FBX , 不带模型只要骨骼 , 帧数选6…

前端面试经验总结2(经典问题篇)

谈谈你对前端的理解 前端主要负责产品页面部分的实现&#xff0c;是最贴近于用户的程序员。 基本工作要求&#xff1a; 1.参与项目&#xff0c;通过与团队成员&#xff0c;UI设计&#xff0c;产品经理的沟通&#xff0c;快速高质量的实现效果图&#xff0c;并能够精确到1px 2.做…

大模型培训讲师叶梓:Llama Factory 微调模型实战分享提纲

LLaMA-Factory ——一个高效、易用的大模型训练与微调平台。它支持多种预训练模型&#xff0c;并且提供了丰富的训练算法&#xff0c;包括增量预训练、多模态指令监督微调、奖励模型训练等。 LLaMA-Factory的优势在于其简单易用的界面和强大的功能。用户可以在不编写任何代码的…

TypeScript介绍和安装

TypeScript介绍 TypeScript是由微软开发的一种编程语言&#xff0c;它在JavaScript的基础上增加了静态类型检查。静态类型允许开发者在编写代码时指定变量和函数的类型&#xff0c;这样可以在编译时捕获潜在的错误&#xff0c;而不是等到运行时才发现问题。比如&#xff0c;你…