Flink双流Join

在离线 Hive 中,我们经常会使用 Join 进行多表关联。那么在实时中我们应该如何实现两条流的 Join 呢?Flink DataStream API 为我们提供了3个算子来实现双流 join,分别是:

  • join

  • coGroup

  • intervalJoin

下面我们分别详细看一下这3个算子是如何实现双流 Join 的。

1. Join

Joining | Apache Flink

Join 算子提供的语义为 “Window join”,即按照指定字段和(滚动/滑动/会话)窗口进行内连接(InnerJoin)。Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。

Join 可以支持处理时间和事件时间两种时间特征。

Join 通用用法如下:

stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)

Join 语义类似与离线 Hive 的 InnnerJoin (内连接),这意味着如果一个流中的元素在另一个流中没有相对应的元素,则不会输出该元素。

下面我们看一下 Join 算子在不同类型窗口上的具体表现。

1.1 滚动窗口Join

当在滚动窗口上进行 Join 时,所有有相同 Key 并且位于同一滚动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 或 FlatJoinFunction 进行处理。

如上图所示,我们定义了一个大小为 2 秒的滚动窗口,最终产生 [0,1],[2,3],… 这种形式的数据。上图显示了每个窗口中橘色流和绿色流的所有元素成对组合。需要注意的是,在滚动窗口 [6,7] 中,由于绿色流中不存在要与橘色流中元素 6、7 相关联的元素,因此该窗口不会输出任何内容。

下面我们一起看一下如何实现上图所示的滚动窗口 Join:

:::color3 可以通过两个socket流,将数据合并为一个三元组,key,value1,value2

代码演示:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
public class _ShuangLiuJoinDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 并行度不为1 ,效果很难出来,因为本地的并行度是16,只有16个并行度都触发才能看到效果env.setParallelism(1);//2. source-加载数据   key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888).map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("绿色:"+ Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("绿色的时间:"+timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用,因为kafka占用这个端口  key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777).map(new MapFunction<String, Tuple3<String,Integer,String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("橘色:"+ Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("橘色的时间:"+timeStamp);return timeStamp;}}));//3. transformation-数据处理转换DataStream resultStream = greenStream.join(orangeStream).where(tup3 -> tup3.f0).equalTo(tup3 -> tup3.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> t1, Tuple3<String, Integer, String> t2) throws Exception {System.out.println(t1.f2);System.out.println(t2.f2);return Tuple3.of(t1.f0, t1.f1, t2.f1);}});//4. sink-数据输出resultStream.print();//5. execute-执行env.execute();}
}

总结非常重要:

1) 要想测试这个效果,需要将并行度设置为1

2)窗口中数据的打印是需要触发的,没有触发的数据,窗口内是不会进行计算的,所以记得输入触发的数据。

假如使用了EventTime 作为时间语义,不管是窗口开始和结束时间还是触发的条件,都跟系统时间没有关系,而跟输入的数据有关系,举例:

假如你的第一条数据是:key,0,2021-03-26 12:09:01 窗口的大小是5s,水印是3秒 ,窗口的开始时间为:

2021-03-26 12:09:00 结束时间是 2021-03-26 12:09:05 ,触发时间是2021-03-26 12:09:08

为什么呢? 水印时间 >= 结束时间

水印时间是:2021-03-26 12:09:08 - 3 = 2021-03-26 12:09:05 >=2021-03-26 12:09:05

:::

如上代码所示为绿色流和橘色流指定 BoundedOutOfOrdernessWatermarks Watermark 策略,设置100毫秒的最大可容忍的延迟时间,同时也会为流分配事件时间戳。假设输入流为 格式,两条流输入元素如下所示:

绿色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,4,2021-03-26 12:09:04
key,5,2021-03-26 12:09:05
key,8,2021-03-26 12:09:08
key,9,2021-03-26 12:09:09
key,11,2021-03-26 12:09:11
​
橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,6,2021-03-26 12:09:06
key,7,2021-03-26 12:09:07
key,11,2021-03-26 12:09:11
1.2 滑动窗口Join [解释一下即 ]

当在滑动窗口上进行 Join 时,所有有相同 Key 并且位于同一滑动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 进行处理。

如上图所示,我们定义了一个窗口大小为 2 秒、滑动步长为 1 秒的滑动窗口。需要注意的是,一个元素可能会落在不同的窗口中,因此会在不同窗口中发生关联,例如,绿色流中的0元素。当滑动窗口中一个流的元素在另一个流中没有相对应的元素,则不会输出该元素。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/482810.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue3学习宝典

1.ref函数调用的方式生成响应式数据&#xff0c;可以传复杂和简单数据类型 <script setup> // reactive接收一个对象类型的数据 import { reactive } from vue;// ref用函数调用的方式生成响应式数据&#xff0c;可以传复杂和简单数据类型 import { ref } from vue // 简…

C++(4个类型转换)

1. C语言中的类型转换 1. 隐式 类型转换&#xff1a; 具有相近的类型才能进行互相转换&#xff0c;如&#xff1a;int,char,double都表示数值。 2. 强制类型转换&#xff1a;能隐式类型转换就能强制类型转换&#xff0c;隐式类型之间的转换类型强相关&#xff0c;强制类型转换…

《DSL-FIQA》论文翻译

《DSL-FIQA: Assessing Facial Image Quality Via Dual-Set Degradation Learning and Landmark-Guided Transformer》 原文链接&#xff1a;DSL-FIQA: Assessing Facial Image Quality via Dual-Set Degradation Learning and Landmark-Guided Transformer | IEEE Conference…

mac终端自定义命令打开vscode

1.打开终端配置文件 open -e ~/.bash_profile终端安装了zsh&#xff0c;那么配置文件是.zshrc&#xff08;打开zsh配置&#xff0c;这里举&#x1f330;使用zsh&#xff09; sudo open -e ~/.zshrc 2.在zshrc配置文件中添加新的脚本&#xff08;这里的code就是快捷命令可以进…

vue基础之6:计算属性、姓名案例、简写计算属性

欢迎来到“雪碧聊技术”CSDN博客&#xff01; 在这里&#xff0c;您将踏入一个专注于Java开发技术的知识殿堂。无论您是Java编程的初学者&#xff0c;还是具有一定经验的开发者&#xff0c;相信我的博客都能为您提供宝贵的学习资源和实用技巧。作为您的技术向导&#xff0c;我将…

Qt桌面应用开发 第十天(综合项目二 翻金币)

目录 1.主场景搭建 1.1重载绘制事件&#xff0c;绘制背景图和标题图片 1.2设置窗口标题&#xff0c;大小&#xff0c;图片 1.3退出按钮对应关闭窗口&#xff0c;连接信号 2.开始按钮创建 2.1封装MyPushButton类 2.2加载按钮上的图片 3.开始按钮跳跃效果 3.1按钮向上跳…

【从零开始的LeetCode-算法】35. 搜索插入位置

给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 示例 1: 输入: nums [1,3,5,6], target 5 输出: 2示例 2: 输入: …

掌上单片机实验室 — RT - Thread+ROS2 浅尝(26)

前面化解了Micro_ROS通讯问题&#xff0c;并在 RT-Thread Studio 环境下&#xff0c;使用Micro_ROS软件包中的例程&#xff0c;实现了STM32F411CE核心板和ROS2主机的通讯。之后还尝试修改例程 micro_ros_sub_twist.c &#xff0c;实现了接收 turtle_teleop_key 所发出的 turtle…

展现运动类型

同样&#xff0c;我们通过函数的方式将运动类型插入我们的HTML代码中 _renderWorkout(workout) {let html <li class"workout workout-${workout.type}" data-id"${workout.id}"><h2 class"workout__title">${workout.description}…

vscode 怎么下载 vsix 文件?

参考&#xff1a;https://marketplace.visualstudio.com/items?itemNameMarsCode.marscode-extension 更好的办法&#xff1a;直接去相关插件的 github repo 下载老版本 https://github.com/VSCodeVim/Vim/releases?page5 或者&#xff0c;去 open-vsx.org 下载老版本 点击这…

python 练习题

目录 1&#xff0c;输入三个整数&#xff0c;按升序输出 2&#xff0c;输入年份及1-12月份&#xff0c;判断月份属于大月&#xff0c;小月&#xff0c;闰月&#xff0c;平月&#xff0c;并输出本月天数 3&#xff0c;输入一个整数&#xff0c;显示其所有是素数因子 4&#…

我的第一个创作纪念日 —— 梦开始的地方

前言 时光荏苒&#xff0c;转眼间&#xff0c;我已经在CSDN这片技术沃土上耕耘了365天 今天&#xff0c;我迎来了自己在CSDN的第1个创作纪念日&#xff0c;这个特殊的日子不仅是对我过去努力的肯定&#xff0c;更是对未来持续创作的激励 机缘 回想起初次接触CSDN&#xff0c;那…

Rook入门:打造云原生Ceph存储的全面学习路径(上)

文章目录 一.Rook简介二.Rook与Ceph架构2.1 Rook结构体系2.2 Rook包含组件2.3 Rook与kubernetes结合的架构图如下2.4 ceph特点2.5 ceph架构2.6 ceph组件 三.Rook部署Ceph集群3.1 部署条件3.2 获取rook最新版本3.3 rook资源文件目录结构3.4 部署Rook/CRD/Ceph集群3.5 查看rook部…

【Gitlab】CICD使用minio作为分布式缓存

1、安装minio 下载适合自己系统版本的安装文件https://dl.min.io/server/minio/release/windows-amd64/ yum install xxx.rpm 2、配置/etc/profile export MINIO_ACCESS_KEYroot [ui登录账号] export MINIO_SECRET_KEYminioDev001 [ui登录密码] export MINIO_OPTS"…

奇数求和ᅟᅠ

奇数求和 C语言代码C 代码Java代码Python代码 &#x1f490;The Begin&#x1f490;点点关注&#xff0c;收藏不迷路&#x1f490; 计算非负整数 m 到 n&#xff08;包括m 和 n &#xff09;之间的所有奇数的和&#xff0c;其中&#xff0c;m 不大于 n&#xff0c;且n 不大于30…

Django 视图层

from django.shortcuts import render, HttpResponse, redirectfrom django.http import JsonResponse1. render: 渲染模板 def index(request):print(reverse(index))return render(request, "index.html")return render(request, index.html, context{name: lisi})…

手机实时提取SIM卡打电话的信令声音-蓝牙电话如何适配eSIM卡的手机

手机实时提取SIM卡打电话的信令声音 --蓝牙电话如何适配eSIM卡的手机 一、前言 蓝牙电话的海外战略中&#xff0c;由于海外智能手机市场中政策的差异性&#xff0c;对内置eSIM卡的手机进行支持是非常合理的需求。Android系列手机中&#xff0c;无论是更换通信运营商&#xf…

python3 + selenium 中用PIL获取全屏幕截图

获取当前屏幕截图非常简单&#xff0c;需要import PIL.ImageGrab。调用grab函数即可得到Image对象&#xff0c;显示图片如图所示。 高版本的PIL中的grab函数还提供有一些参数。要查看当前PIL包的版本&#xff0c;可以import然后查看其__version__属性。 如果是较高版本的PIL…

SpringBoot3 + Vue3 由浅入深的交互 基础交互教学2

目录 一、这篇文章是基础交互教学系列的续作 二、发送请求时&#xff0c;携带发送的数据json格式的参数&#xff1a;data 三、携带token请求头&#xff0c;进行JWT校验 四、实现throw抛出异常&#xff0c;并交互显示在前端的界面 一、这篇文章是基础交互教学系列的续作 大…

UIE与ERNIE-Layout:智能视频问答任务初探

内容来自百度飞桨ai社区UIE与ERNIE-Layout&#xff1a;智能视频问答任务初探&#xff1a; 如有侵权&#xff0c;请联系删除 1 环境准备 In [2] # 安装依赖库 !pip install paddlenlp --upgrade !pip install paddleocr --upgrade !pip install paddlespeech --upgrade In …