Kafka入门,这一篇就够了(安装,topic,生产者,消费者)

目录

  • Kafka的安装
  • 文件与配置
    • 目录
      • bin
      • config
    • 配置文件
      • server.properties
      • producer.properties
      • consumer.properties
  • 命令行简单使用
    • kafka-topics.sh
      • 新增
      • 查看列表
      • 查看详情
      • 修改
      • 删除
    • kafka-console-producer.sh
    • kafka-console-consumer.sh
  • 概念
    • 集群
    • 代理broker
    • 主题topic
    • 分区partition
    • 偏移量offset
    • 生产者producer
    • 消费者组consumer group
    • 消费者consumer
  • FAQ
    • 如何保证一个主题下的数据,一定是有序的(生产与消费的顺序一致)?
    • 如何设置分区和消费者数?
  • 参考


Kafka的安装

前提,已安装docker和docker-compose。
拉取镜像

docker pull bitnami/zookeeper:latest
docker pull bitnami/kafka:latest

docker-compose.yaml如下

version: '3'
services:zookeeper:image: 'bitnami/zookeeper:latest'ports:- '2181:2181'environment:- ALLOW_ANONYMOUS_LOGIN=yeskafka:image: 'bitnami/kafka:latest'ports:- '9092:9092'environment:- KAFKA_BROKER_ID=1- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092- KAFKA_CFG_ADVERTISED_LISTENER=PLAINTEXT://127.0.0.1:9092- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181- ALLOW_PLAINTEXT_LISTENER=yesdepends_on:- zookeeper

启动命令

docker-compose up -d

截图
在这里插入图片描述
之后的相关命令若涉及容器id,请自行更换

文件与配置

目录

docker exec -it a0 ls /opt/bitnami/kafka

查看目录命令

截图
在这里插入图片描述
重要目录解释如下:

  • bin: 脚本目录
  • config:配置目录
  • libs:第三方依赖库目录
  • logs:日志

bin

重要的shell脚本加粗了,之后会用

connect-distributed.sh kafka-dump-log.sh kafka-storage.sh
connect-mirror-maker.sh kafka-features.sh kafka-streams-application-reset.sh
connect-standalone.sh kafka-get-offsets.sh kafka-topics.sh
kafka-acls.sh kafka-leader-election.sh kafka-transactions.sh
kafka-broker-api-versions.sh kafka-log-dirs.sh kafka-verifiable-consumer.sh
kafka-cluster.sh kafka-metadata-shell.sh kafka-verifiable-producer.sh
kafka-configs.sh kafka-mirror-maker.sh trogdor.sh
kafka-console-consumer.sh kafka-producer-perf-test.sh windows
kafka-console-producer.sh kafka-reassign-partitions.sh zookeeper-security-migration.sh
kafka-consumer-groups.sh kafka-replica-verification.sh zookeeper-server-start.sh
kafka-consumer-perf-test.sh kafka-run-class.sh zookeeper-server-stop.sh
kafka-delegation-tokens.sh kafka-server-start.sh zookeeper-shell.sh
kafka-delete-records.sh kafka-server-stop.sh

config

connect-console-sink.properties connect-mirror-maker.properties server.properties
connect-console-source.properties connect-standalone.properties tools-log4j.properties
connect-distributed.properties consumer.properties trogdor.conf
connect-file-sink.properties kraft zookeeper.properties
connect-file-source.properties log4j.properties
connect-log4j.properties producer.properties

配置文件

server.properties

  • broker.id: 唯一id值,通过环境变量设置为了1
  • log.dirs: kafka集群日志目录,默认是log.dirs=/bitnami/kafka/data
  • zookeeper.connect:zookeeper地址端口,格式域名/ip:port,这块是zookeeper:2181,在docker的网络中可以解析为另一容器的ip

更多配置可以查看参考中Dockerhub链接的Configuration部分
在这里插入图片描述

producer.properties

  • bootstrap.servers:kafka的ip:port,这里是localhost:9092
  • compression.type:压缩类型,默认是none, 一共有四种,none, gzip, snappy, lz4, zstd,推荐排序LZ4 > GZIP > Snappy,详见腾讯云压缩算法对比

consumer.properties

  • group.id:消费者组id,默认为test-consumer-group
  • auto.offset.reset:offset设置,三种latest, earliest, none,看情况设置

命令行简单使用

kafka-topics.sh

对主题topic进行增删改查的工具
在这里插入图片描述

常用选项如下:

  • --bootstrap-server:kafka服务器ip:port,必须的
  • --create:创建主题
  • --delete:删除主题
  • --describe:描述主题
  • --list:查看主题列表
  • --alter:修改主题的 partitions等
  • --topic <String: topic>:主题名
  • --topic-id <String: topic-id>:主题id
  • --partitions <Integer: # of partitions>:主题的partition

新增

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --create --topic lady_killer9

截图
在这里插入图片描述

查看列表

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --list

截图
在这里插入图片描述

查看详情

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --describe --topic lady_killer9

截图
在这里插入图片描述

修改

命令
以修改主题partiion数量为例

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --alter --topic lady_killer9 --partitions 3

截图
在这里插入图片描述

删除

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --delete --topic lady_killer9

截图
在这里插入图片描述

kafka-console-producer.sh

标准输入读数据,发送到Kafka的工具
在这里插入图片描述
常用选项如下:

  • --bootstrap-server:kafka服务器ip:port,必须的
  • --topic <String: topic> :Kafka主题,必须的
  • --sync:同步发送
  • --compression-codec [String: compression-codec] :压缩方式,‘none’,‘gzip’, ‘snappy’, ‘lz4’, , ‘zstd’,默认gzip.

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --create --topic demo
docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-producer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo

截图
在这里插入图片描述

kafka-console-consumer.sh

在这里插入图片描述
常用选项如下:

  • --bootstrap-server:kafka服务器ip:port,必须的
  • --topic <String: topic> :Kafka主题,必须的
  • --group <String: consumer group id>:消费者组id
  • --key-deserializer <String: deserializer for key>:key反序列化,默认是org.apache.kafka.common.serialization.StringDeserializer
  • --value-deserializer <String: deserializer for values>:value反序列化,默认是org.apache.kafka.common.serialization.StringDeserializer
  • --offset <String: consume offset>:消费的offset
  • --partition <Integer: partition>:消费的分区

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-consumer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo

截图
在这里插入图片描述
命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-consumer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo --partition 0 --offset 2

截图
在这里插入图片描述
上手之后我们再来了解一些概念。

概念

集群

已发布的消息保存在一组服务器中,称为Kafka集群。

代理broker

集群中的每一个服务器都是一个代理。

主题topic

每条发布到kafka集群的消息都有一个主题,这个主题被称为topic。每个topic都由一个或者多个分区构成。

分区partition

topic的partition数量可以在创建时配置,partition数量决定了每个消费者组中并发消费者的最大数量

分区的原则:

  • 生产者指定了partition,则直接使用
  • 未指定partition但指定了key,通过对key的value进行hash出一个partition
  • partition和key都未指定,使用轮询选出一个partition

偏移量offset

任何发布到partition的消息都会被直接追加到partition尾部,每条消息的位置称为offset,offset是一个long型数字,它唯一标记一条消息。消费者可以通过(topic、partition、offset)跟踪记录。

生产者producer

push消息到topc的叫生产者,push后可以获得offset。生产者可以指定partition,但不建议这么做。

消费者组consumer group

包含多个消费者,有一个 group id,可以订阅topic进行消费。消费偏移以消费者组为单位。

消费者consumer

从topic中pull数据,可以指定partition和offset。

FAQ

如何保证一个主题下的数据,一定是有序的(生产与消费的顺序一致)?

Kafka每个partition中的消息在写入是都是有序的,消费时,每个partition只能被每一个group中的消费者消费,因此,topic下只有一个partition时一定有序。

如何设置分区和消费者数?

建议分区数与消费者数一致,防止消费不过来。

参考

dockerhub-bitnami/kafka
腾讯云CKafka 压缩算法对比
python-kafka客户端封装

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/127840.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

android 车载widget小部件部分详细源码实战开发-千里马车载车机framework开发实战课程

官网参考链接&#xff1a;https://developer.android.google.cn/develop/ui/views/appwidgets/overview 1、什么是小部件 App widgets are miniature application views that can be embedded in other applications (such as the home screen) and receive periodic updates…

什么是Linux

什么是Linux&#xff1f; 不知道大家是什么时候开始接触Linux&#xff0c;我记得我是大三的时候&#xff0c;那时候通过国嵌、韦东山的教学视频&#xff0c;跟着搭bootloader&#xff0c;修改内核&#xff0c;制作根文件系统&#xff0c;一步步&#xff0c;视频真的很简单&…

GRU门控循环单元

GRU 视频链接 https://www.bilibili.com/video/BV1Pk4y177Xg?p23&spm_id_frompageDriver&vd_source3b42b36e44d271f58e90f86679d77db7Zt—更新门 Rt—重置门 控制保存之前一层信息多&#xff0c;还是保留当前神经元得到的隐藏层的信息多。 Bi-GRU GRU比LSTM参数少 …

大数据Flink(七十四):SQL的滑动窗口(HOP)

文章目录 SQL的滑动窗口(HOP) SQL的滑动窗口(HOP) 滑动窗口定义:滑动窗口也是将元素指定给固定长度的窗口。与滚动窗口功能一样,也有窗口大小的概念。不一样的地方在于,滑动窗口有另一个参数控制窗口计算的频率(滑动窗口滑动的步长)。因此,如果滑动的步长小于窗口大…

10.Xaml ListBox控件

1.运行界面 2.运行源码 a.Xaml 源码 <Grid Name="Grid1"><!--IsSelected="True" 表示选中--><ListBox x:Name="listBo

0003号因子测试结果、代码和数据

这篇文章共分为四个部分:第一个部分是因子测试结果,第二个部分是因子逻辑,第三个部分是因子代码,第四个部分是整个因子测试用的数据、代码、分析结果的下载地址。 因子测试结果: 因子描述 因子属性-量价因子因子构建:计算成交量的变化率和日振幅率,计算两者在过去一定…

LASSO回归

LASSO回归 LASSO(Least Absolute Shrinkage and Selection Operator&#xff0c;最小绝对值收敛和选择算子算法)是一种回归分析技术&#xff0c;用于变量选择和正则化。它由Robert Tibshirani于1996年提出&#xff0c;作为传统最小二乘回归方法的替代品。 损失函数 1.线性回…

MySQL学习5:事务、存储引擎

事务 简介 事务是一组数据库操作的执行单元&#xff0c;它要么完全执行&#xff0c;要么完全不执行。事务是确保数据库中的数据一致性和完整性的重要机制之一。 事务具有以下四个特性&#xff08;称为ACID特性&#xff09;&#xff1a; 原子性&#xff08;Atomicity&#xf…

将 ordinals 与 比特币智能合约集成 : 第 1 部分

将序数与比特币智能合约集成&#xff1a;第 1 部分 最近&#xff0c;比特币序数在区块链领域引起了广泛关注。 据称&#xff0c;与以太坊 ERC-721 等其他代币标准相比&#xff0c;Ordinals 的一个主要缺点是缺乏对智能合约的支持。 我们展示了如何向 Ordinals 添加智能合约功…

Spring Boot 中使用 Poi-tl 渲染数据并生成 Word 文档

本文 Demo 已收录到 demo-for-all-in-java 项目中&#xff0c;欢迎大家 star 支持&#xff01;后续将持续更新&#xff01; 前言 产品经理急冲冲地走了过来。「现在需要将按这些数据生成一个 Word 报告文档&#xff0c;你来安排下」 项目中有这么一个需求&#xff0c;需要将用户…

MySQL——主从复制

简介 在实际的生产中&#xff0c;为了解决Mysql的单点故障已经提高MySQL的整体服务性能&#xff0c;一般都会采用「主从复制」。 主从复制开始前有个前提条件&#xff1a;两边的数据要一样&#xff0c;主必须开启二进制日志 dump thread 线程 基于位置点从是否需要开启二进…

Codeforces Round 895 (Div. 3)

Codeforces Round 895 (Div. 3) A. Two Vessels 思路&#xff1a; 我们可以发现当在 a 拿 c 到 b 其实可以让他们差值减少 2c&#xff0c;所以对a和b的差值除以2c向上取整即可 #include<bits/stdc.h> using namespace std; #define int long long #define rep(i,a,n) …

烟感报警器单片机方案开发,解决方案

烟感报警器也叫做烟雾报警器。烟感报警器适用于火灾发生时有大量烟雾&#xff0c;而正常情况下无烟的场所。例如写字楼、医院、学校、博物馆等场所。烟感报警器一般安装于所需要保护或探测区域的天花板上&#xff0c;因火灾中烟雾比空气轻&#xff0c;更容易向上飘散&#xff0…

如何利用 Selenium 对已打开的浏览器进行爬虫

大家好&#xff01; 在对某些网站进行爬虫时&#xff0c;如果该网站做了限制&#xff0c;必须完成登录才能展示数据&#xff0c;而且只能通过短信验证码才能登录 这时候&#xff0c;我们可以通过一个已经开启的浏览器完成登录&#xff0c;然后利用程序继续操作这个浏览器&…

Redis常见命令

命令可以查看的文档 http://doc.redisfans.com/ https://redis.io/commands/ 官方文档&#xff08;英文&#xff09; http://www.redis.cn/commands.html 中文 https://redis.com.cn/commands.html 个人推荐这个 https://try.redis.io/ redis命令在线测试工具 https://githubfa…

《向量数据库》——向量数据库Milvus 和大模型出联名款AI原生Milvus Cloud

大模型技术的发展正加速对千行百业的改革和重塑,向量数据库作为大模型的海量记忆体、云计算作为大模型的大算力平台,是大模型走向行业的基石。而电商行业因其高度的数字化程度,成为打磨大模型的绝佳“战场”。 在此背景下,Zilliz 联合亚马逊云科技举办的【向量数据库 X 云计…

在FPGA上快速搭建以太网

在本文中&#xff0c;我们将介绍如何在FPGA上快速搭建以太网 &#xff08;LWIP &#xff09;。为此&#xff0c;我们将使用 MicroBlaze 作为主 CPU 运行其应用程序。 LWIP 是使用裸机设计以太网的良好起点&#xff0c;在此基础上我们可以轻松调整软件应用程序以提供更详细的应用…

Chrome 108版(64-bit 108.0.5359.125)网盘下载

还在用Selenium的朋友们注意了&#xff0c;目前Chrome的最新版是116&#xff0c;而官方的Chromedriver只支持到115版。 可惜Google不提供旧版Chrome的下载方式&#xff0c;需要旧版的很难回去了。如果真的想要旧版的Chrome&#xff0c;只能民间自救。 我在2022年12月备份了C盘…

线性代数的学习和整理21,向量的模,矩阵的模,矩阵的模和行列式比较(未完成)

目录 1 模的定义 2 向量的模是距离 2.1 向量的模的定义 2.2 向量的模的计算公式 3 矩阵的模 3.1 矩阵/向量组的模的定义 3.2 矩阵的模的公式 4 矩阵的模和行列式的关系&#xff1f; 1 模的定义 模&#xff0c;又称为范数。范数&#xff0c;是具有“长度”概念的函数。…

机器人任务挖掘与智能超级自动化技术解析

本文为上海财经大学教授、安徽财经大学学术副校长何贤杰出席“会计科技Acctech应对不确定性挑战”高峰论坛时的演讲内容整理。何贤杰详细介绍了机器人任务挖掘与智能超级自动化技术的发展背景、关键技术和应用场景。 从本质来说&#xff0c;会计是非常适合智能化、自动化的。会…