SpringBoot-Learning系列之Kafka整合

SpringBoot-Learning系列之Kafka整合

本系列是一个独立的SpringBoot学习系列,本着 What Why How 的思想去整合Java开发领域各种组件。

file

  • 消息系统

    • 主要应用场景
      • 流量消峰(秒杀 抢购)、应用解耦(核心业务与非核心业务之间的解耦)
      • 异步处理、顺序处理
      • 实时数据传输管道
      • 异构语言架构系统之间的通信
        • 如 C语言的CS客户端的HIS系统与java语言开发的互联网在线诊疗系统的交互
  • Kafka是什么

    kafka是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。是java领域常用的消息队列。

    核心概念:

    • 生产者(Producer) 生产者应用向主题队列中投送消息数据
    • 消费者 (Consumer) 消费者应用从订阅的Kafka的主题队列中获取数据、处理数据等后续操作
    • 主题 (Topic) 可以理解为生产者与消费者交互的桥梁
    • 分区 (Partition) 默认一个主题有一个分区,用户可以设置多个分区。每个分区可以有多个副本(Replica)。分区的作用是,将数据划分为多个小块,提高并发性和可扩展性。每个分区都有一个唯一的标识符,称为分区号。消息按照键(key)来进行分区,相同键的消息会被分配到同一个分区中。分区可以有不同的消费者同时消费。副本的作用是提供数据的冗余和故障恢复。每个分区可以有多个副本,其中一个被称为领导者(leader),其他副本被称为追随者(follower)。领导者负责处理读写请求,而追随者只负责复制领导者的数据。如果领导者宕机或不可用,某个追随者会被选举为新的领导者,保证数据的可用性。
  • windows 安装kafka

    本地环境DockerDeskTop+WSL2,基于Docker方式安装Kafka

    2.8.0后不需要依赖zk了

    • 拉取镜像

      docker pull wurstmeister/zookeeperdocker pull wurstmeister/kafka
      
    • 创建网络

      docker network create kafka-net --driver bridge
      
    • 安装zk

      docker run --net=kafka-net --name zookeeper -p 21810:2181 -d wurstmeister/zookeeper
      
    • 安装kafka

      docker run -d --name kafka --publish 9092:9092 \
      --link zookeeper \
      --env KAFKA_ZOOKEEPER_CONNECT=172.31.192.1:2181 \
      --env KAFKA_ADVERTISED_HOST_NAME=172.31.192.1 \
      --env KAFKA_ADVERTISED_PORT=9092  \
      --volume /etc/localtime:/etc/localtime \
      wurstmeister/kafka:latest
      
    • 测试

      telnet localhost:9092
      
  • SpringBoot集成

    SpringBoot3.1.0+jdk17

    • pom依赖

      								```<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>io.github.vino42</groupId><artifactId>springboot-kafka</artifactId><version>1.0-SNAPSHOT</version><properties><java.version>17</java.version><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency><!--kafka--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><!--排除掉 自行添加最新的官方clients依赖--><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.5.1</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.10.1</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.21</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>3.1.0</version></plugin></plugins></build></project>```
      
    • 配置

      spring:kafka:bootstrap-servers: 172.31.192.1:9092producer:retries: 0# 每次批量发送消息的数量batch-size: 16384buffer-memory: 33554432# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerlistener:missing-topics-fatal: false
      #      MANUAL	poll()拉取一批消息,处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交#      MANUAL_IMMEDIATE	每处理完业务手动调用Acknowledgment.acknowledge()后立即提交#      RECORD	当每一条记录被消费者监听器(ListenerConsumer)处理之后提交#      BATCH	当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交#      TIME	当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交#      COUNT	当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交#      COUNT_TIME	TIME或COUNT满足其中一个时提交ack-mode: manual_immediateconsumer:group-id: test# 是否自动提交enable-auto-commit: falsemax-poll-records: 100#      用于指定消费者在启动时、重置消费偏移量时的行为。#      earliest:消费者会将消费偏移量重置为最早的可用偏移量,也就是从最早的消息开始消费。#      latest:消费者会将消费偏移量重置为最新的可用偏移量,也就是只消费最新发送的消息。#      none:如果找不到已保存的消费偏移量,消费者会抛出一个异常auto-offset-reset: earliestauto-commit-interval: 100# 指定消息key和消息体的编解码方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:max.poll.interval.ms: 3600000
      server:port: 8888spring:kafka:bootstrap-servers: 172.31.192.1:9092producer:retries: 0# 每次批量发送消息的数量batch-size: 16384buffer-memory: 33554432# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerlistener:missing-topics-fatal: falseack-mode: manual_immediateconsumer:group-id: testenable-auto-commit: falsemax-poll-records: 100auto-offset-reset: earliestauto-commit-interval: 100# 指定消息key和消息体的编解码方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:max.poll.interval.ms: 3600000
      
    • 生产者代码示例

      package io.github.vino42.publiser;import com.google.gson.Gson;
      import com.google.gson.GsonBuilder;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.stereotype.Component;/*** =====================================================================================** @Created :   2023/8/30 21:29* @Compiler :  jdk 17* @Author :    VINO* @Copyright : VINO* @Decription : kafak 消息生产者* =====================================================================================*/
      @Component
      public class KafkaPublishService {@AutowiredKafkaTemplate kafkaTemplate;/*** 这里为了简单 直接发送json字符串** @param json*/public void send(String topic, String json) {kafkaTemplate.send(topic, json);}
      }
      
      @RequestMapping("/send")public String send() {IntStream.range(0, 10000).forEach(d -> {kafkaPublishService.send("test", RandomUtil.randomString(16));});return "ok";}
    • 消费者

      @Component
      @Slf4j
      public class CustomKafkaListener {@org.springframework.kafka.annotation.KafkaListener(topics = "test")public void listenUser(ConsumerRecord<?, String> record, Acknowledgment acknowledgment) {try {String key = String.valueOf(record.key());String body = record.value();log.info("\n=====\ntopic:test,key{},message:{}\n=====\n", key, body);log.info("\n=====\ntopic:test,key{},payLoadJson:{}\n=====\n", key, body);} catch (Exception e) {e.printStackTrace();} finally {//手动ackacknowledgment.acknowledge();}}
      }
      

SpringBoot Learning系列 是笔者总结整理的一个SpringBoot学习集合。可以说算是一个SpringBoot学习的大集合。欢迎Star关注。谢谢观看。

file
关注公众号不迷路

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/129307.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在Creo 6.0中画图模板问题

在Creo 6.0中&#xff0c;文件的默认模板是英制模板“inlbs_part_solid”,此文件模板中尺寸的单位是inch。我们建模中需要的单位是mm&#xff0c;改变Creo文件默认的单位有两种方法。 1 【新建】对话框取消勾选【使用默认模板】对话框 &#xff08;1&#xff09;单击主页选项…

基于SSM的房屋租售网站

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

RabbitMQ学习笔记

1、什么是MQ&#xff1f; MQ全称message queue&#xff08;消息队列&#xff09;&#xff0c;本质是一个队列&#xff0c;FIFO先进先出&#xff0c;是消息传送过程中保存消息的容器&#xff0c;多 用于分布式系统之间进行通信。 在互联网架构中&#xff0c;MQ是一种非常常见的…

sql注入基本概念

死在山野的风里&#xff0c;活在自由的梦里 sql注入基本概念 MYSQL基本语法union合并查询2个特性&#xff1a;order by 排序三个重要的信息 Sql Server MYSQL 基本语法 登录 mysql -h ip -u user -p pass基本操作 show databases; 查看数据库crea…

2023Web前端开发面试手册

​​​​​​​​ HTML基础 1. HTML 文件中的 DOCTYPE 是什么作用&#xff1f; HTML超文本标记语言: 是一个标记语言, 就有对应的语法标准 DOCTYPE 即 Document Type&#xff0c;网页文件的文档类型标准。 主要作用是告诉浏览器的解析器要使用哪种 HTML规范 或 XHTML规范…

前端面试的话术集锦第 8 篇:高频考点(JS性能优化 性能优化琐碎事)

这是记录前端面试的话术集锦第八篇博文——高频考点(JS性能优化 & 性能优化琐碎事),我会不断更新该博文。❗❗❗ 1. 从V8中看JS性能优化 注意:该知识点属于性能优化领域。 1.1 测试性能⼯具 Chrome已经提供了⼀个⼤⽽全的性能测试⼯具Audits。 点我们点击Audits后,可…

【LInux编译器gcc/g++】gcc使用方法和动静态库相关概念

目录 一.前言 二.源代码的翻译环境 三.gcc相关指令 四.动静态库 1.什么是库&#xff1f; 2.库的命名 3.库的链接方式 4.动静态链接的优缺点 5.小结 一.前言 在Windows系统上我们常用VisualStudio来进行C/C开发&#xff0c;VS并不是一款单一的软件&#xff0c;而是集成…

DQN算法概述及基于Pytorch的DQN迷宫实战代码

一. DQN算法概述 1.1 算法定义 Q-Learing是在一个表格中存储动作对应的奖励值&#xff0c;即状态-价值函数Q(s,a)&#xff0c;这种算法存在很大的局限性。在现实中很多情况下&#xff0c;强化学习任务所面临的状态空间是连续的&#xff0c;存在无穷多个状态&#xff0c;这种情…

将Apache服务与内网穿透结合,让您的网站可以公网访问

Apache服务安装配置与结合内网穿透实现公网访问 文章目录 Apache服务安装配置与结合内网穿透实现公网访问前言1.Apache服务安装配置1.1 进入官网下载安装包1.2 Apache服务配置 2.安装cpolar内网穿透2.1 注册cpolar账号2.2 下载cpolar客户端 3. 获取远程桌面公网地址3.1 登录cpo…

Android 查看当前手机、APP的ABI架构信息

目录 查看手机 查看APP 查看手机 命令&#xff1a;adb shell "getprop |grep cpu" 命令&#xff1a;adb shell getprop ro.product.cpu.abi 查看APP 在 data/system/packages.xml 文件中找到自己 app 的相关配置信息&#xff0c;这里有明确指出该去哪里加载 so…

C++中菱形继承中的多态在底层是如何实现的。

如果还不了解菱形继承和多态的底层可以看这两篇文章&#xff1a;C中多态的底层实现_Qianxueban的博客-CSDN博客 C的继承以及virtual的底层实现_Qianxueban的博客-CSDN博客 1.只有基类有虚函数 2.派生类也有重写的虚函数

【MySQL数据库原理】MySQL Community 8.0界面工具汉化

尝试以下方法来汉化 MySQL Workbench 8.0 的菜单&#xff1a; 1、使用社区翻译版本&#xff1a;有一些热心的社区成员会将 MySQL Workbench 翻译成不同的语言&#xff0c;包括中文。你可以在一些开源或社区网站上寻找这些翻译版本&#xff0c;并按照他们的说明进行安装。 2、…

博客系统(升级(Spring))(二)获取当前用户信息、对密码进行加密、设置统一数据格式、设置未登录拦截、线程池

博客系统&#xff08;二&#xff09; 博客系统获取当前用户的信息对密码进行加密和解密的操作设置统一的数据返回格式设置未登录拦截设置线程池 博客系统 博客系统是干什么的&#xff1f; CSDN就是一个典型的博客系统。而我在这里就是通过模拟实现一个博客系统&#xff0c;这是…

Redis之缓存和数据库双写一致方案讨论解读

目录 什么是缓存双写一致 更新缓存还是删除缓存&#xff1f; 先删除缓存,再更新数据库 场景描述 解决方案&#xff1a;延时双删策略 先更新数据库&#xff0c;再删除缓存 场景描述 解决方案&#xff1a;重试机制引入MQ 为什么要引入MQ 什么是缓存双写一致 只要用缓存…

rsync远程同步

与inodify结合使用&#xff0c;实现实时同步 rsync简介 rsync&#xff08;Remote Sync&#xff0c;远程同步&#xff09;是一个开源的快速备份工具&#xff0c;可以在不同主机之间镜像同步整个目录树&#xff0c;&#xff1b;支持增量备份&#xff0c;并保持链接和权限&#…

记录造数据测试接口

一、前言 在java开发中经常需要造数据进行测试接口&#xff0c;这里记录一下常用的通过造数据测试接口的方法。 二、一般的接口传参方式 1、接口的方式最好是使用JSON或者map的方式&#xff0c;这样的好处是传参可以灵活伸缩&#xff0c;返回的结果也最好是JSON或者map的方式…

Redis7--基础篇1(概述,安装、卸载及配置)

1. Redis概述 1.1 什么是Redis Redis&#xff1a;REmote Dictionary Server&#xff08;远程字典服务器&#xff09; Remote Dictionary Server(远程字典服务)是完全开源的&#xff0c;使用ANSIC语言编写遵守BSD协议&#xff0c;是一个高性能的Key-Value数据库提供了丰富的数…

MQTT 连接优化指南

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

JVM问题排查

本文详细说明了Java应用运行过程中几种常见的JVM相关问题&#xff0c;并给出了问题排查步骤。 一、堆中OOM 现象&#xff1a;Java线程负载过高&#xff0c;JVM内存几乎占满&#xff0c;甚至抛出java.lang.OutOfMemoryError错误。 思路&#xff1a;通过jmap能查看到对内存中实…

【Apollo 自动驾驶】Win11 中 WSL2 安装配置 Apollo 环境

【Apollo 自动驾驶】Win11 中 WSL2 安装配置 Apollo 环境 【1】Win11 WSL2 安装配置 Nvidia Cuda 【1.1】检查计算机硬件的显卡信息 计算机图标右击 -> 管理 -> 设备管理器 -> 显示适配器&#xff1b; 【1.2】检查对应显卡并安装 Nvidia 显卡驱动 下载对应的 Nv…