docker安装和使用kafka

1. 启动zookeeper

Kafka依赖zookeeper, 首先安装zookeeper
-p:设置映射端口(默认2181

docker run --name zookeeper \--network app-tier \-e ALLOW_ANONYMOUS_LOGIN=yes \--restart=always \-d bitnami/zookeeper:latest

2. 启动kafka

docker run --name kafka \--network app-tier \-p 9092:9092 \-e ALLOW_PLAINTEXT_LISTENER=yes \-e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092	 \--restart=always \-d bitnami/kafka:latest
命令解释
ALLOW_PLAINTEXT_LISTENER=yes任何人可以访问
KAFKA_CFG_ZOOKEEPER_CONNECTzookeeper地址
KAFKA_CFG_ADVERTISED_LISTENERS当前kafka安装的主机地址 如果是服务器部署则配服务器IP或域名否则客户端监听消息会报地址错误

2. 启动kafka-map管理工具

docker run --name kafka-map \--network app-tier \-p 9001:8080 \-v /usr/local/kafka-map/data:/usr/local/kafka-map/data \-e DEFAULT_USERNAME=admin \-e DEFAULT_PASSWORD=admin \--restart=always \-d dushixiang/kafka-map:latest

启动成功后, 访问客户端: http://localhost:9001
账户: admin
密码: admin

在这里插入图片描述

3. springboot集成kafka

pom.xml配置

    <dependencies><!--kafka依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies>        

配置application.yml

#------------------------------------spring----------------------------------
spring:#------------------------------------消息队列kafka配置----------------------------------kafka:#  kafka server的地址,如果有多个,使用逗号分割bootstrap-servers: localhost:9092producer:# 发生错误后,消息重发的次数。retries: 1#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。32MB的批处理缓冲区buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1properties:# 自定义拦截器interceptor.classes: com.wms.message.kafka.interceptor.CustomProducerInterceptor#自定义分区器partitioner.classes: com.wms.message.kafka.interceptor.CustomPartitionerconsumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:# 自定义消费者拦截器interceptor.classes: com.wms.message.kafka.interceptor.CustomConsumerInterceptor# 默认消费者组group-id: code-safe-group# 设置最大轮询间隔时间(毫秒),默认值为 300000(5分钟)# 如果两次 poll() 之间的时间超过此配置值,可能导致 rebalance, 消费者会被剔除 此处设置10分钟max-poll-interval-ms: 600000# 批量一次最大拉取数据量max-poll-records: 1000batch:# 批消费并发量,小于或等于Topic的分区数concurrency: 3listener:# 在侦听器容器中运行的线程数。concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: falsetopics:# 自定义主题名称twsm: webSocket_send_message_devgroup-id: group-idtopic-name:- topic1

测试发送消息到kafka

/*** Kafka测试** @version 1.0* @author: web* @date: 2024/1/18 15:07*/
@Slf4j
@RestController
@RequestMapping("/message/kafkaTest")
public class KafkaTestController extends BaseController
{@Autowiredprivate KafkaUtils kafkaUtils;/*** 生产者_推送消息到kafka** @param msg* @author: web* @return: AjaxResult* @date: 2024/1/18 15:16*/@PostMapping("/send")public AjaxResult send(@RequestBody Map<String, Object> msg){try{String userId = msg.get("userId").toString();Object content = msg.get("content");Message message = kafkaUtils.setMessage(userId, content);kafkaUtils.send(KafkaUtils.TOPIC_TEST, message);}catch (Exception e){log.error("生产者_推送消息到kafka发生异常");}return success();}/*** 消费者1** @param record* @param ack* @param topic* @author: web* @return: void* @date: 2024/1/18 15:07*/@KafkaListener(topics = KafkaUtils.TOPIC_TEST)public void topicTest1(ConsumerRecord<?, ?> record, Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic){Optional message = Optional.ofNullable(record.value());if (message.isPresent()){Object msg = message.get();log.info("topic.group1 消费了: Topic:" + topic + ",Message:" + msg);ack.acknowledge();}}/*** 消费者2** @param record* @param ack* @param topic* @author: web* @return: void* @date: 2024/1/18 15:07*///    @KafkaListener(topics = KafkaUtils.TOPIC_TEST, groupId = KafkaUtils.TOPIC_GROUP2)//    public void topicTest2(ConsumerRecord<?, ?> record, Acknowledgment ack,//                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)//    {////        Optional message = Optional.ofNullable(record.value());//        if (message.isPresent())//        {//            Object msg = message.get();//            log.info("topic.group2 消费了: Topic:" + topic + ",Message:" + msg);//            ack.acknowledge();//        }//    }}

KafkaUtils类

/*** 生产者** @version: 1.0* @author: web* @date: 2024/1/18 10:37*/
@Component
@Slf4j
public class KafkaUtils
{@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;/*** 自定义topic*/public static final String TOPIC_TEST = "topic.code-safe";/*** 自定义消费组*/public static final String TOPIC_GROUP1 = "topic.group1";public static final String TOPIC_GROUP2 = "topic.group2";// 业务相关topic/*** 主题: webSocket发送消息到客户端*/public static String TOPIC_WEBSOCKET_SEND_MESSAGE;@Autowiredprivate String[] kafkaTopicName;/*** 获取配置文件中的盐值,并设置到静态变量中** @param topic 主题*/@Value("${spring.kafka.topics.twsm}")private void setTwsmTopic(String topic){TOPIC_WEBSOCKET_SEND_MESSAGE = topic;}/*** 发送消息** @param topic   主题* @param message 消息内容* @author: web* @return: void* @date: 2024/1/18 10:42*/public void send(String topic, Object message){if (StringUtils.isEmpty(topic) || StringUtils.isNull(message)){throw new ServiceException("生产者发送消息到kafka_主题或消息内容不能为空!");}String obj2String = JsonUtils.toJsonString(message);//        log.info("准备发送消息为:{}", obj2String);//发送消息ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, obj2String);// 监听回调future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>(){@Overridepublic void onFailure(Throwable throwable){//发送失败的处理log.error(topic + " - 生产者 发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> stringObjectSendResult){//成功的处理
//                log.info(topic + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());}});}/*** 设置websocket发送的消息体** @param userId 用户ID* @param msg    消息内容* @author: web* @return: Message 消息对象* @date: 2024/1/19 11:36*/public Message setMessage(String userId, Object msg){Message message = new Message();message.setSendUserId(userId);message.setSendTime(DateUtils.getTime());message.setSendContent(String.valueOf(msg));return message;}
}

Message类

@Data
public class Message implements Serializable
{private static final long serialVersionUID = -118L;/*** 发送人ID*/private String sendUserId;/*** 发送人*///    private String sendUserName;/*** 发送时间*/private String sendTime;/*** 发送内容*/private String sendContent;
}

监听消息

/*** 消息接收监听器【分布式系统】** @version: 1.0* @author: web* @date: 2024/1/19 13:44*/
@Component
@Slf4j
public class MessageListener
{/*** 根据用户id发送消息到客户端** @param record* @param ack* @param topic* @author: web* @return: void* @date: 2024/1/20 22:05*/@KafkaListener(topics = "#{'${spring.kafka.topics.twsm}'}", groupId = "#{topicGroupId}")public void sendMessageByUserId(ConsumerRecord<String, String> record, Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic){Optional<String> optional = Optional.ofNullable(record.value());if (optional.isPresent()){Message message = JsonUtils.parseObject(optional.get(), Message.class);if (StringUtils.isNull(message)){log.error("消费者收到kafka消息的内容为空!");return;}
//            log.info("消费者收到kafka消息");String sendUserId = message.getSendUserId();String sendContent = message.getSendContent();// 确认收到消息ack.acknowledge();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/272990.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LVS集群(Linux Virtual server)

集群概念lvs模型lvs调度算法lvs实现lvs高可用性&#xff0c;负载均衡 1 集群和分布式 系统性能扩展方式&#xff1a; Scale UP&#xff1a;垂直扩展&#xff0c;向上扩展,增强&#xff0c;性能更强的计算机运行同样的服务 升级单机的硬件设备Scale Out&#xff1a;水平扩展…

MySQL通过SQL语句进行递归查询

这里主要是针对于MySQL8.0以下版本&#xff0c;因为MySQL8.0版本出来了一个WITH RECURSIVE函数专门用来进行递归查询的 先看下表格数据&#xff0c;就是很普通的树结构数据&#xff0c;通过parentId关联上下级关系 下面我们先根据上级节点id递归获取所有的下级节点数据&#x…

回归测试重复测试

重复测试和回归测试在测试的过程中都会遇到过&#xff0c;出现的概率都是高频的&#xff0c;两者如何区分如下图&#xff1a; 回归测试 回归测试是什么&#xff1f; 回归测试&#xff08;Regression Testing&#xff09;是指在软件修改之后&#xff0c;对已有功能点重新执行测…

C/C++编程-理论学习-通信协议理论

通信协议理论 protobuf简述使用简介proto 文件为了nanopb 编译.proto文件修改生成器行为 streamsoutput streamsinput streams Data types(数据类型)Field callbacks(字段回调)Encoding callbacks(编码回调)Message descriptor(信息描述)三个关键字required、optional、repeate…

【C++】函数模板和类模板

目录 1.泛型编程 2.函数模板 2.1函数模板的定义格式 2.2函数模板的实例化 2.3函数模板参数的匹配原则 3.类模板 3.1类模板的定义格式 3.2类模板的实例化 3.3模板的分离编译 1.泛型编程 泛型编程&#xff1a;编写与类型无关的通用代码&#xff0c;是代码复用的一种手段…

分割模型TransNetR的pytorch代码学习笔记

这个模型在U-net的基础上融合了Transformer模块和残差网络的原理。 论文地址&#xff1a;https://arxiv.org/pdf/2303.07428.pdf 具体的网络结构如下&#xff1a; 网络的原理还是比较简单的&#xff0c; 编码分支用的是预训练的resnet模块&#xff0c;解码分支则重新设计了。…

抖音素材网站去哪下载?给你推荐六个抖音自媒体网站

各位抖音视频创作达人们&#xff0c;是否在苦苦寻觅那些能够点燃观众热情&#xff0c;让视频内容跃然屏上的素材宝库呢&#xff1f;此刻&#xff0c;你们的寻觅之旅将迎来终点&#xff01;我将向你们隆重推荐10个精心挑选的视频素材库&#xff0c;它们定能让你们的抖音视频如同…

【微服务】SpringBoot整合Resilience4j使用详解

目录 一、前言 二、熔断器出现背景 2.1 几个核心概念 2.1.1 熔断 2.1.2 限流 2.1.3 降级 2.2 为什么会出现熔断器 2.3 断路器介绍 2.3.1 断路器原理 三、Resilience4j介绍 3.1 Resilience4j概述 3.1.1 Resilience4j是什么 3.1.2 Resilience4j功能特性 3.2 Resilie…

微服务自动化管理初步认识与使用

目录 一、ETCD 1.1、ETCD简介 对于实施工程师&#xff1a; 1.2、特点 1.3. 使用场景 1.4、 关键字 1.5 工作原理 二、ETCD的安装 2.1、下载路径 2.2、介绍 2.3、具体操作 安装服务端 安装etcd客户端 测试 三、ETCD使用 3.1、前奏具体操作 3.2、 常用操作 一、ET…

利用GPT开发应用001:GPT基础知识及LLM发展

文章目录 一、惊艳的GPT二、大语言模型LLMs三、自然语言处理NLP四、大语言模型LLM发展 一、惊艳的GPT 想象一下&#xff0c;您可以与计算机的交流速度与与朋友交流一样快。那会是什么样子&#xff1f;您可以创建哪些应用程序&#xff1f;这正是OpenAI正在助力构建的世界&#x…

ELFK 分布式日志收集系统

ELFK的组成&#xff1a; Elasticsearch: 它是一个分布式的搜索和分析引擎&#xff0c;它可以用来存储和索引大量的日志数据&#xff0c;并提供强大的搜索和分析功能。 &#xff08;java语言开发&#xff0c;&#xff09;logstash: 是一个用于日志收集&#xff0c;处理和传输的…

Linux系统下使用C++推流多路视频流

先看拉取的视频流效果&#xff1a; 代码如下&#xff1a; 一开始打算使用python写多路视频推流&#xff0c;但在ubuntu系统上搞了好久就是搞不定openh264导致的错误&#xff0c;然后改用c了&#xff0c;代码如下&#xff0c;我这里推了两路视频流&#xff0c;一路是网络摄像头&…

2024护网面试题精选(二)完

0x02. 内网渗透篇 00- 内网渗透的流程 拿到跳板后&#xff0c;先探测一波内网存活主机&#xff0c;用net user /domian命令查看跳板机是否在域 内&#xff0c;探测存活主机、提权、提取hash、进行横向移动&#xff0c;定位dc位置&#xff0c;查看是否有能直接提权域 管的漏洞…

pytorch什么是梯度

目录 1.导数、偏微分、梯度1.1 导数1.2 偏微分1.3 梯度 2. 通过梯度求极小值3. learning rate3. 局部最小值4. Saddle point鞍点 1.导数、偏微分、梯度 1.1 导数 对于yx 2 2 2 的导数&#xff0c;描述了y随x值变化的一个变化趋势&#xff0c;导数是个标量反应的是变化的程度&…

【HTML】HTML基础7.1(无序列表)

目录 标签 属性 效果 注意 标签 <ul> <li>列表里要装的东西</li> <li>列表里要装的东西</li> <li>列表里要装的东西</li> </ul> 属性 type&#xff1a; circle空心圆disc实心圆square方框 效果 circle空心圆效果…

vi/vim编辑器

vi/vim编辑器 vi的特点与运用场景vi的使用简易执行一个案例按键说明第一部分&#xff1a;命令模式的按键说明(光标移动、复制粘贴、查找替换)第二部分&#xff1a;命令模式切换到输入模式的可以按键第三部分&#xff1a;命令模式切换到底线命令模式的可用按键 命令行模式的保存…

【fastllm】学习框架,本地运行,速度还可以,可以成功运行chatglm2模型

1&#xff0c;关于 fastllm 项目 https://www.bilibili.com/video/BV1fx421k7Mz/?vd_source4b290247452adda4e56d84b659b0c8a2 【fastllm】学习框架&#xff0c;本地运行&#xff0c;速度还可以&#xff0c;可以成功运行chatglm2模型 https://github.com/ztxz16/fastllm &am…

ai学习前瞻-python环境搭建

python环境搭建 Python环境搭建1. python的安装环境2. MiniConda安装3. pycharm安装4. Jupyter 工具安装5. conda搭建虚拟环境6. 安装python模块pip安装conda安装 7. 关联虚拟环境运行项目 Python环境搭建 1. python的安装环境 ​ python环境安装有4中方式。 从上图可以了解…

YOLO语义分割标注文件txt还原到图像中

最近做图像分割任务过程中&#xff0c;使用labelme对图像进行标注&#xff0c;得到的数据文件是json&#xff0c;转换为YOLO训练所需的txt格式后&#xff0c;想对标注文件进行检验&#xff0c;即将txt标注文件还原到原图像中&#xff0c;下面是代码&#xff1a; import cv2 im…

C++指针(五)完结篇

个人主页&#xff1a;PingdiGuo_guo 收录专栏&#xff1a;C干货专栏 前言 相关文章&#xff1a;C指针&#xff08;一&#xff09;、C指针&#xff08;二&#xff09;、C指针&#xff08;三&#xff09;、C指针&#xff08;四&#xff09;万字图文详解&#xff01; 本篇博客是介…