ClickHouse10-ClickHouse中Kafka表引擎

Kafka表引擎也是一种常见的表引擎,在很多大数据量的场景下,会从源通过Kafka将数据输送到ClickHouse,Kafka作为输送的方式,ClickHouse作为存储引擎与查询引擎,大数据量的数据可以得到快速的、高压缩的存储。
在这里插入图片描述

Kafka大家肯定不陌生:

  • 它可以用于发布和订阅数据流,是常见的队列使用方式
  • 它可以组织容错存储,是常见的容错存储的使用方式
  • 它可以在流可用时对其进行处理,是常见的大数据处理的使用方式

全文概览:

  • 基本语法
  • 从 Kafka 写入到 ClickHouse
  • 从 ClickHouse 写入到 Kafka
    • 测试1:queue->ck->queue
    • 测试2:ck->queue

基本语法

分为定义表结构和定义Kafka的接入参数,Kafka的接入参数都是常见的字段

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(name1 [type1] [ALIAS expr1],name2 [type2] [ALIAS expr2],...
) ENGINE = Kafka()
SETTINGSkafka_broker_list = 'host:port',kafka_topic_list = 'topic1,topic2,...',kafka_group_name = 'group_name',kafka_format = 'data_format'[,][kafka_schema = '',][kafka_num_consumers = N,][kafka_max_block_size = 0,][kafka_skip_broken_messages = N,][kafka_commit_every_batch = 0,][kafka_client_id = '',][kafka_poll_timeout_ms = 0,][kafka_poll_max_batch_size = 0,][kafka_flush_interval_ms = 0,][kafka_thread_per_consumer = 0,][kafka_handle_error_mode = 'default',][kafka_commit_on_select = false,][kafka_max_rows_per_message = 1];

示例:

CREATE TABLE IF NOT EXISTS test_ck_sync1
(`sys_time` Datetime COMMENT '',`num` UInt32 COMMENT ''
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'test_ck_sync1', kafka_group_name = 'ck_test_ck_sync1', kafka_format = 'CSV', kafka_max_block_size = 200000, kafka_skip_broken_messages = 1000, kafka_row_delimiter = '\n', format_csv_delimiter = '|'

从 Kafka 写入到 ClickHouse

创建topic:

bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic test_ck_sync1

创建同步表:

CREATE TABLE IF NOT EXISTS test_ck_sync1
(`sys_time` Datetime COMMENT '',`num` UInt32 COMMENT ''
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'test_ck_sync1', kafka_group_name = 'ck_test_ck_sync1', kafka_format = 'CSV', kafka_max_block_size = 200000, kafka_skip_broken_messages = 1000, kafka_row_delimiter = '\n', format_csv_delimiter = '|'CREATE TABLE IF NOT EXISTS test_ck_sync1_res
(`sys_time` Datetime COMMENT '',`num` UInt32 COMMENT ''
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(sys_time)
ORDER BY tuple()

创建物化视图,进行数据样式的转换:

CREATE MATERIALIZED VIEW test_ck_sync1_mv TO test_ck_sync1_res AS
SELECTsys_time,num
FROM test_ck_sync1

通过console写入数据:

[$ kafka_2.13-3.6.1]# bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test_ck_sync1
>2024-01-01 00:00:01|89  

验证数据:

$ :) select * from test_ck_sync1_res;SELECT *
FROM test_ck_sync1_resQuery id: a666f893-5be9-4022-9327-3a1507aa5485┌────────────sys_time─┬─num─┐
│ 2024-01-01 00:00:01 │  89 │
└─────────────────────┴─────┘
┌────────────sys_time─┬─num─┐
│ 2024-01-01 00:00:00 │  88 │
└─────────────────────┴─────┘2 rows in set. Elapsed: 0.049 sec.

从 ClickHouse 写入到 Kafka

kafka_writers_reader --(view)--> kafka_writers_queue ---> 

创建一个队列:

bin/kafka-topics.sh --topic kafka_writers --create -bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1

创建同步表:

CREATE TABLE kafka_writers_reader (     `id` Int,     `platForm` String,     `appname` String,     `time` DateTime ) 
ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'kafka_writers_reader', kafka_group_name = 'kafka_writers_reader_group', kafka_format = 'CSV';CREATE TABLE kafka_writers_queue (     id Int,     platForm String,     appname String,     time DateTime ) 
ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092',        kafka_topic_list = 'kafka_writers',        kafka_group_name = 'kafka_writers_group',        kafka_format = 'CSV',       kafka_max_block_size = 1048576;

测试1:queue->ck->queue

通过写入队列kafka_writers_reader,借助ClickHouse写入队列kafka_writers

bin/kafka-topics.sh --topic kafka_writers_reader --create -bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic kafka_writers_readerbin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kafka_writers

测试2:ck->queue

通过写入表kafka_writers_reader,写入队列kafka_writers

$ :) INSERT INTO kafka_writers_reader (id, platForm, appname, time) 
VALUES (8,'Data','Test','2020-12-23 14:45:31'), 
(9,'Plan','Test1','2020-12-23 14:47:32'), 
(10,'Plan','Test2','2020-12-23 14:52:15'), 
(11,'Data','Test3','2020-12-23 14:54:39');INSERT INTO kafka_writers_reader (id, platForm, appname, time) FORMAT ValuesQuery id: 223a63ab-97fa-488d-8ea7-c2e194155d26Ok.4 rows in set. Elapsed: 1.054 sec. 
[$ kafka_2.13-3.6.1]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kafka_writers
8,"Data","Test","1970-01-01 08:00:00"9,"Plan","Test1","1970-01-01 08:00:00"10,"Plan","Test2","1970-01-01 08:00:00"11,"Data","Test3","1970-01-01 08:00:00"

如果喜欢我的文章的话,可以去GitHub上给一个免费的关注吗?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/292306.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS(四)---【链接美化、浮动布局、三种定位】

零.前言 本篇主要讲解<a>标签链接美化、页面的浮动布局&#xff0c;以及“相对定位”、“绝对定位”、“固定定位”三种定位。 关于其它请查看作者其它文章&#xff1a; CSS(一)---【CSS简介、导入方式、八种选择器、优先级】-CSDN博客 CSS(二)---【常见属性、复合属…

鸿蒙OS开发实例:【窥探网络请求】

HarmonyOS 平台中使用网络请求&#xff0c;需要引入 "ohos.net.http", 并且需要在 module.json5 文件中申请网络权限, 即 “ohos.permission.INTERNET” 本篇文章将尝试使用 ohos.net.http 来实现网络请求 场景设定 WeiBo UniDemo HuaWei : 请求顺序WeiBo1 UniDem…

Python之Opencv教程(3):人脸识别

1、人脸识别代码 直接上代码&#xff1a; import cv2# 加载训练数据集文件 recogizer cv2.face.LBPHFaceRecognizer_create()recogizer.read(trainer/trainer.yml)# 准备识别的图片 img cv2.imread(images/lisa.jpg) gray cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)face_dete…

Stata 15 for Mac:数据统计分析新标杆,让研究更高效!

Stata 是一种统计分析软件&#xff0c;适用于数据管理、数据分析和绘图。Stata 15 for Mac 具有以下功能&#xff1a; 数据管理&#xff1a;Stata 提供强大的数据管理功能&#xff0c;用户可以轻松导入、清洗、整理和管理数据集。 统计分析&#xff1a;Stata 提供了广泛的统计…

sqli第24关二次注入

注入点 # Validating the user input........$username $_SESSION["username"];$curr_pass mysql_real_escape_string($_POST[current_password]);$pass mysql_real_escape_string($_POST[password]);$re_pass mysql_real_escape_string($_POST[re_password]);if($p…

算法学习——LeetCode力扣动态规划篇5

算法学习——LeetCode力扣动态规划篇5 198. 打家劫舍 198. 打家劫舍 - 力扣&#xff08;LeetCode&#xff09; 描述 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统…

Android MediaPlayer

MediaPlayer 类是媒体框架最重要的组成部分之一。此类的对象能够获取、解码以及播放音频和视频&#xff0c;而且只需极少量设置。它支持多种不同的媒体源&#xff0c;例如&#xff1a; • 本地资源 • 内部 URI&#xff0c;例如您可能从内容解析器那获取的 URI • 外部网址…

光明源@智慧厕所公厕软件系统有哪些核心功能?

在现代城市的建设中&#xff0c;智慧公厕的建设成为了提升城市品质和居民生活质量的重要举措。而智慧公厕的核心&#xff0c;不仅仅在于其硬件设备的智能化&#xff0c;同样重要的是其背后支持的智慧厕所公厕软件系统。让我们一起探讨&#xff0c;智慧厕所公厕软件系统有哪些核…

C语言-编译和链接

目录 1.前言2.编译2.1预处理&#xff08;预编译&#xff09;2.1.1 #define 定义常量2.1.2 #define 定义宏2.1.3带有副作用的宏参数2.1.4宏替换规则2.1.5 #和##2.1.5.1 #运算符2.1.5.2 ## 运算符 2.1.6 命名约定2.1.7 #undef2.1.8 条件编译2.1.9 头文件的包含2.1.9.1 本地文件包…

ubuntu+clangd+vscode 实现项目代码快速跳转(如: Linux 内核源码)

1. 准备工作 虚拟机 ubuntu 环境&#xff0c;笔者用的是 ubuntu20.04。windows 安装好 vscode 软件。 2. 配置过程 2.1 vscode远程连接 ubuntu ubuntu 虚拟机开启 ssh 服务 sudo apt install openssh-server sudo service ssh startvscode 安装 remote-ssh 插件 vscode 远…

awesome-cheatsheets:超级速查表 - 编程语言、框架和开发工具的速查表

awesome-cheatsheets&#xff1a;超级速查表 - 编程语言、框架和开发工具的速查表&#xff0c;单个文件包含一切你需要知道的东西 官网&#xff1a;GitHub - skywind3000/awesome-cheatsheets: 超级速查表 - 编程语言、框架和开发工具的速查表&#xff0c;单个文件包含一切你需…

java Web 疫苗预约管理系统用eclipse定制开发mysql数据库BS模式java编程jdbc

一、源码特点 JSP 疫苗预约管理系统是一套完善的web设计系统&#xff0c;对理解JSP java 编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,eclipse开发&#xff0c;数据库为Mysql5.0&#xff0c;使…

小狐狸ChatGPT付费AI创作系统V2.8.0独立版 + H5端 + 小程序前端

狐狸GPT付费体验系统的开发基于国外很火的ChatGPT&#xff0c;这是一种基于人工智能技术的问答系统&#xff0c;可以实现智能回答用户提出的问题。相比传统的问答系统&#xff0c;ChatGPT可以更加准确地理解用户的意图&#xff0c;提供更加精准的答案。同时&#xff0c;小狐狸G…

【jenkins+cmake+svn管理c++项目】jenkins回传文件到svn(windows)

书接上文&#xff1a;创建一个项目 在经过cmakemsbuild顺利生成动态库之后&#xff0c;考虑到我一个项目可能会生成多个动态库&#xff0c;它们分散在build内的不同文件夹&#xff0c;我希望能将它们收拢到一个文件夹下&#xff0c;并将其回传到svn。 一、动态库移位—cmake实…

H5抓包——Android 使用电脑浏览器 DevTools调试WebView

H5抓包——Android 使用电脑浏览器 DevTools调试WebView 一、使用步骤 1、电脑通过数据线连接手机&#xff0c;开启USB调试&#xff08;打开手机开发者选项&#xff09; 2、打开待调试的H5 App&#xff0c;进入H5界面 3、打开电脑浏览器&#xff0c;调试界面入口 如果用ed…

linux命令之tput

1.tput介绍 linux命令tput是可以在终端中进行文本和颜色的控制和格式化&#xff0c;其是一个非常有用的命令 2.tput用法 命令&#xff1a; man tput 3.样例 3.1.清除屏幕 命令&#xff1a; tput clear [rootelasticsearch ~]# tput clear [rootelasticsearch ~]# 3.2.…

C#/BS手麻系统源码 手术麻醉管理系统源码 商业项目源码

C#/BS手麻系统源码 手术麻醉管理系统源码 商业项目源码 手麻系统从麻醉医生实际工作环境和流程需求方面设计&#xff0c;与HIS&#xff0c;LIS&#xff0c;PACS&#xff0c;EMR无缝连接&#xff0c;方便查看患者的信息;实现术前、术中、术后手术麻醉信息全记录;减少麻醉医师在…

AI时代-普通人的AI绘画工具对比(Midjouney与Stable Diffusion)

AI时代-普通人的AI绘画工具对比&#xff08;Midjouney与Stable Diffusion&#xff09; 前言1、基础对比Stable Diffusion&#xff08;SD&#xff09;SD界面安装与使用SD Midjouney&#xff08;MJ&#xff09; 2、硬件与运行要求对比Stable Diffusion硬件要求内存硬盘显卡 Midjo…

MySQL开窗函数

测试环境&#xff1a;mysql8.0.18 官方文档&#xff1a;https://dev.mysql.com/doc/refman/8.0/en/window-functions.html 一、窗口函数介绍二、语法结构三、自定义窗口1.rows&#xff08;重点&#xff09;2.range3.默认窗口 四、常用窗口函数示例1.row_number & rank &…

开源推荐榜【Taichi 专为高性能计算机图形学设计的编程语言】

Taichi是一个高性能的并行编程语言&#xff0c;它被嵌入在Python中&#xff0c;使得开发者能够轻松编写可移植的、高性能的并行程序。这个库的核心优势在于它能够将计算密集型的Python代码在运行时通过即时编译器(Just-In-Time, JIT)转换成快速的机器代码&#xff0c;从而加速P…