【数据去重】海量数据实时去重方案

文章目录

  • Prologue
  • 布隆过滤器去重
    • 什么是布隆过滤器
    • 实现的核心思想
    • 怎么理解
  • 内嵌RocksDB状态后端去重
  • 引入外部K-V存储去重

Prologue

数据去重(data deduplication)是我们大数据攻城狮司空见惯的问题了。除了统计UV等传统用法之外,去重的意义更在于消除不可靠数据源产生的脏数据——即重复上报数据或重复投递数据的影响,使流式计算产生的结果更加准确。本文以Flink处理日均亿级别及以上的日志数据为背景,讨论除了朴素方法(HashSet)之外的三种实时去重方案,即:布隆过滤器、RocksDB状态后端、外部存储。

布隆过滤器去重

什么是布隆过滤器

布隆过滤器(Bloom Filter)是一种空间效率非常高的随机数据结构,它利用位数组(BitSet)表示一个集合,并通过一定数量的哈希函数将元素映射为位数组中的位置,用于检查一个元素是否属于这个集合。

实现的核心思想

对于一个元素,通过多个哈希函数生成多个哈希值,将对应的位在位数组中设为 1,若多个哈希值对应的位都为 1,则认为该元素可能在集合中;若至少有一个哈希值对应的位为 0,则该元素一定不在集合中。这种方法可以在较小的空间中实现高效的查找,但可能存在误判率(false positive)。

怎么理解

一个典型的布隆过滤器包含三个参数: 位数组的大小(即存储元素的个数); 哈希函数的个数; 填充因子(即误判率),即将元素数量与位数组大小的比值。
在这里插入图片描述

以之前用过的子订单日志模型为例,假设上游数据源产生的消息为<Integer, Long, String>三元组,三个元素分别代表站点ID、子订单ID和数据载荷。由于数据源只能保证at least once语义(例如未开启correlation ID机制的RabbitMQ队列),会重复投递子订单数据,导致下游各统计结果偏高。现引入Guava的BloomFilter来去重,直接上代码说事。

 // dimensionedStream是个DataStream<Tuple3<Integer, Long, String>>DataStream<String> dedupStream = dimensionedStream.keyBy(0).process(new SubOrderDeduplicateProcessFunc(), TypeInformation.of(String.class)).name("process_sub_order_dedup").uid("process_sub_order_dedup");// 去重用的ProcessFunctionpublic static final class SubOrderDeduplicateProcessFuncextends KeyedProcessFunction<Tuple, Tuple3<Integer, Long, String>, String> {private static final long serialVersionUID = 1L;private static final Logger LOGGER = LoggerFactory.getLogger(SubOrderDeduplicateProcessFunc.class);private static final int BF_CARDINAL_THRESHOLD = 1000000;private static final double BF_FALSE_POSITIVE_RATE = 0.01;private volatile BloomFilter<Long> subOrderFilter;@Overridepublic void open(Configuration parameters) throws Exception {long s = System.currentTimeMillis();subOrderFilter = BloomFilter.create(Funnels.longFunnel(), BF_CARDINAL_THRESHOLD, BF_FALSE_POSITIVE_RATE);long e = System.currentTimeMillis();LOGGER.info("Created Guava BloomFilter, time cost: " + (e - s));}@Overridepublic void processElement(Tuple3<Integer, Long, String> value, Context ctx, Collector<String> out) throws Exception {long subOrderId = value.f1;if (!subOrderFilter.mightContain(subOrderId)) {subOrderFilter.put(subOrderId);out.collect(value.f2);}ctx.timerService().registerProcessingTimeTimer(UnixTimeUtil.tomorrowZeroTimestampMs(System.currentTimeMillis(), 8) + 1);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {long s = System.currentTimeMillis();subOrderFilter = BloomFilter.create(Funnels.longFunnel(), BF_CARDINAL_THRESHOLD, BF_FALSE_POSITIVE_RATE);long e = System.currentTimeMillis();LOGGER.info("Timer triggered & resetted Guava BloomFilter, time cost: " + (e - s));}@Overridepublic void close() throws Exception {subOrderFilter = null;}}// 根据当前时间戳获取第二天0时0分0秒的时间戳public static long tomorrowZeroTimestampMs(long now, int timeZone) {return now - (now + timeZone * 3600000) % 86400000 + 86400000;}

这里先按照站点ID为key分组,然后在每个分组内创建存储子订单ID的布隆过滤器。布隆过滤器的期望最大数据量应该按每天产生子订单最多的那个站点来设置,这里设为100万,并且可容忍的误判率为1%。根据上面科普文中的讲解,单个布隆过滤器需要8个哈希函数,其位图占用内存约114MB,压力不大。

每当一条数据进入时,调用BloomFilter.mightContain()方法判断对应的子订单ID是否已出现过。当没出现过时,调用put()方法将其插入BloomFilter,并交给Collector输出。

另外,通过注册第二天凌晨0时0分0秒的processing time计时器,就可以在onTimer()方法内重置布隆过滤器,开始新一天的去重。

(吐槽一句,Guava的BloomFilter竟然没有提供清零的方法,有点诡异)

内嵌RocksDB状态后端去重

布隆过滤器虽然香,但是它不能做到100%精确。在必须保证万无一失的场合,我们可以选择Flink自带的RocksDB状态后端,这样不需要依赖其他的组件。之前已经讲过,RocksDB本身是一个类似于HBase的嵌入式K-V数据库,并且它的本地性比较好,用它维护一个较大的状态集合并不是什么难事。

首先我们要开启RocksDB状态后端(平常在生产环境中,也建议总是使用它),并配置好相应的参数。这些参数同样可以在flink-conf.yaml里写入。

RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(Consts.STATE_BACKEND_PATH, true);
rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
rocksDBStateBackend.setNumberOfTransferingThreads(2);
rocksDBStateBackend.enableTtlCompactionFilter();env.setStateBackend(rocksDBStateBackend);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.enableCheckpointing(5 * 60 * 1000);

RocksDB的调优是个很复杂的话题,详情参见官方提供的tuning guide,以及Flink配置中与RocksDB相关的参数,今后会挑时间重点分析一下RocksDB存储大状态时的调优方法。好在Flink已经为我们提供了一些预调优的参数,即PredefinedOptions,请务必根据服务器的实际情况选择。我们的Flink集群统一采用SSD做存储,故选择的是PredefinedOptions.FLASH_SSD_OPTIMIZED。

另外,由于状态空间不小,打开增量检查点以及设定多线程读写RocksDB,可以提高checkpointing效率,检查点周期也不能太短。还有,为了避免状态无限增长下去,我们仍然得定期清理它(即如同上节中布隆过滤器的复位)。当然,除了自己注册定时器之外,我们也可以利用Flink提供的状态TTL机制,并打开RocksDB状态后端的TTL compaction filter,让它们在RocksDB后台执行compaction操作时自动删除。特别注意,状态TTL仅对时间特征为处理时间时生效,对事件时间是无效的。

接下来写具体的业务代码,以上节的<站点ID, 子订单ID, 消息载荷>三元组为例,有两种可实现的思路:

仍然按站点ID分组,用存储子订单ID的MapState(当做Set来使用)保存状态;
直接按子订单ID分组,用单值的ValueState保存状态。
显然,如果我们要用状态TTL控制过期的话,第二种思路更好,因为粒度更细。代码如下。

  // dimensionedStream是个DataStream<Tuple3<Integer, Long, String>>DataStream<String> dedupStream = dimensionedStream.keyBy(1).process(new SubOrderDeduplicateProcessFunc(), TypeInformation.of(String.class)).name("process_sub_order_dedup").uid("process_sub_order_dedup");// 去重用的ProcessFunctionpublic static final class SubOrderDeduplicateProcessFuncextends KeyedProcessFunction<Tuple, Tuple3<Integer, Long, String>, String> {private static final long serialVersionUID = 1L;private static final Logger LOGGER = LoggerFactory.getLogger(SubOrderDeduplicateProcessFunc.class);private ValueState<Boolean> existState;@Overridepublic void open(Configuration parameters) throws Exception {StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1)).setStateVisibility(StateVisibility.NeverReturnExpired).setUpdateType(UpdateType.OnCreateAndWrite).cleanupInRocksdbCompactFilter(10000).build();ValueStateDescriptor<Boolean> existStateDesc = new ValueStateDescriptor<>("suborder-dedup-state",Boolean.class);existStateDesc.enableTimeToLive(stateTtlConfig);existState = this.getRuntimeContext().getState(existStateDesc);}@Overridepublic void processElement(Tuple3<Integer, Long, String> value, Context ctx, Collector<String> out) throws Exception {if (existState.value() == null) {existState.update(true);out.collect(value.f2);}}}

上述代码中设定了状态TTL的相关参数:

  • 过期时间设为1天;
  • 在状态值被创建和被更新时重设TTL;
  • 已经过期的数据不能再被访问到;
  • 在每处理10000条状态记录之后,更新检测过期的时间戳。这个参数要小心设定,更新太频繁会降低compaction的性能,更新过慢会使得compaction不及时,状态空间膨胀。

在实际处理数据时,如果数据的key(即子订单ID)对应的状态不存在,说明它没有出现过,可以更新状态并输出。反之,说明它已经出现过了,直接丢弃,so easy。

最后还需要注意一点,若数据的key占用的空间比较大(如长度可能会很长的字符串类型),也会造成状态膨胀。我们可以将它hash成整型再存储,这样每个key就最多只占用8个字节了。不过任何哈希算法都无法保证不产生冲突,所以还是得根据业务场景自行决定。

引入外部K-V存储去重

如果既不想用布隆过滤器,也不想在Flink作业内维护巨大的状态,就只能用折衷方案了:利用外部K-V数据库(Redis、HBase之类)存储需要去重的键。由于外部存储对内存和磁盘占用同样敏感,所以也得设定相应的TTL,以及对大的键进行压缩。另外,外部K-V存储毕竟是独立于Flink框架之外的,一旦作业出现问题重启,外部存储是不会与作业的checkpoint同步恢复到一致的状态的,也就是说结果仍然会出现偏差,需要注意。

鉴于这种方案对第三方组件有强依赖,要关心的东西太多,所以一般情况下是不用的,我们也没有实操过,所以抱歉没有代码了。

The End
如果有其他更高效的解决方法,欢迎批评指正哈。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/21630.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

listagg结果去重

最近在一个项目中用到了listagg方法&#xff0c;但是在组合结果中出现有重复的情况。默认的结果如下 于是我就写了一个方法对listagg的结果去重&#xff0c;也可以对该格式的字符串去重&#xff0c;方法如下 create or replace function listaggpure(targetStr varchar2,seper…

非常实用的5种json数组去重方法,函数实现思路竟是chatgpt帮我写的!

文章目录 人工智能福利文章方法一 reduce实现思路 方法二 filter和Map实现思路 方法三 变异for实现思路 方法四 for filter实现思路 方法五 for实现思路 写在最后 人工智能福利文章 【分享几个国内免费可用的ChatGPT镜像】【10几个类ChatGPT国内AI大模型】【用《文心一言》1分…

AI自动播——AI虚拟主播帮你实现24小时直播带货技术分享

如今很多淘宝天猫商家会在做直播的时候&#xff0c;会遇到真人主播难招或者真人主播直播时长有限&#xff0c;那么有什么办法解决这个难题呢&#xff0c;这个时候就可以用AI自动播了&#xff0c;也就是AI虚拟主播直播带货。具体怎么操作呢&#xff1f;下面一起来看看吧。 AI自动…

亚马逊账号被关联能申诉得回来吗

关于亚马逊账号被关联能申诉回来吗&#xff1f;小编有话要说。 其实小编是一个特别谨慎的人&#xff0c;在初入亚马逊之前&#xff0c;会了解到亚马逊所有的问题之后&#xff0c;才会注册账号。那么要了解的有什么呢&#xff1f;注册需要的信息&#xff0c;注册的规则&#xf…

社区1月热门文章、ChatGPT工具汇总,强化学习安全和隐私、斯坦福大学CS234《智源社区强化学习周刊》第75期...

No.75 智源社区 强化学习组 强 化 学 习 研究 观点 资源 活动 社区热门文章 Google发布Bard与ChatGPT竞争 https://hub.baai.ac.cn/view/23925 纽约时报&#xff1a;ChatGPT诞生背后的故事 https://hub.baai.ac.cn/view/23835 ChatGPT提示与AI工具、开发工具、低代码工具汇总 《…

5款超实用的免费AI工具,让你轻松提升工作效率!

目录 1、AiChat 产品特点 任意选择内置角色对话 自定义AI角色进行对话 推荐理由&#x1f31f;&#x1f31f;&#x1f31f;&#x1f31f;&#x1f31f; 2、MINISTER AI 产品特点 集成ChatGPT聊天 简化Stable Diffusion 集成Midjourney模式 训练并分享模型 推荐理由&a…

13 款炫酷的 MySQL 可视化管理工具!好用到爆!!

MySQL 的管理维护工具非常多&#xff0c;除了系统自带的命令行管理工具之外&#xff0c;还有许多其他的图形化管理工具&#xff0c;工具好用是一方面&#xff0c;个人的使用习惯也很重要&#xff0c;这里介绍 13 款 MySQL 图形化管理工具&#xff0c;供大家参考。 1、DBeaver D…

GPT模型支持下的Python-GEE遥感云大数据分析、管理与可视化技术及多领域案例应用

查看原文>>>GPT模型支持下的Python-GEE遥感云大数据分析、管理与可视化技术及多领域案例应用 目录 第一章、理论基础 第二章、开发环境搭建 第三章、遥感大数据处理基础与ChatGPT等AI模型交互 第四章、典型案例操作实践 第五章、输入输出及数据资产高效管理 第…

【IC基础】集成电路设计领域术语缩写及名词解释(字母索引版)

前言&#xff1a; 笔者在大三上学习学习《SoC设计导论》时整理的有关集成电路设计领域的常见有英文缩写和对应的名词解释&#xff0c;文中标注的页码均出自《SoC设计方法与实现》这本参考书&#xff1a; 目录 目录 前言&#xff1a; 目录 A B C CTG(Clock Tree Generat…

浏览器扩展合集来啦!实用而有趣的浏览器扩展插件!

五彩&#xff1a;网页划线、批注、高亮工具 五彩严格意义上的同类产品是 Hypothesis、diigo 等工具&#xff0c;Cubox、Readwise、简悦则属于需要先解析一下网页后高亮的插件。 此前我已经在用 Cubox 了&#xff0c;为什么还需要五彩来完成网页高亮&#xff1f; Cubox 在移动…

GPT模型结合Python-GEE遥感云大数据分析、管理与可视化技术

GPT模型支持下的Python-GEE遥感云大数据分析、管理与可视化技术及多领域案例应用 随着航空、航天、近地空间等多个遥感平台的不断发展&#xff0c;近年来遥感技术突飞猛进。由此&#xff0c;遥感数据的空间、时间、光谱分辨率不断提高&#xff0c;数据量也大幅增长&#xff0c…

分享6个AI绘画网站

ChatGPT狂飙160天&#xff0c;世界已经不是之前的样子。 新建了人工智能中文站https://ai.weoknow.com 每天给大家更新可用的国内可用chatGPT资源 1、Midjourney 特点&#xff1a;业内标杆&#xff0c;效果最强大 Midjourney是基于diffusion的AI图画艺术生成器。生成图片不局…

又又又发现了一个 AI 插件神器 TeamSmart

简介 TeamSmart AI 是最近比较火的 Chrome 插件&#xff0c;它是基于 ChatGPT 集成的 AI 助手团队工具 对&#xff0c;没错&#xff0c;是一个团队。这个团队里面有许多不同角色的成员&#xff0c;每隔成员都有自己的专业领域&#xff0c;比如商业、市场营销、灵魂写手、程序…

忘记了谷歌Gmail账号名怎么办?用这种方法轻松找回谷歌邮箱地址

有些朋友以前注册过谷歌邮箱&#xff0c;但很久很久没有再去使用。 现在注册ChatGPT需要谷歌邮箱&#xff0c;于是打算把尘封已久的谷歌邮箱找出来&#xff0c;可是这时候你才发现&#xff0c;谷歌邮箱的账号名早已忘掉了。 今天重点来说说如何找回谷歌账号&#xff0c;希望能够…

忘记Gmail谷歌账号密码或者密码错误怎么办?用这种方法轻松搞定

有些朋友以前注册过谷歌邮箱&#xff0c;但很久很久没有再去使用。现在注册ChatGPT需要谷歌邮箱&#xff0c;于是打算把尘封已久的谷歌邮箱找出来&#xff0c;可是这时候你突然发现&#xff0c;谷歌邮箱的密码忘了。 今天重点来说说如何找回谷歌账号的密码&#xff0c;希望能够…

HTML知识点大合集

如标题所示哈&#xff0c;最近无聊&#xff0c;花了几十分钟整理一下之前HTML学过的知识点&#xff0c;写下了这个大合集&#xff0c;能力有限可能还有所不足&#xff0c;不过用来巩固基础&#xff0c;学习是够用的。 HTML知识点合集 一.介绍二.开发工具推荐三.代码实战学习3.1…

史上最详细的使用Claude和接入Claude-api教程

是什么&#xff08;What&#xff09; Claude 是最近新开放的一款 AI 聊天机器人&#xff0c;是世界上最大的语言模型之一&#xff0c;比之前的一些模型如 GPT-3 要强大得多&#xff0c;因此 Claude 被认为是 ChatGPT 最有力的竞争对手。Claude 的研发公司是专注人工智能安全和研…

BFT 最前线 | ChatGPT登顶App Store;国产中文大语言模型「天河天元」发布;华为招募天才少年;阿里分拆上市

原创 | 文 BFT机器人 AI视界 TECHNOLOGY NEWS 01 ChatGPT上架App Store登顶榜首 OpenAI&#xff1a;很快也将出现在安卓上 近日&#xff0c;ChatGPT正式发布App版本&#xff0c;上架APP Store&#xff0c;支持iPhone和iPad设备。OpenAI表示&#xff0c;ChatGPT iOS APP可免费…

对ChatGPT、阿尔法零等人工智能开发的简评与展望

1.ChatGPT的出现再次说明&#xff0c;基于人工神经网络等技术所进行的模拟的确可以比较好的模拟人脑的思维学习过程和思维结构&#xff0c;虽然其在总体上可以算是一个黑箱或灰箱模型&#xff0c;但是从目前的表现来看&#xff0c;有许多方面都与人脑的表现非常相似&#xff1a…

国产大模型狂飙,谁能率先做出第一个中国版GPT

热火烹油的大模型赛道打起了“嘴仗”。 搜狗前CEO王小川评价百度创始人李彦宏的采访发言称&#xff1a;“你们采访的可能是平行世界的他&#xff0c;不是我们这个世界里的。” 而针对王小川的评论&#xff0c;百度集团副总裁、搜索平台负责人肖阳又回应道&#xff1a;“王小…