Flink实时电商数仓之DWS层

需求分析

  • 关键词
    在这里插入图片描述
  • 统计关键词出现的频率

IK分词

进行分词需要引入IK分词器,使用它时需要引入相关的依赖。它能够将搜索的关键字按照日常的使用习惯进行拆分。比如将苹果iphone 手机,拆分为苹果,iphone, 手机。

<dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.17</artifactId>
</dependency><dependency><groupId>com.janeluo</groupId><artifactId>ikanalyzer</artifactId>
</dependency>

测试代码如下:

public class IkUtil {public static void main(String[] args) throws IOException {String s = "Apple 苹果15 5G手机";StringReader stringReader = new StringReader(s);IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);//第二个参数表示是否再对拆分后的单词再进行拆分,true时表示不在继续拆分Lexeme next = ikSegmenter.next();while (next!= null) {System.out.println(next.getLexemeText());next = ikSegmenter.next();}}
}

整体流程

  1. 创建自定义分词工具类IKUtil,IK是一个分词工具依赖
  2. 创建自定义函数类
  3. 注册函数
  4. 消费kafka DWD页面主题数据并设置水位线
  5. 从主流中过滤搜索行为
    • page[‘item’] is not null
    • item_type : “keyword”
    • last_page_id: “search”
  6. 使用分词函数对keyword进行拆分
  7. 对keyword进行分组开窗聚合
  8. 写出到doris
    • 创建doris sink
    • flink需要打开检查点才能将数据写出到doris

在这里插入图片描述

具体实现

import com.atguigu.gmall.realtime.common.base.BaseSQLApp;
import com.atguigu.gmall.realtime.common.constant.Constant;
import com.atguigu.gmall.realtime.common.util.SQLUtil;
import com.atguigu.gmall.realtime.dws.function.KwSplit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;/*** title:** @Author 浪拍岸* @Create 28/12/2023 上午11:06* @Version 1.0*/
public class DwsTrafficSourceKeywordPageViewWindow extends BaseSQLApp {public static void main(String[] args) {new DwsTrafficSourceKeywordPageViewWindow().start(10021,4,"dws_traffic_source_keyword_page_view_window");}@Overridepublic void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {//1. 读取主流dwd页面主题数据tableEnv.executeSql("create table page_info(\n" +"    `common` map<string,string>,\n" +"    `page` map<string,string>,\n" +"    `ts` bigint,\n" +"    `row_time` as to_timestamp_ltz(ts,3),\n" +"     WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +")" + SQLUtil.getKafkaSourceSQL(Constant.TOPIC_DWD_TRAFFIC_PAGE, groupId));//测试是否获取到数据//tableEnv.executeSql("select * from page_info").print();//2. 筛选出关键字keywordsTable keywrodTable = tableEnv.sqlQuery("select\n" +"    page['item'] keywords,\n" +"    `row_time`,\n" +"    ts\n" +" from page_info\n" +" where page['last_page_id'] = 'search'\n" +" and page['item_type'] = 'keyword'\n" +" and page['item'] is not null");tableEnv.createTemporaryView("keywords_table", keywrodTable);// 测试是否获取到数据//tableEnv.executeSql("select * from keywords_table").print();//3. 自定义分词函数并注册tableEnv.createTemporarySystemFunction("kwSplit", KwSplit.class );//4. 调用分词函数对keywords进行拆分Table splitKwTable = tableEnv.sqlQuery("select keywords, keyword, `row_time`" +" from keywords_table" +" left join lateral Table(kwSplit(keywords)) on true");tableEnv.createTemporaryView("split_kw_table", splitKwTable);//tableEnv.executeSql("select * from split_kw_table").print();//5. 对keyword进行分组开窗聚合Table windowAggTable = tableEnv.sqlQuery("select\n" +"    keyword,\n" +"    cast(tumble_start(row_time,interval '10' second ) as string) wStart,\n" +"    cast(tumble_end(row_time,interval '10' second ) as string) wEnd,\n" +"    cast(current_date as string)  cur_date,\n" +"    count(*) keyword_count\n" +"from split_kw_table\n" +"group by tumble(row_time, interval '10' second), keyword");//tableEnv.createTemporaryView("result_table",table);//tableEnv.executeSql("select keyword,keyword_count+1 from result_table").print();//6. 写出到doristableEnv.executeSql("create table doris_sink\n" +"(\n" +"    keyword                STRING,\n" +"    wStart                 STRING,\n" +"    wEnd                   STRING,\n" +"    cur_date               STRING,\n" +"    keyword_count          BIGINT\n" +")" + SQLUtil.getDorisSinkSQL(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW));windowAggTable.insertInto("doris_sink").execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/228985.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【IoT网络层】STM32 + ESP8266 +MQTT + 阿里云物联网平台 |开源,附资料|

目标&#xff1a;实现STM32连接阿里云物联网平台发送数据同时接收数据&#xff0c;IOT studio界面显示数据。具体来说&#xff1a;使用ESP8266 ESP-01来连接网络&#xff0c;获取设备数据发送到阿里云物联网平台并显示且oled显示屏当前的设备数据&#xff0c;通过IOT studio界面…

网络层解读

基本介绍 概述 当两台主机之间的距离较远(如相隔几十或几百公里&#xff0c;甚至几千公里)时&#xff0c;就需要另一种结构的网络&#xff0c;即广域网。广域网尚无严格的定义。通常是指覆盖范围很广(远超过一个城市的范围)的长距离的单个网络。它由一些结点交换机以及连接这些…

Opencv(C++)学习之cv::calcHist 任意bin数量进行直方图计算

**背景&#xff1a;**当前网上常见的直方图使用方法都是默认使用256的范围&#xff0c;而对于使用特定范围的直方图方法讲的不够清楚。仔细研究后总结如下&#xff1a; 1、常见使用方法&#xff0c;直接对灰度图按256个Bin进行计算。 Mat mHistUn; int channels[1] { 0 }; {…

AC——对HTTPS数据进行行为审计时的解密方式

目录 SSL中间人解密 客户端代理解密&#xff08;准入插件解密&#xff09; 深信服的AC提供两种SSL解密技术用于对https行为进行解密 中间人解密和准入插件解密 SSL中间人解密 解密工作原理 当内网PC端发起SSL连接请求的时候&#xff0c;AC会以代理服务器的身份&#xff0…

Mybatis枚举类型处理和类型处理器

专栏精选 引入Mybatis Mybatis的快速入门 Mybatis的增删改查扩展功能说明 mapper映射的参数和结果 Mybatis复杂类型的结果映射 Mybatis基于注解的结果映射 Mybatis枚举类型处理和类型处理器 再谈动态SQL Mybatis配置入门 Mybatis行为配置之Ⅰ—缓存 Mybatis行为配置…

Postman!IDEA中也能用!

Postman是大家最常用的API调试工具&#xff0c;那么有没有一种方法可以不用手动写入接口到Postman&#xff0c;即可进行接口调试操作&#xff1f;今天给大家推荐一款IDEA插件&#xff1a;Apipost Helper&#xff0c;写完代码就可以调试接口并一键生成接口文档&#xff01;而且还…

「微服务」Saga 模式 如何使用微服务实现业务事务-第二部分

在上一篇文章中&#xff0c;我们看到了实现分布式事务的一些挑战&#xff0c;以及如何使用Event / Choreography方法实现Saga的模式。在本文中&#xff0c;我们将讨论如何通过使用另一种类型的Saga实现&#xff08;称为Command或Orchestration&#xff09;来解决一些问题&#…

2024年原创深度学习算法项目分享

原创深度学习算法项目分享&#xff0c;包括以下领域&#xff1a; 图像视频、文本分析、知识图谱、推荐系统、问答系统、强化学习、机器学习、多模态、系统界面、爬虫、增量学习等领域… 有需要的话&#xff0c;评论区私聊

Unity Meta Quest 一体机开发(十二):【手势追踪】Poke 交互 - 用手指点击由 3D 物体制作的 UI 按钮

文章目录 &#x1f4d5;教程说明&#x1f4d5;给玩家配置 HandPokeInteractor&#x1f4d5;用 3D 物体制作可以被点击的 UI 按钮⭐搭建物体层级⭐给物体添加脚本⭐为脚本变量赋值 &#x1f4d5;模仿官方样例按钮的样式&#x1f4d5;在按钮上添加文字&#x1f4d5;修改按钮图片 …

linux安装rabbitmq

文章目录 前言一、下载安装包二、erlang1.安装依赖2.解压3.安装4.环境变量5.验证 三、rabbitmq1.安装依赖2.解压3.新建目录4.rabbitmq.env.conf5.rabbitmq.conf6.环境变量7.启动8.验证9.停止 四、安装web1.安装插件2.访问控制台界面 五、开机启动1.编写脚本2.设置开机启动3.测试…

c语言-string.h库函数初识

目录 前言一、库函数strlen()1.1 strlen()介绍1.2 模拟实现strlen() 二、库函数strcpy()2.1 strcpy()介绍2.2 模拟实现strcpy() 三、库函数strcmp()3.1 strcmp()介绍3.3 模拟实现strcmp() 总结 前言 本篇文章介绍c语言<string.h>头文件中的库函数&#xff0c;包含strlen…

从仿写持久层框架到MyBatis核心源码阅读

接上篇手写持久层框架&#xff1a;https://blog.csdn.net/liwenyang1992/article/details/134884703 MyBatis源码 MyBatis架构原理&主要组件 MyBatis架构设计 MyBatis架构四层作用是什么呢&#xff1f; API接口层&#xff1a;提供API&#xff0c;增加、删除、修改、查询…

Matlab技巧[绘画逻辑分析仪产生的数据]

绘画逻辑分析仪产生的数据 逻分上抓到了ADC数字信号,一共是10Bit,12MHZ的波形: 这里用并口协议已经解析出数据: 导出csv表格数据(这个数据为补码,所以要做数据转换): 现在要把这个数据绘制成波形,用Python和表格直接绘制速度太慢了,转了一圈发现MATLAB很好用,操作方法如下:…

若依(Spring boot)框架中如何在不同的控制器之间共享与使用数据

在若依框架或Spring boot框架中&#xff0c;控制器共享和使用数据是为了确保数据一致性、传递信息、提高效率和降低系统复杂性。这可以通过全局变量、依赖注入或数据库/缓存等方式实现。共享和使用数据对框架的正常运行非常关键&#xff0c;有助于促进控制器之间的协同工作&…

阶段十-分布式-nginx服务器

一、Nginx简介 Nginx 是高性能的 HTTP 和反向代理的服务器&#xff0c;处理高并发能力是十分强大的&#xff0c;能经受高负载的考验,有报告表明能支持高达 50,000 个并发连接数。tomcat并发数量理论值是500&#xff0c;实际也就300左右。 1.2 正向代理 正向代理代理的是客户…

OpenGL FXAA抗锯齿算法(Qt)

文章目录 一、简介二、实现代码三、实现效果参考资料一、简介 之前已经提供了使用VCG读取Mesh的方式,接下来就需要针对读取的网格数据进行一些渲染操作了。在绘制Mesh数据时总会遇到图形的抗锯齿问题,OpenGL本身已经为我们提供了一种MSAA技术,但该技术对于一些实时渲染性能有…

产品经理学习-从0-1搭建策略产品

从0-1搭建策略产品 目录&#xff1a; 回顾策略产品 如何从0-1搭建策略产品 回顾策略产品 之前也了解过从产品实施的角度来看&#xff0c;策略就是针对问题的解决方案&#xff0c;在互联网时代更集中体现在2个维度&#xff1a;业务场景和数据应用 如何从0-1搭建策略产品 我们…

交叉验证的种类和原理(sklearn.model_selection import *)

交叉验证的种类和原理 所有的来自https://scikit-learn.org/stable/modules/cross_validation.html#cross-validation-iterators并掺杂了自己的理解。 文章目录 前言一、基础知识1.1 交叉验证图形表示1.2 交叉验证主要类别 二、部分交叉验证函数&#xff08;每类一个&#xff0…

如何在VSCode搭建ESP-IDF开发ESP32

文章目录 概要安装VScode安装ESP-IDF插件使用官方例程小结 概要 ESP-IDF(Espressif IoT Development Framework) 即乐鑫物联网开发框架&#xff0c;它基于 C/C 语言提供了一个自给自足的 SDK&#xff0c;可为在 Windows、Linux 和 macOS 系统平台上开发 ESP32 应用程序提供工具…

旅游平台网页前后端

功能清单 游客功能 用户注册、登录登录权限拦截按名称搜索房间支付流程查看订单信息和状态评论预定过的房间&#xff0c;并自动修改订单状态查看统计剩余房间数量&#xff0c;数量为0时不可预定 管理员功能 房间分类管理 类型的删除、修改、查询&#xff08;准备添加增添功能…