【毕设选题】flink大数据淘宝用户行为数据实时分析与可视化

文章目录

  • 0 前言
  • 1、环境准备
    • 1.1 flink 下载相关 jar 包
    • 1.2 生成 kafka 数据
    • 1.3 开发前的三个小 tip
  • 2、flink-sql 客户端编写运行 sql
    • 2.1 创建 kafka 数据源表
    • 2.2 指标统计:每小时成交量
      • 2.2.1 创建 es 结果表, 存放每小时的成交量
      • 2.2.2 执行 sql ,统计每小时的成交量
    • 2.3 指标统计:每10分钟累计独立用户数
      • 2.3.1 创建 es 结果表,存放每10分钟累计独立用户数
      • 2.3.2 创建视图
      • 2.3.3 执行 sql ,统计每10分钟的累计独立用户数
    • 2.4 指标统计:商品类目销量排行
      • 2.4.1 创建商品类目维表
      • 2.4.1 创建 es 结果表,存放商品类目排行表
      • 2.4.2 创建视图
      • 2.4.3 执行 sql , 统计商品类目销量排行
  • 3、最终效果与体验心得
    • 3.1 最终效果
    • 3.2 体验心得
      • 3.2.1 执行
      • 3.2.2 存储
  • 4 最后


0 前言

🔥 这两年开始毕业设计和毕业答辩的要求和难度不断提升,传统的毕设题目缺少创新和亮点,往往达不到毕业答辩的要求,这两年不断有学弟学妹告诉学长自己做的项目系统达不到老师的要求。

为了大家能够顺利以及最少的精力通过毕设,学长分享优质毕业设计项目,今天要分享的是

🚩 flink大数据淘宝用户行为数据实时分析与可视化

🥇学长这里给一个题目综合评分(每项满分5分)

  • 难度系数:3分
  • 工作量:3分
  • 创新点:4分

1、环境准备

1.1 flink 下载相关 jar 包

flink-sql 连接外部系统时,需要依赖特定的 jar 包,所以需要事先把这些 jar 包准备好。说明与下载入口

本项目使用到了以下的 jar 包 ,下载后直接放在了 flink/lib 里面。

需要注意的是 flink-sql 执行时,是转化为 flink-job 提交到集群执行的,所以 flink 集群的每一台机器都要添加以下的 jar 包。

外部版本jar
kafka4.1flink-sql-connector-kafka_2.11-1.10.2.jar
flink-json-1.10.2-sql-jar.jar
elasticsearch7.6flink-sql-connector-elasticsearch7_2.11-1.10.2.jar
mysql5.7flink-jdbc_2.11-1.10.2.jar
mysql-connector-java-8.0.11.jar

1.2 生成 kafka 数据

用户行为数据来源: 阿里云天池公开数据集

网盘:https://pan.baidu.com/s/1wDVQpRV7giIlLJJgRZAInQ 提取码:gja5

商品类目纬度数据来源: category.sql

数据生成器:datagen.py

有了数据文件之后,使用 python 读取文件数据,然后并发写入到 kafka。

修改生成器中的 kafka 地址配置,然后运行 以下命令,开始不断往 kafka 写数据

# 5000 并发
nohup python3 datagen.py 5000 &                  

1.3 开发前的三个小 tip

  • 生成器往 kafka 写数据,会自动创建主题,无需事先创建

  • flink 往 elasticsearch 写数据,会自动创建索引,无需事先创建

  • Kibana 使用索引模式从 Elasticsearch 索引中检索数据,以实现诸如可视化等功能。

使用的逻辑为:创建索引模式 》Discover (发现) 查看索引数据 》visualize(可视化)创建可视化图表》dashboards(仪表板)创建大屏,即汇总多个可视化的图表

2、flink-sql 客户端编写运行 sql

# 进入 flink-sql 客户端, 需要指定刚刚下载的 jar 包目录
./bin/sql-client.sh embedded -l lib

2.1 创建 kafka 数据源表

-- 创建 kafka 表, 读取 kafka 数据
CREATE TABLE user_behavior (user_id BIGINT,item_id BIGINT,category_id BIGINT,behavior STRING,ts TIMESTAMP(3),proctime as PROCTIME(),WATERMARK FOR ts as ts - INTERVAL '5' SECOND  
) WITH ('connector.type' = 'kafka', 'connector.version' = 'universal',  'connector.topic' = 'user_behavior',  'connector.startup-mode' = 'earliest-offset', 'connector.properties.zookeeper.connect' = '172.16.122.24:2181', 'connector.properties.bootstrap.servers' = '172.16.122.17:9092', 'format.type' = 'json'  
);
SELECT * FROM user_behavior;

2.2 指标统计:每小时成交量

2.2.1 创建 es 结果表, 存放每小时的成交量

CREATE TABLE buy_cnt_per_hour (hour_of_day BIGINT,buy_cnt BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7',  'connector.hosts' = 'http://172.16.122.13:9200',  'connector.index' = 'buy_cnt_per_hour','connector.document-type' = 'user_behavior','connector.bulk-flush.max-actions' = '1','update-mode' = 'append','format.type' = 'json'
);

2.2.2 执行 sql ,统计每小时的成交量

INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

2.3 指标统计:每10分钟累计独立用户数

2.3.1 创建 es 结果表,存放每10分钟累计独立用户数

CREATE TABLE cumulative_uv (time_str STRING,uv BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7',  'connector.hosts' = 'http://172.16.122.13:9200',  'connector.index' = 'cumulative_uv','connector.document-type' = 'user_behavior',    'update-mode' = 'upsert','format.type' = 'json'
);

2.3.2 创建视图

CREATE VIEW uv_per_10min AS
SELECTMAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

2.3.3 执行 sql ,统计每10分钟的累计独立用户数

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;

2.4 指标统计:商品类目销量排行

2.4.1 创建商品类目维表

先在 mysql 创建一张商品类目的维表,然后配置 flink 读取 mysql。

CREATE TABLE category_dim (sub_category_id BIGINT,parent_category_name STRING
) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://172.16.122.25:3306/flink','connector.table' = 'category','connector.driver' = 'com.mysql.jdbc.Driver','connector.username' = 'root','connector.password' = 'root','connector.lookup.cache.max-rows' = '5000','connector.lookup.cache.ttl' = '10min'
);

2.4.1 创建 es 结果表,存放商品类目排行表

CREATE TABLE top_category  (category_name  STRING,buy_cnt  BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7',  'connector.hosts' = 'http://172.16.122.13:9200',  'connector.index' = 'top_category','connector.document-type' = 'user_behavior','update-mode' = 'upsert','format.type' = 'json'
);

2.4.2 创建视图

CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;

2.4.3 执行 sql , 统计商品类目销量排行

INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;

3、最终效果与体验心得

3.1 最终效果

整个开发过程,只用到了 flink-sql ,无需写 java 或者其它代码,就完成了这样一个实时报表。

image-20201201175438743

3.2 体验心得

3.2.1 执行

  • flink-sql 的 ddl 语句不会触发 flink-job , 同时创建的表、视图仅在会话级别有效。

  • 对于连接表的 insert、select 等操作,则会触发相应的流 job, 并自动提交到 flink 集群,无限地运行下去,直到主动取消或者 job 报错。

  • flink-sql 客户端关闭后,对于已经提交到 flink 集群的 job 不会有任何影响。

本次开发,执行了 3 个 insert , 因此打开 flink 集群面板,可以看到有 3 个无限的流 job 。即使 kafka 数据全部写入完毕,关闭 flink-sql 客户端,这个 3 个 job 都不会停止。
image-20201201175523916

3.2.2 存储

  • flnik 本身不存储业务数据,只作为流批一体的引擎存在,所以主要的用法为读取外部系统的数据,处理后,再写到外部系统。

  • flink 本身的元数据,包括表、函数等,默认情况下只是存放在内存里面,所以仅会话级别有效。但是,似乎可以存储到 Hive Metastore 中,关于这一点就留到以后再实践。

4 最后

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/133451.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

康耐视读码器DataMan软件详细使用步骤

1、 点击桌面已经安装好的 dataman 软件并打开 2、 打开之后,点击刷新,刷出来读码器的图标,双击进行连接,或者选中后,点击右下角 的连接。(也可先进行第 9—(2)步更改读码器的 IP,对应的连接对象也更改到同一网 段)如图 3、 连接之后,在设置 快速设置下面把实时显…

【栈与队列面试题】有效的括号(动图演示)

leetcode20.括号匹配问题 前言: 💥🎈个人主页:​​​​​​Dream_Chaser~ 🎈💥 ✨✨刷题专栏:http://t.csdn.cn/UlvTc ⛳⛳本篇内容:力扣上栈与队列的面试OJ题目 目录 leetcode20.括号匹配问题 1.问题描…

msvcp120.dll怎么修复?msvcp120.dll丢失的解决方法

在当今这个信息化的时代,电脑已经成为我们生活和工作中不可或缺的一部分。然而,随着电脑技术的不断发展,我们也会遇到各种各样的问题。其中,msvcp120.dll丢失是一个常见的问题。一、msvcp120.dll 文件介绍 1 msvcp120.dll 文件的定…

关于 firefox 不能访问 http 的解决

情景: 我在虚拟机 192.168.x.111 上配置了 DNS 服务器,在 kali 上设置 192.168.x.111 为 DNS 服务器后,使用 firefox 地址栏搜索域名 www.xxx.com ,访问在 192.168.x.111 搭建的网站,本来经 192.168.x.111 DNS 服务器解…

MATLAB入门-数据的导入和导出

MATLAB入门-数据的导入和导出 注:本篇文章是课程学习笔记,课程链接为:头歌 常见的几个导入数据的方法 load函数 load函数专门用于引入MATLAB的.mat格式数据,十分的简单方便。 例如:一个-ASCII编码形式存储的数据文件…

VMware启用共享文件夹

1. 启用 编辑虚拟机设置 - 选项 - 共享文件夹 - 总是启用 - 添加 2. 启动Ubuntu查看 正常情况/mnt目录会出现文件夹hgfs 如果不存在,可参考 这篇文章 操作 如果安装VMWare tools后/mnt中有hgfs但没共享文件,可参考 这篇文章 如果出现 mount: unkno…

【Unity每日一记】资源加载相关和检测相关

👨‍💻个人主页:元宇宙-秩沅 👨‍💻 hallo 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍💻 本文由 秩沅 原创 👨‍💻 收录于专栏:uni…

ROS学习笔记(四)---使用 VScode 启动launch文件运行多个节点

ROS学习笔记文章目录 01. ROS学习笔记(一)—Linux安装VScode 02. ROS学习笔记(二)—使用 VScode 开发 ROS 的Python程序(简例) 03. ROS学习笔记(三)—好用的终端Terminator 一、什么是launch文件 虽然说Terminator终端是能够比较方便直观的看运行的节点…

查看视频文件关键帧间隔

一、Elecard StreamEye Tools拖放视频文件查看。 红的是I帧;蓝的是P帧;绿的是B帧。 二、ffprobe -show_streams统计。 1、统计视频关键帧、非关键帧 ffprobe.exe -i 1.mp4 -show_streams v -show_packets -print_format json > d:\1.json 再统计1.j…

SQL8 查找某个年龄段的用户信息

描述 题目:现在运营想要针对20岁及以上且23岁及以下的用户开展分析,请你取出满足条件的设备ID、性别、年龄。 用户信息表:user_profile iddevice_idgenderageuniversityprovince12138male21北京大学Beijing23214male复旦大学Shanghai36543…

徐亦达机器学习:Kalman Filter 卡尔曼滤波笔记 (一)

P ( x t P(x_t P(xt​| x t − 1 ) x_{t-1}) xt−1​) P ( y t P(y_t P(yt​| x t ) x_t) xt​) P ( x 1 ) P(x_1) P(x1​)Discrete State DM A X t − 1 , X t A_{X_{t-1},X_t} AXt−1​,Xt​​Any π \pi πLinear Gassian Kalman DM N ( A X t − 1 B , Q ) N(AX_{t-1}B,Q)…

Git多人开发解决冲突案例

准备工作: 1.创建一个gitee远程仓库https://gitee.com/xxxxxxx.git 2.初始化两个本地git仓库用户,目的是模拟多人协作开发时提交代码发生冲突的场景 3.解决冲突并提交。 进入正题: lisi 通过vim指令修改readme.md文件内容,推送到…

正则表达式使用总结

一、字符匹配 普通字符:普通字符按照字面意义进行匹配,例如匹配字母 "a" 将匹配到文本中的 "a" 字符。 元字符:元字符具有特殊的含义,例如 \d 匹配任意数字字符,\w 匹配任意字母数字字符&#xf…

广州口腔医院种植牙-广东省爱牙工程公益种牙,获湾区群众点赞

广州种植牙价格表-自2017年成立以来,广东省爱牙工程一直坚持以公益惠民为宗旨、公益种牙为服务方向,针对群众普遍存在的口腔健康问题,开展形式多样的公益性口腔医疗惠民活动。 广州种植牙费用表-日前,广东省爱牙工程“种植牙惠民行动”第二十季已正式启动。据广东省爱牙工程官方…

Datax 数据同步-使用总结(二)

一、前言 这部分主要记录 datax 实现增量同步的方案。 二、核心思路 结合datax 提供的preSql、 postSql以及占位符,外加另外一张表同步日志表来记录相关同步信息。 三、版本迭代 3.1 初版本 where tbq.opera_date > cast(date_format(DATE_SUB(NOW(), inte…

C++数据结构X篇_13_二叉树基本概念、性质及表示法

文章目录 1. 二叉树基本概念1.1 定义1.2 逻辑结构:1.3 基本特征1.4 基本形态:1.5 问题 :(仅做了解) 2. 二叉树性质2.1 性质1:在二叉树第i层上至多有2的(i-1)次方个节点(i>0)2.2 性…

RecSysOps: 大规模推荐系统运维最佳实践

运维大规模系统总会面临很多挑战,本文介绍了Netflix在运维大规模推荐系统时总结出的最佳实践。原文: RecSysOps: Best Practices for Operating a Large-Scale Recommender System 运维大规模推荐系统是一项复杂的工作,这样的系统有高可用性和吞吐量的需…

JSP ssm 特殊人群防走失系统myeclipse开发mysql数据库springMVC模式java编程计算机网页设计

一、源码特点 JSP ssm 特殊人群防走失系统是一套完善的web设计系统(系统采用SSM框架进行设计开发,springspringMVCmybatis),对理解JSP java编程开发语言有帮助,系统具有完整的源 代码和数据库,系统主要…

C语言_指针进阶(下)

文章目录 前言一、函数指针数组二、指向函数指针数组的指针三. 回调函数四. qsort 函数五. 数组名的理解 sizeof5.1 数组名的理解(二维数组)5.1.1 数组名的理解 strlen5.1.2 例题:例一.例二.例三.例四. 前言 一、函数指针数组 数组是一个存放相同类型数…

Ubuntu22.04配置WiFi

Ubuntu22.04配置WiFi 注意:在/etc/netplan/​下的配置文件,格式一定要正确,否则用sudo netplan try​的时候会报错 一、查看无线网卡的名称 //choice-1 ls /sys/class/net//choice-2 ip a//choice-3 ifconfig -a‍ 二、修改配置文件 文件…