互联网全景消息(10)之Kafka深度剖析(中)

一、深入应用

1.1 SpringBoot集成Kafka

        引入对应的依赖。

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency><!--swagger2增强,官方ui太low , 访问地址: /doc.html  --><dependency><groupId>com.github.xiaoymin</groupId><artifactId>swagger-bootstrap-ui</artifactId><version>1.8.8</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency></dependencies>

        添加配置文件:

spring:application:name: demokafka:bootstrap-servers: 这里换成自己的kafka信息producer: # producer 生产者retries: 0 # 重试次数acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)batch-size: 16384 # 批量大小buffer-memory: 33554432 # 生产端缓冲区大小key-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: com.itheima.demo.config.MySerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: # consumer消费者group-id: javagroup # 默认的消费组IDenable-auto-commit: true # 是否自动提交offsetauto-commit-interval: 100  # 提交offset延时(接收到消息后多久提交offset)# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#      value-deserializer: com.itheima.demo.config.MyDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

        启动服务:

@SpringBootApplication
@RestController
public class Demo {public static void main(String[] args) {new SpringApplicationBuilder(Demo.class).run(args);}}

        启动信息如下:

1.2 消息发送 

 1.2.1 异步发送

        KafkaTemplate调用的send默认采用的是异步发送,如果需要同步发送获取发送结果,则需要调用get方法。 

@RestController
public class KafkaProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/test/{msg}")public void sendMessage(@PathVariable("msg") String msg) {Message message = new Message();message.setMessage(msg);kafkaTemplate.send("test", JSON.toJSONString(message));}
}@KafkaListener(topics = {"test"})public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("message:{}", msg);}}

        效果如下:

        同步发送代码如下:

    @GetMapping("/kafka/sync/{msg}")public void sync(@PathVariable("msg") String msg) throws Exception {Message message = new Message();message.setMessage(msg);ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);logger.info("send result:{}",result.getProducerRecord().value());}

1.2.2 序列化 

        序列化详解:

  • 前面我们用到的是Kafka自带的字符串序列化器,
    org.apache.kafka.common.serialization.StringDeserializer
  • 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long等;
  • 这些序列化接器都实现了org.apache.kafka.common.serialization.Serializer 

        自定义序列化,自己实现序列化对应的接口即可,如下:

public class MySerializer implements Serializer {@Overridepublic byte[] serialize(String s, Object o) {String json = JSON.toJSONString(o);return json.getBytes();}}

        然后在yaml配置自己的编辑器:

value-serializer: com.itheima.demo.config.MySerializer

         对应的我们在消费者消费消息的时候也需要按照我们自定义的方式进行解码,具体代码如下:

package com.itheima.demo.config;import com.alibaba.fastjson.JSON;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.UnsupportedEncodingException;
import java.util.Iterator;
import java.util.Map;public class MyDeserializer implements Deserializer {private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class);@Overridepublic Object deserialize(String s, byte[] bytes) {try {String json = new String(bytes,"utf-8");return JSON.parse(json);} catch (UnsupportedEncodingException e) {e.printStackTrace();}return null;}}

 1.2.3 分区策略

         分区策略决定了消息根据key投放到那个分区,也是顺序消费保障的基石。

  • 给定了分区号,直接将数据发送到指定分区里面去;
  • 没有给定了分区号,给定数据的key值,通过key取上hashcode进行分区;
  • 既没有给定分区号,也没有给定key值,直接轮询进行分区;
  • 自定义分区策略,按照自定义需求选择分区号;

        生产者代码如下:

//测试分区发送
@RestController
public class PartitionProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;//    指定分区发送
//    不管你key是什么,到同一个分区@GetMapping("/kafka/partitionSend/{key}")public void setPartition(@PathVariable("key") String key) {kafkaTemplate.send("test", 0,key,"key="+key+",msg=指定0号分区");}//    指定key发送,不指定分区
//    根据key做hash,相同的key到同一个分区@GetMapping("/kafka/keysend/{key}")public void setKey(@PathVariable("key") String key) {kafkaTemplate.send("test", key,"key="+key+",msg=不指定分区");}// 什么也不指定@GetMapping("/kafka/test/{msg}")public void sendMessage(@PathVariable("msg") String msg) {Message message = new Message();message.setMessage(msg);kafkaTemplate.send("test", JSON.toJSONString(message));}
}

        消费者代码如下:

//指定消费组消费
@Component
public class PartitionConsumer {private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class);//分区消费@KafkaListener(topics = {"test"},topicPattern = "0")public void onMessage(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("partition=0,message:[{}]", msg);}}@KafkaListener(topics = {"test"},topicPattern = "1")public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("partition=1,message:[{}]", msg);}}
}

        1)测试默认分区策略,发送什么也不指定的消息

        可以发现发送的是同一条的数据,他是跟partition轮询发送的;

         2)测试指定分区

         3)按照key的hashcode来分区

1.3 消息消费 

 1.3.1 消费者分组

public class GroupConsumer {private final Logger logger = LoggerFactory.getLogger(GroupConsumer.class);//组1,消费者1@KafkaListener(topics = {"test"},groupId = "group1")public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group1-1 , message:{}", msg);}}//组1,消费者2@KafkaListener(topics = {"test"},groupId = "group1")public void onMessage2(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group1-2 , message:{}", msg);}}//组2,只有一个消费者@KafkaListener(topics = {"test"},groupId = "group2")public void onMessage3(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group2 , message:{}", msg);}}
}

         启动:

        需要注意的是,注意分区数与消费者数的搭配,如果(消费者数 > 分区数量),消费者将会出现闲置,浪费资源!

1.3.2 位移提交

        1)自动提交,我们在前面设置了以下两个选项,则kafka会延时设置自动提交:

        2)手动提交,有些时候我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复,如下我们配置手动提交:

@Configuration
public class MyOffsetConfig {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 注意这里!!!设置手动提交configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));// ack模式://          AckMode针对ENABLE_AUTO_COMMIT_CONFIG=false时生效,有以下几种:////          RECORD//          每处理一条commit一次////          BATCH(默认)//          每次poll的时候批量提交一次,频率取决于每次poll的调用频率////          TIME//          每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)////          COUNT//          累积达到ackCount次的ack去commit////          COUNT_TIME//          ackTime或ackCount哪个条件先满足,就commit////          MANUAL//          listener负责ack,但是背后也是批量上去////          MANUAL_IMMEDIATE//          listner负责ack,每调用一次,就立即commitfactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/383.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

21、Transformer Masked loss原理精讲及其PyTorch逐行实现

1. Transformer结构图 2. python import torch import torch.nn as nn import torch.nn.functional as Ftorch.set_printoptions(precision3, sci_modeFalse)if __name__ "__main__":run_code 0batch_size 2seq_length 3vocab_size 4logits torch.randn(batch…

Agentless:OpenAI 采用的非代理框架

不需要代理库来解决复杂的业务问题。Agentless 是OpenAI采用的非代理框架&#xff0c;用于在 o3 的 SWE Bench 上实现最高精度。SWE-bench 是 github的真实软件工程问题基准。Agentless 遵循简单的三阶段流程&#xff1a;本地化、修复和补丁验证&#xff1a; 1 ⃣生成存储库的…

【前端动效】原生js实现拖拽排课效果

目录 1. 效果展示 2. 效果分析 2.1 关键点 2.2 实现方法 3. 代码实现 3.1 html部分 3.2 css部分 3.3 js部分 3.4 完整代码 4. 总结 1. 效果展示 如图所示&#xff0c;页面左侧有一个包含不同课程&#xff08;如语文、数学等&#xff09;的列表&#xff0c;页面右侧…

给DevOps加点料:融入安全性的DevSecOps

从前&#xff0c;安全防护只是特定团队的责任&#xff0c;在开发的最后阶段才会介入。当开发周期长达数月、甚至数年时&#xff0c;这样做没什么问题&#xff1b;但是现在&#xff0c;这种做法现在已经行不通了。 采用 DevOps 可以有效推进快速频繁的开发周期&#xff08;有时…

CTFshow—文件包含

Web78-81 Web78 这题是最基础的文件包含&#xff0c;直接?fileflag.php是不行的&#xff0c;不知道为啥&#xff0c;直接用下面我们之前在命令执行讲过的payload即可。 ?filephp://filter/readconvert.base64-encode/resourceflag.php Web79 这题是过滤了php&#xff0c;…

年度技术突破奖|中兴微电子引领汽车芯片新变革

随着以中央计算区域控制为代表的新一代整车电子架构逐步成为行业主流&#xff0c;车企在电动化与智能化之后&#xff0c;正迎来以架构创新为核心的新一轮技术竞争。中央计算SoC&#xff0c;作为支撑智驾和智舱高算力需求的核心组件&#xff0c;已成为汽车电子市场的重要新增量。…

交响曲-24-3-单细胞CNV分析及聚类

CNV概述 小于1kb是常见的插入、移位、缺失等的变异 人体内包含<10% 的正常CNV&#xff0c;我们的染色体数是两倍体&#xff0c;正常情况下&#xff0c;只有一条染色体表达&#xff0c;另一条沉默&#xff0c;当表达的那条染色体发生CNV之后&#xff0c;表达数量就会成倍增加…

Cline(原Claude Dev)开源的IDE AI插件,如何搭配OpenRouter实现cursor功能,Cline怎么使用

Cline&#xff08;原Claude Dev&#xff09;是一个开源的IDE AI插件&#xff0c;可以使用你的命令行界面和编辑器的人工智能助手。 你可以直接在VS Code编辑器进行安装。如果你使用过Cursor AI IDE的话&#xff0c;可以尝试最新发布的Cline3.1版本。 在OpenRouter上&#xff0…

单片机-定时器中断

1、相关知识 振荡周期1/12us; //振荡周期又称 S周期或时钟周期&#xff08;晶振周期或外加振荡周期&#xff09;。 状态周期1/6us; 机器周期1us; 指令周期1~4us; ①51单片机有两组定时器/计数器&#xff0c;因为既可以定时&#xff0c;又可以计数&#xff0c;故称之为定时器…

1-1 电场基本概念

目录&#xff1a; 目录 目录&#xff1a; 1.0 电荷守恒定律 2.0 互斥与相吸 3.0 电场的概念 4.0 库伦定律 5.0 矢量的概念 1.0 电荷守恒定律 电荷守恒定律是物理学中的一个基本原理&#xff0c;它指出在一个封闭系统内&#xff0c;电荷的总量是保持不变的。这意味着电荷既…

3BB学习transformer日记,attention原理

将单词数字化——方向代表语义 每一个单词可以对应到高维空间的一个向量&#xff0c;这个向量叫嵌入向量。以三维为例&#xff0c;女人-男人叔叔-婶婶&#xff08;四个都是向量&#xff09;&#xff0c;可以这么理解&#xff0c;”男人“这个词的意思转化成”女人“这个词的意…

【硬件介绍】Type-C接口详解

一、Type-C接口概述 Type-C接口特点&#xff1a;以其独特的扁头设计和无需区分正反两面的便捷性而广受欢迎。这种设计大大提高了用户的使用体验&#xff0c;避免了传统USB接口需要多次尝试才能正确插入的问题。Type-C接口内部结构&#xff1a;内部上下两排引脚的设计虽然可能不…

qt 快捷功能 快速生成 setter getter 构造函数 父类虚函数重写 成员函数实现 代码框架 查看父类及父类中的虚函数

qt 快速生成 setter getter 构造函数 父类虚函数重写 成员函数实现 代码框架 1、找到要实现的头文件 2、鼠标移动到在头文件中的类定义的类名上&#xff0c;右键进行选择。 这是插入父类虚函数(父类虚函数重写) 选项弹出来的结果。可以查看到所有父类及父类中的所有的虚函数

用vscode写latex-1

一般大伙使用 LaTeX 大体有两种方案&#xff0c; 一种是在本地配置环境或使用本地的软件&#xff0c;如 vscode LaTeX&#xff0c;texlive&#xff0c;lyx 等等&#xff1b; 另一种是线上 LaTeX 平台&#xff0c;其中用的最多的是 Overleaf&#xff0c;还有一部分高校也有自…

基于YOLOv8的高空无人机小目标检测系统(python+pyside6界面+系统源码+可训练的数据集+也完成的训练模型

目标检测系统【环境搭建过程】&#xff08;GPU版本&#xff09;-CSDN博客 摘要 本文提出了一种基于YOLOv8算法的高空无人机小目标检测系统&#xff0c;利用VisDrone数据集中的7765张图片&#xff08;6903张训练集&#xff0c;862张验证集&#xff09;进行模型训练&#xff0c;…

springCloud特色知识记录(基于黑马教程2024年)

目录 Nacos 简介 Nacos 的特点 Nacos 的使用步骤可以查看黑马教程文档&#xff1a;‍‌​‌​&#xfeff;⁠​⁠​​​​​&#xfeff;‬​​​​‍‌&#xfeff;‬⁠​&#xfeff;​‬​​​​‍​&#xfeff;⁠​&#xfeff;​​⁠​​‬​⁠&#xfeff;​​day03-微…

网络考试系统的设计与实现【源码+文档+部署讲解】

目 录 摘 要 Abstract 第1章 绪论 1.1 研究的目的及意义 1.2 研究开发现状分析 1.3 研究的内容 第2章 系统相关技术 2.1 JAVA简介 2.2 J2EE 2.3 MySQL 2.4 MyEclipse 2.5 JavaScript 2.6 JQuery 2.7 CSS3 2.8 JSP 2.9 Tomcat服务器 第3章 可行性…

uniapp实现H5页面内容居中与两边留白,打造类似微信公众号阅读体验

在 UniApp 中&#xff0c;由于需要兼容多端应用&#xff0c;我们通常使用 rpx 作为尺寸单位。然而&#xff0c;在某些情况下&#xff0c;如需要实现内容居中且两边留白时&#xff0c;直接使用 rpx 可能会带来一些限制。这时&#xff0c;我们可以考虑使用 px 或 rem 等单位&…

CES Asia 2025:VR/AR/XR引领科技新潮流

在全球科技领域蓬勃发展的大背景下&#xff0c;CES Asia 2025&#xff08;赛逸展&#xff09;即将在京盛大开幕&#xff0c;VR/AR/XR技术作为前沿科技的代表&#xff0c;将在本次展会上大放异彩&#xff0c;展现出令人瞩目的发展趋势和巨大潜力&#xff0c;同时政策优势也将为其…

el-table 合并单元格

参考文章&#xff1a;vue3.0 el-table 动态合并单元格 - flyComeOn - 博客园 <el-table :data"tableData" border empty-text"暂无数据" :header-cell-style"{ background: #f5f7fa }" class"parent-table" :span-method"obj…