ClickHouse--11--ClickHouse API操作

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 1.Java 读写 ClickHouse API
    • 1.1 首先需要加入 maven 依赖
    • 1.2 Java 读取 ClickHouse 集群表数据
        • JDBC--01--简介
      • ClickHouse java代码
    • 1.3 Java 向 ClickHouse 表中写入数据
  • 2.Spark 写入 ClickHouse API
    • 2.1 导入依赖
    • 2.2 代码编写
  • 3.Flink 写入 ClickHouse API
    • 3.1 Flink 1.10.x 之前版本使用 flink-jdbc,只支持 Table API
    • 3.2 Flink 1.11.x 之后版本使用 flink-connector-jdbc,只支持DataStream API


1.Java 读写 ClickHouse API

1.1 首先需要加入 maven 依赖

<!-- 连接 ClickHouse 需要驱动包-->
<dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.2.4</version>
</dependency>

1.2 Java 读取 ClickHouse 集群表数据

JDBC–01–简介

在这里插入图片描述

public class Test01 {public static void main(String[] args) throws Exception {//1.注册数据库驱动Class.forName("com.mysql.jdbc.Driver");//2.获取数据库连接Connection conn = DriverManager.getConnection( "jdbc:mysql://localhost:3306/jt_db?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8","root", "root");//3.获取传输器Statement stat = conn.createStatement();//4.发送SQL到服务器执行并返回执行结果String sql = "select * from account";ResultSet rs = stat.executeQuery( sql );//5.处理结果while( rs.next() ) {int id = rs.getInt("id");String name = rs.getString("name");double money = rs.getDouble("money");System.out.println(id+" : "+name+" : "+money);}//6.释放资源rs.close();stat.close();conn.close();System.out.println("TestJdbc.main()....");}}

ClickHouse java代码


import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;import java.sql.ResultSet;
import java.sql.SQLException;public class test01 {public static void main(String[] args) throws SQLException {ClickHouseProperties props = new ClickHouseProperties();props.setUser("default");props.setPassword("");//1.注册数据库驱动配置BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node1:8123,node2:8123,node3:8123/default", props);//2.获取数据库连接ClickHouseConnection conn = dataSource.getConnection();//3.获取传输器ClickHouseStatement statement = conn.createStatement();//4.发送SQL到服务器执行并ResultSet rs = statement.executeQuery("select id,name,age from test");//5.处理结果while (rs.next()) {int id = rs.getInt("id");String name = rs.getString("name");int age = rs.getInt("age");System.out.println("id = " + id + ",name = " + name + ",age = " + age);}//6.释放资源conn.close();statement.close();rs.close();}
}

1.3 Java 向 ClickHouse 表中写入数据

package com.cy.demo;import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;import java.sql.ResultSet;
import java.sql.SQLException;public class test01 {public static void main(String[] args) throws SQLException {ClickHouseProperties props = new ClickHouseProperties();props.setUser("default");props.setPassword("");//1.注册数据库驱动配置BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node1:8123/default", props);//2.获取数据库连接ClickHouseConnection conn = dataSource.getConnection();//3.获取传输器ClickHouseStatement statement = conn.createStatement();//4.发送SQL到服务器执行并statement.execute("insert into test values (100,'王五',30)");//可以拼接批量插入多条//6.释放资源conn.close();statement.close();rs.close();}
}

在这里插入图片描述

2.Spark 写入 ClickHouse API

  • SparkCore 写入 ClickHouse,可以直接采用写入方式。下面案例是使用 SparkSQL 将结果存入 ClickHouse对应的表中。在 ClickHouse 中需要预先创建好对应的结果表

2.1 导入依赖

        <!-- 连接 ClickHouse 需要驱动包--><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.2.4</version><!-- 去除与 Spark 冲突的包 --><exclusions><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion><exclusion><groupId>net.jpountz.lz4</groupId><artifactId>lz4</artifactId></exclusion></exclusions></dependency><!-- Spark-core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.3.1</version></dependency><!-- SparkSQL --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.3.1</version></dependency><!-- SparkSQL ON Hive--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.3.1</version></dependency>

2.2 代码编写

val session: SparkSession =
SparkSession.builder().master("local").appName("test").getOrCreate()
val jsonList = List[String](
"{\"id\":1,\"name\":\"张三\",\"age\":18}",
"{\"id\":2,\"name\":\"李四\",\"age\":19}",
"{\"id\":3,\"name\":\"王五\",\"age\":20}"
)
//将 jsonList 数据转换成 DataSet
import session.implicits._
val ds: Dataset[String] = jsonList.toDS()
val df: DataFrame = session.read.json(ds)
df.show()
//将结果写往 ClickHouse
val url = "jdbc:clickhouse://node1:8123/default"
val table = "test"
val properties = new Properties()
properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
properties.put("user", "default")
properties.put("password", "")
properties.put("socket_timeout", "300000")
df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url,
table, properties)

3.Flink 写入 ClickHouse API

  • 可以通过 Flink 原生 JDBC Connector 包将 Flink 结果写入 ClickHouse 中,Flink 在1.11.0 版本对其 JDBC Connnector 进行了重构:

在这里插入图片描述

3.1 Flink 1.10.x 之前版本使用 flink-jdbc,只支持 Table API

  1. maven 中需要导入以下包:
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.9.1</version>
</dependency>
<!--添加 Flink JDBC 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
  1. 代码:
/**
* 通过 flink-jdbc API 将 Flink 数据结果写入到 ClickHouse 中,只支持 Table API
*
* 注意:
* 1.由于 ClickHouse 单次插入的延迟比较高,我们需要设置 BatchSize 来批量插入数据,提高性能。
* 2.在 JDBCAppendTableSink 的实现中,若最后一批数据的数目不足 BatchSize,则不会插入剩余数
据。
*/
case class PersonInfo(id:Int,name:String,age:Int)
object FlinkWriteToClickHouse1 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度为 1,后期每个并行度满批次需要的条数时,会插入 click 中
env.setParallelism(1)
val settings: EnvironmentSettings =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,settings)
//导入隐式转换
import org.apache.flink.streaming.api.scala._
//读取 Socket 中的数据
val sourceDS: DataStream[String] = env.socketTextStream("node5",9999)
val ds: DataStream[PersonInfo] = sourceDS.map(line => {
val arr: Array[String] = line.split(",")
PersonInfo(arr(0).toInt, arr(1), arr(2).toInt)
})
//将 ds 转换成 table 对象
import org.apache.flink.table.api.scala._
val table: Table = tableEnv.fromDataStream(ds,'id,'name,'age)
//将 table 对象写入 ClickHouse 中
//需要在 ClickHouse 中创建表:create table flink_result(id Int,name String,age Int) engine =
MergeTree() order by id;
val insertIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"
//准备 ClickHouse table sink
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
.setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
.setDBUrl("jdbc:clickhouse://node1:8123/default")
.setUsername("default")
.setPassword("")
.setQuery(insertIntoCkSql)
.setBatchSize(2) //设置批次量,默认 5000 条
.setParameterTypes(Types.INT, Types.STRING, Types.INT)
.build()
//注册 ClickHouse table Sink,设置 sink 数据的字段及 Schema 信息
tableEnv.registerTableSink("ck-sink",
sink.configure(Array("id", "name", "age"),Array(Types.INT, Types.STRING, Types.INT)))
//将数据插入到 ClickHouse Sink 中
tableEnv.insertInto(table,"ck-sink")
//触发以上执行
env.execute("Flink Table API to ClickHouse Example")
}
}

3.2 Flink 1.11.x 之后版本使用 flink-connector-jdbc,只支持DataStream API

  1. 在 Maven 中导入以下依赖包
<!-- Flink1.11 后需要 Flink-client 包-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.11.3</version>
</dependency>
<!--添加 Flink JDBC Connector 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
  1. 代码
/**
* Flink 通过 flink-connector-jdbc 将数据写入 ClickHouse ,目前只支持 DataStream API
*/
object FlinkWriteToClickHouse2 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度为 1
env.setParallelism(1)
import org.apache.flink.streaming.api.scala._
val ds: DataStream[String] = env.socketTextStream("node5",9999)
val result: DataStream[(Int, String, Int)] = ds.map(line => {
val arr: Array[String] = line.split(",")
(arr(0).toInt, arr(1), arr(2).toInt)
})
//准备向 ClickHouse 中插入数据的 sql
val insetIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"
//设置 ClickHouse Sink
val ckSink: SinkFunction[(Int, String, Int)] = JdbcSink.sink(
//插入数据 SQL
insetIntoCkSql,
//设置插入 ClickHouse 数据的参数
new JdbcStatementBuilder[(Int, String, Int)] {
override def accept(ps: PreparedStatement, tp: (Int, String, Int)): Unit = {
ps.setInt(1, tp._1)
ps.setString(2, tp._2)
ps.setInt(3, tp._3)
}
},
//设置批次插入数据
new JdbcExecutionOptions.Builder().withBatchSize(5).build(),
//设置连接 ClickHouse 的配置
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUrl("jdbc:clickhouse://node1:8123/default")
.withUsername("default")
.withUsername("")
.build()
)
//针对数据加入 sink
result.addSink(ckSink)
env.execute("Flink DataStream to ClickHouse Example")
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/258522.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

云计算基础-存储虚拟化(深信服aSAN分布式存储)

什么是存储虚拟化 分布式存储是利用虚拟化技术 “池化”集群存储卷内通用X86服务器中的本地硬盘&#xff0c;实现服务器存储资源的统一整合、管理及调度&#xff0c;最终向上层提供NFS、ISCSI存储接口&#xff0c;供虚拟机根据自身的存储需求自由分配使用资源池中的存储空间。…

数据库数据加密的 4 种常见思路的对比

应用层加解密方案数据库前置处理方案磁盘存取环节&#xff1a;透明数据加密DB 后置处理 最近由于工作需要&#xff0c;我对欧洲的通用数据保护条例做了调研和学习&#xff0c;其中有非常重要的一点&#xff0c;也是常识性的一条&#xff0c;就是需要对用户的个人隐私数据做好加…

【C基础刷题】第九讲

本系列博客为个人刷题思路分享&#xff0c;有需要借鉴即可。 1.目录大纲&#xff1a; 2.题目链接&#xff1a; 统计成绩 00&#xff1a;00&#xff1a;00⸺00&#xff1a;09&#xff1a;00题号&#xff1a;BC33 链接&#xff1a;https://www.nowcoder.com/practice/ cad8d94…

【plt.scatter绘制散点图】:从入门到精通,只需一篇文章!【Matplotlib】

【plt.scatter绘制散点图】&#xff1a;从入门到精通&#xff0c;只需一篇文章&#xff01;【Matplotlib】&#xff01;&#x1f680; 利用Matplotlib进行数据可视化示例 &#x1f335;文章目录&#x1f335; 一、plt.scatter入门&#xff1a;轻松迈出第一步 &#x1f463;二、…

牛客网SQL进阶128:未完成试卷数大于1的有效用户

官网链接&#xff1a; 未完成试卷数大于1的有效用户_牛客题霸_牛客网现有试卷作答记录表exam_record&#xff08;uid用户ID, exam_id试卷ID, st。题目来自【牛客题霸】https://www.nowcoder.com/practice/46cb7a33f7204f3ba7f6536d2fc04286?tpId240&tqId2183007&ru%2…

探讨深度学习

深度学习 深度学习概述进展崛起框架 主页传送门&#xff1a;&#x1f4c0; 传送 深度学习 概述 深度学习是机器学习领域的一个分支&#xff0c;它是一种基于人工神经网络的学习方法&#xff0c;旨在让 计算机模仿人类大脑的神经结构和学习方式&#xff0c;从大量数据中学习并…

C++11---(1)

目录 一、C11简介 二、列表初始化 2.1、{ } 初始化 三、变量类型推导 3.1、auto 3.2、decltype 为什么需要decltype 四、final和override 4.1、final 4.2、override 五、默认成员函数控制 5.1、default修饰函数 5.2、delete修饰函数 六、nullptr 一、C11简介 C11是…

Kubernetes(K8S)集群部署实战

目录 一、准备工作1.1、创建3台虚拟机1.1.1、下载虚拟机管理工具1.1.2、安装虚拟机管理工具1.1.3、下载虚Centos镜像1.1.4、创建台个虚拟机1.1.5、设置虚拟机网络环境 1.2、虚拟机基础配置&#xff08;3台虚拟机进行相同处理&#xff09;1.2.1、配置host1.2.2、关闭防火墙1.2.3…

网络原理-TCP/IP(7)

目录 网络层 路由选择 数据链路层 认识以太网 以太网帧格式 认识MAC地址 对比理解MAC地址和IP地址 认识MTU ARP协议 ARP协议的作用 ARP协议工作流程 重要应用层协议DNS(Domain Name System) DNS背景 NAT技术 NAT IP转换过程 NAPT NAT技术的优缺点 网络层 路由…

ChatGPT绘图指南:DALL.E3玩法大全(一)

一、 DALLE.3 模型介绍 1、什么是 DALLE.3 模型&#xff1f; DALLE-3模型&#xff0c;是一种由OpenAI研发的技术&#xff0c;它是一种先进的生成模型&#xff0c;可以将文字描述转化为清晰的图片。这种模型的名称"DALLE"实际上是"Deep Auto-regressive Latent …

LeetCode LCR 085. 括号生成

题目链接https://leetcode.cn/problems/IDBivT/description/ 正整数 n 代表生成括号的对数&#xff0c;请设计一个函数&#xff0c;用于能够生成所有可能的并且 有效的 括号组合。 class Solution {public List<String> generateParenthesis(int n) {List<String>…

【类与对象 -2】学习类的6个默认成员函数中的构造函数与析构函数

目录 1.类的6个默认成员函数 2.构造函数 2.1概念 2.2特性 3.析构函数 3.1析构函数的概念 3.2特性 1.类的6个默认成员函数 如果一个类中什么成员都没有&#xff0c;简称为空类。 空类中真的什么都没有吗&#xff1f;并不是&#xff0c;任何类在什么都不写时&#xff0c;…

嵌入式——Flash(W25Q64)

目录 一、初识W25Q64 1. 基本认识 2. 引脚介绍 ​编辑 二、W25Q64特性 1. SPI模式 2. 双输出SPI方式 三、状态寄存器 1. BUSY位 2. WEL位 3. BP2、BP1、 BP0位 4. TB位 5. 保留位 6. SRP位 四、常用操作指令 1. 写使能指令&#xff08;06h&#xff09; 2. 写禁…

攻防世界——re2-cpp-is-awesome

64位 我先用虚拟机跑了一下这个程序&#xff0c;结果输出一串字符串flag ——没用 IDA打开后 F5也没有什么可看的 那我们就F12查看字符串找可疑信息 这里一下就看见了 __int64 __fastcall main(int a1, char **a2, char **a3) {char *v3; // rbx__int64 v4; // rax__int64 v…

C语言指针(初阶)

文章目录 1:内存与地址1.1内存1.2:如何理解编址 2:指针变量与地址2.1:指针变量与解引用操作符2.1.1:指针变量2.1.2:如何拆解指针类型2.1.3:解引用操作符 2.2:指针变量的大小 3:指针变量类型的意义代码1解引用修改前解引用修改后 代码2解引用修改前解引用修改后 4:const修饰指针…

Rust 语言学习杂谈 (end) (各种工作中遇到的疑难杂症)

1.在运行 “cargo build --release” 的时候&#xff0c;到底发生了什么&#xff1f; 源 (GPT4.0) : 当我们运行 cargo build --release 命令时&#xff0c;实际上在进行一系列复杂的步骤来编译和构建 Rust 项目的发布版本。这个过程大致可以分解为以下几个步骤&#xff1a;…

杂谈--spconv导出中onnx的扩展阅读

Onnx 使用 Onnx 介绍 Onnx (Open Neural Network Exchange) 的本质是一种 Protobuf 格式文件&#xff0c;通常看到的 .onnx 文件其实就是通过 Protobuf 序列化储存的文件。onnx-ml.proto 通过 protoc (Protobuf 提供的编译程序) 编译得到 onnx-ml.pb.h 和 onnx-ml.pb.cc 或 on…

C#根据权重抽取随机数

&#xff08;游戏中一个很常见的简单功能&#xff0c;比如抽卡抽奖抽道具&#xff0c;或者一个怪物有多种攻击动作&#xff0c;按不同的权重随机出个攻击动作等等……&#xff09; 假如有三种物品 A、B、C&#xff0c;对应的权重分别是A&#xff08;50&#xff09;&#xff0c…

django中查询优化

在Django中&#xff0c;查询优化是一个重要的主题&#xff0c;因为不正确的查询可能会导致性能问题&#xff0c;尤其是在处理大量数据时。以下是一些在Django中进行查询优化的建议&#xff1a; 一&#xff1a;使用select_related和prefetch_related: select_related用于优化一…

论文阅读_语音识别_Wisper

英文名称: Robust Speech Recognition via Large-Scale Weak Supervision 中文名称: 通过大规模弱监督实现鲁棒语音识别 链接: https://proceedings.mlr.press/v202/radford23a.html 代码: https://github.com/openai/whisper 作者: Alec Radford, Jong Wook Kim, Tao Xu, Greg…