防止推送消息乱码
import org.jetbrains.annotations.NotNull;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.nio.charset.StandardCharsets;/*** @Description 防止中文乱码* @Author WangKun* @Date 2024/7/30 11:22* @Version*/
public class SseUTF8 extends SseEmitter {public SseUTF8(Long timeout) {super(timeout);}@Overrideprotected void extendResponse(@NotNull ServerHttpResponse outputMessage) {super.extendResponse(outputMessage);HttpHeaders headers = outputMessage.getHeaders();headers.setContentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8));}}
SseEmitter工具类
import com.harmonywisdom.enums.ResultCode;
import com.harmonywisdom.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.lang.reflect.Field;
import java.io.IOException;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @Description SSE消息推送工具* @Author WangKun* @Date 2024/7/29 14:55* @Version*/
@Component
@Slf4j
public class SseUtils {/*** 容器*/private static final ConcurrentHashMap<String, SseUTF8> SSE_MAP_CACHE = new ConcurrentHashMap<>(0);/*** 默认时长不过期(默认30s)*/private static final long DEFAULT_TIMEOUT = 0L;/*** @param userId* @Description 创建连接* @Throws* @Return SseUTF8* @Date 2024-07-29 15:01:58* @Author WangKun**/public static SseUTF8 createConnect(String userId) {SseUTF8 sseEmitter = new SseUTF8(DEFAULT_TIMEOUT);// 需要给客户端推送IDif (SSE_MAP_CACHE.containsKey(userId)) {remove(userId);}// 长链接完成后回调接口(关闭连接时调用)sseEmitter.onCompletion(() -> {log.info("SSE连接结束:{}", userId);remove(userId);});// 连接超时回调sseEmitter.onTimeout(() -> {log.error("SSE连接超时:{}", userId);remove(userId);});// 连接异常时,回调方法sseEmitter.onError(throwable -> {try {log.info("SSE{}连接异常,{}", userId, throwable.toString());sseEmitter.send(SseUTF8.event().id(userId).name("发生异常!").data("发生异常重试!").reconnectTime(3000));SSE_MAP_CACHE.put(userId, sseEmitter);} catch (IOException e) {log.error("用户--->{} SSE连接失败重试,异常信息--->{}", userId, e.getMessage());e.printStackTrace();}});SSE_MAP_CACHE.put(userId, sseEmitter);try {// 注册成功返回用户信息sseEmitter.send(SseUTF8.event().id(String.valueOf(ResultCode.CONNECT_SUCCESS.getCode())).data(userId, MediaType.APPLICATION_JSON));} catch (IOException e) {log.error("用户--->{} SSE连接失败,异常信息--->{}", userId, e.getMessage());}return sseEmitter;}/*** @param userId* @Description 移除用户连接* @Throws* @Return void* @Date 2024-07-29 15:07:03* @Author WangKun**/private static void remove(String userId) {SSE_MAP_CACHE.remove(userId);log.info("SSE移除用户连接--->{} ", userId);}/*** @param userId* @Description 关闭连接* @Throws* @Return void* @Date 2024-07-29 15:38:16* @Author WangKun**/public static void closeConnect(String userId) {SseUTF8 sseEmitter = SSE_MAP_CACHE.get(userId);if (sseEmitter != null) {sseEmitter.complete();log.info("SSE关闭连接:{}", userId);remove(userId);}}/*** @param userId* @param message* @param sseEmitter* @Description 推送消息到客户端* @Throws* @Return void* @Date 2024-07-29 15:48:19* @Author WangKun**/private static 
boolean sendMsgToClient(String userId, String message, SseUTF8 sseEmitter) {// 推送之前检测心态是否存在boolean isAlive = checkSseConnectAlive(sseEmitter);if (!isAlive) {// 失去连接移除log.error("SSE推送消息失败:客户端{}未创建长链接或者关闭,失败消息:{}", userId, message);SSE_MAP_CACHE.remove(userId);return false;}SseUTF8.SseEventBuilder sendData = SseUTF8.event().id(String.valueOf(ResultCode.CONNECT_SUCCESS.getCode())).data(message, MediaType.APPLICATION_JSON);try {sseEmitter.send(sendData);return true;} catch (IOException e) {log.error("推送消息失败:{}", message);}return true;}/*** @param sseEmitter* @Description 检测连接心跳* @Throws* @Return boolean* @Date 2024-07-30 17:27:32* @Author WangKun**/public static boolean checkSseConnectAlive(SseUTF8 sseEmitter) {if (sseEmitter == null) {return false;}// 返回true代表还连接, 返回false代表失去连接return !(Boolean) getField(sseEmitter, sseEmitter.getClass(), "sendFailed") &&!(Boolean) getField(sseEmitter, sseEmitter.getClass(), "complete");}/*** @param obj* @param clazz* @param fieldName* @Description 反射获取 sendFailed complete* @Throws* @Return java.lang.Object* @Date 2024-07-30 17:27:49* @Author WangKun**/public static Object getField(Object obj, Class<?> clazz, String fieldName) {for (; clazz != Object.class; clazz = clazz.getSuperclass()) {try {Field field;field = clazz.getDeclaredField(fieldName);field.setAccessible(true);return field.get(obj);} catch (Exception ignored) {}}return null;}/*** @param msg* @Description 发送消息给所有客户端* @Throws* @Return void* @Date 2024-07-29 15:48:40* @Author WangKun**/public static boolean sendTextMessage(String msg) {if (SSE_MAP_CACHE.isEmpty()) {return false;}if (StringUtils.isEmpty(msg) || StringUtils.isBlank(msg)) {return false;}boolean isSuccess = false;for (Map.Entry<String, SseUTF8> entry : SSE_MAP_CACHE.entrySet()) {isSuccess = sendMsgToClient(entry.getKey(), msg, entry.getValue());if (!isSuccess) {log.error("群发客户端{}消息推送,失败消息:{}", entry.getKey(), msg);}}return isSuccess;}/*** @param clientId* @param msg* @Description 给指定客户端发送消息* @Throws* @Return 
Boolean* @Date 2024-07-29 15:51:30* @Author WangKun**/public static boolean sendTextMessage(String clientId, String msg) {return sendMsgToClient(clientId, msg, SSE_MAP_CACHE.get(clientId));}/*** @Description 检测客户端心跳(连接状态,给客户端发送信息,如果sendFailed,complete返回false 移除客户端,说明客户端关闭)* @param* @Throws* @Return void* @Date 2024-07-31 16:22:25* @Author WangKun**/@Async("threadPoolExecutor")@Scheduled(cron = "0 0/15 * * * ?")public void checkSseAlive() {log.info("检测客户端连接状态");sendTextMessage("LIVE");}}
使用方法
/*** @Description 测试sse* @Author WangKun* @Date 2024/7/29 15:56* @Version*/
@RequiredArgsConstructor
@RestController
@RequestMapping("/api/sse")
public class TestSSEController {@Log("测试SSE消息连接")@AnonymousPostMapping(value = "/connect")public SseUTF8 connect(@RequestParam String userId) {return SseUtils.createConnect(userId);}@Log("测试SSE消息推送")@AnonymousPostMapping(value = "/send")public ResponseResult<Boolean> send(@RequestParam String userId, @RequestParam String param) {boolean flag = SseUtils.sendTextMessage(userId, param);if (!flag) {return ResponseResult.error(ResultCode.UNAVAILABLE_FOR_LEGAL_REASONS.getCode(), ResultCode.UNAVAILABLE_FOR_LEGAL_REASONS.getMsg());}return ResponseResult.success(true, ResultCode.OK.getCode());}@Log("测试SSE所有客户端消息推送")@AnonymousPostMapping(value = "/sendAll")public ResponseResult<Boolean> sendAll(@RequestParam String param) {boolean flag = SseUtils.sendTextMessage(param);if (!flag) {return ResponseResult.error(ResultCode.UNAVAILABLE_FOR_LEGAL_REASONS.getCode(), ResultCode.UNAVAILABLE_FOR_LEGAL_REASONS.getMsg());}return ResponseResult.success(true, ResultCode.OK.getCode());}@Log("测试SSE消息关闭")@AnonymousPostMapping(value = "/close")public ResponseResult<String> close(@RequestParam String userId) {SseUtils.closeConnect(userId);return ResponseResult.success(ResultCode.OK.getCode());}
}
简要说明
1:
推送消息之前,先检测要推送的客户端连接是否存在;如果不存在,则直接将其移除,避免浪费资源。
前端关闭浏览器或关闭页面时,需要调用关闭接口,将对应的连接关闭。
结果:
连接
推送:
给admin的客户端推送
给全部客户端推送