Kafka学习-Java使用Kafka

文章目录

  • 前言
  • 一、Kafka
    • 1、什么是消息队列
      • offset
    • 2、高性能
      • topic
      • partition
    • 3、高扩展
      • broker
    • 4、高可用
      • replicas、leader、follower
    • 5、持久化和过期策略
    • 6、消费者组
    • 7、Zookeeper
    • 8、架构图
  • 二、安装Zookeeper
  • 三、安装Kafka
  • 四、Java中使用Kafka
    • 1、引入依赖
    • 2、生产者
    • 3、消费者
    • 4、运行效果


前言

Kafka消息中间件

一、Kafka

1、什么是消息队列

假设我们有两个服务:生产者A每秒能生产200个消息,消费者B每秒能消费100个消息。

在这里插入图片描述

那么B服务是处理不了A这么多消息的,那么怎么使B不被压垮的同时还能处理A的消息呢,我们引入一个中间件,即Kafka。(当然着并不能使消费者的处理速度上升)

在这里插入图片描述

offset

那么我们可以在B服务中加入一个队列,也就是一个链表,链表的每个节点相当于一条消息,每个节点有一个序号即offset,记录消息的位置。

在这里插入图片描述

在这里插入图片描述

但是这样也会有个问题,还没有处理的消息是存储在内存中的,如果B服务挂掉,那么消息也就丢失了。
所以我们可以把队列移出,变成一个单独的进程,即使B服务挂掉,消息也不会丢失。

在这里插入图片描述

2、高性能

B服务由于性能差,队列中未处理的消息会越来越多,我们可以增加更多的消费者来处理消息,相对的也可以增加更多的生产者来生成消息。

在这里插入图片描述

topic

但是,生产者与消费者会争抢同一个队列,没有抢到就要等待,那么怎么解决呢?
我们可以将消息进行分类,每一类消息是一个topic,生产者按消息的类型投递到不同的topic中,消费者也按照不同的topic进行消费。

在这里插入图片描述

partition

但是单个topic的消息还是有可能过多,我们可以将单个队列拆分,每段是一个partition分区,每个消费者负责一个partition

在这里插入图片描述

3、高扩展

broker

随着partition过多,所有的partition都在同一个机器上,就可能会导致单机的cpu和内存过高,影响性能,那么我们可以使用多台机器,将partition分散部署在不同的机器上。每台机器就代表一个broker
我们可以增加broker来缓解服务器的cpu过高的性能问题。

在这里插入图片描述

4、高可用

replicas、leader、follower

假如某个broker挂了, 那么其中partition中的消息也就都丢失了,那么这个问题怎么解决呢?
我们可以给partition多加几个副本,统称replicas,并将它们分为leaderfollower
leader负责生产者和消费者的读写,follower只负责同步leader的数据。假如leader挂了,也不会影响follower,随后在follower中选出一个leader,保证消息队列的高可用。

在这里插入图片描述

5、持久化和过期策略

在上面讲述了leader挂掉的情况,如果所有的broker都挂了,消息不就都丢失了?
为了解决这个问题,就不能只把数据存在内存中,也要存在磁盘中。
但是如果所有消息一直保存在磁盘中,那磁盘也会被占满,所以引入保留策略。

6、消费者组

如果我想在原有的基础上增加一个消费者,那么它只能跟着最新的offset接着消费,如果我想从某个offset开始消费呢?
我们引入消费者组,实现不同消费者维护自己的消费进度。

在这里插入图片描述

7、Zookeeper

上面介绍了很多的组件,每个组件都有自己的状态信息,那么就需要有一个组件去统一维护这些组件的状态信息,于是引入了Zookeeper组件,它会定期与broker通信,获取Kafka集群的状态,判断哪些broker挂了,消费者组消费到哪了等等。

8、架构图

在这里插入图片描述

二、安装Zookeeper

1、官网地址

https://zookeeper.apache.org/

2、下载

在这里插入图片描述

选择稳定版本下载

在这里插入图片描述

3、解压,修改配置文件

解压后,复制 zoo_sample.cfg,重命名为 zoo.cfg

在这里插入图片描述

修改数据文件目录位置

在这里插入图片描述

4、启动

我们是在windows系统下安装的,运行 bin 目录下的 zkServer.cmd

在这里插入图片描述

三、安装Kafka

1、官网地址

https://kafka.apache.org/

2、下载

在这里插入图片描述

3、解压,修改配置文件

修改 config 目录下 server.properties 文件
修改日志文件位置,其他参数(如zookeeper端口,根据需要修改)

在这里插入图片描述

4、启动

bin\windows\kafka-server-start.bat config\server.properties

在这里插入图片描述

四、Java中使用Kafka

1、引入依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

2、生产者

public static void main(String[] args) throws InterruptedException {Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.ACKS_CONFIG, "all");prop.put(ProducerConfig.RETRIES_CONFIG, 0);prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);prop.put(ProducerConfig.LINGER_MS_CONFIG, 1);prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);String topic = "hello";KafkaProducer<String, String> producer = new KafkaProducer<>(prop);for (int i = 0; i < 100; i++) {producer.send(new ProducerRecord<String, String>(topic, Integer.toString(2), "hello kafka" + i));System.out.println("生产消息:" + i);Thread.sleep(1000);}producer.close();
}

3、消费者

public static void main(String[] args) {Properties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.GROUP_ID_CONFIG, "con-1");    // 消费者组prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);    //自动提交偏移量prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);     //自动提交时间KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);ArrayList<String> topics = new ArrayList<>();//可以订阅多个消息topics.add("hello");consumer.subscribe(topics);try {while(true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(10));for (TopicPartition topicPartition : poll.partitions()) {//	通过TopicPartition获取指定的消息集合,获取到的就是当前topicPartition下面所有的消息List<ConsumerRecord<String, String>> partitionRecords = poll.records(topicPartition);//	获取TopicPartition对应的主题名称String topic = topicPartition.topic();//	获取TopicPartition对应的分区位置int partition = topicPartition.partition();//	获取当前TopicPartition下的消息条数int size = partitionRecords.size();System.out.printf("--- 获取topic: %s, 分区位置:%s, 消息总数: %s%n",topic,partition,size);for(int i = 0; i < size; i++) {ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);//	实际的数据内容String key = consumerRecord.key();//	实际的数据内容String value = consumerRecord.value();//	当前获取的消息偏移量long offset = consumerRecord.offset();//	表示下一次从什么位置(offset)拉取消息long commitOffser = offset + 1;System.out.printf("消费消息 key:%s, value:%s, 消息offset: %s, 提交offset: %s%n",key, value, offset, commitOffser);Thread.sleep(1500);}}}} catch (Exception e) {e.printStackTrace();} finally {consumer.close();}
}

4、运行效果

生产消息

在这里插入图片描述

消费消息

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/327511.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C语言】/*操作符(下)*/

目录 一、操作符的分类 二、二进制和进制转换 2.1 进制 2.2 进制之间的转换 三、原码、反码、补码 四、单目操作符 五、逗号表达式 六、下标引用操作符[] 七、函数调用操作符() 八、结构体成员访问操作符 8.1 直接访问操作符(.) 8.2 间接访问操作符(->) 九、操作符…

【Spring】初识 Spring AOP(面向切面编程)

目录 1、介绍AOP 1.1、AOP的定义 1.2、AOP的作用 1.3、AOP的核心概念及术语 2、AOP实现示例 3、EnableAspectJAutoProxy注解 1、介绍AOP 1.1、AOP的定义 AOP&#xff08;Aspect Orient Programming&#xff09;&#xff0c;直译过来就是面向切面编程&#xff0c;AOP 是一…

[动画详解]LeetCode151.翻转字符串里的单词

&#x1f496;&#x1f496;&#x1f496;欢迎来到我的博客&#xff0c;我是anmory&#x1f496;&#x1f496;&#x1f496; 又和大家见面了 欢迎来到动画详解LeetCode算法系列 用通俗易懂的动画让算法题不再神秘 先来自我推荐一波 个人网站欢迎访问以及捐款 推荐阅读 如何低成…

十二生肖Midjourney绘画大挑战:释放你的创意火花

随着AI艺术逐渐进入大众视野&#xff0c;使用Midjourney绘制十二生肖不仅能够激发我们的想象力&#xff0c;还能让我们与传统文化进行一场新式的对话。在这里&#xff0c;我们会逐一提供给你创意满满的绘画提示词&#xff0c;让你的作品别具一格。而且&#xff0c;我们还精选了…

Python进行excel处理-01

最近干采购&#xff0c;每个月要对供应商的对账单&#xff0c;对对应的采购订单号和物料编号的价格和数量&#xff0c;是不是和物料管控总表里面的价格数量是不是一致&#xff0c;于是写了一个代码。 从总表里面找到&#xff0c;对账单里对应采购订单和物料编码的数据&#xf…

vscode 通过ssh 远程执行ipynb +可以切换conda env

主要是保证几个点 远程服务器python 环境没问题 conda这些也都有的ssh的账户 是有conda权限的没有免密就输入密码 免密教程就是最基本的那种 公钥copy过去就行了vscode 那几个插件都要装好 开始操作 首先 vscode 点击左侧工具栏中的扩展&#xff0c;搜索“ssh”&#xff0c;…

计算机vcruntime140.dll找不到如何修复,分享5种靠谱的修复教程

当您在运行某个应用程序或游戏时遇到提示“找不到vcruntime140.dll”&#xff0c;这通常意味着系统中缺少了Visual C Redistributable for Visual Studio 2015或更高版本的一个重要组件。这个错误通常发生在运行某些程序时&#xff0c;系统无法找到所需的动态链接库文件。小编将…

(四十二)第 6 章 树和二叉树(树的二叉链表(孩子-兄弟)存储)

1. 背景说明 2. 示例代码 1) errorRecord.h // 记录错误宏定义头文件#ifndef ERROR_RECORD_H #define ERROR_RECORD_H#include <stdio.h> #include <string.h> #include <stdint.h>// 从文件路径中提取文件名 #define FILE_NAME(X) strrchr(X, \\) ? strrch…

15-ps命令

常用选项 aux axjf a&#xff1a;显示一个终端所有的进程u&#xff1a;显示进程的归属用户及内存使用情况x&#xff1a;显示没有关联控制终端j&#xff1a;显示进程归属的进程组idf&#xff1a;以ASCII码的形式显示出进程的层次关系 ps aux其中| more是只显示一部分内容&…

【实战】算法思路总结

面试过程中&#xff0c;总是被拷打&#xff0c;信心都要没了。但是也慢慢摸索出一些思路&#xff0c;希望对大家有帮助。 &#xff08;需要多用一下ACM模式&#xff0c;力扣模式提供好了模板&#xff0c;自己在IDEA里面写的话&#xff0c;还是会有些陌生&#xff09; 0、基本…

Edge(微软)——一款充满创新精神的浏览器

随着科技的不断进步&#xff0c;互联网浏览器已经成为我们日常生活中不可或缺的工具。在这个领域&#xff0c;微软Edge作为一款新型的浏览器&#xff0c;凭借其独特的功能和优秀的性能&#xff0c;逐渐在市场上占据了一席之地。本文将深入探索微软Edge的特点、优势以及它如何改…

Acrobat Pro DC 2023 for Mac:PDF处理的终极解决方案

Acrobat Pro DC 2023 for Mac为Mac用户提供了PDF处理的终极解决方案。它具备强大的文档处理能力&#xff0c;无论是查看、编辑还是创建PDF文件&#xff0c;都能轻松胜任。在编辑功能方面&#xff0c;Acrobat Pro DC 2023支持对文本、图像进行精准的修改和调整&#xff0c;还能添…

一台linux通过另一台linux访问互联网-TinyProxy

参考&#xff1a; https://blog.csdn.net/weixin_41831919/article/details/113061317https://www.yuncongz.com/archives/1.htmlhttps://blog.csdn.net/aoc68397/article/details/101893369 环境&#xff1a;ubuntu 18.04 机器1: IP 219.216.65.252 (可以访问外网) 机器2: IP…

廉洁教育vr虚拟全景展览馆成为社会普法的重要基石

廉政教育是社会文明的重要基石&#xff0c;也是我们每个人的责任与担当。在这个数字化、信息化的新时代&#xff0c;我们特别推出廉政3D线上数字展厅&#xff0c;为公众打造一个沉浸式、互动式的廉政教育新平台。 走进廉政3D线上数字展厅&#xff0c;就如同置身于一个充满智慧与…

[笔试训练](二十二)064:添加字符065:数组变换066:装箱问题

目录 064:添加字符 065:数组变换 066:装箱问题 064:添加字符 添加字符_牛客笔试题_牛客网 (nowcoder.com) 题目&#xff1a; 题解&#xff1a; 枚举所有A&#xff0c;B字符串可能的对应位置&#xff0c;得出对应位置不同字符数量的最小情况 两字符串的字符数量差n-m&…

Keil编程不同驱动文件引用同一个常量的处理方法

基础不牢&#xff0c;地动山摇&#xff0c;最近单片机编程又遇到一个基础问题。 我在头文件中定义了一个常量同时给两个驱动文件使用&#xff0c;封装的时候编译没问题&#xff0c;但是在main函数中引用驱动函数的时候就出现了重定义的问题&#xff0c;如下如所示。 解决方法很…

搜索引擎的设计与实现(三)

目录 5 系统详细实现 5.1实现环境配置 5.2功能实现 5.2.1 建立索引 5.2.2 文件搜索实现 5.2.3 数据库的连接配置 5.2.4 数据库搜索实现 5.2.5 后台数据编辑实现 前面内容请移步 搜索引擎的设计与实现&#xff08;二&#xff09; 免费源代码&毕业设计论文 搜索…

Linux学习笔记1---Windows上运行Linux

在正点原子的教程中学习linux需要安装虚拟机或者在电脑上安装一个Ubuntu系统&#xff0c;但个人觉得太麻烦了&#xff0c;现在linux之父加入了微软&#xff0c;因此在Windows上也可以运行linux 了。具体方法如下&#xff1a; 一、 在Windows上的设置 在window的搜索框内&#…

Java的类和对象(一)—— 初始类和对象,this关键字,构造方法

前言 从这篇文章开始&#xff0c;我们就进入到了JavaSE的核心部分。这篇文章是Java类和对象的第一篇&#xff0c;主要介绍类和对象的概念&#xff0c;this关键字以及构造方法~~ 什么是类&#xff1f;什么是对象&#xff1f; 学过C语言的老铁们&#xff0c;可以类比struct自定义…

GPT-4o,AI实时视频通话丝滑如人类,Plus功能免费可用

不开玩笑&#xff0c;电影《她》真的来了。 OpenAI最新旗舰大模型GPT-4o&#xff0c;不仅免费可用&#xff0c;能力更是横跨听、看、说&#xff0c;丝滑流畅毫无延迟&#xff0c;就像在打一个视频电话。 现场直播的效果更是炸裂&#xff1a; 它能感受到你的呼吸节奏&#xf…