flink处理函数--副输出功能

背景

在flink中,如果你想要访问记录的处理时间或者事件时间,注册定时器,或者是将记录输出到多个输出流中,你都需要处理函数的帮助,本文就来通过一个例子来讲解下副输出

副输出

本文还是基于streaming-with-flink这本书的例子作为演示,它实现一个把温度低于32度的记录输出到副输出的功能,正常的记录还是从主输出中输出.代码如下:

package wikiedits.processfunc.job;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;import wikiedits.processfunc.pojo.SensorReading;
import wikiedits.processfunc.process.FreezingMonitor;
import wikiedits.processfunc.source.SensorSource;public class SideOutPutJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<SensorReading> readings = see.addSource(new SensorSource());SingleOutputStreamOperator<SensorReading> monitoredReadings = readings.process(new FreezingMonitor());// 打印附输出monitoredReadings.getSideOutput(new OutputTag<String>("freezing-alarms"){}).print();// 打印主输出monitoredReadings.print();see.execute();}
}package wikiedits.processfunc.process;import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import wikiedits.processfunc.pojo.SensorReading;public class FreezingMonitor extends ProcessFunction<SensorReading, SensorReading> {private OutputTag<String> freezingAlarmOutput = new OutputTag<String>("freezing-alarms") {};@Overridepublic void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {if (value.temperature < 32.0) {ctx.output(freezingAlarmOutput, "freezing alarm for " + value.id + " :" + value.temperature);}out.collect(value);}}
package wikiedits.processfunc.source;/** Copyright 2015 Fabian Hueske / Vasia Kalavri** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at**  http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import wikiedits.processfunc.pojo.SensorReading;import java.util.Calendar;
import java.util.Random;/*** Flink SourceFunction to generate SensorReadings with random temperature values.** Each parallel instance of the source simulates 10 sensors which emit one sensor reading every 100 ms.** Note: This is a simple data-generating source function that does not checkpoint its state.* In case of a failure, the source does not replay any data.*/
public class SensorSource extends RichParallelSourceFunction<SensorReading> {// flag indicating whether source is still runningprivate boolean running = true;/** run() continuously emits SensorReadings by emitting them through the SourceContext. */@Overridepublic void run(SourceContext<SensorReading> srcCtx) throws Exception {// initialize random number generatorRandom rand = new Random();// look up index of this parallel taskint taskIdx = this.getRuntimeContext().getIndexOfThisSubtask();// initialize sensor ids and temperaturesString[] sensorIds = new String[10];double[] curFTemp = new double[10];for (int i = 0; i < 10; i++) {sensorIds[i] = "sensor_" + (taskIdx * 10 + i);curFTemp[i] = 65 + (rand.nextGaussian() * 20);}while (running) {// get current timelong curTime = Calendar.getInstance().getTimeInMillis();// emit SensorReadingsfor (int i = 0; i < 10; i++) {// update current temperaturecurFTemp[i] += rand.nextGaussian() * 0.5;// emit readingsrcCtx.collect(new SensorReading(sensorIds[i], curTime, curFTemp[i]));}// wait for 100 msThread.sleep(3000);}}/** Cancels this SourceFunction. */@Overridepublic void cancel() {this.running = false;}
}

程序运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/149405.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c#设计模式-行为型模式 之 责任链模式

&#x1f680;简介 又名职责链模式&#xff0c;为了避免请求发送者与多个请求处理者耦合在一起&#xff0c;将所有请求的处理者通过前一对 象记住其下一个对象的引用而连成一条链&#xff1b;当有请求发生时&#xff0c;可将请求沿着这条链传递&#xff0c;直到有对象处理它为…

sigmoid和softmax函数有什么区别

Sigmoid函数和Softmax函数都是常用的激活函数&#xff0c;但它们的主要区别在于应用场景和输出结果的性质。 Sigmoid函数&#xff08;也称为 Logistic函数&#xff09;&#xff1a; Sigmoid函数将输入值映射到0到1之间的连续实数范围&#xff0c;通常用于二元分类问题。 Si…

重置Jetson设备的Ubuntu密码:通过挂载根目录到另一个Linux系统

在本文中&#xff0c;我们将介绍如何在忘记Ubuntu 20.04密码的情况下重置密码。我们将通过将Ubuntu的根目录挂载到另一个Linux系统来实现这一目的。我们还将介绍chroot命令的功能。 1. 背景 最近&#xff0c;我们研发团队遇到了一个棘手的问题。一台用于研发&#xff0c;多人使…

小黑上午参加婚礼,下午去姥姥家帮着看店学习人情世故的leetcode之旅:213. 打家劫舍 II

动态规划 class Solution:def rob(self, nums: List[int]) -> int:# 数组长度n len(nums)if n 1:return nums[0]# 打家劫舍1中的函数def rob1(start, end):if start end:return nums[start]# 动态规划指针first 0second nums[start]third nums[start]# 动态规划操作f…

【C++】:类和对象(2)

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下有关Linux的基础知识点&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门到精通 数…

信息学奥赛一本通-编程启蒙3330:【例56.1】 和为给定数

3330&#xff1a;【例56.1】 和为给定数 时间限制: 1000 ms 内存限制: 65536 KB 提交数: 625 通过数: 245 【题目描述】 现给出若干个整数&#xff0c;询问其中是否有一对数的和等于给定的数。 【输入】 共三行&#xff1a; 第一行是整数nn(0<n≤100,000)&…

Python小技巧:快速合并字典dict()

文章目录 前言知识点字典合并1. dict.update()基础合并2. 字典推导式 update() 后话 前言 这里是Python小技巧的系列文章。这是第四篇&#xff0c;快速合并字典。 在Python的使用中&#xff0c;有时候需要将两个 dict(字典) 进行合并。 通常我们会借助 dict(字典) 的内置方法 …

Python之元组

Python之元组 元组tuple 一个有序的元素组成的集合使用小括号 ( ) 表示元组是不可变对象 tuple(), (), type(()) # 空元组 ((), (), tuple)(1,), (1) # 元组中只有1必须加逗号&#xff0c;否则就是1了 # ((1,), 1)x 1, 2 # 以逗号分隔的内容会形成元组&#xff0c;封装元组x…

ubuntu配置vscode c++环境

下载vscode deb安装包 Get Started with C on Linux in Visual Studio Code 1. 安装vscode sudo dpkg -i code_1.83.0-1696350811_amd64.deb 2. 确保gcc编译器已经安装 g --version 如果没有安装&#xff0c;执行以下命令安装 sudo apt-get update sudo apt-get install …

博弈论——议价博弈(Bargaining)

议价博弈(Bargaining) 0 引言 议价(bargaining) 是市场经济中最常见的事情,也是博弈论最早研究的问题。这里介绍一种议价的动态博弈模型。同样地&#xff0c;对于动态博弈模型&#xff0c;我们还是用常见的逆推归纳法去寻找该博弈的子博弈完美纳什均衡。 1 议价博弈 议价博弈…

数据在内存中的存储(1)

文章目录 目录1. 数据类型介绍1.1 类型的基本归类 2. 整形在内存中的存储2.1 原码、反码、补码2.2 大小端介绍2.3 练习 附&#xff1a; 目录 数据类型介绍整形在内存中的存储大小端字节序介绍及判断浮点型在内存中的存储 1. 数据类型介绍 前面我们已经学习了基本的内置类型以…

2023 年 Bitget Wallet 测评

对Bitget Wallet钱包的看法 Bitget Wallet在安全性、产品实力和使用体验方面可与Metamask媲美&#xff0c;甚至有所超越&#xff0c;唯一稍显不足的是知名度稍逊一筹。在众多钱包中&#xff0c;Bitget Wallet是拥有最全面的钱包之一&#xff0c;尤其适合那些希望一步到位&…

【数据结构】初探时间与空间复杂度:算法评估与优化的基础

&#x1f6a9;纸上得来终觉浅&#xff0c; 绝知此事要躬行。 &#x1f31f;主页&#xff1a;June-Frost &#x1f680;专栏&#xff1a;数据结构 &#x1f525;该文章主要了解算法的时间复杂度与空间复杂度等相关知识。 目录&#xff1a; &#x1f30f; 时间复杂度&#x1f52d…

数据结构--》数组和广义表:从基础到应用的全面剖析

数据结构为我们提供了组织和处理数据的基本工具。而在这个广袤的数据结构领域中&#xff0c;数组和广义表是两个不可或缺的重要概念。它们作为线性结构的代表&#xff0c;在算法与应用中扮演着重要的角色。 无论你是初学者还是进阶者&#xff0c;本文将为你提供简单易懂、实用可…

Linux 安全 - SUID机制

文章目录 一、文件权限位二、SUID简介 一、文件权限位 &#xff08;1&#xff09; $ ls -l text.txt -rw-rw-r-- 1 yl yl 0 Sep 28 16:25 text.txt其中第一个字段-rw-rw-r–&#xff0c;我们可以把它分为四部分看&#xff1a; -rw-rw-r--&#xff08;1&#xff09;- &a…

第二课 前缀和、差分、双指针扫描

文章目录 第二课 前缀和、差分、双指针扫描lc1.两数之和--简单题目描述代码展示 lc11.盛最多水的容器--中等题目描述代码展示 lc15.三数之和--中等题目描述代码展示 lc42.接雨水--困难题目描述代码展示 lc53.最大子数组和--中等题目描述代码展示 第二课 前缀和、差分、双指针扫…

小样本学习——匹配网络

目录 匹配网络 &#xff08;1&#xff09;简单介绍&#xff1a; &#xff08;2&#xff09;专业术语 &#xff08;3&#xff09;主要思想 &#xff08;4&#xff09;训练过程 问题 回答 MANN 匹配网络 &#xff08;1&#xff09;简单介绍&#xff1a; Matching netwo…

Docker 配置基础优化

Author&#xff1a;rab 为什么要优化&#xff1f; 你有没有发现&#xff0c;Docker 作为线上环境使用时&#xff0c;Docker 日志驱动程序的日志、存储驱动数据都比较大&#xff08;尤其是在你容器需要增删比较频繁的时候&#xff09;&#xff0c;动不动就好几百 G 的大小&…

一个.NET开发的开源跨平台二维码生成库

虽然已经有很多生成二维码的解决方案&#xff0c;但是它们大多依赖System.Drawing&#xff0c;而.NET 6开始&#xff0c;使用System.Drawing操作图片&#xff0c;在生成解决方案或打包时&#xff0c;会收到一条警告&#xff0c;大致意思是System.Drawing仅在 ‘windows’ 上受支…

凉鞋的 Godot 笔记 106. 第二轮循环2D 场景视图Label

从这一篇开始&#xff0c;我们开始进行第二轮循环。 这次我们至少能够在游戏运行窗口能看到一些东西。 首先还是在场景窗口进行编辑&#xff0c;先创建一个节点: 在弹出的窗口&#xff0c;我们找到 Control/Label &#xff0c;如下所示: 点击创建&#xff0c;然后我们在 2D 的…