Kafka应用Demo:按主题订阅消费消息

安装环境

  Kafka安装可参考官方网站的指导(https://kafka.apache.org/quickstart), 按步骤解压压缩包,修改配置。然后再启动zookeeper和kafka-server即可。

  需要注意的一点:如果是在VMware虚拟机上启动的kafka, 需要修改一下server.properties配置文件,增加如下配置:
在这里插入图片描述
  advertised.listener指定访问kafka的IP和端口,IP设置为虚拟机暴露给外部访问的IP。通过本地代码连接kafka,需要使用该配置

生产者代码样例

public class KafkaProducerService {private static final String NEO_TOPIC = "elon-topic";private KafkaProducer<String, String> producer = null;public KafkaProducerService() {Properties props = new Properties();props.put("bootstrap.servers", "192.168.5.128:9092");props.put("acks", "0");props.put("group.id", "1111");props.put("retries", "2");//设置key和value序列化方式props.put("key.serializer", StringSerializer.class);props.put("value.serializer", StringSerializer.class);//生产者实例producer = new KafkaProducer<>(props);}/*** 外部调用的发消息接口*/public void sendMessage() {for (int i = 0; i < 10; ++i) {int p = i % 2;ProducerRecord<String, String> record = new ProducerRecord(NEO_TOPIC, p, "neo", JSON.toJSONString(i));producer.send(record);}}
}

 发送消息时,将10个数据分别发送到0分区和1分区。

消费者代码样例

public class KafkaConsumerService {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);private static final String NEO_TOPIC = "elon-topic";Properties properties = new Properties();private KafkaConsumer consumer = null;public KafkaConsumerService() {properties.put("bootstrap.servers","192.168.5.128:9092");  // 指定 Brokerproperties.put("group.id", "neo1");              // 指定消费组群 IDproperties.put("max.poll.records", "5");properties.put("enable.auto.commit", "false");properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象properties.put("value.deserializer", StringDeserializer.class);  // 将 value 的字节数组转成 Java 对象consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Collections.singletonList(NEO_TOPIC));  // 订阅主题 order-eventsnew Thread(this::receiveMessage).start();}public void receiveMessage() {try {while (true) {synchronized (this) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));LOGGER.info("Fetch record num:{}", records.count());for (ConsumerRecord<String,String> record: records) {String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]",record.topic(), record.partition(), record.offset(), record.key(), record.value());LOGGER.info("Received:" + info);Thread.sleep(100);}consumer.commitSync();}}} catch (Exception e){} finally {consumer.close();}}

 消费者按主题订阅。从打印的结果可以看到,消费者循环从topic下取出各个分区的消息依次消费。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/327046.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI浪潮再起,2024年中国大模型产业深度解析

国内 AI大模型产业发展深度分析 2024 人工智能技术的迅猛发展&#xff0c;使AI大模型成为科技竞争的核心、产业变革的先锋、经济增长的新动力。我国已将人工智能列为国家战略&#xff0c;出台系列政策扶持其发展&#xff0c;为AI大模型产业创造优越环境&#xff0c;展现巨大潜力…

CentOS 7安装配置docker

CentOS 7、8安装、配置docker 这里宿主机的型号选择是centos7.9.2009的版本 1.宿主机关闭防火墙和selinux&#xff0c;配置ipv4 #设置SELinuxdisabled vim /etc/selinux/config SELinuxdisabled 查看防火墙状态&#xff1a;firewall-cmd --state 关闭防火墙&#xff1a;syst…

FloodFill算法---BFS

目录 一、前言 二、算法模板套路 2.1 创建所需的全局变量&#xff1a; 2.2 BFS模板&#xff1a; 2.3 细节处理&#xff1a; 三、例题练习 3.1 例题1&#xff1a;图像渲染 3.2 例题2&#xff1a;岛屿数量 3.3 例题3&#xff1a;岛屿的最大面积 3.4 例题4&#xff1a;被…

在做题中学习(54):点名

LCR 173. 点名 - 力扣&#xff08;LeetCode&#xff09; 此题有不同的几种解法&#xff1a; 解法一&#xff1a;暴力枚举 O(n); 解法二&#xff1a;哈希表 把原数组丢入哈希表&#xff0c;遍历哈希表&#xff0c;看看哪个数值为0即可。 O(n)空间O(n)时间 解法三&…

OpenAI推出新模型GPT-4o:可实时交互,检测人的情绪,支持多模态输出

GPT-4o作为OpenAI新发布的人工智能模型&#xff0c;据官方及媒体报道&#xff0c;是面向全球用户发布的&#xff0c;包括中国在内的用户理论上应该能够通过相应平台和应用访问。不过&#xff0c;实际可用性还需考虑地区政策、网络访问限制以及具体平台是否在中国有本地化服务等…

1694jsp宿舍管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 JSP 宿舍管理系统 是一套完善的web设计系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统采用web模式&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库…

网络安全快速入门(十一)vi/vim

11.1 了解vi 前面我i们已经在基础命令中大致了解了vi&#xff0c;本章我们针对vi来细讲一下&#xff0c;vi和vim 11.1.1 什么是vi/vim&#xff1f; vi和vim&#xff0c;都是一个模块化的文本编辑工具&#xff0c;换句话讲&#xff0c;通过vi下的一系列的命令&#xff0c;可以实…

Redis 源码安装和入门介绍

Linux下的redis源码安装 redis介绍 Redis 是一个开源&#xff08;BSD许可&#xff09;的&#xff0c;内存中的数据结构存储系统&#xff0c;它可以用作数据库、缓存和消息中间件。它支持多种类型的数据结构&#xff0c;如 字符串&#xff08;strings&#xff09;&#xff0c;…

专访安克创新CEO阳萌:仿生算法与存算一体芯片的兴起

在这篇博客中&#xff0c;我们将探讨人工智能的未来发展方向&#xff0c;特别是围绕大模型、存算一体芯片以及仿生算法的讨论。通过对安克创新CEO阳萌的专访内容进行分析&#xff0c;我们将尝试解答一些关于AI发展的关键问题&#xff0c;并对未来的技术趋势进行预测。 引言 …

AD原理图设置:如何在编译工程时,报未连接线或引脚错误

如下图&#xff0c;AD默认在编译原理图时&#xff0c;如果出现未连接的引脚或线时&#xff0c;并不会报相关的错误&#xff0c;这样做其实很危险 所以&#xff0c;我们应该让它提示错误 具体配置方法&#xff1a; 1、找到工程选项 2、切换到第二个选项“Connection Matrix”&a…

OBS插件--源录制

源录制 将应用这个滤镜的源录制成视频保存下来&#xff0c;可以选择音轨&#xff0c;也可以针对应用此滤镜的源单独的推流等。 如果在直播或录制视频的过程中场景里面布置了多个源&#xff0c;而只想保存其中一个源的视频或音频这个插件非常使用。 下面截图演示下操作步骤&a…

面试中的算法(查找缺失的整数)

在一个无序数组里有99个不重复的正整数&#xff0c;范围是1~100&#xff0c;唯独缺少1个1~100中的整数。如何找出这个缺失的整数? 一个很简单也很高效的方法&#xff0c;先算出1~100之和&#xff0c;然后依次减去数组里的元素&#xff0c;最后得到的差值&#xff0c;就是那个缺…

数据库入门(sql文档+命令行)

一.基础知识 1.SQL&#xff08;Structured Query Language&#xff09;结构化查询语言分类&#xff1a; DDL数据定义语言用来定义数据库对象&#xff1a;数据库、表、字段DML数据操作语言对数据库进行增删改查DQL数据查询语言查询数据库中表的信息DCL数据控制语言用来创建数据…

安装adobe系列,提示错误代码146解决办法

安装Adobe系列产品如PS、PR、Lrc等产品时&#xff0c;会因为各种各样的错误导致安装失败&#xff01;今天小编为大家带来的是安装adobe系列&#xff0c;提示错误代码146解决办法&#xff0c;收藏起来吧&#xff01; 方法一&#xff1a;就是传说中的万能大法&#xff0c;关机重启…

苍穹外卖项目---------收获以及改进(9-12)

①Spring Task-------实现系统定时任务 概念&#xff1a; 应用场景&#xff1a; 使用步骤&#xff1a; 实现订单超时和前一天派送中的订单的自动任务处理&#xff1a; Component Slf4j public class Mytask {Autowiredprivate OrderServiceimpl orderServiceimpl;/*** 处理订…

基于uniapp+vue3+ts小程序项目实战之项目初始化

&#x1f680; 作者 &#xff1a;“二当家-小D” &#x1f680; 博主简介&#xff1a;⭐前荔枝FM架构师、阿里资深工程师||曾任职于阿里巴巴担任多个项目负责人&#xff0c;8年开发架构经验&#xff0c;精通java,擅长分布式高并发架构,自动化压力测试&#xff0c;微服务容器化k…

OpenCV使用 Kinect 和其他兼容 OpenNI 的深度传感器(75)

返回:OpenCV系列文章目录&#xff08;持续更新中......&#xff09; 上一篇:使用 OpenCV 创建视频(74) 下一篇 :OpenCV使用 Orbbec Astra 3D 相机(76) 目的&#xff1a;​ 通过 VideoCapture 类支持与 OpenNI 兼容的深度传感器&#xff08;Kinect、XtionPRO 等&#xff09;。…

【数据结构】解密链表之旅(单链表篇)

前言 哈喽大家好&#xff0c;我是野生的编程萌新&#xff0c;首先感谢大家的观看。数据结构的学习者大多有这样的想法&#xff1a;数据结构很重要&#xff0c;一定要学好&#xff0c;但数据结构比较抽象&#xff0c;有些算法理解起来很困难&#xff0c;学的很累。我想让大家知道…

QLExpress入门及实战总结

文章目录 1.背景2.简介3.QLExpress实战3.1 基础例子3.2 低代码实战3.2.1 需求描述3.2.1 使用规则引擎3.3.2 运行结果 参考文档 1.背景 最近研究低代码实现后端业务逻辑相关功能&#xff0c;使用LiteFlow作为流程编排后端service服务, 但是LiteFlow官方未提供图形界面编排流程。…

大型语言模型自我进化综述

24年4月来自北大的论文“A Survey on Self-Evolution of Large Language Models”。 大语言模型&#xff08;LLM&#xff09;在各个领域和智体应用中取得了显着的进步。 然而&#xff0c;目前从人类或外部模型监督中学习的LLM成本高昂&#xff0c;并且随着任务复杂性和多样性的…