001 Kafka入门及安装

Kafka入门及安装

文章目录

  • Kafka入门及安装
    • 1.介绍
      • Kafka的基本概念和核心组件
    • 2.安装
      • 1.docker快速安装
          • zookeeper安装
          • kafka安装
        • 添加topic
        • 删除topic
        • kafka-ui安装
      • 2.Docker安装(SASL/PLAIN认证配置-用户名密码)

来源参考的deepseek,如有侵权联系立删

1.介绍

Kafka的基本概念和核心组件

Kafka是分布式流处理平台,由 Scala 和 Java 编写。Kafka 的核心概念和组件包括:

  1. Producer:发布消息的客户端,向Broker发送数据。
  2. Consumer:订阅消息的客户端,从Broker读取数据。
  3. Broker:Kafka服务节点,负责消息存储和转发。
  4. Topic:消息的逻辑分类单位,类似数据库表。
  5. Partition:主题的分区,实现并行处理和扩展性。
  6. Consumer Group:多个消费者组成的组,协同消费同一主题的不同分区。

详细介绍

  1. 主题(Topic) :主题是消息的逻辑分类单位,类似于数据库中的表。每个主题可以包含多个分区(Partition),用于实现水平扩展和高吞吐量。
  2. 分区(Partition) :分区是物理上的消息分组,每个分区是一个有序且不可变的消息队列。分区可以分布在不同的服务器上,以实现数据的分布式存储和处理。
  3. Broker(服务器) :Broker 是 Kafka 集群中的节点,负责接收和存储消息。一个集群可以由多个 Broker 组成,每个 Broker 可以管理多个分区和副本。
  4. 生产者(Producer) :生产者是向 Kafka 主题发送消息的客户端。生产者将消息发布到指定的主题中,消息会被路由到相应的分区。
  5. 消费者(Consumer) :消费者是从 Kafka 主题中拉取消息的客户端。消费者可以订阅一个或多个主题,并从这些主题中读取消息。消费者可以组成消费者组(Consumer Group),以实现消息的并行消费和故障恢复。
  6. 消费者组(Consumer Group) :消费者组由多个消费者组成,组内的每个消费者负责消费不同分区的消息,以实现高效的消息分发和容错能力。
  7. ZooKeeper:ZooKeeper 是一个分布式协调服务,用于管理 Kafka 集群的元数据,如主题、分区和副本的配置信息。ZooKeeper 还负责选举和维护集群的领导者。
  8. 日志(Log) :Kafka 使用日志来存储消息,每个主题的日志由多个分区组成,每个分区包含一系列有序的消息。日志可以进行压缩和持久化,以提高存储效率和数据可靠性。
  9. 偏移量(Offset) :偏移量用于跟踪消息的消费位置。消费者通过偏移量来确定从何处继续消费消息。
  10. 复制机制:Kafka 通过复制机制提高容错能力。每个分区可以有多个副本,即使部分 Broker 失效,服务仍可继续运行。

这些组件共同构成了 Kafka 的核心架构,使其能够实现高吞吐量、低延迟、可扩展性和持久化的消息传递和处理。

2.安装

1.docker快速安装

zookeeper安装
docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
kafka安装
docker pull wurstmeister/kafka
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper  \
-v /etc/localtime:/etc/localtime  \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka
--name kafka: 设置容器的名字为“kafka”。-p 9092:9092: 将容器的9092端口映射到宿主机的9092端口。--link zookeeper:zookeeper: 连接到名为“zookeeper”的另一个Docker容器,并且在当前的容器中可以通过zookeeper这个别名来访问它。--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181: 设置环境变量,指定ZooKeeper的连接字符串。--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092: 设置环境变量,指定Kafka的advertised listeners。--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092: 设置环境变量,指定Kafka的listeners。--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1: 设置环境变量,指定offsets topic的副本因子。wurstmeister/kafka: 使用的Docker镜像名字。/opt/kafka/config/server.properties  #镜像内部配置文件地址
./kafka-server-start.sh -daemon ./config/server.properties  #启动命令
添加topic
kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 \
--replication-factor 1 --partitions 1 --topic mytest
删除topic
kafka-topics.sh \--delete --topic lx \--zookeeper 127.0.0.1:2181
kafka-ui安装
docker run --name kafka-ui  -p 19092:8080 \-e KAFKA_CLUSTERS_0_NAME=local \-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=127.0.0.1:9092 \-d provectuslabs/kafka-ui:latest         

2.Docker安装(SASL/PLAIN认证配置-用户名密码)

1.宿主机创建用户密码配置文件

#创建文件
touch   /dockerData/kafka/config/kafka_server_jaas.conf

2.编辑kafka_server_jaas.conf配置

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="123456"user_admin="123456"user_moshangshang="123456";
};                            
user_admin="123456"
user_后面跟的是用户名,然后123456为密码,可配置多个用户密码

3.运行kafka容器

docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper  \
-v /etc/localtime:/etc/localtime  \
-v /dockerData/kafka/config/kafka_server_jaas.conf:/opt/kafka/config/kafka_server_jaas.conf  \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env 192.168.1.250:9092 \
--env KAFKA_LISTENERS=SASL_PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf" \
--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka

4.宿主机创建kafka的server.properties 配置

5.修改并添加认证相关配置

listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://192.168.1.250:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN

6.拷贝宿主机配置文件覆盖容器内部原本文件

docker  cp /dockerData/kafka/config/server.properties   kafka:/opt/kafka/config

7.重启容器
8.整合springboot的yml配置

#kafka配置
spring:kafka:#kafka集群地址bootstrap-servers: 192.168.25.100:9092producer:#批量发送的数据量大小batch-size: 1#可用发送数量的最大缓存buffer-memory: 33554432#key序列化器key-serializer: org.apache.kafka.common.serialization.StringSerializer#value序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer#达到多少时间后,会发送properties:linger.ms: 1security.protocol: SASL_PLAINTEXTsasl.mechanism: PLAINsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";#代表集群中从节点都持久化后才认为发送成功acks: -1consumer:enable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 15000security.protocol: SASL_PLAINTEXTsasl.mechanism: PLAINsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";group-id: testauto-offset-reset: earliestlistener:ack-mode: MANUAL # 精准控制offset提交concurrency: 3 # 并发消费者数type: batch

9.kafka-ui配置认证

docker run --name kafka-ui  -p 19092:8080 \-e KAFKA_CLUSTERS_0_NAME=local \-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=192.168.1.250:9092 \-e KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=SASL_PLAINTEXT \-e KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM=PLAIN \-e KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username='moshangshang' password='123456';" \-d provectuslabs/kafka-ui:latest

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/25786.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

蓝桥真题讲解

目录 第一题 题目链接 题目解析 代码原理 代码编写 本题总结 第二题 题目链接 题目解析 代码原理 代码编写 本题总结 第三题 题目链接 题目解析 代码原理 代码编写 本题总结 第一题 题目链接 题目解析 代码原理 图一 图二 图二中的红色字,请仔…

Allegro PCB元件库文件引起的问题-看不见器件,但是不能预览,也就不能放置了

PCB元件库必须包含PCB(.psm, .dra)文件,和PAD(.pad)文件 在Allegro里Path设置注意事项: 针对psmpath & padpath会有多个文件夹地址的情况。在放一个新的元件到PCB,需要把对应的元件的PCB & PAD地址都放在第一…

Prompt Engineering for Large Language Models

题目 大型语言模型的快速工程 简介 随着 OpenAI 的 ChatGPT 和 Google 的 Bard 等软件的普及,大语言模型(LLM)已经渗透到生活和工作的许多方面。例如,ChatGPT 可用于提供定制食谱,建议替换缺失的成分。它可用于起草研…

python-leetcode-使用最小花费爬楼梯

746. 使用最小花费爬楼梯 - 力扣(LeetCode) 解法 1:动态规划(O(n) 时间,O(n) 空间) class Solution:def minCostClimbingStairs(self, cost: List[int]) -> int:n len(cost)dp [0] * (n 1) # 额外多…

服务器IPMI用户名、密码批量检查

背景 大规模服务器部署的时候,少不了较多的网管和监测平台,这些平台会去监控服务器的性能、硬件等指标参数,为了便于管理和控制,则需要给服务器IPMI带外管理添加较多的用户,这就需要对较多的服务器检查所对应的IPMI用…

docker-compose部署开源堡垒机Orion-Visor——筑梦之路

git clone --depth1 https://github.com/dromara/orion-visorcd orion-visor docker compose pull# 配置,此处我保持默认cp .env.example .env# 启动进行数据库初始化docker compose up -d# 访问http://[ip]:8081进行登陆Adminer# 依次导入这些初始化sql orion-visor/sql/init-…

【大模型系列篇】DeepSeek开源周,解锁AI黑科技

🔥 Day1:FlashMLA —— GPU推理加速器 专为处理长短不一的AI推理请求而生,就像给Hopper GPU装上了智能导航,让数据在芯片上跑出3000GB/s的"磁悬浮"速度。✅ 已支持BF16格式|580万亿次浮点运算/秒FlashMLA G…

scala基础

Scala基础 scala基础Scala介绍第一个scala代码object和class的区别关键区别伴生类和伴生对象: 字节码解析在java中创建三个类 反编译代码编译User.class源码后的结果编译Emp.class源码后的结果 注释Scala类型推断&至简原则变量var和val之间的区别可变变量不可变…

智能家居遥控革命!昂瑞微HS6621EM:用「芯」定义AIoT时代的语音交互标杆

AIoT爆发期,遥控器为何成为智能家居的「隐形战场」? 随着Meta、苹果等巨头加速布局空间计算,智能家居生态正从「单一设备联网」向「全场景无感交互」跃迁。作为高频使用的入口设备,语音遥控器的性能直接决定用户体验天花板。昂瑞微…

绕过密码卸载360终端安全管理系统

一不小心在电脑上安装了360终端安全管理系统,就会发现没有密码,就无法退出无法卸载360,很容易成为一个心病,360终端安全管理系统,没有密码,进程无法退出,软件无法卸载,前不久听同事说…

MongoDB 笔记

一、基础概念 MongoDB 的特点是什么? MongoDB是一种NoSQL数据库,具有以下特点: 文档存储模型 MongoDB 使用 BSON(Binary JSON) 格式存储数据,数据以文档的形式组织,类似于JSON对象。文档可以包…

小程序Three Dof识别 实现景区AR体验

代码工程 GitCode - 全球开发者的开源社区,开源代码托管平台 dof

ABAP语言的动态程序

通过几个例子,由浅入深讲解 ABAP 动态编程。ABAP 动态编程主要通过 RTTS (Runtime Type Services) 来实现,包括 RTTI 和 RTTC: 运行时类型标识(RTTI) – 提供在运行时获取数据对象的类型定义的方法。运行时类型创建(R…

【安卓】BroadcastReceiver 动态声明为 RECEIVER_NOT_EXPORTED 后无法接收任何 Intent 的问题

一、问题起因 自 Android 14 (API 级别 34) 起,使用 context.registerReceiver(receiver, filter, flags) 动态注册广播接收器时,必须显式地声明 RECEIVER_NOT_EXPORTED 或 RECEIVER_EXPORTED 。 如果声明为 RECEIVER_EXPORTED ,任何第三方应…

unity pico开发二:创建基本的交互

文章目录 导入UnityXR Interaction ToolKit构建基础内容 导入UnityXR Interaction ToolKit 检查一下packagemanager,unityxr interactionToolkit是否自动导入 我们需要升级到一个不超过3.x的版本,因为pico还不支持3.x的内容 然后右侧samples里导入初始…

[STM32]从零开始的STM32 DEBUG问题讲解及解决办法

一、前言 最近也是重装了一次keil,想着也是重装了,也是去官网下载了一个5.41的最新版,在安装和配置编译器和别的版本keil都没太大的区别,但是在调试时,遇到问题了,在我Debug的System Viewer窗口中没有GPIO&…

学习threejs,使用ShaderMaterial自定义着色器材质

👨‍⚕️ 主页: gis分享者 👨‍⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍⚕️ 收录于专栏:threejs gis工程师 文章目录 一、🍀前言1.1 ☘️THREE.ShaderMaterial1.1.1…

查看ITHOR全部仿真家庭场景

1. 目标 按序号显示所有120个家庭场景统计单个场景里物体数量 2. 代码 import time from ai2thor.controller import Controller# 统计当前场景中的物体数量 def count_objects_in_scene(controller):objects controller.last_event.metadata["objects"]object_c…

ES6 特性全面解析与应用实践

1、let let 关键字用来声明变量,使用let 声明的变量有几个特点: 1) 不允许重复声明 2) 块儿级作用域 3) 不存在变量提升 4) 不影响作用域链 5) 暂时性死区 6)不与顶级对象挂钩 在代码块内,使用let命令声明变量之前&#x…

VSCode轻松调试运行C#控制台程序

1.背景 我一直都是用VS来开发C#项目的,用的比较顺手,也习惯了。看其他技术文章有介绍VS Code更轻量,更方便。所以我专门花时间来使用VS Code,看看它是如何调试代码、如何运行C#控制台。这篇文章是一个记录的过程。 2.操作 2.1 V…