在 DDD 中优雅的发送 Kafka 消息

前言
1:host 映射
在这里插入图片描述
下载 SwitchHost 配置一个映射地址。点击 + 添加一个本地环境,之后配置你的 IP kafka 这样就能找这个地址了。IP 为你本地的IP,如果是云服务器就是公网IP地址
使用docker-compose.yml进行一键部署安装

version: '3.0'
# docker-compose -f docker-compose.yml up -d
services:zookeeper:image: zookeeper:3.9.0container_name: zookeeperrestart: alwaysports:- "2181:2181"environment:ZOO_MY_ID: 1ZOO_SERVERS: server.1=zookeeper:2888:3888;2181ZOOKEEPER_CLIENT_PORT: 2181ALLOW_ANONYMOUS_LOGIN: yesTZ: Asia/Shanghainetworks:- my-networkkafka:image: bitnami/kafka:3.7.0container_name: kafkavolumes:- /etc/localtime:/etc/localtimeports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_CFG_LISTENERS: PLAINTEXT://:9092KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1ALLOW_PLAINTEXT_LISTENER: yesKAFKA_MESSAGE_MAX_BYTES: "2000000"KAFKA_ENABLE_KRAFT: noJMX_PORT: 9999TZ: Asia/Shanghaidepends_on:- zookeepernetworks:- my-networkkafka-eagle:image: echo21bash/kafka-eagle:3.0.2container_name: kafka-eagleenvironment:KAFKA_EAGLE_ZK_LIST: zookeeper:2181volumes:- ./kafka-eagle/system-config.properties:/opt/kafka-eagle/conf/system-config.propertiesports:- "8048:8048"depends_on:- kafkanetworks:- my-networknetworks:my-network:driver: bridge

脚本在代码中提供了完整的语句
消息流程
在这里插入图片描述
代码结构
在这里插入图片描述
1:domain 是领域层,提供一个个领域服务。如果一个工程有多个领域,则有不同的 a、b、c 领域包,每个包下有一套【event、model、repository、service】。
2:在领域层定义的 event 事件,里面涵盖了事件消息。而这个事件消息可以让 UserRepository 继承实现。最终完成消息发送。
3:最后是 trigger 触发器层,所有的 http、rpc、job、mq 都是一种触发行为。通过触发器的 listener 监听,来接收 mq 消息。
环境配置

spring:kafka:bootstrap-servers: localhost:9092producer:# 发生错误后,消息重发的次数。retries: 1#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1...# 配置主题
kafka:topic:group: xmg-groupuser: xmg-topic

配置发送事件

@Slf4j
@Component
public class EventPublisher {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;public void publish(String topic, BaseEvent.EventMessage<?> eventMessage) {try {String messageJson = JSON.toJSONString(eventMessage);kafkaTemplate.send(topic, messageJson);log.info("发送MQ消息 topic:{} message:{}", topic, messageJson);} catch (Exception e) {log.error("发送MQ消息失败 topic:{} message:{}", topic, JSON.toJSONString(eventMessage), e);throw e;}}}

事件消息定义

public class UserMessageEvent extends BaseEvent<UserMessageEvent.UserMessage> {@Value("${kafka.topic.user}")private String topic;@Overridepublic EventMessage<UserMessage> buildEventMessage(UserMessage data) {return EventMessage.<UserMessage>builder().id(RandomStringUtils.randomNumeric(11)).timestamp(new Date()).data(data).build();}@Overridepublic String topic() {return topic;}/*** 要推送的事件消息,聚合到当前类下。*/@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic static class UserMessage {private String userId;private String userName;private String userType;}}

事件消息发送

@Service
public class UserRepository extends UserMessageEvent implements IUserRepository {@Resourceprivate EventPublisher publisher;@Overridepublic void doSaveUser(UserEntity userEntity) {// 推送消息publisher.publish(this.topic(), this.buildEventMessage(UserMessageEvent.UserMessage.builder().userId(userEntity.getUserId()).userName(userEntity.getUserName()).userType(userEntity.getUserTypeVO().getDesc()).build()));}}

事件消息监听

@Slf4j
@Component
public class KafkaMessageListener {@KafkaListener(topics = "${kafka.topic.user}", groupId = "${kafka.topic.group}", concurrency = "1")public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional<?> message = Optional.ofNullable(record.value());if (message.isPresent()) {Object msg = message.get();try {// 逻辑处理// 确认消息消费完成,如果抛异常消息会进入重试ack.acknowledge();log.info("Kafka消费成功! Topic:" + topic + ",Message:" + msg);} catch (Exception e) {e.printStackTrace();log.error("Kafka消费失败!Topic:" + topic + ",Message:" + msg, e);}}}}

测试验证

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class UserServiceTest {@Resourceprivate IUserService userService;@Testpublic void test_register() throws InterruptedException {while (true) {UserEntity userEntity = new UserEntity();userEntity.setUserId("10001");userEntity.setUserName("小明哥");userEntity.setUserTypeVO(UserTypeVO.T8);userService.register(userEntity);Thread.sleep(1500);}}}

好了 至此 在 DDD 中优雅的发送 Kafka 消息 学习结束了 友友们 点点关注不迷路 老铁们!!!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/491346.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c#上班,上学,交通方式接口

using System;namespace INTERFACE {abstract class Person{public string Name { get; set; }public int Age { get; set; }public virtual void ShowInfo(){Console.WriteLine($"Name: {Name}, Age: {Age}");}}// 接口 IWorkinterface IWork{void GotoCompany();}/…

Halcon 直连相机

一、相机类别 1、大恒示例 DahengCAM 使用大华相机,待补充... 2、大华例程 GigEVision 2.1 关键算子 1、查询指定图像采集接口信息。 info_framegrabber (GigEVision, info_boards, Information, ValueList) 获取结果 unique_name:302fac01cd50_MachineVision_MVA5B57MG20…

RPC 服务与 gRPC 的入门案例

RPC 协议 RPC&#xff08;Remote Procedure Call Protocol&#xff09;即远程过程调用协议&#xff0c;它是一种通过网络从远程计算机程序上请求服务的协议&#xff0c;允许一个计算机程序可以像调用本地服务一样调用远程服务 。 RPC的主要作用是不同的服务间方法调用就像本地…

机器学习-正则化技术

文章目录 拟合正则化正则项L1 正则化&#xff08;Lasso&#xff09;L2 正则化&#xff08;Ridge&#xff09; 多元线性回归的正则化回归形式代码 拟合 过拟合&#xff1a;参数&#xff08;特征&#xff09;过多&#xff08;理解为考虑很多因素)或者说过多专注于原来的训练数据…

数据可视化-2. 条形图

目录 1. 条形图适用场景分析 1.1 比较不同类别的数据 1.2 展示数据分布 1.3 强调特定数据点 1.4 展示时间序列数据的对比 1.5 数据可视化教育 1.6 特定领域的应用 2. 条形图局限性 3. 条形图图代码实现 3.1 Python 源代码 3.2 条形图效果&#xff08;网页显示&#…

【DBeaver】连接带kerberos的hive[Apache|HDP]

目录 一、安装配置Kerberos客户端环境 1.1 安装Kerberos客户端 1.2 环境配置 二、基于Cloudera驱动创建连接 三、基于Hive原生驱动创建连接 一、安装配置Kerberos客户端环境 1.1 安装Kerberos客户端 在Kerberos官网下载,地址如下&#xff1a;https://web.mit.edu/kerberos…

SpringBoot+IDEA工具框架快捷键+注解备注

快捷键 ctrlr 搜索替换 ctrlshiftr 全局搜索和替换 altfninsert 自行补全函数和构造函数等 ctrlaltt 可以尝试添加东西 可以加try catch ctrlshiftt 生成接口对应的测试函数 ctrlh 可以查看当前类的一个继承和实现关系 大写CD回车 ide会自动生成cdata区的标签 x…

AI前沿分析:ChatGPT搜索上线,Google搜索地位能否守住?

名人说&#xff1a;莫听穿林打叶声&#xff0c;何妨吟啸且徐行。—— 苏轼 Code_流苏(CSDN)&#xff08;一个喜欢古诗词和编程的Coder&#x1f60a;&#xff09; 目录 引言&#xff1a;AI与搜索领域的激烈博弈一、ChatGPT搜索的优势是什么&#xff1f;1. 实时信息获取&#xf…

ScottPlot学习的常用笔记

ScottPlot学习的常用笔记 写在前面版本的选择第一个障碍&#xff1a;版本问题。 ScottPlot4.0的官方网站与示例官方起始页cookbook5.0Demo4.1 demo以4.1为例&#xff0c;解压和运行如下&#xff1a; 下载源代码和编译先说结论&#xff1a; 写在前面 之前调研的TraceCompass&am…

客户端(浏览器)vue3本地预览txt,doc,docx,pptx,pdf,xlsx,csv,

预览文件 1、入口文件preview/index.vue2、预览txt3、预览doc4、预览pdf5、预览pptx6、预览xlsx7、预览csv 1、入口文件preview/index.vue 预览样式&#xff0c;如pdf 文件目录如图所示&#xff1a; 代码如下 <template><div class"preview-wrap" ref&…

luckysheet与superslide冲突解决

[现象]控制台报错、界面无法操作 $是jquery。查看源码&#xff0c;发现mousewheel方法来自插件mousewheel&#xff0c;luckysheet初始应该会将mousewheel挂载在jquery上。 在控制台打印jquery取dom及其方法&#xff0c;结果如下&#xff1a; 不存在mousewheel方法&#xff0c…

MongoDB(上)

MongoDB 基础 MongoDB 是什么&#xff1f; MongoDB 是一个基于 分布式文件存储 的开源 NoSQL 数据库系统&#xff0c;由 C 编写的。MongoDB 提供了 面向文档 的存储方式&#xff0c;操作起来比较简单和容易&#xff0c;支持“无模式”的数据建模&#xff0c;可以存储比较复杂…

搭建Tomcat(四)---Servlet容器

目录 引入 Servlet容器 一、优化MyTomcat ①先将MyTomcat的main函数搬过来&#xff1a; ②将getClass()函数搬过来 ③创建容器 ④连接ServletConfigMapping和MyTomcat 连接&#xff1a; ⑤完整的ServletConfigMapping和MyTomcat方法&#xff1a; a.ServletConfigMappin…

构建一个rust生产应用读书笔记四(实战3)

从这一节开始&#xff0c;我们将继续完善邮件订阅生产级应用&#xff0c;根据作者的选型sqlx作为数据库操作的类库&#xff0c;它有如下优点&#xff1a; 它旨在提供高效、安全且易于使用的数据库交互体验。sqlx 支持多种数据库&#xff0c;包括 PostgreSQL、MySQL 和 SQLite&…

网络安全-------防止被抓包

1.Ios应用网络安全之https 安全套接字层 (Secure Socket Layer, SSL) 是用来实现互联网安全通信的最普遍的标准。Web 应用程序使用 HTTPS&#xff08;基于 SSL 的 HTTP&#xff09;&#xff0c;HTTPS 使用数字证书来确保在服务器和客户端之间进行安全、加密的通信。在 SSL 连接…

WebSocket 与 Server-Sent Events (SSE) 的对比与应用

目录 ✨WebSocket&#xff1a;全双工通信的利器&#x1f4cc;什么是 WebSocket&#xff1f;&#x1f4cc;WebSocket 的特点&#x1f4cc;WebSocket 的优点&#x1f4cc;WebSocket 的缺点&#x1f4cc;WebSocket 的适用场景 ✨Server-Sent Events (SSE)&#xff1a;单向推送的轻…

CAD c# 生成略缩图预览

代码如下&#xff1a; using (Transaction tr currentdb.TransactionManager.StartTransaction()){//当前数据库开启事务using (Database tempdb new Database(false, true)) //创建临时数据库(两个参数&#xff1a;是否创建符号表&#xff0c;不与当前文档关联){try{Bitmap …

娱乐五子棋(附加源码)

一写在开头 上期代码主要实现瀑布流功能&#xff0c;本期就来实现五子棋小游戏&#xff0c;开发久了很多功能都是通过框架组件库来完成&#xff0c;但是如果组件满足不了开发需求&#xff0c;还需要开发人员手动封装组件&#xff0c;专门出这样一期文章&#xff0c;通过原生js实…

XMOS将在CES 2025上展出多款由边缘AI驱动的创新音效、音频、识别和处理解决方案

全球智能物联网技术领导者暨匠心独到的半导体科技企业XMOS宣布&#xff1a;该公司将再次参加2025年国际消费电子展&#xff08;CES 2025&#xff09;&#xff0c;并将在本届CES上展出一系列由人工智能&#xff08;AI&#xff09;驱动的全新空间音效、语音捕获与降噪、音视频多模…

HCIA-Access V2.5_2_2_2网络通信基础_IP编址与路由

网络层数据封装 首先IP地址封装在网络层&#xff0c;它用于标识一台网络设备&#xff0c;其中IP地址分为两个部分&#xff0c;网络地址和主机地址&#xff0c;通过我们采用点分十进制的形式进行表示。 IP地址分类 对IP地址而言&#xff0c;它细分为五类&#xff0c;A,B,C,D,E,…