文章目录
- 一、目标:数据源池化技术实现
- 二、设计:数据源池化技术实现
- 三、实现:数据源池化技术实现
- 3.1 工程结构
- 3.2 数据源池化技术关系图
- 3.3 无池化链接实现
- 3.4 有池化链接实现
- 3.4.1 有连接的数据源
- 3.4.2 池化链接的代理
- 3.4.3 池状态定义
- 3.4.4 pushConnection 回收链接
- 3.4.5 popConnection 获取链接
- 3.5 数据源工厂
- 3.5.1 无池化工厂
- 3.5.2 有池化工厂
- 3.6 新增类型别名注册器
- 四、测试:数据源池化技术实现
- 4.1 配置数据源
- 4.1.1 无池化:UNPOOLED
- 4.1.2 有池化:POOLED
- 4.2 单元测试
- 4.2.1 基础测试
- 4.2.2 无池化测试结果:UNPOOLED
- 4.2.3 有池化测试结果:POOLED
- 4.3 连接池验证
- 五、总结:数据源池化技术实现
一、目标:数据源池化技术实现
💡 Mybatis 中自带的数据源实现?
- 无池化 UnpooledDataSource 实现。
- 有池化 pooledDataSource 实现,池化配置属性的理解:最大活跃连接数、空闲连接数、检测时长等。
二、设计:数据源池化技术实现
💡 池化技术理解为亨元模式的具体实现方案:对一些需要较高创建成本且高频使用的资源,需要进行缓存或者也称预热处理。
- 池化技术:把一些资源存放到一个预热池子中,需要用的时候从池子中获取,使用完毕在进行使用。
- 通过池化可以非常有效的控制资源的使用成本,包括:资源数量、空闲时长、获取方式等进行统一控制和管理
- 通过提供统一的数据池中心,存放数据源连接,并根据配置按照请求获取连接的操作,创建连接池的数据源连接数量。
- 包括:最大空闲连接和最大活跃连接,都随着创建过程被控制。
- 此外由于控制了连接池中连接的数量,所以当外部从连接池获取连接时,如果连接已满则会进行循环等待。
- 案例:使用DB连接池,如果一个 SQL 操作引起了慢查询,则会导致整个服务进入瘫痪的阶段,各个和数据库相关的接口调用,都不能获得到连接,接口查询 TP99 徒然提高。
- 那连接池可以配置的很大吗?
- 不可以. 因为连接池要和数据所分配的连接池对应上,避免应用配置连接池超过数据库所提供的连接池数量,否则会出现 夯住不能分配链接 的问题,导致数据库拖垮从而引起连锁反应.
三、实现:数据源池化技术实现
3.1 工程结构
mybatis-step-05
|-src|-main| |-java| |-com.lino.mybatis| |-binding| | |-MapperMethod.java| | |-MapperProxy.java| | |-MapperProxyFactory.java| | |-MapperRegistry.java| |-builder| | |-xml| | | |-XMLConfigBuilder.java| | |-BaseBuilder.java| |-datasource| | |-druid| | | |-DruidDataSourceFacroty.java| | |-pooled| | | |-PooledConnection.java| | | |-PooledDataSource.java| | | |-PooledDataSourceFacroty.java| | | |-PoolState.java| | |-unpooled| | | |-UnpooledDataSource.java| | | |-UnpooledDataSourceFacroty.java| | |-DataSourceFactory.java| |-io| | |-Resources.java| |-mapping| | |-BoundSql.java| | |-Environment.java| | |-MappedStatement.java| | |-ParameterMapping.java| | |-SqlCommandType.java| |-session| | |-defaults| | | |-DefaultSqlSession.java| | | |-DefaultSqlSessionFactory.java| | |-Configuration.java| | |-SqlSession.java| | |-SqlSessionFactory.java| | |-SqlSessionFactoryBuilder.java| | |-TransactionIsolationLevel.java| |-transaction| | |-jdbc| | | |-JdbcTransaction.java| | | |-JdbcTransactionFactory.java| | |-Transaction.java| | |-TransactionFactory.java| |-type| | |-JdbcType.java| | |-TypeAliasRegistry.java|-test|-java| |-com.lino.mybatis.test| |-dao| | |-IUserDao.java| |-po| | |-User.java| |-ApiTest.java|-resources|-mapper| |-User_Mapper.xml|-mybatis-config-datasource.xml
3.2 数据源池化技术关系图
- 在 Mybatis 数据源的实现中,包括两部分:
- 无池化的 UnpooledDataSource 实现类。
- 有池化的 PooledDataSource 实现类,对无池化的 UnpooledDataSource 进行扩展处理。把创建出来的链接保存到内存中,记录为空闲链接和活跃链接,在不同的阶段使用。
- PooledConnection 是对链接的代理操作,通过
invoke
方法的反射调用,对关闭的链接进行回收处理,并使用notifyAll
通知正在等待链接的用户进行抢链接。 - 对 DataSourceFactory 数据源工厂接口的实现,由无池化工厂实现后,有池化工厂继承的方式进行处理。
3.3 无池化链接实现
UnpooledDataSource.java
package com.lino.mybatis.datasource.unpooled;import javax.sql.DataSource;
import java.io.PrintWriter;
import java.sql.*;
import java.util.Enumeration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;/*** @description: 无池化数据源实现*/
public class UnpooledDataSource implements DataSource {/*** 类加载器*/private ClassLoader driverClassLoader;/*** 驱动配置,也可以扩展属性信息:driver.encoding=UTF8*/private Properties driverProperties;/*** 驱动注册器*/private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>();/*** 驱动*/private String driver;/*** DB连接地址*/private String url;/*** 账号*/private String username;/*** 密码*/private String password;/*** 是否自动提交*/private Boolean autoCommit;/*** 事务隔离级别*/private Integer defaultTransactionIsolationLevel;static {Enumeration<Driver> drivers = DriverManager.getDrivers();while (drivers.hasMoreElements()) {Driver driver = drivers.nextElement();registeredDrivers.put(driver.getClass().getName(), driver);}}/*** 驱动代理*/private static class DriverProxy implements Driver {private Driver driver;DriverProxy(Driver driver) {this.driver = driver;}@Overridepublic Connection connect(String url, Properties info) throws SQLException {return this.driver.connect(url, info);}@Overridepublic boolean acceptsURL(String url) throws SQLException {return this.driver.acceptsURL(url);}@Overridepublic DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {return this.driver.getPropertyInfo(url, info);}@Overridepublic int getMajorVersion() {return this.driver.getMajorVersion();}@Overridepublic int getMinorVersion() {return this.driver.getMinorVersion();}@Overridepublic boolean jdbcCompliant() {return this.driver.jdbcCompliant();}@Overridepublic Logger getParentLogger() throws SQLFeatureNotSupportedException {return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);}}/*** 初始化驱动* 资料:https://www.kfu.com/~nsayer/Java/dyn-jdbc.html** @throws SQLException SQL异常*/private synchronized void initializerDriver() throws SQLException {if (!registeredDrivers.containsKey(driver)) {try {Class<?> driverType = Class.forName(driver, true, driverClassLoader);Driver driverInstance = (Driver) driverType.newInstance();DriverManager.registerDriver(new DriverProxy(driverInstance));registeredDrivers.put(driver, driverInstance);} catch (Exception e) {throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e);}}}private Connection doGetConnection(String username, String password) throws SQLException {Properties props = new Properties();if (driverProperties != null) {props.putAll(driverProperties);}if (username != null) {props.setProperty("user", username);}if (password != null) {props.setProperty("password", password);}return doGetConnection(props);}private Connection doGetConnection(Properties properties) throws SQLException {initializerDriver();Connection connection = DriverManager.getConnection(url, properties);if (autoCommit != null && autoCommit != connection.getAutoCommit()) {connection.setAutoCommit(autoCommit);}if (defaultTransactionIsolationLevel != null) {connection.setTransactionIsolation(defaultTransactionIsolationLevel);}return connection;}@Overridepublic Connection getConnection() throws SQLException {return doGetConnection(username, password);}@Overridepublic Connection getConnection(String username, String password) throws SQLException {return doGetConnection(username, password);}@Overridepublic <T> T unwrap(Class<T> iface) throws SQLException {throw new SQLException(getClass().getName() + "is not a wrapper.");}@Overridepublic boolean isWrapperFor(Class<?> iface) throws SQLException {return false;}@Overridepublic PrintWriter getLogWriter() throws SQLException {return DriverManager.getLogWriter();}@Overridepublic void setLogWriter(PrintWriter out) throws SQLException {DriverManager.setLogWriter(out);}@Overridepublic void setLoginTimeout(int seconds) throws SQLException {DriverManager.setLoginTimeout(seconds);}@Overridepublic int getLoginTimeout() throws SQLException {return DriverManager.getLoginTimeout();}@Overridepublic Logger getParentLogger() throws SQLFeatureNotSupportedException {return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);}public ClassLoader getDriverClassLoader() {return driverClassLoader;}public void setDriverClassLoader(ClassLoader driverClassLoader) {this.driverClassLoader = driverClassLoader;}public Properties getDriverProperties() {return driverProperties;}public void setDriverProperties(Properties driverProperties) {this.driverProperties = driverProperties;}public static Map<String, Driver> getRegisteredDrivers() {return registeredDrivers;}public static void setRegisteredDrivers(Map<String, Driver> registeredDrivers) {UnpooledDataSource.registeredDrivers = registeredDrivers;}public String getDriver() {return driver;}public void setDriver(String driver) {this.driver = driver;}public String getUrl() {return url;}public void setUrl(String url) {this.url = url;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public Boolean getAutoCommit() {return autoCommit;}public void setAutoCommit(Boolean autoCommit) {this.autoCommit = autoCommit;}public Integer getDefaultTransactionIsolationLevel() {return defaultTransactionIsolationLevel;}public void setDefaultTransactionIsolationLevel(Integer defaultTransactionIsolationLevel) {this.defaultTransactionIsolationLevel = defaultTransactionIsolationLevel;}
}
- 无池化的数据源连接实现:核心在于
initializerDriver
初始化驱动中使用Class.forName
和newInstance
的方式创建了数据源连接操作。 - 在创建完成链接之后,把链接存放到驱动注册器中,方便后续使用中可以直接获取链接,避免重复创建所带来的资源消耗。
3.4 有池化链接实现
💡 有池化的数据源链接,核心在于对无池化链接的包装,同时提供了相应的池化技术实现。
- 包括:pushConnection、popConnection、forceCloseAll、pingConnection 的操作处理。
- 当用户想要获取链接时,则会从连接池中获取链接,同时判断是否有空闲链接、最大活跃链接多少,以及是否需要等待处理或是最终抛出异常。
3.4.1 有连接的数据源
PooledDataSource.java
package com.lino.mybatis.datasource.pooled;import com.lino.mybatis.datasource.unpooled.UnpooledDataSource;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.io.PrintWriter;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.sql.*;
import java.util.logging.Logger;/*** @description: 有连接池的数据源*/
public class PooledDataSource implements DataSource {private org.slf4j.Logger logger = LoggerFactory.getLogger(PooledDataSource.class);/*** 池状态*/private final PoolState state = new PoolState(this);/*** 无池化数据源*/private final UnpooledDataSource dataSource;/*** 活跃连接数*/protected int poolMaximumActiveConnections = 10;/*** 空闲连接数*/protected int poolMaximumIdleConnections = 5;/*** 在被强制返回之前,池中连接被检查的时间*/protected int poolMaximumCheckoutTime = 20000;/*** 这是给连接池一个打印日志状态机会的低层次设置,还有重新尝试获得连接,这些情况下往往需要很长时间,为了避免连接池没有配置时静默失败*/protected int poolTimeToWait = 20000;/*** 发送到数据的侦测查询,用来验证连接是否正常工作,并且准备接受请求。* 默认是 “NO PING QUERY SET”,这回引起许多数据库驱动连接由一个错误信息而导致失败*/protected String poolPingQuery = "NO PING QUERY SET";/*** 开启或禁用侦测查询*/protected boolean poolPingEnabled = false;/*** 用来配置 poolPingQuery 多长时间被用一次*/protected int poolPingConnectionsNotUsedFor = 0;private int expectedConnectionTypeCode;public PooledDataSource() {this.dataSource = new UnpooledDataSource();}protected void pushConnection(PooledConnection connection) throws SQLException {synchronized (state) {state.activeConnections.remove(connection);// 判断连接是否有效if (connection.isValid()) {// 如果空闲连接小于设定数量,也就是太少时if (state.idleConnections.size() < poolMaximumIdleConnections && connection.getConnectionTypeCode() == expectedConnectionTypeCode) {state.accumulatedCheckoutTime += connection.getCheckOutTime();if (!connection.getRealConnection().getAutoCommit()) {connection.getRealConnection().rollback();}// 实例化一个新的DB连接,加入到idle列表PooledConnection newConnection = new PooledConnection(connection.getRealConnection(), this);state.idleConnections.add(newConnection);newConnection.setCreatedTimestamp(connection.getCreatedTimestamp());newConnection.setLastUsedTimestamp(connection.getLastUsedTimestamp());connection.invalidate();logger.info("Returned connection " + newConnection.getRealHashCode() + " to pool.");// 通知其他线程可以来抢DB连接了state.notifyAll();}// 否则,空闲连接还比较充足else {state.accumulatedCheckoutTime += connection.getCheckOutTime();if (!connection.getRealConnection().getAutoCommit()) {connection.getRealConnection().rollback();}// 将connection关闭connection.getRealConnection().close();logger.info("Closed connection " + connection.getRealHashCode() + ".");connection.invalidate();}} else {logger.info("A bad connection (" + connection.getRealHashCode() + ") attempted to return to the pool, discarding connection.");state.badConnectionCount++;}}}private PooledConnection popConnection(String username, String password) throws SQLException {boolean countteWait = false;PooledConnection conn = null;long t = System.currentTimeMillis();int localBadConnectionCount = 0;while (conn == null) {synchronized (state) {// 如果有空闲连接:返回第一个if (!state.idleConnections.isEmpty()) {conn = state.idleConnections.remove(0);logger.info("Check out connention " + conn.getRealHashCode() + " form pool.");}// 如果无空闲连接:创建新的连接else {// 活跃连接数不足if (state.activeConnections.size() < poolMaximumActiveConnections) {conn = new PooledConnection(dataSource.getConnection(), this);logger.info("Created connention " + conn.getRealHashCode() + ".");}// 活跃连接数已满else {// 取得活跃连接列表的第一个,也就是最老的一个连接PooledConnection oldestActiveConnection = state.activeConnections.get(0);long longestCheckoutTime = oldestActiveConnection.getCheckOutTime();// 如果checkout时间过长,则这个连接标记为过期if (longestCheckoutTime > poolMaximumCheckoutTime) {state.claimedOverdueConnectionCount++;state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;state.accumulatedCheckoutTime += longestCheckoutTime;state.activeConnections.remove(oldestActiveConnection);if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {oldestActiveConnection.getRealConnection().rollback();}// 删掉最老的连接,然后重新实例化一个新的连接conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);oldestActiveConnection.invalidate();logger.info("Claimed overdue connention " + conn.getRealHashCode() + ".");}// 如果checkout超时时间不够长,则等待else {try {if (!countteWait) {state.hadToWaitCount++;countteWait = true;}logger.info("Waiting as long as " + poolTimeToWait + " millisecond for connection.");long wt = System.currentTimeMillis();state.wait(poolTimeToWait);state.accumulatedWaitTime += System.currentTimeMillis() - wt;} catch (InterruptedException e) {break;}}}}// 获得到连接if (conn != null) {if (conn.isValid()) {if (!conn.getRealConnection().getAutoCommit()) {conn.getRealConnection().rollback();}conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));// 记录checkout时间conn.setCheckOutTimestamp(System.currentTimeMillis());conn.setLastUsedTimestamp(System.currentTimeMillis());state.activeConnections.add(conn);state.requestCount++;state.accumulatedCheckoutTime += System.currentTimeMillis() - t;} else {logger.info("A bad connection (" + conn.getRealHashCode() + ") was returned form the pool, getting another connection.");// 如果没拿到,统计信息:失败连接 +1state.badConnectionCount++;localBadConnectionCount++;conn = null;// 失败次数较多:抛异常if (localBadConnectionCount > (poolMaximumIdleConnections + 3)) {logger.debug("PooledDataSource: Could not get a good connection to the database.");throw new SQLException("PooledDataSource: Could not get a good connection to the database.");}}}}}if (conn == null) {logger.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");}return conn;}public void forceCloseAll() {synchronized (state) {expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());// 关闭活跃连接for (int i = state.activeConnections.size(); i > 0; i--) {try {PooledConnection conn = state.activeConnections.remove(i - 1);conn.invalidate();Connection realConn = conn.getRealConnection();if (!realConn.getAutoCommit()) {realConn.rollback();}realConn.close();} catch (Exception ignore) {}}// 关闭空闲连接for (int i = state.idleConnections.size(); i > 0; i--) {try {PooledConnection conn = state.idleConnections.remove(i - 1);conn.invalidate();Connection realConn = conn.getRealConnection();if (!realConn.getAutoCommit()) {realConn.rollback();}realConn.close();} catch (Exception ignore) {}}logger.info("PooledDataSource forcefully closed/removed all connections.");}}protected boolean pingConnection(PooledConnection conn) {boolean result = true;try {result = !conn.getRealConnection().isClosed();} catch (SQLException e) {logger.info("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());result = false;}if (result) {if (poolPingEnabled) {if (poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {try {logger.info("Testing connection " + conn.getRealHashCode() + " ...");Connection realConn = conn.getRealConnection();Statement statement = realConn.createStatement();ResultSet resultSet = statement.executeQuery(poolPingQuery);resultSet.close();if (!realConn.getAutoCommit()) {realConn.rollback();}result = true;logger.info("Connection " + conn.getRealHashCode() + " is GOOD!");} catch (Exception e) {logger.info("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage());try {conn.getRealConnection().close();} catch (SQLException ignore) {}result = false;logger.info("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());}}}}return result;}public static Connection unwrapConnection(Connection conn) {if (Proxy.isProxyClass(conn.getClass())) {InvocationHandler handler = Proxy.getInvocationHandler(conn);if (handler instanceof javax.sql.PooledConnection) {return ((PooledConnection) handler).getRealConnection();}}return conn;}private int assembleConnectionTypeCode(String url, String username, String password) {return ("" + url + username + password).hashCode();}@Overridepublic Connection getConnection() throws SQLException {return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();}@Overridepublic Connection getConnection(String username, String password) throws SQLException {return popConnection(username, password).getProxyConnection();}protected void finalize() throws Throwable {forceCloseAll();super.finalize();}@Overridepublic <T> T unwrap(Class<T> iface) throws SQLException {throw new SQLException(getClass().getName() + " is not a wrapper.");}@Overridepublic boolean isWrapperFor(Class<?> iface) throws SQLException {return false;}@Overridepublic PrintWriter getLogWriter() throws SQLException {return DriverManager.getLogWriter();}@Overridepublic void setLogWriter(PrintWriter out) throws SQLException {DriverManager.setLogWriter(out);}@Overridepublic void setLoginTimeout(int seconds) throws SQLException {DriverManager.setLoginTimeout(seconds);}@Overridepublic int getLoginTimeout() throws SQLException {return DriverManager.getLoginTimeout();}@Overridepublic Logger getParentLogger() throws SQLFeatureNotSupportedException {return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);}public void setDriver(String driver) {dataSource.setDriver(driver);forceCloseAll();}public void setUrl(String url) {dataSource.setUrl(url);forceCloseAll();}public void setUsername(String username) {dataSource.setUsername(username);forceCloseAll();}public void setPassword(String password) {dataSource.setPassword(password);forceCloseAll();}public void setDefaultAutoCommit(boolean defaultAutoCommit) {dataSource.setAutoCommit(defaultAutoCommit);forceCloseAll();}public int getPoolMaximumActiveConnections() {return poolMaximumActiveConnections;}public void setPoolMaximumActiveConnections(int poolMaximumActiveConnections) {this.poolMaximumActiveConnections = poolMaximumActiveConnections;}public int getPoolMaximumIdleConnections() {return poolMaximumIdleConnections;}public void setPoolMaximumIdleConnections(int poolMaximumIdleConnections) {this.poolMaximumIdleConnections = poolMaximumIdleConnections;}public int getPoolMaximumCheckoutTime() {return poolMaximumCheckoutTime;}public void setPoolMaximumCheckoutTime(int poolMaximumCheckoutTime) {this.poolMaximumCheckoutTime = poolMaximumCheckoutTime;}public int getPoolTimeToWait() {return poolTimeToWait;}public void setPoolTimeToWait(int poolTimeToWait) {this.poolTimeToWait = poolTimeToWait;}public String getPoolPingQuery() {return poolPingQuery;}public void setPoolPingQuery(String poolPingQuery) {this.poolPingQuery = poolPingQuery;}public boolean isPoolPingEnabled() {return poolPingEnabled;}public void setPoolPingEnabled(boolean poolPingEnabled) {this.poolPingEnabled = poolPingEnabled;}public int getPoolPingConnectionsNotUsedFor() {return poolPingConnectionsNotUsedFor;}public void setPoolPingConnectionsNotUsedFor(int poolPingConnectionsNotUsedFor) {this.poolPingConnectionsNotUsedFor = poolPingConnectionsNotUsedFor;}public int getExpectedConnectionTypeCode() {return expectedConnectionTypeCode;}public void setExpectedConnectionTypeCode(int expectedConnectionTypeCode) {this.expectedConnectionTypeCode = expectedConnectionTypeCode;}
}
3.4.2 池化链接的代理
PooledConnection.java
package com.lino.mybatis.datasource.pooled;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Objects;/*** @description: 池化代理的链接*/
public class PooledConnection implements InvocationHandler {private static final String CLOSE = "close";private static final Class<?>[] IFACES = new Class<?>[]{Connection.class};private int hashCode = 0;private PooledDataSource dataSource;/*** 真实的连接*/private Connection realConnection;/*** 代理的连接*/private Connection proxyConnection;private long checkOutTimestamp;private long createdTimestamp;private long lastUsedTimestamp;private int connectionTypeCode;private boolean valid;public PooledConnection(Connection connection, PooledDataSource dataSource) {this.hashCode = connection.hashCode();this.realConnection = connection;this.dataSource = dataSource;this.createdTimestamp = System.currentTimeMillis();this.lastUsedTimestamp = System.currentTimeMillis();this.valid = true;this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {String methodName = method.getName();// 如果是调用 CLOSE 关闭连接方法,则将连接加入连接池中,并返回nullif (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {dataSource.pushConnection(this);return null;} else {if (!Object.class.equals(method.getDeclaringClass())) {// 除了toString()方法,其他方法调用之前要检查connection是否还是合法的,不合法要抛出SQL异常checkConnection();}// 其他方法交给connection去调用return method.invoke(realConnection, args);}}private void checkConnection() throws SQLException {if (!valid) {throw new SQLException("Error accessing PooledConnection. Connection is invalid.");}}public void invalidate() {valid = false;}public boolean isValid() {return valid && realConnection != null && dataSource.pingConnection(this);}public Connection getRealConnection() {return realConnection;}public Connection getProxyConnection() {return proxyConnection;}public int getRealHashCode() {return realConnection == null ? 0 : realConnection.hashCode();}public int getConnectionTypeCode() {return connectionTypeCode;}public void setConnectionTypeCode(int connectionTypeCode) {this.connectionTypeCode = connectionTypeCode;}public long getCreatedTimestamp() {return createdTimestamp;}public void setCreatedTimestamp(long createdTimestamp) {this.createdTimestamp = createdTimestamp;}public long getLastUsedTimestamp() {return lastUsedTimestamp;}public void setLastUsedTimestamp(long lastUsedTimestamp) {this.lastUsedTimestamp = lastUsedTimestamp;}public long getTimeElapsedSinceLastUse() {return System.currentTimeMillis() - lastUsedTimestamp;}public long getAge() {return System.currentTimeMillis() - createdTimestamp;}public long getCheckOutTimestamp() {return checkOutTimestamp;}public void setCheckOutTimestamp(long checkOutTimestamp) {this.checkOutTimestamp = checkOutTimestamp;}public long getCheckOutTime() {return System.currentTimeMillis() - checkOutTimestamp;}@Overridepublic int hashCode() {return hashCode;}@Overridepublic boolean equals(Object obj) {if (obj instanceof PooledConnection) {return realConnection.hashCode() == ((PooledConnection) obj).realConnection.hashCode();} else if (obj instanceof Connection) {return hashCode == obj.hashCode();} else {return false;}}
}
- 当我们需要对链接进行池化处理,当链接调用一些
CLOSE
方法时,也需要把链接从池中关闭和恢复可用,允许其他用户获取到链接。 - 那么这里就需要对连接类进行代理包装,处理
CLOSE
方法。 - 通过 PooledConnection 实现
InvocationHandle#invoke
方法,包装代理链接,这样就可以对具体的调用方法进行控制。 - 在
invoke
方法中处理对 CLOSE 方法控制以外,排除toString
等 Object 的方法后,则是其他真正需要被DB链接处理的方法。 - 对于 CLOSE 方法的数据源回收操作
dataSource.pushConnection(this)
;有一个具体的实现方法,在池化实现类 PooledDataSource 中进行处理。
3.4.3 池状态定义
PoolState.java
package com.lino.mybatis.datasource.pooled;import java.util.ArrayList;
import java.util.List;/*** @description: 池状态*/
public class PoolState {protected PooledDataSource dataSource;/*** 空闲连接*/protected final List<PooledConnection> idleConnections = new ArrayList<>();/*** 活跃连接*/protected final List<PooledConnection> activeConnections = new ArrayList<>();/*** 请求次数*/protected long requestCount = 0;/*** 总请求时间*/protected long accumulatedRequestTime = 0;protected long accumulatedCheckoutTime = 0;protected long claimedOverdueConnectionCount = 0;protected long accumulatedCheckoutTimeOfOverdueConnections = 0;/*** 总等待时间*/protected long accumulatedWaitTime = 0;/*** 要等待的次数*/protected long hadToWaitCount = 0;/*** 失败连接次数*/protected long badConnectionCount = 0;public PoolState(PooledDataSource dataSource) {this.dataSource = dataSource;}public synchronized long getRequestCount() {return requestCount;}public synchronized long getAverageRequestTime() {return requestCount == 0 ? 0 : accumulatedRequestTime / requestCount;}public synchronized long getAverageWaitTime() {return hadToWaitCount == 0 ? 0 : accumulatedWaitTime / hadToWaitCount;}public synchronized long getHadToWaitCount() {return hadToWaitCount;}public synchronized long getBadConnectionCount() {return badConnectionCount;}public synchronized long getClaimedOverdueConnectionCount() {return claimedOverdueConnectionCount;}public synchronized long getAverageOverdueCheckoutTime() {return claimedOverdueConnectionCount == 0 ? 0 : accumulatedCheckoutTimeOfOverdueConnections / claimedOverdueConnectionCount;}public synchronized long getAverageCheckoutTime() {return requestCount == 0 ? 0 : accumulatedCheckoutTime / requestCount;}public synchronized int getIdleConnectionCount() {return idleConnections.size();}public synchronized int getActiveConnectionCount() {return activeConnections.size();}
}
- 定义连接池状态。包括:空闲连接、活跃连接、请求次数、总请求时间、总等待时间、要等待的数次、失败连接次数等。
- 给连接池状态添加
synchronized
锁,避免并发出现的问题。
3.4.4 pushConnection 回收链接
protected void pushConnection(PooledConnection connection) throws SQLException {synchronized (state) {state.activeConnections.remove(connection);// 判断连接是否有效if (connection.isValid()) {// 如果空闲连接小于设定数量,也就是太少时if (state.idleConnections.size() < poolMaximumIdleConnections && connection.getConnectionTypeCode() == expectedConnectionTypeCode) {state.accumulatedCheckoutTime += connection.getCheckOutTime();if (!connection.getRealConnection().getAutoCommit()) {connection.getRealConnection().rollback();}// 实例化一个新的DB连接,加入到idle列表PooledConnection newConnection = new PooledConnection(connection.getRealConnection(), this);state.idleConnections.add(newConnection);newConnection.setCreatedTimestamp(connection.getCreatedTimestamp());newConnection.setLastUsedTimestamp(connection.getLastUsedTimestamp());connection.invalidate();logger.info("Returned connection " + newConnection.getRealHashCode() + " to pool.");// 通知其他线程可以来抢DB连接了state.notifyAll();}// 否则,空闲连接还比较充足else {state.accumulatedCheckoutTime += connection.getCheckOutTime();if (!connection.getRealConnection().getAutoCommit()) {connection.getRealConnection().rollback();}// 将connection关闭connection.getRealConnection().close();logger.info("Closed connection " + connection.getRealHashCode() + ".");connection.invalidate();}} else {logger.info("A bad connection (" + connection.getRealHashCode() + ") attempted to return to the pool, discarding connection.");state.badConnectionCount++;}}
}
- 在
pooleadDataSource#pushConnection
数据源回收的处理中,核心在于 判断链接是否有效,以及进行相关的 空闲链接校验,判断是否把链接回到到 idle 空闲链接列表中,并通知其他线程来抢占。 - 如果现在有空闲链接充足,那么这个回收的链接则会进行回滚和关闭的处理。
- 回滚:
connection.getRealConnection().rollbak()
- 关闭:
connection.getRealConnection().close()
- 回滚:
3.4.5 popConnection 获取链接
private PooledConnection popConnection(String username, String password) throws SQLException {boolean countteWait = false;PooledConnection conn = null;long t = System.currentTimeMillis();int localBadConnectionCount = 0;while (conn == null) {synchronized (state) {// 如果有空闲连接:返回第一个if (!state.idleConnections.isEmpty()) {conn = state.idleConnections.remove(0);logger.info("Check out connention " + conn.getRealHashCode() + " form pool.");}// 如果无空闲连接:创建新的连接else {// 活跃连接数不足if (state.activeConnections.size() < poolMaximumActiveConnections) {conn = new PooledConnection(dataSource.getConnection(), this);logger.info("Created connention " + conn.getRealHashCode() + ".");}// 活跃连接数已满else {// 取得活跃连接列表的第一个,也就是最老的一个连接PooledConnection oldestActiveConnection = state.activeConnections.get(0);long longestCheckoutTime = oldestActiveConnection.getCheckOutTime();// 如果checkout时间过长,则这个连接标记为过期if (longestCheckoutTime > poolMaximumCheckoutTime) {state.claimedOverdueConnectionCount++;state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;state.accumulatedCheckoutTime += longestCheckoutTime;state.activeConnections.remove(oldestActiveConnection);if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {oldestActiveConnection.getRealConnection().rollback();}// 删掉最老的连接,然后重新实例化一个新的连接conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);oldestActiveConnection.invalidate();logger.info("Claimed overdue connention " + conn.getRealHashCode() + ".");}// 如果checkout超时时间不够长,则等待else {try {if (!countteWait) {state.hadToWaitCount++;countteWait = true;}logger.info("Waiting as long as " + poolTimeToWait + " millisecond for connection.");long wt = System.currentTimeMillis();state.wait(poolTimeToWait);state.accumulatedWaitTime += System.currentTimeMillis() - wt;} catch (InterruptedException e) {break;}}}}// 获得到连接if (conn != null) {if (conn.isValid()) {if (!conn.getRealConnection().getAutoCommit()) {conn.getRealConnection().rollback();}conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));// 记录checkout时间conn.setCheckOutTimestamp(System.currentTimeMillis());conn.setLastUsedTimestamp(System.currentTimeMillis());state.activeConnections.add(conn);state.requestCount++;state.accumulatedCheckoutTime += System.currentTimeMillis() - t;} else {logger.info("A bad connection (" + conn.getRealHashCode() + ") was returned form the pool, getting another connection.");// 如果没拿到,统计信息:失败连接 +1state.badConnectionCount++;localBadConnectionCount++;conn = null;// 失败次数较多:抛异常if (localBadConnectionCount > (poolMaximumIdleConnections + 3)) {logger.debug("PooledDataSource: Could not get a good connection to the database.");throw new SQLException("PooledDataSource: Could not get a good connection to the database.");}}}}}if (conn == null) {logger.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");}return conn;
}
popConnection
获取链接是一个while
死循环操作,只有获取到链接抛异常才会退出循环。- 获取链接的过程会使用
synchronized
进行加锁,因为所有线程在资源竞争的情况下,都需要进行加锁处理。 - 在加锁的代码块中通过判断是否还有空闲链接进行返回,如果没有则会判断活跃连接数是否充足,不充足则进行创建后返回。
- 在这里也会遇到活跃链接已经进行循环等待的过程,最后再不能获取则抛出异常。
3.5 数据源工厂
💡 数据源工厂包括两部分:分别是无池化和有池化,有池化的工厂继承无池化工厂。
在 mybatis 源码的实现类中,这样可以减少对 Properties 统一包装的反射方式的属性处理。
3.5.1 无池化工厂
UnpooledDataSourceFactory.java
package com.lino.mybatis.datasource.unpooled;import com.lino.mybatis.datasource.DataSourceFactory;
import javax.sql.DataSource;
import java.util.Properties;/*** @description: 无池化数据源工厂*/
public class UnpooledDataSourceFactory implements DataSourceFactory {protected Properties props;@Overridepublic void setProperties(Properties props) {this.props = props;}@Overridepublic DataSource getDataSource() {UnpooledDataSource unpooledDataSource = new UnpooledDataSource();unpooledDataSource.setDriver(props.getProperty("driver"));unpooledDataSource.setUrl(props.getProperty("url"));unpooledDataSource.setUsername(props.getProperty("username"));unpooledDataSource.setPassword(props.getProperty("password"));return unpooledDataSource;}
}
- 简单包装
getDataSource
获取数据源处理,把必要的参数进行传递。
3.5.2 有池化工厂
PooledDataSourceFactory.java
package com.lino.mybatis.datasource.pooled;import com.lino.mybatis.datasource.unpooled.UnpooledDataSourceFactory;
import javax.sql.DataSource;/*** @description: 有连接池的数据源工厂*/
public class PooledDataSourceFactory extends UnpooledDataSourceFactory {@Overridepublic DataSource getDataSource() {PooledDataSource pooledDataSource = new PooledDataSource();pooledDataSource.setDriver(props.getProperty("driver"));pooledDataSource.setUrl(props.getProperty("url"));pooledDataSource.setUsername(props.getProperty("username"));pooledDataSource.setPassword(props.getProperty("password"));return pooledDataSource;}
}
- 有池化的数据源工厂实现的也比较简单,只是继承 UnpooledDataSourceFactory 共用获取属性的能力,以及实例化出池化数据源。
3.6 新增类型别名注册器
Configuration.java
public Configuration() {typeAliasRegistry.registerAlias("JDBC", JdbcTransactionFactory.class);typeAliasRegistry.registerAlias("DRUID", DruidDataSourceFactory.class);typeAliasRegistry.registerAlias("UNPOOLED", UnpooledDataSourceFactory.class);typeAliasRegistry.registerAlias("POOLED", PooledDataSourceFactory.class);
}
- 将两个数据源和对应的工厂实现类配置到 Configuration 配置类中,这样在解析 XML 时根据不同的数据源类型获取和实例化对应的实现类。
- 在构造方法
Configuration
添加 UNPOOLED、POOLED 两个数据源注册到类型注册器中,方便后续使用 XMLConfigBuilder#envirenmentElement 方法解析 XML 处理数据源时进行使用。
四、测试:数据源池化技术实现
4.1 配置数据源
4.1.1 无池化:UNPOOLED
mybatis-config-datasource.xml
<environments default="development"><environment id="development"><transactionManager type="JDBC"/><dataSource type="UNPOOLED"><property name="driver" value="com.mysql.jdbc.Driver"/><property name="url" value="jdbc:mysql://127.0.0.1:3306/mybatis?useUnicode=true"/><property name="username" value="root"/><property name="password" value="123456"/></dataSource></environment>
</environments>
4.1.2 有池化:POOLED
mybatis-config-datasource.xml
<environments default="development"><environment id="development"><transactionManager type="JDBC"/><dataSource type="POOLED"><property name="driver" value="com.mysql.jdbc.Driver"/><property name="url" value="jdbc:mysql://127.0.0.1:3306/mybatis?useUnicode=true"/><property name="username" value="root"/><property name="password" value="123456"/></dataSource></environment>
</environments>
4.2 单元测试
4.2.1 基础测试
ApiTest.java
@Test
public void test_SqlSessionFactory() throws IOException {// 1.从SqlSessionFactory中获取SqlSessionSqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(Resources.getResourceAsReader("mybatis-config-datasource.xml"));SqlSession sqlSession = sqlSessionFactory.openSession();// 2.获取映射器对象IUserDao userDao = sqlSession.getMapper(IUserDao.class);// 3.测试验证for (int i = 0; i < 50; i++) {User user = userDao.queryUserInfoById(1L);logger.info("测试结果:{}", JSON.toJSONString(user));}
}
- 在无池化和有池化的测试中,基础的测试单元不需要改变,仍是通过 SqlSessionFactory 中获取 SqlSession 并获得映射对象和执行方法调用。
- 另外添加了50次的查询调用,便于验证连接池的创建和获取以及等待。
- 变化的在于 mybatis-config-datasource.xml 中 dataSource 数据源类型的调整
dataSource type="UNPOOLED/POOLED"
4.2.2 无池化测试结果:UNPOOLED
10:42:29.188 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
10:42:29.205 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
10:42:29.236 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
10:42:29.252 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
10:42:29.268 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
10:42:29.299 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
10:42:29.315 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
...
- 无池化的连接池操作,会不断的与数据库建立新的链接并执行 SQL 操作,这个过程中只要数据库还有链接可以被链接,就可以创建链接。
4.2.3 有池化测试结果:POOLED
10:46:58.765 [main] INFO c.l.m.d.pooled.PooledDataSource - PooledDataSource forcefully closed/removed all connections.
10:46:58.765 [main] INFO c.l.m.d.pooled.PooledDataSource - PooledDataSource forcefully closed/removed all connections.
10:46:58.765 [main] INFO c.l.m.d.pooled.PooledDataSource - PooledDataSource forcefully closed/removed all connections.
10:46:58.765 [main] INFO c.l.m.d.pooled.PooledDataSource - PooledDataSource forcefully closed/removed all connections.
10:46:59.444 [main] INFO c.l.m.d.pooled.PooledDataSource - Created connention 1436664465.
10:46:59.507 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
10:46:59.533 [main] INFO c.l.m.d.pooled.PooledDataSource - Created connention 1205406622.
10:46:59.533 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
10:46:59.543 [main] INFO c.l.m.d.pooled.PooledDataSource - Created connention 796667727.
10:46:59.554 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
10:46:59.570 [main] INFO c.l.m.d.pooled.PooledDataSource - Created connention 1541857308.
10:46:59.570 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
10:46:59.586 [main] INFO c.l.m.d.pooled.PooledDataSource - Created connention 2095303566.
10:46:59.586 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
10:46:59.602 [main] INFO c.l.m.d.pooled.PooledDataSource - Created connention 581318631.
10:46:59.602 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
10:46:59.633 [main] INFO c.l.m.d.pooled.PooledDataSource - Created connention 1989184704.
10:46:59.633 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
10:46:59.649 [main] INFO c.l.m.d.pooled.PooledDataSource - Created connention 199640888.
10:46:59.649 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
10:46:59.665 [main] INFO c.l.m.d.pooled.PooledDataSource - Created connention 1243806178.
10:46:59.665 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
10:46:59.690 [main] INFO c.l.m.d.pooled.PooledDataSource - Created connention 1007880005.
10:46:59.690 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
10:46:59.690 [main] INFO c.l.m.d.pooled.PooledDataSource - Waiting as long as 20000 millisecond for connection.
10:47:19.690 [main] INFO c.l.m.d.pooled.PooledDataSource - Claimed overdue connention 1436664465.
10:47:19.690 [main] INFO com.lino.mybatis.test.ApiTest - 测试结果:{"id":1,"userHead":"1_04","userId":"10001","userName":"小灵哥"}
...
- 通过使用连接池的配置可以看到,在调用和获取链接的过程中,当调用次数达到10此以后,连接池中就有了10个活跃链接,再调用时则需要等待连接释放后才能使用并执行 SQL 操作。
4.3 连接池验证
test_pooled:连接池验证
@Test
public void test_pooled() throws IOException, SQLException, InterruptedException {PooledDataSource pooledDataSource = new PooledDataSource();pooledDataSource.setDriver("com.mysql.jdbc.Driver");pooledDataSource.setUrl("jdbc:mysql://127.0.0.1:3306/mybatis?useUnicode=true");pooledDataSource.setUsername("root");pooledDataSource.setPassword("123456");// 持续获取连接while (true) {Connection connection = pooledDataSource.getConnection();System.out.println(connection);Thread.sleep(1000);connection.close();}
}
测试结果
10:52:54.704 [main] INFO c.l.m.d.pooled.PooledDataSource - PooledDataSource forcefully closed/removed all connections.
10:52:54.704 [main] INFO c.l.m.d.pooled.PooledDataSource - PooledDataSource forcefully closed/removed all connections.
10:52:54.704 [main] INFO c.l.m.d.pooled.PooledDataSource - PooledDataSource forcefully closed/removed all connections.
10:52:54.704 [main] INFO c.l.m.d.pooled.PooledDataSource - PooledDataSource forcefully closed/removed all connections.
10:52:55.386 [main] INFO c.l.m.d.pooled.PooledDataSource - Created connention 103536485.
com.mysql.jdbc.JDBC4Connection@62bd765
10:52:56.401 [main] INFO c.l.m.d.pooled.PooledDataSource - Returned connection 103536485 to pool.
10:52:56.401 [main] INFO c.l.m.d.pooled.PooledDataSource - Check out connention 103536485 form pool.
com.mysql.jdbc.JDBC4Connection@62bd765
10:52:57.404 [main] INFO c.l.m.d.pooled.PooledDataSource - Returned connection 103536485 to pool.
10:52:57.404 [main] INFO c.l.m.d.pooled.PooledDataSource - Check out connention 103536485 form pool.
com.mysql.jdbc.JDBC4Connection@62bd765
10:52:58.412 [main] INFO c.l.m.d.pooled.PooledDataSource - Returned connection 103536485 to pool.
10:52:58.412 [main] INFO c.l.m.d.pooled.PooledDataSource - Check out connention 103536485 form pool.
com.mysql.jdbc.JDBC4Connection@62bd765
10:52:59.416 [main] INFO c.l.m.d.pooled.PooledDataSource - Returned connection 103536485 to pool.
10:52:59.416 [main] INFO c.l.m.d.pooled.PooledDataSource - Check out connention 103536485 form pool.
- 从连接的 hashCode 的值
@62bd765
,可以看出数据库链接已经被缓存了,只要有空闲链接,就会调用数据库中同一个链接,节约资源。
!https://img-blog.csdnimg.cn/0332e7801f6b410daf16a5d4d5393a4e.jpeg
五、总结:数据源池化技术实现
- 完成了 Mybatis 数据源池化的设计和实现,连接池的实现重点包括:synchronized 加锁、创建链接、活跃数量控制、休眠等待时长、抛异常逻辑等。