Kafka 分布式消息系统详细介绍

Kafka 分布式消息系统

  • 一、Kafka 概述
    • 1.1 Kafka 定义
    • 1.2 Kafka 设计目标
    • 1.3 Kafka 特点
  • 二、Kafka 架构设计
    • 2.1 基本架构
    • 2.2 Topic 和 Partition
    • 2.3 消费者和消费者组
    • 2.4 Replica 副本
  • 三、Kafka 分布式集群搭建
    • 3.1 下载解压
      • 3.1.1 上传解压
    • 3.2 修改 Kafka 配置文件
      • 3.2.1 修改zookeeper.properties配置文件
      • 3.2.2 修改consumer.properties配置文件
      • 3.2.3 修改producer.properties配置
      • 3.2.4 修改server.properties配置
    • 3.3 修改 Kafka 配置同步到其他节点
    • 3.4 修改 Kafka Server 编号
    • 3.5 启动Kafka 集群
    • 3.5.1 启动Zookeeper集群
    • 3.5.1 启动 Kafka 集群
    • 3.6 Kafka 集群测试
      • 3.6.1 创建Topic
      • 3.6.2 查看Topic列表
      • 3.6.2 查看Topic详情
      • 3.6.3 消费者消费Topic
      • 3.6.4 生产者向Topic发送消息
  • 四、案例实践:Flume 与 Kafka 集成开发
    • 4.1 配置Flume聚合服务
    • 4.2 Flume与Kafka集成测试
      • 4.2.1 启动Flume聚合服务
      • 4.2.2 启动 Flume 采集服务
      • 4.2.3 启动 Kafka 消费者服务
      • 4.2.4 准备测试数据

一、Kafka 概述

1.1 Kafka 定义

Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala语言编写,它以可水平扩展和高吞吐率的特点而被广泛使用。目前越来越多的开源分布式处理系统,如Spark、Flink都支持与Kafka集成。比如一个实时日志分析系统,Flume采集数据通过接口传输到Kafka集群(多台Kafka服务器组成的集群称为Kafka集群),然后Flink或者Spark直接调用接口从Kafka实时读取数据并进行统计分析。

1.2 Kafka 设计目标

  • 以时间复杂度为O(1)的方式提供消息持久化(Kafka)能力,即使对TB级以上数据也能保证常数时间的访问性能。持久化是将程序数据在持久状态和瞬时状态间转换的机制。通俗地讲,就是瞬时数据(比如内存中的数据是不能永久保存的)持久化为持久数据(比如持久化至磁盘中能够长久保存)。
  • 保证高吞吐率,即使在非常廉价的商用机器上,也能做到单机支持每秒100,000条消息的传输速度。
  • 支持Kafka Server间的消息分区,以及分布式消息消费,同时保证每个Partition内的消息顺序传输。
  • 支持离线数据处理和实时数据处理。

1.3 Kafka 特点

  • 高吞吐量、低延迟:Kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒。
  • 可扩展性:Kafka集群同Hadoop集群一样,支持横向扩展。
  • 持久性、可靠性:Kafka消息可以被持久化到本地磁盘,并且支持Partition数据备份,防止数据丢失。
  • 容错性:允许Kafka集群中的节点失败,如果Partition(分区)副本数量为n,则最多允许n-1个节点失败。
  • 高并发:单节点支持上千个客户端同时读写,每秒钟有上百MB的吞吐量,基本上达到了网卡的极限。

二、Kafka 架构设计

2.1 基本架构

在这里插入图片描述
生产者将数据写入 Kafka,消费者从 Kafka 中读取数据,Zookeeper 提供协调服务,如生产者和消费者的负载均衡

2.2 Topic 和 Partition

在这里插入图片描述
生产者将数据写入主题,实际写入分区(轮询,随机等),一个分区只能对应一个消费者组中的一个消费组,而一个消费者可以对应多个分区。

2.3 消费者和消费者组

在这里插入图片描述
一个分区只能对应一个消费者组中的一个消费者,消费者组相互独立,一个分区可以对应多个不同消费者组中的消费者,一个消费者可以对应多个分区。

2.4 Replica 副本

  • Leader:每个Replica集合中的分区都会选出一个唯一的Leader,所有的读写请求都由Leader处理,其他副本从Leader处把数据更新同步到本地。

  • Follower:是副本中的另外一个角色,可以从Leader中复制数据。

  • ISR:Kafka集群通过数据冗余来实现容错。每个分区都会有一个Leader,以及零个或多个Follower,Leader加上Follower总和就是副本因子。Follower与Leader之间的数据同步是通过Follower主动拉取Leader上面的消息来实现的。所有的Follower不可能与Leader中的数据一直保持同步,那么与Leader数据保持同步的这些Follower称为IS(In Sync Replica)。Zookeeper维护着每个分区的Leader信息和ISR信息。

三、Kafka 分布式集群搭建

3.1 下载解压

下载地址:https://archive.apache.org/dist/kafka/

此处使用的下载的版本式:kafka_2.12_2.8.2.tgz

3.1.1 上传解压

[root@hadoop1 local]# tar -zxvf kafka_2.12-2.8.2.tgz 

添加软连接

[root@hadoop1 local]# ln -s kafka_2.12-2.8.2 kafka

在这里插入图片描述

3.2 修改 Kafka 配置文件

3.2.1 修改zookeeper.properties配置文件

进入Kafka的config目录下,修改zookeeper. properties配置文件,具体内容如下:

[root@hadoop1 local]# vim /usr/local/kafka/config/zookeeper.properties 

修改如下内容:

dataDir=/usr/local/data/zookeeper/zkdata
clientPort=2181

3.2.2 修改consumer.properties配置文件

进入Kafka的config目录下,修改consumer. properties配置文件,具体内容如下:

[root@hadoop1 local]# vim /usr/local/kafka/config/consumer.properties

修改如下内容:

bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092

备注:hadoop1:9092,hadoop2:9092,hadoop3:9092 为集群hadoop地址

3.2.3 修改producer.properties配置

进入Kafka的config目录中,修改producer. properties配置文件,具体内容如下:

[root@hadoop1 local]# vim /usr/local/kafka/config/producer.properties 

修改内容如下:

bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092

3.2.4 修改server.properties配置

进入Kafka的config目录下,修改server. properties配置文件,具体内容如下:

[root@hadoop1 local]# vim /usr/local/kafka/config/server.properties 

修改内容如下:

zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181

3.3 修改 Kafka 配置同步到其他节点

将hadoop1节点中配置好的Kafka安装目录分发给hadoop2和hadoop3节点,具体操作如下所示:

[root@hadoop1 local]# deploy.sh /usr/local/kafka_2.12-2.8.2 /usr/local/ slave

给从节点创建软链接:

[root@hadoop1 local]# runRemoteCmd.sh "ln -s /usr/local/kafka_2.12-2.8.2 /usr/local/kafka" slave

备注:deploy.sh 是集群推送脚本,可以参考《ZooKeeper 集群的详细部署》

3.4 修改 Kafka Server 编号

登录hadoop1、hadoop2和hadoop3节点,分别进入Kafka的config目录下,修改server.properties配置文件中的broker.id项,具体操作如下所示:
[root@hadoop1 local]# vim /usr/local/kafka/config/server.properties
#标识hadoop1节点
broker.id=1
[root@hadoop2 local]# vim /usr/local/kafka/config/server.properties
#标识hadoop2节点
broker.id=2
[root@hadoop3 local]# vim /usr/local/kafka/config/server.properties
#标识hadoop3节点
broker.id=3

3.5 启动Kafka 集群

Zookeeper管理着Kafka Broker集群,同时Kafka将元数据信息保存在Zookeeper中,说明Kafka集群依赖Zookeeper提供协调服务,所以需要先启动Zookeeper集群,然后再启动Kafka集群。

3.5.1 启动Zookeeper集群

在集群各个节点中进入Zookeeper安装目录,使用如下命令启动Zookeeper集群。

# 启动集群
[root@hadoop1 local]# runRemoteCmd.sh "/usr/local/zookeeper/bin/zkServer.sh start" all
# 查看zookeeper 集群状态
[root@hadoop1 local]# runRemoteCmd.sh "/usr/local/zookeeper/bin/zkServer.sh status" all

在这里插入图片描述

3.5.1 启动 Kafka 集群

在集群各个节点中进入Kafka安装目录,使用如下命令启动Kafka集群。

[root@hadoop1 local]# runRemoteCmd.sh "/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties" all

在这里插入图片描述
显示 Kafka 已经启动。

3.6 Kafka 集群测试

Kafka自带有很多种Shell脚本供用户使用,包含生产消息、消费消息、Topic管理等功能。接下来利用Kafka Shell脚本测试使用Kafka集群。

3.6.1 创建Topic

使用Kafka的bin目录下的kafka-topics.sh脚本,通过create命令创建名为test的Topic,具体操作如下所示。

[root@hadoop1 local]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 --partitions 3

上述命令中,–zookeeper 指定 Zookeeper 集群;–create 是创建 Topic 命令;–topic指定Topic名称;–replication-factor 指定副本数量;–partitions指定分区个数。

在这里插入图片描述

3.6.2 查看Topic列表

通过list命令可以查看Kafka 的Topic列表,具体操作如下所示。

[root@hadoop1 kafka]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper hadoop1:2181  --list

在这里插入图片描述

3.6.2 查看Topic详情

通过describe命令查看Topic内部结构,具体操作如下所示。

[root@hadoop1 kafka]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper hadoop1:2181 --describe --topic test

在这里插入图片描述

3.6.3 消费者消费Topic

在hadoop1节点上,通过Kafka自带的kafka-console-consumer.sh脚本,开启消费者消费 test中的消息。

[root@hadoop1 kafka]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic test

在这里插入图片描述

3.6.4 生产者向Topic发送消息

在hadoop1节点上,通过Kafka自带的kafka-console-producer.sh脚本启动生产者,然后向 test发送3条消息,具体操作如下所示。

[root@hadoop1 logs]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list  hadoop1:9092 --topic test

生成者输入:
在这里插入图片描述
消费者展示:
在这里插入图片描述

四、案例实践:Flume 与 Kafka 集成开发

在 《Flume 日志采集系统》 的基础上进行 kafka 集成开发

4.1 配置Flume聚合服务

在 hadoop2 和 hadoop3 服务器配置分配配置 Flume 聚合服务

[root@hadoop1 conf]# vim /usr/local/flume/conf/avro-file-selector-kafka.properties
[root@hadoop2 conf]# vim /usr/local/flume/conf/avro-file-selector-kafka.properties

分别写入如下内容并保存:

#定义source、channel、sink的名称
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
# 定义和配置一个avro Source
agent1.sources.r1.type = avro
agent1.sources.r1.channels = c1
agent1.sources.r1.bind = 0.0.0.0
agent1.sources.r1.port = 1234
# 定义和配置一个file channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /usr/local/data/flume/checkpointDir
agent1.channels.c1.dataDirs = /usr/local/data/flume/dataDirs
# 定义和配置一个kafka sink
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.topic = test
agent1.sinks.k1.brokerList = hadoop1:9092,hadoop2:9092,hadoop3:9092
agent1.sinks.k1.producer.acks = 1
agent1.sinks.k1.channel = c1

4.2 Flume与Kafka集成测试

4.2.1 启动Flume聚合服务

在 采集服务器 hadoop2 和 hadoop3 分别启动聚合服务

[root@hadoop2 conf]# /usr/local/flume/bin/flume-ng agent -n agent1 -c conf -f /usr/local/flume/conf/avro-file-selector-kafka.properties -Dflume.root.logger=INFO,console[root@hadoop3 local]# /usr/local/flume/bin/flume-ng agent -n agent1 -c conf -f /usr/local/flume/conf/avro-file-selector-kafka.properties -Dflume.root.logger=INFO,console

在这里插入图片描述

4.2.2 启动 Flume 采集服务

在 Hadoop1 启动 Flume 采集脚本:

[root@hadoop1 conf]# /usr/local/flume/bin/flume-ng agent -n agent1 -c conf -f /usr/local/flume/conf/taildir-file-selector-avro.properties -Dflume.root.logger=INFO,console

在这里插入图片描述
正常启动 Flume 采集脚本

4.2.3 启动 Kafka 消费者服务

在 hadoop1 启动 Kafka 消费者服务脚本

[root@hadoop1 data]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic test

在这里插入图片描述

4.2.4 准备测试数据

在 hadoop1 另开连接,执行如下脚本:

[root@hadoop1 logs]# echo '00:00:100971413028304674[火炬传递路线时间]1 2www.olympic.cn/news/beijing/2008-03-19/1417291.html' >> /usr/local/data/flume/logs/sogou.log

输入三条测试数据
在这里插入图片描述

消费者打印三条测试数据:
在这里插入图片描述
至此,案例测试成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/419104.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

axure之变量

一、设置我们的第一个变量 1、点击axure上方设置一个全局变量a 3 2、加入按钮、文本框元件点击按钮文档框展示变量值。 交互选择【单击时】【设置文本】再点击函数。 点击插入变量和函数直接选择刚刚定义的全局变量,也可以直接手动写入函数(注意写入格式。) 这…

Gitflow基础知识

0.理想状态 现状 听完后的理想状态 没使用过 git 知道 git 是什么,会用 git 基础流程命令 用过 git,但只通过图形化界面操作 脱离图形化界面操作,通过 git 命令操作 会 git 命令 掌握 gitflow 规范,合理使用 rebase 和解决…

spark sql 优化

1. 配置 比例内存 : core 1:2 2. 增加 core 数可以增加 执行任务的 线程数 3. 计算有大表,并发生shuffle 时,生成的任务数是由spark.sql.shuffle.partitions 决定的,所以针对大表shuffle ,要增加spark.sql.shuffle.partitio…

C++笔记20•数据结构:哈希(Hash)•

哈希 1.无序的关联式容器(unordered_map&unordered_set) unordered_map与unordered_set几乎与map与set是一样的,只是性能unordered_map与unordered_set比map与set更优一些。还有就是unordered_map与unordered_set是无序的,…

差异基因富集分析(R语言——GOKEGGGSEA)

接着上次的内容,上篇内容给大家分享了基因表达量怎么做分组差异分析,从而获得差异基因集,想了解的可以去看一下,这篇主要给大家分享一下得到显著差异基因集后怎么做一下通路富集。 1.准备差异基因集 我就直接把上次分享的拿到这…

服务器流量监控工具vnStat的简单使用以及关于Linux的软中断信号(signal)的一点内容

一、服务器流量监控工具vnStat的简单使用 vnStat是为Linux和BSD设计的基于控制台的网络流量监控工具,通过它可以非常方便在命令行查看流量统计情况。它可以保留某个或多个所选择的网络接口的网络流量日志。为了生成日志,vnStat使用内核提供的信息。换句话…

misc流量分析

一、wireshark语法 1、wireshark过滤语法 (1)过滤IP地址 ip.srcx.x..x.x 过滤源IP地址 ip.dstx.x.x.x 过滤目的IP ip.addrx.x.x.x 过滤某个IP (2)过滤端口号 tcp.port80tcp.srcport80 显示TCP的源端口80tcp.dstport80 显示…

Python和C++多尺度导图

🎯要点 热化学属性观测蒙特卡罗似然比灵敏度分析时间尺度上动力学化学催化反应动力学建模自动微分电化学分析模型反应动力学数学模型渔业生态不确定性模型敏感性分析空间统计地理模型分析技术多维数据表征实现生成艺术图案流苏物体长度比,面积比和复杂度…

深度学习实战:如何利用CNN实现人脸识别考勤系统

1. 何为CNN及其在人脸识别中的应用 卷积神经网络(CNN)是深度学习中的核心技术之一,擅长处理图像数据。CNN通过卷积层提取图像的局部特征,在人脸识别领域尤其适用。CNN的多个层次可以逐步提取面部的特征,最终实现精确的…

Django+Vue3前后端分离学习(二)(重写User类)

一、重写User类: 1、首先导入User类: from django.contrib.auth.models import User 2、然后点在User上,按住ctrl 点进去,发现 User类继承AbstractUser Ctrl点进去AbstractUser,然后将此方法全部复制到自己APP的mo…

3 html5之css新选择器和属性

要说css的变化那是发展比较快的,新增的选择器也很多,而且还有很多都是比较实用的。这里举出一些案例,看看你平时都是否用过。 1 新增的一些写法: 1.1 导入css 这个是非常好的一个变化。这样可以让我们将css拆分成公共部分或者多…

WebDriver与Chrome DevTools Protocol:如何在浏览器自动化中提升效率

介绍 随着互联网数据的爆炸式增长,爬虫技术成为了获取信息的重要工具。在实际应用中,如何提升浏览器自动化的效率是开发者常常面临的挑战。Chrome DevTools Protocol(CDP)与Selenium WebDriver相结合,为浏览器自动化提…

还不会剪音乐?试试这四款在线音频剪辑

音频剪辑很多人都没有接触过。其实这并不是一个难事,我们甚至可以用一些简单的工具来给自己做个简单的BGM,最近我尝试了几款不同的音频剪辑工具。今天就来跟大家分享一下我的使用体验,看看哪款工具更适合你的需求。 一、福昕音频剪辑 网址&…

Oracle rman 没有0级时1级备份和0级大小一样,可以用来做恢复 resetlogs后也可以

文档说了 full backup 不能 用于后续的level 1,没说level 1没有level 0 是不是level 1就是level 0? GOAL What are incremental backups? Why are archivelogs still required to recover a database from an online incremental backup? Discuss th…

python学习13:对excel格式文件进行读写操作

读取excel的话需要下载第三方库: 常用的库:xlrd(读),xlwt(写),xlutils,openpyxl[-----pip install xxx-------] 这里推荐openpyxl pip install openpyxl excel读取的基本操作 # 2)基本操作: # 2.1)打开文件,获取工作簿 filename rD:\stdutyZiLiao\pythoneProje…

动态化-鸿蒙跨端方案介绍

一、背景 👉 华为在2023.9.25官方发布会上宣布,新的鸿蒙系统将不再兼容安卓应用,这意味着,包括京东金融APP在内的所有安卓应用,在新的鸿蒙系统上将无法运行,需要重新开发专门适用于新鸿蒙系统的专版APP。 …

日语输入法平假名和片假名切换

在学日语输入法的时候,我们在使用罗马音输入的时候,在进行平假名和片假名切换: 1、使用电脑在打字,日语输入法切换的时候使用 Shift Alt 如果日语输入法显示为 A 需要切换为 あ的话可以按Caps Lock键 。(相当于中文…

zblog自动生成文章插件(百度AI写作配图,图文并茂)

最近工作比较忙,导致自己的几个网站都无法手动更新,于是乎也想偷个懒把,让AI帮忙打理下自己的网站。我接触chatgpt等AI工具还是比较早了,从openai推出gpt3.5就一直在用,说实话,开始的时候用AI自动更新网站还…

「C++系列」日期/时间

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站&#xff1a;人工智能教程 文章目录 一、日期/时间1. C标准库&#xff08;C20之前&#xff09;<ctime>库中的关键组件&#xff1a; 2…

lnmp - tp6.0的安装和简单使用

概述 使用了很长时间的Mac M2芯片的电脑在之前使用虚拟机之前总有一些bug不是那么好用&#xff0c;周末之余重新安装了一下centos虚拟机&#xff0c;搭建了lnmp环境&#xff0c;打算自己挤时间&#xff0c;做一点应用&#xff0c;作为一次新的小小的尝试。 安装&更新 ce…