Kafka的配置和使用

目录

1.服务器用docker安装kafka

2.springboot集成kafka实现生产者和消费者


1.服务器用docker安装kafka

        ①、安装docker(docker类似于linux的软件商店,下载所有应用都能从docker去下载)

                a、自动安装 

curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun

                b、启动docker

sudo systemctl start docker

                c、 通过运行hello-world镜像来验证是否正确安装了Docker Engine-Community。

// 拉取镜像

sudo docker pull hello-world

// 执行

hello-world sudo docker run hello-world

                 d、安装成功

         ②、zookeeper

                a、docker search zookeeper

                b、docker pull zookeeper

        ③、安装kafka

                a、docker search kafka

                b、docker pull wurstmeister/kafka

        ④、运行zookeeper

                a、docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper

        ⑤、运行kafka

                a、 docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=42.194.238.131:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://42.194.238.131:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka

                b、参数说明

参数说明:

-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己

-e KAFKA_ZOOKEEPER_CONNECT=172.21.10.10:2181/kafka 配置zookeeper管理kafka的路径172.21.10.10:2181/kafka

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.21.10.10:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口

-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间

        ⑥、检验kafka是否可以使用

docker exec -it kafka bash

cd /opt/kafka_2.13-2.8.1/

cd bin

                a、运行kafka生产者并发送消息

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

                b、在开一个页面,运行kafka消费者发送消息

 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

        ⑦、结果是这个样子的

 

         ⑧、每条消息都有一个主题,消费者指定监听哪个主题的消息,如果进来消息队列的是我们指定监听的主题,就消费,否则不消费(topic这里指定的生产和消费的主题)

        ⑨、消费者宕掉了,生产者接着发,消息不会丢,消费者重启之后会重新接收到宕机之后发的所有消息

2.springboot集成kafka实现生产者和消费者

        ①、在pom中创建依赖

<dependency>

        <groupId>org.springframework.kafka</groupId>

        <artifactId>spring-kafka</artifactId>

         <version>2.7.8</version>

</dependency>

        ②、配置kafka

                a、在 application.yml 文件中添加以下配置:(注:yml中两个相同名字的会报错,比如两个spring)

spring:

         kafka:

                #自己的kafka所在的ip地址和端口号

                 bootstrap-servers: localhost:9092

                 consumer:

                 #一个group-id代表一个消费组,一个消息可以被几个消费组消费

                    group-id: my-group

                    auto-offset-reset: earliest

                producer: #序列化

                    value-serializer: org.apache.kafka.common.serialization.StringSerializer

                    key-serializer: org.apache.kafka.common.serialization.StringSerializer

        b、创建一个生产者

@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}}

sendMessage 方法,用于发送消息到 Kafka。

@RestController
public class KafkaController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@PostMapping("/send")public void sendMessage(@RequestBody String message) {kafkaTemplate.send("my-topic", message);}}

        c、 创建一个消费者

@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}}

@KafkaListener 注解声明了一个消费者方法,用于接收从 

my-topic 主题中读取的消息

@Service
public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group-id")public void consume(String message) {System.out.println("Received message: " + message);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/75470.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DeepVO 论文阅读

论文信息 题目&#xff1a;DeepVO Towards End-to-End Visual Odometry with Deep Recurrent Convolutional Neural Networks 作者&#xff1a;Sen Wang, Ronald Clark, Hongkai Wen and Niki Trigoni 代码地址&#xff1a;http://senwang.gitlab.io/DeepVO/ (原作者并没有开源…

C语言文件操作

&#x1f389;个人名片&#xff1a; &#x1f43c;作者简介&#xff1a;一名乐于分享在学习道路上收获的大二在校生 &#x1f43b;‍❄个人主页&#x1f389;&#xff1a;GOTXX &#x1f43c;个人WeChat&#xff1a;ILXOXVJE &#x1f43c;本文由GOTXX原创&#xff0c;首发CSDN…

机器学习入门之 pandas

pandas 有三种数据结构 一种是 Series 一种是 Dataframe import pandas as pd import numpy as np score np.random.randint(0,100,[10,5])score[0,0] 100Datascore pd.DataFrame(score)subject ["语文","数学","英语","物理&quo…

Oracle设置某个表字段递增

当Oracle设置字段递增创建触发器 先建一个序列&#xff0c;打开PLSQL 找到Sequences&#xff0c;右击新建 根据自己的需要填写 然后添加触发器&#xff0c;点新建-程序窗口-空白 --TEST_ID为触发器的名字&#xff0c;TEST是添加触发器的表名 CREATE OR REPLACE TRIGGER &qu…

Word中如何断开表格中线段

Word中如何断开表格中线段_word表格断线怎么弄_仰望星空_LiDAR的博客-CSDN博客有时候为了美观&#xff0c;需要实现如下的效果&#xff0c;即第2条线段被断开成3段步骤如下&#xff1a;选中需要断开的格网&#xff0c;如下&#xff0c;再选择段落、针对下框标即可。_word表格断…

iOS - 解压ipa包中的Assert.car文件

项目在 Archive 打包后&#xff0c;生成ipa包 将 xxx.ipa文件修改为zip后缀即 xxx.zip &#xff0c;然后再双击解压&#xff0c;会生成一个 Payload 文件夹&#xff0c;里面一个文件 如下图&#xff1a; 然后显示改文件的包内容&#xff1a; 解压 Assets.car 文件的方式&…

Redis 安装以及配置隧道连接

目录 1.CentOS 1. 安装Redis 2. Redis 启动和停⽌ 3. 操作Redis 2.Ubuntu 1. 安装Redis 2. Redis 启动/停⽌ 3. 操作 Redis 3.开启隧道 3.1 Xshell 配置隧道 3.2 windTerm 配置隧道 3.3 FinalShell配置隧道 4.可视化客户端连接 Another Redis Desktop Manager 1.Cen…

【图像去噪】基于原始对偶算法优化的TV-L1模型进行图像去噪研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

Flutter Flar动画实战

在Flare动面出现之前,Flare动画大体可以分为使用AnimationController控制的基础动画以及使用Hero的转场动画,如果遇到一些复杂的场景,使用这些动画方案实现起来还是有难度的。不过,随着Flutter开始支持Flare矢量动面,Flutter的动画开发也变得越来越简单。事实上,Flare动画…

django使用mysql数据库

Django开 发操作数据库比使用pymysql操作更简单&#xff0c;内部提供了ORM框架。 下面是pymysql 和orm操作数据库的示意图&#xff0c;pymysql就是mysql的驱动&#xff0c;代码直接操作pymysql ,需要自己写增删改查的语句 django 就是也可以使用pymysql、mysqlclient作为驱动&a…

常用SQL语句总结

SQL语句 文章目录 SQL语句1 SQL语句简介2 DQL&#xff08;数据查询语句&#xff09;3 DML&#xff08;数据操纵语句&#xff09;4 DDL&#xff08;数据定义语句&#xff09;5 DCL&#xff08;数据控制语句&#xff09;6 TCL&#xff08;事务控制语句&#xff09; 1 SQL语句简介…

DP-GAN-生成器代码

首先看一下数据生成&#xff1a; 在预处理阶段会将label经过ont-hot编码转换为35个通道&#xff0c;即每个通道都是由&#xff08;0,1&#xff09;组成。 在train文件中&#xff0c;对生成器和判别器分别进行更新&#xff0c;根据loss的不同&#xff0c;分别计算对于的损失&a…

“ARTS挑战:探索技术,分享思考“

文章目录 前言一、学习的内容二、遇到的困难及解决办法三、学习打卡成果展示四、学习技巧的总结五、未来学习打卡计划后记 关于 ARTS 的释义 ● Algorithm: 每周至少做一个 LeetCode 的算法题 ● Review: 阅读并点评至少一篇英文技术文章 ● Tips: 学习至少一个技术技巧 ● Sha…

Linux(三):Linux服务器下日常实操命令 (常年更新)

基础命令 cd命令&#xff1a;切换目录 cd &#xff1a;切换当前目录百至其它目录&#xff0c;比如进入/etc目录&#xff0c;则执行 cd /etccd / &#xff1a;在Linux 系统中斜杠“/”表示的是根目录。cd / ,即进入根目录.cd ~&#xff1a;进入用户在该系统的home目录&#…

【eNSP】静态路由

【eNSP】静态路由 原理网关路由表 实验根据图片连接模块配置路由器设备R1R2R3R4 配置PC的IP地址、掩码、网关PC1PC2PC3 配置静态路由查看路由表R1R2R3R4测试能否通信 原理 网关 网关与路由器地址相同&#xff0c;一般路由地址为.1或.254。 网关是当电脑发送的数据的目标IP不在…

mac录屏怎么打开?很简单,让我来教你!

mac电脑作为一款广受欢迎的电脑系统&#xff0c;提供了多种方式来满足用户录屏的需求。无论您是要录制教学视频、制作演示文稿&#xff0c;还是记录游戏精彩瞬间&#xff0c;mac电脑都能帮助您实现这些目标。本文将为您介绍两种mac录屏的方法。通过本文的指导&#xff0c;您将能…

数据包在网络中传输的过程

ref: 【先把这个视频看完了】&#xff1a;数据包的传输过程【网络常识10】_哔哩哔哩_bilibili 常识都看看 》Ref&#xff1a; 1. 这个写的嘎嘎好&#xff0c;解释了为啥4层7层5层&#xff0c;还有数据包封装的问题:数据包在网络中的传输过程详解_数据包传输_张孟浩_jay的博客…

CSS调色网有哪些

本文章转载于湖南五车教育&#xff0c;仅用于学习和讨论&#xff0c;如有侵权请联系 1、https://webgradients.com/ Wbgradients 是一个在线调整渐变色的网站 &#xff0c;可以根据你想要的调整效果&#xff0c;同时支持复制 CSS 代码&#xff0c;可以更好的与开发对接。 Wbg…

LeetCode--剑指Offer75(2)

目录 题目描述&#xff1a;剑指 Offer 58 - II. 左旋转字符串&#xff08;简单&#xff09;题目接口解题思路1代码解题思路2代码 PS: 题目描述&#xff1a;剑指 Offer 58 - II. 左旋转字符串&#xff08;简单&#xff09; 字符串的左旋转操作是把字符串前面的若干个字符转移到…

智慧防汛,数字科技的力量

随着夏日的脚步临近&#xff0c;台风季节即将降临。对于那些居住在沿海地区的人们来说&#xff0c;台风是一种常见的自然灾害&#xff0c;其带来的风雨可能对生命和财产造成严重威胁。然而&#xff0c;随着数字科技的飞速发展&#xff0c;可视化技术为防汛抗台工作带来了全新的…