文章目录 1、kafka确保消息不丢失? 1.1、生产者端确保消息不丢失 1.2、kafka服务端确保消息不丢失 1.3、消费者确保正确无误的消费 2、生产者发送消息 KafkaService 3、UserInfoServiceImpl -> login() 4、service-account - > AccountListener.java
1、kafka确保消息不丢失?
1.1、生产者端确保消息不丢失
发送模式:发后即忘、同步阻塞确认、异步非阻塞确认 生产者acks模式:props.put(“acks”, “all”)、acks: all(-1) 配置重试:props.put(“retries”, 3)、retries: 3
1.2、kafka服务端确保消息不丢失
kafka是文件型的消息中间件,不会单纯的因为服务器宕机导致消息丢失 消息的log日志文件损坏:搭建kafka集群(副本)
1.3、消费者确保正确无误的消费
偏移量提交 自动提交:enable-auto-commit: true 手动提交:ack-mode: manual_immediate:同步提交 异步提交(推荐) 偏移量重置: auto-offset-reset: earliest -> 如果有偏移量则继续消费,如果偏移量没了,从头重新进行消费,可能会存在幂等性问题 auto-offset-reset: latest -> 如果有偏移量则继续消费,如果偏移量不存在,只消费新消息,旧消息没消费完就丢掉了 auto-offset-reset: none -> 如果有偏移量则继续消费,如果偏移量不存在,抛出异常 消费者重试:重试主题和死信主题, @RetryableTopic()
2、生产者发送消息 KafkaService
package com. atguigu. tingshu. common. service ; import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. kafka. core. KafkaTemplate ;
import org. springframework. kafka. support. SendResult ;
import org. springframework. stereotype. Service ; import java. util. concurrent. CompletableFuture ; @Service
public class KafkaService { private static final Logger logger = LoggerFactory . getLogger ( KafkaService . class ) ; @Autowired private KafkaTemplate kafkaTemplate; public void sendMsg ( String topic, String msg) { this . sendMsg ( topic, null , null , msg) ; } public void sendMsg ( String topic, Integer partition, String key, String msg) { CompletableFuture < SendResult > future = this . kafkaTemplate. send ( topic, partition, key, msg) ; future. whenCompleteAsync ( ( result, ex) -> { if ( ex != null ) { logger. error ( "生产者发送消息失败!原因:{}" , ex. getMessage ( ) ) ; } } ) ; } }
whenCompleteAsync:异步完成时的处理、当异步操作完成时
3、UserInfoServiceImpl -> login()
此时 service-user 是生产者 发送消息
@Slf4j
@Service
@SuppressWarnings ( { "unchecked" , "rawtypes" } )
public class UserInfoServiceImpl extends ServiceImpl < UserInfoMapper , UserInfo > implements UserInfoService { @Autowired private WxMaService wxMaService; @Autowired private RedisTemplate redisTemplate; @Autowired private UserAccountFeignClient userAccountFeignClient; @Autowired private KafkaService kafkaService; @Override public Map < String , Object > login ( String code) { HashMap < String , Object > map = new HashMap < > ( ) ; try { WxMaJscode2SessionResult sessionInfo = this . wxMaService. getUserService ( ) . getSessionInfo ( code) ; String openid = sessionInfo. getOpenid ( ) ; UserInfo userInfo = this . getOne ( new LambdaQueryWrapper < UserInfo > ( ) . eq ( UserInfo :: getWxOpenId , openid) ) ; if ( userInfo == null ) { userInfo = new UserInfo ( ) ; userInfo. setWxOpenId ( openid) ; userInfo. setNickname ( "这家伙太懒" + IdWorker . getIdStr ( ) ) ; userInfo. setAvatarUrl ( "https://img0.baidu.com/it/u=1633409170,3159960019&fm=253&fmt=auto&app=138&f=JPEG?w=500&h=500" ) ; this . save ( userInfo) ; this . kafkaService. sendMsg ( KafkaConstant . QUEUE_USER_REGISTER , userInfo. getId ( ) . toString ( ) ) ; } String token = UUID . randomUUID ( ) . toString ( ) ; UserInfoVo userInfoVo = new UserInfoVo ( ) ; BeanUtils . copyProperties ( userInfo, userInfoVo) ; this . redisTemplate. opsForValue ( ) . set ( RedisConstant . USER_LOGIN_KEY_PREFIX + token, userInfoVo, RedisConstant . USER_LOGIN_KEY_TIMEOUT , TimeUnit . SECONDS ) ; map. put ( "token" , token) ; return map; } catch ( WxErrorException e) { throw new GuiguException ( ResultCodeEnum . LOGIN_AUTH ) ; } } }
4、service-account - > AccountListener.java
此时 service-account 是消费者 接收消息
@Slf4j
@Component
public class AccountListener { @Autowired private UserAccountService userAccountService; @RetryableTopic ( backoff = @Backoff ( 2000 ) ) @KafkaListener ( topics = KafkaConstant . QUEUE_USER_REGISTER ) public void listen ( String userId, Acknowledgment ack) { if ( StringUtils . isBlank ( userId) ) { ack. acknowledge ( ) ; return ; } this . userAccountService. saveAccount ( Long . valueOf ( userId) ) ; ack. acknowledge ( ) ; }
}