flink学习(6)——自定义source和kafka

概述

SourceFunction:非并行数据源(并行度只能=1) --接口

RichSourceFunction:多功能非并行数据源(并行度只能=1) --类

ParallelSourceFunction:并行数据源(并行度能够>=1) --接口

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

——Rich 字样代表富有,在编程中,富有代表可以调用的方法很多,功能很全的意思。

 基础案例

package com.bigdata.day02;//1、SourceFunction
// public class ZidingyiSource implements SourceFunction<Student> {
//2、RichSourceFunction
// public class ZidingyiSource extends RichSourceFunction<Student> {
//3、ParallelSourceFunction
//public class ZidingyiSource implements ParallelSourceFunction<Student> {
//4、RichParallelSourceFunction
//public class ZidingyiSource extends RichParallelSourceFunction<Student> {
// 推荐的
public class ZidingyiSource extends RichParallelSourceFunction<Student> {// ctrl + oprivate final Random random = new Random();private boolean flag = true;// 现在不用@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("实现一些资源的开启");}// 现在不用@Overridepublic void close() throws Exception {System.out.println("实现一些资源的关闭");}@Overridepublic void run(SourceContext<Student> sourceContext) throws Exception {while (flag){String stu_id = UUID.randomUUID().toString();String stu_name = "Student_"+stu_id;int stu_age = random.nextInt(8)+10;long stu_timestamp = System.currentTimeMillis();Student student = new Student(stu_id,stu_name,stu_age,stu_timestamp);sourceContext.collect(student);Thread.sleep(1000);}}// 具体什么时候 会调用还不知道@Overridepublic void cancel() {flag = false;System.out.println("停止运行");}
}//调用
public class ZiDingYi {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// add + new DataStreamSource<Student> studentDataStreamSource = env.addSource(new ZidingyiSource());int parallelism = studentDataStreamSource.getParallelism();System.out.println(parallelism);// print之前与之后的并行度是不同的studentDataStreamSource.print().setParallelism(1);env.execute();}
}

cancel+open+close的调用时机

package com.bigdata.day02;import java.util.Objects;/*
* 1、这几个方法都会按照并行度调用多次 调度的次数 按照studentDataStreamSource的并行度
*
*/public class ZiDingYi {public static void main(String[] args) throws Exception {// 在上面案例的基础上实现StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Student> studentDataStreamSource = env.addSource(new ZidingyiSource());// 此时就只会调用一次了studentDataStreamSource.setParallelism(1);// 此时打印也会有多个并行度(8个cpu)studentDataStreamSource.print();// 异步调用 此时会调用open方法JobExecutionResult execute = env.execute();JobClient flink_job = env.executeAsync("Flink Job");Thread.sleep(3000);// 此时会调用 cancel 和 close flink_job.cancel();}
}

 kafkaSource

package com.bigdata.day02;import java.util.Properties;public class KafkaSource {public static void main(String[] args) throws Exception{//envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// properties Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");// consumerFlinkKafkaConsumer<String> consumer= new FlinkKafkaConsumer<String>("yhedu",new SimpleStringSchema(),properties);// sourceDataStreamSource<String> dataStreamSource = env.addSource(consumer);dataStreamSource.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String s) throws Exception {return s.contains("success");}}).print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/479789.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

111. UE5 GAS RPG 实现角色技能和场景状态保存到存档

实现角色的技能存档保存和加载 首先&#xff0c;我们在LoadScreenSaveGame.h文件里&#xff0c;增加一个结构体&#xff0c;用于存储技能相关的所有信息 //存储技能的相关信息结构体 USTRUCT(BlueprintType) struct FSavedAbility {GENERATED_BODY()//需要存储的技能UPROPERT…

Js-对象-04-Array

重点关注&#xff1a;Array String JSON BOM DOM Array Array对象时用来定义数组的。常用语法格式有如下2种&#xff1a; 方式1&#xff1a; var 变量名 new Array(元素列表); 例如&#xff1a; var arr new Array(1,2,3,4); //1,2,3,4 是存储在数组中的数据&#xff0…

【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序

DataStream编程模型之 窗口的划分-时间概念-窗口计算程序 1. 窗口的划分 1.1 窗口分为&#xff1a;基于时间的窗口 和 基于数量的窗口 基于时间的窗口&#xff1a;基于起始时间戳 和终止时间戳来决定窗口的大小 基于数量的窗口&#xff1a;根据固定的数量定义窗口 的大小 这…

Java代码操作Zookeeper(使用 Apache Curator 库)

1. Zookeeper原生客户端库存在的缺点 复杂性高&#xff1a;原生客户端库提供了底层的 API&#xff0c;需要开发者手动处理很多细节&#xff0c;如连接管理、会话管理、异常处理等。这增加了开发的复杂性&#xff0c;容易出错。连接管理繁琐&#xff1a;使用原生客户端库时&…

linux系统下如何将xz及ISO\img等格式压缩包(系统)烧写到优盘(TF卡)

最近用树莓派做了个NAS&#xff0c;效果一般&#xff0c;缺少监控及UI等&#xff0c;详细见这篇文章&#xff1a; https://blog.csdn.net/bugsycrack/article/details/135344782?spm1001.2014.3001.5501 所以下载了专门的基于树莓派的NAS系统直接使用。这篇文章是顺便复习一…

带有悬浮窗功能的Android应用

android api29 gradle 8.9 要求 布局文件 (floating_window_layout.xml): 增加、删除、关闭按钮默认隐藏。使用“开始”按钮来控制这些按钮的显示和隐藏。 服务类 (FloatingWindowService.kt): 实现“开始”按钮的功能&#xff0c;点击时切换增加、删除、关闭按钮的可见性。处…

MD5算法加密笔记

MD5是常见的摘要算法。 摘要算法&#xff1a; 是指把任意⻓度的输⼊消息数据转化为固定⻓度的输出数据的⼀种密码算法. 摘要算法是 不可逆的, 也就是⽆法解密. 通常⽤来检验数据的完整性的重要技术, 即对数据进⾏哈希计算然后⽐ 较摘要值, 判断是否⼀致. 常⻅的摘要算法有: MD5…

C#变量和函数如何和unity组件绑定

1.Button On_click (1)GameObject通过Add component添加上Script (2)Button选GameObject组件而不是直接选Script,直接选Script出现不了Script中的函数 2.RawImage 上面是错的 3.Text 上面是错的&#xff0c;应该是直接在GameObject里面填上对应的值 总结&#xff1a; …

开源 AI 智能名片 2 + 1 链动模式 S2B2C 商城小程序源码助力品牌共建:价值、策略与实践

摘要&#xff1a;在当今数字化商业环境下&#xff0c;品牌构建已演变为企业与消费者深度共建的过程。本文聚焦于“开源 AI 智能名片 2 1 链动模式 S2B2C 商城小程序源码”&#xff0c;探讨其如何融入品牌建设&#xff0c;通过剖析品牌价值构成&#xff0c;阐述该技术工具在助力…

介绍一下atol(arr);(c基础)

hi , I am 36 适合对象c语言初学者 atol(arr)&#xff1b;是返回整数(long型)&#xff0c;整数是arr数组中字符中数字 格式 #include<stdio.h> atol(arr); 返回值arr数组中的数字 未改变arr数组 #include<stdio.h> //atol(arr); 返 <stdlib> int main…

数据结构C语言描述5(图文结合)--广义表讲解与实现

前言 这个专栏将会用纯C实现常用的数据结构和简单的算法&#xff1b;有C基础即可跟着学习&#xff0c;代码均可运行&#xff1b;准备考研的也可跟着写&#xff0c;个人感觉&#xff0c;如果时间充裕&#xff0c;手写一遍比看书、刷题管用很多&#xff0c;这也是本人采用纯C语言…

鸿蒙学习使用本地真机运行应用/元服务 (开发篇)

文章目录 1、前提条件2、使用USB连接方式3、使用无线调试连接方式4、运行 1、前提条件 在Phone和Tablet中运行HarmonyOS应用/元服务的操作方法一致&#xff0c;可以采用USB连接方式或者无线调试的连接方式。两种连接方式是互斥的&#xff0c;只能使用一种&#xff0c;无法同时…

48-基于单片机的LCD12864时间调控和串口抱站

目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 基于51单片机的公交报站系统&#xff0c;可以手动报站&#xff0c;站名十个。 在lcd12864上显示时间&#xff08;年月日时分秒&#xff09;和站名&#xff0c;时间可以设置&#xff0c; 仿真中可以…

【汽车制动】汽车制动相关控制系统

目录 1.ABS (Anti-lock Brake System&#xff0c;防抱死制动系统) 2.EBD&#xff08;Electronic Brake-force Distribution&#xff0c;电子制动力分配系统&#xff09; 3.TCS&#xff08;Traction Control System&#xff0c;牵引力控制系统&#xff09; 4.VDC&#xff08…

DDR3与MIG IP核详解(一)

一、ddr3(全称第三代双倍速率同步动态随机存储器)&#xff1a; 1、特点&#xff1a;1&#xff1a;掉电无法保存数据&#xff0c;需要周期性的刷新。2:时钟上升沿和下降沿都会传输数据。 3&#xff1a;突发传输&#xff0c;突发长度 Burst Length一般为…

【算法 python A*算法的实现】

- 算法实现&#xff1a; import heapqclass Node:def __init__(self, position, g0, h0):self.position position # 节点的位置self.g g # 从起点到当前节点的成本self.h h # 从当前节点到终点的启发式估计成本self.f g h # 总成本self.parent None # 父节点def __…

苹果系统中利用活动监视器来终止进程

前言 苹果系统使用的时候总是感觉不太顺手。特别是转圈的彩虹球出现的时候&#xff0c;就非常令人恼火。如何找到一个像Windows那样任务管理器来终止掉进程呢&#xff1f; 解决办法 Commandspace 弹出搜索框吗&#xff0c;如下图&#xff1a; 输入“活动”进行搜索&#xff…

深度学习—损失函数及BP算法初步学习Day36

损失函数 1.MAE MAE&#xff08;Mean Absolute Error&#xff0c;平均绝对误差&#xff09;通常也被称为 L1-Loss&#xff0c;通过对预测值和真实值之间的绝对差取平均值来衡量他们之间的差异。。 MAE的公式如下&#xff1a; MAE 1 n ∑ i 1 n ∣ y i − y ^ i ∣ \text{…

[UUCTF 2022 新生赛]ez_rce

[UUCTF 2022 新生赛]ez_rce 我们来分析一下这个代码&#xff1a; 首先是isset看我们有没有传一个为空的值&#xff0c;如果为空就输出居然都不输入参数&#xff0c;可恶!!!!!!!!!不为空就GET传参赋值给$code &#xff0c;接着 如果 $code 中不包含这些模式中的任何一个&#x…

Flutter:启动屏逻辑处理02:启动页

启动屏启动之后&#xff0c;制作一个启动页面 新建splash&#xff1a;view 视图中只有一张图片sliding.png就是我们的启动图 import package:flutter/material.dart; import package:get/get.dart; import index.dart; class SplashPage extends GetView<SplashController…