案例:ZooKeeper + Kafka消息队列集群部署

目录

消息队列

概念

使用场景

不适宜

适宜

消息队列的特征

存储

异步

异步的优点

同步

为什么需要消息队列

解耦

作用

冗余

扩展性

灵活性

峰值处理能力

可恢复性

顺序保证

Kafka

概念

Kafka技术名词

(1)Broker

(2)Topic

(3)Producer

(4)Consumer

(5)Partition

(6)Consumer Group

(7)Message

ZooKeeper

概念

工作原理

Master启动

Master故障

Master恢复

角色

Server

Leader

Follower

Observer

Client

ZooKeeper在Kafka中的作用

部署Kafka集群

初步设置

安装ZooKeeper

安装Kafka

测试

创建主题

生产消费测试


消息队列

概念

  • 消息(Message)是指在应用间传送的数据
  • 消息队列(Message Queue)是一种应用间的通信方式,确保消息的可靠传递
  • 临时存放数据的空间(缓存)
  • 消息队列是中间件的一种

比如服务端产生的数据比较多,客户端无法及时处理,这时就需要一个队列系统,给消息排队,每接收一个消息,就放到队列中,等待应用程序提取该数据,应用程序提取数据之后,队列就释放该数据


使用场景

不适宜

如果有一个程序A产生的数据要交给程序B,而程序A产生的数据量和程序B能够承载数据量是一样的,就没有必要加消息队列这个中间件

如果此时加了中间件,就打破了原有的架构,本来A和B可以直接通信,结果不仅增加了数据传输的延迟,还增加了开发者和运维者对整体架构的复杂度

或者两个程序要求数据同步性较高也不适合加消息队列

适宜

如果程序A产生的数据量远远高于程序B承载的数据量,那么这时就可以增加一个消息队列的中间件


消息队列的特征

存储

将消息存储在某种类型的缓冲区中(内存),直到目标进程读取这些消息或将其从消息队列中显式移除为止

异步

程序A和消息队列连接程序B也和消息队列连接,此时A和B就是异步连接(发送和接收消息不同步),程序A(发布者)只负责发送数据到队列中不在乎谁来用,程序B(使用者)只负责获取数据而不在乎谁发送的

消息发布者只管把消息发布到MQ中而不管谁来用,消息使用者只管从MQ中取消息而不管是谁发布。

异步的优点
  • 如果程序B处理数据比较慢,可以通过中间件暂存这些数据,慢慢处理

同步

同步就是:程序A发送数据,程序B接收数据,建立连接以后,数据的发送与接收是同步的(请求和应答)


为什么需要消息队列

  1. 解耦
  2. 冗余
  3. 扩展性
  4. 灵活性
  5. 峰值处理能力
  6. 可恢复性
  7. 顺序保证
  8. 异步通信

解耦

解耦:解耦合

耦合是指两个程序结合的非常紧密,比如程序A出故障了,那么程序B就不能正常运行,而程序B出故障了程序A就不能正常运行

有一方发生故障,那么两个程序的运行都会受到影响。

作用

在上面描述的环境中,如果有了消息队列以后,就可以把AB两个程序解耦,程序A和消息队列通信,程序B也和消息队列通信,发送方只管发送数据,使用方只管使用数据


冗余

这里的冗余是指可以给消息队列做备份,或分布式存储


扩展性

类似于前面讲的Redis集群可以扩展节点。

消息队列也可以实现良好的扩展,增大消息入队和处理频率


灵活性

比如对于程序A的开发者,更加灵活,不需要知道程序B的运作原理,只需要把封装一个数据报文,按照队列的协议接口,发给消息队列就行了


峰值处理能力

当一个程序B无法及时处理突发的大量访问时,此时就可以使用消息队列,使关键组件顶住突发的访问压力,而不会因为突发的超负荷请求而崩溃


可恢复性

即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理


顺序保证

写入数据和取出数据时都是要按顺序的,当接收者每接收一次数据,那么队列的数据就会往前移一位(偏移量+1)



Kafka

Kafka是众多消息队列的其中一个

概念

  • Kafka是一种高吞吐量(高并发)的分布式发布 / 订阅消息系统
  • Kafka是Apache组织下的一个开源系统
  • 可以实时的处理大量数据以满足各种需求场景

Kafka技术名词

(1)Broker

  • Kafka集群包含一个或多个服务器,每个服务器被称为Broker(经纪人)。
  • 安装有Kafka的服务器,就可以称为Broker(经纪人)。

(2)Topic

  • 每条发布到Kafka集群的消息都有一个分类,这个类别被称为Topic(主题)。
  • Kafka消息队列中可以存放各式各样的消息,把消息做分类,不同类别的消息,属于不同的Topic(主题)。

(3)Producer

  • 消息的生产者,负责发布消息到Kafka broker。
  • 负责产生消息的应用程序

(4)Consumer

  • 消息的消费者,从Kafka broker拉取数据,并消费这些已发布的消息。
  • 负责接收消息的应用程序。

(5)Partition

  • Partition(分区)是物理上的概念,每个Topic包含一个或多个Partition,每个Partition都是一个有序的队列。
  • 同一类的消息隶属于同一个主题(Topic),Partition为每个主题的消息创建存储区域(分区),每个主题都要由内存提供存储空间,为该主题的消息提供空间,这个空间就是Partition
  • 同时,Partition中的每一条消息都会被分配一个有序的id
  • 每一个服务器可以有多个Topic,每个Topic可以有多个partition

(6)Consumer Group

  • 消费者组,可以给每个Consumer指定消费者组,若不指定消费者组,则属于默认的Group
  • 比如接收消息的程序不止一个服务器,构建成一个组,让一些消费者属于该组,便于管理

(7)Message

  • 消息,通信的基本单位,每个Producer可以向一个Topic发布一些消息
  • 接收到的每一条消息,每一个生产者会向消息队列发送存储消息的请求,放置到对应的主题中


ZooKeeper

概念

ZooKeeper是一种分布式协调技术,所谓分布式协调技术主要是用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种共享资源,防止造成资源竞争(脑裂)的现象。

协调消息的生产者把消息放到哪个Kafka,协调消息的消费者从哪个Kafka提取数据,是整个Kafka环境中的协调者


工作原理

在Kafka集群中,安装Kafka的服务器也要安装ZooKeeper

Master启动

各节点向ZooKeeper中注册节点信息,ZooKeeper会为其分配一个id,以编号最小算法选举出一个主节点,另外的设备就是备用节点,这个Master就是对外服务的消息队列,其他应用程序连接的就是这个Master

哪个节点先注册,id就最小,就越可能成为Master

Master故障

如果主节点A发生故障,这时它在ZooKeeper所注册的节点信息会被自动删除,ZooKeeper会再次以编号最小算法选举出新的Master

Master恢复

如果故障的主节点恢复了,它会再次向ZooKeeper注册自身的节点信息,但注册的节点信息编号会变小,因此不会成为Master,而是另一台节点继续担任Master


角色

ZooKeeper集群主要角色有Server和Client

其中Server又分为3个角色

Server

  • 服务器端角色
Leader
  • 领导者角色,主要负责发起投票和决议,以及更新系统状态。
Follower
  • 跟随者角色,用于接受客户端请求并返回结果给客户端,在选举过程中参与投票
Observer
  • 观察者角色,用户接受客户端请求,并将请求转发给Leader,同时同步Leader的状态,但是不参与投票。Observer的目的是扩展系统,提高伸缩性(增加节点、减少节点)

Client

  • 客户端角色,用于向ZooKeeper发起请求

ZooKeeper在Kafka中的作用

  1. Broker注册
  2. Topic注册
  3. 生产者负载均衡(把消息分布存储在不同的节点)
  4. 消费者负载均衡(消费者可以选择其中任意一个节点去连接)
  5. 记录消息分区与消费者的关系
  6. 消息消费进度和Offset(偏移量)记录
  7. 消费者注册

部署Kafka集群

本案例的目的是构建一个Kafka消息队列的集群,实现了负载均衡、高可用(数据冗余)、故障转移

初步设置

开启3台虚拟机并连接上XShell

为每一台kafka服务器修改主机名(Kafka1,Kafka2,Kafka3)

[root@localhost ~]# hostnamectl set-hostname kafka1
[root@localhost ~]# bash

开启会话同步

为了方便实验关闭防火墙和内核安全机制

然后在hosts文件末尾,追加3台服务器的ip+主机名,方便通信

[root@kafka1 ~]# systemctl stop firewalld
[root@kafka1 ~]# setenforce 0
[root@kafka1 ~]# vim /etc/hosts
192.168.10.101 kafka1
192.168.10.102 kafka2
192.168.10.103 kafka3

安装ZooKeeper

安装java环境,导入软件包

[root@kafka1 ~]# yum -y install java

解压ZooKeeper软件包,因为是解压即用的程序,所以直接移动到/etc下并重命名

[root@kafka1 ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz
[root@kafka1 ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper

进入ZooKeeper目录,创建出一会在配置文件中指定的数据目录,然后进入存放配置文件的目录,通过ZooKeeper提供的配置文件拷贝出一份自己使用的文件

[root@kafka1 ~]# cd /etc/zookeeper
[root@kafka1 zookeeper]# mkdir zookeeper-data
[root@kafka1 zookeeper]# cd conf/
[root@kafka1 conf]# cp zoo_sample.cfg zoo.cfg

编辑ZooKeeper配置文件,指定存储数据的目录,然后在14行下方配置ZooKeeper集群中的服务器

[root@kafka1 conf]# vim zoo.cfg
# 指定ZooKeeper存储数据的目录。这个目录用于保存ZooKeeper的持久数据和快照文件
dataDir=/etc/zookeeper/zookeeper-data  # 第12行
server.1=192.168.10.101:2888:3888
server.2=192.168.10.102:2888:3888
server.3=192.168.10.103:2888:3888
# 保存并退出

  • 2181:对cline端提供服务
  • 2888:集群内部通信的端口
  • 3888:选举Leader的端口

关闭会话同步

在3台服务器上分别执行下方操作

在ZooKeeper的集群中,需要为每台服务器生成一个唯一的id

[root@kafka1 conf]# cd /etc/zookeeper/zookeeper-data/

不同的是每台服务器的id不能相同,这里我们把3台服务器的id分别设置为1、2、3

第一台服务器
[root@kafka1 zookeeper-data]# echo "1" > myid
第二台服务器
[root@kafka1 zookeeper-data]# echo "2" > myid
第三台服务器
[root@kafka1 zookeeper-data]# echo "3" > myid

开启会话同步

分配完id后,开启会话同步

执行bin目录下的zkServer.sh脚本,追加start参数,启动ZooKeeper服务,启动服务后显示STARTED表示正常启动

还是执行bin目录下的akServer.sh脚本,追加status参数,输出ZooKeeper的状态

  • Mode: follower: 该节点当前处于follower模式,说明它不是领导者(leader),而是从节点,正在跟随集群中的领导者。

此时这3台主机就一台的mode显示是:Mode: leader

[root@kafka1 zookeeper-data]# cd ..
[root@kafka1 zookeeper]# bin/zkServer.sh start
Starting zookeeper ... STARTED
[root@kafka1 zookeeper]# bin/zkServer.sh status
Mode: follower

安装Kafka

还是解压Kafka软件包,然后将解压目录移动到/etc下再重命名直接使用,cd进入Kafka的配置文件目录

[root@kafka1 ~]# tar zxvf kafka_2.13-2.4.1.tgz
[root@kafka1 ~]# mv kafka_2.13-2.4.1 /etc/kafka
[root@kafka1 ~]# cd /etc/kafka/config

暂时关闭会话同步,在3台服务器上分别修改对应的id

打开Kafka的配置文件,先在第21行修改每台服务器对应的id

[root@kafka1 config]# vim server.properties
第一台服务器
broker.id=1
第二台服务器
broker.id=2
第三台服务器
broker.id=3

然后在第31行,分别为三台服务器修改对应的IP

第一台服务器
listeners=PLAINTEXT://192.168.10.101:9092
第二台服务器
listeners=PLAINTEXT://192.168.10.102:9092
第三台服务器
listeners=PLAINTEXT://192.168.10.103:9092

再开启会话同步

然后在第60行指定Kafka存储数据的目录,再在第123行指定Kafka连接到Zookeeper集群的地址。

Kafka使用Zookeeper来进行集群协调和管理,所以需要提供Zookeeper服务器的IP地址和端口。

log.dirs=/etc/kafka/kafka-logs  # 第60行
zookeeper.connect=192.168.10.101:2181,192.168.10.102:2181,192.168.10.103:218  # 第123行

保存并退出,在指定的位置创建出刚才在配置文件中指定存储数据的目录

[root@kafka1 config]# cd ..
[root@kafka1 kafka]# mkdir kafka-logs
[root@kafka1 kafka]# bin/kafka-server-start.sh config/server.properties &

测试

关闭会话同步

创建主题

执行kafka-topics.sh脚本的--create选项创建一个主题,然后再使用--list选项查看该主题是否能被查到

  • --create:指定创建一个新的主题。
  • --zookeeper kafka1:2181:指定 ZooKeeper 服务器的位置。注意,Kafka 需要连接 ZooKeeper 来管理集群元数据。
  • --replication-factor 1:因为是测试环境,所以设置主题的副本树为 1,意味着只有一个副本(没有冗余)。在生产环境中,建议使用大于 1 的副本树以提高容错能力。
  • --partitions 1:设置主题的分区数为 1。分区决定了消息的并行处理能力和存储。
[root@kafka1 kafka]# cd bin/
[root@kafka1 bin]# ./kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test
[root@kafka1 bin]# ./kafka-topics.sh --list --zookeeper kafka1:2181
test

生产消费测试

选一个作为消费的服务器,执行kafka-console-consumer.sh脚本,指定一个或多个Kafka代理的地址,再指定要读取消息的主题,等待接收消息

在结尾还有一个选项:

  • 不加 --from-beginning:只输出实时数据
    • 加 --from-beginning:输出从主题创建以来的所有数据
[root@kafka3 bin]# ./kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test

然后再选一个作为发布消息的服务器,执行kafka-console-producer.sh脚本,再指定Kafka代理列表,每个代理地址通常包含主机名(或 IP 地址)和端口号,用逗号分隔

[root@kafka1 bin]# ./kafka-console-producer.sh --broker-list kafka1:9092 -topic test
>123
>222
>333

然后再来到座位消费的服务器,就会发现输入的消息都会在消费服务器延迟显示(异步)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/405515.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言一笔画迷宫

目录 开头程序程序的流程图程序游玩的效果结尾 开头 大家好&#xff0c;我叫这是我58。 程序 #define _CRT_SECURE_NO_WARNINGS 1 #include <stdio.h> #include <string.h> #include <Windows.h> void printmaze(const char strmaze[11][11]) {int ia 0;…

BUG——imx6u开发_结构体导致的死机问题(未解决)

简介&#xff1a; 最近在做imx6u的linux下裸机驱动开发&#xff0c;由于是学习的初级阶段&#xff0c;既没有现成的IDE可以使用&#xff0c;也没有GDB等在线调试工具&#xff0c;只能把代码烧写在SD卡上再反复插拔&#xff0c;仅靠卑微的亮灯来判断程序死在哪一步。 至于没有使…

41-设计规则:线宽规则

1.设置电源线规则和信号线规则 2.设置信号线规则 3.设置电源线规则 如果未生效&#xff1a; ① 提升优先级即可。 ②查看使能选项有没有勾选

20:【stm32】定时器一:时基单元

时基单元 1、什么是定时器2、时基单元的基本结构2.1&#xff1a;脉冲的来源2.2&#xff1a;预分频器PSC2.3&#xff1a;计数器CNT2.4&#xff1a;update事件与预加载 3、标准库编程3.1&#xff1a;通过定时器中断来设置延迟函数 1、什么是定时器 定时器是一种专门负责定时功能…

Vue 满屏纵向轮播图

目录 前言轮播图效果展示具体实现实现思路具体代码前言 今天汇总一个需求,还是之前写的,要求写一个满屏的轮播图,准确的说,是鼠标滑动到轮播图的时候,轮播图固定在屏幕上,随着其中的轮播子项遍历结束后,解除固定的效果。原本我最开始想直接修改Element-UI的组件的,但是…

CISAW认证考试的时间是多久

CISAW&#xff0c;即中国信息安全保障人员&#xff0c;是中国信息安全认证与审查中心进行权威认证的缩写。它是全国范围内最为权威、最高端的信息安全认证之一。作为信息安全领域的重要认证&#xff0c;对于从事网络安全工作的人员来说具有极其重要的意义。因此&#xff0c;备考…

【容器安全系列Ⅲ】- 深入了解Capabilities的作用

在本系列的上一部分中&#xff0c;我们提到 Docker 容器尚未使用 time 命名空间。我们还探讨了容器在许多情况下如何以 root 用户身份运行。考虑到这两点&#xff0c;如果我们尝试更改容器内的日期和时间会发生什么&#xff1f; 为了测试这一点&#xff0c;我们先运行 docker r…

入门网络安全工程师要学习哪些内容

大家都知道网络安全行业很火&#xff0c;这个行业因为国家政策趋势正在大力发展&#xff0c;大有可为!但很多人对网络安全工程师还是不了解&#xff0c;不知道网络安全工程师需要学什么?知了堂小编总结出以下要点。 网络安全工程师是一个概称&#xff0c;学习的东西很多&…

2000-2023年逐年最大NDVI数据集(500m)

植被指数&#xff08;NDVI, Normalized Difference Vegetation Index&#xff09;可以准确反映地表植被覆盖状况。目前&#xff0c;NDVI时序数据已经在各尺度区域的植被动态变化监测、土地利用/覆被变化检测、宏观植被覆盖分类和净初级生产力估算等研究中得到了广泛的应用。 中…

【java】RuoYi-Vue前后端分离版本-请求被拦截,怎么修改拦截过滤器,解决方案

【java】RuoYi-Vue前后端分离版本-请求被拦截&#xff0c;怎么修改拦截过滤器 它用到了一个安全管理框架Spring Security 你可以通过这篇文章《Spring Security 详解》 去了解它&#xff0c;怎么使用或者使用原理。 所有业务都受SecurityConfig配置所过滤 SecurityConfig配置…

【功能自动化】使用Excel文档获取参数数据

环境搭建&#xff1a; 1.需要配置WebTours网站 2.安装pandas pip install -i https://pypi.tuna.tsinghua.edu.cn/simple numpy pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pandas pip install -i https://pypi.tuna.tsinghua.edu.cn/simple python_dateutil…

设计模式(3)结构型模式

结构型模式 结构型模式1. Adapter&#xff08;适配器模式&#xff09;2. Bridge&#xff08;桥接模式&#xff09;3.Composite&#xff08;组合模式&#xff09;4.Decorator&#xff08;装饰模式&#xff09;5.Facade&#xff08;外观模式&#xff09;6.Flyweight&#xff08;享…

14、Ripper

难度 低->中 目标 一个root 两个flag kali 192.168.135.58 靶机 192.168.135.104 netdiscover -i eth0 -r 192.168.135.0/24 端口扫描 先访问一下80端口和10000端口&#xff0c;这两个都是web服务的样子 80端口是初始化界面&#xff0c;可以尝试扫扫目录 访问10000端口…

Linux升级lib64中的libc.so.6导致所有命令失效

ls: relocation error: libpthread.so.0: symbol __libc_dl_error_tsd, version GLIBC_PRIVATE not defined in file libc.so.6 with link time reference 升级Glibc后出现所有shell命令都不可用 # systemctl status systemctl: relocation error: /lib64/libpthread.so.0: sy…

Ollama 企业私有化部署大模型最佳解决方案

为什么要私有化部署大模型&#xff1f; 很多企业为了控制成本和减少核心数据外泄的风险&#xff0c;会通过私有化部署大模型&#xff0c;来控制成本和保障企业的数据安全。 说到本地化部署&#xff0c;这时就需要说到Ollama框架了。 Ollama 是什么&#xff1f; Ollama 是一个开…

张宇1000题vs武忠祥严选题,哪本更接近真题?

张宇1000题强化篇难度还是挺大的 首先是综合度比较高&#xff0c;如果你基础复习的不好&#xff0c;不建议做&#xff0c;张宇1000题强化篇的难度还是比较大的&#xff0c;适合基础已经比较扎实的同学来做&#xff01; 张宇1000题与张宇的高数18讲等课程紧密结合&#xff0c;…

BEV世界:通过统一的BEV潜在空间实现自动驾驶的多模态世界模型

BEVWorld: A Multimodal World Model for Autonomous Driving via Unified BEV Latent Space BEV世界&#xff1a;通过统一的BEV潜在空间实现自动驾驶的多模态世界模型 Abstract World models are receiving increasing attention in autonomous driving for their ability t…

ROS机械臂——ROS结合OpenCV案例(含资源)

纲要 摄像头驱动 图像属性 图像压缩 ### Realsense摄像头 点云展示 ### 点云图像属性 ## 摄像头标定 摄像头标定流程 如何使用标定文件 OpenCV ROS与OpenCV的集成框架 ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/b0ff143b710543839325d19c7a3c04c5.png R…

【GH】【EXCEL】P4: Chart

文章目录 data and chartdonut chart (radial chart)Radial Chart bar chartBar Chart line chartLine Chart Scatter ChartScatter Chart Surface ChartSurface Chart Chart DecoratorsChart Decorators Chart GraphicsChart Graphics data and chart donut chart (radial cha…

每日一问:深入理解JVM——结构与类的加载过程解析

每日一问&#xff1a;深入理解JVM——结构与类的加载过程解析 在Java的世界中&#xff0c;JVM&#xff08;Java Virtual Machine&#xff0c;Java虚拟机&#xff09;是一个核心概念。它是Java程序能够跨平台运行的基础&#xff0c;负责执行Java字节码&#xff0c;并为Java应用程…