Flink实时数仓之用户埋点系统(一)

需求分析及框架选型

  • 需求分析
    • 数据采集
      • 用户行为采集
      • 业务数据采集
    • 行为日志分析
      • 用户行为日志
      • 页面日志
      • 启动日志
      • APP在线日志
    • 业务数据分析
      • 用户Insert数据
      • 用户Update数据
  • 技术选型
    • Nginx配置
    • Flume配置
    • MaxWell
    • Hadoop
    • Flink
    • 架构图

需求分析

数据采集

用户行为采集

  1. 行为数据:页面浏览、点击、在线日志等数据
  2. 活跃数据:用户注册、卸载安装、活跃等数据
  3. App性能日志:卡顿、异常等数据

业务数据采集

  1. 业务数据:支付等
  2. 维度表:渠道、商品等

行为日志分析

用户行为日志

日志结构大致可分为两类,一是页面日志,二是启动日志和在线日志。

页面日志

页面日志,以页面浏览为单位,即一个页面浏览记录,生成一条页面埋点日志。一条完整的页面日志包含,一个页面浏览记录和多个用户在该页面所做的动作记录,以及若干个该页面的曝光记录,以及一个在该页面发生的报错记录。除上述行为信息,页面日志还包含了这些行为所处的各种环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。

{"common": {                     -- 环境信息"imei":"xxx","device_id":"12323",         --客户端唯一识别id,安卓传安卓唯一id,ios传idfa (可以为空,上报空值率)"acc_id": "aad3",             -- APP注册ID,如:aad32c13aa6d008c_1D19l (可以为空,上报空值率)"app_type_id":"DFTT",        --App软件唯一识别符,如:步多多为100001 (不可为空,非DFTT/ZYSRF/SXG/6位数字则报错,空值则报错)"qid": "kuaishou1",           -- 渠道"group_qid":"kuaishou",  -- 渠道分组"asc_qid":"xiaoh",                -- 归因渠道号 (可以为空,上报空值率)"app_ver":"v2.1.134",        --App版本号 ,如:1.1.1 最大99.99.99(不可为空,非标准格式及空值 报错)"os":"IOS",                  --操作系统,如:Android、iOS (不可为空,非:Android/iOS及空值 报错 )"os_version":"11.0",         --操作系统版本号,如:7.0 (不可为空,空值 报错)"device":"xiao mi 6",        --公共参数)客户端手机型号,如:xiaomi6 (不可为空,空值 报错 )"device_brand":"xiaomi",     --机型品牌,如:HUAWEI (不可为空,空值 报错)"pixel":"1080*1920",         --屏幕分辨率,如:1080*1920 (不可为空 ,空值 报错)"network":"5g",              --网络环境,如:wifi、4g、3g、2g、other (不可为空 ,非:wifi、4g、3g、2g、other及空值 报错)"is_tourist":1 ,             --是否是游客,如:1表示游客、0表示非游客、2表示未登录"obatch_id":"ddadccae",      --本次启动唯一ID:每次启动时生成一个唯一批次号,直到下次启动才变更 (可以为空 ,上报空值率)  "ip":"127.0.0.1",            --ip"is_new": 1,                 -- 是否为新用户 0老用户,1新用户(安装后启动的第一天用户都为新用户,第一天之后都为老用户) "code": "xxx",              -- 平台标识 "lab_code": "实验A",         -- 实验code "lab_group_code": "note"     -- 实验分组code  },"actions": [{                   -- 页面动作信息"page_url": "/good_detail",  -- 页面url(取相对路径)"action_type": "show",       -- 动作类型:展现传“show”、点击传“click”、关闭传“close” (不可为空,非show、click、close报错 空值报错)"event": "Vip",              -- 事件类型"sub_event": "Me"            -- 事件子类型}],"pages": [{                        -- 页面信息"during_time": 7648,            -- 持续时间毫秒"page_url": "/good_detail",     -- 页面url(取相对路径)"last_page_url":"",             -- 上一个页面url(取相对路径,首次访问为空)"event": "Vip",                 -- 事件类型"sub_event": "Me",              -- 页面名称"last_sub_event": "login"       -- 上页的名称  }] "ts": 1585744374423             --日志上报时间戳
}

启动日志

启动日志以启动为单位,一次启动行为,生成一条启动日志。一条完整的启动日志包括一个启动记录,一个本次启动时的报错记录,以及启动时所处的环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。

{"common": {"imei":"idfv","device_id":"12323",         --客户端唯一识别id,安卓传安卓唯一id,ios传idfa (可以为空,上报空值率)"acc_id": "aad3",            -- APP注册ID,如:aad32c13aa6d008c_1D19l (可以为空,上报空值率)"app_type_id":"DFTT",        --App软件唯一识别符,如:步多多为100001 (不可为空,非DFTT/ZYSRF/SXG/6位数字则报错,空值则报错)"qid": "xxx",                -- 渠道"group_qid":"xxxx",    -- 渠道分组"asc_qid":"",                --归因渠道号 (可以为空,上报空值率)"app_ver":"v2.1.134",        --App版本号 ,如:1.1.1 最大99.99.99(不可为空,非标准格式及空值 报错)"os":"IOS",                  --操作系统,如:Android、iOS (不可为空,非:Android/iOS及空值 报错 )"os_version":"11.0",         --操作系统版本号,如:7.0 (不可为空,空值 报错)"device":"xiao mi 6",        --公共参数)客户端手机型号,如:xiaomi6 (不可为空,空值 报错 )"device_brand":"xiaomi",     --机型品牌,如:HUAWEI (不可为空,空值 报错)"pixel":"1080*1920",         --屏幕分辨率,如:1080*1920 (不可为空 ,空值 报错)"network":"5g",              --网络环境,如:wifi、4g、3g、2g、other (不可为空 ,非:wifi、4g、3g、2g、other及空值 报错)"is_tourist":1 ,             --是否是游客,如:1表示游客、0表示非游客、2表示未登录"obatch_id":"ddadccae",      --本次启动唯一ID:每次启动时生成一个唯一批次号,直到下次启动才变更 (可以为空 ,上报空值率)  "ip":"127.0.0.1",            --ip"is_new": 1,                 -- 是否为新用户 0老用户,1新用户"code": "xxx",               -- 平台标识 "lab_code": "实验A",         -- 实验code "lab_group_code": "note"     -- 实验分组code  },"start": {   "start_way": 0,          --启动方式。 0:热启动  1:代表首次安装首次启动  2:冷启动"entry": "icon",          --启动途径。icon:手机图标  notice:通知   install:安装后启动"loading_time": 18803    --启动加载时间},"ts": 1585744304000        --日志上报时间戳
}

APP在线日志

App在线日志以启动-关闭为单位,一次启动-关闭行为,生成一条启动-关闭日志。

{"common": {"imei":"idfv","device_id":"12323",         --客户端唯一识别id,安卓传安卓唯一id,ios传idfa (可以为空,上报空值率)"acc_id": "aad3",            -- APP注册ID,如:aad32c13aa6d008c_1D19l (可以为空,上报空值率)"app_type_id":"DFTT",        --App软件唯一识别符,如:步多多为100001 (不可为空,非DFTT/ZYSRF/SXG/6位数字则报错,空值则报错)"qid": "xxx",           -- 渠道"group_qid":"xxxx",  -- 渠道分组"asc_qid":"",                -- 归因渠道号 (可以为空,上报空值率)"app_ver":"v2.1.134",        --App版本号 ,如:1.1.1 最大99.99.99(不可为空,非标准格式及空值 报错)"os":"IOS",                  --操作系统,如:Android、iOS (不可为空,非:Android/iOS及空值 报错 )"os_version":"11.0",         --操作系统版本号,如:7.0 (不可为空,空值 报错)"device":"xiao mi 6",        --公共参数)客户端手机型号,如:xiaomi6 (不可为空,空值 报错 )"device_brand":"xiaomi",     --机型品牌,如:HUAWEI (不可为空,空值 报错)"pixel":"1080*1920",         --屏幕分辨率,如:1080*1920 (不可为空 ,空值 报错)"network":"5g",              --网络环境,如:wifi、4g、3g、2g、other (不可为空 ,非:wifi、4g、3g、2g、other及空值 报错)"is_tourist":1 ,             --是否是游客,如:1表示游客、0表示非游客、2表示未登录"obatch_id":"ddadccae",      --本次启动唯一ID:每次启动时生成一个唯一批次号,直到下次启动才变更 (可以为空 ,上报空值率)  "ip":"127.0.0.1",            --ip"is_new": 1,                 -- 是否为新用户 0老用户,1新用户 "code": "xxx",              -- 平台标识 "lab_code": "实验A",         -- 实验code "lab_group_code": "note"     -- 实验分组code  },"online": {   "start_way": 0,            --启动方式。 0:热启动  1:代表首次安装首次启动  2:冷启动"start_time":  18803111 ,  --开始时间(毫秒)"end_time":  188033 ,      --退出时间(毫秒)"online_time": 18803      --在线时长(毫秒)},"ts": 1585744304000        --日志上报时间戳
}

新老用户的判断规则
APP 端:用户安装 App 后,第一次打开 App 的当天,Android/iOS SDK 会在手机本地缓存内,创建一个首日为 true 的标记,并且设置第一天 24 点之前,该标记均为 true。
即:第一天触发的 APP 端所有事件中,is_new = 1。即第一天之后触发的 APP 端所有事件中,is_new = 0。
对于此类日志,如果首日之后用户清除了手机本地缓存中的标记,再次启动 APP 会重新设置一个首日为 true 的标记,导致本应为 0 的 is_new 字段被置为1
前端处理规则
is_new(1:新用户,0:老用户)用户安装 App 后,第一次打开 App 的当天,即第一天触发的 APP 端所有事件中,is_new = 1,第一天之后,该标记则为 false,即第一天之后触发的 APP 端所有事件中,is_new = 0。首日之后用户清除了手机本地缓存中的标记,is_new = 1此时由后端处理

业务数据分析

1)用户订单、支付、退款等业务的新增、修改、删除操作都会生成一个binlog日志,通过MaxWell采集这些日志到Kafka消息队列中

用户Insert数据

类型:“type”: “insert”

{"database":"databaseA","table":"t_pay_order","type":"insert","ts":1686540443,"xid":16179,"commit":true,"data":{"uid":"1660557015483727879","order_no":"P202305221603541978152962","pay_order_id":"","way_code":"APPLE_APP","amount":1800,"currency":"cny","state":1,"product_id": 1,"product_name":"商品1","product_num":1,"body":"xxxxx","user_id":"1660533343607898114","refund_state":0,"refund_times":0,"refund_amount":0,"subscribed":1,"expired_time":"2023-06-21 16:03:39","success_time":null,"create_time":"2023-05-22 08:03:38.805000","update_time":"2023-06-12 02:01:26.996053","err_code":"21011","err_msg":"订单已退款或已订阅过期"}
}

用户Update数据

{"database":"note_data","table":"t_pay_order","type":"update","ts":1686535286,"xid":4853,"commit":true,"data":{"uid":"1660557015483727876","order_no":"P202305221603541978152961","pay_order_id":"","way_code":"APPLE_APP","amount":1800,"currency":"cny","state":3,"product_id":"VIP_Moth_18","product_name":"月度VIP","product_num":1,"body":"月度会员","user_id":"1660533343607898114","refund_state":0,"refund_times":0,"refund_amount":0,"subscribed":1,"expired_time":"2023-06-21 16:03:39","success_time":null,"create_time":"2023-05-22 08:03:38.805000","update_time":"2023-06-12 02:01:26.996053","err_code":"21011","err_msg":"订单已退款或已订阅过期"},"old":{"pay_order_id":"rfsddfx","update_time":"2023-06-09 10:32:59.769593"}
}

技术选型

  1. 数据采集与传输:Nginx、Flume、Kafka、MaxWell
  2. 数据存储:HDFS、HBASE、Redis
  3. 计算引擎:Flink
  4. 数据存储:ClickHouse
  5. 任务调度:Flink On Yarn

Nginx配置

作用

  • 收集用户埋点日志:生成log_file文件。
  • 收集post请求中的request_body,在/data/logs/nginx/user_data/文件夹下生成log日志

配置

http {include       mime.types;default_type  application/octet-stream;log_format  main  '$remote_addr - $remote_user [$time_local] "$request" ';log_format data_json escape=json ' $request_body ';access_log  logs/access.log  main;sendfile        on;#tcp_nopush     on;#keepalive_timeout  0;keepalive_timeout  65;map $time_iso8601 $logdate {'~^(?<ymd>\d{4}-\d{2}-\d{2})' $ymd;default    'date-not-found';}server {listen      8090;server_name 127.0.0.1;access_log  /data/logs/nginx/user_data/user_big_data-$logdate.log  data_json;error_log /data/logs/nginx/user_data/user_big_data_error-$logdate.log  error;location / {proxy_pass  http://127.0.0.1:8090/api/log/;}location /api/log/ {return 200;}}
}

Flume配置

作用

  • 采集文件到kafka队列中,这里的source(数据源)是文件,channel(通道),sink(输出源)是kafka

关键配置

#定义组件
a1.sources = r1
a1.channels = c1#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/logs/nginx/user_data/user_data/.*log
a1.sources.r1.positionFile =  /opt/apache-flume-1.9.0-bin/opt/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.sinozo.data.flume.interceptor.ETLInterceptor$Builder#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers =127.0.0.1:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel#组装 
a1.sources.r1.channels = c1

MaxWell

作用
实时收集mysql中的binlog数据,输出到kafka队列中

关键配置

#Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
#目标Kafka集群地址
kafka.bootstrap.servers=localhost:9092
#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
kafka_topic=topic_db#配置只监听note_data库下t_pay_order表
exclude_dbs=*
include_dbs=note_data
include_tables=t_pay_order#MySQL相关配置
host=localhosts
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

启动命令

#!/bin/bash
MAXWELL_HOME=/opt/maxwell-1.29.2status_maxwell(){result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`return $result
}start_maxwell(){status_maxwellif [[ $? -lt 1 ]]; thenecho "启动Maxwell"$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --filter="exclude: *.*, include: db.*, exclude: *.*, include: *.t_pay_order"  --daemonelseecho "Maxwell正在运行"fi
}stop_maxwell(){status_maxwellif [[ $? -gt 0 ]]; thenecho "停止Maxwell"ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9elseecho "Maxwell未在运行"fi
}case $1 instart )start_maxwell;;stop )stop_maxwell;;restart )stop_maxwellstart_maxwell;;
esac

Hadoop

作用
HDFS作为存储的基础组件,防止flink计算过程中的checkPoint检查点数据以及状态数据
Yarn作为调度组件,对flink的jobManager、taskManager内存等资源进行动态分配、并对taskManager进行监控

Flink

作用
作为实时计算引擎,对业务数据、用户埋点数据进行分组、统计等计算

架构图

Flink On Yarn架构图

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/270910.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IR 召回测试数据集——MS MARCO

如何评估召回系统的好坏&#xff1f;如何评估检索系统是否有提升&#xff1f;在任何人面前&#xff0c;空口无凭。 我们需要一把尺子来衡量。我们需要一个高质量的测试数据集合。每次都在相同的测试数据集上&#xff0c;进行评测。本篇文章介绍一个高质量的应为的测试数据集——…

蓝桥杯集训·每日一题2024 (差分)

前言&#xff1a; 差分笔记以前就做了&#xff0c;在这我就不再写一遍了&#xff0c;直接上例题。 例题&#xff1a; #include<bits/stdc.h> using namespace std; int a[10009],b[100009]; int main(){int n,ans10,ans20;cin>>n;for(int i1;i<n;i){cin>>…

C++复习笔记——泛型编程模板

01 模板 模板就是建立通用的模具&#xff0c;大大提高复用性&#xff1b; 02 函数模板 C另一种编程思想称为 泛型编程 &#xff0c;主要利用的技术就是模板 C 提供两种模板机制:函数模板和类模板 函数模板语法 函数模板作用&#xff1a; 建立一个通用函数&#xff0c;其函…

透视和仿射变换的区别

仿射变换矩阵通常是2x3的矩阵。 三个特点&#xff1a; 直线依然是直线平行线依然平行 [ x ′ y ′ 1 ] [ a 11 a 12 b 1 a 21 a 22 b 2 0 0 1 ] [ x y 1 ] x ′ a 11 ∗ x a 12 ∗ y b 1 y ′ a 21 ∗ x a 22 ∗ y b 2 \begin{gathered} \begin{bmatrix}x\\y\\1\end{b…

Linux Ubuntu系统安装MySQL并实现公网连接本地数据库【内网穿透】

文章目录 前言1 .安装Docker2. 使用Docker拉取MySQL镜像3. 创建并启动MySQL容器4. 本地连接测试4.1 安装MySQL图形化界面工具4.2 使用MySQL Workbench连接测试 5. 公网远程访问本地MySQL5.1 内网穿透工具安装5.2 创建远程连接公网地址5.3 使用固定TCP地址远程访问 前言 本文主…

LeetCode每日一题 二叉树的最大深度(二叉树)

题目描述 给定一个二叉树 root &#xff0c;返回其最大深度。二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;3 示例 2&#xff1a; 输入&#xff1a;root [1,nul…

【异常处理】Vue报错 Component template should contain exactly one root element.

问题描述 启动VUE项目后控制台报错&#xff1a; Component template should contain exactly one root element. If you are using v-if on multiple elements, use v-else-if to chain them instead.翻译为&#xff1a;组件模板应该只包含一个根元素 查看vue代码&#xff0…

spring-jpa

一、介绍 1.1ORM 1.2 Java Persistence API 放在javaee版本 优点 支持持久化复杂的Java对象&#xff0c;简化Java应用的对象持久化开发支持使用JPQL语言进行复杂的数据查询使用简单&#xff0c;支持使用注解定义对象关系表之间的映射规范标准化&#xff0c;由Java官 方统一规…

nmaptocsv.py脚本无法处理结果的备用工具

python nmaptocsv.py -i test.nmap -f ip-fqdn-port-protocol-service-version。 一: 使用背景&#xff1a; 使用一些其他的端口扫描软件&#xff0c;指纹识别可能有些端口 如一次&#xff1a;11011端口是mysql数据库但是别的软件扫不出来是mysql&#xff0c;借助nmap对1101…

火爆全网,软件测试数据库常用 SQL 语句总结,你要的我都有......

前言 直接上干货 数据定义语言(DDL) 主要负责数据库、数据表、视图、键、索引等结构化的操作 常用的语句有&#xff1a;CREATE DATABASE、CREATE TABLE、ALTER TABLE等 字段的常用约束有&#xff1a;PRIMARY KEY、FOREIGN KEY、NOT NULL、UNIQUE、AUTO_INCREMENT、DEFAULT 常…

应用案例 | Softing echocollect e网关助力汽车零部件制造商构建企业数据库,提升生产效率和质量

为了提高生产质量和效率&#xff0c;某知名汽车零部件制造商采用了Softing echocollect e多协议数据采集网关——从机器和设备中获取相关数据&#xff0c;并直接将数据存储在中央SQL数据库系统中用于分析处理&#xff0c;从而实现了持续监控和生产过程的改进。 一 背景 该企业…

Vue开发实例(四)Element-UI部分组件使用方法

Element-UI的使用 一、Icon图标的使用1、用 i 标签使用图标 二、用 el-button 使用图标1、使用type定义样式2、使用plain定义样式3、使用round定义样式4、使用circle定义样式5、带图标和文字的按钮6、按钮禁用7、文字按钮8、按钮组9、加载中 三、Link 文字链接1、基础用法2、禁…

springboot245科研项目验收管理系统

科研项目验收管理系统 摘 要 使用旧方法对科研项目信息进行系统化管理已经不再让人们信赖了&#xff0c;把现在的网络信息技术运用在科研项目信息的管理上面可以解决许多信息管理上面的难题&#xff0c;比如处理数据时间很长&#xff0c;数据存在错误不能及时纠正等问题。这次…

FreeROTS day2

总结DMA空闲中断接收数据的使用方法 首先要要选择串口然后配置串口的参数&#xff0c;配置MDA通道选择接受数据&#xff0c;配置空闲中断&#xff0c;定义一个数据接收的容器&#xff0c;启动MDA传输当串口收到数据时MDA将数据传输到容器中,MDA会一直检测是否有数据当有数据并…

数据结构界的终极幻神----树

目录 一.数的概念和分类 种类 二.重点概念 哈希树: 二叉树的线索化 什么是线索化 为什么要线索化 特殊的查找树 完全二叉树 三.手撕完全二叉树(堆) 重点讲解 向上搜索算法 向下搜索算法 一.数的概念和分类 树&#xff08;tree&#xff09;是包含 n(n≥0) [2] 个节…

使用lnmp环境部署laravel框架需要注意的点

1&#xff0c;上传项目文件后&#xff0c;需要chmod -R 777 storage授予文件权限&#xff0c;不然会报错file_put_contents(/): failed to open stream: Permission denied。 如果后面还是报错没有权限的话&#xff0c;就执行ps -ef |grep php查询php运行用户。然后执行chown …

express基础

express express介绍 官网传送门基于 Node.js 平台&#xff0c;快速、开放、极简的 Web 开发框架express特点 Web 应用 Express 是一个基于 Node.js 平台的极简、灵活的 web 应用开发框架&#xff0c;它提供一系列强大的特性&#xff0c;帮助你创建各种 Web 和移动设备应用。…

Mac 以SH脚本安装Arthas

SH脚本安装Aethas curl -L https://alibaba.github.io/arthas/install.sh | sh安装脚本说明 示例源文件&#xff1a; #! /bin/bash# temp file of as.sh TEMP_ARTHAS_FILE"./as.sh.$$"# target file of as.sh TARGET_ARTHAS_FILE"./as.sh"# update timeo…

什么是端点安全以及如何保护端点

什么是端点安全 端点是指可以接收信号的任何设备&#xff0c;是员工使用的一种计算设备&#xff0c;用于保存公司数据或可以访问 Internet。端点的几个示例包括&#xff1a;服务器、工作站&#xff08;台式机和笔记本电脑&#xff09;、移动设备、虚拟机、平板电脑、物联网、可…

ubuntu下使用MATLAB过程中的若干问题

ubuntu版本&#xff1a;Ubuntu 20.04 内核&#xff1a;Linux 5.15.0-97-generic MATALB版本&#xff1a;MATLAB R2022b 问题1&#xff1a;运行脚本时闪退 报错信息&#xff1a; Inconsistency detected by ld.so: ../elf/dl-tls.c: 517: _dl_allocate_tls_init: Assertion l…