牛客项目(五)-使用kafka实现发送系统通知

kafka入门以及与spring整合

在这里插入图片描述

Message.java

import java.util.Date;public class Message {private int id;private int fromId;private int toId;private String conversationId;private String content;private int status;private Date createTime;public int getId() {return id;}public void setId(int id) {this.id = id;}public int getFromId() {return fromId;}public void setFromId(int fromId) {this.fromId = fromId;}public int getToId() {return toId;}public void setToId(int toId) {this.toId = toId;}public String getConversationId() {return conversationId;}public void setConversationId(String conversationId) {this.conversationId = conversationId;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}public int getStatus() {return status;}public void setStatus(int status) {this.status = status;}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}@Overridepublic String toString() {return "Message{" +"id=" + id +", fromId=" + fromId +", toId=" + toId +", conversationId='" + conversationId + '\'' +", content='" + content + '\'' +", status=" + status +", createTime=" + createTime +'}';}
}

EventConsumer.java

定义事件消费者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Component
public class EventConsumer implements CommunityConstant {
//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@Autowiredprivate DiscussPostService discussPostService;@Autowiredprivate ElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record){if(record == null || record.value()==null){System.out.println("错误发帖");return;}Event event= JSONObject.parseObject(record.value().toString(),Event.class);if(event == null){System.out.println("错误发帖");return;}//发送站内通知Message message = new Message();message.setFromId(SYSTEM_USERID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());//message的内容Map<String,Object> content=new HashMap<>();content.put("userId",event.getUserId());content.put("entityType",event.getEntityType());content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){content.put(entry.getKey(),entry.getValue());}}message.setContent(JSONObject.toJSONString(content));System.out.println(message);messageService.addMessage(message);System.out.println("成功处理事件");}}

Event.java

定义一个事件实体 以方便在消息的发送与处理

import java.util.HashMap;
import java.util.Map;//用于事件驱动的kafka消息队列开发
public class Event {private String topic;//事件触发的人private int userId;//事件发生在哪个实体private int entityType;private int entityId;//实体作者private int entityUserId;//存储额外数据private Map<String,Object> data = new HashMap<>();public String getTopic() {return topic;}public Event setTopic(String topic) {this.topic = topic;return this;}public int getUserId() {return userId;}public Event setUserId(int userId) {this.userId = userId;return this;}public int getEntityType() {return entityType;}public Event setEntityType(int entityType) {this.entityType = entityType;return this;}public int getEntityId() {return entityId;}public Event setEntityId(int entityId) {this.entityId = entityId;return this;}public int getEntityUserId() {return entityUserId;}public Event setEntityUserId(int entityUserId) {this.entityUserId = entityUserId;return this;}public Map<String, Object> getData() {return data;}public Event setData(String key,Object value) {this.data.put(key,value);return this;}}

EventProducer.java

定义事件的生产者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.Event;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;@Component
public class EventProducer {
//生产者使用kafkaTemplate发送消息@AutowiredKafkaTemplate kafkaTemplate;//处理事件public void fireEvent(Event event){//将事件发布到指定的主题//将event转换为json数据进行消息发送kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));System.out.println("成功发送"+event.getTopic());}
}

EventConsumer.java

定义事件消费者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Component
public class EventConsumer implements CommunityConstant {
//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@Autowiredprivate DiscussPostService discussPostService;@Autowiredprivate ElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record){if(record == null || record.value()==null){System.out.println("错误发帖");return;}Event event= JSONObject.parseObject(record.value().toString(),Event.class);if(event == null){System.out.println("错误发帖");return;}//发送站内通知Message message = new Message();message.setFromId(SYSTEM_USERID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());//message的内容Map<String,Object> content=new HashMap<>();content.put("userId",event.getUserId());content.put("entityType",event.getEntityType());content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){content.put(entry.getKey(),entry.getValue());}}message.setContent(JSONObject.toJSONString(content));System.out.println(message);messageService.addMessage(message);System.out.println("成功处理事件");}}

在特定的地方触发消息产生

CommentController

 //触发评论事件Event event=new Event().setTopic(TOPIC_COMMENT).setUserId(hostHolder.getUser().getId()).setEntityType(comment.getEntityType()).setEntityId(comment.getEntityId()).setData("postId",discussPostId);if(comment.getEntityType() == ENTITY_TYPE_POST){DiscussPost target=discussPostService.findDiscussPostById(comment.getEntityId());event.setEntityUserId(target.getUserId());}else if(comment.getEntityType()==ENTITY_TYPE_COMMENT){//根据评论的id查询评论Comment target =commentService.findCommentById(comment.getEntityId());event.setEntityUserId(target.getUserId());}eventProducer.fireEvent(event);

LikeController

 //触发点赞事件if(likeStatus ==1){Event event =new Event().setTopic(TOPIC_LIKE).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityUserId).setData("postId",postId);eventProducer.fireEvent(event);}

FollowController

 //触发关注事件Event event = new Event().setTopic(TOPIC_FOLLOW).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityId);eventProducer.fireEvent(event);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/180256.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Marp: 将 Markdown 变为 PPT 式样的 VScode 插件

样例代码&#xff1a; --- marp: true size: 16:9 theme: default header: footer: --- <!-- _footer: Jia ming<br>Gansu University of Political Science and Law --> <!-- _backgroundColor: lightskyblue --> ## <!-- fit --> 笔记检验概述>…

pytorch 中 nn.Conv2d 解释

1. pytorch nn.Con2d 中填充模式 torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride1, padding0, dilation1, groups1, biasTrue, padding_mode‘zeros’, deviceNone, dtypeNone) 1.1 padding 参数的含义 首先 &#xff0c;padd N, 代表的是 分别在 上下&…

鉴源实验室 | 自动驾驶传感器攻击研究

作者 | 付海涛 上海控安可信软件创新研究院汽车网络安全组 来源 | 鉴源实验室 社群 | 添加微信号“TICPShanghai”加入“上海控安51fusa安全社区” 01 自动驾驶汽车的脆弱性 自2015年以来&#xff0c;汽车的信息安全问题受到国内外的广泛关注。而随着汽车的智能化与网联化的…

上海物理、化学高考命题趋势及2024年上海物理、化学高考备考建议

在上海高考时&#xff0c;物理、化学虽然不像语文、英语和数学那样分数高&#xff0c;但是仍然很重要。那么&#xff0c;从这几年的上海物理、化学的高考题目来看&#xff0c;我们互发现什么命题趋势和考题特点呢&#xff1f;如何备考接下来的2024年高考物理和化学呢&#xff1…

Ubuntu下安装vscode,并解决终端打不开vscode的问题

Visual Studio Code安装 1&#xff0c;使用 apt 安装 Visual Studio Code 在官方的微软 Apt 源仓库中可用。按照下面的步骤进行即可&#xff1a; 以 sudo 用户身份运行下面的命令&#xff0c;更新软件包索引&#xff0c;并且安装依赖软件&#xff1a; sudo apt update sud…

uniapp 微信小程ios端键盘弹起后导致页面无法滚动

项目业务逻辑和出现的问题整理 新增页面 用户可以主动添加输入文本框 添加多了就会导致当前页面出现滚动条,这就导致ios端滚动页面的时候去点击输入框键盘抬起再关闭的时候去滚动页面发现页面滚动不了(偶尔出现),经过多次测试发现是键盘抬起的时候 主动向上滑动 100%出现这种问…

【Linux】进程的概念

文章目录 1. 基本概念2. 进程的描述3. 进程的一些基本操作3.1 查看进程3.2 结束进程3.3 通过系统调用获取进程标示符3.4 通过系统调用来创建子进程 4. 进程状态4.1 操作系统的进程状态4.2 Linux对于这些状态的处理方式 1. 基本概念 什么是进程&#xff1f; 在回答这个问题之前…

云安全—docker Deamon攻击面

0x00 前言 本篇文章主要是讲docker Deamon的原理以及docker Deamon攻击面相关的内容&#xff0c;属于抛砖引玉系列&#xff0c;如有不妥之处还请斧正。 0x01 docker Deamon 还是先来看一下docker Deamon的一些相关知识&#xff0c;依旧是采用问答的方式来进行。为了文章的整…

2023-11-04:用go语言,如果n = 1,打印 1*** 如果n = 2,打印 1*** 3*** 2*** 如果n = 3,打印

2023-11-04&#xff1a;用go语言&#xff0c;如果n 1&#xff0c;打印 1*** 如果n 2&#xff0c;打印 1***3*** 2*** 如果n 3&#xff0c;打印 1***3*** 2***4*** 5*** 6*** 如果n 4&#xff0c;打印 1***3*** 2***4*** 5*** 6***10** 9*** 8*** 7*** 输入…

Spring底层原理(六)

Spring底层原理(六) 本章内容 介绍AOP的实现方式、JDK代理的模拟实现与源码 AOP的实现方式 使用代理模式 jdk动态代理cglib动态代理 使用aspectj的编译器&#xff0c;该编译器会直接对字节码进行修改&#xff0c;可以实现静态方法增强 使用javaagent,在jvm option中指定-…

高匿IP有什么作用

在互联网的蓬勃发展中&#xff0c;IP地址作为网络通信的基础&#xff0c;一直扮演着举足轻重的角色。而在诸多IP地址中&#xff0c;高匿IP地址则是一种特殊类型&#xff0c;其作用和价值在某些特定场合下尤为突出。那么&#xff0c;高匿IP地址究竟有哪些用处呢&#xff1f; 首先…

[动态规划] (五) 路径问题: LeetCode 62.不同路径

[动态规划] (五) 路径问题: LeetCode 62.不同路径 文章目录 [动态规划] (五) 路径问题: LeetCode 62.不同路径题目解析解题思路状态表示状态转移方程初始化和填表返回值 代码实现总结 62. 不同路径 题目解析 (1) 机器人从左上角到右下角有多少方法 (2) 机器人只能向左或者向右…

掌握Maven和SpringBoot的灵活性:定制化lib目录和依赖范围

前言 在开发基于Maven和SpringBoot的项目时&#xff0c;我们经常会使用第三方库来满足需求。然而&#xff0c;有时候我们需要更灵活地控制这些库的依赖范围和加载方式。本文将介绍如何使用Maven和SpringBoot实现定制化的lib目录和依赖范围。经过如下定制化后&#xff0c;打包执…

[PyTorch][chapter 61][强化学习-免模型学习1]

前言&#xff1a; 在现实的学习任务中&#xff0c;环境 其中的转移概率P,奖赏函数R 是未知的&#xff0c;或者状态X也是未知的 称为免模型学习&#xff08;model-free learning&#xff09; 目录&#xff1a; 1: 蒙特卡洛强化学习 2&#xff1a;同策略-蒙特卡洛强化学习 3&am…

阿里云免费服务器

文章目录 最近的阿里云活动By the way在云服务器ECS上搭建个人网站正文补充:定期释放补充:不知道阿里云服务器的密码怎么办?成果补充&#xff1a;怎么找到实例操作的后台&#xff1f;补充&#xff1a;怎么查看服务器到期时间&#xff1f; 究竟白嫖了多少&#xff1f;最后&…

修复dinput8.dll文件的缺失,以及修复dinput8.dll文件时需要注意什么

dinput8.dll文件通常在使用大型游戏时容易出现dinput8.dll文件丢失的情况&#xff0c;今天这篇文章将要教大家修复dinput8.dll文件的缺失&#xff0c;同时在修复dinput8.dll文件时需要注意些什么&#xff1f;防止文件在修复的过程中出现其他的错误。 dinput8.dll是DirectInput库…

部署ELK

一、elasticsearch #拉取镜像 docker pull elasticsearch:7.12.1 #创建ELK docker网络 docker network create elk #启动ELK docker run -d --name es --net elk -P -e "discovery.typesingle-node" elasticsearch:7.12.1 #拷贝配置文件 docker cp es:/usr/share/el…

Visual Studio Code 常用快捷键大全

Visual Studio Code 常用快捷键大全 快捷键是编码过程中经常使用&#xff0c;且能够极大提升效率的部分&#xff0c;这里给大家介绍一些VS Code中非常有用的快捷键。 打开和关闭侧边栏 Mac — Command B Windows — Ctrl B Ubuntu — Ctrl B 选择单词 Mac — Command D …

“第六十天”

SRAM和DRAM&#xff1a; DRAM&#xff1a;动态RAM&#xff08;随机存期存储器&#xff09;&#xff0c;是使用栅极电容存储信息的&#xff1b; SRAM&#xff1a;静态RAM&#xff0c;是使用双稳态触发器存储信息的。 重点在于DRAM由于要通过电容放电来表示信息&#xff0c;所…

逆袭Flutter? Facebook 发布全新跨平台引擎 Hermes!

Facebook 于前日发布了新的 JavaScript 引擎&#xff1a;Hermes&#xff0c;专注于提高 React Native 应用的性能&#xff0c;并且在市面上那些内存较少、存储速度较慢且计算能力低下的移动设备上都有良好的表现。但是不是为了追赶Flutter&#xff1f;这块作者没有说明。 移动应…