1、Kafka 安装与简单使用

第 1 章 Kafka 概述
1.1 定义

Kafka传统定义: Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
Kafka最新定义 : Kafka是 一个开源的 分 布式事件流平台 (Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

在这里插入图片描述

1.2 消息队列
目 前企 业中比 较常 见的 消息 队列产 品主 要有 Kafka、ActiveMQ 、RabbitMQ 、RocketMQ 等。
在大数据场景主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ。

1.2.1 传统消息队列的应用场景

传统的消息队列的主要应用场景包括:缓存/消峰、解耦和异步通信。

缓冲/消峰: 有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况

在这里插入图片描述
解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束

在这里插入图片描述
异步通信: 允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。

在这里插入图片描述
1.2.2 消息队列的两种模式

1)点对点模式
• 消费者主动拉取数据,消息收到后清除消息
在这里插入图片描述
2)发布/订阅模式
• 可以有多个topic主题(浏览、点赞、收藏、评论等)
• 消费者消费数据之后,不删除数据
• 每个消费者相互独立,都可以消费到数据
在这里插入图片描述

1.3 Kafka 基础架构
在这里插入图片描述
(1)Producer:消息生产者,就是向 Kafka broker 发消息的客户端。
(2)Consumer:消息消费者,向 Kafka broker 取消息的客户端。
(3)Consumer Group(CG):消费者组,由多个 consumer 组成。消费者组内每个消
费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不
影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
(4)Broker:一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个
broker 可以容纳多个 topic。
(5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个 topic。
(6)Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服
务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
(7)Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个
Follower。
(8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数
据的对象都是 Leader。
(9)Follower:每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和
Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。

第 2 章 Kafka 快速入门

2.1 安装部署
2.1.1 集群规划
在这里插入图片描述
2.1.2 集群部署
0)官方下载地址:http://kafka.apache.org/downloads.html
1)解压安装包

[hadoop102 software]$tar -zxvf kafka_2.12-3.0.0.tgz -C 
/opt/module/

2)修改解压后的文件名称

[hadoop102 module]$ mv kafka_2.12-3.0.0/ kafka

3)进入到/opt/module/kafka 目录,修改配置文件

[hadoop102 kafka]$ cd config/
[hadoop102 config]$ vim server.properties

输入以下内容:

#broker 的全局唯一编号,不能重复,只能是数字。
broker.id=0
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以
配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个 topic 创建时的副本数,默认时 1 个副本
offsets.topic.replication.factor=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/ka
fka

4)分发安装包

[hadoop102 module]$ xsync kafka/

5)分别在 hadoop103 和 hadoop104 上修改配置文件/opt/module/kafka/config/server.properties
中的 broker.id=1、broker.id=2
注:broker.id 不得重复,整个集群中唯一。

[hadoop103 module]$ vim kafka/config/server.properties
修改:
# The id of the broker. This must be set to a unique integer for 
each broker.
broker.id=1
[atguigu@hadoop104 module]$ vim kafka/config/server.properties
修改:
# The id of the broker. This must be set to a unique integer for 
each broker.
broker.id=2

6)配置环境变量
(1)在/etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置

 sudo vim /etc/profile.d/my_env.sh

增加如下内容:
#KAFKA_HOME

export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

(2)刷新一下环境变量。

[hadoop102 module]$ source /etc/profile

(3)分发环境变量文件到其他节点,并 source。

[hadoop102 module]$ sudo /home/atguigu/bin/xsync 
/etc/profile.d/my_env.sh
[hadoop103 module]$ source /etc/profile
[hadoop104 module]$ source /etc/profile

7)启动集群
(1)先启动 Zookeeper 集群,然后启动 Kafka。
[atguigu@hadoop102 kafka]$ zk.sh start
(2)依次在 hadoop102、hadoop103、hadoop104 节点上启动 Kafka。

[hadoop102 kafka]$ bin/kafka-server-start.sh -daemon
config/server.properties
[hadoop103 kafka]$ bin/kafka-server-start.sh -daemon
config/server.properties
[hadoop104 kafka]$ bin/kafka-server-start.sh -daemon
config/server.properties

注意:配置文件的路径要能够到 server.properties

8)关闭集群

[hadoop102 kafka]$ bin/kafka-server-stop.sh 
[ahadoop103 kafka]$ bin/kafka-server-stop.sh 
[ahadoop104 kafka]$ bin/kafka-server-stop.sh

2.1.3 集群启停脚本
1)在/home/atguigu/bin 目录下创建文件 kf.sh 脚本文件

[hadoop102 bin]$ vim kf.sh

脚本如下:

#! /bin/bash
case $1 in
"start"){for i in hadoop102 hadoop103 hadoop104doecho " --------启动 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -
daemon /opt/module/kafka/config/server.properties"done
};;
"stop"){for i in hadoop102 hadoop103 hadoop104doecho " --------停止 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "done
};;
esac

2)添加执行权限

[hadoop102 bin]$ chmod +x kf.sh

3)启动集群命令

[hadoop102 ~]$ kf.sh start

4)停止集群命令

[hadoop102 ~]$ kf.sh stop

注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper
集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,
Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。

2.2 Kafka 命令行操作
在这里插入图片描述
2.2.1 主题命令行操作
1)查看操作主题命令参数

[hadoop102 kafka]$ bin/kafka-topics.sh

在这里插入图片描述
2)查看当前服务器中的所有 topic

[ahadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
hadoop102:9092 --list

3)创建 first topic

[hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
hadoop102:9092 --create --partitions 1 --replication-factor 3 --
topic first

选项说明:
–topic 定义 topic 名
–replication-factor 定义副本数
–partitions 定义分区数
4)查看 first 主题的详情

[hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
hadoop102:9092 --describe --topic first

5)修改分区数(注意:分区数只能增加,不能减少)

[hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
hadoop102:9092 --alter --topic first --partitions 3

6)再次查看 first 主题的详情

[hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
hadoop102:9092 --describe --topic first

7)删除 topic

[hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
hadoop102:9092 --delete --topic first

2.2.2 生产者命令行操作
1)查看操作生产者命令参数

[hadoop102 kafka]$ bin/kafka-console-producer.sh

在这里插入图片描述
2)发送消息

[hadoop102 kafka]$ bin/kafka-console-producer.sh --
bootstrap-server hadoop102:9092 --topic first
>hello world
>atguigu atguigu

2.2.3 消费者命令行操作
1)查看操作消费者命令参数

[hadoop102 kafka]$ bin/kafka-console-consumer.sh

在这里插入图片描述
2)消费消息
(1)消费 first 主题中的数据。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

(2)把主题中所有的数据都读取出来(包括历史数据)。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --from-beginning --topic first

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/146522.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《CTFshow-Web入门》10. Web 91~110

Web 入门 索引web91题解总结 web92题解总结 web93题解 web94题解 web95题解 web96题解 web97题解 web98题解 web99题解总结 web100题解 web101题解 web102题解 web103题解 web104题解 web105题解总结 web106题解 web107题解 web108题解 web109题解 web110题解 ctf - web入门 索…

Scala第十四章节

Scala第十四章节 1. 隐式转换和隐式参数介绍 2. 隐式转换 3. 隐式参数 4. 案例: 获取列表元素平均值 scala总目录 文档资料下载

大数据Flink(九十):Lookup Join(维表 Join)

文章目录 Lookup Join(维表 Join) Lookup Join(维表 Join) Lookup Join 定义(支持 Batch\Streaming):Lookup Join 其实就是维表 Join,比如拿离线数仓来说,常常会有用户画像,设备画像等数据,而对应到实时数仓场景中,这种实时获取外部缓存的 Join 就叫做维表 Join。…

10.1 今日任务:select实现服务器并发

#include <myhead.h>#define ERR_MSG(msg) do{\fprintf(stderr, "__%d__:", __LINE__); \perror(msg);\ }while(0)#define PORT 8888 //端口号&#xff0c;范围1024~49151 #define IP "192.168.112.115" //本机IP&#xff0c;ifco…

Qt自定义菜单

Qt开发过程中&#xff0c;弹出菜单时我们一般使用QMenu,但是QMenu都是一条项固定的格式&#xff0c;如查想要自己的设计界面就没法使用默认的Action项了&#xff0c;因此我们得用自定义的QMenu。 本篇介绍使用自定义的QMenu设计出UI。我们使用QWidget QWidgetAction来实现。Q…

Spring注册Bean系列--方法1:@Component

原文网址&#xff1a;Spring注册Bean系列--方法1&#xff1a;Component_IT利刃出鞘的博客-CSDN博客 简介 本文介绍Spring注册Bean的方法&#xff1a;Component。 注册Bean的方法我写了一个系列&#xff0c;见&#xff1a;Spring注册Bean(提供Bean)系列--方法大全_IT利刃出鞘…

前期开发用最内聚环境,最直观简单的方法管理代码

代码稍微一多我们就会思考代码如何去规划和管理&#xff0c;首先想到MVN等管理工具去帮助我们&#xff0c;结果配置了好长时间感觉不是很方便。首先因为没有系统去了解这个工具的使用方法&#xff0c;另外发现IDEA在原本松散的工具之间以插件的形式做了一定的配置&#xff0c;将…

数据分析三剑客之一:Numpy详解及实战

1 NumPy介绍 NumPy 软件包是Python生态系统中数据分析、机器学习和科学计算的主力军。它极大地简化了向量和矩阵的操作处理。Python的一些主要软件包&#xff08;如 scikit-learn、SciPy、pandas 和 tensorflow&#xff09;都以 NumPy 作为其架构的基础部分。除了能对数值数据…

基于Java的城市天然气费管理系统的设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09;有保障的售后福利 代码参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作…

QT:鼠标画线(双画布)

widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QPoint> //点 #include <QMouseEvent> //鼠标事件 #include <QPaintEvent> //绘图事件class Widget : public QWidget {Q_OBJECTpublic:Widget(QWidget *parent 0);~Wi…

大模型 Decoder 的生成策略

本文将介绍以下内容&#xff1a; IntroductionGreedy Searchbeam searchSamplingTop-K SamplingTop-p (nucleus) sampling总结 一、Introduction 1、简介 近年来&#xff0c;由于在数百万个网页数据上训练的大型基于 Transformer 的语言模型的兴起&#xff0c;开放式语言生…

[题]修剪草坪 #单调队列优化

题目 洛谷上的题目 Acwing上的题目 根据y总的一波分析&#xff0c;我们得出……公式就是一切…… 所以&#xff0c;我要学会推公式…… 推公式…… 公式…… #include<bits/stdc.h> using namespace std; typedef long long ll; const int N 1e5 10; int n, m; ll s[N…

Matlab随机数的产生

目录 1、常见分布随机数的产生 1.1 二项分布 1.2 泊松分布 1.3 几何分布 1.4 均匀分布&#xff08;离散&#xff0c;等可能分布&#xff09; 1.5 均匀分布&#xff08;连续型等可能&#xff09; 1.6 指数分布&#xff08;描述“寿命”问题&#xff09; 1.7 正态分布 1.8…

支持向量机SVM:从数学原理到实际应用

目录 一、引言背景SVM算法的重要性 二、SVM基础线性分类器简介什么是支持向量&#xff1f;超平面和决策边界SVM的目标函数 三、数学背景和优化拉格朗日乘子法&#xff08;Lagrange Multipliers&#xff09;KKT条件核技巧&#xff08;Kernel Trick&#xff09;双重问题和主问题&…

[JAVAee]MyBatis

目录 MyBatis简介 MyBatis的准备工作 框架的添加 连接数据库字符串的配置 MyBatis中XML路径的配置 ​编辑 MyBatis的使用 各层的实现 进行数据库操作 增加操作 拓展 修改操作 删除操作 查询操作 结果映射 单表查询 多表查询 like模糊查询 动态SQL / MyBa…

传输层协议—UDP协议

传输层协议—UDP协议 文章目录 传输层协议—UDP协议传输层再谈端口号端口号范围划分pidofnetstat UDP协议端格式UDP报文UDP特点UDP缓冲区基于UDP的应用层协议 传输层 在学习HTTP/HTTPS等应用层协议时&#xff0c;为了方便理解&#xff0c;可以简单认为HTTP将请求和响应直接发送…

【算法优选】双指针专题——贰

文章目录 &#x1f60e;前言&#x1f332;[快乐数](https://leetcode.cn/problems/happy-number/)&#x1f6a9;题目描述&#x1f6a9;题⽬分析&#xff1a;&#x1f6a9;算法思路&#xff1a;&#x1f6a9;代码实现&#xff1a; &#x1f38b;[盛水最多的容器](https://leetco…

CocosCreator3.8研究笔记(二十二)CocosCreator 动画系统-动画剪辑和动画组件介绍

国庆假期&#xff0c;闲着没事&#xff0c;在家研究技术~ 大家都知道在Cocos Creator3.x 的版本的动画编辑器中&#xff0c;可以实现不用写一行代码就能实现各种动态效果。 Cocos Creator动画编辑器中主要实现关键帧动画&#xff0c;不仅支持位移、旋转、缩放、帧动画&#xff…

【算法】算法基础课模板大全

一、基础算法 快速排序算法模板 void quick_sort(int q[], int l, int r) {//递归的终止情况if (l > r) return;//选取分界线。这里选数组中间那个数int i l - 1, j r 1, x q[l r >> 1];//划分成左右两个部分while (i < j){do i ; while (q[i] < x);do …

正点原子嵌入式linux驱动开发——TF-A初探

上一篇笔记中&#xff0c;正点原子的文档简单讲解了一下什么是TF-A&#xff0c;并且也学习了如何编译TF-A。但是TF-A是如何运行的&#xff0c;它的一个运行流程并未涉及。TF-A的详细运行过程是很复杂的&#xff0c;涉及到很多ARM处理器底层知识&#xff0c;所以这一篇笔记的内容…