【UniApp开发小程序】私聊功能后端实现 (买家、卖家 沟通商品信息)【后端基于若依管理系统开发】

文章目录

  • 声明
  • 聊天数据查询管理
    • 数据库设计
    • Vo
    • Controller
    • Service
    • Mapper
  • WebSocket引入
    • 为什么使用WebSocket
    • 依赖
    • 配置类
    • WebSocket服务
  • RabbitMQ引入
    • 为什么使用消息队列
    • 依赖
    • 启动类添加注解
    • 常量类
    • 使用配置类创建队列、交换机、绑定关系
    • 消息监听器
    • 发送消息到消息队列
  • 延时任务
    • 为什么使用延时任务
    • 延时任务类
    • 延时任务管理

声明

本文提炼于个人练手项目,其中的实现逻辑不一定标准,实现思路没有参考权威的文档和教程,仅为个人思考得出,因此可能存在较多本人未考虑到的情况和漏洞,因此仅供参考,如果大家觉得有问题,恳请大家指出有问题的地方

如果对客户端的实现感兴趣,可以转身查看【UniApp开发小程序】私聊功能uniapp界面实现 (买家、卖家 沟通商品信息)【后端基于若依管理系统开发】

聊天数据查询管理

数据库设计

【私信表】
在这里插入图片描述

Vo

package com.ruoyi.common.core.domain.vo;import lombok.Data;import java.util.Date;/*** @Author dam* @create 2023/8/22 21:39*/
@Data
public class ChatUserVo {private Long userId;private String userAvatar;private String userName;private String userNickname;/*** 最后一条消息的内容*/private String lastChatContent;/*** 最后一次聊天的日期*/private Date lastChatDate;/*** 未读消息数量*/private Integer unReadChatNum;
}

Controller

其中两个方法较为重要,介绍如下:

  • listChatUserVo:当用户进入消息界面的时候,需要查询出最近聊天的用户,其中还需要展示一些信息,如ChatUserVo的属性
  • listChat:该方法用于查询对方最近和自己的私聊内容,当用户查询了这些私聊内容,默认用户已经看过了,将这些私聊内容设置为已读状态
package com.shm.controller;import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletResponse;import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ruoyi.common.core.domain.entity.Chat;
import com.ruoyi.common.core.domain.vo.ChatUserVo;
import com.shm.service.IChatService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.ruoyi.common.annotation.Log;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.enums.BusinessType;
import com.ruoyi.common.utils.poi.ExcelUtil;
import com.ruoyi.common.core.page.TableDataInfo;/*** 聊天数据Controller** @author dam* @date 2023-08-19*/
@RestController
@RequestMapping("/market/chat")
@Api
public class ChatController extends BaseController {@Autowiredprivate IChatService chatService;/*** 查询聊天数据列表*/@PreAuthorize("@ss.hasPermi('market:chat:list')")@GetMapping("/list")public TableDataInfo list(Chat chat) {startPage();List<Chat> list = chatService.list(new QueryWrapper<Chat>(chat));return getDataTable(list);}/*** 查询最近和自己聊天的用户*/@ApiOperation("listChatUserVo")@PreAuthorize("@ss.hasPermi('market:chat:list')")@GetMapping("/listChatUserVo")public TableDataInfo listChatUserVo() {startPage();String username = getLoginUser().getUsername();List<ChatUserVo> list = chatService.listChatUserVo(username);return getDataTable(list);}/*** 查询用户和自己最近的聊天信息*/@ApiOperation("listUsersChatWithMe")@PreAuthorize("@ss.hasPermi('market:chat:list')")@GetMapping("/listChat/{toUsername}")public TableDataInfo listChat(@PathVariable("toUsername") String toUsername) {String curUsername = getLoginUser().getUsername();startPage();List<Chat> list = chatService.listChat(curUsername, toUsername);for (Chat chat : list) {System.out.println("chat:"+chat.toString());}System.out.println();// 查出的数据,如果消息是对方发的,且是未读状态,重新设置为已读List<Long> unReadIdList = list.stream().filter((item1) -> {if (item1.getIsRead() == 0 && item1.getFromWho().equals(toUsername)) {return true;} else {return false;}}).map(item2 -> {return item2.getId();}).collect(Collectors.toList());System.out.println("将"+ unReadIdList.toString()+"设置为已读");if (unReadIdList.size() > 0) {// 批量设置私聊为已读状态chatService.batchRead(unReadIdList);}return getDataTable(list);}/*** 导出聊天数据列表*/@PreAuthorize("@ss.hasPermi('market:chat:export')")@Log(title = "聊天数据", businessType = BusinessType.EXPORT)@PostMapping("/export")public void export(HttpServletResponse response, Chat chat) {List<Chat> list = chatService.list(new QueryWrapper<Chat>(chat));ExcelUtil<Chat> util = new ExcelUtil<Chat>(Chat.class);util.exportExcel(response, list, "聊天数据数据");}/*** 获取聊天数据详细信息*/@PreAuthorize("@ss.hasPermi('market:chat:query')")@GetMapping(value = "/getInfo/{id}")public AjaxResult getInfo(@PathVariable("id") Long id) {return success(chatService.getById(id));}/*** 新增聊天数据*/@PreAuthorize("@ss.hasPermi('market:chat:add')")@Log(title = "聊天数据", businessType = BusinessType.INSERT)@PostMappingpublic AjaxResult add(@RequestBody Chat chat) {return toAjax(chatService.save(chat));}/*** 修改聊天数据*/@PreAuthorize("@ss.hasPermi('market:chat:edit')")@Log(title = "聊天数据", businessType = BusinessType.UPDATE)@PutMappingpublic AjaxResult edit(@RequestBody Chat chat) {return toAjax(chatService.updateById(chat));}/*** 删除聊天数据*/@PreAuthorize("@ss.hasPermi('market:chat:remove')")@Log(title = "聊天数据", businessType = BusinessType.DELETE)@DeleteMapping("/{ids}")public AjaxResult remove(@PathVariable List<Long> ids) {return toAjax(chatService.removeByIds(ids));}
}

Service

package com.shm.service.impl;import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ruoyi.common.core.domain.entity.Chat;
import com.ruoyi.common.core.domain.vo.ChatUserVo;
import com.shm.mapper.ChatMapper;
import com.shm.service.IChatService;
import org.springframework.stereotype.Service;import java.util.List;/*** @author 17526* @description 针对表【chat(聊天数据表)】的数据库操作Service实现* @createDate 2023-08-19 21:12:49*/
@Service
public class IChatServiceImpl extends ServiceImpl<ChatMapper, Chat>implements IChatService {/*** 查询最近和自己聊天的用户** @return*/@Overridepublic List<ChatUserVo> listChatUserVo(String username) {return baseMapper.listChatUserVo(username);}/*** 查询用户和自己最近的聊天信息** @param curUsername* @param toUsername* @return*/@Overridepublic List<Chat> listChat(String curUsername, String toUsername) {return baseMapper.listChat(curUsername, toUsername);}@Overridepublic void batchRead(List<Long> unReadIdList) {baseMapper.batchRead(unReadIdList);}
}

Mapper

package com.shm.mapper;import com.ruoyi.common.core.domain.entity.Chat;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ruoyi.common.core.domain.vo.ChatUserVo;
import org.apache.ibatis.annotations.Param;import java.util.List;/**
* @author 17526
* @description 针对表【chat(聊天数据表)】的数据库操作Mapper
* @createDate 2023-08-19 21:12:49
* @Entity com.ruoyi.common.core.domain.entity.Chat
*/
public interface ChatMapper extends BaseMapper<Chat> {List<ChatUserVo> listChatUserVo(@Param("username") String username);List<Chat> listChat(@Param("curUsername") String curUsername, @Param("toUsername") String toUsername);void batchRead(@Param("unReadIdList") List<Long> unReadIdList);
}

【xml文件】

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapperPUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.shm.mapper.ChatMapper"><resultMap id="BaseResultMap" type="com.ruoyi.common.core.domain.entity.Chat"><id property="id" column="id" jdbcType="BIGINT"/><result property="createTime" column="create_time" jdbcType="TIMESTAMP"/><result property="updateTime" column="update_time" jdbcType="TIMESTAMP"/><result property="isDeleted" column="is_deleted" jdbcType="TINYINT"/><result property="fromWho" column="from_who" jdbcType="BIGINT"/><result property="toWho" column="to_who" jdbcType="BIGINT"/><result property="content" column="content" jdbcType="VARCHAR"/><result property="picUrl" column="pic_url" jdbcType="VARCHAR"/></resultMap><sql id="Base_Column_List">id,create_time,update_time,is_deleted,from,to,content,pic_url</sql><update id="batchRead">update chat set is_read = 1 where id in<foreach collection="unReadIdList" item="chatId" separator="," open="(" close=")">#{chatId}</foreach></update><select id="listChatUserVo" resultType="com.ruoyi.common.core.domain.vo.ChatUserVo">SELECT(CASE WHEN c.from_who=#{username} THEN c.to_who ELSE c.from_who END) AS `userName`,c.content AS `lastChatContent`,c.create_time AS lastChatDate,u.user_id AS userId,u.avatar AS userAvatar,u.nick_name AS userNickname,ur.unReadNum as unReadChatNumFROM(SELECTMAX(`id`) AS chatId,CASEWHEN `from_who` = #{username} THEN `to_who`ELSE `from_who`END AS unameFROM `chat`WHERE `from_who` = #{username} OR `to_who` = #{username}GROUP BY uname) AS tINNER JOIN `chat` c ON c.id = t.chatIdLEFT JOIN `sys_user` u ON t.uname = u.user_nameLEFT JOIN (SELECT from_who, SUM(CASE WHEN is_read=1 THEN 0 ELSE 1 END) AS unReadNum FROM chat WHERE is_deleted=0 AND to_who = #{username} GROUP BY from_who) ur ON ur.from_who = t.unameORDER BY c.create_time DESC</select><select id="listChat" resultType="com.ruoyi.common.core.domain.entity.Chat">SELECT*FROMchatWHERE( from_who = #{curUsername} AND to_who = #{toUsername} )OR ( to_who = #{curUsername} AND from_who = #{toUsername} )ORDER BYcreate_time DESC</select>
</mapper>

【查询最近聊天的用户的用户名和那条消息的id】
因为id是自增的,所以最新的那条消息的id肯定最大,因此可以使用MAX(id)来获取最近的消息

SELECT MAX(`id`) AS chatId,CASE WHEN `from_who` = 'admin' THEN `to_who`ELSE `from_who`END AS unameFROM `chat`WHERE `from_who` = 'admin' OR `to_who` = 'admin'GROUP BY uname

在这里插入图片描述
【内连接私信表获取消息的其他信息】

INNER JOIN `chat` c ON c.id = t.chatId 

【左连接用户表获取用户的相关信息】

LEFT JOIN `sys_user` u ON t.uname = u.user_name

【左联接私信表获取未读对方消息的数量】
CASE WHEN is_read=1 THEN 0 ELSE 1 END 如果已读,说明未读数量为0;否则为1

LEFT JOIN (SELECT from_who, SUM(CASE WHEN is_read=1 THEN 0 ELSE 1 END) AS unReadNum FROM chat WHERE is_deleted=0 AND to_who = 'admin' GROUP BY from_who) ur ON ur.from_who = t.uname

【最后按照用户和自己最后聊天的时间来降序排序】

ORDER BY c.create_time DESC

WebSocket引入

为什么使用WebSocket

WebSocket不仅支持客户端向服务端发送消息,同时也支持服务端向客户端发送消息,这样才能完成私聊的功能。即
用户1-->服务端-->用户2

依赖

<!-- websocket -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

配置类

package com.shm.config;import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration
public class WebSocketConfig {/*** 注入一个ServerEndpointExporter,* 该Bean会自动注册使用@ServerEndpoint注解 声明的websocket endpoint*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}}

WebSocket服务

需要注意的是,Websocket是多例模式,无法直接使用@Autowired注解来注入rabbitTemplate,需要使用下面的方式,其中rabbitTemplate为静态变量

private static RabbitTemplate rabbitTemplate;@Autowiredpublic void setRabbitTemplate(RabbitTemplate rabbitTemplate) {WebSocketServer.rabbitTemplate = rabbitTemplate;}
package com.shm.component;import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.common.core.domain.entity.Chat;
import com.shm.component.delay.DelayQueueManager;
import com.shm.component.delay.DelayTask;
import com.shm.constant.RabbitMqConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @author websocket服务*/
@ServerEndpoint(value = "/websocket/{username}")
@Component//将WebSocketServer注册为spring的一个bean
public class WebSocketServer {private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);/*** 记录当前在线连接的客户端的session*/public static final Map<String, Session> usernameAndSessionMap = new ConcurrentHashMap<>();/*** 记录正在进行的聊天的发出者和接收者*/public static final Map<String, Integer> fromToMap = new ConcurrentHashMap<>();/*** 用户Session保留时间,如果超过该时间,用户还没有给服务端发送消息,认为用户下线,删除其Session* 注意:该时间需要比客户端的心跳时间更长*/private static final long expire = 6000;// websocket为多例模式,无法直接注入,需要换成下面的方式
//    @Autowired
//    RabbitTemplate rabbitTemplate;private static RabbitTemplate rabbitTemplate;@Autowiredpublic void setRabbitTemplate(RabbitTemplate rabbitTemplate) {WebSocketServer.rabbitTemplate = rabbitTemplate;}@Autowiredprivate static DelayQueueManager delayQueueManager;@Autowiredpublic void setDelayQueueManager(DelayQueueManager delayQueueManager) {WebSocketServer.delayQueueManager = delayQueueManager;}/*** 浏览器和服务端连接建立成功之后会调用这个方法*/@OnOpenpublic void onOpen(Session session, @PathParam("username") String username) {usernameAndSessionMap.put(username, session);// 建立延时任务,如果到expire时间,客户端还是没有和服务器有任何交互的话,就删除该用户的session,表示该用户下线delayQueueManager.put(new DelayTask(username, expire));log.info("有新用户加入,username={}, 当前在线人数为:{}", username, usernameAndSessionMap.size());}/*** 连接关闭调用的方法*/@OnClosepublic void onClose(Session session, @PathParam("username") String username) {usernameAndSessionMap.remove(username);log.info("有一连接关闭,移除username={}的用户session, 当前在线人数为:{}", username, usernameAndSessionMap.size());}/*** 发生错误的时候会调用这个方法*/@OnErrorpublic void onError(Session session, Throwable error) {log.error("发生错误");error.printStackTrace();}/*** 服务端发送消息给客户端*/public void sendMessage(String message, Session toSession) {try {log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);toSession.getBasicRemote().sendText(message);} catch (Exception e) {log.error("服务端发送消息给客户端失败", e);}}/*** onMessage方法是一个消息的中转站* 1、首先接受浏览器端socket.send发送过来的json数据* 2、然后解析其数据,找到消息要发送给谁* 3、最后将数据发送给相应的人** @param message 客户端发送过来的消息 数据格式:{"from":"user1","to":"admin","text":"你好呀"}*/@OnMessagepublic void onMessage(String message, Session session, @PathParam("username") String username) {
//        log.info("服务端接收到 {} 的消息,消息内容是:{}", username, message);// 收到用户的信息,删除之前的延时任务,创建新的延时任务delayQueueManager.put(new DelayTask(username, expire));if (!usernameAndSessionMap.containsKey(username)) {// 可能用户挂机了一段时间,被下线了,后面又重新回来发信息了,需要重新将用户和session添加字典中usernameAndSessionMap.put(username, session);}// 将json字符串转化为json对象JSONObject obj = JSON.parseObject(message);String status = (String) obj.get("status");// 获取消息的内容String text = (String) obj.get("text");// 查看消息要发送给哪个用户String to = (String) obj.get("to");String fromToKey = username + "-" + to;String toFromKey = to + "-" + username;if (status != null) {if (status.equals("start")) {fromToMap.put(fromToKey, 1);} else if (status.equals("end")) {System.out.println("移除销毁的fromToKey:" + fromToKey);fromToMap.remove(fromToKey);} else if (status.equals("ping")) {// 更新用户对应的时间戳
//                usernameAndTimeStampMap.put(username, System.currentTimeMillis());}} else {// 封装数据发送给消息队列Chat chat = new Chat();chat.setFromWho(username);chat.setToWho(to);chat.setContent(text);chat.setIsRead(0);//        chat.setPicUrl("");// 根据to来获取相应的session,然后通过session将消息内容转发给相应的用户Session toSession = usernameAndSessionMap.get(to);if (toSession != null) {JSONObject jsonObject = new JSONObject();// 设置消息来源的用户名jsonObject.put("from", username);// 设置消息内容jsonObject.put("text", text);// 服务端发送消息给目标客户端this.sendMessage(jsonObject.toString(), toSession);log.info("发送消息给用户 {} ,消息内容是:{} ", toSession, jsonObject.toString());if (fromToMap.containsKey(toFromKey)) {chat.setIsRead(1);}} else {log.info("发送失败,未找到用户 {} 的session", to);}rabbitTemplate.convertAndSend(RabbitMqConstant.CHAT_STORAGE_EXCHANGE, RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY, chat);}}}

RabbitMQ引入

为什么使用消息队列

在用户之间进行聊天的时候,需要将用户的聊天数据存储到数据库中,但是如果大量用户同时在线的话,可能同一时间发送的消息数量太多,如果同时将这些消息存储到数据库中,会给数据库带来较大的压力,使用RabbitMQ可以先把要存储的数据放到消息队列,然后数据库服务器压力没这么大的时候,就会从消息队列中获取数据来存储,这样可以分散数据库的压力。但是如果用户是直接从数据库获取消息的话,消息可能有一定的延迟,如果用户之间正在聊天的话,消息则不会延迟,因为聊天内容会立刻通过WebSocket发送给对方。

依赖

<!-- rabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

启动类添加注解

在启动类的上方添加@EnableRabbit注解
在这里插入图片描述

常量类

因为有多处会使用到队列命名等信息,创建一个常量类来保存相关信息

package com.shm.constant;public class RabbitMqConstant {public static final String CHAT_STORAGE_QUEUE = "shm.chat-storage.queue";public static final String CHAT_STORAGE_EXCHANGE = "shm.chat-storage-event-exchange";public static final String CHAT_STORAGE_ROUTER_KEY = "shm.chat-storage.register";
}

使用配置类创建队列、交换机、绑定关系

package com.shm.config;import com.shm.constant.RabbitMqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MyRabbitConfig {/*** 使用JSON序列化机制,进行消息转换* @return*/@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}/*** 私信存储队列** @return*/@Beanpublic Queue chatStorageQueue() {Queue queue = new Queue(RabbitMqConstant.CHAT_STORAGE_QUEUE, true, false, false);return queue;}/*** 私信存储交换机* 创建交换机,由于只需要一个队列,创建direct交换机** @return*/@Beanpublic Exchange chatStorageExchange() {//durable:持久化return new DirectExchange(RabbitMqConstant.CHAT_STORAGE_EXCHANGE, true, false);}/*** 创建私信存储 交换机和队列的绑定关系** @return*/@Beanpublic Binding chatStorageBinding() {return new Binding(RabbitMqConstant.CHAT_STORAGE_QUEUE,Binding.DestinationType.QUEUE,RabbitMqConstant.CHAT_STORAGE_EXCHANGE,RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY,null);}}

消息监听器

创建一个消息监听类来监听队列的消息,然后调用相关的逻辑来处理信息,本文主要的处理是将私信内容存储到数据库中

package com.shm.listener;import com.rabbitmq.client.Channel;
import com.ruoyi.common.core.domain.entity.Chat;
import com.shm.constant.RabbitMqConstant;
import com.shm.service.IChatService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.io.IOException;@Service
/*** 注意,类上面需要RabbitListener注解*/
@RabbitListener(queues = RabbitMqConstant.CHAT_STORAGE_QUEUE)
public class ChatStorageListener {@Autowiredprivate IChatService chatService;@RabbitHandlerpublic void handleStockLockedRelease(Chat chat, Message message, Channel channel) throws IOException {try {boolean save = chatService.save(chat);//解锁成功,手动确认,消息才从MQ中删除channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {//只要有异常,拒绝消息,让消息重新返回队列,让别的消费者继续解锁channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}}

发送消息到消息队列

WebSocketServer为Websocket后端服务代码,其中的onMessage方法会接受客户端发送过来的消息,当接收到消息的时候,将消息发送给消息队列

// 封装数据发送给消息队列
Chat chat = new Chat();
chat.setFromWho(username);
chat.setToWho(to);
chat.setContent(text);
chat.setPicUrl("");
rabbitTemplate.convertAndSend(RabbitMqConstant.CHAT_STORAGE_EXCHANGE,RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY,chat);

延时任务

为什么使用延时任务

为了更好地感知用户的在线状态,在用户连接了WebSocket或者发送消息之后,建立一个延时任务,如果到达了所设定的延时时间,就删除用户的Session,认为用户已经下线;如果在延时期间之内,用户发送了新消息,或者发送了心跳信号,证明该用户还处于在线状态,删除前面的延时任务,并创建新的延时任务

延时任务类

package com.shm.component.delay;import lombok.Data;
import lombok.Getter;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** @Author dam* @create 2023/8/25 15:12*/
@Getter
public class DelayTask implements Delayed {/*** 用户名*/private final String userName;/*** 任务的真正执行时间*/private final long executeTime;/*** 任务延时多久执行*/private final long expire;/*** @param expire 任务需要延时的时间*/public DelayTask(String userName, long expire) {this.userName = userName;this.executeTime = expire + System.currentTimeMillis();this.expire = expire;}/*** 根据给定的时间单位,返回与此对象关联的剩余延迟时间* * @param unit the time unit 时间单位* @return 返回剩余延迟,零值或负值表示延迟已经过去*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.executeTime - System.currentTimeMillis(), unit);}@Overridepublic int compareTo(Delayed o) {return 0;}
}

延时任务管理

package com.shm.component.delay;import com.shm.component.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;/*** @Author dam* @create 2023/8/25 15:12*/
@Component
@Slf4j
public class DelayQueueManager implements CommandLineRunner {private final DelayQueue<DelayTask> delayQueue = new DelayQueue<>();private final Map<String, DelayTask> usernameAndDelayTaskMap = new ConcurrentHashMap<>();/*** 加入到延时队列中** @param task*/public void put(DelayTask task) {// 因为一个用户只能对应一个延时任务,所以如果已经存在了延时任务,将其进行删除if (usernameAndDelayTaskMap.containsKey(task.getUserName())) {this.remove(task.getUserName());}delayQueue.put(task);usernameAndDelayTaskMap.put(task.getUserName(), task);}/*** 取消延时任务** @param username 要删除的任务的用户名* @return*/public boolean remove(String username) {DelayTask remove = usernameAndDelayTaskMap.remove(username);return delayQueue.remove(remove);}@Overridepublic void run(String... args) throws Exception {this.executeThread();}/*** 延时任务执行线程*/private void executeThread() {while (true) {try {DelayTask task = delayQueue.take();//执行任务processTask(task);} catch (InterruptedException e) {break;}}}/*** 执行延时任务** @param task*/private void processTask(DelayTask task) {// 删除该用户的session,表示用户下线WebSocketServer.usernameAndSessionMap.remove(task.getUserName());log.error("执行定时任务:{}下线", task.getUserName());}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/115821.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ZooKeeper技术内幕

文章目录 1、系统模型1.1、数据模型1.2、节点特性1.2.1、节点类型 1.3、版本——保证分布式数据原子性操作1.4、 Watcher——数据变更的通知1.5、ACL——保障数据的安全1.5.1、权限模式&#xff1a;Scheme1.5.2、授权对象&#xff1a;ID1.5.3、权限扩展体系 2、序列化与协议2.1…

ChatGPT 制作可视化柱形图突出显示第1名与最后1名

对比分析柱形图的用法。在图表中显示最大值与最小值。 像这样的动态图表的展示只需要给ChatGPT,AIGC,OpenAI 发送一个指令就可以了, 人工智能会快速的写出HTML与JS代码来实现。 请使用HTML,JS,Echarts完成一个对比分析柱形图,在图表中突出显示第1名和最后1名用单独一种不…

Pytest参数详解 — 基于命令行模式

1、--collect-only 查看在给定的配置下哪些测试用例会被执行 2、-k 使用表达式来指定希望运行的测试用例。如果测试名是唯一的或者多个测试名的前缀或者后缀相同&#xff0c;可以使用表达式来快速定位&#xff0c;例如&#xff1a; 命令行-k参数.png 3、-m 标记&#xff0…

h5 ws 客户端 监听ws服务器广播的信息

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>AI智能写作</title><!-- Bootstrap CSS --><meta charset"utf-8"><meta name"viewport" content"widt…

划分字母区间【贪心算法】

划分字母区间 给你一个字符串 s 。我们要把这个字符串划分为尽可能多的片段&#xff0c;同一字母最多出现在一个片段中。 注意&#xff0c;划分结果需要满足&#xff1a;将所有划分结果按顺序连接&#xff0c;得到的字符串仍然是 s 。返回一个表示每个字符串片段的长度的列表。…

外包干了2个月,技术退步明显...

先说一下自己的情况&#xff0c;大专生&#xff0c;18年通过校招进入湖南某软件公司&#xff0c;干了接近4年的功能测试&#xff0c;今年年初&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了四年的功能测试…

【Hadoop】Hadoop入门概念简介

&#x1f341; 博主 "开着拖拉机回家"带您 Go to New World.✨&#x1f341; &#x1f984; 个人主页——&#x1f390;开着拖拉机回家_Linux,Java基础学习,大数据运维-CSDN博客 &#x1f390;✨&#x1f341; &#x1fa81;&#x1f341; 希望本文能够给您带来一定的…

Unreal5(虚幻5)学习记录 快捷键

虚幻5学习记录 快捷键 世界场景中漫游&#xff08;镜头移动): 按住鼠标右键 键盘的W(前) S(后) A(左) D(右) E(上) Q(下)键 透视 透视 ALTG 上部分 ALTJ 底视图ALTSHIFTJ 左视图 ALTK 右视图 ALTSHIFTK 前视图 ALTH 后视图 ALTSHIFTH 内容浏览器 Ctrl Space 内容浏览器…

Kind创建本地环境安装Ingress

目录 1.K8s什么要使用Ingress 2.在本地K8s集群安装Nginx Ingress controller 2.1.使用Kind创建本地集群 2.1.1.创建kind配置文件 2.1.2.执行创建命令 2.2.找到和当前k8s版本匹配的Ingress版本 2.2.1.查看当前的K8s版本 2.2.2.在官网中找到对应的合适版本 2.3.按照版本安…

/bin/bash: Resource temporarily unavailable

有现场反馈plsql无法连接数据库了&#xff0c;登录环境查看时发现从root切换到grid时报错/bin/bash: Resource temporarily unavailable [rootdb1 ~]# su - grid Last login: Thu Jul 27 18:45:04 CST 2023 su: failed to execute /bin/bash: Resource temporarily unavailab…

【pyinstaller 怎么打包python,打包后程序闪退 不打日志 找不到自建模块等问题的踩坑解决】

程序打包踩坑解决的所有问题 问题1 多个目录怎么打包 不管你包含多个层目录&#xff0c;引用多么复杂&#xff0c;只需要打包主程序所在文件即可&#xff0c;pyinstaller会自动寻找依赖包&#xff0c;如果报错自建模块找不到&#xff0c;参照问题3 pyinstaller main.py问题2…

【文心一言大模型插件制作初体验】制作面试错题本大模型插件

文心一言插件开发初体验 效果图 注意&#xff1a;目前插件仅支持在本地运行&#xff0c;虽然只能自用&#xff0c;但仍然是一个不错的选择。&#xff08;什么&#xff1f;你说没有用&#xff1f;这不可能&#xff01;文心一言app可以支持语音&#xff0c;网页端结合手机端就可…

Linux安装JenkinsCLI

项目简介安装目录 mkdir -p /opt/jenkinscli && cd /opt/jenkinscli JenkinsCLI下载 wget http://<your-jenkins-server>/jnlpJars/jenkins-cli.jar # <your-jenkins-server> 替换为你的 Jenkins 服务器地址 JenkinsCLI授权 Dashboard-->Configure Glob…

Flink 如何定位反压节点?

分析&回答 Flink Web UI 自带的反压监控 —— 直接方式 Flink Web UI 的反压监控提供了 Subtask 级别的反压监控。监控的原理是通过Thread.getStackTrace() 采集在 TaskManager 上正在运行的所有线程&#xff0c;收集在缓冲区请求中阻塞的线程数&#xff08;意味着下游阻…

解决window安装docker报错问题

第一次打开Docker Desktop后提示错误 试了网上版本都没用&#xff0c;后面发现是电脑没有下载相关虚拟机&#xff1a; 先点击链接下载wsl2&#xff0c;下载后命令行执行&#xff1a;dism.exe /online /enable-feature /featurename:Microsoft-Windows-Subsystem-Linux /all /…

盘点狼人杀中的强神与弱神 并评价操作体验

最初 强神是大家对猎人的称呼&#xff0c;但随着板子的增加 强神渐渐变成了强神神牌的统称。 狼人杀发展至今板子已经非常多了&#xff0c;而每个板子都会有不同的角色。 相同的是 大部分都会希望拿到一张强力神牌&#xff0c;这样能大大提高我们玩家的游戏体验&#xff0c;但其…

电脑报错vcomp100.dll丢失怎样修复,多个解决方法分享

今天&#xff0c;我想和大家分享一下关于vcomp100.dll丢失修复的经验。在我们的日常生活中&#xff0c;电脑出现问题是在所难免的&#xff0c;而vcomp100.dll文件丢失的问题也是很常见的。那么&#xff0c;当遇到这个问题时&#xff0c;我们应该如何进行修复呢&#xff1f;接下…

前端是leyui后端sqlserver和maraDB进行分页

项目场景&#xff1a; 前端是leyui后端sqlserver和maraDB进行分页,两种数据库在后端分页的不同写法 解决方案&#xff1a; 前端: 定义table,表格的格式在接口返回时进行创建,根据id进行绑定 <div class"layui-tab-item layui-show" style"padding-top: 10…

论文阅读_变分自编码器_VAE

英文名称: Auto-Encoding Variational Bayes 中文名称: 自编码变分贝叶斯 论文地址: http://arxiv.org/abs/1312.6114 时间: 2013 作者: Diederik P. Kingma, 阿姆斯特丹大学 引用量: 24840 1 读后感 VAE 变分自编码&#xff08;Variational Autoencoder&#xff09;是一种生…

elementUI中的table动态表单记录

form表单与table一起使用 之前一直以为form表单是单独使用&#xff0c;现在联动起来发现只是套了一层外壳&#xff0c;并不是很麻烦的事情 form的单独使用 <el-form :model"ruleForm" status-icon :rules"rules" ref"ruleForm" label-widt…