使用Linux部署Kafka教程

目录

一、部署Zookeeper

1 拉取Zookeeper镜像

2 运行Zookeeper

二、部署Kafka

1 拉取Kafka镜像

2 运行Kafka

三、验证是否部署成功

1 进入到kafka容器中

2 创建topic 生产者

3 生产者发送消息

4 消费者消费消息

四、搭建kafka管理平台

五、SpringBoot整合Kafka 

1、导入依赖

2、修改配置

3、生产者

 4、消费者

5、测试发送消息

 6、测试收到消息


一、部署Zookeeper

1 拉取Zookeeper镜像

docker pull wurstmeister/zookeeper
  • 1

2 运行Zookeeper

docker run --restart=always --name zookeeper \
--log-driver json-file \
--log-opt max-size=100m \
--log-opt max-file=2  \
-p 2181:2181 \
-v /etc/localtime:/etc/localtime \
-d wurstmeister/zookeeper

二、部署Kafka

1 拉取Kafka镜像

docker pull wurstmeister/kafka

2 运行Kafka

docker run --restart=always --name kafka \
--log-driver json-file \
--log-opt max-size=100m \
--log-opt max-file=2 \-p 9092:9092 \-e KAFKA_BROKER_ID=0 \-e KAFKA_ZOOKEEPER_CONNECT=192.168.8.102:2181 \-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.8.102:9092 \-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \-v /etc/localtime:/etc/localtime \-d wurstmeister/kafka

参数说明:
-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
-e KAFKA_ZOOKEEPER_CONNECT=172.16.0.13:2181/kafka 配置zookeeper管理kafka的路径172.16.0.13:2181/kafka
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.0.13:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口
-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间

三、验证是否部署成功

1 进入到kafka容器中

docker exec -it kafka /bin/sh

2 创建topic 生产者

cd opt/kafka_2.13-2.8.1bin/kafka-topics.sh --create --zookeeper 192.168.8.102:2181 --replication-factor 1 --partitions 1 --topic partopic

在这里插入图片描述

3 生产者发送消息

bin/kafka-console-producer.sh --broker-list 192.168.8.102:9092 --topic partopic

在这里插入图片描述

4 消费者消费消息

  • 新打开个ssh窗口
  • 跟前面步骤一样进入到容器
bin/kafka-console-consumer.sh --bootstrap-server 192.168.8.102:9092 --topic partopic --from-beginning

在这里插入图片描述

四、搭建kafka管理平台

 docker search kafdrop

docker run -d --rm  -p 9000:9000 \-e JVM_OPTS="-Xms32M -Xmx64M" \-e KAFKA_BROKERCONNECT=<host:port,host:port> \-e SERVER_SERVLET_CONTEXTPATH="/" \obsidiandynamics/kafdrop<host:port,host:port> 为 外网集群地址 多个用逗号分隔 例如xxx.xxx.xxx.xxx:9092,yyy.yyy.yyy.yyy:9092 尖角号不留上面的命令是百度的以下是我自己尝试的
docker run -d --name kafdrop -p 9001:9001 \-e JVM_OPTS="-Xms32M -Xmx64M -Dserver.port=9001" \-e KAFKA_BROKERCONNECT=192.168.58.130:9092 \-e SERVER_SERVLET_CONTEXTPATH="/" \obsidiandynamics/kafdrop因为我docker启动了其他东西占用了9001端口,而这个kafdrop其实就是一个springboot项目,以jar命令的形式启动

访问地址:Kafdrop: Broker List 

五、SpringBoot整合Kafka 

1、导入依赖

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2、修改配置

spring:kafka:bootstrap-servers: 192.168.58.130:9092 #部署linux的kafka的ip地址和端口号producer:# 发生错误后,消息重发的次数。retries: 1#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1consumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 在侦听器容器中运行的线程数。concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: false

本次测试:linux地址:192.168.58.130

spring.kafka.bootstrap-servers=192.168.58.130:9092

advertised.listeners=192.168.58.130:9092

3、生产者

import com.alibaba.fastjson.JSON;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;/*** 事件的生产者*/
@Slf4j
@Component
public class KafkaProducer {@Autowiredpublic KafkaTemplate kafkaTemplate;/** 主题 */public static final String TOPIC_TEST = "Test";/** 消费者组 */public static final String TOPIC_GROUP = "test-consumer-group";public void send(Object obj){String obj2String = JSON.toJSONString(obj);log.info("准备发送消息为:{}",obj2String);//发送消息ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);//回调future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable ex) {//发送失败的处理log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + ex.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {//成功的处理log.info(TOPIC_TEST + " - 生产者 发送消息成功:" + result.toString());}});}}

 4、消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.Optional;/*** 事件消费者*/
@Component
public class KafkaConsumer {private Logger logger = LoggerFactory.getLogger(org.apache.kafka.clients.consumer.KafkaConsumer.class);@KafkaListener(topics = KafkaProducer.TOPIC_TEST,groupId = KafkaProducer.TOPIC_GROUP)public void topicTest(ConsumerRecord<?,?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){Optional<?> message = Optional.ofNullable(record.value());if (message.isPresent()) {Object msg = message.get();logger.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg);ack.acknowledge();}}
}

5、测试发送消息

@Testvoid kafkaTest(){kafkaProducer.send("Hello Kafka");}

 6、测试收到消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/112268.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

残差网络、Dropout正则化、Batch Normalization浅了解

残差网络&#xff1a; 为什么需要残差网络&#xff1a; 残差网络的目的是为了解决深度神经网络在训练过程中遇到的退化问题&#xff0c;即随着网络层数的增加&#xff0c;训练集的误差反而增大&#xff0c;而不是过拟合。残差网络的优点有以下几点&#xff1a; 残差网络可以…

mysql 间隙锁原理深度详解

目录 一、前言 二、mysql之mvcc 2.1 什么是mvcc 2.2 mvcc组成 2.2.1 Undo log 多版本链 2.2.2 ReadView 2.2.3 快照读与当前读 三、RR级别下的事务问题 3.1 RR隔离级别解决的问题 3.1.1 幻读问题 3.2 幻读效果演示 3.2.1 准备测试表和数据 3.2.2 修改事务级别 3.…

生态项目|Typus如何用Sui特性制作动态NFT为DeFi赋能

对于许多人来说&#xff0c;可能因其涉及的期权、认购和价差在内的DeFi而显得晦涩难懂&#xff0c;但Typus Finance找到了一种通过动态NFT使体验更加丰富的方式。Typus NFT系列的Tails为用户带来一个外观逐渐演变并在平台上提升活动水平时获得新特权的角色。 Typus表示&#x…

ES 7.6 - JAVA应用基础操作篇

ES 7.6 - JAVA应用基础操作篇 环境准备依赖配置 实体类准备使用说明索引/映射操作创建索引和映射索引和映射相关查询删除索引 文档操作插入数据更新数据删除数据批量操作 文档查询根据ID查询根据字段精准查询根据字段分词查询控制返回字段范围查询组合查询排序分页高亮搜索聚合…

图像颜色空间转换

目录 1.图像颜色空间介绍 RGB 颜色空间 2.HSV 颜色空间 3.RGBA 颜色空间 2.图像数据类型间的互相转换convertTo() 3.不同颜色空间互相转换cvtColor() 4.Android JNI demo 1.图像颜色空间介绍 RGB 颜色空间 RGB 颜色空间是最常见的颜色表示方式之一&#xff0c;其中 R、…

【MySql】mysql之基础语句

一、常用的数据类型 类型解释举例int整型用于定义整数类型的数据&#xff08;1、2、3、4、5…&#xff09;float单精度浮点&#xff08;4字节32位&#xff09;准确表示小数点后六位double双精度浮点&#xff08;8字节64位&#xff09;小数位更多&#xff0c;更精确char固定长度…

Go语言基础之指针

区别于C/C中的指针&#xff0c;Go语言中的指针不能进行偏移和运算&#xff0c;是安全指针。 要搞明白Go语言中的指针需要先知道3个概念&#xff1a;指针地址、指针类型和指针取值。 Go语言中的指针 任何程序数据载入内存后&#xff0c;在内存都有他们的地址&#xff0c;这就…

微信小程序使用stomp.js实现STOMP传输协议的实时聊天

简介&#xff1a; uniapp开发的小程序中使用 本来使用websocket&#xff0c;后端同事使用了stomp协议&#xff0c;导致前端也需要对应修改。 如何使用 在static/js中新建stomp.js和websocket.js&#xff0c;然后在需要使用的页面引入监听代码发送代码即可 代码如下&#x…

腾讯云-对象存储服务(COS)的使用总结

简介 对象存储&#xff08;Cloud Object Storage&#xff0c;COS&#xff09;是腾讯云提供的一种存储海量文件的分布式存储服务&#xff0c;具有高扩展性、低成本、可靠安全等优点。通过控制台、API、SDK 和工具等多样化方式&#xff0c;用户可简单、快速地接入 COS&#xff0…

论文笔记:从不平衡数据流中学习的综述: 分类、挑战、实证研究和可重复的实验框架

0 摘要 论文&#xff1a;A survey on learning from imbalanced data streams: taxonomy, challenges, empirical study, and reproducible experimental framework 发表&#xff1a;2023年发表在Machine Learning上。 源代码&#xff1a;https://github.com/canoalberto/imba…

Python Qt(七)Listview

源代码&#xff1a; # -*- coding: utf-8 -*-# Form implementation generated from reading ui file qt_listview.ui # # Created by: PyQt5 UI code generator 5.15.9 # # WARNING: Any manual changes made to this file will be lost when pyuic5 is # run again. Do not…

Docker之私有仓库 RegistryHarbor

目录 一、Docker私有仓库&#xff08;Registry&#xff09; 1.1 Registry的介绍 二、搭建本地私有仓库 2.1首先下载 registry 镜像 2.2在 daemon.json 文件中添加私有镜像仓库地址 2.3运行 registry 容器 2.4Docker容器的重启策略 2.5为镜像打标签 2.6上传到私有仓库 2…

jvm的内存区域

JVM 内存分为线程私有区和线程共享区&#xff0c;其中方法区和堆是线程共享区&#xff0c;虚拟机栈、本地方法栈和程序计数器是线程隔离的数据区。 1&#xff09;程序计数器 程序计数器&#xff08;Program Counter Register&#xff09;也被称为 PC 寄存器&#xff0c;是一块…

基于Echarts的大数据可视化模板:大数据医疗服务平台

目录 引言大数据在医疗领域的应用ECharts在医疗服务中的作用医疗大数据的应用方向临床决策支持药物研发与安全性监测健康管理与预防流行病监测与公共卫生基因组学与个性化医疗医疗保险与费用管理Echarts与大数据可视化Echarts库以及其在大数据可视化领域的应用优势开发过程和所…

只考一门数据结构,计算机学硕复录比1:1的山东双非学校考情分析

青岛理工大学 考研难度&#xff08;☆&#xff09; 内容&#xff1a;23考情概况&#xff08;拟录取和复试分析&#xff09;、院校概况、23专业目录、23复试详情、各专业考情分析、各科目考情分析。 正文1420字&#xff0c;预计阅读&#xff1a;3分钟 2023考情概况 青岛理工…

redis缓存雪崩、穿透、击穿解决方案

redis缓存雪崩、穿透、击穿解决方案 背景缓存雪崩缓存击穿缓存穿透总结背景 关于缓存异常,我们常见的有三个问题:缓存雪崩、缓存击穿、缓存穿透。这三个问题一旦发生,会导致大量请求直接落到数据库层面。如果请求的并发量很大,会影响数据库的运行,严重的会导致数据库宕机…

openGauss学习笔记-54 openGauss 高级特性-MOT

文章目录 openGauss学习笔记-54 openGauss 高级特性-MOT54.1 MOT特性及价值54.2 MOT关键技术54.3 MOT应用场景54.4 不支持的数据类型54.5 使用MOT54.6 将磁盘表转换为MOT openGauss学习笔记-54 openGauss 高级特性-MOT openGauss引入了MOT&#xff08;Memory-Optimized Table&…

如何高效地设计测试用例并评审

编写出好的测试用例是每一个测试工程师的职责&#xff0c;但在实际工作中大家写的测试用例往往需要不断地修改才能使用&#xff0c;这不仅浪费了时间&#xff0c;还容易让测试工程师产生自我否定的情绪&#xff0c;甚至在团队中产生各种矛盾。 那如何高效地设计测试用例呢&…

可直接运营的餐饮外卖点餐自提单多门店小程序开发演示

适合鲜花店、蛋糕店、奶茶店、餐饮店、便利店等门店商家的小程序。 小程序系统支持外卖和自提两种模式&#xff0c;帮助商家打造自己的私域流量池&#xff0c;减少对美团和饿了么的依赖&#xff0c;提升用户点餐、就餐体验。 支持会员签到获取积分的功能&#xff0c;积分可用…

jwt安全问题

文章目录 jwt安全问题jwt简介jwt组成headerpayloadsignature 潜在漏洞空加密算法web346 密钥爆破web348 敏感信息泄露web349 **修改算法RS256为HS256**web350 jwt安全问题 jwt简介 JWT的全称是Json Web Token&#xff0c;遵循JSON格式&#xff0c;跨域认证解决方案&#xff0…