大数据-玩转数据-Flink 海量数据实时去重

一、海量数据实时去重说明

借助redis的Set,需要频繁连接Redis,如果数据量过大, 对redis的内存也是一种压力;使用Flink的MapState,如果数据量过大, 状态后端最好选择 RocksDBStateBackend; 使用布隆过滤器,布隆过滤器可以大大减少存储的数据的数据量。

二、海里书实时去重为什么需要布隆过滤器

如果想判断一个元素是不是在一个集合里,一般想到的是将集合中所有元素保存起来,然后通过比较确定。链表、树、散列表(又叫哈希表,Hash table)等等数据结构都是这种思路。
但是随着集合中元素的增加,我们需要的存储空间越来越大。同时检索速度也越来越慢,上述三种结构的检索时间复杂度分别为。
布隆过滤器即可以解决存储空间的问题, 又可以解决时间复杂度的问题.
布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。

三、布隆过滤基本概念

布隆过滤器(Bloom Filter,下文简称BF)由Burton Howard Bloom在1970年提出,是一种空间效率高的概率型数据结构。它专门用来检测集合中是否存在特定的元素。
它实际上是一个很长的二进制向量和一系列随机映射函数。

实现原理
布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。
BF是由一个长度为m比特的位数组(bit array)与k个哈希函数(hash function)组成的数据结构。位数组均初始化为0,所有哈希函数都可以分别把输入数据尽量均匀地散列。
当要插入一个元素时,将其数据分别输入k个哈希函数,产生k个哈希值。以哈希值作为位数组中的下标,将所有k个对应的比特置为1。
当要查询(即判断是否存在)一个元素时,同样将其数据输入哈希函数,然后检查对应的k个比特。如果有任意一个比特为0,表明该元素一定不在集合中。如果所有比特均为1,表明该集合有(较大的)可能性在集合中。为什么不是一定在集合中呢?因为一个比特被置为1有可能会受到其他元素的影响(hash碰撞),这就是所谓“假阳性”(false positive)。相对地,“假阴性”(false negative)在BF中是绝不会出现的。
下图示出一个m=18, k=3的BF示例。集合中的x、y、z三个元素通过3个不同的哈希函数散列到位数组中。当查询元素w时,因为有一个比特为0,因此w不在该集合中。
在这里插入图片描述

优点
1.不需要存储数据本身,只用比特表示,因此空间占用相对于传统方式有巨大的优势,并且能够保密数据;
2.时间效率也较高,插入和查询的时间复杂度均为, 所以他的时间复杂度实际是
3.哈希函数之间相互独立,可以在硬件指令层面并行计算。
缺点
1.存在假阳性的概率,不适用于任何要求100%准确率的情境;
2.只能插入和查询元素,不能删除元素,这与产生假阳性的原因是相同的。我们可以简单地想到通过计数(即将一个比特扩展为计数值)来记录元素数,但仍然无法保证删除的元素一定在集合中。
使用场景
所以,BF在对查准度要求没有那么苛刻,而对时间、空间效率要求较高的场合非常合适.
另外,由于它不存在假阴性问题,所以用作“不存在”逻辑的处理时有奇效,比如可以用来作为缓存系统(如Redis)的缓冲,防止缓存穿透。
假阳性概率的计算
假阳性的概率其实就是一个不在的元素,被k个函数函数散列到的k个位置全部都是1的概率。可以按照如下的步骤进行计算: p = f(m,n,k)
其中各个字母的含义:
1.n :放入BF中的元素的总个数;
2.m:BF的总长度,也就是bit数组的个数
3.k:哈希函数的个数;
4.p:表示BF将一个不在其中的元素错判为在其中的概率,也就是false positive的概率;
A.BF中的任何一个bit在第一个元素的第一个hash函数执行完之后为 0的概率是:

B.BF中的任何一个bit在第一个元素的k个hash函数执行完之后为 0的概率是:

C.BF中的任何一个bit在所有的n元素都添加完之后为 0的概率是:

D.BF中的任何一个bit在所有的n元素都添加完之后为 1的概率是:

E.一个不存在的元素被k个hash函数映射后k个bit都是1的概率是:

结论:在哈数函数个数k一定的情况下
1.比特数组m长度越大, p越小, 表示假阳性率越低
2.已插入的元素个数n越大, p越大, 表示假阳性率越大
经过各种数学推导:
对于给定的m和n,使得假阳性率(误判率)最小的k通过如下公式定义:

四、使用布隆过滤器实现去重

Flink已经内置了布隆过滤器的实现(使用的是google的Guava)

package com.lyh.flink12;import com.atguigu.flink.java.chapter_6.UserBehavior;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;public class Flink02_UV_BoomFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建WatermarkStrategyWatermarkStrategy<UserBehavior> wms = WatermarkStrategy.<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {@Overridepublic long extractTimestamp(UserBehavior element, long recordTimestamp) {return element.getTimestamp() * 1000L;}});env.readTextFile("input/UserBehavior.csv").map(line -> { // 对数据切割, 然后封装到POJO中String[] split = line.split(",");return new UserBehavior(Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]), split[3], Long.valueOf(split[4]));}).filter(behavior -> "pv".equals(behavior.getBehavior())) //过滤出pv行为.assignTimestampsAndWatermarks(wms).keyBy(UserBehavior::getBehavior).window(TumblingEventTimeWindows.of(Time.minutes(60))).process(new ProcessWindowFunction<UserBehavior, String, String, TimeWindow>() {private ValueState<Long> countState;private ValueState<BloomFilter<Long>> bfState;@Overridepublic void open(Configuration parameters) throws Exception {countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("countState", Long.class));bfState = getRuntimeContext().getState(new ValueStateDescriptor<BloomFilter<Long>>("bfState", TypeInformation.of(new TypeHint<BloomFilter<Long>>() {})));}@Overridepublic void process(String key,Context context,Iterable<UserBehavior> elements, Collector<String> out) throws Exception {countState.update(0L);// 在状态中初始化一个布隆过滤器// 参数1: 漏斗, 存储的类型// 参数2: 期望插入的元素总个数// 参数3: 期望的误判率(假阳性率)BloomFilter<Long> bf = BloomFilter.create(Funnels.longFunnel(), 1000000, 0.001);bfState.update(bf);for (UserBehavior behavior : elements) {// 查布隆if (!bfState.value().mightContain(behavior.getUserId())) {// 不存在 计数+1countState.update(countState.value() + 1L);// 记录这个用户di, 表示来过bfState.value().put(behavior.getUserId());}}out.collect("窗口: " + context.window() + " 的uv是: " + countState.value());}}).print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/152315.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

企业门户的必备选择,WorkPlus的定制化解决方案

在当今数字化时代&#xff0c;企业门户成为了企业内外沟通与协作的重要基础设施。WorkPlus作为领先的品牌&#xff0c;为企业提供了一站式的企业门户解决方案&#xff0c;旨在提升企业形象、改善内外部沟通与协作效率。本文将深入探讨WorkPlus如何通过定制化的设计&#xff0c;…

使用运放产生各种波形

目录复制 文章目录 RC正弦振荡电路文氏电桥振荡电路移项式正弦波振荡电路 集成函数发生器运算放大器驱动电容性负载峰值检波多通道运放未使用的运放接法 RC正弦振荡电路 文氏电桥振荡电路 这个振荡器起振条件RF > 2R1,起振后又希望RF 2R1产生矛盾怎么办&#xff1f; 将RF换…

Zama的fhEVM:基于全同态加密实现的隐私智能合约

1. 引言 Zama的fhEVM定位为&#xff1a; 基于全同态加密实现的隐私智能合约 解决方案 开源代码见&#xff1a; https://github.com/zama-ai/fhevm&#xff08;TypeScript Solidity&#xff09; Zama的fhEVM协议中主要包含&#xff1a; https://github.com/zama-ai/tfhe-…

东土科技与诺贝尔物理学奖2006年度得主斯穆特签约,加快布局工业AI

近日&#xff0c;诺贝尔物理学奖2006年度得主乔治.斯穆特教授与东土科技正式签约&#xff0c;成为东土科技工业人工智能顾问。 乔治斯穆特&#xff08;George Fitzgerald Smoot&#xff09;教授也曾获得爱因斯坦奖&#xff0c;在宇宙学、大数据、生物医学诊断仪器以及人工智能…

Leetcode hot 100之前缀和、差分数组、位运算

目录 差分数组-区间增减 和为K的子数组&#xff1a;前缀和 哈希表优化 除自身以外数组的乘积&#xff1a;前后缀区间 位运算 异或&#xff1a;同为0&#xff0c;不同为1 136. 只出现一次的数字&#xff1a;除了某个元素只出现一次以外&#xff0c;其余每个元素均出现2次…

40V汽车级P沟道MOSFET SQ4401EY-T1_GE3 工作原理、特性参数、封装形式—节省PCB空间,更可靠

AEC-Q101车规认证是一种基于失效机制的分立半导体应用测试认证规范。它是为了确保在汽车领域使用的分立半导体器件能够在严苛的环境条件下正常运行和长期可靠性而制定的。AEC-Q101认证包括一系列的失效机制和应力测试&#xff0c;以验证器件在高温、湿度、振动等恶劣条件下的可…

面试经典 150 题 4 —(数组 / 字符串)— 80. 删除有序数组中的重复项 II

80. 删除有序数组中的重复项 II 方法一 class Solution { public:int removeDuplicates(vector<int>& nums) {int len 0;for(auto num : nums)if(len < 2 || nums[len-2] ! num)nums[len] num;return len;} };方法二 class Solution { public:int removeDupli…

【多线程进阶】线程安全的集合类

文章目录 前言1. 多线程环境使用 ArrayList2. 多线程环境使用队列3. 多线程环境使用哈希表3.1 HashTable3.2 ConcurrentHashMap 总结 前言 本文主要讲解 Java 线程安全的集合类, 在之前学习过的集合类中, 只有 Vector, Stack, HashTable, 是线程安全的, 因为在他们的关键方法中…

使用DNS查询Web服务器IP地址

浏览器并不具备访问网络的功能&#xff0c;其最终是通过操作系统实现的&#xff0c;委托操作系统访问服务器时提供的并不是浏览器里面输入的域名而是ip地址&#xff0c;因此第一步需要将域名转换为对应的ip地址 域名&#xff1a;www.baidu.com ip地址是一串数字 tcp/ip的网络结…

百面机器学习书刊纠错

百面机器学习书刊纠错 P243 LSTM内部结构图 2023-10-7 输入门的输出 和 candidate的输出 进行按元素乘积之后 要和 遗忘门*上一层的cell state之积进行相加。

开发者指南:如何集成一对一直播美颜SDK到你的应用中

本文将为开发者们提供一个详细的指南&#xff0c;教你如何将一对一直播美颜SDK集成到你的应用中&#xff0c;以提供更具吸引力的直播体验。 -为什么选择一对一直播美颜SDK&#xff1f; 在开始之前&#xff0c;让我们先明确一下为什么选择一对一直播美颜SDK是一个明智的决定。…

ueditor

下载文件 文档 UEditor入门部署 入门部署和体验 1.1 下载编辑器 到官网下载 UEditor 最新版&#xff1a;http://ueditor.baidu.com/website/download.html#ueditor 1.2 创建demo文件 解压下载的包&#xff0c;在解压后的目录创建 demo.html 文件&#xff0c;填入下面的…

Linux 安全 - LSM机制

文章目录 前言一、LSM起源二、LSM简介2.1 MAC2.2 LSM特征 三、Major and Minor LSMs3.1 Major LSMs3.2 Minor LSMs3.3 BPF LSM 四、LSM 框架五、LSM Capabilities Module六、LSM hooks 说明参考资料 前言 在这两篇文章中介绍了 Linux 安全机制 Credentials &#xff1a; Linu…

【LLM】主流大模型体验(文心一言 科大讯飞 字节豆包 百川 阿里通义千问 商汤商量)

note 智谱AI体验百度文心一言体验科大讯飞大模型体验字节豆包百川智能大模型阿里通义千问商汤商量简要分析&#xff1a;仅从测试“老婆饼为啥没有老婆”这个问题的结果来看&#xff0c;chatglm分点作答有条理&#xff08;但第三点略有逻辑问题&#xff09;&#xff1b;字节豆包…

云HIS信息管理系统源码 支持一体化电子病历四级

云HIS系统采用云端SaaS服务的方式提供&#xff0c;使用用户通过浏览器即能访问&#xff0c;无需关注系统的部署、维护、升级等问题&#xff0c;系统充分考虑了模板化、配置化、智能化、扩展化等设计方法&#xff0c;覆盖了基层医疗机构的主要工作流程&#xff0c;能够与监管系统…

MQ学习笔记

1.MQ基本概念 2.MQ优势 1.服务解耦 **降低服务间耦合性&#xff0c;提升可维护性及扩展性。** 如下图&#xff1a;订单系统发送数据给库存、支付、物流三个系统&#xff0c;但后期又需增加X系统&#xff0c;此时只需X系统自己从MQ获取信息即可&#xff0c;无需改动订单系统代…

LuaRadio介绍

介绍 LuaRadio是一个用于构建信号处理流程图的框架 在软件定义的无线电流图中&#xff0c;源和接收块倾向于实现某种I/O&#xff0c;如从SDR加密狗读取样本&#xff0c;或将样本写入IQ文件&#xff0c;而处理块倾向于计算&#xff0c;如滤波器和乘法器。 数据类型说明 LuaRadio…

高端品牌如何利用软文抓住顾客的心?

如今高端品市场价值巨大&#xff0c;但之前由于“口罩”影响和冲击&#xff0c;高端品牌的线上销售份额占比较少&#xff0c;同时得益于互联网和新媒体技术的发展&#xff0c;高端品的利润来源大多数是线上推广进行销售&#xff0c;而软文就是高端品常用的推广方式&#xff0c;…

【操作系统】聊聊不可中断进程和僵尸进程

当我们输入top命令之后 其中S代表的是当前进程的状态 R (Running 或 Runnable) 进程在CPU的就绪队列中&#xff0c;正在运行或者等待运行。D (Disk Sleep) 不可中断睡眠&#xff0c;进程正在跟硬件交互&#xff0c;不运行被其他进程或者中断打断。Z (Zombie) 进程已经结束&am…

Nosql redis高可用和持久化

Nosql redis高可用和持久化 1、redis高可用2、redis持久化2.1redis持久化2.2Redis 持久化方法2.3RDB 持久化2.3.1RDB持久化工作原理2.3.2触发条件2.3.3其他自动触发机制2.3.4执行流程2.3.5启动时加载 2.4AOF 持久化2.4.1AOF持久化原理2.4.2开启AOF2.4.3执行流程2.4.4文件重写的…