kafka快速入门+应用

Kafka, 构建TB级异步消息系统

1.快速入门

1.1 阻塞队列

        在生产线程  和  消费线程  之间起到了 , 缓冲作用,即避免CPU  资源被浪费掉

  • BlockingQueue
    • 解决  线程通信  的问题
    • 阻塞方法  put 、 take
  • 生产者、消费者 模式
    • 生产者:产生数据的线程
    • 消费者:使用数据的线程
  • 实现类
    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • PriorityBlockingQueue、SynchronousQueue、DelayQueue等

1.2 Kafaka 入门

Kafaka 简介

  • Kafaka 是一个分布式的(消息队列    发展到---》 功能更加丰富) 流媒体平台
  • 应用:消息系统、日志收集、用户行为追踪、流式处理

Kafaka 特点

  • 高吞吐量(对于硬盘的顺序读写,性能很高,高于对于内存的随机读写)
  • 消息持久化(将数据存储在硬盘中)
  • 高可靠性 (分布式集群部署,一台服务器挂了,切换到其他服务器)
  • 高扩展性(服务器不够用,则加一台服务器)

Kafaka 术语

  • Broker(集群中的每一台服务器称为一个Broker)、Zookeeper(管理集群的软件)
  • Topic(发布订阅模式,生产者把消息发送的位置-》topic)、Partition(表示对于主题位置的分区,每一个分区按照顺序往其中追加数据)、Offset(消息在分区内存在的索引)
  • Leader Replica(主副本可以提供想要读取某个分区的数据,如果主副本挂了,则从集群中选用一个从副本作为主副本) 、Follower Replica(从副本只是 备份,不负责做响应)
    • 每一个分区有多个副本,如果一个副本坏了,还有备份,提高容错率

1.3 Spring 整合 Kafka

  1. 引入依赖
    1. spring-kafka
  2. 配置Kafka
    1. 配置server
    2. 配置consumer
  3. 访问Kafka
  • 生产者
    • kafkaTemplate.send(topic, data);
  • 消费者
    • @KafkaListener(topics = {"test"})  public void handleMessage(ConsumerRecord record) {}

2.应用

2.1 安装

(1)安装包位置

链接:https://pan.baidu.com/s/1QjZdOUYdmSCSf0zjprJQIQ
提取码:9630

下载后解压缩

(2)相关配置

1

 2

2.2 启动+使用

(1)启动zookeeper
C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

(2)启动kafka
C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0>bin\windows\kafka-server-start.bat config\server.properties

 

(3)创建主题

C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-topics.bat --create --bootstrap-server localhost:9092 -replication-factor 1 --partitions 1 --topic test1

C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-topics.bat --list --bootstrap-server localhost:9092
__consumer_offsets
test
test1

(4)创建生产者

C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test1

(5)创建消费者

C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning

再次生产消息,会自动消费消息

 

2.3 spring中整合kafka

(1)配置Properties

#9. kafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
# 是否自动提交(记录)   消费者偏移量
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000

其中spring.kafka.consumer.group-id=community-consumer-group与。。。保持一致

(2)导入依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

(3)test

1.定义producer

@Component
class KafkaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic, String content) {kafkaTemplate.send(topic,content);}
}

2.定义consumer

@Component
class KafkaConsumer {@KafkaListener(topics ={"test"})public void handleMessage(ConsumerRecord record){System.out.println(record.value());//PHDVB干嘛呢//呕吼}
}

3.编写测试类

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {@Autowiredprivate KafkaProducer kafkaProducer;@Testpublic void testKafka() {kafkaProducer.sendMessage("test","PHDVB干嘛呢");kafkaProducer.sendMessage("test","呕吼");// 生产者发消息是  主动的 ,消费者收消息是  被动地try {Thread.sleep(1000 * 10);} catch (InterruptedException e) {e.printStackTrace();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/306417.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【VSCode+Keil5+STM32CubeMX】开发环境配置

一、软件下载 二、软件安装 三、配置环境 四、验证开发环境 五、Keil与VS Code的同步 从0到1搭建VS Code Keil5 STM32CubeMX开发环境 优点 支持标准库HAL库LL库代码编辑更“现代化”&#xff1a;代码提示、函数跳转、更高自由度的定制主题等优点多端同步&#xff0c;VS Code和…

【云计算】云网络产品体系概述

云网络产品体系概述 在介绍云网络产品体系前&#xff0c;先介绍几个与云计算相关的基础概念。 阿里云在基础设施层面分为 地域 和 可用区 两层&#xff0c;关系如下图所示。在一个地域内有多个可用区&#xff0c;每个地域完全独立&#xff0c;每个可用区完全隔离&#xff0c;同…

蓝桥杯 每日2题 day5

碎碎念&#xff1a;哦哈呦&#xff0c;到第二天也是哦哈哟&#xff0c;&#xff0c;学前缀和差分学了半天&#xff01;day6堂堂连载&#xff01; 0.单词分析 14.单词分析 - 蓝桥云课 (lanqiao.cn) 关于这题就差在input前加一个sorted&#xff0c;记录一下下。接下来就是用字…

[dvwa] file upload

file upload 0x01 low 直接上传.php 内容写<? eval($_POST[jj]);?> 用antsword连 路径跳两层 0x02 medium 添加了两种验证&#xff0c;格式为图片&#xff0c;大小限制小于1000 上传 POST /learndvwa/vulnerabilities/upload/ HTTP/1.1 Host: dvt.dv Content-Le…

苹果电脑(Mac)怎么清理 itunes 备份?

苹果电脑用户广泛利用 iTunes 应用程序对 iPhone 或 iPad进行定期备份&#xff0c;以确保珍贵的数据安全无虞。然而&#xff0c;随着备份历史的增长&#xff0c;它们会在磁盘上积累大量空间&#xff0c;尤其当您频繁为多台设备备份时&#xff0c;存储资源可能会迅速消耗殆尽。为…

《Sky光遇》无视steam锁区的两种下载入库游玩教程(图文一览)

玩家在光遇游戏中需要找到每一关的子民&#xff0c;然后穿过艰难险阻&#xff0c;最终在石碑前接受祝福&#xff0c;就是通过这一关了。玩家只有通关后会来到一个祭坛&#xff0c;从这个祭坛周围的门前往下一关或是回到遇境&#xff0c;通关就相当于是解锁地图场景&#xff0c;…

JavaEE 初阶篇-深入了解 CAS 机制与12种锁的特征(如乐观锁和悲观锁、轻量级锁与重量级锁、自旋锁与挂起等待锁、可重入锁与不可重入锁等等)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 乐观锁与悲观锁概述 1.1 悲观锁&#xff08;Pessimistic Locking&#xff09; 1.2 乐观锁&#xff08;Optimistic Locking&#xff09; 1.3 区别与适用场景 2.0 轻…

IPSec简介

起源 随着Internet的发展&#xff0c;越来越多的企业直接通过Internet进行互联&#xff0c;但由于IP协议未考虑安全性&#xff0c;而且Internet上有大量的不可靠用户和网络设备&#xff0c;所以用户业务数据要穿越这些未知网络&#xff0c;根本无法保证数据的安全性&#xff0…

Docker 安装 Linux 系统可视化监控 Netdata

docker 安装 netdata 前提准备Docker 两种方式部署 Netdata1、使用 docker run 命令运行 netdata 服务2、使用 docker compose 运行 netdata 服务 Netdata 服务可视化界面Netdata 汉化处理 前提准备 说明&#xff1a;此处使用 windows11 安装的 docker desktop & wsl2/apli…

10-热点文章-定时计算

xxl-Job分布式任务调度 1 今日内容 1.1 需求分析 目前实现的思路&#xff1a;从数据库直接按照发布时间倒序查询 问题1&#xff1a; 如何访问量较大&#xff0c;直接查询数据库&#xff0c;压力较大 问题2&#xff1a; 新发布的文章会展示在前面&#xff0c;并不是热点文章 …

halcon 两图叠加 显示

halcon 两图叠加 显示 代码 read_image (Image, E:/test/CameraWeldHead/二号机焊头图片.bmp) read_image (Image1, E:/test/CameraWeldHead/右1.bmp) mirror_image (Image1, ImageMirror, column)crop_part (Image, ImagePart1, 0, 0, 4096, 4096) crop_part (ImageMirror, …

【面试题】细说mysql中的各种锁

前言 作为一名IT从业人员&#xff0c;无论你是开发&#xff0c;测试还是运维&#xff0c;在面试的过程中&#xff0c;我们经常会被数据库&#xff0c;数据库中最经常被问到就是MySql。当面试官问MySql的时候经常会问道一个问题&#xff0c;”MySQL中有哪些锁&#xff1f;“当我…

LeetCode - 1702. 修改后的最大二进制字符串

文章目录 解析AC CODE 题目链接&#xff1a;LeetCode - 1702. 修改后的最大二进制字符串 解析 详细题解&#xff1a;贪心&#xff0c;简洁写法&#xff08;Python/Java/C/Go/JS/Rust&#xff09; 思路很牛b。 简单来说我们需要想办法将0配对&#xff0c;将其变为10&#xff0…

深度学习【向量化(array)】

为什么要向量化 在深度学习安全领域、深度学习练习中&#xff0c;你经常发现在训练大量数据时&#xff0c;深度学习算法表现才更加优越&#xff0c;所以你的代码运行的非常快至关重要&#xff0c;否则&#xff0c;你将要等待非常长的时间去得到结果。所以在深度学习领域向量化…

C/S医学检验LIS实验室信息管理系统源码 医院LIS源码

LIS系统即实验室信息管理系统。LIS系统能实现临床检验信息化&#xff0c;检验科信息管理自动化。其主要功能是将检验科的实验仪器传出的检验数据经数据分析后&#xff0c;自动生成打印报告&#xff0c;通过网络存储在数据库中&#xff0c;使医生能够通过医生工作站方便、及时地…

如何卸载干净 IDEA(图文讲解)

更新时间 2022-12-20 11:一则或许对你有用的小广告 星球 内第一个项目&#xff1a;全栈前后端分离博客项目&#xff0c;演示地址&#xff1a;Weblog 前后端分离博客, 1.0 版本已经更新完毕&#xff0c;正在更新 2.0 版本。采用技术栈 Spring Boot Mybatis Plus Vue 3.x Vit…

Android开发基础:对话框,Toast,Notification的使用 选项菜单,上下文菜单,弹出式菜单的使用

目录 一&#xff0c;Android提示消息 1.提示消息的形式 2.对话框 &#xff08;1&#xff09;默认对话框的创建步骤 &#xff08;2&#xff09; 自定义对话框的创建步骤 3.Toast 4.Notification 二&#xff0c;菜单 1.选项菜单 OptionsMenu 2.上下文菜单 ContextMenu …

【python】在pycharm创建一个新的项目

双击打开pycharm,选择create new project 选择create,后进入项目 右键项目根目录,选择new一个新的python file 随意命名一下 输入p 然后后面就会出现智能补全提示,此时轻敲一下tab,代码就写好了,非常的方便 右键执行一下代码,下面两个直接运行和debug运行都是可以的 小结 …

Leetcode - 周赛392

目录 一&#xff0c;3105. 最长的严格递增或递减子数组 二&#xff0c;3106. 满足距离约束且字典序最小的字符串 三&#xff0c;3107. 使数组中位数等于 K 的最少操作数 四&#xff0c;3108. 带权图里旅途的最小代价 一&#xff0c;3105. 最长的严格递增或递减子数组 本题求…

基于Whisper语音识别的实时视频字幕生成 (二): 在线实时字幕

Whisream Whistream&#xff08;微流&#xff09;是基于Whisper语音识别的的在线字幕生成工具&#xff0c;支持rtsp/rtmp/mp4等视频流在线语音识别 1. whistream介绍 whistream将在whishow基础上引入whisper进行在线语音识别生成视频字幕 2. 使用 python&#xff1a; pyth…