kafka介绍,kafka集群环境搭建,kafka命令测试,C++实现kafka客户端

目录

  • kafka介绍
  • kafka集群环境搭建
    • zookeeper安装与配置
    • kafka安装与配置
  • kafka命令测试
  • C++实现kafka客户端
    • librdkafka库编译
    • 新版本cmake编译
    • cppkafka库编译
    • C++实现kafka生产者和消费者客户端

kafka介绍

定义与概述
Apache Kafka 是一个开源的分布式流处理平台,最初由 LinkedIn 开发,后来贡献给了 Apache 软件基金会。它被设计用于处理实时数据流,能够以高吞吐量、低延迟的方式处理大量的消息。Kafka 本质上是一个消息队列,但它在传统消息队列的基础上进行了扩展,更适合处理大规模的实时数据。
Kafka特点是生产者消费者模式,先进先出(FIFO)保证顺序,自己不丢数据,默认每隔7天清理数据。
核心概念

  • 主题(Topic):是 Kafka 中消息的逻辑分类,类似于数据库中的表或者文件系统中的文件夹。生产者将消息发布到特定的主题,消费者从主题中订阅并消费消息。
  • 生产者(Producer):负责将消息发送到 Kafka 的主题中。生产者可以根据需要将消息发送到不同的主题,并且可以控制消息的分区。
  • 消费者(Consumer):从 Kafka 的主题中读取消息。消费者通常以消费者组(Consumer Group)的形式存在,同一个消费者组中的消费者可以共同消费一个主题中的消息,从而实现负载均衡。
  • 分区(Partition):每个主题可以被划分为多个分区,分区是 Kafka 实现分布式的基础。分区中的消息是有序的,但不同分区之间的消息顺序是不确定的。通过分区,Kafka 可以实现消息的并行处理,提高系统的吞吐量。
  • 代理(Broker):Kafka 集群由多个代理组成,每个代理是一个独立的 Kafka 服务器。代理负责存储和管理消息,客户端(生产者和消费者)通过与代理进行通信来发送和接收消息。
    应用场景
  • 日志收集:Kafka 可以作为日志收集系统的核心组件,将各个服务产生的日志收集到 Kafka 中,然后由其他系统进行处理和分析。
  • 消息系统:作为传统的消息队列使用,实现系统之间的解耦和异步通信。
  • 流式处理:结合 Kafka Streams 等流处理库,对实时数据流进行处理和分析,例如实时计算、实时监控等。
    在这里插入图片描述

Kafka 与 ZooKeeper 的关系
在早期版本的 Kafka 中,ZooKeeper 是 Kafka 集群的重要组成部分,Kafka 使用 ZooKeeper 来存储集群的元数据,如主题、分区、代理的信息,以及进行领导者选举等操作。但从 Kafka 2.8 版本开始,Kafka 引入了 KRaft 模式,该模式允许 Kafka 集群在不依赖 ZooKeeper 的情况下运行,不过目前很多生产环境依然在使用 Kafka 与 ZooKeeper 结合的部署方式。
在这里插入图片描述

kafka集群环境搭建

使用三个centos7虚拟机模拟集群,三个虚拟机修改主机名:

hostnamectl set-hostname node0
hostnamectl set-hostname node1
hostnamectl set-hostname node2

配置hosts: vim /etc/hosts

192.168.3.147 node0
192.168.3.146 node1
192.168.3.145 node2

zookeeper安装与配置

官网下载地址:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
下载二进制文件包,这里下载:apache-zookeeper-3.9.3-bin.tar.gz

1、把压缩包拷贝到一个节点主机,比如node0主机,执行:

mkdir -p /opt/workspace
#压缩包拷贝到该目录
cd /opt/workspace
tar xvf apache-zookeeper-3.9.3-bin.tar.gz
mv apache-zookeeper-3.9.3-bin zookeeper

2、创建和修改配置文件:

cd /opt/workspace/zookeeper
# zookeeper数据存放路径
mkdir -p data/zkdata
# zookeeper日志存放路径
mkdir -p data/zklog
cp conf/zoo_sample.cfg conf/zoo.cfgvim conf/zoo.cfg

3、修改如下字段,注意字段后面不能带空格

dataDir=/opt/workspace/zookeeper/data/zkdata
dataLogDir=/opt/workspace/zookeeper/data/zklogserver.1=node0:2888:3888
server.2=node1:2888:3888
server.3=node2:2888:3888

4、创建myid文件

cd /opt/workspace/zookeeper/data/zkdata/
vim myid
# 输入内容1,保存。

5、其他节点node1和node2创建工作目录

mkdir -p /opt/workspace

6、从node0节点把zookeeper文件夹拷贝到node1和node2

scp -r zookeeper/ root@node1:/opt/workspace/
scp -r zookeeper/ root@node2:/opt/workspace/

在node1和node2把myid文件内容分别修改为2和3.

7、三个节点配置zookeeper环境变量

vim /etc/profile
# 文件最下面添加如下内容
export ZOOKEEPER_HOME=/opt/workspace/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH

使环境变量生效:

source /etc/profile

8、启动zookeeper集群,分别在三个节点执行,start后等待一会再执行status查询状态

zkServer.sh start
zkServer.sh status

主节点
在这里插入图片描述
从节点
在这里插入图片描述

kafka安装与配置

下载地址:https://kafka.apache.org/downloads
下载安装包:kafka_2.12-3.9.0.tgz

1、部署到node0主机

cd /opt/workspace/
# 安装包拷贝到当前目录
tar xvf kafka_2.12-3.9.0.tgz
mv kafka_2.12-3.9.0 kafka

2、修改kafka配置

cd kafka/
mkdir logs
vim config/server.properties

重点修改下面3个参数

broker.id=1
log.dirs=/opt/workspace/kafka/logs
zookeeper.connect=node0:2181,node1:2181,node2:2181

3、拷贝到node1和node2主机,分别修改server.properties对应的broker.id为2、3

scp -r kafka root@node1:/opt/workspace
scp -r kafka root@node2:/opt/workspace

4、三个节点配置kafka环境变量

vim /etc/profile
# 文件最下面添加如下内容
export KAFKA_HOME=/opt/workspace/kafka
export PATH=$KAFKA_HOME/bin:$PATH

使环境变量生效:

source /etc/profile

5、启动kafka集群
确保zookeeper集群已启动,然后三个节点执行:

kafka-server-start.sh -daemon /opt/workspace/kafka/config/server.properties &

在这里插入图片描述
停止kafka服务。

kafka-server-stop.sh

6、查看kafka进程

# 没有则安装jps
yum install -y  java-1.8.0-openjdk-devel.x86_64
jps

在这里插入图片描述
7、查看zookeeper上的broker节点

# 进入zookeeper命令
zookeeper-shell.sh localhost:2181
# 查看broker结点情况
ls /brokers/ids

通过下图可知三个节点都已经注册到zookeeper。
在这里插入图片描述

kafka命令测试

Topic(主题)命令

# 查看Topic列表
kafka-topics.sh --bootstrap-server localhost:9092 --list
# 创建Topic
# –partitions:分区数量
# –replication-factor:副本数量,不能大于broker数量
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic order --partitions 1 --replication-factor 3
# 查看Topic信息
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic order
# 修改Topic,分区数只能增加,不能减少;副本数不支持修改。
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic order --partitions 3
# 删除Topic
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic order

在这里插入图片描述
分区数量修改为3个:
在这里插入图片描述

生产者和消费者命令

# 生成消息
kafka-console-producer.sh  --bootstrap-server localhost:9092 --topic order
# 消费消息,从最新的地方开始
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order
# 消费消息,从头开始
kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic order

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
消费者组,执行命令自动创建消费组。

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order --group group_1
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order --group group_2

如果两个kafka-console-consumer.sh命令指定了相同的消费组,则同一个消息只会被一个kafka-console-consumer.sh命令消费一次。
如果两个kafka-console-consumer.sh命令指定了不同的消费组,则同一个消息会被两个kafka-console-consumer.sh命令各消费一次。

# 查看消费者Group列表
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 查看消费者Group详情
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group_1
# 删除消费者Group
kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group group_1 --delete

正在消费的group不能被删除。
在这里插入图片描述

C++实现kafka客户端

librdkafka、cmake、libcppkafka都是从源码编译,并且不安装到系统目录。

librdkafka库编译

1、从源码编译。
下载地址:https://github.com/confluentinc/librdkafka/releases
下载源码包:librdkafka-2.8.0.tar.gz

# librdkafka选择不依赖openssl(本次当依赖openssl编译后生成动态库,调用动态库时报错找不到SSL_CTX_use_cert_and_key)
tar xvf librdkafka-2.8.0.tar.gz
cd librdkafka-2.8.0/
./configure
make
# C的封装库在src文件夹,C++的封装库在src-cpp文件夹

2、直接安装(不推荐使用)

yum install librdkafka-devel

新版本cmake编译

cppkafka需要cmake版本3.9.2以上,可能需要编译较新版本cmake。
还是从源码编译,下载地址:https://cmake.org/download/
下载最新版本:cmake-3.31.5.tar.gz

# 可能需要安装依赖openssl,也可选择不依赖openssl
yum -y install openssl-devel
xvf cmake-3.31.5.tar.gz
cd cmake-3.31.5/
./bootstrap
make
# 在bin目录生成可执行文件cmake
cd bin/
./cmake --version# 需要安装到系统目录时执行
make install

cppkafka库编译

cppkafka依赖librdkafka库。
下载地址:https://github.com/mfontanini/cppkafka/releases
下载源码包:cppkafka-0.4.1.tar.gz

tar xvf cppkafka-0.4.1.tar.gz
cd cppkafka-0.4.1
mkdir build
cd build/
# 使用刚刚编译的新版cmake构建,指定librdkafka库路径和头文件路径
# 注意:指定的头文件目录里面还有一级目录,完整路径:/root/code/librdkafka/librdkafka/rdkafkacpp.h
/root/soft/cmake-3.31.5/bin/cmake  -DRdKafka_LIBRARY_PATH=/root/code/librdkafka/lib/librdkafka++.so -DRdKafka_INCLUDE_DIR=/root/code/librdkafka ..
make
# 在cppkafka-0.4.1/build/src/lib64目录生成动态库libcppkafka.so.0.4.1

C++实现kafka生产者和消费者客户端

这里使用cppkafka自带的例子:cppkafka-0.4.1/examples/producer_example.cpp和consumer_example.cpp。

源码这里不贴了,地址在这里:
https://github.com/mfontanini/cppkafka/blob/master/examples/producer_example.cpp
https://github.com/mfontanini/cppkafka/blob/master/examples/consumer_example.cpp

简单写一个CMakeLists.txt:

cmake_minimum_required(VERSION 2.8) #设置cmake最低版本
project("consumer" CXX)	#设置项目名称
SET(PREFIX ${CMAKE_CURRENT_SOURCE_DIR})#设置普通变量,CMAKE_CURRENT_SOURCE_DIR为当前cmake文件目录
SET(BINDIR ${PREFIX})
message (">>> pwd = ${PREFIX}")#打印变量# 添加依赖头文件目录
SET(INCDIR${PREFIX}/include
)
INCLUDE_DIRECTORIES(${INCDIR})SET(SRCS# ${PREFIX}/producer_example.cpp${PREFIX}/consumer_example.cpp
)#添加依赖库目录
SET(LIBDIR${PREFIX}/lib
)
LINK_DIRECTORIES(${LIBDIR})#添加依赖库
SET(LIB_SO# -lssl# -lcrypto-lrdkafka-lcppkafka-lboost_program_options
)SET(RUN_MAIN "consumer")
#设置C++编译选项
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -std=c++11 -march=native -Wall")# 添加可执行文件,并配相关文件
ADD_EXECUTABLE(${RUN_MAIN} ${SRCS})TARGET_LINK_LIBRARIES(${RUN_MAIN} ${LIB_SO})#添加依赖库# 安装可执行文件到指定位置,并指定权限
INSTALL(TARGETS ${RUN_MAIN} DESTINATION ${BINDIR} PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE WORLD_EXECUTE)

把编译好的rdkafka和cppkafka头文件和库文件拷贝过来,项目结构如下:

├── CMakeLists.txt
├── consumer_example.cpp
├── include
│ ├── cppkafka
│ │ ├── buffer.h
│ │ ├── clonable_ptr.h
│ │ ├── CMakeLists.txt
│ │ ├── configuration_base.h
│ │ ├── configuration.h
│ │ ├── configuration_option.h
│ │ ├── consumer.h
│ │ ├── cppkafka.h
│ │ ├── detail
│ │ │ ├── callback_invoker.h
│ │ │ └── endianness.h
│ │ ├── error.h
│ │ ├── event.h
│ │ ├── exceptions.h
│ │ ├── group_information.h
│ │ ├── header.h
│ │ ├── header_list.h
│ │ ├── header_list_iterator.h
│ │ ├── kafka_handle_base.h
│ │ ├── logging.h
│ │ ├── macros.h
│ │ ├── message_builder.h
│ │ ├── message.h
│ │ ├── message_internal.h
│ │ ├── message_timestamp.h
│ │ ├── metadata.h
│ │ ├── producer.h
│ │ ├── queue.h
│ │ ├── topic_configuration.h
│ │ ├── topic.h
│ │ ├── topic_partition.h
│ │ ├── topic_partition_list.h
│ │ └── utils
│ │ ├── backoff_committer.h
│ │ ├── backoff_performer.h
│ │ ├── buffered_producer.h
│ │ ├── compacted_topic_processor.h
│ │ ├── consumer_dispatcher.h
│ │ ├── poll_interface.h
│ │ ├── poll_strategy_base.h
│ │ └── roundrobin_poll_strategy.h
│ └── librdkafka
│ ├── rdkafkacpp.h
│ ├── rdkafka.h
│ └── rdkafka_mock.h
├── lib
│ ├── libcppkafka.so -> libcppkafka.so.0.4.1
│ ├── libcppkafka.so.0.4.1
│ ├── librdkafka.a
│ ├── librdkafka++.a
│ ├── librdkafka_cgrp_synch.png
│ ├── librdkafka-dbg.a
│ ├── librdkafka+±dbg.a
│ ├── librdkafka-dbg.so.1
│ ├── librdkafka+±dbg.so.1
│ ├── librdkafka.so
│ ├── librdkafka++.so
│ ├── librdkafka.so.1
│ ├── librdkafka++.so.1
│ ├── librdkafka-static.a
│ └── librdkafka-static-dbg.a
└── producer_example.cpp

编译:

# 当前在CMakeLists.txt目录
mkdir build
cd build
/root/soft/cmake-3.31.5/bin/cmake ..
make
# 当前目录生成consumer,修改CMakeLists.txt再次编译生成producer

生产者,指定brokers、topic、partition(可选)。
生产者命令行输入信息,消费者可以接收到消息。
在这里插入图片描述
消费者,指定brokers、topic、group。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/17902.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为云+硅基流动使用Chatbox接入DeepSeek-R1满血版671B

华为云硅基流动使用Chatbox接入DeepSeek-R1满血版671B 硅基流动 1.1 注册登录 1.2 实名认证 1.3 创建API密钥 1.4 客户端工具 OllamaChatboxCherry StudioAnythingLLM 资源包下载: AI聊天本地客户端 接入Chatbox客户端 点击设置 选择SiliconFloW API 粘贴1.3创…

阿里云百炼平台对接DeepSeek官方文档

目录 1、支持的模型 2、快速开始 2.1、OpenAI兼容 2.1.1、python示例代码 返回结果 2.1.2、Node.js示例代码 返回结果 2.1.3、HTTP示例代码 返回结果 2.2、DashScope 2.2.1、python示例代码 返回结果 2.2.2、java示例代码 返回结果 2.2.3、HTTP代码示例 返回结…

【深度强化学习】策略梯度算法:REINFORCE

策略梯度 强化学习算法进阶 Q-learning、DQN 及 DQN 改进算法都是基于价值(value-based)的方法,其中 Q-learning 是处理有限状态的算法,而 DQN 可以用来解决连续状态的问题。在强化学习中,除了基于值函数的方法&#…

DeepSeek接口联调(postman版)

第一步:获取API key 获取APIkeys链接https://platform.deepseek.com/api_keys 点击创建 API key 即可免费生成一个key值,别忘记保存。 第二步:找到deepseek官方接口文档 文档地址:https://api-docs.deepseek.com/zh-cn/ 第三步…

Sublime Text 3 中的 Pylinter 配置

在 Sublime Text 3 中配置 Pylinter(如 pylint)来进行 Python 代码静态分析,可以帮助你提升代码质量、检测潜在的错误、强制遵守编码标准等。为了在 Sublime Text 3 中配置 pylint,你需要确保 pylint 已安装,并设置好相…

LC-搜索二维矩阵II、相交链表、反转链表、回文链表、环形链表、环形链表ll

搜索二维矩阵II 方法:从右上角开始搜索 我们可以从矩阵的右上角开始进行搜索。如果当前元素 matrix[i][j] 等于 target,我们直接返回 true。如果 matrix[i][j] 大于 target,说明 target 只能出现在左边的列,所以我们将列指针向左…

支持列表拖拽嵌套,AI流式输出的多模态文档编辑器flowmix/docx: 全面升级

hi, 大家好, 我是徐小夕. 马上又到周五了, 最近也收到很多用户对 flowmix/docx 多模态文档编辑器的反馈,我们也做了一波新功能的升级,今天就和大家分享一下 flowmix/docx 多模态文档编辑器的最新更新. 演示地址: https://flowmix.turntip.cn/docx 以下是…

服务器中部署大模型DeepSeek-R1 | 本地部署DeepSeek-R1大模型 | deepseek-r1部署详细教程

0. 部署前的准备 首先我们需要足够算力的机器,这里我在vultr中租了有一张A16显卡一共16GB显存的服务器作为演示。部署的模型参数为14b的。如果需要部署满血版本671b的,需要更大的算力支持,这里由于是个人资金有限,就演示14b的部署…

Linux软件编程(1)

1.总述: 2.标准io与文件io 标准C库提供的一套文件操作接口; Linux内核为Linux操作系统提供的一套文件操作接口。 3.函数接口: 注意 :什么是文件流? 数据从文件流入和流出体现的字节流。 注意:od -c 文件…

网页五子棋——通用模块

目录 项目创建 通用功能模块 错误码 自定义异常类 CommonResult jackson 加密工具 项目创建 使用 idea 创建 SpringBoot 项目,并引入相关依赖: 配置 MyBatis: 编辑 application.yml: spring:datasource: # 数据库连接配…

【工具】在idea运行go后端

场景:从gitee仓库下载一个go语言前后端分离项目,想跑通前后端 ---------------------------------------------------------------------------------------------------------------------- 后端 1.下载插件 在idea的setting里面输入go,…

通达信如何导出以往的分时数据

1当天分时数据的导出 以梦网科技为例,在分笔交易上面右键,选择“放大”,放大后选择“选项”,选择“数据导出”,弹出界面中修改路径与文件名即可。 2以往数据的导出 以梦网科技为例,今天是2025年2月14号…

【面试题系列】Java 多线程面试题深度解析

本文涉及Java 多线程面试题,从基础到高级,希望对你有所帮助! 一、基础概念类 1. 请简述 Java 中线程的几种状态及其转换条件 题目分析:这是多线程基础中的基础,考查对线程生命周期的理解,在多线程编程中&…

Java Virtual Machine(JVM)

JVM跨平台原理 跨平台:一次编译,到处运行 本质:不同操作系统上运行的JVM不一样,只需要把java程序编译成一份字节码文件,JVM执行不同的字节码文件。 Java是高级语言,提前编译一下(变成字节码文件…

duckdb导出Excel和导出CSV速度测试

运行duckdb数据库 D:>duckdb v1.2.0 5f5512b827 Enter “.help” for usage hints. Connected to a transient in-memory database. Use “.open FILENAME” to reopen on a persistent database. 生成模拟数据,10个列,100万行数据; --…

TCP/IP参考模型和网络协议

由于国防部担心他们一些重要的主机、路由器和互联网关可能会突然崩溃,所以网络必须实现的另一目标是网络不受子网硬件损失的影响,已经建立的会话不会被取消,而且整个体系结构必须相当灵活。 TCP/IP是一组用于实现网络互连的通信协议。Interne…

uniapp商场之订单模块【订单列表】

文章目录 前言一、准备静态结构(分包)二、Tabs滑动切换1.Tabs文字渲染2.点文字高亮切换3.swiper滑动切换三、Tabs页面跳转高亮四、订单列表渲染1.封装列表组件2.订单状态父传子3.封装请求API4.准备请求参数5.初始化调用6.页面渲染五、订单支付1.页面条件渲染2.事件绑定前言 …

【教程】MySQL数据库学习笔记(七)——多表操作(持续更新)

写在前面: 如果文章对你有帮助,记得点赞关注加收藏一波,利于以后需要的时候复习,多谢支持! 【MySQL数据库学习】系列文章 第一章 《认识与环境搭建》 第二章 《数据类型》 第三章 《数据定义语言DDL》 第四章 《数据操…

Mysql数据库

一.数据定义语言DDL 一.概述 DDL用于定义和管理数据库的结构 DDL关键字:1.CREATE; 2.ALTER; 3.DROP 二.SQL命名规定和规范 1.标识符命名规则 2.标识符命名规范 三.库管理 1. CREATE DATABASE 数据库名; 2. CREATE DATABASE IF NOT EXISTS 数据库名; 3. CREATE…

C++,STL容器适配器,priority_queue:优先队列深入解析

文章目录 一、容器概览与核心特性核心特性速览二、底层实现原理1. 二叉堆结构2. 容器适配器架构三、核心操作详解1. 容器初始化2. 元素操作接口3. 自定义优先队列四、实战应用场景1. 任务调度系统2. 合并K个有序链表五、性能优化策略1. 底层容器选择2. 批量建堆优化六、注意事项…