Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!

代码仓库

会同步代码到 GitHub
https://github.com/turbo-duck/flink-demo

在这里插入图片描述

当前章节

继续上一节的内容:https://blog.csdn.net/w776341482/article/details/139875037

上一节中,我们需要使用 nc 或者 telnet 等工具来模拟 Socket 流。这节我们写一个 ServerSocket 来模拟这些 操作,让流自动的写入不用我们手动去操作了。

POM.xml

与上一节一致,不需要修改

编写代码

还是和上一节一样的 Socket 流,这里略去其他的代码

DataStreamSource<String> textStream = streamExecutionEnvironment.socketTextStream(ip, port, "\n");

FlinkServer

继承 Thread 启动一个线程来进行Flink的服务

package icu.wzk.demo03;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class FlinkServer extends Thread {@Overridepublic void run() {String ip = "0.0.0.0";int port = 9999;StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> textStream = streamExecutionEnvironment.socketTextStream(ip, port, "\n");SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = textStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {String[] splits = s.split("\\s");for (String word : splits) {collector.collect(Tuple2.of(word, 1L));}}});SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator.keyBy(new KeySelector<Tuple2<String, Long>, Object>() {@Overridepublic Object getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {return stringLongTuple2.f0;}}).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1))).sum(1);System.out.println("wait word print()");word.print();try {streamExecutionEnvironment.execute("stream!");} catch (Exception e) {throw new RuntimeException(e);}}
}

NumRandom

使用 ServerSocket 实现一个持续的流输出

package icu.wzk.demo03;import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;public class RandomNumClient extends Thread {@Overridepublic void run() {String ip = "0.0.0.0";int port = 9999;try {ServerSocket serverSocket = new ServerSocket();InetSocketAddress address = new InetSocketAddress(ip, port);serverSocket.bind(address);Socket socket = serverSocket.accept();OutputStream output = socket.getOutputStream();PrintWriter writer = new PrintWriter(output, true);Random random = new Random();for (int i = 0; i < 500; i ++) {int randomNumber = random.nextInt(10) + 1;writer.println(randomNumber);System.out.println("ServerSocket Send To Flink: " + randomNumber);Thread.sleep(200);}} catch (Exception e) {throw new RuntimeException(e);}}}

StartApp

将上述的两个类组装起来

请添加图片描述

package icu.wzk.demo03;public class StartApp {public static void main(String[] args) throws Exception {RandomNumClient randomNumClient = new RandomNumClient();FlinkServer flinkServer = new FlinkServer();flinkServer.start();randomNumClient.start();}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/359754.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SAPUI5基础知识9 - JSON Module与数据绑定

1. 背景 在前面的博客中&#xff0c;我们已经学习了SAPUI5中视图和控制器的使用&#xff0c;在本篇博客中&#xff0c;让我们学习下MVC架构中的M-模型了。 SAPUI5中的JSON Model是一个客户端模型&#xff0c;可以用于在SAPUI5应用程序中处理和操作JSON数据。SAPUI5提供了绑定…

IPv6知识点整理

IPv6&#xff1a;是英文“Internet Protocol Version 6”&#xff08;互联网协议第6版&#xff09;的缩写&#xff0c;是互联网工程任务组&#xff08;IETF&#xff09;设计的用于替代IPv4的下一代IP协议&#xff0c;其地址数量号称可以为全世界的每一粒沙子编上一个地址 。 国…

React的生命周期函数详解

import React,{Component} from "react";import SonApp from ./sonAppclass App extends Component{state{hobby:爱吃很多好吃的}// 是否要更新数据&#xff0c;这里返回true才会更新数据shouldComponentUpdate(nextProps,nextState){console.log("app.js第一步…

创建和探索VGG16模型

PyTorch在torchvision库中提供了一组训练好的模型。这些模型大多数接受一个称为 pretrained 的参数&#xff0c;当这个参数为True 时&#xff0c;它会下载为ImageNet 分类问题调整好的权重。让我们看一下创建 VGG16模型的代码片段&#xff1a; from torchvision import models…

视图(views)

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 下面通过一个例子讲解在Django项目中定义视图&#xff0c;代码如下&#xff1a; from django.http import HttpResponse # 导入响应对象 impo…

Flutter【组件】点击类型表单项

简介 flutter 点击表单项组件&#xff0c;适合用户输入表单的场景。 点击表单项组件是一个用户界面元素&#xff0c;通常用于表单或设置界面中&#xff0c;以便用户可以点击它们来选择或更改某些设置或输入内容。这类组件通常由一个标签和一个可点击区域组成&#xff0c;并且…

Redis-数据类型-zset

文章目录 1、查看redis是否启动2、通过客户端连接redis3、切换到db4数据库4、将一个或多个member元素及其score值加入到有序集key当中5、升序返回有序集key6、升序返回有序集key&#xff0c;让分数一起和值返回的结果集7、降序返回有序集key&#xff0c;让分数一起和值返回到结…

Charles抓包工具系列文章(一)-- Compose 拼接http请求

一、背景 众所周知&#xff0c;Charles是一款抓包工具&#xff0c;当然是http协议&#xff0c;不支持tcp。&#xff08;如果你想要抓tcp包&#xff0c;请转而使用wireshark&#xff0c;在讲述websocket的相关技术有梳理过wireshark抓包&#xff09; 话说回来&#xff0c;char…

浏览器自带的IndexDB的简单使用示例--小型学生管理系统

浏览器自带的IndexDB的简单使用示例--小型学生管理系统 文章说明代码效果展示 文章说明 本文主要为了简单学习IndexDB数据库的使用&#xff0c;写了一个简单的增删改查功能 代码 App.vue&#xff08;界面的源码&#xff09; <template><div style"padding: 30px&…

红队内网攻防渗透:内网渗透之内网对抗:横向移动篇域控系统提权NetLogonADCSPACKDC永恒之蓝CVE漏洞

红队内网攻防渗透 1. 内网横向移动1.1 横向移动-域控提权-CVE-2020-1472 NetLogon1.2 横向移动-域控提权-CVE-2021-422871.3 横向移动-域控提权-CVE-2022-269231.4 横向移动-系统漏洞-CVE-2017-01461.5 横向移动-域控提权-CVE-2014-63241. 内网横向移动 1、横向移动-域控提权-…

elementui组件库实现电影选座面板demo

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Cinema Seat Selection</title><!-- 引入E…

【学一点儿前端】单页面点击前进或后退按钮导致的内存泄露问题(history.listen监听器清除)

今天测试分配了一个比较奇怪的问题&#xff0c;在单页面应用中&#xff0c;反复点击“上一步”和“下一步”按钮时&#xff0c;界面表现出逐渐变得卡顿。为分析这一问题&#xff0c;我用Chrome的性能监控工具进行了浏览器性能录制。结果显示&#xff0c;每次点击“上一步”按钮…

区间预测 | Matlab实现CNN-ABKDE卷积神经网络自适应带宽核密度估计多变量回归区间预测

区间预测 | Matlab实现CNN-ABKDE卷积神经网络自适应带宽核密度估计多变量回归区间预测 目录 区间预测 | Matlab实现CNN-ABKDE卷积神经网络自适应带宽核密度估计多变量回归区间预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.Matlab实现CNN-ABKDE卷积神经网络自适应…

思考题:相交的几何图形

给定不超过 26 个几何图形&#xff0c;每个图形都有一个唯一大写字母作为其编号。 每个图形在平面中的具体位置已知&#xff0c;请你判断&#xff0c;对于每个图形&#xff0c;有多少个其他图形与其存在交点。 在判断交点时&#xff0c;只考虑边与边相交的情况&#xff0c;如…

Java 8 Date and Time API

Java 8引入了新的日期和时间API&#xff0c;位于java.time包下&#xff0c;旨在替代旧的java.util.Date和java.util.Calendar类。新API更为简洁&#xff0c;易于使用&#xff0c;并且与Joda-Time库的一些理念相吻合。以下是Java 8 Date and Time API中几个核心类的简要概述&…

AIGC-CVPR2024best paper-Rich Human Feedback for Text-to-Image Generation-论文精读

Rich Human Feedback for Text-to-Image Generation斩获CVPR2024最佳论文&#xff01;受大模型中的RLHF技术启发&#xff0c;团队用人类反馈来改进Stable Diffusion等文生图模型。这项研究来自UCSD、谷歌等。 在本文中&#xff0c;作者通过标记不可信或与文本不对齐的图像区域&…

【网络协议】精讲ARP协议工作原理!图解超赞超详细!!!

亲爱的用户&#xff0c;打开微信&#xff0c;搜索公众号&#xff1a;“风云说通信”&#xff0c;即可免费阅读该文章~~ 目录 前言 1. ARP协议介绍 1.1 ARP协议功能 1.2 ARP请求报文 1.3 ARP工作原理 2. ARP 缓存超时 2.1 RARP 3. ARP 攻击 3.1 ARP 攻击分类 前言 首先…

HTML(16)——边距问题

清楚默认样式 很多标签都有默认的样式&#xff0c;往往我们不需要这些样式&#xff0c;就需要清楚默认样式 写法&#xff1a; 用通配符选择器&#xff0c;选择所有标签&#xff0c;清除所有内外边距选中所有的选择器清楚 *{ margin:0; padding:0; } 盒子模型——元素溢出 作…

超越AnimateAnyone, 华中科大中科大阿里提出Unimate,可以根据单张图片和姿势指导生成视频。

阿里新发布的UniAnimate&#xff0c;与 AnimateAnyone 非常相似&#xff0c;它可以根据单张图片和姿势指导生成视频。项目核心技术是统一视频扩散模型&#xff0c;通过将参考图像和估计视频内容嵌入到共享特征空间&#xff0c;实现外观和动作的同步。 相关链接 项目&#xff1…

Eclipse使用TFS(Team Foundation Server) 超详细

Eclipse使用TFS 1、什么是TFS2、TFS和Git的区别3、签出代码4、签入代码4.1、签出以进行编辑4.2、修改本地代码4.3、签入挂起的更改4.4、签入 如果不能 签入挂起的更改&#xff0c;则先 签出以进行编辑如果 签入挂起的更改不可选中&#xff0c;则 如下操作 1、什么是TFS Team F…