二百七十、Kettle——ClickHouse中增量导入清洗数据错误表

一、目的

比如原始数据100条,清洗后,90条正确数据在DWD层清洗表,10条错误数据在DWD层清洗数据错误表,所以清洗数据错误表任务一定要放在清洗表任务之后。

更关键的是,Hive中原本的SQL语句,放在ClickHouse需要大改,头大!而且Kettle任务要想定时增量导入,既与清洗数据错误表最新时间相关,又与DWD层清洗表最新时间相关,搞了大半天才搞定!

二、Hive中原有代码

2.1 表结构

--21、静态排队错误数据表——动态分区  dwd_queue_error
create  table  if not exists  hurys_db.dwd_queue_error(id                  string          comment '唯一ID',device_no           string          comment '设备编号',source_device_type  string          comment '设备类型',sn                  string          comment '设备序列号 ',model               string          comment '设备型号',create_time         string       comment '创建时间',lane_no             int             comment '车道编号',lane_type           int             comment '车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道',queue_count         int             comment '排队车辆数',queue_len           float           comment '排队长度(m)',queue_head          float           comment '排队头车距停止线距离(m)',queue_tail          float           comment '排队尾车距停止线距离(m)'
)
comment '静态排队错误数据表——动态分区'
partitioned by (day string)
stored as orc
;

2.2 SQL代码

--动态插入数据
insert  overwrite  table  hurys_db.dwd_queue_error  partition(day)
select
UUID()  as  id,
t2.device_no, t2.source_device_type, t2.sn, t2.model, t2.create_time,t2.lane_no, t2.lane_type,
t2.queue_count, t2.queue_len, t2.queue_head, t2.queue_tail, t2.day
from hurys_db.ods_queue as t2
left join hurys_db.dwd_queue as t3
on t3.device_no=t2.device_no and t3.create_time=t2.create_time and t3.lane_no=t2.lane_no
where t3.device_no is null and t3.create_time is null and t3.lane_no is null and t2.day='2024-09-10'
;

原有Hive代码很简单,然后把代码变成脚本,放在海豚定时调度即可,都很简单!

三、ClickHouse中现有代码

3.1 表结构

--21 静态排队数据错误表(长期存储)
create  table  if not exists  hurys_jw.dwd_queue_error(id                  String                       comment '唯一ID',device_no           String             comment '设备编号',source_device_type  Nullable(String)             comment '设备类型',sn                  Nullable(String)             comment '设备序列号 ',model               Nullable(String)             comment '设备型号',create_time         DateTime                     comment '创建时间',lane_no             Int32              comment '车道编号',lane_type           Nullable(Int32)              comment '车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道',queue_count         Int32              comment '排队车辆数',queue_len           Decimal(10, 2)     comment '排队长度(m)',queue_head          Decimal(10, 2)     comment '排队头车距停止线距离(m)',queue_tail          Decimal(10, 2)     comment '排队尾车距停止线距离(m)',day                 Date                         comment '日期'
)
ENGINE = MergeTree
PARTITION BY day
PRIMARY KEY (day,id)
ORDER BY (day,id)
SETTINGS index_granularity = 8192;

注意:由于后面数据清洗记录表需要,因此部分清洗规则的字段不能用Nullable,这也是后面的一大坑!

3.2 SQL代码

select
generateUUIDv4()  as  id,
device_no, source_device_type, sn, model, create_time,
lane_no, lane_type, queue_count, queue_len, queue_head, queue_tail,
cast(day as String) day
from (selectt2.device_no, t2.source_device_type, t2.sn, t2.model,t2.create_time,t2.lane_no, t2.lane_type,t2.queue_count, t2.queue_len, t2.queue_head, t2.queue_tail, toDate(t2.create_time) dayfrom hurys_jw.ods_queue as t2ANTI join hurys_jw.dwd_queue as t3on t3.device_no=t2.device_no and t3.create_time=t2.create_time and t3.lane_no=t2.lane_no)
--where  create_time > ?
;

注意:1 生成uuid字段,Hive中是UUID() as id,而ClickHouse中是generateUUIDv4() as id

           2 ClickHouse中with语句好像不是支持,不知道是不是版本问题

           3 ClickHouse中有ANTI join函数

           4 Kettle里需要把Date字段的day变成cast(day as String) day

3.3 Kettle任务

3.3.1 newtime

获取目标表dwd_queue_error的最新时间create_time

3.3.2 替换NULL值

3.3.3 clickhouse输入

select 
generateUUIDv4()  as  id,
device_no, source_device_type, sn, model, create_time,
lane_no, 
lane_type, queue_count, queue_len, queue_head, queue_tail,
cast(day as String) day
from (
select t2.device_no, t2.source_device_type, t2.sn, t2.model,t2.create_time,t2.lane_no, t2.lane_type,
t2.queue_count, t2.queue_len, t2.queue_head, t2.queue_tail, toDate(t2.create_time) day
from hurys_jw.ods_queue as t2
ANTI join hurys_jw.dwd_queue as t3
on t3.device_no=t2.device_no and t3.create_time=t2.create_time and t3.lane_no=t2.lane_no
)
where  create_time > ?
;

3.3.4 字段选择

3.3.5 newtime3

获取清洗表dwd_queue的最新时间create_time3

3.3.6 替换NULL值3

3.3.7 记录关联 (笛卡尔输出)

注意:清洗表dwd_queue的最新时间create_time3要大于等于目标表dwd_queue_error的最新时间create_time

3.3.8 clickhouse输出

3.3.9 保存后先执行清洗表dwd_queue任务,再执行dwd_queue_error任务

3.3.10 配置海豚调度任务

搞定!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/460267.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Nas】X-Doc:jellyfin“该客户端与媒体不兼容,服务器未发送兼容的媒体格式”问题解决方案

【Nas】X-Doc:jellyfin“该客户端与媒体不兼容,服务器未发送兼容的媒体格式”问题解决方案 当使用Jellyfin播放视频时出现“该客户端与媒体不兼容,服务器未发送兼容的媒体格式”,这是与硬件解码和ffmpeg设置有关系,具体…

linux应急响应-1

声明:部分内容来源于网络,只是新手练习 靶场环境来自于知攻善防实验室 概述: 一、整体过程 初始环境设置 将Linux centOS 7配置为图形化界面,通过yum groupinstall “X Window System” -y和yum groupinstall “GNOME Desktop”&a…

视频去水印软件推荐:6款去水印工具值得一试

在视频创作和分享的过程中,水印往往会成为影响美观和平台推流。幸运的是,市面上有许多视频去水印软件能够帮助我们轻松解决这一问题。本文将为大家推荐几款实用的视频去水印软件,并详细介绍它们的功能和去除水印的方法。 1.影忆 功能介绍&…

MaxKB: 一款基于大语言模型的知识库问答系统

嗨, 大家好, 我是徐小夕. 之前一直在社区分享零代码&低代码的技术实践,最近也在研究多模态文档引擎相关的产品, 在社区发现一款非常有意思的知识库问答系统——MaxKB, 它支持通过大语言模型和RAG技术来为知识库赋能,今天就和大家分享一下这款项目. PS: 它提供了…

Java NIO2 异步IO支持

NIO2 从 Java 7 在之前的NIO基础上,它提供了异步 IO 操作、文件系统访问增强等诸多功能 路径(Path)接口 Path 接口代表了文件系统的路径。它可以用来定位一个文件或目录。 提供了多种方法来解析、转换和查询路径信息。Paths 类提供了一些静…

上市公司数字经济与实体经济融合发展程度测算数据(2008-2022年)-最新出炉_附下载链接

上市公司数字经济与实体经济融合发展程度测算数据(2008-2022年) 下载链接-点它👉👉👉:上市公司数字经济与实体经济融合发展程度测算数据(2008-2022年)-最新出炉.zip 一、引言 随着…

Prompt Engineering (Prompt工程)

2 prompt工程2大原则 2.1 给出清晰&#xff0c;详细的指令 策略1&#xff1a;使用分割符清晰的指示输出的不同部分&#xff0c;比如"",<>,<\tag>等分隔符 策略2&#xff1a;指定一个结构化的输出&#xff0c;比如json,html等格式 策略3&#xff1a;要…

Nginx防盗链配置

1. 什么是盗链? 盗链是指服务提供商自己不提供服务的内容&#xff0c;通过技术手段绕过其它有利益的最终用户界面&#xff08;如广告&#xff09;&#xff0c;直接在自己的网站上向最终用户提供其它服务提供商的服务内容&#xff0c;骗取最终用户的浏览和点击率。受益者不提供…

ppt设计软件哪个好?这5个在线ppt工具不容错过!

职场人每天的办公日常&#xff0c;大概率都离不开PPT&#xff0c;不管是制作季度汇报&#xff0c;还是向团队展示各类方案&#xff0c;诸如此类的场景都会用到ppt。 ppt是一个看重视觉效果的演示媒介&#xff0c;可以说它的外观精美与否&#xff0c;很大程度上决定了观众或潜在…

LC20. 有效的括号

用来熟悉一下栈的应用之括号匹配 题目链接 下面是大致思路 1.初始化:创建一个空栈,用于存储左括号。&#xff08;LC这题不用&#xff0c;自己写完整的需要&#xff09; 2.遍历字符串:从左到右依次扫描字符串中的每个字符。 3.处理左括号:如果是左括号,将其压入栈中。 4.处理右…

8. 性能指标

博客补充&#xff1a;CUDA C 最佳实践指南-CSDN博客https://blog.csdn.net/qq_62704693/article/details/141267262?spm1001.2014.3001.5502 在尝试优化 CUDA 代码时&#xff0c;了解如何准确测量性能并了解带宽在性能测量中的作用是值得的。本章讨论如何使用 CPU 计时器和 C…

【Stable Diffusion - Ai】小白入门必看(涂鸦、涂鸦重绘、局部重绘和重绘蒙版篇)!真材实料!不卖课!!!

【Stable Diffusion - Ai】小白入门必看&#xff08;涂鸦、涂鸦重绘、局部重绘和重绘蒙版篇&#xff09;&#xff01;真材实料&#xff01;不卖课&#xff01;&#xff01;&#xff01; 在上一篇 小白入门必看&#xff08;图生图篇&#xff09;中我们详细的介绍了文生图和图生…

Linux字体更新 使用中文字体

问题描述&#xff0c;处理之前&#xff0c;中文乱码 处理后的结果 压缩需要上传的字体&#xff1a; 上传到LInux的字体目录&#xff0c;上传后解压出来 刷新字体&#xff1a; fc-cache -fv 测试是否正常 fc-list | grep "FontName"如果还不行 可以在代码里面指定字…

信息安全工程师(72)网络安全风险评估概述

前言 网络安全风险评估是一项重要的技术任务&#xff0c;它涉及对网络系统、信息系统和网络基础设施的全面评估&#xff0c;以确定存在的安全风险和威胁&#xff0c;并量化其潜在影响以及可能的发生频率。 一、定义与目的 网络安全风险评估是指对网络系统中存在的潜在威胁和风险…

Kafka 基础入门

文章内容是学习过程中的知识总结&#xff0c;如有纰漏&#xff0c;欢迎指正 文章目录 前言 1. 核心概念 1.1 Producer 1.2 broker 1.3 consumer 1.4 zookeeper 1.5 controller 1.6 Cluster 2. 逻辑组件 2.1 Topic 2.2 Partition 2.3 Replication 2.4 leader & follower 3. …

CH569开发前的测试

为了玩转准备Ch569的开发工作 &#xff0c;准备了如下硬件和软件&#xff1a; 硬件 1.官方的 Ch569 开发板&#xff0c;官方买到的是两块插接在一起的&#xff1b;除了HSPI接口那里的电阻&#xff0c;这两块可以说是一样的。也意味着两块板子的开发也需要烧录两次&#xff1b…

OpenCV基本操作(python开发)——(7)实现图像校正

OpenCV基本操作&#xff08;python开发&#xff09;——&#xff08;1&#xff09; 读取图像、保存图像 OpenCV基本操作&#xff08;python开发&#xff09;——&#xff08;2&#xff09;图像色彩操作 OpenCV基本操作&#xff08;python开发&#xff09;——&#xff08;3&…

ffmpeg视频滤镜:网格-drawgrid

滤镜介绍 drawgrid 官网链接 》 FFmpeg Filters Documentation drawgrid会在视频上画一个网格。 滤镜使用 参数 x <string> ..FV.....T. set horizontal offset (default "0")y <string> ..FV.....T. set…

使用pytorch实现LSTM预测交通流

原始数据&#xff1a; 免费可下载原始参考数据 预测结果图&#xff1a; 根据测试数据test_data的真实值real_flow&#xff0c;与模型根据测试数据得到的输出结果pre_flow 完整源码&#xff1a; #!/usr/bin/env python # _*_ coding: utf-8 _*_import pandas as pd import nu…

Oracle视频基础1.1.3练习

1.1.3 需求&#xff1a; 完整格式查看所有用户进程里的oracle后台进程 查看物理网卡&#xff0c;虚拟网卡的ip地址 ps -ef | grep oracle /sbin/ifconfig要以完整格式查看所有用户进程中的 Oracle 后台进程&#xff0c;并查看物理和虚拟网卡的 IP 地址&#xff0c;可以使用以下…