文章目录 1、发送消息 KafkaService 2、生产者 service-album -> AlbumInfoServiceImpl 2.1、新增 saveAlbumInfo() 2.2、更新 updateAlbumInfo() 2.3、删除 removeAlbumInfo() 3、消费者 service-search -> AlbumListener.java
新增:如果是公开的专辑,则发送消息给 kafka,search 通过监听器获取消息并同步新增数据。
更新:如果是公开的专辑,则发送消息给 kafka,search 通过监听器获取消息并同步更新数据;如果是私有的专辑,则发送消息给 kafka,search 通过监听器获取消息并从 es 中删除数据。
删除:发送消息给 kafka,search 通过监听器获取消息并从 es 中删除数据。
1、发送消息 KafkaService
package com. atguigu. tingshu. common. service ; import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. kafka. core. KafkaTemplate ;
import org. springframework. kafka. support. SendResult ;
import org. springframework. stereotype. Service ; import java. util. concurrent. CompletableFuture ; @Service
public class KafkaService {
    // Thin wrapper around KafkaTemplate for publishing String messages.
    // Sending is asynchronous: callers are not notified of failures, which are only logged.

    private static final Logger logger = LoggerFactory.getLogger(KafkaService.class);

    // Keys and payloads are Strings everywhere in this class, so the template is
    // parameterized instead of being used as a raw type.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends a message to the given topic without specifying a partition or a key.
     *
     * @param topic destination topic
     * @param msg   message payload
     */
    public void sendMsg(String topic, String msg) {
        this.sendMsg(topic, null, null, msg);
    }

    /**
     * Sends a message to the given topic and logs an error if the send fails.
     *
     * @param topic     destination topic
     * @param partition target partition, or {@code null} when none is specified
     * @param key       record key, or {@code null} when none is specified
     * @param msg       message payload
     */
    public void sendMsg(String topic, Integer partition, String key, String msg) {
        CompletableFuture<SendResult<String, String>> future =
                this.kafkaTemplate.send(topic, partition, key, msg);
        future.whenCompleteAsync((result, ex) -> {
            if (ex != null) {
                // Pass the throwable as the last argument so the stack trace and cause
                // are logged, not just the message text.
                logger.error("生产者发送消息失败!原因:{}", ex.getMessage(), ex);
            }
        });
    }
}
whenCompleteAsync:当异步操作完成时(无论成功还是失败),异步执行给定的回调;回调中的 ex 不为 null 表示发送失败。
2、生产者 service-album -> AlbumInfoServiceImpl
2.1、新增 saveAlbumInfo()
新增:如果是公开的专辑,则发送消息给 kafka,search 通过监听器获取消息并同步新增数据。
@Slf4j
@Service
@SuppressWarnings ( { "unchecked" , "rawtypes" } )
public class AlbumInfoServiceImpl extends ServiceImpl < AlbumInfoMapper , AlbumInfo > implements AlbumInfoService { @Autowired private AlbumAttributeValueMapper attributeValueMapper; @Autowired private AlbumStatService albumStatService; @Autowired private KafkaService kafkaService; @Transactional ( rollbackFor = Exception . class ) @Override public void saveAlbumInfo ( AlbumInfoVo albumInfoVo) throws FileNotFoundException { AlbumInfo albumInfo = new AlbumInfo ( ) ; BeanUtils . copyProperties ( albumInfoVo, albumInfo) ; Long userId = AuthContextHolder . getUserId ( ) ; albumInfo. setUserId ( userId == null ? 1 : userId) ; albumInfo. setTracksForFree ( 5 ) ; albumInfo. setSecondsForFree ( 30 ) ; albumInfo. setStatus ( SystemConstant . ALBUM_STATUS_PASS ) ; this . save ( albumInfo) ; Long albumInfoId = albumInfo. getId ( ) ; List < AlbumAttributeValueVo > albumAttributeValueVoList = albumInfoVo. getAlbumAttributeValueVoList ( ) ; if ( ! CollectionUtils . isEmpty ( albumAttributeValueVoList) ) { albumAttributeValueVoList. forEach ( albumAttributeValueVo -> { AlbumAttributeValue albumAttributeValue = new AlbumAttributeValue ( ) ; BeanUtils . copyProperties ( albumAttributeValueVo, albumAttributeValue) ; albumAttributeValue. setAlbumId ( albumInfoId) ; this . attributeValueMapper. insert ( albumAttributeValue) ; } ) ; }
this . albumStatService. saveAlbumStat ( albumInfoId) ; if ( StringUtils . equals ( albumInfo. getIsOpen ( ) , "1" ) ) { this . kafkaService. sendMsg ( KafkaConstant . QUEUE_ALBUM_UPPER , albumInfoId. toString ( ) ) ; } }
}
2.2、更新 updateAlbumInfo()
更新:如果是公开的专辑,则发送消息给 kafka,search 通过监听器获取消息并同步更新数据;如果是私有的专辑,则发送消息给 kafka,search 通过监听器获取消息并从 es 中删除数据。
@Slf4j
@Service
@SuppressWarnings ( { "unchecked" , "rawtypes" } )
public class AlbumInfoServiceImpl extends ServiceImpl < AlbumInfoMapper , AlbumInfo > implements AlbumInfoService { @Autowired private AlbumAttributeValueMapper attributeValueMapper; @Autowired private KafkaService kafkaService; @Transactional @Override public void updateAlbumInfo ( Long albumId, AlbumInfoVo albumInfoVo) { AlbumInfo albumInfo = new AlbumInfo ( ) ; BeanUtils . copyProperties ( albumInfoVo, albumInfo) ; albumInfo. setId ( albumId) ; this . updateById ( albumInfo) ; this . attributeValueMapper. delete ( new LambdaUpdateWrapper < AlbumAttributeValue > ( ) . eq ( AlbumAttributeValue :: getAlbumId , albumId) ) ; List < AlbumAttributeValueVo > albumAttributeValueVoList = albumInfoVo. getAlbumAttributeValueVoList ( ) ; if ( ! CollectionUtils . isEmpty ( albumAttributeValueVoList) ) { albumAttributeValueVoList. forEach ( albumAttributeValueVo -> { AlbumAttributeValue albumAttributeValue = new AlbumAttributeValue ( ) ; BeanUtils . copyProperties ( albumAttributeValueVo, albumAttributeValue) ; albumAttributeValue. setAlbumId ( albumId) ; this . attributeValueMapper. insert ( albumAttributeValue) ; } ) ; } if ( StringUtils . equals ( albumInfoVo. getIsOpen ( ) , "1" ) ) { this . kafkaService. sendMsg ( KafkaConstant . QUEUE_ALBUM_UPPER , albumId. toString ( ) ) ; } else { this . kafkaService. sendMsg ( KafkaConstant . QUEUE_ALBUM_LOWER , albumId. toString ( ) ) ; } }
}
2.3、删除 removeAlbumInfo()
删除:发送消息给kafka,search通过监听器获取消息es删除数据
@Slf4j
@Service
@SuppressWarnings ( { "unchecked" , "rawtypes" } )
public class AlbumInfoServiceImpl extends ServiceImpl < AlbumInfoMapper , AlbumInfo > implements AlbumInfoService { @Autowired private AlbumAttributeValueMapper attributeValueMapper; @Autowired private AlbumStatMapper albumStatMapper; @Autowired private KafkaService kafkaService; @Transactional @Override public void removeAlbumInfo ( Long albumId) { this . removeById ( albumId) ; this . albumStatMapper. delete ( new LambdaUpdateWrapper < AlbumStat > ( ) . eq ( AlbumStat :: getAlbumId , albumId) ) ; this . attributeValueMapper. delete ( new LambdaUpdateWrapper < AlbumAttributeValue > ( ) . eq ( AlbumAttributeValue :: getAlbumId , albumId) ) ; this . kafkaService. sendMsg ( KafkaConstant . QUEUE_ALBUM_LOWER , albumId. toString ( ) ) ; }
}
3、消费者 service-search -> AlbumListener.java
package com. atguigu. tingshu. search. listener ; @Component
public class AlbumListener {
    // Kafka consumer that keeps the album search index in step with album changes:
    // "upper" messages (re)index an album, "lower" messages delete it from the index.

    @Autowired
    private AlbumInfoFeignClient albumInfoFeignClient;

    @Autowired
    private UserInfoFeignClient userInfoFeignClient;

    @Autowired
    private CategoryFeignClient categoryFeignClient;

    @Autowired
    private ElasticsearchTemplate elasticsearchTemplate;

    /**
     * Builds the index document for the album and saves it to Elasticsearch.
     * Blank messages are ignored.
     *
     * @param albumId album id as text, as received from the topic
     * @throws IllegalArgumentException via {@code Assert.notNull} when a remote result
     *                                  or the album itself is missing
     */
    @KafkaListener(topics = KafkaConstant.QUEUE_ALBUM_UPPER)
    public void upper(String albumId) {
        if (StringUtils.isBlank(albumId)) {
            return;
        }

        Result<AlbumInfo> albumInfoResult = this.albumInfoFeignClient.getAlbumInfo(Long.valueOf(albumId));
        Assert.notNull(albumInfoResult, "同步数据时,获取专辑信息失败!");
        AlbumInfo albumInfo = albumInfoResult.getData();
        Assert.notNull(albumInfo, "同步数据时,没有对应的专辑!");

        AlbumInfoIndex albumInfoIndex = new AlbumInfoIndex();
        BeanUtils.copyProperties(albumInfo, albumInfoIndex);

        this.fillAnnouncer(albumInfo, albumInfoIndex);
        this.fillCategory(albumInfo, albumInfoIndex);
        this.fillRandomStats(albumInfoIndex);
        this.fillAttributeValues(albumInfo, albumInfoIndex);

        this.elasticsearchTemplate.save(albumInfoIndex);
    }

    /**
     * Deletes the album's document from Elasticsearch. Blank messages are ignored.
     *
     * @param albumId album id as text, used as the document id
     */
    @KafkaListener(topics = KafkaConstant.QUEUE_ALBUM_LOWER)
    public void lower(String albumId) {
        if (StringUtils.isBlank(albumId)) {
            return;
        }
        this.elasticsearchTemplate.delete(albumId, AlbumInfoIndex.class);
    }

    /** Copies the announcer id and nickname from the album owner's user info, when present. */
    private void fillAnnouncer(AlbumInfo albumInfo, AlbumInfoIndex albumInfoIndex) {
        Result<UserInfoVo> userInfoVoResult = this.userInfoFeignClient.getUserById(albumInfo.getUserId());
        Assert.notNull(userInfoVoResult, "数据导入时,获取主播信息失败!");
        UserInfoVo userInfoVo = userInfoVoResult.getData();
        if (userInfoVo != null) {
            albumInfoIndex.setAnnouncerId(userInfoVo.getId());
            albumInfoIndex.setAnnouncerName(userInfoVo.getNickname());
        }
    }

    /** Resolves the level-1 and level-2 category ids from the album's level-3 category, when present. */
    private void fillCategory(AlbumInfo albumInfo, AlbumInfoIndex albumInfoIndex) {
        Result<BaseCategoryView> categoryResult =
                this.categoryFeignClient.getAllLevelCategories(albumInfo.getCategory3Id());
        Assert.notNull(categoryResult, "数据导入时,获取分类信息失败!");
        BaseCategoryView baseCategoryView = categoryResult.getData();
        if (baseCategoryView != null) {
            albumInfoIndex.setCategory1Id(baseCategoryView.getCategory1Id());
            albumInfoIndex.setCategory2Id(baseCategoryView.getCategory2Id());
        }
    }

    /** Fills the statistics with random placeholder values and derives the hot score from them. */
    private void fillRandomStats(AlbumInfoIndex albumInfoIndex) {
        // One generator per message instead of one per statistic.
        Random random = new Random();
        int playNum = randomStat(random);
        albumInfoIndex.setPlayStatNum(playNum);
        int subscribeNum = randomStat(random);
        albumInfoIndex.setSubscribeStatNum(subscribeNum);
        int buyNum = randomStat(random);
        albumInfoIndex.setBuyStatNum(buyNum);
        int commentNum = randomStat(random);
        albumInfoIndex.setCommentStatNum(commentNum);
        // Weighted sum: play 0.1, comment 0.2, subscribe 0.3, buy 0.4.
        albumInfoIndex.setHotScore(playNum * 0.1 + commentNum * 0.2 + subscribeNum * 0.3 + buyNum * 0.4);
    }

    /** Returns a random multiple of 10000 between 10000 and 1000000 inclusive. */
    private static int randomStat(Random random) {
        return (random.nextInt(100) + 1) * 10000;
    }

    /** Copies the album's attribute values into the index document, when there are any. */
    private void fillAttributeValues(AlbumInfo albumInfo, AlbumInfoIndex albumInfoIndex) {
        Result<List<AlbumAttributeValue>> albumAttributeValueResult =
                this.albumInfoFeignClient.getAlbumAttributeValue(albumInfo.getId());
        Assert.notNull(albumAttributeValueResult, "数据导入时,获取标签及值失败!");
        List<AlbumAttributeValue> albumAttributeValues = albumAttributeValueResult.getData();
        if (CollectionUtils.isEmpty(albumAttributeValues)) {
            return;
        }
        albumInfoIndex.setAttributeValueIndexList(albumAttributeValues.stream()
                .map(albumAttributeValue -> {
                    AttributeValueIndex attributeValueIndex = new AttributeValueIndex();
                    BeanUtils.copyProperties(albumAttributeValue, attributeValueIndex);
                    return attributeValueIndex;
                })
                .collect(Collectors.toList()));
    }
}