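Spring Boot + Canal (listening to the MySQL binlog) + RabbitMQ (processing and saving change records)
The goal is to record data changes in Spring Boot in a way that is decoupled from the business code. Each record holds the new data and, for update operations, the old data as well.
Using Canal to listen for MySQL binlog changes meets this requirement. However, a plain listener has to save the change record the moment it detects a change, unless extra logic is added, so RabbitMQ is brought in to handle saving the change records.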
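The overall steps are:
- Start MySQL and enable the binlog
- Start Canal, create a MySQL account for it, and have it connect to MySQL as a slave
- Set Canal's server mode to TCP and write a Java client that listens for MySQL binlog changes
- Set Canal's server mode to RabbitMQ, start RabbitMQ, configure the connection between Canal and RabbitMQ, and receive the binlog change events through the message queue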
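The shape of a change record (new data always, old data for updates) can be sketched in plain Java. This is only an illustration: the class `ChangeRecordSketch`, its `EventType` enum, the `row` helper and the column names are hypothetical and do not come from Canal or from the original project.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

public class ChangeRecordSketch {

    /** Hypothetical event types, one per row-level operation. */
    enum EventType { INSERT, UPDATE, DELETE }

    /** Hypothetical change record: the new data, plus the old data for updates. */
    static final class ChangeRecord {
        final String table;
        final EventType type;
        final Map<String, String> before; // old column values (needed for UPDATE)
        final Map<String, String> after;  // new column values

        ChangeRecord(String table, EventType type,
                     Map<String, String> before, Map<String, String> after) {
            this.table = table;
            this.type = type;
            this.before = before;
            this.after = after;
        }

        /** Names of the columns whose new value differs from the old one, sorted. */
        Set<String> changedColumns() {
            Set<String> changed = new TreeSet<>();
            for (Map.Entry<String, String> e : after.entrySet()) {
                if (!Objects.equals(before.get(e.getKey()), e.getValue())) {
                    changed.add(e.getKey());
                }
            }
            return changed;
        }
    }

    /** Builds an ordered column map from alternating name/value arguments. */
    static Map<String, String> row(String... nameValuePairs) {
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i + 1 < nameValuePairs.length; i += 2) {
            row.put(nameValuePairs[i], nameValuePairs[i + 1]);
        }
        return row;
    }

    public static void main(String[] args) {
        ChangeRecord update = new ChangeRecord("schedule", EventType.UPDATE,
                row("id", "1", "available_number", "10"),
                row("id", "1", "available_number", "9"));
        // prints: UPDATE on schedule changed [available_number]
        System.out.println(update.type + " on " + update.table
                + " changed " + update.changedColumns());
    }
}
```

Keeping both images in one object means the consumer that persists the record needs no further lookup in the business tables.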
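Prepare the following entities in the model module in advance.
SMS entity
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.util.Map;

@Data
@ApiModel(description = "SMS entity")
public class MsmVo {

    @ApiModelProperty(value = "phone")
    private String phone;

    @ApiModelProperty(value = "SMS template code")
    private String templateCode;

    @ApiModelProperty(value = "SMS template parameters")
    private Map<String, Object> param;
}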
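Schedule entity
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

@Data
@ApiModel(description = "OrderMqVo")
public class OrderMqVo {

    @ApiModelProperty(value = "Number of reservable slots")
    private Integer reservedNumber;

    @ApiModelProperty(value = "Number of remaining slots")
    private Integer availableNumber;

    @ApiModelProperty(value = "Schedule id")
    private String scheduleId;

    @ApiModelProperty(value = "SMS entity")
    private MsmVo msmVo;
}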
1. Install RabbitMQ
docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
Open the management console at http://IP:15672
2. Wrapping the rabbit-util module
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
</dependency>
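Create a RabbitService for sending messages
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RabbitService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Send a message to the given exchange with the given routing key
    public boolean sendMessage(String exchange, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
        return true;
    }
}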
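Create the MQ message converter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    // Serialize message payloads as JSON instead of Java serialization
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}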
Add a constants class
public class MqConst {

    // Order placement
    public static final String EXCHANGE_DIRECT_ORDER = "exchange.direct.order";
    public static final String ROUTING_ORDER = "order";
    // Queue
    public static final String QUEUE_ORDER = "queue.order";

    // SMS
    public static final String EXCHANGE_DIRECT_MSM = "exchange.direct.msm";
    public static final String ROUTING_MSM_ITEM = "msm.item";
    public static final String QUEUE_MSM_ITEM = "queue.msm.item";
}
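These constants name two direct exchanges, each with one routing key and one queue. A direct exchange delivers a message only to queues whose binding key exactly equals the message's routing key. The following is a simplified in-memory model of that rule, written for illustration; the class `DirectExchangeSketch` and its methods are hypothetical and are not part of RabbitMQ or Spring AMQP.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class DirectExchangeSketch {

    // exchange name -> (binding key -> names of the bound queues)
    private final Map<String, Map<String, List<String>>> bindings = new HashMap<>();
    // queue name -> pending messages
    private final Map<String, Queue<Object>> queues = new HashMap<>();

    /** Declares the exchange and queue if needed and binds them with the key. */
    void bind(String exchange, String bindingKey, String queue) {
        bindings.computeIfAbsent(exchange, k -> new HashMap<>())
                .computeIfAbsent(bindingKey, k -> new ArrayList<>())
                .add(queue);
        queues.computeIfAbsent(queue, k -> new ArrayDeque<>());
    }

    /** Routes by exact key match and returns how many queues got the message. */
    int publish(String exchange, String routingKey, Object message) {
        Map<String, List<String>> byKey = bindings.get(exchange);
        if (byKey == null) {
            throw new IllegalArgumentException("no such exchange: " + exchange);
        }
        List<String> targets = byKey.getOrDefault(routingKey, Collections.emptyList());
        for (String queue : targets) {
            queues.get(queue).add(message);
        }
        return targets.size();
    }

    /** Takes the next message from a queue, or null if there is none. */
    Object poll(String queue) {
        Queue<Object> pending = queues.get(queue);
        return pending == null ? null : pending.poll();
    }

    public static void main(String[] args) {
        DirectExchangeSketch broker = new DirectExchangeSketch();
        broker.bind("exchange.direct.order", "order", "queue.order");
        broker.bind("exchange.direct.msm", "msm.item", "queue.msm.item");

        // Exact key match: delivered to queue.msm.item, prints 1
        System.out.println(broker.publish("exchange.direct.msm", "msm.item", "sms payload"));
        // No binding for this key on this exchange: delivered nowhere, prints 0
        System.out.println(broker.publish("exchange.direct.msm", "order", "lost payload"));
        // prints: sms payload
        System.out.println(broker.poll("queue.msm.item"));
    }
}
```

The model also shows why the first argument of a publish call must be an exchange name: a queue name such as queue.msm.item is not a declared exchange, so the publish is rejected.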
3. SMS module service-sms
Add the module from section 2 as a dependency
<dependency>
    <groupId>com.michael</groupId>
    <artifactId>rabbit_util</artifactId>
    <version>xxx</version>
</dependency>
Configuration file application.properties
spring.rabbitmq.host=192.168.44.168
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
Service that sends the message
public interface MsmService {

    // Send a verification code to a phone number
    boolean send(String phone, String code);

    // Send an SMS for a message received from MQ
    boolean send(MsmVo msmVo);
}
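import com.alibaba.fastjson.JSONObject;
import com.aliyuncs.CommonRequest;
import com.aliyuncs.CommonResponse;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.http.MethodType;
import com.aliyuncs.profile.DefaultProfile;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.util.HashMap;
import java.util.Map;

@Service
public class MsmServiceImpl implements MsmService {

    @Override
    public boolean send(String phone, String code) {
        // Reject an empty phone number
        if (StringUtils.isEmpty(phone)) {
            return false;
        }
        // Set up the Aliyun SMS client
        DefaultProfile profile = DefaultProfile.getProfile(
                ConstantPropertiesUtils.REGION_Id,
                ConstantPropertiesUtils.ACCESS_KEY_ID,
                ConstantPropertiesUtils.SECRET);
        IAcsClient client = new DefaultAcsClient(profile);

        CommonRequest request = new CommonRequest();
        request.setMethod(MethodType.POST);
        request.setDomain("dysmsapi.aliyuncs.com");
        // API version of the Aliyun SMS service (Dysmsapi)
        request.setVersion("2017-05-25");
        request.setAction("SendSms");
        // Phone number
        request.putQueryParameter("PhoneNumbers", phone);
        // Signature name, as registered in the Aliyun console
        request.putQueryParameter("SignName", "我的网站");
        // Template code
        request.putQueryParameter("TemplateCode", "SMS_180051135");
        // The verification code is passed as JSON, e.g. {"code":"123456"}
        Map<String, Object> param = new HashMap<>();
        param.put("code", code);
        request.putQueryParameter("TemplateParam", JSONObject.toJSONString(param));

        // Call the API to send the SMS
        try {
            CommonResponse response = client.getCommonResponse(request);
            System.out.println(response.getData());
            return response.getHttpResponse().isSuccess();
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public boolean send(MsmVo msmVo) {
        if (!StringUtils.isEmpty(msmVo.getPhone())) {
            String code = (String) msmVo.getParam().get("code");
            return this.send(msmVo.getPhone(), code);
        }
        return false;
    }
}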
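One point to watch in send(MsmVo): param is a Map<String,Object> and the message travels as JSON, so a code that the producer put in as a number would typically come back as an Integer, and the (String) cast would then throw ClassCastException. A minimal defensive sketch follows; the class `MsmParamSketch` and the helper `codeOf` are hypothetical names, not part of the original project.

```java
import java.util.HashMap;
import java.util.Map;

public class MsmParamSketch {

    /** Returns the "code" template parameter as text, or null when it is absent. */
    static String codeOf(Map<String, Object> param) {
        if (param == null) {
            return null;
        }
        Object code = param.get("code");
        // String.valueOf handles String, Integer and Long values alike
        return code == null ? null : String.valueOf(code);
    }

    public static void main(String[] args) {
        Map<String, Object> param = new HashMap<>();
        param.put("code", 123456); // an Integer, as a JSON number would be deserialized
        System.out.println(codeOf(param)); // prints 123456
    }
}
```

With such a helper, send(MsmVo) could also return false when the code is missing instead of passing null on to the SMS API.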
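Create the MQ listener
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MsmReceiver {

    @Autowired
    private MsmService msmService;

    // Listen on the SMS queue; the queue, exchange and binding are declared if missing
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = MqConst.QUEUE_MSM_ITEM, durable = "true"),
            exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_MSM),
            key = {MqConst.ROUTING_MSM_ITEM}))
    public void send(MsmVo msmVo, Message message, Channel channel) {
        msmService.send(msmVo);
    }
}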
4. Business classes
After an order is created, send an SMS and update the slot counts.
① Add the dependency to the business module
rabbit-util
② Add the configuration
spring.rabbitmq.host=192.168.44.165
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
③ Service interface and implementation class
@Override
public void update(Schedule schedule) {
    // Refresh the update timestamp before saving
    schedule.setUpdateTime(new Date());
    scheduleRepository.save(schedule);
}
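④ Create the MQ listener in the receiver package
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class HospitalReceiver {

    @Autowired
    private ScheduleService scheduleService;

    @Autowired
    private RabbitService rabbitService;

    // Listen on the order queue
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = MqConst.QUEUE_ORDER, durable = "true"),
            exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_ORDER),
            key = {MqConst.ROUTING_ORDER}))
    public void receiver(OrderMqVo orderMqVo, Message message, Channel channel) throws IOException {
        // The order succeeded: update the schedule's slot counts
        Schedule schedule = scheduleService.getScheduleId(orderMqVo.getScheduleId());
        schedule.setReservedNumber(orderMqVo.getReservedNumber());
        schedule.setAvailableNumber(orderMqVo.getAvailableNumber());
        scheduleService.update(schedule);

        // Forward the SMS payload to the SMS exchange; the routing key selects queue.msm.item
        MsmVo msmVo = orderMqVo.getMsmVo();
        if (null != msmVo) {
            rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_MSM, MqConst.ROUTING_MSM_ITEM, msmVo);
        }
    }
}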
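The listener above is the consumer side; the code that publishes the OrderMqVo after an order is created is not shown in this article. A minimal sketch of what that producer side might look like, under stated assumptions: `MessageSender` is a hypothetical stand-in for RabbitService so the example runs without a broker, the nested `MsmVo` and `OrderMqVo` are simplified copies of the entities, and `buildOrderMessage`, `publishOrder` and the sample values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderPublisherSketch {

    /** Hypothetical stand-in for RabbitService.sendMessage(exchange, routingKey, message). */
    interface MessageSender {
        boolean sendMessage(String exchange, String routingKey, Object message);
    }

    // Same values as MqConst.EXCHANGE_DIRECT_ORDER and MqConst.ROUTING_ORDER
    static final String EXCHANGE_DIRECT_ORDER = "exchange.direct.order";
    static final String ROUTING_ORDER = "order";

    /** Simplified copy of MsmVo, without Lombok or Swagger annotations. */
    static final class MsmVo {
        String phone;
        String templateCode;
        Map<String, Object> param = new HashMap<>();
    }

    /** Simplified copy of OrderMqVo. */
    static final class OrderMqVo {
        Integer reservedNumber;
        Integer availableNumber;
        String scheduleId;
        MsmVo msmVo;
    }

    /** Bundles the new slot counts and the SMS payload into one message. */
    static OrderMqVo buildOrderMessage(String scheduleId, int reservedNumber,
                                       int availableNumber, String phone, String code) {
        MsmVo msmVo = new MsmVo();
        msmVo.phone = phone;
        msmVo.param.put("code", code); // kept as a String so the consumer can cast it

        OrderMqVo orderMqVo = new OrderMqVo();
        orderMqVo.scheduleId = scheduleId;
        orderMqVo.reservedNumber = reservedNumber;
        orderMqVo.availableNumber = availableNumber;
        orderMqVo.msmVo = msmVo;
        return orderMqVo;
    }

    /** Publishes to the order exchange; the routing key selects queue.order. */
    static boolean publishOrder(MessageSender sender, OrderMqVo orderMqVo) {
        return sender.sendMessage(EXCHANGE_DIRECT_ORDER, ROUTING_ORDER, orderMqVo);
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        MessageSender fake = (exchange, key, message) -> sent.add(exchange + " / " + key);

        OrderMqVo vo = buildOrderMessage("schedule-1", 20, 19, "13800000000", "123456");
        publishOrder(fake, vo);
        // prints: [exchange.direct.order / order]
        System.out.println(sent);
    }
}
```

Sending one message that carries both the slot counts and the SMS payload lets the order service return right after publishing, while the schedule update and the SMS happen asynchronously in the listeners.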