Scala 中通常通过 JDBC 组件连接 MySQL。JDBC 全称为 Java Database Connectivity,是 Java 平台访问关系型数据库的标准 API。
加载依赖
其中包含 JDBC driver
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.29</version>
</dependency>
1.1 spark组件直接连接(推荐)
通过spark.read直接连接,直接得到dataframe
// Connection settings for the MySQL instance used throughout the examples.
val database = "test_db"
val table = "test_table"
val user = "hive"
val password = "hive"
val url = s"jdbc:mysql://localhost:10101/$database"

// Load the table straight into a DataFrame through Spark's JDBC data source.
// `spark` is the active SparkSession (predefined in spark-shell).
// The surrounding parentheses let the multi-line chain be pasted into a REPL.
val jdbcDF = (spark.read.format("jdbc")
  .option("url", url)
  .option("dbtable", table)
  .option("user", user)
  .option("password", password)
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .load())

jdbcDF.show()
// +---+--------+
// | id|   value|
// +---+--------+
// |  0|Record 0|
// |  1|Record 1|
// |  2|Record 2|
// |  3|Record 3|
// |  4|Record 4|
// +---+--------+

// Use connectionProperties to avoid repeating the configuration on every call.
import java.util.Properties

// Credentials shared by the reads and writes below (same account as the direct read).
val connectionProperties = new Properties()
connectionProperties.setProperty("user", user)
connectionProperties.setProperty("password", password)

// Extra reader options can be chained with .option("key", "value") before .jdbc(...).
val jdbcDF2 = spark.read.jdbc(url, table, connectionProperties)

// Specify custom data types for the read schema.
// The column names must match columns of the JDBC table (here: id, value).
// NOTE: the option stays in connectionProperties for every later call that reuses it.
connectionProperties.setProperty("customSchema", "id DECIMAL(38, 0), value STRING")
val jdbcDF3 = spark.read.jdbc(url, table, connectionProperties)

// Saving data.
// The default save mode fails if the target table already exists, so the examples
// write to a separate table. Each write below is an alternative, not a sequence:
// add .mode("append") or .mode("overwrite") to write into an existing table.
val targetTable = "test_table_copy"

jdbcDF.write
  .format("jdbc")
  .option("url", url)
  .option("dbtable", targetTable)
  .option("user", user)
  .option("password", password)
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .save()

// Or, equivalently:
jdbcDF2.write.jdbc(url, targetTable, connectionProperties)

// Specify the column data types used when Spark creates the table on write.
// Every column named here must exist in the DataFrame being written.
jdbcDF.write
  .option("createTableColumnTypes", "value VARCHAR(1024)")
  .jdbc(url, targetTable, connectionProperties)

// Read the result of a query instead of a whole table.
// `query` and `dbtable` are mutually exclusive options.
spark.read
  .format("jdbc")
  .option("url", url)
  .option("user", user)
  .option("password", password)
  .option("query", "select c1, c2 from t1")
  .load()

// Or with the Properties-based API: spark.read.jdbc always sets `dbtable`, so the
// query is passed as a parenthesised, aliased subquery in place of the table name.
spark.read.jdbc(url, "(select c1, c2 from t1) AS t1_subset", connectionProperties)
注意:driver 的类名随 MySQL Connector/J 版本而不同:5.x 及更早的版本为 com.mysql.jdbc.Driver,8.x 起改为 com.mysql.cj.jdbc.Driver。
2.1 jdbc api方法连接
还可以不经过 Spark,直接使用 JDBC API(java.sql)读取或写入数据。
/* Created by Administrator on 2017/12/23. */
import java.sql.{Connection, DriverManager}

import scala.util.control.NonFatal

/**
 * Demonstrates plain JDBC access to MySQL from Scala: a SELECT, an INSERT, an
 * UPDATE, a DELETE and a stored-procedure call against the `persons` table.
 *
 * Any non-fatal failure is printed as a stack trace. The statement, the result
 * set and the connection are always released, even when a step fails.
 */
object ScalaJdbcConnectSelect extends App {
  // Local MySQL server, port 3306, database `cgjr`.
  val url = "jdbc:mysql://localhost:3306/cgjr?useUnicode=true&characterEncoding=utf-8&useSSL=false"
  // JDBC driver class name.
  val driver = "com.mysql.cj.jdbc.Driver"
  // Credentials (hard-coded for the demo only).
  val username = "root"
  val password = "12345"

  // Stays null until the connection is opened; the finally block checks for that.
  var connection: Connection = _

  try {
    // Register the driver, then open the connection.
    Class.forName(driver)
    connection = DriverManager.getConnection(url, username, password)

    val statement = connection.createStatement()
    try {
      // Run the query and print every row of the returned java.sql.ResultSet.
      val rs = statement.executeQuery("SELECT name, num FROM persons")
      try {
        while (rs.next()) {
          val name = rs.getString("name")
          val num = rs.getString("num")
          // Alternative tab-separated output: println(name + "\t" + num)
          println("name = %s, num = %s".format(name, num))
        }
      } finally {
        rs.close()
      }
      println("查询数据完成!")

      // Insert
      statement.executeUpdate("INSERT INTO `persons` (`name`, `num`) VALUES ('徐志摩', '22')")
      println("插入数据完成")

      // Update
      statement.executeUpdate("UPDATE persons set num=55 WHERE `name`=\"徐志摩\"")
      println("更新数据完成!")

      // Delete
      statement.executeUpdate("delete from persons WHERE `name`=\"徐志摩\"")
      println("删除数据完成!")

      // Stored-procedure call
      statement.executeUpdate("call add_student(3)")
      println("调用存储过程完成!")
    } finally {
      statement.close()
    }
  } catch {
    case NonFatal(e) => e.printStackTrace()
  } finally {
    // Close the connection only if it was actually opened.
    if (connection != null) connection.close()
  }
}
参考
Spark Scala: Load Data from MySQL
【scala 数据库操作】scala操作mysql数据库
【官方】使用jdbc连接其他数据库