15_基于Flink将pulsar数据写入到ClickHouse

3.8.基于Flink将数据写入到ClickHouse

编写Flink完成数据写入到ClickHouse操作, 后续基于CK完成指标统计操作

3.8.1.ClickHouse基本介绍

ClickHouse 是俄罗斯的Yandex于2016年开源的列式存储数据库(DBMS),使用C++语言编写,主要用于在线分析处理查询(OLAP),能够使用SQL查询实时生成分析数据报告。
在这里插入图片描述
结论: ClickHouse像很多OLAP数据库一样,单表查询速度由于关联查询,而且ClickHouse的两者差距更为明显。

3.8.2.ClickHouse安装步骤

本项目中,我们仅需要安装单机测试版本即可使用(node2安装), 在实际生产中, 大家可以直接将分布式集群版本

  • 1-设置yum源
sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.com/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_64
  • 2- 直接基于yum安装即可
sudo yum install clickhouse-server clickhouse-client
  • 3-修改配置文件
vim /etc/clickhouse-server/config.xml 
修改178行: 打开这一行的注释 
<listen_host>::</listen_host>

在这里插入图片描述

  • 4-启动clickhouse的server
systemctl start clickhouse-server 
停止:
systemctl stop clickhouse-server 
重启
systemctl restart clickhouse-server
  • 5-进入客户端
    在这里插入图片描述

3.8.3.在ClickHouse中创建目标表

create database itcast_ck; 
use itcast_ck; 
create table itcast_ck.itcast_ck_ems( 
id int, 
sid varchar(128), 
ip varchar(128), 
create_time varchar(128), 
session_id varchar(128), 
yearInfo varchar(128), 
monthInfo varchar(128), 
dayInfo varchar(128), 
hourInfo varchar(128), 
seo_source varchar(128), 
area varchar(128), 
origin_channel varchar(128), 
msg_count int(128), 
from_url varchar(128), 
PRIMARY KEY (`id`) 
) ENGINE=ReplacingMergeTree();

3.8.4.编写Flink代码完成写入到CK操作

import com.itheima.pojo.PulsarTopicPojo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser;
import org.apache.flink.types.Row;import java.sql.Types;
import java.util.Properties;// 基于Flink完成读取Pulsar中数据将消息数据写入到clickhouse中
public class ItcastFlinkToClickHouse {public static void main(String[] args) throws Exception {//1. 创建Flinnk流式处理核心环境类对象 和 Table API 核心环境类对象StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2. 添加Source组件, 从Pulsar中读取消息数据Properties props = new Properties();props.setProperty("topic","persistent://public/default/itcast_ems_tab");props.setProperty("partition.discovery.interval-millis","5000");FlinkPulsarSource<PulsarTopicPojo> pulsarSource = new FlinkPulsarSource<PulsarTopicPojo>("pulsar://node1:6650,node2:6650,node3:6650","http://node1:8080,node2:8080,node3:8080",JsonDeser.of(PulsarTopicPojo.class),props);//2.1 设置pulsarSource组件在消费数据的时候, 默认从什么位置开始消费pulsarSource.setStartFromLatest();DataStreamSource<PulsarTopicPojo> dataStreamSource = env.addSource(pulsarSource);//2.2  转换数据操作: 将 PulsarTopicPojo 转换为ROW对象SingleOutputStreamOperator<Row> rowDataSteam = dataStreamSource.map(new MapFunction<PulsarTopicPojo, Row>() {@Overridepublic Row map(PulsarTopicPojo pulsarTopicPojo) throws Exception {return Row.of(pulsarTopicPojo.getId(), pulsarTopicPojo.getSid(), pulsarTopicPojo.getIp(), pulsarTopicPojo.getCreate_time(),pulsarTopicPojo.getSession_id(), pulsarTopicPojo.getYearInfo(), pulsarTopicPojo.getMonthInfo(), pulsarTopicPojo.getDayInfo(),pulsarTopicPojo.getHourInfo(), pulsarTopicPojo.getSeo_source(), pulsarTopicPojo.getArea(), pulsarTopicPojo.getOrigin_channel(),pulsarTopicPojo.getMsg_count(), pulsarTopicPojo.getFrom_url());}});//2.3: 设置sink操作写入到CK操作String insertSql = "insert into itcast_ck.itcast_ck_ems (id,sid,ip,create_time,session_id,yearInfo,monthInfo,dayInfo,hourInfo,seo_source,area,origin_channel,msg_count,from_url) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";JDBCAppendTableSink tableSink = JDBCAppendTableSink.builder().setDrivername("ru.yandex.clickhouse.ClickHouseDriver").setDBUrl("jdbc:clickhouse://node2:8123/itcast_ck").setQuery(insertSql).setBatchSize(1).setParameterTypes(Types.INTEGER,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.INTEGER,Types.VARCHAR).build();tableSink.emitDataStream(rowDataSteam);//3. 提交执行env.execute("itcast_to_ck");}
}

3.9.HBase对接Phoenix实现即席查询

3.9.1.Phoenix安装操作

Phoenix是属于apache旗下的一款基于hbase的工具, 此工具提供一种全新的方式来操作hbase中数据(SQL),
同时Phoenix对hbase进行大量的优化工作, 能够让我们更加有效的操作hbase

整个安装操作, 大家可以参考资料中安装手册, 进行安装即可

3.9.2.在Phoenix中创建表

create view "itcast_h_ems" ( 
"id" integer primary key, 
"f1"."sid" varchar, 
"f1"."ip" varchar, 
"f1"."create_time" varchar, 
"f1"."session_id" varchar, 
"f1"."yearInfo" varchar, 
"f1"."monthInfo" varchar, 
"f1"."dayInfo" varchar, 
"f1"."hourInfo" varchar, 
"f1"."seo_source" varchar, 
"f1"."area" varchar, 
"f1"."origin_channel" varchar, 
"f1"."msg_count" integer, 
"f1"."from_url" varchar 
);

3.9.3.在Phoenix中类型说明

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/88789.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言快速回顾(一)

前言 在Android音视频开发中&#xff0c;网上知识点过于零碎&#xff0c;自学起来难度非常大&#xff0c;不过音视频大牛Jhuster提出了《Android 音视频从入门到提高 - 任务列表》&#xff0c;结合我自己的工作学习经历&#xff0c;我准备写一个音视频系列blog。C/C是音视频必…

MySQL数据库练习

目录 表结构 建表 插入数据 1、用SQL语句创建学生表student&#xff0c;定义主键&#xff0c;姓名不能重名&#xff0c;性别只能输入男或女&#xff0c;所在系的默认值是 “计算机”。 2、修改student 表中年龄&#xff08;age&#xff09;字段属性&#xff0c;数据类型由…

【LangChain概念】了解语言链️:第2部分

一、说明 在LangChain的帮助下创建LLM应用程序可以帮助我们轻松地链接所有内容。LangChain 是一个创新的框架&#xff0c;它正在彻底改变我们开发由语言模型驱动的应用程序的方式。通过结合先进的原则&#xff0c;LangChain正在重新定义通过传统API可以实现的极限。 在上一篇博…

【ts】【cocos creator】excel表格转JSON

需要将表格导出为text格式放到项目resources/text文件夹下 新建场景&#xff0c;挂载到Canvas上运行 表格文件格式&#xff1a; 保存格式选text tableToJson : import CryptoJS require(./FileSaver);const { ccclass, property } cc._decorator;ccclass export default c…

IDEA的常用设置,让你更快速的编程

一、前言 在使用JetBrains的IntelliJ IDEA进行软件开发时&#xff0c;了解和正确配置一些常用设置是非常重要的。IDEA的强大功能和定制性使得开发过程更加高效和舒适。 在本文中&#xff0c;我们将介绍一些常用的IDEA设置&#xff0c;帮助您更好地利用IDEA进行开发。这些设置包…

Kotlin 中的 Lambda 与 Inline

在Kotlin中&#xff0c;有很多很酷很实用的特性&#xff0c;比如Lambda和高阶函数&#xff0c;利用这些特性&#xff0c;我们可以更加快速的实现开发&#xff0c;提升效率。 比如我们实现一个捕获Throwable&#xff0c;安全执行部分代码的高阶函数 fun safeRun(runnable: () …

034_小驰私房菜_[问题复盘] Qcom平台,某些三方相机拍照旋转90度

全网最具价值的Android Camera开发学习系列资料~ 作者:8年Android Camera开发,从Camera app一直做到Hal和驱动~ 欢迎订阅,相信能扩展你的知识面,提升个人能力~ 【一、问题】 某些三方相机,预览正常,拍照旋转90度 【二、问题排查】 1 ) HAL这边Jpeg编码数据在哪个地方…

【动态map】牛客挑战赛67 B

登录—专业IT笔试面试备考平台_牛客网 题意&#xff1a; 思路&#xff1a; 考虑动态的map 可以先定义一个状态&#xff0c;然后用map统计前缀这个状态的出现次数 在这里&#xff0c;定义{a,b}为cnt1 - cnt0和cnt2 - cnt0 当cnt0 和 cnt1都和cnt2相同时&#xff0c;统计贡献…

在 IntelliJ IDEA 中使用 Docker 开发指南

目录 一、IDEA安装Docker插件 二、IDEA连接Docker 1、Docker for Windows 连接 2、SSH 连接 3、Connection successful 连接成功 三、查看Docker面板 四、使用插件生成镜像 一、IDEA安装Docker插件 打开 IntelliJ IDEA&#xff0c;点击菜单栏中的 "File" -&g…

百度屏蔽词有哪些?其中就有移民关键词指数被屏蔽?

我是百收网SEO&#xff0c;点点上面的头像&#xff0c;欢迎关注我哦&#xff01; 今日tombkeeper消息爆料&#xff1a;百度指数已经屏蔽“移民”等关键词指数。 大家好&#xff0c;我是百收网SEO商学院的狂潮微课老师&#xff0c;今天我们来讲解第 12 节课关键词优化难度分析…

【JavaEE基础学习打卡03】Java EE 平台有哪些内容?

目录 前言一、Java EE平台说明二、Java EE平台容器及组件1.平台容器2.平台组件 三、JavaEE平台API服务1.API服务概览2.平台API 总结 前言 &#x1f4dc; 本系列教程适用于Java Web初学者、爱好者&#xff0c;小白白。我们的天赋并不高&#xff0c;可贵在努力&#xff0c;坚持不…

每天一道leetcode:72. 编辑距离(动态规划困难)

今日份题目&#xff1a; 给你两个单词 word1 和 word2&#xff0c; 请返回将 word1 转换成 word2 所使用的最少操作数 。 你可以对一个单词进行如下三种操作&#xff1a; 插入一个字符 删除一个字符 替换一个字符 示例1 输入&#xff1a;word1 "horse", word…

静态库和动态库制作

文章目录 前言一、静态库和动态库介绍1、静态库2、动态库 二、静态库的制作及使用1、准备好源码2、编译源码生成 .o 文件3、制作静态库4、使用静态库 三、动态库的制作及使用1、生成位置无关的 .o 文件2、制作动态库3、使用动态库4、指定动态库路径并使其生效 四、对比1、静态库…

链表OJ详解

&#x1f495;人生不满百&#xff0c;常怀千岁忧&#x1f495; 作者&#xff1a;Mylvzi 文章主要内容&#xff1a;链表oj详解 题目一&#xff1a;移除元素 题目要求&#xff1a; 画图分析&#xff1a; 代码实现&#xff1a; struct ListNode* removeElements(struct List…

Linux实用命令合集

适用于CentOS7系统&#xff0c;其他系统有些命令不支持 yum install epel-release 失败 wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo vi/vim检索关键字 命令模式:/****"n"可以跳转到下一个关键字位置 cat 查看配置文件不显示…

Redis 6.5 服务端开启多线程源码

redis支持开启多线程&#xff0c;只有从socket到读取缓冲区和从输出缓冲区到socket这两段过程是多线程&#xff0c;而命令的执行还是单线程&#xff0c;并且是由主线程执行 借鉴&#xff1a;【Redis】事件驱动框架源码分析&#xff08;多线程&#xff09; 一、main启动时初始化…

第4章:决策树

停止 当前分支样本均为同一类时&#xff0c;变成该类的叶子节点。当前分支类型不同&#xff0c;但是已经没有可以用来分裂的属性时&#xff0c;变成类别样本更多的那个类别的叶子节点。当前分支为空时&#xff0c;变成父节点类别最多的类的叶子节点。 ID3 C4.5 Cart 过拟合 缺…

超导热催生meme,换汤不换药的投机轮回

文/章鱼哥 出品/陀螺财经 币圈对炒作meme概念的热情从未消亡过。 随着一种名为LK-99的物质被发现&#xff0c;围绕超导的兴奋不仅激发了科学界&#xff0c;加密货币相关概念也与之沸腾。不出所料&#xff0c;与此前围绕元宇宙、AI大肆炒作一样&#xff0c;许多meme代币已经出现…

Spring 使用注解开发、代理模式、AOP

使用注解开发 在Spring4之后&#xff0c;要使用注解开发&#xff0c;必须要保证AOP的包导入了 项目搭建&#xff1a; 在配置文件中导入约束&#xff0c;增加注解支持 <?xml version"1.0" encoding"UTF-8"?> <beans xmlns"http://www.spri…

华为新版ENSP PRO模拟器测评:性能表现与功能扩展一览

一、引言 在网络领域不断涌现的新技术和复杂的网络拓扑要求&#xff0c;推动了网络设备模拟器的持续发展和创新。华为作为一家领先的通信技术解决方案提供商&#xff0c;不断致力于为网络工程师和技术从业人员提供更优秀的仿真环境。最近&#xff0c;华为推出了ensp pro模拟器的…