flink入门

1.安装flink,启动flink

文档地址:Apache Flink 1.3-SNAPSHOT 中文文档: Apache Flink 中文文档

代码:GitHub - apache/flink: Apache Flink

2. 打开端口  端口号, 启动jar

### 切换到flink 目录bin下
[root@localhost ~]# cd /home/flink/flink-1.14.4/bin/
### 运行
[root@localhost bin]# ./start-cluster.sh###开启端口9000
nc -l  9000
#### 运行jar./bin/flink  run  /home/flink/flink-1.14.4/examples/streaming/SocketWindowWordCount.jar --port 9000

3.测试jar,输入字符

注:1. 部署启动遇到的jar缺失

注释:jar可以下载源码查看,方法如图所示,也可以根据错误信息搜索对应的包

附: mysql+mq+mybatis +spring 需要的包

2,.代码

package com.javaland.flink.mq;import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;public class Mq2Flink {/*** 实时监控mq数据,插入到mysql数据库*/public static void mq2mysql() throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setPort(5672).setUserName("test").setPassword("test").setVirtualHost("/").build();final DataStream<String> stream = env.addSource(new RMQSource<String>( connectionConfig, "task_queue", true, new SimpleStringSchema())).setParallelism(1);stream.addSink(new SinkToMySQL());stream.print();env.execute("mq数据插入到mysql");}public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.noRestart());RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setPort(5672).setUserName("test").setPassword("test").setVirtualHost("/").build();final DataStream<String> stream = env.addSource(new RMQSource<String>( connectionConfig, "task_queue", true, new SimpleStringSchema())).setParallelism(1);stream.addSink(new SinkToMySQL());stream.print();env.execute("mq数据插入到mysql");}}
package com.javaland.flink.mq;import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import com.javaland.flink.mapper.MessageMapper;
import com.javaland.flink.po.MessagePO;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.ibatis.datasource.pooled.PooledDataSource;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import java.util.List;public class SinkToMySQL extends RichSinkFunction<String> {static MybatisSqlSessionFactoryBean sqlSessionFactory;static SqlSessionFactory sessionFactory;static SqlSession sqlSession;static {sqlSessionFactory = new MybatisSqlSessionFactoryBean();// 配置多数据源PooledDataSource pooledDataSource = new PooledDataSource();pooledDataSource.setDriver("com.mysql.cj.jdbc.Driver");pooledDataSource.setUsername("root");pooledDataSource.setPassword("root");pooledDataSource.setUrl("jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai");sqlSessionFactory.setDataSource(pooledDataSource);try {sqlSessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/*.xml"));sqlSessionFactory.setTypeAliasesPackage("com.javaland.flink.po");sessionFactory = sqlSessionFactory.getObject();} catch (Exception e) {throw new RuntimeException(e);}}/*** @param parameters* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);sqlSession = sessionFactory.openSession();}/*** @throws Exception*/@Overridepublic void close() throws Exception {super.close();}/*** @param value* @param context*/@Overridepublic void invoke(String value, Context context) {MessageMapper messageMapper = sqlSession.getMapper(MessageMapper.class);MessagePO messagePO=new MessagePO();messagePO.setUsername(value);messageMapper.insert(messagePO);List<MessagePO> all = messageMapper.selectList(null);if(all!=null){for (int i = all.size() - 1; i >= 0; i--) {System.out.println("查询的message:"+all.get(i));}}}}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.javaland</groupId><artifactId>javaland</artifactId><version>0.0.1</version></parent><packaging>jar</packaging><groupId>org.javaland</groupId><artifactId>javaland-flink</artifactId><properties><flink.version>1.14.4</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>1.11.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.10.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-force-shading</artifactId><version>14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.11.1</version></dependency><dependency><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId><version>1.2</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.31</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.18</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>2.4</version><executions><execution><id>mq2flink</id><phase>package</phase><goals><goal>jar</goal></goals><configuration><classifier>Mq2Flink</classifier><archive><manifestEntries><program-class>com.javaland.flink.Mq2Flink</program-class></manifestEntries></archive></configuration></execution><execution><id>Flink2Mq</id><phase>package</phase><goals><goal>jar</goal></goals><configuration><classifier>Flink2Mq</classifier><archive><manifestEntries><program-class>com.javaland.flink.Flink2Mq</program-class></manifestEntries></archive></configuration></execution></executions></plugin></plugins></build></project>

3. 最后打包的好处就是可以部署多个job

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/199597.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CentOS7安装部署Kafka with KRaft

文章目录 CentOS7安装部署Kafka with KRaft一、前言1.简介2.架构3.环境 二、正文1.部署服务器2.基础环境1&#xff09;主机名2&#xff09;Hosts文件3&#xff09;关闭防火墙4&#xff09;JDK 安装部署 3.单机部署1&#xff09;下载软件包2&#xff09;修改配置文件3&#xff0…

开源更安全? yum源配置/rpm 什么是SSH?

文章目录 1.开放源码有利于系统安全2.yum源配置&#xff0c;这一篇就够了&#xff01;(包括本地&#xff0c;网络&#xff0c;本地共享yum源)3.rpm包是什么4.SSH是什么意思&#xff1f;有什么功能&#xff1f; 1.开放源码有利于系统安全 开放源码有利于系统安全 2.yum源配置…

在线客服系统源码 聊天记录实时保存 附带完整的搭建教程

在线客服系统是一个企业网站进行网络营销的最重要的工具。企业进行网络宣传后&#xff0c;会有很多访客进入到网站&#xff0c;这时候网站就需要有在线客服人员进行接待&#xff0c;及时的与访客进行沟通&#xff0c;才能留住访客&#xff0c;变流量为销量。 在线客服系统可以…

synchronized锁膨胀过程

轻量级锁&#xff1a; 使用场景&#xff1a;如果一个对象虽然有多线程要加锁&#xff0c;但加锁的时间是错开的&#xff08;也就是没有竞争&#xff09;&#xff0c;那么可以 使用轻量级锁来优化。 轻量级锁原理 1.创建锁记录&#xff08;Lock Record&#xff09;对象&#…

十一、统一网关GateWay(搭建网关、过滤器、跨越解决)

目录 一、网关技术的实现 在SpringCloud中网关的实现包括两种: 作用&#xff1a; 二、搭建网关服务 1、新建模块&#xff0c;并添加依赖 2、新建Gateway包&#xff0c;并编写启动类 3、编写yml文件 4、启动服务&#xff0c;并在网页内测试 5、步骤 三、路由断言工厂 …

解决龙芯loongarch64服务器编译安装Python后yum命令无法使用的问题“no module named ‘dnf‘”

引言 在使用Linux系统时,我们经常会使用yum来管理软件包。然而,有时候我们可能会遇到yum不可用的情况,其中一个原因就是Python的问题。本文将介绍Python对yum可用性的影响,并提供解决方案。 问题引发 正常情况下,安装linux系统后,yum命令是可用状态,升级Python版本后,…

【AI】行业消息精选和分析(11月21日 星期二)

技术发展 &#x1f525; OpenAI 员工集体签署信件&#xff1a; - 员工要求董事会辞职并重新任命首席执行官奥特曼。 - 否则可能集体加入微软。 昨天就玩我们领导发言&#xff0c;后面大家接龙收到的那一套了。 &#x1f632; 奥特曼加入微软引发猜测&#xff1a; - 对于一个公…

抖音电商双11官方数据最全汇总!

11月13日&#xff0c;抖音电商数据发布“抖音商城双11好物节”数据报告&#xff0c;展现双11期间平台全域经营情况及大众消费趋势。 报告显示&#xff0c;10月20日至11月11日&#xff0c;抖音电商里的直播间累计直播时长达到5827万小时&#xff0c;挂购物车的短视频播放了1697亿…

FTX的前世今生:崛起、辉煌与崩塌

FTX&#xff0c;一度被誉为加密货币领域的明星交易所&#xff0c;其快速的崛起和令人瞩目的崩塌吸引了全球的关注。让我们回顾一下FTX的前世今生&#xff0c;了解其短暂的辉煌和骤然的崩塌。 1. 崛起&#xff1a; FTX的创始人山姆班克曼-弗里德在加密货币领域具有深厚的背景和…

DataFunSummit:2023年数据基础架构峰会-核心PPT资料下载

一、峰会简介 正如From、Join、排序等是SQL的基本算子&#xff0c;存储与计算是也是数据架构中数据生产与消费的基本算子&#xff0c;对于数据架构之下的技术栈层级&#xff0c;我们可将其定义为数据基础架构。 数据存储技术在适应大数据时代的规模需求基础之上&#xff0c;持…

海外服务器相较于国内服务器有何特点?亚马逊海外服务器为何零跑全球

随着数字时代的迅猛发展&#xff0c;云计算基础设施的重要性愈发凸显。在这个信息爆炸的全球化时代&#xff0c;很多企业的海外业务的成功往往取决于是否拥有安全、可靠、高性能、可扩展、灵活且全球覆盖的云基础设施&#xff0c;因此对很多企业来说&#xff0c;选择一款优质的…

如何给shopify motion主题的产品系列添加description

一、Description是什么 Description是一种HTML标签类型&#xff0c;通过指定Description的内容&#xff0c;可以帮助搜索引擎以及用户更好的理解当前网页包含的主要了内容。 二、Description有什么作用 1、基本作用&#xff0c;对于网站和网页做一个简单的说明。 2、吸引点击&…

没收到Win11 23H2正式版的推送怎么升级到23H2

没收到Win11 23H2正式版的推送怎么升级到23H2&#xff1f;用户反映自己没有收到Win11 23H2正式版的更新推送&#xff0c;又想升级为23H2版本。接下来小编给大家详细介绍不同的升级方法&#xff0c;帮助更多的用户完成Win11 23H2系统的更新&#xff0c;升级后就能体验到Win11 23…

Linux常用命令——builtin命令

在线Linux命令查询工具 builtin 执行shell内部命令 补充说明 builtin命令用于执行指定的shell内部命令&#xff0c;并返回内部命令的返回值。builtin命令在使用时&#xff0c;将不能够再使用Linux中的外部命令。当系统中定义了与shell内部命令相同的函数时&#xff0c;使用…

细思极恐!5秒钟克隆你的声音

Mocking Bird 是开发者 babysor 开源的比较火的 AI 拟声开源项目&#xff0c;目前在 GitHub 已经获得了 32K 的 Star&#xff0c;它能在 5 秒内克隆你的声音并生成任意语音内容&#xff0c;支持中文普通话。 01 功能特性 支持中文普通话拟声&#xff0c;并且在多个中文数据集…

第十一篇 基于JSP 技术的网上购书系统——产品类别管理、评论/留言管理、注册用户管理、新闻管理功能实现(网上商城、仿淘宝、当当、亚马逊)

目录 1.产品类别管理 1.1功能说明 1.2界面设计 1.3处理流程 1.4数据来源和算法 1.4.1数据来源 1.4.2 查询条件 1.4.3相关sql实例 2. 评论/留言管理 2.1功能说明 2.2 界面设计 2.3处理流程 2.4数据来源和算法 2.4.1数据来源 2.4.2 查询条件 2.4.3相关sql实例…

机器学习第9天:决策树分类

文章目录 机器学习专栏 介绍 基本思想 使用代码 深度探索 优点 估计概率 训练算法 CART成本函数 实例数与不纯度 正则化 在鸢尾花数据集上训练决策树 机器学习专栏 机器学习_Nowl的博客-CSDN博客 介绍 作用&#xff1a;分类 原理&#xff1a;构建一个二叉树&#…

第28期 | GPTSecurity周报

GPTSecurity是一个涵盖了前沿学术研究和实践经验分享的社区&#xff0c;集成了生成预训练Transformer&#xff08;GPT&#xff09;、人工智能生成内容&#xff08;AIGC&#xff09;以及大型语言模型&#xff08;LLM&#xff09;等安全领域应用的知识。在这里&#xff0c;您可以…

【计算机网络笔记】IPv6简介

系列文章目录 什么是计算机网络&#xff1f; 什么是网络协议&#xff1f; 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能&#xff08;1&#xff09;——速率、带宽、延迟 计算机网络性能&#xff08;2&#xff09;…

时间序列预测实战(十七)PyTorch实现LSTM-GRU模型长期预测并可视化结果(附代码+数据集+详细讲解)

一、本文介绍 本文给大家带来的实战内容是利用PyTorch实现LSTM-GRU模型&#xff0c;LSTM和GRU都分别是RNN中最常用Cell之一&#xff0c;也都是时间序列预测中最常见的结构单元之一&#xff0c;本文的内容将会从实战的角度带你分析LSTM和GRU的机制和效果&#xff0c;同时如果你…