Scala 练习一 将Mysql表数据导入HBase

Scala 练习一 将Mysql表数据导入HBase

续第一篇:Java代码将Mysql表数据导入HBase表

源码仓库地址:https://gitee.com/leaf-domain/data-to-hbase

  • 一、整体介绍
  • 二、依赖
  • 三、测试结果
  • 四、源码

一、整体介绍

在这里插入图片描述

  1. HBase特质

    连接HBase, 创建HBase执行对象

    1. 初始化配置信息:多条(hbase.zookeeper.quorum=>ip:2181)
      Configuration conf = HBaseConfiguration.create()
      conf.set(String, String)
    2. 创建连接:多个连接(池化)
      Connection con = ConnectionFactory.createConnection()
    3. 创建数据表:表名: String
      Table table = con.getTable(TableName)
    def build(): HBase		// 初始化配置信息
    def initPool(): HBase	// 初始化连接池
    def finish(): Executor	// 完成 返回执行对象
    
  2. Executor特质

    对HBase进行操作的方法: 包含如下函数

    def exists(tableName: String): Boolean	// 验证数据表是否存在
    def create(tableName: String, columnFamilies: Seq[String]): Boolean	// 创建数据表
    def drop(tableName: String): Boolean	// 删除数据表
    def put(tableName: String, data: util.List[Put]): Boolean	// 批量插入数据
    
  3. Jdbc 封装

    Jdbc封装

    1. 初始化连接
      driver : com.mysql.cj.jdbc.Driver
      参数:url, username, password
      创建连接
    2. 初始化执行器
      sql, parameters
      创建执行器【初始化参数】
    3. 执行操作并返回【结果】
      DML: 返回影响数据库表行数
      DQL: 返回查询的数据集合
      EX: 出现异常结果
  4. MyHBase用于实现HBaseExecutor特质

  5. 测试数据格式

    mysql表

    SET NAMES utf8mb4;
    SET FOREIGN_KEY_CHECKS = 0;DROP TABLE IF EXISTS `test_table_for_hbase`;
    CREATE TABLE `test_table_for_hbase`  (`test_id` int NULL DEFAULT NULL,`test_name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,`test_age` int NULL DEFAULT NULL,`test_gender` varchar(6) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,`test_phone` varchar(11) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL
    ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;INSERT INTO `test_table_for_hbase` VALUES (1, 'testName1', 26, 'male', '18011111112');
    INSERT INTO `test_table_for_hbase` VALUES (2, 'testName2', 25, 'female', '18011111113');
    INSERT INTO `test_table_for_hbase` VALUES (3, 'testName3', 27, 'male', '18011111114');
    INSERT INTO `test_table_for_hbase` VALUES (4, 'testName4', 35, 'male', '18011111115');
    -- .... 省略以下数据部分
    

    hbase表

    # 创建表  库名:表名, 列族1, 列族2
    create "hbase_test:tranfer_from_mysql","baseInfo","scoreInfo"	
    truncate 'hbase_test:tranfer_from_mysql'  # 清空hbase_test命名空间下的tranfer_from_mysql表
    scan 'hbase_test:tranfer_from_mysql'	  # 查看表
    

二、依赖

<dependencies><!-- HBase 驱动 --><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.3.5</version></dependency><!-- Hadoop --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>3.1.3</version></dependency><!-- mysql --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.33</version></dependency><!-- zookeeper --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.6.3</version></dependency>
</dependencies>

三、测试结果

终端有个日志的小警告(无伤大雅hh),输出为 true
在这里插入图片描述

查看hbase表,发现数据正常导入

在这里插入图片描述

四、源码

scala代码较简单这里直接上源码了,去除了部分注释,更多请去仓库下载

Executor

package hbase
import org.apache.hadoop.hbase.client.Put
import java.util
trait Executor {def exists(tableName: String): Booleandef create(tableName: String, columnFamilies: Seq[String]): Booleandef drop(tableName: String): Booleandef put(tableName: String, data: util.List[Put]): Boolean
}

HBase

package hbase
import org.apache.hadoop.hbase.client.Connection
trait HBase {protected var statusCode: Int = -1def build(): HBasecase class PoolCon(var available: Boolean, con: Connection) {def out = {available = falsethis}def in = available = true}def initPool(): HBasedef finish(): Executor
}

MyHBase

package hbase.implimport hbase.{Executor, HBase}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, Put, TableDescriptorBuilder}
import org.apache.hadoop.hbase.exceptions.HBaseException
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}import java.util
import scala.collection.mutable.ArrayBufferclass MyHBase (conf: Map[String, String])(pooled: Boolean = false, poolSize: Int = 3) extends HBase{private lazy val config: Configuration = HBaseConfiguration.create()private lazy val pool: ArrayBuffer[PoolCon] = ArrayBuffer()override def build(): HBase = {if(statusCode == -1){conf.foreach(t => config.set(t._1, t._2))statusCode = 0this}else{throw new HBaseException("build() function must be invoked first")}}override def initPool(): HBase = {if(statusCode == 0){val POOL_SIZE = if (pooled) {if (poolSize <= 0) 3 else poolSize} else 1for (i <- 1 to POOL_SIZE) {pool.append(PoolCon(available = true, ConnectionFactory.createConnection(config)))}statusCode = 1this}else{throw new HBaseException("initPool() function must be invoked only after build()")}}override def finish(): Executor = {if (statusCode == 1) {statusCode = 2new Executor {override def exists(tableName: String): Boolean = {var pc: PoolCon = nulltry{pc = getConval exists = pc.con.getAdmin.tableExists(TableName.valueOf(tableName))pc.inexists}catch {case e: Exception => e.printStackTrace()false}finally {close(pc)}}override def create(tableName: String, columnFamilies: Seq[String]): Boolean = {if (exists(tableName)) {return false}var pc: PoolCon = nulltry {pc = getConval builder: TableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))columnFamilies.foreach(cf => builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf)))pc.con.getAdmin.createTable(builder.build())true} catch {case e: Exception => e.printStackTrace()false} finally {close(pc)}}override def drop(tableName: String): Boolean = {if(!exists(tableName)){return false}var pc: PoolCon = nulltry {pc = getConpc.con.getAdmin.deleteTable(TableName.valueOf(tableName))true} catch {case e: Exception => e.printStackTrace()false} finally {close(pc)}}override def put(tableName: String, data: util.List[Put]): Boolean = {if(!exists(tableName)){return false}var pc: PoolCon = nulltry {pc = getConpc.con.getTable(TableName.valueOf(tableName)).put(data)true} catch {case e: Exception => e.printStackTrace()false} finally {close(pc)}}}}else {throw new HBaseException("finish() function must be invoked only after initPool()")}}private def getCon = {val left: ArrayBuffer[PoolCon] = pool.filter(_.available)if (left.isEmpty) {throw new HBaseException("no available connection")}left.apply(0).out}private def close(con: PoolCon) = {if (null != con) {con.in}}
}object MyHBase{def apply(conf: Map[String, String])(poolSize: Int): MyHBase = new MyHBase(conf)(true, poolSize)
}

Jdbc

package mysql
import java.sql.{Connection, DriverManager, ResultSet, SQLException}
import java.util
object Jdbc {object Result extends Enumeration {val EX = Value(0) val DML = Value(1) val DQL = Value(2) }// 3种结果(异常,DML,DQL)封装case class ResThree(rst: Result.Value) {def to[T <: ResThree]: T = this.asInstanceOf[T]}class Ex(throwable: Throwable) extends ResThree(Result.EX)object Ex {def apply(throwable: Throwable): Ex = new Ex(throwable)}class Dml(affectedRows: Int) extends ResThree(Result.DML) {def update = affectedRows}object Dml {def apply(affectedRows: Int): Dml = new Dml(affectedRows)}class Dql(set: ResultSet) extends ResThree(Result.DQL) {def generate[T](f: ResultSet => T) = {val list: util.List[T] = new util.ArrayList()while (set.next()) {list.add(f(set))}list}}object Dql {def apply(set: ResultSet): Dql = new Dql(set)}// JDBC 函数封装def jdbc(url: String, user: String, password: String)(sql: String, params: Seq[Any] = null): ResThree = {def con() = {// 1.1 显式加载 JDBC 驱动程序(只需要一次)Class.forName("com.mysql.cj.jdbc.Driver")// 1.2 创建连接对象DriverManager.getConnection(url, user, password)}def pst(con: Connection) = {// 2.1 创建执行对象val pst = con.prepareStatement(sql)// 2.2 初始化 SQL 参数if (null != params && params.nonEmpty) {params.zipWithIndex.foreach(t => pst.setObject(t._2 + 1, t._1))}pst}try {val connect = con()val prepared = pst(connect)sql match {case sql if sql.matches("^(insert|INSERT|delete|DELETE|update|UPDATE) .*")=> Dml(prepared.executeUpdate())case sql if sql.matches("^(select|SELECT) .*")=> Dql(prepared.executeQuery())case _ => Ex(new SQLException(s"illegal sql command : $sql"))}} catch {case e: Exception => Ex(e)}}}

Test

import hbase.impl.MyHBase
import mysql.Jdbc._
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import java.utilobject Test {def main(args: Array[String]): Unit = {// 初始化MySQL JDBC操作函数val jdbcOpr: (String, Seq[Any]) => ResThree = jdbc(user = "root",url = "jdbc:mysql://localhost:3306/test_db_for_bigdata",password = "123456")// 执行SQL查询,并将结果封装在ResThree对象中val toEntity: ResThree = jdbcOpr("select * from test_table_for_hbase where test_id between ? and ?",Seq(2, 4))// 判断ResThree对象中的结果是否为异常if (toEntity.rst == Result.EX) {// 如果异常,执行异常结果处理toEntity.to[Ex]println("出现异常结果处理")} else {// 如果没有异常,将查询结果转换为HBase的Put对象列表val puts: util.List[Put] = toEntity.to[Dql].generate(rst => {// 创建一个Put对象,表示HBase中的一行val put = new Put(Bytes.toBytes(rst.getInt("test_id")), // row key设置为test_idSystem.currentTimeMillis() // 设置时间戳)// 向Put对象中添加列值// baseInfo是列族名,test_name、test_age、test_gender、test_phone是列名put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_name"),Bytes.toBytes(rst.getString("test_name")))put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_age"),Bytes.toBytes(rst.getString("test_age")) // 注意:这里假设test_age是字符串类型,但通常应为整数类型)put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_gender"),Bytes.toBytes(rst.getString("test_gender")))put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_phone"),Bytes.toBytes(rst.getString("test_phone")))// 返回构建好的Put对象put})// 如果有数据需要插入HBaseif (puts.size() > 0) {// 初始化HBase连接池并执行Put操作val exe = MyHBase(Map("hbase.zookeeper.quorum" -> "single01:2181"))(1).build().initPool().finish()// 执行Put操作,并返回是否成功val bool = exe.put("hbase_test:tranfer_from_mysql", puts)// 打印操作结果println(bool)} else {// 如果没有数据需要插入println("查无数据")}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/344572.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从0到1:企业办公审批小程序开发笔记

可行性分析 企业办公审批小程序&#xff0c;适合各大公司&#xff0c;企业&#xff0c;机关部门办公审批流程&#xff0c;适用于请假审批&#xff0c;报销审批&#xff0c;外出审批&#xff0c;合同审批&#xff0c;采购审批&#xff0c;入职审批&#xff0c;其他审批等规划化…

使用 stress 命令进行Linux CPU 压力测试

大家好&#xff0c;在现代计算机系统中&#xff0c;对系统性能和稳定性的评估是至关重要的。特别是在服务器环境中&#xff0c;我们需要确保系统能够在高负载情况下稳定运行&#xff0c;以满足用户的需求。而 CPU 是系统中最关键的组件之一&#xff0c;其性能直接影响着整个系统…

用 DataGridView 控件显示数据

使用DataGridView&#xff0c;可以很方便显示数据。 &#xff08;1&#xff09;Visual Studio版本&#xff1a;Visual Studio 2022 &#xff08;2&#xff09;应用程序类型&#xff1a;windows form &#xff08;3&#xff09;编程语言&#xff1a;C# 一、目标框架 .NET Fra…

【NI国产替代】高速数据采集模块,最大采样率为 125 Msps,支持 FPGA 定制化

• 双通道高精度数据采集 • 支持 FPGA 定制化 • 双通道高精度采样率 最大采样率为 125 Msps12 位 ADC 分辨率 最大输入电压为 0.9 V -3 dB 带宽为 30 MHz 支持 FPGA 定制化 根据需求编程实现特定功能和性能通过定制 FPGA 实现硬件加速&#xff0c;提高系统的运算速度FPGA…

Docker中搭建likeadmin

一、使用Docker中的docker-compose搭建likeadmin 1.去网址&#xff1a;https://gitee.com/likeadmin/likeadmin_php中下载likeadmin 注册一个giee账号后 点那个克隆下载 按照序号在终端复制粘贴进去。 接着&#xff0c;输入ls 可以发现有一个这个&#xff1a; 里面有一个like…

服务器数据恢复—服务器raid5上层zfs文件系统数据恢复案例

服务器数据恢复环境&故障&#xff1a; 一台某品牌X3650M3服务器&#xff0c;服务器中有一组raid5磁盘阵列&#xff0c;上层采用zfs文件系统。 服务器未知原因崩溃&#xff0c;工作人员排查故障后发现服务器的raid5阵列中有两块硬盘离线导致该阵列不可用&#xff0c;服务器内…

Cell-在十字花科植物中年生和多次开花多年生开花行为的互相转化-文献精读21

Reciprocal conversion between annual and polycarpic perennial flowering behavior in the Brassicaceae 在十字花科植物中年生和多次开花多年生开花行为的互相转化 亮点 喜马拉雅须弥芥 和 内华达糖芥 是两个多年生植物模型 MADS-box 基因的剂量效应决定了一年生、二年生…

NodeJs实现脚本:将xlxs文件输出到json文件中

文章目录 前期工作和依赖笔记功能代码输出 最近有一个功能&#xff0c;将json文件里的内容抽取到一个xlxs中&#xff0c;然后维护xlxs文件。当要更新json文件时&#xff0c;就更新xlxs的内容并把它传回json中。这个脚本主要使用NodeJS写。 以下是完成此功能时做的一些笔记。 …

Oracle EBS AP发票创建会计科目错误:子分类帐日记帐分录未按输入币种进行平衡

系统版本 RDBMS : 12.1.0.2.0 Oracle Applications : 12.2.6 问题症状: 提交“创建会计科目”请求提示错误信息如下: 中文报错: 该子分类帐日记帐分录未按输入币种进行平衡。请检查日记帐分录行中输入的金额。 英文报错:The subledger journal entry does not balance i…

11 IP协议 - IP协议头部

什么是 IP 协议 IP&#xff08;Internet Protocol&#xff09;是一种网络通信协议&#xff0c;它是互联网的核心协议之一&#xff0c;负责在计算机网络中路由数据包&#xff0c;使数据能够在不同设备之间进行有效的传输。IP协议的主要作用包括寻址、分组、路由和转发数据包&am…

【Python教程】1-注释、变量、标识符与基本操作

在整理自己的笔记的时候发现了当年学习python时候整理的笔记&#xff0c;稍微整理一下&#xff0c;分享出来&#xff0c;方便记录和查看吧。个人觉得如果想简单了解一名语言或者技术&#xff0c;最简单的方式就是通过菜鸟教程去学习一下。今后会从python开始重新更新&#xff0…

使用OpenCV dnn c++加载YOLOv8生成的onnx文件进行实例分割

在网上下载了60多幅包含西瓜和冬瓜的图像组成melon数据集&#xff0c;使用 EISeg 工具进行标注&#xff0c;然后使用 eiseg2yolov8 脚本将.json文件转换成YOLOv8支持的.txt文件&#xff0c;并自动生成YOLOv8支持的目录结构&#xff0c;包括melon.yaml文件&#xff0c;其内容如下…

【UML用户指南】-05-对基本结构建模-类

目录 1、名称&#xff08;name&#xff09; 2、属性 &#xff08;attribute&#xff09; 3、操作&#xff08;operation&#xff09; 4、对属性和操作的组织 4.1、衍型 4.2、职责 &#xff08;responsibility&#xff09; 4.3、其他特征 4.4、对简单类型建模 5、结构良…

【Mtk Camera开发学习】06 MTK 和 Qcom 平台支持通过 Camera 标准API 打开 USBCamera

本专栏内容针对 “知识星球”成员免费&#xff0c;欢迎关注公众号&#xff1a;小驰行动派&#xff0c;加入知识星球。 #MTK Camera开发学习系列 #小驰私房菜 Google 官方介绍文档&#xff1a; https://source.android.google.cn/docs/core/camera/external-usb-cameras?hlzh-…

【传知代码】DETR[端到端目标检测](论文复现)

前言&#xff1a;想象一下&#xff0c;当自动驾驶汽车行驶在繁忙的街道上&#xff0c;DETR能够实时识别出道路上的行人、车辆、交通标志等目标&#xff0c;并准确预测出它们的位置和轨迹。这对于提高自动驾驶的安全性、减少交通事故具有重要意义。同样&#xff0c;在安防监控、…

Proxyman 现代直观的 HTTP 调试代理应用程序

Proxyman 是一款现代而直观的 HTTP 调试代理应用程序&#xff0c;它的功能强大&#xff0c;使您可以轻松捕获、检查和操作 HTTP(s) 流量。不再让繁杂的网络调试工具阻碍您的工作&#xff0c;使用 Proxyman&#xff0c;您将轻松应对网络调试的挑战。 下载地址&#xff1a;https…

BeagleBone Black入门总结

文章目录 参考连接重要路径系统镜像下载访问 BeagleBone 参考连接 镜像下载启动系统制作&#xff1a;SD卡烧录工具入门书籍推荐&#xff1a;BeagleBone cookbookBeagleBon cookbook 例程BeagleBone概况&#xff1f;BeagleBone 官方管理仓库(原理图&#xff0c;官方例程。。。)…

IP纯净度是什么,对用户有多么重要?

在网络应用和数据采集等领域&#xff0c;代理IP被广泛使用&#xff0c;而代理IP的纯净度则直接影响其性能和可用性。代理IP的纯净度主要涉及到代理IP在网络传输过程中的稳定性、匿名性和安全性。今天就带大家一起了解代理IP纯净度对用户的重要性。 第一&#xff0c;保护用户的隐…

http和https数据传输与协议区分

目录 1. 数据传输安全性2. 端口号3. URL 前缀4. SSL/TLS 证书5. 性能6. SEO 和用户信任7. 应用场景总结 HTTP&#xff08;HyperText Transfer Protocol&#xff09;和 HTTPS&#xff08;HyperText Transfer Protocol Secure&#xff09;是用于在客户端&#xff08;如浏览器&…

PyCharm中 Fitten Code插件的使用说明一

一. 简介 Fitten Code插件是是一款由非十大模型驱动的 AI 编程助手&#xff0c;它可以自动生成代码&#xff0c;提升开发效率&#xff0c;帮您调试 Bug&#xff0c;节省您的时间&#xff0c;另外还可以对话聊天&#xff0c;解决您编程碰到的问题。 前一篇文章学习了 PyCharm…