RisingWave最佳实践-利用Dynamic filters 和 Temporal filters 实现监控告警

心得的体会

刚过了年刚开工,闲暇之余调研了分布式SQL流处理数据库–RisingWave,本人是Flink(包括FlinkSQL和Flink DataStream API)的资深用户,但接触到RisingWave令我眼前一亮,并且拿我们生产上的监控告警场景在RisingWave上做了验证,以下是自己的心得体会:

RisingWave架构简单,运维成本底,基于云原生(可以分别基于计算和存储动态伸缩),同时在开发上屏蔽了Flink等实时处理框架底层需要处理的一些技术细节(状态存储,数据一致性,分布式集群扩展等)。提供了与PostgreSQL兼容的标准SQL接口,用户可以像使用 PostgreSQL 一样处理数据流。并且RisingWave不单单可以处理流式数据,还提供了数据其他流式处理框架(如:Flink、storm)所不具备的数据存储能力,基本可以完全取代FlinkSQL。相对于其他OLAP系统(如:apache doris,starrocks),RisingWave采用同步实时,可以保证实时的新鲜度;强一致性,而不是最终一致性。用户需要做的仅仅是通过开发SQL就可以处理流数据,当然首先需要具备流式数据处理思维(相对于离线)。

RisingWave当然也有自身的不足,相对于Flink可以通过DataStream API自定义灵活的处理流式数据,RisingWave只能解决一些特定的流式场景,无法做太多定制开发;相对于Apache Doris等OLAP实时分析性数据库,RisingWave不适合做分析型随机查询。另外RisingWave是个新事物,正在发展阶段,周边生态和相关文档还不健全,作为尝鲜者可能会踩很多坑。然而令人欣慰的是RisingWave的社区回复还是很及时的,RisingWave官方投入了很多精力在做RisingWave的布道和答疑。

至于争论比较厉害的RisingWav VS FLink的性能和吞吐量上孰优孰劣,针对不同应用场景可能有不同表现,因此没有亲自调研就没有发言权。但我认为在不同的场景下他们应该有各自的优势。无论如何RisingWave部署简单,上手容易,试错成本低是一个不争的事实。RisingWave可以应用在一些数据看版,监控,实时指标等场景。

利用动态和时间过滤器实现监控告警

FlinkSQL解决不了定时触发的问题,FlinkSQL的流处理逻辑只是按event触发,不能按时间条件触发,也就是没有触发器机制。FlinkSQL窗口的定时触发,归根结底也是基于event触发,event驱动的机制。因此需要触发器的场景就需要用到Flink DataStream API的KeyedProcessFunction等算子。但RisingWave利用Dynamic filters 和 Temporal filters 可以间接实现类似场景的触发器机制。

场景描述

现有如下群消息实时指标监控场景:
数据有初始化(init)、查询(query)、回调(callback:succ+fail)三种先后顺序状态。
数据是按预设时间批次分组的,例如:2024-01-01 08:00:00、2024-01-01 08:30:00,实时统计每一个批次内三种不同状态的数据count。

监控指标一:在某一个批次延迟指定的时间(query_timeout)之内(例如:2024-01-01 08:00:00延迟1小时触发时间为系统时间2024-01-01 09:00:00),该批次的query状态数据count没有达到init状态的数量count阀值(即query_count<init_count*query_threshold)就触发告警。
同时结束该批次数据统计,下发该批次数据的指标包括:批次时间、init_count、query_count等

监控指标二:如果指标一告警没有被触发,该批次在满足query状态数据count达到init状态数量count的阀值(即query_count>=init_countquery_threshold)以后,在指定的延迟时间内(callback_timeout),该批次的callback状态数据count没有达到query状态的数量count阀值(即callback_count<query_countcallback_threshold)就触发告警。
同时结束该批次数据统计,下发该批次数据的指标包括:批次时间、init_count、query_count、callback_count等

群消息实时指标监控流程图如下:
群消息实时指标监控流程图

实例demo

RisingWave部署可以参考:RisingWave分布式SQL流处理数据库调研

假设:
query_threshold=1, callback_count=1
query_timeout= ‘5 minute’, callback_timeout= ‘1 minute’
0->init,1->query,2->callback

RisingWave SQL:

 
DROP TABLE t_msg;
CREATE TABLE t_msg(msg_id int,status smallint ,public_time timestamp,process_time timestamp as proctime()
) APPEND ONLY;set timezone = 'PRC';--PRC(People’s Republic of China)
show timezone;select * from t_msg;--统计不同状态的count
DROP MATERIALIZED VIEW mv_t_msg_groupby; 
CREATE MATERIALIZED VIEW mv_t_msg_groupby AS
SELECT public_time
,sum(case when status = 0  then 1 else 0 end) AS init_count
,sum(case when status = 1  then 1 else 0 end) AS query_count
,sum(case when status = 2  then 1 else 0 end) AS callback_count
,max(process_time) as process_time
FROM t_msg 
group by public_time;select * from mv_t_msg_groupby;--sink_query_alarm 
DROP SINK sink_query_alarm;
CREATE SINK sink_query_alarm AS 
SELECT public_time
,init_count
,query_count
,process_time
FROM mv_t_msg_groupby
where public_time + INTERVAL '5 minute' <= now()   --query_timeout=1 minute, public_time相当于当前时间延迟1分钟触发,【Delay table changes】
and   init_count*1 > query_count --query_threshold=1
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_query_alarm'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);--由于RisingWave不支持在【MATERIALIZED VIEW】和【SINK】等【可伸缩流】中指定处理时间字段,因此需要借助外部存储kafka周转
--RisngWave官方给的解释:support a proctime on an append only stream might be easier but on retractable stream could take extra cost. We must think it carefully to introduce such a feature.--sink_query_succ 
DROP SINK sink_query_succ;
CREATE SINK sink_query_succ AS 
SELECT public_time
,init_count
,query_count
,callback_count
,process_time as query_succ_process_time
FROM mv_t_msg_groupby
where public_time + INTERVAL '5 minute' >= now()   --query_timeout=1 minute, 在指定的时间内,【Delete and clean expired data】
and   init_count*1 <= query_count --query_threshold=1,query_count达到了指定值
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_query_succ'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);--source连接器
DROP SOURCE source_query_succ;
CREATE SOURCE IF NOT EXISTS source_query_succ (init_count int,query_count int ,callback_count int ,public_time timestamp,query_succ_process_time timestamp
)
WITH (connector='kafka',topic='t_sink_query_succ',properties.bootstrap.server='192.168.1.100:8092',scan.startup.mode='earliest', -- earliest ,latest,default:earliest 
) FORMAT PLAIN ENCODE JSON;select * from source_query_succ;--sink_callback_alarm,用到动态过滤器和时间过滤器
DROP SINK sink_callback_alarm;
CREATE SINK sink_callback_alarm AS 
WITH tmp AS ( 
select public_time, min(query_succ_process_time) as query_succ_process_time  -- 动态过滤器
FROM source_query_succ 
group by public_time
)
SELECT b.public_time
,b.init_count
,b.query_count
,b.callback_count
,b.process_time
,a.query_succ_process_time
FROM  tmp a
JOIN  mv_t_msg_groupby b ON a.public_time=b.public_time
where a.query_succ_process_time + INTERVAL '1 minute' <= now()   --query_timeout=1 minute, public_time相当于当前时间延迟1分钟触发,【Delay table changes】
and   b.query_count*1 > b.callback_count --callback_threshold=1
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_callback_alarm'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);-- 模拟数据
--init
INSERT INTO t_msg values(1,0,'2024-02-23 15:55:00'::TIMESTAMP); --比当前系统时间早
INSERT INTO t_msg values(2,0,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,0,'2024-02-23 15:55:00'::TIMESTAMP);
--query                                
INSERT INTO t_msg values(1,1,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(2,1,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,1,'2024-02-23 15:55:00'::TIMESTAMP);
--callback                             
INSERT INTO t_msg values(1,2,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(2,2,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,2,'2024-02-23 15:55:00'::TIMESTAMP);

查看监控结果:

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink_query_alarm -C 
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink_callback_alarm -C 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/263644.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android Gradle 开发与应用 (一) : Gradle基础

1. Gradle是什么 Gradle是一个通用的构建工具&#xff0c;支持诸多主要的 IDE&#xff0c;包括 Android Studio、IntelliJ IDEA、Visual Studio 等 Gradle 的底层实现(核心引擎和框架)其实是用 Java 编写的开发者通常使用 Groovy 或 Kotlin 来编写构建脚本 1.1 那么为什么Gra…

Order By Limit不稳定性

文章目录 前置解决不确定性场景1 Order By索引1.1 背景1.2 不确定性产生原因1.2.1 正常情况下1.2.2 但是 1.3 补充1.4 场景1总结 场景2 Order by id2.1 背景2.2 不会产生不确定性原因1原因2 2.3 推荐使用方式 场景3 filesort3.1 背景3.2 不确定性产生原因3.3 内存排序和磁盘临时…

Jmeter基础(3) 发起一次请求

目录 Jmeter 一次请求添加线程组添加HTTP请求添加监听器 Jmeter 一次请求 用Jmeter进行一次请求的过程&#xff0c;需要几个步骤呢&#xff1f; 1、添加线程组2、添加HTTP请求3、添加监听器&#xff0c;查看结果树 现在就打开jmeter看下如何创建一个请求吧 添加线程组 用来…

【Java程序设计】【C00282】基于Springboot的校园台球厅人员与设备管理系统(有论文)

基于Springboot的校园台球厅人员与设备管理系统&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的校园台球厅人员与设备管理系统 本系统分为系统功能模块、管理员功能模块以及用户功能模块。 系统功能模块&#xf…

【数据结构与算法】(16)基础算法 之哈希表 相关示例 详细代码讲解

目录 3.6 哈希表第一版生成 hashCode思考习题E01. 两数之和-Leetcode 1E02. 无重复字符的最长字串-Leetcode 3E03. 字母异位词分组-Leetcode 49E04. 判断有没有重复元素-Leetcode 217E05. 找出出现一次的数字-Leetcode 136E06. 判断字母异位词-Leetcode 242E07. 第一个不重复字…

前端工程化面试题 | 16.精选前端工程化高频面试题

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

vue : 无法加载文件 C:\Program Files\nodejs\node_global\vue.ps1,因为在此系统上禁止运行脚本。

解决方法&#xff1a; 打开PowerShell&#xff0c;在命令框输入set-ExecutionPolicy RemoteSigned 在PowerShell中输入会出现如下图&#xff0c;输入y即可。

数据结构-列表LinkedList

一,链表的简单的认识. 数组,栈,队列是线性数据结构,但都算不上是动态数据结构,底层都是依托静态数组,但是链表是确实真正意义上的动态数组. 为什么要学习链表? 1,链表时最简单的动态数据结构 2,掌握链表有助于学习更复杂的数据结构,例如,二叉树,trie. 3,学习链表有助于更深入…

unity学习(40)——创建(create)角色脚本(panel)——UI

1.点击不同的头像按钮&#xff0c;分别选择职业1和职业2&#xff0c;create脚本中对应的函数。 2.调取inputfield中所输入的角色名&#xff08;限制用户名长度为7字符&#xff09;&#xff0c;但愿逆向的服务器可以查重名&#xff1a; 3.点击头衔&#xff0c;显示选择的职业&a…

Spring Boot 手写starter!!!

原因&#xff1a;为什么要手写starter&#xff1f;&#xff1f;&#xff1f; 原因&#xff1a;简化功能。 实例&#xff1a;以分页为例&#xff1a;写一个starter。 1.首先定义一个PageX注解。 Target({ElementType.METHOD}) Retention(RetentionPolicy.RUNTIME) Documented p…

LeetCode 热题 100 | 二叉树(一)

目录 1 基础知识 1.1 先序遍历 1.2 中序遍历 1.3 后序遍历 2 94. 二叉树的中序遍历 3 104. 二叉树的最大深度 4 226. 翻转二叉树 5 101. 对称二叉树 菜鸟做题&#xff0c;语言是 C 1 基础知识 二叉树常见的遍历方式有&#xff1a; 先序遍历中序遍历后序遍历…

RocketMQ-架构与设计

RocketMQ架构与设计 一、简介二、框架概述1.设计特点 三、架构图1.Producer2.Consumer3.NameServer4.BrokerServer 四、基本特性1.消息顺序性1.1 全局顺序1.2 分区顺序 2.消息回溯3.消息重投4.消息重试5.延迟队列&#xff08;定时消息&#xff09;6.重试队列7.死信队列8.消息语…

神经网络系列---感知机(Neuron)

文章目录 感知机(Neuron)感知机(Neuron)的决策函数可以表示为&#xff1a;感知机(Neuron)的学习算法主要包括以下步骤&#xff1a;感知机可以实现逻辑运算中的AND、OR、NOT和异或(XOR)运算。 感知机(Neuron) 感知机(Neuron)是一种简单而有效的二分类算法&#xff0c;用于将输入…

jmeter下载base64加密版pdf文件

一、何为base64加密版pdf文件 如下图所示&#xff0c;接口jmeter执行后&#xff0c;返回一串包含大小写英文字母、数字、、/、的长字符串&#xff0c;直接另存为pdf文件后&#xff0c;文件有大小&#xff0c;但是打不开&#xff1b;另存为doc文件后&#xff0c;打开可以看到和…

Puppeteer 使用实战:如何将自己的 CSDN 专栏文章导出并用于 Hexo 博客(二)

文章目录 上一篇效果演示Puppeteer 修改浏览器的默认下载位置控制并发数错误重试并发控制 错误重试源码 上一篇 Puppeteer 使用实战&#xff1a;如何将自己的 CSDN 专栏文章导出并用于 Hexo 博客&#xff08;一&#xff09; 效果演示 上一篇实现了一些基本功能&#xff0c;…

Maxwell安装部署

1 Maxwell输出格式 database&#xff1a;变更数据所属的数据库table&#xff1a;变更数据所属的表type&#xff1a;数据变更类型ts&#xff1a;数据变更发生的时间xid&#xff1a;事务idcommit&#xff1a;事务提交标志&#xff0c;可用于重新组装事务data&#xff1a;对于inse…

uni-app nvue vue3 setup中实现加载webview,解决nvue中获取不到webview实例的问题

注意下面的方法只能在app端使用&#xff0c; let wv plus.webview.create("","custom-webview",{plusrequire:"none", uni-app: none, width: 300,height:400,top:uni.getSystemInfoSync().statusBarHeight44 }) wv.loadURL("https://ww…

浅析Linux设备驱动:DMA内存映射

文章目录 概述DMA与Cache一致性DMA映射类型一致性DMA映射dma_alloc_coherent 流式DMA映射dma_map_single数据同步操作dma_direct_sync_single_for_cpudma_direct_sync_single_for_device 相关参考 概述 现代计算机系统中&#xff0c;CPU访问内存需要经过Cache&#xff0c;但外…

第6.4章:StarRocks查询加速——Colocation Join

目录 一、StarRocks数据划分 1.1 分区 1.2 分桶 二、Colocation Join实现原理 2.1 Colocate Join概述 2.2 Colocate Join实现原理 三、应用案例 注&#xff1a;本篇文章阐述的是StarRocks-3.2版本的Colocation Join 官网文章地址&#xff1a; Colocate Join | StarRoc…

32单片机基础:GPIO输出

目录 简介&#xff1a; GPIO输出的八种模式 STM32的GPIO工作方式 GPIO支持4种输入模式&#xff1a; GPIO支持4种输出模式&#xff1a; 浮空输入模式 上拉输入模式 下拉输入模式 模拟输入模式&#xff1a; 开漏输出模式&#xff1a;&#xff08;PMOS无效&#xff0c;就…