注:开发环境服务器:172.16.8.35 用develop用户登录。
-
进入canal安装目录:
cd /home/devops/canal/deployer/conf
-
复制dst_vehicle目录,重命名为自己服务的数据库名
cp -r dst_vehicle/ dst_bill
-
进入dst_bill目录,删除
h2.mv.db
和meta.dat
两个文件 -
编辑
instance.properties
文件,如图:
canal.instance.filter.regex=dst_db_bill\\.(bill_base_master_info)
canal.mq.dynamicTopic=dst_db_bill:(dst_db_bill\\..*)
5. canal配置完毕,开始整合代码
6. 绑定MQ交换机、路由、队列关系
@Slf4j
@Configuration
public class BillCanalMqBindConfig {@Value(value = "-${spring.profiles.active}")private String profile;public final static String DST_CANAL_EXCHANGE = "dst-canal-exchange";private final static String DST_DB_BILL_CANAL_QUEUE = "dst-db-bill-canal-queue";private final static String DST_DB_BILL_CANAL_KEY = "dst_db_bill";/*************************** 初始化交换机 ***************************/@Beanpublic TopicExchange dstCanalExchange() {TopicExchange topicExchange = new TopicExchange(DST_CANAL_EXCHANGE + profile);log.info("交换机:[{}]初始化成功................", topicExchange.getName());return topicExchange;}/*************************** 初始化消息队列 ***************************/@Beanpublic Queue dstBillCanalQueue() {Queue queue = new Queue(DST_DB_BILL_CANAL_QUEUE + profile, true);log.info("队列:[{}]初始化成功................", queue.getName());return queue;}/*************************** 绑定交换机 ***************************/@Beanpublic Binding dstBillCanalRouting() {Binding binding = BindingBuilder.bind(dstBillCanalQueue()).to(dstCanalExchange()).with(DST_DB_BILL_CANAL_KEY);log.info("消息队列:[{}] 成功绑定到交换机:[{}] ................", dstBillCanalQueue().getName(), dstCanalExchange().getName());return binding;}
}
7. 新增消费者,监听队列,打印消息
@RabbitListener(queuesToDeclare = {@Queue("dst-db-bill-canal-queue-dev")}, containerFactory = "defaultRabbitListenerContainerFactory")
public void canalListener(Channel channel, Message message) throws IOException {String msg = new String(message.getBody(), StandardCharsets.UTF_8);log.info("监听到canal数据更改消息:{}", msg);
}
8. 修改刚刚在canal里面配置好的表,消费者正常监听到消息,消费正常
9. 结束