背景
目前clickhouse社区对于数据的写入主要基于文件本地表、分布式表方式为主,但缺乏大批量快速写入场景下的数据写入方式,本文提供了一种基于clickhouse local 客户端工具分布式处理hdfs数据表文件,并将clickhouse以文件复制的方式完成写入clickhouse的方法。该方案通过spark程序实现,经测试:
(1)在相同资源下,与传统的写clickhouse基于http/tcp的方式,可提供3倍左右的性能。
(2)传统数据写过程中,clickhouse-server需要处理写入的数据,写性能主要受clickhouse集群网络、cpu、内存限制,无法通过扩展写入客户端端并发来提高写入性能,本方案将数据处理端放在了插入客户端,写入性能理论上可以线性扩展提升。
方案
传统基于http/tcp写方案
目前clickhouse 官方推介3种数据写入方式
- official JDBC driver
- ClickHouse-Native-JDBC
- clickhouse4j
jdbc主要基于如下形式进行:
基于jdbc写入的基本流程是spark集群(分布式计算引擎)读取hdfs文件,转换为dataframe后通过调用clickhouse jdbc方式,将数据写入至clickhouse服务端,clickhouse服务端完成本批次数据的生成。
clickhouse server端具体生成文件的流程为:
(1)ck集群分布式表接收到spark发送的写请求后,会更加分片键进行数据划分,对于数据属于本地分片分片的数据,直接写入本地表。对于数据属于其他分片的数据会先写入至临时目录下。
(2)对于clickhouse集群,数据的读写DDL都是依赖于zookeeper进行的,会将操作的日志写入至zk的/log下,并形成相应的task
(3)当数据写入本地表后,会将操作日志写入zk /log中,集群其他ck节点监听到/log变化后,会触发相应的ck节点拉去log操作并转换成task放入自身对应节点下的/queue中,其他节点将开始拉去该分片数据,并写入自身本地表中,同时对于分片副本原理类似。
(4)完成本次数据的拉取复制后,将移除/queue中对应的task,完成本次数据的写入。
从上面的流程中我们知道,数据的写入都是先将数据写入至分片表的本地然后复制至其他集群节点实现的,因此,clickhouse的分片表所在的机器常常负载比较大;数据的同步依赖zk进行的,zk压力也较大。
基于文件复制写方式
为了解决上述批量数据数据写入场景下的问题,社区提出了一种新的思路,即使用clickhouse local将先将数据处理成clickhouse 文件,完成后直接复制至clickhouse集群,这样大大减轻了clickhouse集群处理数据的压力,同时数据写入性能理论上可以与客户端并发写入线性增长。当前大多数公司使用clickhouse分析的数据不会在原始数据集上进行,常常是数仓加工后的明细数据,通常流程是原始数据集导入至数仓,输出加工处理,处理后的数据导出至clickhouse用于OLAP分析。本文针对这样的场景,提供了一种直接读取数仓加工生成的parquet等文件,使用spark、clickhouse-local分布式处理ck文件格式,并导入至clickhouse中,具体如下图所示:
其中,spark集群中的spark node并不会读写hive表数据,而只是依赖spark分布程序将hive表所在hdfs上的文件分布式的方式下载至yarn node本地机器,然后调用本地机上的clickhouse local 命名,将不同文件格式的文件(parquet\orc\text\csv等)生成为clickhouse 文件块,最后通过直接通过ssh命令的方式将加工处理好的cickhouse数据复制clickhouse集群,并调用clickhouse attach part命令将数据块merge至表中,期间clickhouse表数据的所有处理动作执行端放在了spark node中进行,ck集群只负责数据的接收,大大提高了数据批量写入性能,sparknode具体处理过程如下图所示:
性能测试
经测试,在相同spark资源情况下,基于文件复制写入clickhouse的方式比jdbc方式写入性能有2~3倍的性能提升,且理论上文件复制方式写入可以伴随spark node增加而线性增长,在parquet格式的数据表上,不同写入方式下clickhouse完成时间对比结果如下图所示: