文章目录
- 1、Openfire插件接收binlog数据
- 1.1、创建用户组
- 1.2、接口实现
- 2、Canal客户端开发
- 3、Smack消息客户端实现。
mysql的binlog的实时数据订阅
(1) canal安装与客户端使用
(2) openfire 4.7.5 Web插件开发
(3) 使用canal和openfire实现Mysql的实时数据订阅
业务系统每天产生的数据在5000-1万之间,数据量不大,但是订阅这些数据的用户量比较多,当前要支持100多用户,并且要求数据实时同步。产生的数据是包含类别的,用户按实际购买情况订阅自己需要的数据,但是类别总数不多,可以用户分成不同的组,同一组用户订阅相同的类别数据。
简单的构思了一下实现:
1、要实现数据实时同步,最好的方式是基于binlog日志同步,可选的读取binlog工具:canal、floinkcdc等。
2、要把数据同步到用户的数据库,可选方案:
(1)binlog数据保存到kafka,在用户的服务器上安装读取kafka的客户端,实现数据入库。但是这方案在用户管理不方便。
(2)基于netty实现数据同步的服务器端和客户端,binlog数据发送到netty服务器端,netty服务器端再通过传输协议把数据同步到netty客户端入库。但是这方案比较可行,但是开发工作量比较大。
(3)使用开源openfire即时消息服务器和客户端Smack库实现数据同步的服务器端和客户端,这方案开发工作量不大,而且Openfire自带用户和组管理,单机也支持1万用户的并发量,后期用户增长也没压力。但是需要开发人员先学习openfire相关技术,需要一定的学习时间。
本文使用方案(3)测试数据同步方案。
数据同步链路如下:
1、Openfire插件接收binlog数据
1.1、创建用户组
在openfire后台创建用户组data-group,组内添加用户test1、test2,这个组内的数据只同步给test1、test2两个用户。
1.2、接口实现
实现代码是在前文openfire 4.7.5 Web插件开发 的Jersey接口例子基础上,增加MessageService类,实现接收数据的http接口,接收来自canal客户端的sql数据,并将数据发送到Smack客户端。
package org.igniterealtime.openfire.service;import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.igniterealtime.openfire.exampleplugin.DbCdcPlugin;
import org.jivesoftware.admin.AuthCheckFilter;
import org.jivesoftware.openfire.MessageRouter;
import org.jivesoftware.openfire.XMPPServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import javax.annotation.PostConstruct;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import java.util.*;@Path("dbcdc/v1/message")
public class MessageService {private static final Logger log = LoggerFactory.getLogger(MessageService.class);private DbCdcPlugin plugin;@PostConstructpublic void init() {plugin = (DbCdcPlugin) XMPPServer.getInstance().getPluginManager().getPlugin( "dbcdc" );AuthCheckFilter.addExclude("dbcdc/v1/message/sendMsgToOne");}// http://localhost:9090/plugins/dbcdc/v1/message/sendMsgToOne?msg=aaaaabbbbbb@POST@Path("/sendMsgToOne")@Produces(MediaType.APPLICATION_JSON)@Consumes(MediaType.APPLICATION_FORM_URLENCODED)public Map<String, Object> sendMsgToOne(@FormParam("msg") String msg) throws Exception{List<String> jids = getMembers();// 给组内用户同步binlogMap<String, Object> data = new HashMap<>();data.put("msg", msg);String from = "admin@21doc.net";jids.forEach(jid->{String to = jid;Message message = new Message();message.setFrom(new JID(from));message.setTo(new JID(to));message.setID(nextID());message.setType(Message.Type.chat);message.setBody(msg);MessageRouter messageRouter = plugin.server.getMessageRouter();messageRouter.route(message);data.put(jid, "success");});return data;}private List<String> getMembers(){List<String> list = new ArrayList();try{// 测试只发给data-group用户组Collection<JID> jids = plugin.groupManager.getGroup("data-group").getMembers();for(JID jid:jids){list.add(jid.toBareJID());}}catch(Exception e){log.error("getMembers error=====",e);}return list;}public static synchronized String nextID() {Random random = new Random();int number1 = random.nextInt(899) + 100;int number2 = random.nextInt(899) + 100;return new StringBuffer().append(number1).append("-").append(number2).toString();}
}
2、Canal客户端开发
Canal客户端接收Canal服务器端传输的binlog数据,并转成对应的sql语句,再发送到Openfire服务器端分发给用户。
是在前文canal安装与客户端使用的基础上,重新实现了CanalClient类,例子只处理了删除、更新、插入三种SQL操作。
package com.penngo.canal.component;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.jsoup.Connection;
import org.jsoup.Jsoup;
import org.springframework.stereotype.Component;
import java.util.List;@Component
@Slf4j
public class CanalClient {private final static String imUrl = "http://localhost:9090/plugins/dbcdc/v1/message/sendMsgToOne";@Resourceprivate CanalConnector canalConnector;/*** canal入库方法*/public void run() {int batchSize = 1000;try {canalConnector.connect();canalConnector.subscribe("flinktest\\..*");canalConnector.rollback();try {while (true) {Message message = canalConnector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();System.out.println("batchId=======" + batchId + ",size:" + size);if (batchId == -1 || size == 0) {Thread.sleep(1000);} else {dataHandle(message.getEntries());}canalConnector.ack(batchId);}} catch (InterruptedException e) {e.printStackTrace();} catch (InvalidProtocolBufferException e) {e.printStackTrace();}} finally {canalConnector.disconnect();}}/*** 数据处理* @param entrys*/private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {for (Entry entry : entrys) {if (EntryType.ROWDATA == entry.getEntryType()) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());EventType eventType = rowChange.getEventType();if (eventType == EventType.DELETE) {genDeleteSql(entry);} else if (eventType == EventType.UPDATE) {genUpdateSql(entry);} else if (eventType == EventType.INSERT) {genInsertSql(entry);}}}}private void genUpdateSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> newColumnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");for (int i = 0; i < newColumnList.size(); i++) {sql.append(" " + newColumnList.get(i).getName()+ " = '" + newColumnList.get(i).getValue() + "'");if (i != newColumnList.size() - 1) {sql.append(",");}}sql.append(" where ");List<Column> oldColumnList = rowData.getBeforeColumnsList();for (Column column : oldColumnList) {if (column.getIsKey()) {sql.append(column.getName() + "=" + column.getValue());break;}}this.sqlToIm(sql.toString());}} catch (Exception e) {e.printStackTrace();}}private void genDeleteSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getBeforeColumnsList();StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");for (Column column : columnList) {if (column.getIsKey()) {sql.append(column.getName() + "=" + column.getValue());break;}}this.sqlToIm(sql.toString());}} catch (Exception e) {e.printStackTrace();}}private void genInsertSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");for (int i = 0; i < columnList.size(); i++) {sql.append(columnList.get(i).getName());if (i != columnList.size() - 1) {sql.append(",");}}sql.append(") VALUES (");for (int i = 0; i < columnList.size(); i++) {sql.append("'" + columnList.get(i).getValue() + "'");if (i != columnList.size() - 1) {sql.append(",");}}sql.append(")");this.sqlToIm(sql.toString());}} catch (Exception e) {e.printStackTrace();}}/*** 同步到openfire服务器端* @param sql*/public void sqlToIm(String sql) throws Exception {String body = Jsoup.connect(imUrl).ignoreContentType(true).header("Content-Type", "application/x-www-form-urlencoded").data("msg", sql).method(Connection.Method.POST).execute().body();log.info("sqlToIm result=======" + body);}
}
3、Smack消息客户端实现。
Smack消息客户端接收来自Openfire服务器的消息,并把数据同步到数据库。实现代码
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.penngo.example</groupId><artifactId>Smack-Test</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><java.version>8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-core --><dependency><groupId>org.igniterealtime.smack</groupId><artifactId>smack-core</artifactId><version>4.4.6</version></dependency><!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-extensions --><dependency><groupId>org.igniterealtime.smack</groupId><artifactId>smack-extensions</artifactId><version>4.4.6</version></dependency><!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-im --><dependency><groupId>org.igniterealtime.smack</groupId><artifactId>smack-im</artifactId><version>4.4.6</version></dependency><!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-tcp --><dependency><groupId>org.igniterealtime.smack</groupId><artifactId>smack-tcp</artifactId><version>4.4.6</version></dependency><!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-debug --><dependency><groupId>org.igniterealtime.smack</groupId><artifactId>smack-debug</artifactId><version>4.4.6</version></dependency><!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-experimental --><dependency><groupId>org.igniterealtime.smack</groupId><artifactId>smack-experimental</artifactId><version>4.4.6</version></dependency><!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-legacy --><dependency><groupId>org.igniterealtime.smack</groupId><artifactId>smack-legacy</artifactId><version>4.4.6</version></dependency><!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-bosh --><dependency><groupId>org.igniterealtime.smack</groupId><artifactId>smack-bosh</artifactId><version>4.4.6</version></dependency><!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-resolver-minidns --><dependency><groupId>org.igniterealtime.smack</groupId><artifactId>smack-resolver-minidns</artifactId><version>4.4.6</version></dependency><!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-resolver-javax --><dependency><groupId>org.igniterealtime.smack</groupId><artifactId>smack-resolver-javax</artifactId><version>4.4.6</version></dependency><!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-resolver-dnsjava --><dependency><groupId>org.igniterealtime.smack</groupId><artifactId>smack-resolver-dnsjava</artifactId><version>4.4.6</version></dependency><!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-xmlparser-xpp3 --><dependency><groupId>org.igniterealtime.smack</groupId><artifactId>smack-xmlparser-xpp3</artifactId><version>4.4.6</version></dependency><!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-sasl-javax --><dependency><groupId>org.igniterealtime.smack</groupId><artifactId>smack-sasl-javax</artifactId><version>4.4.6</version></dependency><!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-java8 --><dependency><groupId>org.igniterealtime.smack</groupId><artifactId>smack-java8</artifactId><version>4.4.6</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version></dependency><dependency><groupId>commons-dbutils</groupId><artifactId>commons-dbutils</artifactId><version>1.6</version></dependency></dependencies><repositories><repository><id>alimaven</id><name>Maven Aliyun Mirror</name><url>https://maven.aliyun.com/repository/central</url></repository></repositories><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.22.2</version><configuration><skip>true</skip></configuration></plugin></plugins></build>
</project>
业务逻辑实现SqlAgentIM.java
package com.penngo.example;import org.jivesoftware.smack.AbstractXMPPConnection;
import org.jivesoftware.smack.ConnectionConfiguration;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.SmackException;import org.jivesoftware.smack.chat.Chat;
import org.jivesoftware.smack.chat.ChatManager;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Presence;
import org.jivesoftware.smack.tcp.XMPPTCPConnection;
import org.jivesoftware.smack.tcp.XMPPTCPConnectionConfiguration;
import org.jxmpp.jid.EntityBareJid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import java.sql.Connection;
import java.sql.DriverManager;public class SqlAgentIM {private static final Logger log = LoggerFactory.getLogger(SqlAgentIM.class);private String dburl = "jdbc:mysql://localhost:3306/flinktest2?serverTimezone=Asia/Shanghai&characterEncoding=utf-8&useSSL=false";private AbstractXMPPConnection conn = null;private DbDao dbDao = null;public SqlAgentIM(){}public void run(){dbDao = new DbDao(dburl, "root", "test123");login();}public void login(){try{// 打开调试窗口SmackConfiguration.DEBUG = true;XMPPTCPConnectionConfiguration config = XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword("test2", "123456").setXmppDomain("21doc.net").setHost("127.0.0.1").setPort(5222).setSecurityMode(ConnectionConfiguration.SecurityMode.disabled).build();conn = new XMPPTCPConnection(config);conn.connect().login();setOnlineStatus();ChatManager chatManager = ChatManager.getInstanceFor(conn);chatManager.addChatListener((chat, b) -> chat.addMessageListener((chat1, message) ->{String sql = message.getBody();//System.out.println("New message from " + chat1 + ": " + message.getBody());System.out.println("New message from " + chat1.getParticipant().asEntityBareJidString() + ": " + sql);try{int successCount = dbDao.execute(sql);log.info("execute result====successCount:" + successCount);}catch(Exception e){e.printStackTrace();log.error("execute error", e);}}));}catch(Exception e){e.printStackTrace();}}public void setOnlineStatus() throws SmackException.NotConnectedException, InterruptedException {Presence presence = new Presence(Presence.Type.available, "online", 1, Presence.Mode.available);conn.sendStanza(presence);Presence presence2 = new Presence(Presence.Type.subscribe, "online", 1, Presence.Mode.available);conn.sendStanza(presence2);Presence presence3 = new Presence(Presence.Type.subscribed, "online", 1, Presence.Mode.available);conn.sendStanza(presence3);}class DbDao {private String jdbc_driver;private String jdbc_url;private String jdbc_username;private String jdbc_password;private Connection conn = null;public DbDao(String url, String username, String password){if(conn == null){try{if(jdbc_driver == null){jdbc_driver = "";jdbc_url = url;jdbc_username = username;jdbc_password = password;}DbUtils.loadDriver(jdbc_driver);conn = DriverManager.getConnection(jdbc_url, jdbc_username, jdbc_password);}catch(Exception e){e.printStackTrace();}}}public int execute(String sql, Object... params) throws Exception{QueryRunner qr = new QueryRunner();int i = qr.update(conn, sql, params);return i;}public void close(){DbUtils.closeQuietly(conn);}}public static void main(String[] args) {SqlAgentIM sqlAgentIM = new SqlAgentIM();sqlAgentIM.run();}
}
本文仅实现了用户在线时的mysql实现更新,未实现内容:
- 1、客户端离线时,离线数据的同步;
- 2、用户第一次订阅数据时的数据全量更新。
后续可以根据方案采用情况再优化完善。