探索ClickHouse——连接Kafka和Clickhouse

安装Kafka

新增用户

sudo adduser kafka
sudo adduser kafka sudo
su -l kafka

安装JDK

sudo apt-get install openjdk-8-jre

下载解压kafka

可以从https://downloads.apache.org/kafka/下找到希望安装的版本。需要注意的是,不要下载路径包含src的包,否则会报“Classpath is empty”之类的错误。

mkdir ~/Downloads
curl "https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz" -o ~/Downloads/kafka.tgz
mkdir ~/kafka && cd ~/kafka
tar -xvzf ~/Downloads/kafka.tgz --strip 1

配置

配置kafka

vim ~/kafka/config/server.properties

将下面这行加入文件的末尾

# ~/kafka/config/server.properties
delete.topic.enable=true

同时修改log的路径

# ~/kafka/config/server.properties
log.dirs=/home/kafka/logs

创建zookeeper service

sudo vim /etc/systemd/system/zookeeper.service

将下面内容填入上述文件中

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal[Install]
WantedBy=multi-user.target

创建kafka service

sudo vim /etc/systemd/system/kafka.service

将下面内容填入上述文件中

[Unit]
Requires=zookeeper.service
After=zookeeper.service[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal[Install]
WantedBy=multi-user.target

启动kafka#

启动服务

sudo systemctl start kafka

查看状态

sudo systemctl status kafka

● kafka.service
Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled)
Active: active (running) since Thu 2023-09-28 03:09:39 UTC; 4s ago
Main PID: 3561758 (sh)
Tasks: 42 (limit: 2143)
Memory: 292.4M
CPU: 2.768s
CGroup: /system.slice/kafka.service
├─3561758 /bin/sh -c “/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1”
└─3561760 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/>
Sep 28 03:09:39 ubuntua systemd[1]: Started kafka.service.

可以看到kafka已经处于running状态。

测试

创建Topic

~/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

发送消息

echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

订阅Topic

新启动一个界面,执行下面命令

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

它会收到上面发的消息

Hello, World

连接

创建表

使用kafka engine将kafka中的流映射到一个表中。我们以《探索ClickHouse——使用Projection加速查询》中的数据为例。

clickhouse-client --stream_like_engine_allow_direct_select 1
CREATE TABLE uk_price_paid_from_kafka (uuid_string String, price_string String, time String, postcode String, a String, b String, c String, addr1 String, addr2 String, street String, locality String, town String, district String, county String, d String, e String) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092', kafka_topic_list='TutorialTopic', kafka_group_name='clickhouse', kafka_format='CSV', kafka_skip_broken_messages=1, kafka_num_consumers=1;

CREATE TABLE uk_price_paid_from_kafka
(
uuid_string String,
price_string String,
time String,
postcode String,
a String,
b String,
c String,
addr1 String,
addr2 String,
street String,
locality String,
town String,
district String,
county String,
d String,
e String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = ‘localhost:9092’, kafka_topic_list = ‘TutorialTopic’, kafka_group_name = ‘clickhouse’, kafka_format = ‘CSV’, kafka_skip_broken_messages = 1, kafka_num_consumers = 1
Query id: 07a063e9-6a61-42c0-8fec-1fe2f119ee28
Ok.
0 rows in set. Elapsed: 0.008 sec.

给kafka发送消息

~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic

进入消息输入模式,发送下面两个消息

"{F887F88E-7D15-4415-804E-52EAC2F10958}","70000","1995-07-07 00:00","MK15 9HP","D","N","F","31","","ALDRICH DRIVE","WILLEN","MILTON KEYNES","MILTON KEYNES","MILTON KEYNES","A","A"
"{40FD4DF2-5362-407C-92BC-566E2CCE89E9}","44500","1995-02-03 00:00","SR6 0AQ","T","N","F","50","","HOWICK PARK","SUNDERLAND","SUNDERLAND","SUNDERLAND","TYNE AND WEAR","A","A"

在这里插入图片描述

Clickhouse收到消息

在clickhouse-client交互终端中执行下面指令:

select * from uk_price_paid_from_kafka;

在这里插入图片描述
可以看到之前发送给kafka Topic的内容在Clickhouse中被收到了。

问题

后面我再在clickhouse-client交互终端中查询不到数据了。即使我们给kafka该主题发消息,也查询不到。后面我们再将《探索ClickHouse——使用MaterializedView存储kafka传递的数据》中讲解使用MaterializedView清洗和固化kafka的数据。

参考资料

  • https://openjdk.org/install/
  • https://kafka.apache.org/quickstart
  • https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04#step-2-mdash-downloading-and-extracting-the-kafka-binaries
  • https://cloud.tencent.com/developer/article/1892086
  • https://sineyuan.github.io/post/clickhouse-kafka/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/146323.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1.4.C++项目:仿muduo库实现并发服务器之buffer模块的设计

项目完整版在: 一、buffer模块: 缓冲区模块 Buffer模块是一个缓冲区模块,用于实现通信中用户态的接收缓冲区和发送缓冲区功能。 二、提供的功能 存储数据,取出数据 三、实现思想 1.实现换出去得有一块内存空间,采…

14:00面试测试岗,14:06就出来了,问的问题有点变态。。。

从小厂出来,没想到在另一家公司又寄了。 到这家公司开始上班,加班是每天必不可少的,看在钱给的比较多的份上,就不太计较了。没想到9月一纸通知,所有人不准加班,加班费不仅没有了,薪资还要降40%,…

DAMA-DMBOK2重点知识整理CDGA/CDGP——第17章 数据管理和组织变革管理

目录 一、分值分布 二、重点知识梳理 1、引言 2、变革法则 3、并非管理变革:而是管理转型过程 4、科特的变革管理八大误区 5、科特的重大变革八步法 6、变革的秘诀 7、创新扩散和持续变革 8、持续变革 9、数据管理价值的沟通 一、分值分布 CDGA&#xff…

Guava限流器原理浅析

文章目录 基本知识限流器的类图使用示例 原理解析限流整体流程问题驱动1、限流器创建的时候会初始化令牌吗?2、令牌是如何放到桶里的?3、如果要获取的令牌数大于桶里的令牌数会怎么样4、令牌数量的更新会有并发问题吗 总结 实际工作中难免有限流的场景。…

AI智能问答系统源码/AI绘画商业系统/支持GPT联网提问/支持Midjourney绘画

一、AI创作系统 SparkAi创作系统是基于国外很火的ChatGPT进行开发的AI智能问答系统和AI绘画系统。本期针对源码系统整体测试下来非常完美,可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作ChatGPT?小编这里写一个详细图…

【数据结构】排序之插入排序和选择排序

🔥博客主页:小王又困了 📚系列专栏:数据结构 🌟人之为学,不日近则日退 ❤️感谢大家点赞👍收藏⭐评论✍️ 目录 一、排序的概念及其分类 📒1.1排序的概念 📒1.2排序…

【视频去噪】基于全变异正则化最小二乘反卷积是最标准的图像处理、视频去噪研究(Matlab代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

Academic accumulation|英文文献速读

一、英文文献速读法 (一)明确目的 建议大家阅读一篇论文之前先问一下自己是出于怎样的目的来阅读这篇文章,是为了找选题方向、学某个问题的研究设计、学某种研究方法、学文章写作还是别的。不同的阅读目的会导致不同的关注重点,例…

基于SpringBoot的仿京东商城系统

前台部分实现效果截图 后台部分实现效果截图 源码地址:https://download.csdn.net/download/qq_50954361/87647905

二、局域网联机

目录 1.下载资源包 2.配置NetworkManager 3.编写测试UI 1.下载资源包 2.配置NetworkManager (1)在Assets/Prefabs下创建Network Prefabs List 相应设置如下: (2) 创建空物体“NetworkManager”并挂载NetworkMan…

11链表-迭代与递归

目录 LeetCode之路——206. 反转链表 分析: 解法一:迭代 解法二:递归 LeetCode之路——206. 反转链表 给你单链表的头节点 head ,请你反转链表,并返回反转后的链表。 示例 1: 输入:head […

029-从零搭建微服务-消息队列(一)

写在最前 如果这个项目让你有所收获,记得 Star 关注哦,这对我是非常不错的鼓励与支持。 源码地址(后端):mingyue: 🎉 基于 Spring Boot、Spring Cloud & Alibaba 的分布式微服务架构基础服务中心 源…

华为云云耀云服务器L实例评测|华为云云耀云服务器docker部署srs,可使用HLS协议

华为云云耀云服务器L实例评测|华为云云耀云服务器docker部署srs,可使用HLS协议 什么是华为云云耀云L实例 云耀云服务器L实例,面向初创企业和开发者打造的全新轻量应用云服务器。提供丰富严选的应用镜像,实现应用一键部署&#x…

云安全之HTTP协议介绍

HTTP的基本概念 什么是网络协议 网络协议是计算机之间为了实现网络通信而达成的一种“约定”或者”规则“,有了这种”约定不同厂商生产的设备,以及不同操作系统组成的计算机之间,就可以实现通信。 网络协议由三个要素构成:1、语…

Tomcat启动后的日志输出为乱码

天行健,君子以自强不息;地势坤,君子以厚德载物。 每个人都有惰性,但不断学习是好好生活的根本,共勉! 文章均为学习整理笔记,分享记录为主,如有错误请指正,共同学习进步。…

定时任务管理平台青龙 QingLong

一、关于 QingLong 1.1 QingLong 介绍 青龙面板是支持 Python3、JavaScript、Shell、Typescript 多语言的定时任务管理平台,支持在线管理脚本和日志等。其功能丰富,能够满足大部分需求场景,值得一试。 主要功能 支持多种脚本语言&#xf…

Mybatis学习

为什么会有mybatis?: 图片来自b站的黑马网课 截图 懒得自己打字了hh 帅气的人都要注明出处 相信在学习框架之前 都学习了JDBC 因为Mybatis可以解决旧的JDBC存在的一些问题 什么是mybatis?: ORM框架原理: Mybatis是一个ORM框架,即obje…

c++---I/o操作

5、文件操作 程序运行时产生的数据都属于临时数据&#xff0c;程序一旦运行结束都会被释放。 我们可以通过文件将数据持久化 C中对文件操作需要包含头文件 <fstream> 文件类型分为两种&#xff1a; 文本文件 - 文件以文本的ASCII码形式存储在计算机中二进制文件 - 文…

PADS9.5使用记录

目录 一、概述 二、PADS Logic IN4148二极管封装 SOD-123封装 SOD-323封装 SOD-523封装 2N3904 1AM 三极管封装 78L05 7533-1 一、概述 PADS Logic 原理图绘制PADS Layout PCB 封装设计PADS Router 布线 二、PADS Logic …